In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, TimestampType, IntegerType
from pyspark.sql.functions import from_json, col
import time

# Define the path to the jars on the EC2 instance
spark_jars_path = "/home/ec2-user/stream-processing-template/jars"  # <-- Update this path

spark = SparkSession.builder.appName("retail_pysaprk_consumer") \
    .config("spark.jars", f"{spark_jars_path}/commons-pool2-2.11.1.jar,"
            f"{spark_jars_path}/spark-sql-kafka-0-10_2.12-3.4.0.jar,"
            f"{spark_jars_path}/spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar") \
    .getOrCreate()


In [30]:
# Define the schema for our data
schema = StructType([
    StructField("store_location", StringType(), True),
    StructField("time_of_purchase", TimestampType(), True),
    StructField("product_ID", StringType(), True),
    StructField("transaction_amount", IntegerType(), True)
])

# Stream from Kafka topic
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "b-1.monstercluster1.6xql65.c3.kafka.eu-west-2.amazonaws.com:9092") \
    .option("subscribe", "retail_transactions") \
    .load()

In [31]:
transactions = (df.selectExpr("CAST(value AS STRING)")
                .withColumn("data", from_json(col("value"), schema))
                .select("data.*"))

query = transactions.writeStream \
.format("memory") \
.queryName("temporary_view_three") \
.start()

query.awaitTermination(30)



23/12/14 17:00:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4542f858-f2c3-4b8c-8af9-4aec4a394c77. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/14 17:00:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/14 17:00:39 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

False

In [32]:
processed_data = spark.sql("SELECT * FROM temporary_view_three")

In [33]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

transactions = processed_data.withColumn("transaction_amount", col("transaction_amount").cast(DoubleType()))

In [34]:
transactions.show()

+--------------+-------------------+----------+------------------+
|store_location|   time_of_purchase|product_ID|transaction_amount|
+--------------+-------------------+----------+------------------+
|   San Antonio|2023-12-14 17:00:40|    P00031|             923.0|
|  Philadelphia|2023-12-14 17:00:42|    P00008|             709.0|
|       Chicago|2023-12-14 17:00:45|    P00089|             910.0|
|  Philadelphia|2023-12-14 17:00:47|    P00082|              14.0|
|  Philadelphia|2023-12-14 17:00:49|    P00096|             588.0|
|      New York|2023-12-14 17:00:51|    P00048|             982.0|
|       Houston|2023-12-14 17:00:54|    P00099|             314.0|
|   San Antonio|2023-12-14 17:00:57|    P00019|              24.0|
|   San Antonio|2023-12-14 17:00:59|    P00069|             700.0|
|   San Antonio|2023-12-14 17:01:01|    P00062|             859.0|
|       Phoenix|2023-12-14 17:01:04|    P00085|             989.0|
|       Chicago|2023-12-14 17:01:05|    P00091|             92

In [35]:
from pyspark.sql.functions import sum
total_transactions = transactions.groupBy("store_location").agg(sum("transaction_amount").alias("total_amount"))
total_transactions.show()

+--------------+------------+
|store_location|total_amount|
+--------------+------------+
|       Phoenix|       989.0|
|   San Antonio|      2605.0|
|  Philadelphia|      1311.0|
|       Chicago|      1838.0|
|       Houston|       314.0|
|      New York|      1161.0|
+--------------+------------+



In [36]:
with_discount = transactions.withColumn("discounted_amount", col("transaction_amount") * 0.9)
with_discount.show()

+--------------+-------------------+----------+------------------+-----------------+
|store_location|   time_of_purchase|product_ID|transaction_amount|discounted_amount|
+--------------+-------------------+----------+------------------+-----------------+
|   San Antonio|2023-12-14 17:00:40|    P00031|             923.0|            830.7|
|  Philadelphia|2023-12-14 17:00:42|    P00008|             709.0|            638.1|
|       Chicago|2023-12-14 17:00:45|    P00089|             910.0|            819.0|
|  Philadelphia|2023-12-14 17:00:47|    P00082|              14.0|             12.6|
|  Philadelphia|2023-12-14 17:00:49|    P00096|             588.0|            529.2|
|      New York|2023-12-14 17:00:51|    P00048|             982.0|883.8000000000001|
|       Houston|2023-12-14 17:00:54|    P00099|             314.0|            282.6|
|   San Antonio|2023-12-14 17:00:57|    P00019|              24.0|             21.6|
|   San Antonio|2023-12-14 17:00:59|    P00069|             700.0

In [37]:
with_discount.filter(with_discount.store_location != 'online').show()

+--------------+-------------------+----------+------------------+-----------------+
|store_location|   time_of_purchase|product_ID|transaction_amount|discounted_amount|
+--------------+-------------------+----------+------------------+-----------------+
|   San Antonio|2023-12-14 17:00:40|    P00031|             923.0|            830.7|
|  Philadelphia|2023-12-14 17:00:42|    P00008|             709.0|            638.1|
|       Chicago|2023-12-14 17:00:45|    P00089|             910.0|            819.0|
|  Philadelphia|2023-12-14 17:00:47|    P00082|              14.0|             12.6|
|  Philadelphia|2023-12-14 17:00:49|    P00096|             588.0|            529.2|
|      New York|2023-12-14 17:00:51|    P00048|             982.0|883.8000000000001|
|       Houston|2023-12-14 17:00:54|    P00099|             314.0|            282.6|
|   San Antonio|2023-12-14 17:00:57|    P00019|              24.0|             21.6|
|   San Antonio|2023-12-14 17:00:59|    P00069|             700.0

In [38]:
with_discount.filter(with_discount.store_location == 'online').show()

+--------------+----------------+----------+------------------+-----------------+
|store_location|time_of_purchase|product_ID|transaction_amount|discounted_amount|
+--------------+----------------+----------+------------------+-----------------+
+--------------+----------------+----------+------------------+-----------------+



                                                                                

There don't seem to be any transactions online no matter how long I run the query for.

In [47]:
import plotly
import plotly.express as px
import pandas as pd

                                                                                

In [51]:
total_transactions_pd = total_transactions.toPandas()


                                                                                