## Spark Streaming Use Case: Analyzing Retail Data

- **Use Case Overview 📝**
  - I'm working with a dataset of 243 CSV files containing retail sales data. The files are available in this [GitHub repository](https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data).
  - My goal is to analyze this data in near real time using Spark Structured Streaming. This includes processing the existing data and automatically picking up new CSV files as they arrive in the directory. Note that the file source tracks the files it has already read, so rows appended to an existing file are not reprocessed; new data should arrive as new files.

**1 - Build SparkSession:**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=4b65244883f5bdf4994790e5b77eadaa4c90356d23d2748676b15112c170bd04
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [5]:
spark = (SparkSession
.builder
.appName("project")
.getOrCreate())

**2 - Read Static Data**

In [6]:
data_path = "/content/drive/MyDrive/retail-data"

In [7]:
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("recursiveFileLookup", "true") \
    .load(data_path)

In [8]:
# Create or replace a temporary view for SQL operations
df.createOrReplaceTempView("retail_data")

In [9]:
# Capture the schema of the DataFrame for reference
df_schema = df.schema

In [10]:
# Print the schema of the DataFrame to understand its structure
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)
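Inferring the schema costs an extra pass over the CSV files, and a streaming file source expects a schema up front by default. As a minimal sketch, the schema printed above could also be written by hand as a DDL string, which `schema()` accepts in place of a `StructType`. The names `RETAIL_COLUMNS` and `retail_schema_ddl` are illustrative and not part of the original notebook.

```python
# Column names and types copied from the printSchema() output above.
RETAIL_COLUMNS = [
    ("InvoiceNo", "STRING"),
    ("StockCode", "STRING"),
    ("Description", "STRING"),
    ("Quantity", "INT"),
    ("InvoiceDate", "TIMESTAMP"),
    ("UnitPrice", "DOUBLE"),
    ("CustomerID", "DOUBLE"),
    ("Country", "STRING"),
]

# Build a DDL-formatted schema string such as "InvoiceNo STRING, StockCode STRING, ..."
retail_schema_ddl = ", ".join(f"{name} {dtype}" for name, dtype in RETAIL_COLUMNS)
print(retail_schema_ddl)

# Possible usage with the notebook's SparkSession and data_path (not executed here):
# df = spark.read.schema(retail_schema_ddl).option("header", "true").csv(data_path)
```

With an explicit schema, the same definition can be shared by the static and the streaming reads without depending on inference.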



In [11]:
# Get the number of files read by the DataFrame
num_files_read = len(df.inputFiles())

In [12]:
# Print the number of files read
num_files_read

243

In [13]:
df.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|
|   537226|    22052|VINTAGE CARAVAN G...|      25|2010-12-06 08:34:00|     0.42|   15987.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



**Now let's apply some transformations to all loaded files 🛠️**
- We group the data to compute the total cost for each customer per day, order the result by total cost, and retrieve the top 10 records. We then repeat the same process on the streamed data to check whether the top 10 records match.

In [14]:
# Define the SQL query for transformation
sql_query = """
SELECT
    CustomerID as CustomerId,
    sum(UnitPrice * Quantity) as total_cost,
    window(InvoiceDate, '1 day') as date_window
FROM
    retail_data
GROUP BY
    CustomerId, window(InvoiceDate, '1 day')
ORDER BY
    total_cost DESC
"""

# Execute the SQL query using Spark SQL
aggDF = spark.sql(sql_query)

# Show the top 10 rows of the aggregated DataFrame
aggDF.show(10)

+----------+------------------+--------------------+
|CustomerId|        total_cost|         date_window|
+----------+------------------+--------------------+
|   17450.0|          71601.44|{2011-09-20 00:00...|
|      NULL| 33521.39999999998|{2011-03-29 00:00...|
|   18102.0|31661.540000000005|{2011-09-15 00:00...|
|   18102.0|          25920.37|{2010-12-07 00:00...|
|      NULL|25399.560000000012|{2010-12-10 00:00...|
|      NULL|25371.769999999768|{2010-12-17 00:00...|
|   12415.0| 23426.81000000001|{2011-06-15 00:00...|
|      NULL|23395.099999999904|{2010-12-06 00:00...|
|      NULL| 23032.59999999993|{2011-08-30 00:00...|
|      NULL| 23021.99999999999|{2010-12-03 00:00...|
+----------+------------------+--------------------+
only showing top 10 rows
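To make the daily grouping concrete, here is a plain-Python sketch of the same idea: each row is assigned to the day that contains its `InvoiceDate`, and `UnitPrice * Quantity` is summed per customer and day. It is a simplified model, assuming day boundaries at midnight, and not how Spark implements `window()`; the function name `daily_totals` is made up for illustration.

```python
from collections import defaultdict
from datetime import datetime


def daily_totals(rows):
    """Sum UnitPrice * Quantity per (CustomerID, start of day)."""
    totals = defaultdict(float)
    for customer_id, invoice_date, unit_price, quantity in rows:
        # Truncate the timestamp to midnight to get the 1-day bucket start
        day_start = invoice_date.replace(hour=0, minute=0, second=0, microsecond=0)
        totals[(customer_id, day_start)] += unit_price * quantity
    return dict(totals)


# First three rows from the df.show(5) output above:
# (CustomerID, InvoiceDate, UnitPrice, Quantity)
sample_rows = [
    (15987.0, datetime(2010, 12, 6, 8, 34), 2.95, 6),
    (15987.0, datetime(2010, 12, 6, 8, 34), 2.1, 8),
    (15987.0, datetime(2010, 12, 6, 8, 34), 5.95, 2),
]

totals = daily_totals(sample_rows)
for (customer_id, day_start), total in totals.items():
    print(customer_id, day_start.date(), round(total, 2))
```

All three sample rows fall on the same day for the same customer, so they collapse into a single bucket.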



**3 - Streaming Data**

In [15]:
# Create a streaming DataFrame, reusing the schema captured from the static data
streamingdf = (spark.readStream
    .schema(df_schema)
    .option("maxFilesPerTrigger", 1)  # read at most one file per micro-batch
    .format("csv")
    .option("header", "true")
    .load("/content/drive/MyDrive/retail-data/*.csv"))

# Register the streaming DataFrame as a temporary view for SQL operations
streamingdf.createOrReplaceTempView("streaming_data")
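The `maxFilesPerTrigger` option caps how many new files go into each micro-batch. The sketch below is a simplified model of that behaviour in plain Python, not Spark's actual scheduling logic; `plan_micro_batches` and the file names are hypothetical.

```python
def plan_micro_batches(file_names, max_files_per_trigger):
    """Split pending files into consecutive micro-batches of bounded size."""
    return [
        file_names[i:i + max_files_per_trigger]
        for i in range(0, len(file_names), max_files_per_trigger)
    ]


# 243 placeholder file names standing in for the retail CSV files
files = [f"file-{i:03d}.csv" for i in range(243)]

one_per_batch = plan_micro_batches(files, 1)
ten_per_batch = plan_micro_batches(files, 10)

print(len(one_per_batch))      # 243 micro-batches, one file each
print(len(ten_per_batch))      # 25 micro-batches
print(len(ten_per_batch[-1]))  # the last batch holds the remaining 3 files
```

With a value of 1, working through the whole directory takes as many micro-batches as there are files, which is useful for simulating a stream but slow for a backlog.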

In [16]:
# Define the SQL query for transformation
sql_query = """
SELECT
    CustomerID as CustomerId,
    (UnitPrice * Quantity) as total_cost,
    InvoiceDate
FROM
    streaming_data
"""
# Register the result of the SQL query as another temporary view
transformedDataFrame = spark.sql(sql_query)
transformedDataFrame.createOrReplaceTempView("transformed_data")

In [20]:
# Define the final SQL query for aggregation
final_sql_query = """
SELECT
    CustomerId,
    window(InvoiceDate, '1 day') as window,
    SUM(total_cost) as total_cost
FROM
    transformed_data
GROUP BY
    CustomerId, window
"""

In [21]:
# Run the aggregation and register it as a temporary view.
# The memory sink started in the next cell uses the same name and replaces this view.
purchaseByCustomerPerDay = spark.sql(final_sql_query)
purchaseByCustomerPerDay.createOrReplaceTempView("customer_purchases")

In [22]:
# Start the streaming query; the memory sink keeps the full aggregated result
# in an in-memory table named after queryName
query = (purchaseByCustomerPerDay.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start())
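In `complete` output mode, the whole aggregated result table is written to the sink after every micro-batch, not just the rows that changed. The following plain-Python sketch illustrates that idea with a running dictionary as state. It is a toy model under that assumption, and `run_complete_mode` is an illustrative name, not a Spark API.

```python
def run_complete_mode(batches):
    """Yield a snapshot of the full aggregated table after each micro-batch."""
    state = {}
    for batch in batches:
        for key, amount in batch:
            # Update the running total for this (customer, day) key
            state[key] = state.get(key, 0.0) + amount
        # Complete mode: emit every key, including ones this batch did not touch
        yield dict(state)


# Two made-up micro-batches of ((customer, day), cost) pairs
batches = [
    [(("A", "2010-12-06"), 10.0), (("B", "2010-12-06"), 5.0)],
    [(("A", "2010-12-06"), 2.5)],
]

snapshots = list(run_complete_mode(batches))
for batch_id, snapshot in enumerate(snapshots):
    print(batch_id, snapshot)
```

The second snapshot still contains customer `B` even though the second batch had no rows for it, which is why the in-memory table can be queried at any time for the totals so far.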

In [23]:
# Check the current status of the streaming query
status = query.status

In [24]:
# Check if the streaming query is active
is_active = query.isActive

In [25]:
status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [26]:
is_active

True

In [28]:
# Get the most recent progress report
last_progress = query.lastProgress
print(f"Last progress: {last_progress}")

Last progress: {'id': '41bb6ce1-19fd-46f8-a1ce-2c8f468639be', 'runId': 'da86c286-e77d-491f-b1e4-c8286ff45ff9', 'name': 'customer_purchases', 'timestamp': '2024-07-25T01:21:29.220Z', 'batchId': 9, 'numInputRows': 963, 'inputRowsPerSecond': 91.80171591992374, 'processedRowsPerSecond': 87.13355048859935, 'durationMs': {'addBatch': 10866, 'commitOffsets': 48, 'getBatch': 9, 'latestOffset': 65, 'queryPlanning': 32, 'triggerExecution': 11052, 'walCommit': 29}, 'stateOperators': [{'operatorName': 'stateStoreSave', 'numRowsTotal': 597, 'numRowsUpdated': 24, 'allUpdatesTimeMs': 824, 'numRowsRemoved': 0, 'allRemovalsTimeMs': 0, 'commitTimeMs': 13800, 'memoryUsedBytes': 245216, 'numRowsDroppedByWatermark': 0, 'numShufflePartitions': 200, 'numStateStoreInstances': 200, 'customMetrics': {'loadedMapCacheHitCount': 3600, 'loadedMapCacheMissCount': 0, 'stateOnCurrentVersionSizeBytes': 153304}}], 'sources': [{'description': 'FileStreamSource[file:/content/drive/MyDrive/retail-data/*.csv]', 'startOffset
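The progress report is a nested dictionary, so individual metrics can be pulled out directly. As a small sketch, the excerpt below copies a few values from the report above and derives the throughput from them; in this report, `numInputRows` divided by the `triggerExecution` time appears to match the reported `processedRowsPerSecond`. The helper name `rows_per_second` is illustrative.

```python
# Values copied from the lastProgress output above (excerpt only)
progress = {
    "batchId": 9,
    "numInputRows": 963,
    "processedRowsPerSecond": 87.13355048859935,
    "durationMs": {"addBatch": 10866, "triggerExecution": 11052},
}


def rows_per_second(progress):
    """Rows processed per second of total trigger execution time."""
    seconds = progress["durationMs"]["triggerExecution"] / 1000
    return progress["numInputRows"] / seconds


rate = rows_per_second(progress)
# Fraction of the trigger spent in addBatch (running the batch itself)
add_batch_share = progress["durationMs"]["addBatch"] / progress["durationMs"]["triggerExecution"]

print(f"rows/s: {rate:.2f}")
print(f"addBatch share of trigger time: {add_batch_share:.1%}")
```

Here almost all of the trigger time is spent in `addBatch`, so the per-batch computation, not offset bookkeeping, dominates the latency.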

In [29]:
# Stop the query
query.stop()

In [30]:
# Query the result to display the top 10 rows
result = spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY total_cost DESC
""")
result.show(10)

+----------+--------------------+------------------+
|CustomerId|              window|        total_cost|
+----------+--------------------+------------------+
|      NULL|{2010-12-17 00:00...|25371.769999999768|
|      NULL|{2010-12-14 00:00...|15929.879999999974|
|   16029.0|{2010-12-16 00:00...| 8361.599999999999|
|      NULL|{2011-01-21 00:00...|  8360.54000000001|
|   14646.0|{2011-01-21 00:00...|8060.2999999999965|
|      NULL|{2010-12-13 00:00...| 7949.909999999991|
|   14088.0|{2011-01-21 00:00...| 7544.910000000001|
|      NULL|{2010-12-20 00:00...| 7167.169999999999|
|   12415.0|{2011-01-06 00:00...| 7011.379999999997|
|      NULL|{2010-12-23 00:00...| 6412.139999999996|
+----------+--------------------+------------------+
only showing top 10 rows
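These top 10 rows differ from the static result, most likely because the query was stopped while files were still pending: the progress report shows `batchId` 9, and with `maxFilesPerTrigger` set to 1 that corresponds to roughly ten of the 243 files. One way to compare like with like is to wait for the backlog to drain before stopping. The sketch below uses `processAllAvailable()`, which PySpark's `StreamingQuery` provides mainly for testing; the wrapper name `drain_and_stop` is hypothetical.

```python
def drain_and_stop(query):
    """Wait until all currently available input is processed, then stop the query.

    `query` is expected to be a StreamingQuery (or any object exposing
    processAllAvailable() and stop()).
    """
    # Blocks until every file available at call time has been processed
    query.processAllAvailable()
    query.stop()


# Possible usage in place of a bare query.stop() (not executed here):
# drain_and_stop(query)
# spark.sql("SELECT * FROM customer_purchases ORDER BY total_cost DESC").show(10)
```

Because `processAllAvailable()` blocks, it suits a finite backlog like this one; with a source that keeps receiving data it may never return.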

