In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
from pyspark.sql.functions import current_date, current_timestamp, datediff, months_between, to_date
from pyspark.sql.functions import split, explode, create_map
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import IntegerType, DoubleType

In [2]:
# Single SparkSession shared by every cell below; getOrCreate() returns an
# already-active session if one exists instead of building a new one.
sparkSession = (
    SparkSession.builder
    .appName('project1')
    .getOrCreate()
)

### 第六章

In [3]:
# Load one day of retail transactions. The first CSV row supplies column
# names and inferSchema lets Spark pick column types instead of all-strings.
df = (
    sparkSession.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("./data/retail-data/by-day/2010-12-01.csv")
)

#### 6.5 字符匹配与判断

In [4]:
from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]

def color_locator(column, color_string):
    """Build a boolean Column flagging rows whose ``column`` mentions a colour.

    Args:
        column: string Column to search (e.g. ``df.Description``).
        color_string: colour name; it is upper-cased before searching because
            the Description values shown in this notebook's outputs are
            upper case.

    Returns:
        A Column named ``"is_<color_string>"`` that is true when the
        upper-cased colour occurs somewhere in ``column``.
    """
    # locate() yields the 1-based position of the substring, or 0 when it is
    # absent, so casting to boolean turns "found" into true and "missing"
    # into false.
    return (
        locate(color_string.upper(), column)
        .cast("boolean")
        # Use the parameter, not a global `c`: the old code only worked when a
        # Python 2 list comprehension leaked its loop variable into globals.
        .alias("is_" + color_string)
    )

In [5]:
# One boolean flag column per colour ...
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
# ... followed by all of the original columns; expr("*") supplies them as a
# Column object, like the other entries in the list.
selectedColumns.append(expr("*"))
(
    df.select(*selectedColumns)
    .where(expr("is_white OR is_red"))
    .select("is_white", "is_red", "is_black", "Description")
    .show(3, False)
)

+--------+------+--------+----------------------------------+
|is_white|is_red|is_black|Description                       |
+--------+------+--------+----------------------------------+
|true    |false |false   |WHITE HANGING HEART T-LIGHT HOLDER|
|true    |false |false   |WHITE METAL LANTERN               |
|true    |true  |false   |RED WOOLLY HOTTIE WHITE HEART.    |
+--------+------+--------+----------------------------------+
only showing top 3 rows



#### 6.6 日期与时间戳


In [6]:
# Ten rows (id 0..9), each stamped with the current date and timestamp.
dateDF = (
    sparkSession.range(10)
    .withColumn("today", current_date())
    .withColumn("now", current_timestamp())
)

# Make the frame queryable from SQL as "dateTable".
dateDF.createOrReplaceTempView("dateTable")

In [7]:
# Print column names and types: `today` comes out as a date and `now` as a
# timestamp.
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [32]:
# The pattern puts the day before the month, so "2017-12-11" parses as
# 2017-11-12 and "2017-20-12" as 2017-12-20 (see the output of the next cell).
dateFormat = "yyyy-dd-MM"
cleanDateDF = sparkSession.range(1).select(
    to_date(lit("2017-12-11"), dateFormat).alias("date"),
    to_date(lit("2017-20-12"), dateFormat).alias("date2"),
)
cleanDateDF.createOrReplaceTempView("dateTable2")

In [34]:
# Compare a date column against a date-formatted string in two equivalent ways.
# 1) Explicit literal Column:
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()
# 2) Bare Python string. It is already a literal value, so it must not carry
#    SQL-style quotes. The former "'2017-12-12'" contained a leading quote
#    character. It only matched on Spark 2.x, where date-vs-string is compared
#    as strings ("2" sorts after "'"). On Spark 3.x the string is cast to a
#    date, and the quoted value fails that cast, so no rows are returned.
cleanDateDF.filter(col("date2") > "2017-12-12").show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



#### 6.9 复杂类型——结构体、数组、映射

In [19]:
# struct: combine several columns into one struct-typed column.
# Parenthesised column list in a SQL expression. This DataFrame is built but
# never shown: only the cell's last expression is echoed.
df.selectExpr("(Description, InvoiceNo) as complex", "*")
# Explicit struct() call; its resulting schema is the one displayed.
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

