In [48]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, col, from_json, when, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [49]:
# Names of the GCS buckets and the GCP project used throughout the notebook
data_bucket_uri = "data_de2024_a2"
temp_bucket = "temp_de2024_mh"
project_id = "core-synthesis-435410-v9"

# Spark session: connect to the standalone master with small, fixed resources
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Sales_Streaming_Pipeline")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the GCS connector classes so Hadoop can resolve the gs:// scheme
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Bucket name handed to Spark under the 'temporaryGcsBucket' setting
spark.conf.set('temporaryGcsBucket', temp_bucket)

In [50]:
# Schema used to parse the JSON payload of each Kafka message.
# Every field is nullable; keys and the unit are strings, quantity is an
# integer and both prices are floats.
dataSchema = (
    StructType()
    .add("payment_key", StringType(), True)
    .add("coustomer_key", StringType(), True)
    .add("time_key", StringType(), True)
    .add("item_key", StringType(), True)
    .add("store_key", StringType(), True)
    .add("quantity", IntegerType(), True)
    .add("unit", StringType(), True)
    .add("unit_price", FloatType(), True)
    .add("total_price", FloatType(), True)
)

In [51]:
# Auxiliary dimension tables (static CSV files with a header row)
def _load_dimension(file_name):
    """Read one CSV file from the data bucket, using its first row as the header."""
    return (
        spark.read
        .format("csv")
        .option("header", "true")
        .load(f"gs://{data_bucket_uri}/{file_name}")
    )


itemDF = _load_dimension("item_dim.csv")
storeDF = _load_dimension("store_dim.csv")
timeDF = _load_dimension("time_dim.csv")
transDF = _load_dimension("Trans_dim.csv")

In [52]:
# Streaming source: subscribe to the "transaction" Kafka topic from the earliest
# available offset, without failing the query when data has been lost.
kafka_options = {
    "kafka.bootstrap.servers": "kafka1:9093",
    "failOnDataLoss": "false",
    "subscribe": "transaction",
    "startingOffsets": "earliest",
}
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Keep only the message payload, cast to a string
df_string = df.selectExpr("CAST(value AS STRING)")

# Parse the payload as JSON into a single struct column following dataSchema
parsed = from_json(col("value"), dataSchema).alias("parsed_value")
df1 = df_string.select(parsed)

# Flatten the struct so that each JSON field becomes its own column
sdf = df1.select(col("parsed_value.*"))

sdf.printSchema()

root
 |-- payment_key: string (nullable = true)
 |-- coustomer_key: string (nullable = true)
 |-- time_key: string (nullable = true)
 |-- item_key: string (nullable = true)
 |-- store_key: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- total_price: float (nullable = true)



In [53]:
# Remove unit_price from the item dimension (if it has one) so that the
# unit_price coming from the streamed transactions stays unambiguous after the join.
itemDF_cleaned = itemDF.drop("unit_price")

# Enrich the streamed fact rows with the item, store, time and transaction
# dimensions, joining each one on its key column.
joinedDF = (
    sdf
    .join(itemDF_cleaned, "item_key")
    .join(storeDF, "store_key")
    .join(timeDF, "time_key")
    .join(transDF, "payment_key")
)

In [54]:
# Price bucket derived from unit_price: above 50 -> "High", above 20 -> "Medium",
# anything else -> "Low".
price_category = (
    when(col("unit_price") > 50, "High")
    .when(col("unit_price") > 20, "Medium")
    .otherwise("Low")
)

# Columns exposed to the dashboard; the customer/store keys are renamed and the
# textual date is parsed into a timestamp.
dashboard_columns = [
    col("coustomer_key").alias("customer"),
    col("store_key").alias("store"),
    col("item_name"),
    col("quantity"),
    col("unit_price"),
    col("total_price"),
    to_timestamp(col("date"), "dd-MM-yyyy HH:mm").alias("timestamp"),
    col("trans_type"),
]

# Project the dashboard columns, then append the price bucket as the last column
dashboard_df = (
    joinedDF
    .select(*dashboard_columns)
    .withColumn("price_category", price_category)
)

In [None]:
# Directory where the query records its progress (Kafka offsets / committed batches).
# Without it every restart re-reads the topic from "earliest" and appends the same
# rows to BigQuery again.
checkpoint_dir = "/home/jovyan/checkpoint/transactions_bigquery"


def write_batch_to_bigquery(batch_df, batch_id):
    """Append one micro-batch to the BigQuery table `<project_id>.a2.transactions`.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch, as passed in by foreachBatch.
    """
    # Two actions run on the batch (count + write); cache it so the Kafka read
    # and the dimension joins are not recomputed for the second action.
    batch_df.persist()
    try:
        row_count = batch_df.count()
        (
            batch_df.write
            .format('bigquery')
            .option('table', f'{project_id}.a2.transactions')
            .mode("append")
            .save()
        )
        # Report the batch only after the write has actually succeeded
        print(f"Pushed batch: '{batch_id}'", f"~ Consisting of {row_count} rows.")
    finally:
        batch_df.unpersist()


query = (
    dashboard_df.writeStream
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir)
    .foreachBatch(write_batch_to_bigquery)
    .start()
)


def _shutdown():
    """Stop the streaming query, then the Spark session."""
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")


try:
    # Blocks the cell until the query terminates, fails, or the kernel is interrupted
    query.awaitTermination()
except KeyboardInterrupt:
    _shutdown()
except Exception as exc:
    # Show what went wrong instead of hiding it behind a bare `except:`
    print(f"Unexpected error: {exc!r}")
    _shutdown()

Pushed dashboard batch: 0 0 rows
+--------+-----+---------+--------+----------+-----------+----+----------+--------------+
|customer|store|item_name|quantity|unit_price|total_price|date|trans_type|price_category|
+--------+-----+---------+--------+----------+-----------+----+----------+--------------+
+--------+-----+---------+--------+----------+-----------+----+----------+--------------+

Pushed dashboard batch: 1 1 rows
+--------+------+--------------------+--------+----------+-----------+-------------------+----------+--------------+
|customer| store|           item_name|quantity|unit_price|total_price|               date|trans_type|price_category|
+--------+------+--------------------+--------+----------+-----------+-------------------+----------+--------------+
| C004510|S00307|M&M Peanut Candy ...|       1|      35.0|       35.0|2016-04-26 17:13:00|      card|        Medium|
+--------+------+--------------------+--------+----------+-----------+-------------------+----------+----

In [285]:
# Manually stop the Spark session (`spark`) created in the configuration cell
spark.stop()

In [None]:
# NOTE(review): disabled experiment, kept for reference only. It counts the streamed
# transactions per store_key and writes "<store_key> <count>" strings to the Kafka
# topic "store_transaction_count" in complete output mode. Re-enabling it also
# requires importing `concat` and `lit` from pyspark.sql.functions; the import cell
# only brings in col, from_json, when and to_timestamp.
# transactionCounts = sdf.groupBy("store_key").count()

# query = transactionCounts \
#               .select(concat(col("store_key"), lit(" "), col("count")).alias("value")) \
#               .writeStream \
#               .format("kafka") \
#               .option("kafka.bootstrap.servers", "kafka1:9093").option("failOnDataLoss", "false") \
#               .option("checkpointLocation", "/home/jovyan/checkpoint/store_transaction_count")\
#               .option("topic", "store_transaction_count") \
#               .outputMode("complete") \
#               .start()