In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, column, desc, col

In [0]:
# List the per-day retail CSV files stored in DBFS. `dbutils` and `display` are
# notebook globals provided by Databricks, not imports.
# (If the folder does not exist yet, create it first with
#  dbutils.fs.mkdirs('/FileStore/tables/retail-data/by-day').)
display(
    dbutils.fs.ls("/FileStore/tables/retail-data/by-day")
)

path,name,size,modificationTime
dbfs:/FileStore/tables/retail-data/by-day/2010_12_01.csv,2010_12_01.csv,275001,1747917158000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_02.csv,2010_12_02.csv,191826,1747917158000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_03.csv,2010_12_03.csv,190700,1747917012000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_05.csv,2010_12_05.csv,246056,1747917012000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_06.csv,2010_12_06.csv,339039,1747917013000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_07.csv,2010_12_07.csv,255832,1747917014000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_08.csv,2010_12_08.csv,235974,1747917014000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_09.csv,2010_12_09.csv,252904,1747917014000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_10.csv,2010_12_10.csv,241468,1747917015000
dbfs:/FileStore/tables/retail-data/by-day/2010_12_12.csv,2010_12_12.csv,132120,1747917015000


In [0]:
# Obtain the SparkSession. getOrCreate() hands back an already-running session if
# one exists (as on Databricks); in that case static settings such as the app name
# may not take effect.
spark = (
    SparkSession.builder
    .appName("my_spark")
    .getOrCreate()
)

# Show which session object the notebook is using
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f951e249940>


