In [100]:
from spark_init import start_spark

spark = start_spark()
spark

In [101]:
from pyspark.sql.types import *

# schema = StructType([
#     StructField("InvoiceNo",   StringType(),  True),
#     StructField("StockCode",   StringType(),  True),
#     StructField("Description", StringType(),  True),
#     StructField("Quantity",    IntegerType(), True),
#     StructField("InvoiceDate", StringType(),  True),
#     StructField("UnitPrice",   DoubleType(),  True),
#     StructField("CustomerID",  DoubleType(),  True),
#     StructField("Country",     StringType(),  True),
# ])

staticDataFrame = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/retail-data/all/online-retail-dataset.csv")

staticDataFrame.cache()
staticDataFrame.count()  # first full scan, then cached

staticDataFrame.createOrReplaceTempView("retail_data")

staticDataFrame.printSchema()
staticDataFrame.show(5)
staticDataFrame.describe().show()
staticDataFrame.describe("Quantity", "UnitPrice").show()

from pyspark.sql.functions import expr


df_ts = staticDataFrame.withColumn(
    "InvoiceTS",
    expr("to_timestamp(InvoiceDate, 'M/d/yyyy H:mm')")
)


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|  

In [102]:
from pyspark.sql.functions import window, column, col, desc

df_ts.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceTS")\
    .groupBy(
        col("CustomerID"),
        window(
            col("InvoiceTS"),
            "1 day"
        ),
    )\
    .sum("total_cost")\
    .orderBy(desc("CustomerID"))\
    .show(5, truncate=False)


+----------+------------------------------------------+------------------+
|CustomerID|window                                    |sum(total_cost)   |
+----------+------------------------------------------+------------------+
|18287     |{2011-05-21 19:00:00, 2011-05-22 19:00:00}|765.28            |
|18287     |{2011-10-11 19:00:00, 2011-10-12 19:00:00}|1001.3199999999999|
|18287     |{2011-10-27 19:00:00, 2011-10-28 19:00:00}|70.67999999999999 |
|18283     |{2011-01-05 18:00:00, 2011-01-06 18:00:00}|108.44999999999997|
|18283     |{2011-04-20 19:00:00, 2011-04-21 19:00:00}|117.67999999999994|
+----------+------------------------------------------+------------------+
only showing top 5 rows


In [103]:
staticDataFrame.select("InvoiceDate").show(5, False)


+--------------+
|InvoiceDate   |
+--------------+
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
|12/1/2010 8:26|
+--------------+
only showing top 5 rows


In [104]:
# from pyspark.sql.functions import to_timestamp

# df_ts = staticDataFrame.withColumn(
#     "InvoiceTimestamp",
#     to_timestamp("InvoiceDate", "MM/dd/yyyy HH:mm")  # adjust pattern if needed
# )


In [105]:
from pyspark.sql.functions import window, col, desc, try_to_timestamp

df_ts.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceTS"
).groupBy(
    col("CustomerID"),
    window(col("InvoiceTS"), "1 day"),
).sum("total_cost") \
 .orderBy(desc("CustomerID")) \
 .show(5, truncate=False)


+----------+------------------------------------------+------------------+
|CustomerID|window                                    |sum(total_cost)   |
+----------+------------------------------------------+------------------+
|18287     |{2011-05-21 19:00:00, 2011-05-22 19:00:00}|765.28            |
|18287     |{2011-10-11 19:00:00, 2011-10-12 19:00:00}|1001.3199999999999|
|18287     |{2011-10-27 19:00:00, 2011-10-28 19:00:00}|70.67999999999999 |
|18283     |{2011-01-05 18:00:00, 2011-01-06 18:00:00}|108.44999999999997|
|18283     |{2011-04-20 19:00:00, 2011-04-21 19:00:00}|117.67999999999994|
+----------+------------------------------------------+------------------+
only showing top 5 rows


In [106]:
# from pyspark.sql.functions import to_timestamp

# df_ts = staticDataFrame.withColumn(
#     "InvoiceTimestamp",
#     to_timestamp("InvoiceDate", "M/d/yyyy H:mm")
# )
# df_ts.select("InvoiceDate", "InvoiceTimestamp").show(10, False)


In [107]:
from pyspark.sql.functions import window, col, desc

