In [0]:
# Recursively delete the DBFS upload directory and everything under it.
# NOTE(review): this is the same directory the streaming reader consumes and
# the next cell lists; the recorded listing shows CSV files, so they were
# presumably uploaded again after this ran — confirm before using Run All.
upload_dir = "/FileStore/tables/"
dbutils.fs.rm(upload_dir, recurse=True)


Out[3]: True

In [0]:
# Show which files are currently present in the DBFS upload directory.
tables_dir = "/FileStore/tables"
dbutils.fs.ls(tables_dir)


Out[10]: [FileInfo(path='dbfs:/FileStore/tables/s1.csv', name='s1.csv', size=592, modificationTime=1745242183000),
 FileInfo(path='dbfs:/FileStore/tables/s2.csv', name='s2.csv', size=247, modificationTime=1745244457000)]

In [0]:
from pyspark.sql.functions import col, sum as _sum, count, window, to_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# Schema of the incoming CSV files. `Date` is read as a plain string here and
# converted to a timestamp after loading.
schema = StructType([
    StructField("Transaction ID", IntegerType(), True),
    StructField("Date", StringType(), True),  # Parsed below
    StructField("Customer ID", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Product Category", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price per Unit", DoubleType(), True),
    StructField("Total Amount", DoubleType(), True)
])

# Stream every CSV file found under the DBFS upload directory.
raw_df = (
    spark.readStream
    .option("header", True)
    .schema(schema)
    .csv("/FileStore/tables")
)

# Parse the `dd-MM-yyyy` strings into timestamps so the column can be windowed.
# Kept as a separate name from `raw_df` so re-running individual lines never
# re-parses an already converted column.
df = raw_df.withColumn("Date", to_timestamp("Date", "dd-MM-yyyy"))

# Width of the tumbling event-time windows used by the windowed aggregates.
# The date format above has no time-of-day component, so every event falls
# into the 00:00-00:10 window of its calendar date.
WINDOW_DURATION = "10 minutes"


def _start_memory_table(aggregated_df, table_name):
    """Start a streaming query that keeps `aggregated_df` in a memory table.

    Args:
        aggregated_df: Streaming DataFrame holding an aggregation.
        table_name: Query name; the memory sink exposes the result as a
            table with this name for `spark.sql`.

    Returns:
        The started StreamingQuery handle.
    """
    return (
        aggregated_df.writeStream
        .format("memory")
        .outputMode("complete")
        .queryName(table_name)
        .start()
    )


# Windowed revenue per product category
revenue_by_category = df.groupBy(
    window(col("Date"), WINDOW_DURATION),
    col("Product Category")
).agg(
    _sum("Total Amount").alias("total_revenue")
)

# Windowed transaction count per gender
transactions_by_gender = df.groupBy(
    window(col("Date"), WINDOW_DURATION),
    col("Gender")
).agg(
    count("Transaction ID").alias("total_transactions")
)

# Top categories overall (not windowed)
top_categories = df.groupBy("Product Category") \
    .agg(count("Transaction ID").alias("transaction_count")) \
    .orderBy(col("transaction_count").desc())

# Write to memory tables for SQL queries
revenue_query = _start_memory_table(revenue_by_category, "revenue_category_table")
gender_query = _start_memory_table(transactions_by_gender, "gender_transactions_table")
top_query = _start_memory_table(top_categories, "top_categories_table")

# `start()` returns immediately while the first micro-batch is still running.
# Block until the files currently in the source directory have been processed
# so that the SQL cells below do not read empty tables on a top-to-bottom run.
for _query in (revenue_query, gender_query, top_query):
    _query.processAllAvailable()


In [0]:
#Revenue by product category (10-minute windowed)
# Reads the per-window, per-category totals from the `revenue_category_table`
# memory table and prints them with the highest revenue first.
revenue_by_window_sql = """
    SELECT 
        window.start AS window_start, 
        window.end AS window_end, 
        `Product Category`, 
        total_revenue 
    FROM revenue_category_table
    ORDER BY total_revenue DESC
"""
revenue_by_window = spark.sql(revenue_by_window_sql)
revenue_by_window.show(truncate=False)