DataFrame[complex: struct<Description:string,InvoiceNo:string>, InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [26]:
# Pack Description and InvoiceNo into a single struct column named "complex".
complexDF = df.select(
    struct(col("Description"), col("InvoiceNo")).alias("complex")
)
complexDF.createOrReplaceTempView("complexDF")

# Two ways to pull one field out of the struct. Their results are not shown,
# because only the last expression in a cell is echoed.
complexDF.select(col("complex.Description"))
complexDF.select(col("complex").getField("Description"))

# Expand every struct field back into top-level columns.
complexDF.select(col("complex.*"))

DataFrame[Description: string, InvoiceNo: string]

In [10]:
# array: split each Description on spaces into an array, then explode() it so
# every word gets its own row next to the original Description/InvoiceNo.
(
    df.withColumn("splitted", split(col("Description"), " "))
    .withColumn("exploded", explode(col("splitted")))
    .select("Description", "InvoiceNo", "exploded")
    .show(20)
)

+--------------------+---------+--------+
|         Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...|   536365|   WHITE|
|WHITE HANGING HEA...|   536365| HANGING|
|WHITE HANGING HEA...|   536365|   HEART|
|WHITE HANGING HEA...|   536365| T-LIGHT|
|WHITE HANGING HEA...|   536365|  HOLDER|
| WHITE METAL LANTERN|   536365|   WHITE|
| WHITE METAL LANTERN|   536365|   METAL|
| WHITE METAL LANTERN|   536365| LANTERN|
|CREAM CUPID HEART...|   536365|   CREAM|
|CREAM CUPID HEART...|   536365|   CUPID|
|CREAM CUPID HEART...|   536365|  HEARTS|
|CREAM CUPID HEART...|   536365|    COAT|
|CREAM CUPID HEART...|   536365|  HANGER|
|KNITTED UNION FLA...|   536365| KNITTED|
|KNITTED UNION FLA...|   536365|   UNION|
|KNITTED UNION FLA...|   536365|    FLAG|
|KNITTED UNION FLA...|   536365|     HOT|
|KNITTED UNION FLA...|   536365|   WATER|
|KNITTED UNION FLA...|   536365|  BOTTLE|
|RED WOOLLY HOTTIE...|   536365|     RED|
+--------------------+---------+--------+

In [18]:
# map: build a single-entry map per row, keyed by Description with InvoiceNo
# as the value.
(
    df.select(
        col("InvoiceNo"),
        create_map(col("Description"), col("InvoiceNo")).alias("complex_map"),
    )
    .show(8)
)
# Key lookup on the map; rows whose map lacks the key come back as null.
(
    df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))
    .selectExpr("complex_map['WHITE METAL LANTERN']")
    .show(8)
)

+---------+--------------------+
|InvoiceNo|         complex_map|
+---------+--------------------+
|   536365|[WHITE HANGING HE...|
|   536365|[WHITE METAL LANT...|
|   536365|[CREAM CUPID HEAR...|
|   536365|[KNITTED UNION FL...|
|   536365|[RED WOOLLY HOTTI...|
|   536365|[SET 7 BABUSHKA N...|
|   536365|[GLASS STAR FROST...|
|   536366|[HAND WARMER UNIO...|
+---------+--------------------+
only showing top 8 rows

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|                            null|
|                          536365|
|                            null|
|                            null|
|                            null|
|                            null|
|                            null|
|                            null|
+--------------------------------+
only showing top 8 rows



#### 6.10 用户定义函数

In [27]:
# Five rows (0..4) in a single column renamed from "id" to "num".
udfExampleDF = sparkSession.range(0, 5).toDF("num")

In [28]:
def power3(double_value):
    """Return ``double_value`` raised to the third power.

    Plain Python arithmetic: ints stay ints and floats stay floats,
    e.g. ``power3(2.0) == 8.0`` and ``power3(3) == 27``.
    """
    return pow(double_value, 3)

power3(2.0)

8.0

In [31]:
# Wrap power3 as a UDF so it can be applied to DataFrame columns. With no
# return type given, udf() produces strings. A UDF created this way cannot be
# called inside SQL strings / selectExpr; that needs sparkSession.udf.register.
power3udf = udf(power3)
udfExampleDF.select(power3udf("num")).show()

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+



In [36]:
# power3udf above can only be used through the DataFrame API, not inside
# string expressions such as selectExpr. Registering power3 with the session
# under the name "power3py" makes it callable from SQL as well.
# Declaring the return type explicitly is a good habit whenever Python UDFs
# hand data back to the statically typed JVM side (Java/Scala).
sparkSession.udf.register("power3py", power3, returnType=IntegerType())
udfExampleDF.selectExpr("power3py(num)").show()

+-------------+
|power3py(num)|
+-------------+
|            0|
|            1|
|            8|
|           27|
|           64|
+-------------+