In [0]:
# Batch-read every daily CSV file. The first row is the header, and inferSchema
# makes Spark derive column types with an extra pass over the data.
# Register the result as the SQL view `retail_data`, and keep its schema so the
# streaming reader below can reuse it (file streams need an explicit schema by default).
staticDataFrame = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/FileStore/tables/retail-data/by-day/*.csv")
)

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [0]:
# Batch version of the aggregation: total spend (UnitPrice * Quantity) per customer
# per 1-day window of InvoiceDate. Display the first 5 result rows.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-05 00:00...|            -37.6|
|   14126.0|{2011-11-29 00:00...|643.6300000000001|
|   13500.0|{2011-11-16 00:00...|497.9700000000001|
|   17160.0|{2011-11-08 00:00...|516.8499999999999|
|   15608.0|{2011-11-11 00:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [0]:
# Streaming read of the same directory. Reuse the schema inferred by the batch
# read, and ingest at most one file per micro-batch to simulate data arriving
# day by day.
streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("/FileStore/tables/retail-data/by-day/*.csv")
)

In [0]:
# Sanity check: True confirms that readStream produced a streaming DataFrame.
streamingDataFrame.isStreaming

Out[7]: True

In [0]:
# Streaming counterpart of the batch aggregation: total spend per customer per
# 1-day window of InvoiceDate. Nothing runs until a writeStream query is started.
# NOTE(review): despite the "PerHour" name, the window is "1 day". The name is kept
# because later cells reference it.
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)

In [0]:
# Start the streaming aggregation with an in-memory sink:
#   format("memory")                -> results are kept in an in-memory table on the driver
#   queryName("customer_purchases") -> name of that table, queryable via spark.sql
#   outputMode("complete")          -> every trigger rewrites the full aggregate table
# The returned StreamingQuery becomes the cell's output.
(
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)

In [0]:
# The memory sink is filled asynchronously, one file per micro-batch. Wait until the
# "customer_purchases" query has processed every file currently available, so this
# snapshot is complete and reproducible. Otherwise the SELECT races the stream and
# can return an empty or partial table (e.g. under Restart & Run All).
for activeQuery in spark.streams.active:
    if activeQuery.name == "customer_purchases":
        activeQuery.processAllAvailable()

# Top 5 customer/day windows by total spend. Identifiers are resolved
# case-insensitively by default, so the upper-case names match the actual columns.
spark.sql("""SELECT * FROM CUSTOMER_PURCHASES
          ORDER BY  `SUM(TOTAL_COST)` DESC""")\
    .show(5)

In [0]:
# Start a second run of the same aggregation, this time to the console sink. It
# prints each micro-batch's result table to the driver's stdout; on Databricks this
# typically ends up in the driver logs rather than in this cell's output.
#   queryName -> must be unique among the active queries of this SparkSession. The
#                memory-sink query above already uses "customer_purchases", so reusing
#                that name here makes start() fail while that query is running.
#   complete  -> every batch prints the full aggregate table
purchaseByCustomerPerHour.writeStream\
    .format('console')\
    .queryName('customer_purchases_console')\
    .outputMode('complete')\
    .start()

Out[9]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f94e8d93640>

In [0]:
# Print the schema that inferSchema produced for the batch read. The streaming read
# reuses this same schema via staticSchema.
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



## Machine Learning and Advanced Analytics


In [0]:
from pyspark.sql.functions import date_format, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [0]:
# Prepare data for ML:
#   - replace nulls in numeric columns with 0
#   - add the weekday name of each invoice (e.g. "Monday", via the 'EEEE' pattern)
#   - reduce to at most 5 partitions
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [0]:
# Time-based split: invoices before 2011-07-01 are used for training, the rest for
# testing. Rows with a null InvoiceDate satisfy neither predicate and are dropped.
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

In [0]:
# Map each weekday name to a numeric index. With the StringIndexer default, labels
# are ordered by descending frequency.
indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")

In [0]:
# One-hot encode the weekday index into a (sparse) indicator vector.
encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_encoded")

In [0]:
# Combine price, quantity and the encoded weekday into the single vector column
# `features` that Spark ML estimators consume.
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)

In [0]:
# Chain the three feature stages (index -> one-hot -> assemble) so they are fitted
# and applied together.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

In [0]:
# Fit the feature pipeline on the training split only, so the StringIndexer's
# label-to-index mapping is learned without looking at test data.
fittedPipeline = transformationPipeline.fit(dataset=trainDataFrame)

In [0]:
# Apply the fitted pipeline to the training split (the test split is transformed later).
transformedTraining = fittedPipeline.transform(dataset=trainDataFrame)

In [0]:
# Mark the transformed training set for caching, because KMeans makes multiple passes
# over it. cache() is lazy: data is materialized on the first action (the fit below).
transformedTraining.cache()

Out[33]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [0]:
# K-means with 20 clusters. The fixed seed makes the run reproducible.
kmeans = KMeans(k=20, seed=1)

In [0]:
# Train the clustering model on the cached, transformed training split.
kmModel = kmeans.fit(dataset=transformedTraining)


In [0]:
# Apply the pipeline fitted on training data to the test split (no refitting).
transformedTest = fittedPipeline.transform(dataset=testDataFrame)

In [0]:
# KMeansModel.computeCost() was deprecated in Spark 2.4 and removed in PySpark 3.0.
# It returned the sum of squared distances from each point to its nearest cluster center.
# Its replacements are ClusteringEvaluator (used below) and, for the training set only,
# kmModel.summary.trainingCost. For the test set, the same quantity can be computed
# from the output of kmModel.transform() together with kmModel.clusterCenters().

# Removed API, kept for reference only; raises AttributeError on PySpark >= 3.0:
# kmModel.computeCost(transformedTest)

In [0]:
# Assign each test row to its nearest cluster (adds a `prediction` column).
predictions = kmModel.transform(dataset=transformedTest)

In [0]:
# Silhouette compares how close each point is to its own cluster versus the nearest
# other cluster (range -1..1, higher is better). It is used instead of the removed
# computeCost(). Both keyword values are the ClusteringEvaluator defaults, spelled
# out for clarity.
evaluator = ClusteringEvaluator(
    metricName="silhouette",
    distanceMeasure="squaredEuclidean",
)

silhouette_score = evaluator.evaluate(predictions)
print(f'Silhouette score = {silhouette_score}')

Silhouette score = 0.5427938390491535


## Lower-Level APIs

In [0]:
from pyspark.sql import Row

In [0]:
# Drop down to the RDD API: parallelize three single-field Rows, then convert the RDD
# back to a DataFrame. Spark names the unnamed field `_1`.
spark.sparkContext.parallelize([Row(n) for n in (1, 2, 3)]).toDF()

Out[44]: DataFrame[_1: bigint]