In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, column, desc, col, date_format

# Static DataFrame

In [3]:
# Create (or reuse) the Spark session that every later cell uses.
spark = (
    SparkSession.builder
    .appName("basic_app")
    .getOrCreate()
)
# Only 5 shuffle partitions — presumably chosen to suit this small, local dataset.
spark.conf.set("spark.sql.shuffle.partitions", "5")

23/11/06 23:37:14 WARN Utils: Your hostname, luan-Dell-G15-5520 resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlp0s20f3)
23/11/06 23:37:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/06 23:37:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load all daily retail CSV files as a single static (batch) DataFrame,
# using the header row for column names and letting Spark infer the types.
staticDataFrame = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("../../../spark_data_examples/retail-data/by-day/*.csv")
)
# Register it for SQL access, and remember the inferred schema so the
# streaming reader (which is given an explicit .schema(...)) can reuse it.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

                                                                                

# Grouping data

In [5]:
# Total spend (UnitPrice * Quantity) per customer per 1-day window over the
# static data; prints the first 5 result rows.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)



+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14075.0|{2011-12-05 00:00...|316.78000000000003|
|   18180.0|{2011-12-05 00:00...|            310.73|
|   15358.0|{2011-12-05 00:00...| 830.0600000000003|
|   15392.0|{2011-12-05 00:00...|304.40999999999997|
|   15290.0|{2011-12-05 00:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows



                                                                                

# Streaming DataFrames

In [6]:
# Read the same daily CSV files, this time as a stream. The schema inferred by
# the static read is reused, and maxFilesPerTrigger=1 limits each trigger to a
# single file, so results accumulate batch by batch (Batch 0 further down only
# covers 2010-12-01).
streamingDataFrame = (
    spark.readStream
    .format("csv")
    .schema(staticSchema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .load("../../../spark_data_examples/retail-data/by-day/*.csv")
)

## Checking that the DataFrame is streaming

In [7]:
# Guard: stop Restart & Run All here if the reader above is not a stream,
# then still display the flag so the cell output is unchanged.
assert streamingDataFrame.isStreaming, "streamingDataFrame should be a streaming DataFrame"
streamingDataFrame.isStreaming

True

## Defining an operation

In [8]:
# The same per-customer spend aggregation as the static example, but defined
# on the streaming DataFrame; it only produces output once a writeStream sink
# is started on it (next cells).
# NOTE(review): despite the "PerHour" name, the window is "1 day". The name is
# kept because later cells refer to it.
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)

## Writing a stream

In [9]:
# Start the streaming query and write the full aggregated result ("complete"
# mode) into an in-memory table named "customer_purchases", which the SQL cell
# below reads.
# Keep the returned StreamingQuery handle. The query keeps running after this
# cell returns, and the handle is how to inspect or end it later, e.g.
# customerPurchasesQuery.status / customerPurchasesQuery.stop().
customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)

23/11/06 23:37:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-42ee316f-fddd-49da-9f40-bd01796234c7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/11/06 23:37:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f5de2da86d0>

                                                                                

## Testing the stream

In [10]:
# Query the in-memory sink table like any other table, largest totals first.
# The table is filled incrementally by the running stream, so re-running this
# cell can show different numbers.
(
    spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")
    .show(5)
)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|{2010-12-21 00:00...|31347.479999999938|
|   18102.0|{2010-12-07 00:00...|          25920.37|
|      null|{2010-12-10 00:00...|25399.560000000012|
|      null|{2010-12-17 00:00...|25371.769999999768|
|      null|{2010-12-06 00:00...|23395.099999999904|
+----------+--------------------+------------------+
only showing top 5 rows



## Write the results out to the console

In [11]:
# Also send the running totals to the console sink (printed into this cell's
# output). Nothing visible later in the notebook reads from this query, so
# instead of leaving it running for the rest of the session (alongside the ML
# cells below), let it print for a bounded time and then stop it explicitly.
CONSOLE_PREVIEW_SECONDS = 10  # how long to let the console sink print before stopping

consolePurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("console")
    .queryName("customer_purchases_2")
    .outputMode("complete")
    .start()
)
# awaitTermination(timeout) returns after at most `timeout` seconds (or
# earlier if the query ends/fails), after which the query is shut down.
consolePurchasesQuery.awaitTermination(CONSOLE_PREVIEW_SECONDS)
consolePurchasesQuery.stop()

23/11/06 23:38:09 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-779f31bf-651f-4a4c-ae27-acf8a6b02bcd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/11/06 23:38:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f5de2daa750>

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   12921.0|{2010-12-01 00:00...|             322.4|
|   16583.0|{2010-12-01 00:00...|233.45000000000002|
|   17897.0|{2010-12-01 00:00...|            140.39|
|   12748.0|{2010-12-01 00:00...|              4.95|
|   15350.0|{2010-12-01 00:00...|            115.65|
|   17809.0|{2010-12-01 00:00...|              34.8|
|   13747.0|{2010-12-01 00:00...|              79.6|
|   16250.0|{2010-12-01 00:00...|            226.14|
|   15983.0|{2010-12-01 00:00...|            440.89|
|   17511.0|{2010-12-01 00:00...|           1825.74|
|   14001.0|{2010-12-01 00:00...|            301.24|
|   17460.0|{2010-12-01 00:00...|              19.9|
|   18074.0|{2010-12-01 00:00...|             489.6|
|   12868.0|{2010-12-01 00:00...|             203.3|
| 

# ML and Advanced analytics

## Checking the schema

In [12]:
# Print the inferred schema as a tree (name, type, nullability). It shows which
# columns are numeric (Quantity, UnitPrice, CustomerID) and that InvoiceDate was
# parsed as a timestamp before feature engineering starts.
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



## Transforming the data

In [21]:
# Prepare the data for ML:
#   - replace nulls with 0 (a numeric fill value, so string columns are untouched),
#   - derive the weekday name from InvoiceDate ("EEEE" -> e.g. "Monday"),
#   - reduce to 5 partitions.
# NOTE(review): the fill also turns missing CustomerID values into 0.0 — confirm
# that a sentinel customer id is acceptable for downstream use.
preppedDataFrame = (
    staticDataFrame
    .fillna(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [23]:
# Peek at the first rows, including the new day_of_week column.
preppedDataFrame.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|     Monday|
+---------+---------+-------------------

## Splitting data into train and test

In [24]:
# Chronological split rather than a random one: invoices before the cutoff
# train the model, invoices on/after it test it. A single constant keeps the
# two complementary predicates from drifting apart.
# NOTE(review): rows with a null InvoiceDate satisfy neither predicate and land
# in neither split.
SPLIT_DATE = "2011-07-01"

trainDataFrame = preppedDataFrame.where(f"InvoiceDate < '{SPLIT_DATE}'")
testDataFrame = preppedDataFrame.where(f"InvoiceDate >= '{SPLIT_DATE}'")

In [25]:
# count() is an action, so this runs one Spark job per split to report its size.
train_size, test_size = trainDataFrame.count(), testDataFrame.count()
print("train size: {} - test_size: {}".format(train_size, test_size))

train size: 245903 - test_size: 296006


## Basic feature engineering

In [32]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

In [27]:
# Map each weekday name to a numeric index, which the OneHotEncoder stage consumes.
indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")

In [33]:
# Expand the weekday index into a one-hot vector column.
encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_encoded")

## The vector assembler

In [34]:
from pyspark.ml.feature import VectorAssembler

In [35]:
# Concatenate the numeric columns and the encoded weekday into one vector column
# named "features".
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)

## PySpark pipeline

In [36]:
from pyspark.ml import Pipeline

In [37]:
# Chain the feature stages so they are fitted and applied together, in this order.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

## Fitting a pipeline

In [38]:
# Fit the pipeline on the training split only, so nothing is learned from test data.
fittedPipeline = transformationPipeline.fit(dataset=trainDataFrame)

In [39]:
# Apply the fitted stages to produce the "features" column for training.
transformedTraining = fittedPipeline.transform(dataset=trainDataFrame)

In [42]:
# Cache the transformed training data before KMeans, which presumably scans it
# several times while iterating.
# NOTE(review): the "Asked to cache already cached data" warning in the output
# means this cell was executed more than once in this session; a clean
# Restart & Run All should not show it.
transformedTraining.cache()

23/11/06 23:57:18 WARN CacheManager: Asked to cache already cached data.


DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

## K-means clustering

In [43]:
from pyspark.ml.clustering import KMeans

In [46]:
# k-means with 20 clusters; the fixed seed makes the run reproducible.
# NOTE(review): UnitPrice and Quantity are unscaled, so they likely dominate
# the distance relative to the 0/1 weekday indicators — consider adding a
# scaling stage if that matters.
kmeans = KMeans(k=20, seed=1)

In [47]:
# Train the clustering model on the cached, transformed training data.
kmModel = kmeans.fit(dataset=transformedTraining)

## Evaluating the clustering

In [49]:
from pyspark.ml.evaluation import ClusteringEvaluator

                                                                                

# [Silhouette Score](https://en.wikipedia.org/wiki/Silhouette_(clustering))

In [53]:
# Assign clusters to the training rows and score them with the silhouette
# coefficient. Metric and distance are written out explicitly (they are the
# evaluator's defaults) so the printed label visibly matches what is computed.
predictions = kmModel.transform(transformedTraining)
evaluator = ClusteringEvaluator(metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

                                                                                

Silhouette with squared euclidean distance = 0.9643616445721893


                                                                                

# Lower-level API example

## RDD

In [54]:
from pyspark.sql import Row

In [55]:
# Build a tiny DataFrame from an RDD of unnamed Rows; without field names Spark
# falls back to the positional column name `_1` (see output).
spark.sparkContext.parallelize([Row(n) for n in (1, 2, 3)]).toDF()

DataFrame[_1: bigint]

                                                                                