+-------------------+-------------------+----------------+-------------+
|window_start       |window_end         |Product Category|total_revenue|
+-------------------+-------------------+----------------+-------------+
|2023-08-05 00:00:00|2023-08-05 00:10:00|Electronics     |1500.0       |
|2023-02-27 00:00:00|2023-02-27 00:10:00|Clothing        |1000.0       |
|2023-12-13 00:00:00|2023-12-13 00:10:00|Electronics     |600.0        |
|2023-05-21 00:00:00|2023-05-21 00:10:00|Clothing        |500.0        |
|2023-10-07 00:00:00|2023-10-07 00:10:00|Clothing        |200.0        |
|2023-11-24 00:00:00|2023-11-24 00:10:00|Beauty          |150.0        |
|2023-02-22 00:00:00|2023-02-22 00:10:00|Electronics     |100.0        |
|2023-05-06 00:00:00|2023-05-06 00:10:00|Beauty          |100.0        |
|2023-02-14 00:00:00|2023-02-14 00:10:00|Clothing        |100.0        |
|2023-10-30 00:00:00|2023-10-30 00:10:00|Beauty          |75.0         |
|2023-03-13 00:00:00|2023-03-13 00:10:00|Clothing  

In [0]:
#Transactions by gender (10-minute windowed)
# Reads the per-window, per-gender counts from the `gender_transactions_table`
# memory table and prints them with the largest counts first.
gender_by_window_sql = """
    SELECT 
        window.start AS window_start, 
        window.end AS window_end, 
        Gender, 
        total_transactions 
    FROM gender_transactions_table
    ORDER BY total_transactions DESC
"""
gender_by_window = spark.sql(gender_by_window_sql)
gender_by_window.show(truncate=False)

+-------------------+-------------------+------+------------------+
|window_start       |window_end         |Gender|total_transactions|
+-------------------+-------------------+------+------------------+
|2023-11-24 00:00:00|2023-11-24 00:10:00|Male  |1                 |
|2023-02-27 00:00:00|2023-02-27 00:10:00|Female|1                 |
|2023-04-25 00:00:00|2023-04-25 00:10:00|Female|1                 |
|2023-01-13 00:00:00|2023-01-13 00:10:00|Male  |1                 |
|2023-05-21 00:00:00|2023-05-21 00:10:00|Male  |1                 |
|2023-08-05 00:00:00|2023-08-05 00:10:00|Male  |1                 |
|2023-05-06 00:00:00|2023-05-06 00:10:00|Male  |1                 |
|2023-02-22 00:00:00|2023-02-22 00:10:00|Male  |1                 |
|2023-03-13 00:00:00|2023-03-13 00:10:00|Male  |1                 |
|2023-10-30 00:00:00|2023-10-30 00:10:00|Male  |1                 |
|2023-10-07 00:00:00|2023-10-07 00:10:00|Female|1                 |
|2023-02-14 00:00:00|2023-02-14 00:10:00|Male  |

In [0]:
# Top overall product categories
# Prints every category from the `top_categories_table` memory table,
# ordered by how many transactions it has.
top_categories_sql = """
    SELECT 
        `Product Category`, 
        transaction_count 
    FROM top_categories_table
    ORDER BY transaction_count DESC
"""
top_categories_report = spark.sql(top_categories_sql)
top_categories_report.show(truncate=False)

+----------------+-----------------+
|Product Category|transaction_count|
+----------------+-----------------+
|Clothing        |5                |
|Electronics     |4                |
|Beauty          |4                |
+----------------+-----------------+



In [0]:
#Total Revenue per Category (No Time Window)
# Collapses the windowed rows of `revenue_category_table` into a single
# revenue total per product category, largest total first.
category_revenue_sql = """
    SELECT `Product Category`, SUM(total_revenue) AS total_revenue
    FROM revenue_category_table
    GROUP BY `Product Category`
    ORDER BY total_revenue DESC
"""
category_revenue = spark.sql(category_revenue_sql)
category_revenue.show(truncate=False)


+----------------+-------------+
|Product Category|total_revenue|
+----------------+-------------+
|Electronics     |2230.0       |
|Clothing        |1850.0       |
|Beauty          |355.0        |
+----------------+-------------+



In [0]:
#Gender-wise Total Revenue
# Revenue can only be attributed to a gender per transaction. Joining
# `gender_transactions_table` to `revenue_category_table` on the window alone
# pairs every gender row with every category row of that window: each gender
# would be credited with the whole window's revenue and COUNT(*) would count
# joined rows rather than transactions. The stream is therefore aggregated by
# (window, Gender) directly.
# Relies on `df` and the pyspark.sql.functions imports from the pipeline cell.
GENDER_REVENUE_TABLE = "gender_revenue_table"

