In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, column, desc, col

In [3]:
spark = SparkSession\
    .builder\
    .appName("StructStream")\
    .getOrCreate()

In [4]:
staticDataFrame = spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .load("/data/by-day/*.csv")

In [5]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [6]:
spark.conf.set("spark.sql.shuffle.partitions","5")

In [7]:
staticDataFrame\
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")\
    .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14075.0|[2011-12-05 00:00...|316.78000000000003|
|   18180.0|[2011-12-05 00:00...|            310.73|
|   15358.0|[2011-12-05 00:00...| 830.0600000000003|
|   15392.0|[2011-12-05 00:00...|304.40999999999997|
|   15290.0|[2011-12-05 00:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows



In [8]:
staticDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [9]:
staticSchema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,TimestampType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

In [10]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger",1)\
    .format("csv")\
    .option("header","true")\
    .load("/data/by-day/*.csv")

In [11]:
streamingDataFrame.isStreaming

True

In [12]:
purchaseByCustomerPerHour = streamingDataFrame\
    .selectExpr("CustomerId",
    "(UnitPrice*Quantity) as total_cost",
    "InvoiceDate")\
    .groupBy(
        col("CustomerId"),window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")

In [14]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7fcb30138cf8>

In [18]:
spark.sql("""select *
                from customer_purchases
                order by 'sum(total_cost)' desc
                """)\
                .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15290.0|[2011-02-22 00:00...|             -1.65|
|   15036.0|[2011-06-13 00:00...|53.150000000000006|
|   17416.0|[2011-05-20 00:00...|1005.7000000000003|
|   12921.0|[2011-03-30 00:00...|-87.30000000000001|
|   17652.0|[2011-03-03 00:00...|             222.3|
+----------+--------------------+------------------+
only showing top 5 rows



In [19]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [22]:
from pyspark.sql.functions import date_format
preppedDataFrame = staticDataFrame\
    .na.fill(0)\
    .withColumn("day_of_week", date_format(col("InvoiceDate"),"EEEE"))\
    .coalesce(5)

In [24]:
trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

In [26]:
trainDataFrame.count()
#testDataFrame.count()

245903

In [27]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
    .setInputCol("day_of_week")\
    .setOutputCol("day_of_week_index")

In [28]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
    .setInputCol("day_of_week_index")\
    .setOutputCol("day_of_week_encoded")

In [29]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
    .setInputCols(["UnitPrice","Quantity","day_of_week_encoded"])\
    .setOutputCol("features")

In [32]:
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
    .setStages([indexer,encoder,vectorAssembler])

In [33]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [34]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [35]:
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [38]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
    .setK(20)\
    .setSeed(1)

In [39]:
kmModel = kmeans.fit(transformedTraining)

In [40]:
kmModel.computeCost(transformedTraining)

84553739.96537484

In [41]:
transformedTest = fittedPipeline.transform(testDataFrame)

In [42]:
kmModel.computeCost(transformedTest)

517507094.72221166