df_ts.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceTS"
).groupBy(
    col("CustomerID"),
    window(col("InvoiceTS"), "1 day"),
).sum("total_cost") \
 .orderBy(desc("CustomerID")) \
 .show(5, truncate=False)


+----------+------------------------------------------+------------------+
|CustomerID|window                                    |sum(total_cost)   |
+----------+------------------------------------------+------------------+
|18287     |{2011-05-21 19:00:00, 2011-05-22 19:00:00}|765.28            |
|18287     |{2011-10-11 19:00:00, 2011-10-12 19:00:00}|1001.3199999999999|
|18287     |{2011-10-27 19:00:00, 2011-10-28 19:00:00}|70.67999999999999 |
|18283     |{2011-01-05 18:00:00, 2011-01-06 18:00:00}|108.44999999999997|
|18283     |{2011-04-20 19:00:00, 2011-04-21 19:00:00}|117.67999999999994|
+----------+------------------------------------------+------------------+
only showing top 5 rows


In [108]:
from pyspark.sql.functions import window, col

df_ts.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceTS"
).groupBy(
    col("CustomerID"),
    window(col("InvoiceTS"), "1 day"),
).sum("total_cost") \
 .select(
     "CustomerID",
     "window.start",
     "window.end",
     "sum(total_cost)"
 ) \
 .orderBy("CustomerID", "window.start") \
 .show(200, truncate=False)


+----------+-------------------+-------------------+-------------------+
|CustomerID|start              |end                |sum(total_cost)    |
+----------+-------------------+-------------------+-------------------+
|NULL      |2010-11-30 18:00:00|2010-12-01 18:00:00|12584.299999999988 |
|NULL      |2010-12-01 18:00:00|2010-12-02 18:00:00|431.84999999999985 |
|NULL      |2010-12-02 18:00:00|2010-12-03 18:00:00|23021.99999999999  |
|NULL      |2010-12-04 18:00:00|2010-12-05 18:00:00|3.35               |
|NULL      |2010-12-05 18:00:00|2010-12-06 18:00:00|23395.099999999904 |
|NULL      |2010-12-06 18:00:00|2010-12-07 18:00:00|-13124.980000000067|
|NULL      |2010-12-07 18:00:00|2010-12-08 18:00:00|11199.20000000001  |
|NULL      |2010-12-08 18:00:00|2010-12-09 18:00:00|15354.279999999955 |
|NULL      |2010-12-09 18:00:00|2010-12-10 18:00:00|25399.560000000012 |
|NULL      |2010-12-11 18:00:00|2010-12-12 18:00:00|23.3               |
|NULL      |2010-12-12 18:00:00|2010-12-13 18:00:00

In [109]:
streamingDataFrame = spark.readStream\
    .option("maxFilesPerTrigger", 1)\
    .schema(staticDataFrame.schema)\
    .format("csv") \
    .option("header", "true") \
    .csv("../data/retail-data/by-day")   # <-- DIRECTORY, no .csv file


In [110]:
streamingDataFrame.isStreaming

True

In [111]:
# purchaseByCustomerPerHour = streamingDataFrame \
#     .selectExpr(
#         "CustomerID",
#         "(UnitPrice * Quantity) as total_cost",
#         "InvoiceDate"
#     ) \
#     .groupBy(
#         col("CustomerID"),
#         window(col("InvoiceDate"), "1 hour")
#     ) \
#     .sum("total_cost")


In [112]:
purchaseByCustomerPerHour.isStreaming

True

In [113]:
# purchaseByCustomerPerHour.writeStream \
#     .format("memory") \
#     .queryName("customer_purchases") \
#     .outputMode("complete") \
#     .start()


In [114]:
query = purchaseByCustomerPerHour.writeStream \
    .format("memory") \
    .queryName("customer_purchases") \
    .outputMode("complete") \
    .start()


IllegalArgumentException: Cannot start query with name customer_purchases as a query with that name is already active in this SparkSession

In [None]:
query.isActive
query.status
query.lastProgress


In [None]:
print("isActive:", query.isActive)
print("status:", query.status)
print("lastProgress:", query.lastProgress)


isActive: True
status: {'message': 'Getting offsets from FileStreamSource[file:/c:/Users/ryanh/dv/spark-definitive-guide-ryan/data/retail-data/by-day]', 'isDataAvailable': False, 'isTriggerActive': True}
lastProgress: None