# A query name must be unique among the active queries, so stop the query left
# over from a previous run of this cell before starting it again.
for _active_query in spark.streams.active:
    if _active_query.name == GENDER_REVENUE_TABLE:
        _active_query.stop()

gender_revenue = df.groupBy(
    window(col("Date"), "10 minutes"),
    col("Gender")
).agg(
    count("Transaction ID").alias("transaction_count"),
    _sum("Total Amount").alias("revenue_generated")
)

gender_revenue_query = gender_revenue.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName(GENDER_REVENUE_TABLE) \
    .start()

# `start()` returns immediately; wait for the files already in the source
# directory so the table is populated before it is queried.
gender_revenue_query.processAllAvailable()

spark.sql("""
    SELECT window.start AS window_start,
           window.end AS window_end,
           Gender,
           transaction_count,
           revenue_generated
    FROM gender_revenue_table
    ORDER BY revenue_generated DESC
""").show(truncate=False)


+-------------------+-------------------+------+-----------------+-----------------+
|window_start       |window_end         |Gender|transaction_count|revenue_generated|
+-------------------+-------------------+------+-----------------+-----------------+
|2023-08-05 00:00:00|2023-08-05 00:10:00|Male  |1                |1500.0           |
|2023-02-27 00:00:00|2023-02-27 00:10:00|Female|1                |1000.0           |
|2023-12-13 00:00:00|2023-12-13 00:10:00|Male  |1                |600.0            |
|2023-05-21 00:00:00|2023-05-21 00:10:00|Male  |1                |500.0            |
|2023-10-07 00:00:00|2023-10-07 00:10:00|Female|1                |200.0            |
|2023-11-24 00:00:00|2023-11-24 00:10:00|Male  |1                |150.0            |
|2023-02-14 00:00:00|2023-02-14 00:10:00|Male  |1                |100.0            |
|2023-02-22 00:00:00|2023-02-22 00:10:00|Male  |1                |100.0            |
|2023-05-06 00:00:00|2023-05-06 00:10:00|Male  |1                

In [0]:
#Average Transaction Value per Category
# `revenue_category_table` holds one row per (window, category), so COUNT(*)
# over it counts windows, not transactions. Each category's revenue is divided
# by its transaction count from `top_categories_table` instead.
# `<=>` is the null-safe equality so a NULL category still matches itself, and
# NULLIF avoids dividing by a zero transaction count.
# NOTE(review): the two tables are fed by separate streaming queries; this
# assumes both have processed the same input when the cell runs.
spark.sql("""
    SELECT r.`Product Category`,
           r.total_revenue / NULLIF(t.transaction_count, 0) AS avg_transaction_value
    FROM (
        SELECT `Product Category`, SUM(total_revenue) AS total_revenue
        FROM revenue_category_table
        GROUP BY `Product Category`
    ) r
    JOIN top_categories_table t
      ON r.`Product Category` <=> t.`Product Category`
    ORDER BY avg_transaction_value DESC
""").show(truncate=False)


+----------------+---------------------+
|Product Category|avg_transaction_value|
+----------------+---------------------+
|Electronics     |557.5                |
|Clothing        |370.0                |
|Beauty          |88.75                |
+----------------+---------------------+



In [0]:
# Gender Distribution of Transactions
# Adds up the windowed counts in `gender_transactions_table` to get one
# transaction total per gender, largest total first.
gender_distribution_sql = """
    SELECT Gender, SUM(total_transactions) AS total_transactions
    FROM gender_transactions_table
    GROUP BY Gender
    ORDER BY total_transactions DESC
"""
gender_distribution = spark.sql(gender_distribution_sql)
gender_distribution.show(truncate=False)


+------+------------------+
|Gender|total_transactions|
+------+------------------+
|Male  |10                |
|Female|3                 |
+------+------------------+



In [0]:
# Raw rows of `gender_transactions_table` without the window columns:
# one line per (window, gender) group, ordered by its transaction count.
gender_rows_sql = """
    SELECT 
        Gender, 
        total_transactions 
    FROM gender_transactions_table
    ORDER BY total_transactions DESC
"""
gender_rows = spark.sql(gender_rows_sql)
gender_rows.show(truncate=False)


+------+------------------+
|Gender|total_transactions|
+------+------------------+
|Female|1                 |
|Female|1                 |
|Male  |1                 |
|Male  |1                 |
|Male  |1                 |
|Female|1                 |
|Male  |1                 |
|Male  |1                 |
|Male  |1                 |
|Male  |1                 |
|Male  |1                 |
|Male  |1                 |
|Male  |1                 |
+------+------------------+

