In [4]:
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("../data/retail-data/by-day/*.csv")

In [5]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [6]:
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 08:00...|            -37.6|
|   14126.0|[2011-11-29 08:00...|643.6300000000001|
|   13500.0|[2011-11-16 08:00...|497.9700000000001|
|   17160.0|[2011-11-08 08:00...|516.8499999999999|
|   15608.0|[2011-11-11 08:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [7]:
streamingDataFrame = spark.readStream\
  .schema(staticSchema)\
  .option("maxFilesPerTrigger", 1)\
  .format("csv")\
  .option("header", "true")\
  .load("../data/retail-data/by-day/*.csv")

In [8]:
streamingDataFrame.isStreaming

True

In [9]:
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupby(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")

In [10]:
purchaseByCustomerPerHour.writeStream\
  .format("memory")\
  .queryName("customer_purchases")\
  .outputMode("complete")\
  .start()

<pyspark.sql.streaming.StreamingQuery at 0x1df31fc9438>

In [11]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY "sum(total_cost)" DESC
"""
)\
.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17460.0|[2010-12-01 08:00...|              19.9|
|   13408.0|[2010-12-01 08:00...|1024.6800000000003|
|   15235.0|[2010-12-01 08:00...|              79.5|
|   13295.0|[2010-12-02 08:00...|             -3.25|
|   12779.0|[2010-12-03 08:00...|            248.16|
+----------+--------------------+------------------+
only showing top 5 rows



Machine Learning and Advanced Analytics

In [12]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [14]:
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)

In [16]:
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")

In [17]:
trainDataFrame.count()

245903

In [18]:
testDataFrame.count()

296006

In [19]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")

In [20]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")

In [21]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
  .setOutputCol("features")

In [22]:
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])

In [23]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [24]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [31]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
  .setK(20)\
  .setSeed(1)

In [32]:
kmModel = kmeans.fit(transformedTraining)

In [35]:
kmModel.computeCost(transformedTraining)

84553739.96537486

In [36]:
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)

517507094.7222117