In [2]:
import pyspark as spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
file_path = '/home/fung/dev/project/spark/raw_data/Spark-The-Definitive-Guide/data/retail-data/by-day'
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load(f"{file_path}/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema


In [3]:
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .sort(desc("sum(total_cost)"))\
  .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|[2011-09-20 08:00...|          71601.44|
|      null|[2011-11-14 08:00...|          55316.08|
|      null|[2011-11-07 08:00...|          42939.17|
|      null|[2011-03-29 08:00...| 33521.39999999998|
|      null|[2011-12-08 08:00...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows



In [4]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load(f"{file_path}/*.csv")

In [5]:
streamingDataFrame.isStreaming

True

In [6]:
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")

In [7]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()


<pyspark.sql.streaming.StreamingQuery at 0x7efbe57b94a8>

In [8]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [1]:
purchaseByCustomerPerHour.writeStream.format('console').queryName('customer_purchase_2').outputMode('complete').start()

NameError: name 'purchaseByCustomerPerHour' is not defined

In [5]:
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)

In [7]:
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")

In [10]:
trainDataFrame.count()
testDataFrame.count()

296006

In [12]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")


In [14]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")

In [16]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
  .setOutputCol("features")

In [18]:
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])

In [20]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [22]:

transformedTraining = fittedPipeline.transform(trainDataFrame)

In [26]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
  .setK(20)\
  .setSeed(1)

In [28]:
kmModel = kmeans.fit(transformedTraining)

In [30]:
transformedTest = fittedPipeline.transform(testDataFrame)

In [31]:
from pyspark.sql import Row

spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

DataFrame[_1: bigint]