/databricks-datasets/definitive-guide/data

In [0]:
# Read all per-day retail CSV files into one static DataFrame, using the header
# row for column names and letting Spark infer the column types.
staticDataFrame = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/databricks-datasets/definitive-guide/data/retail-data/by-day/*.csv")
)

# Keep the inferred schema so the streaming reader below can reuse it, and
# register the data as the SQL view `retail_data`.
staticSchema = staticDataFrame.schema
staticDataFrame.createOrReplaceTempView("retail_data")


In [0]:
# Print the first five rows as a plain-text table to sanity-check the load.
staticDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
# Same five-row preview, rendered through the notebook's `display` helper.
display(staticDataFrame.limit(5))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
580538,23084,RABBIT NIGHT LIGHT,48,2011-12-05T08:38:00.000+0000,1.79,14075.0,United Kingdom
580538,23077,DOUGHNUT LIP GLOSS,20,2011-12-05T08:38:00.000+0000,1.25,14075.0,United Kingdom
580538,22906,12 MESSAGE CARDS WITH ENVELOPES,24,2011-12-05T08:38:00.000+0000,1.65,14075.0,United Kingdom
580538,21914,BLUE HARMONICA IN BOX,24,2011-12-05T08:38:00.000+0000,1.25,14075.0,United Kingdom
580538,22467,GUMBALL COAT RACK,6,2011-12-05T08:38:00.000+0000,2.55,14075.0,United Kingdom


In [0]:
from pyspark.sql.functions import window, column, desc, col
# Total spend per customer per one-day window of InvoiceDate, largest totals first.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .sort(desc("sum(total_cost)"))
    .show(5)
)


+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 00:00...|          71601.44|
|      null|{2011-11-14 00:00...|          55316.08|
|      null|{2011-11-07 00:00...|          42939.17|
|      null|{2011-03-29 00:00...| 33521.39999999998|
|      null|{2011-12-08 00:00...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows



- Note the nulls: these rows have no customer ID in the source data, so their purchases are grouped together under a null `CustomerId`.

In [0]:
# Set the number of shuffle partitions to 5 for the session.
# NOTE(review): presumably lowered because this demo dataset is small - confirm.
spark.conf.set("spark.sql.shuffle.partitions","5")

In [0]:
# Streaming read over the same CSV files, reusing the schema captured from the
# static load. maxFilesPerTrigger=1 limits each trigger to a single file.
streamingDataFrame = (
    spark.readStream.format("csv")
    .schema(staticSchema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .load("/databricks-datasets/definitive-guide/data/retail-data/by-day/*.csv")
)


In [0]:
# Confirm the DataFrame is a streaming one (the cell output shows True).
streamingDataFrame.isStreaming

Out[11]: True

In [0]:
# Streaming version of the static aggregation: total spend per customer per window.
# NOTE(review): despite the "PerHour" name, the window here is "1 day".
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)


In [0]:
# Start the streaming query, writing the complete aggregation result to an
# in-memory table that can be queried by name as `customer_purchases`.
(
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)


Out[13]: <pyspark.sql.streaming.StreamingQuery at 0x7fd9f5fe1760>

In [0]:
# Query the in-memory table fed by the streaming query and show the five
# largest totals seen so far.
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)


+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|{2010-12-21 00:00...|31347.479999999938|
|   18102.0|{2010-12-07 00:00...|          25920.37|
|      null|{2010-12-10 00:00...|25399.560000000012|
|      null|{2010-12-17 00:00...|25371.769999999768|
|      null|{2010-12-06 00:00...|23395.099999999904|
+----------+--------------------+------------------+
only showing top 5 rows



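- Note how the results change (for a while) as the data streams in: re-running the query above returns different totals until all of the files have been processed.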

In [0]:
# Print the inferred column names and types of the static DataFrame.
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
from pyspark.sql.functions import date_format, col
# Prepare the data for the ML pipeline: replace nulls with 0, add the weekday
# name of InvoiceDate as `day_of_week`, and coalesce to 5 partitions.
# NOTE(review): fill(0) presumably only touches numeric columns - confirm.
preppedDataFrame = (
    staticDataFrame.na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)


In [0]:
# Time-based split: invoices before 2011-07-01 are training data, the rest are test data.
trainDataFrame = preppedDataFrame.filter("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.filter("InvoiceDate >= '2011-07-01'")


In [0]:
# Row counts of the training and test splits, shown as a (train, test) tuple.
trainDataFrame.count(), testDataFrame.count()

Out[22]: (245903, 296006)

In [0]:
# Give every day of the week a numerical value
from pyspark.ml.feature import StringIndexer
# Maps each `day_of_week` string to a numeric index in `day_of_week_index`.
indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_index",
)


In [0]:
# OneHotEncode the days of the week (since we can't have 6-Saturday be "higher" than 1-Monday)
from pyspark.ml.feature import OneHotEncoder
# One-hot encodes the weekday index so that no day is treated as numerically
# "larger" than another.
encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_encoded",
)


In [0]:
# turn into a vector
from pyspark.ml.feature import VectorAssembler

# Combines price, quantity and the encoded weekday into a single `features` vector column.
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)


In [0]:
# make a pipeline of it, to incorporate new data
from pyspark.ml import Pipeline

# Chain the three feature stages so they can be fitted once and reapplied to new data.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])


In [0]:
# Fit the feature pipeline on the training split only; the fitted pipeline is
# then reused to transform both the training and the test data.
fittedPipeline = transformationPipeline.fit(trainDataFrame)


In [0]:
# Build the transformed training set in this cell before caching it. Previously
# this cell relied on `transformedTraining` being assigned by a later cell, so a
# fresh top-to-bottom run failed with NameError.
transformedTraining = fittedPipeline.transform(trainDataFrame)
# cache() returns the DataFrame, so it is still shown as the cell output.
transformedTraining.cache()

Out[29]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [0]:
# Apply the fitted feature pipeline to the training split, adding the
# day_of_week_index, day_of_week_encoded and features columns.
transformedTraining = fittedPipeline.transform(trainDataFrame)


In [0]:
from pyspark.ml.clustering import KMeans
# K-means estimator with 20 clusters; the seed is fixed so runs are repeatable.
kmeans = KMeans(k=20, seed=1)


In [0]:
# Fit the K-means model on the transformed training data.
# NOTE(review): presumably reads the `features` column produced by the assembler - confirm.
kmModel = kmeans.fit(transformedTraining)


In [0]:
# Apply the pipeline fitted on the training split to the test split.
transformedTest = fittedPipeline.transform(testDataFrame)


In [0]:
# Preview five transformed test rows, including the encoded and assembled vector columns.
display(transformedTest.limit(5))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,day_of_week,day_of_week_index,day_of_week_encoded,features
580538,23084,RABBIT NIGHT LIGHT,48,2011-12-05T08:38:00.000+0000,1.79,14075.0,United Kingdom,Monday,2.0,"Map(vectorType -> sparse, length -> 5, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 4), values -> List(1.79, 48.0, 1.0))"
580538,23077,DOUGHNUT LIP GLOSS,20,2011-12-05T08:38:00.000+0000,1.25,14075.0,United Kingdom,Monday,2.0,"Map(vectorType -> sparse, length -> 5, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 4), values -> List(1.25, 20.0, 1.0))"
580538,22906,12 MESSAGE CARDS WITH ENVELOPES,24,2011-12-05T08:38:00.000+0000,1.65,14075.0,United Kingdom,Monday,2.0,"Map(vectorType -> sparse, length -> 5, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 4), values -> List(1.65, 24.0, 1.0))"
580538,21914,BLUE HARMONICA IN BOX,24,2011-12-05T08:38:00.000+0000,1.25,14075.0,United Kingdom,Monday,2.0,"Map(vectorType -> sparse, length -> 5, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 4), values -> List(1.25, 24.0, 1.0))"
580538,22467,GUMBALL COAT RACK,6,2011-12-05T08:38:00.000+0000,2.55,14075.0,United Kingdom,Monday,2.0,"Map(vectorType -> sparse, length -> 5, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 4), values -> List(2.55, 6.0, 1.0))"


In [0]:
from pyspark.sql import Row

# Build a small RDD of single-field Rows and convert it to a DataFrame.
(
    spark.sparkContext
    .parallelize([Row(value) for value in (1, 2, 3)])
    .toDF()
)


Out[35]: DataFrame[_1: bigint]