In [2]:
from pathlib import Path

from pyspark.sql import SparkSession
# NOTE: `sum` here is Spark's column aggregate; it shadows Python's built-in sum in this notebook.
from pyspark.sql.functions import expr, col, sum

# Start (or reuse, via getOrCreate) the Spark session that drives this streaming ETL demo.
spark = (
    SparkSession.builder
    .appName("Retail_Sales_Streaming_ETL")
    .getOrCreate()
)
print("Spark session started!")

Spark session started!


In [4]:
# File-based streaming sources need an explicit schema, so infer it once with a
# static batch read of a sample CSV from the input folder.
input_dir = Path('/home/jovyan/work/data/streaming_input')
sample_file = input_dir / 'sales_1970-01-01.csv'  # preferred sample; update filename as needed
if not sample_file.exists():
    # Fall back to any CSV already in the folder, so a fresh "Run All" does not
    # fail just because the preferred sample was renamed or removed.
    candidates = sorted(input_dir.glob('*.csv'))
    if not candidates:
        raise FileNotFoundError(
            f"No CSV files found in {input_dir}; add a sample sales CSV with headers first."
        )
    sample_file = candidates[0]
schema = spark.read.csv(str(sample_file), header=True, inferSchema=True).schema
print("Schema loaded!")

Schema loaded!


In [9]:
# Watch the input folder: each new CSV dropped there is picked up by the stream,
# parsed with the schema inferred in the previous cell.
#
# SALE_DATETIME is the event-time column used by the watermark downstream.
# A plain CAST(<int> AS TIMESTAMP) interprets the number as *seconds since the
# Unix epoch* (YEAR=2020 -> about 00:33 on 1970-01-01). That collapses every
# year into the first hour of 1970, so the watermark never separates years.
# Instead, build January 1st of YEAR as a real date, then cast it to a timestamp.
# make_date is a Spark SQL built-in available since Spark 3.0.
# NOTE(review): if the CSV also carries a MONTH column, make_date(YEAR, MONTH, 1)
# would give monthly granularity — confirm against the schema.
stream_df = (
    spark.readStream
    .option("header", True)
    .schema(schema)
    .csv('/home/jovyan/work/data/streaming_input/')
    .withColumn(
        "SALE_DATETIME",
        expr("CAST(make_date(CAST(YEAR AS INT), 1, 1) AS TIMESTAMP)"),
    )
)
print("Streaming DataFrame ready!")

Streaming DataFrame ready!


In [10]:
# Total the three sales measures per (SALE_DATETIME, ITEM TYPE).
# The 10-day watermark on SALE_DATETIME bounds how long Spark keeps aggregation
# state for an event time before treating later-arriving rows for it as too late.
# `sum` here is pyspark.sql.functions.sum (imported in the first cell).
agg_df = (
    stream_df
    .withWatermark("SALE_DATETIME", "10 days")
    .groupBy("SALE_DATETIME", "ITEM TYPE")
    .agg(*[
        sum(col(source_column)).alias(result_name)
        for source_column, result_name in (
            ("RETAIL SALES", "total_retail_sales"),
            ("WAREHOUSE SALES", "total_warehouse_sales"),
            ("RETAIL TRANSFERS", "total_retail_transfers"),
        )
    ])
)
print("Aggregation pipeline set up!")

Aggregation pipeline set up!


In [11]:
# Start the streaming query that writes the running aggregates to the console sink.
#
# "update" mode prints the (SALE_DATETIME, ITEM TYPE) rows that changed in each
# micro-batch. With "append" mode plus an event-time watermark, a group is only
# printed after the watermark moves 10 days past its SALE_DATETIME. The most
# recent period would therefore never appear until newer data arrives.
#
# The query is named so that re-running this cell stops the previous instance
# instead of stacking a duplicate background query.
# NOTE(review): the console sink prints from the JVM driver, so its tables usually
# appear in the Jupyter server / container log rather than in this cell — confirm
# for your setup.
for active_query in spark.streams.active:
    if active_query.name == "retail_sales_console":
        active_query.stop()

query = (
    agg_df.writeStream
    .queryName("retail_sales_console")
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .start()
)

print("Streaming to console; drop valid CSV files in the streaming_input folder and watch output below.")
# Block for up to 10 s. This returns False while the query is still running;
# the query keeps running in the background afterwards (call query.stop() to end it).
query.awaitTermination(10)

Streaming to console; drop valid CSV files in the streaming_input folder and watch output below.


False