In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, TimestampType, IntegerType
from pyspark.sql.functions import from_json, col, avg, sum
import time

# Define the path to the jars on the EC2 instance
spark_jars_path = "/home/ec2-user/stream-processing-template/jars"  # <-- Update this path

spark = SparkSession.builder.appName("retail_pysaprk_consumer") \
    .config("spark.jars", f"{spark_jars_path}/commons-pool2-2.11.1.jar,"
            f"{spark_jars_path}/spark-sql-kafka-0-10_2.12-3.4.0.jar,"
            f"{spark_jars_path}/spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar") \
    .getOrCreate()


23/12/13 15:06:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Define the schema for our data
schema = StructType([
    StructField("store_location", StringType(), True),
    StructField("time_of_purchase", TimestampType(), True),
    StructField("product_ID", StringType(), True),
    StructField("transaction_amount", IntegerType(), True)
])

# Stream from Kafka topic
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "b-1.monstercluster1.6xql65.c3.kafka.eu-west-2.amazonaws.com:9092") \
    .option("subscribe", "retail_transactions") \
    .load()

In [4]:
transactions = (df.selectExpr("CAST(value AS STRING)")
                .withColumn("data", from_json(col("value"), schema))
                .select("data.*"))

query = transactions.writeStream \
    .format("memory") \
    .queryName("temporary_view_one") \
    .start()

query.awaitTermination(15)

23/12/13 15:06:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-50fe8e47-e374-42b7-9b30-c607543978e2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/13 15:06:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/13 15:06:32 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

False

In [5]:
processed_data = spark.sql("SELECT * FROM temporary_view_one")

# Now you can perform aggregations or other transformations on `processed_data`


                                                                                

In [13]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

transactions = processed_data.withColumn("transaction_amount", col("transaction_amount").cast(DoubleType()))


In [14]:
transactions.tail(5)

[Row(store_location='Chicago', time_of_purchase=datetime.datetime(2023, 12, 13, 15, 7, 50), product_ID='P00029', transaction_amount=103.0),
 Row(store_location='Phoenix', time_of_purchase=datetime.datetime(2023, 12, 13, 15, 7, 52), product_ID='P00047', transaction_amount=239.0),
 Row(store_location='Chicago', time_of_purchase=datetime.datetime(2023, 12, 13, 15, 7, 54), product_ID='P00017', transaction_amount=244.0),
 Row(store_location='Chicago', time_of_purchase=datetime.datetime(2023, 12, 13, 15, 7, 56), product_ID='P00068', transaction_amount=696.0),
 Row(store_location='Houston', time_of_purchase=datetime.datetime(2023, 12, 13, 15, 7, 59), product_ID='P00047', transaction_amount=515.0)]

                                                                                

In [None]:
from pyspark.sql.functions import sum
total_transactions = transactions.groupBy("store_location").agg(sum("transaction_amount").alias("total_amount"))
total_transactions.show()

In [None]:
transaction_counts = transactions.groupBy("store_location").count().withColumnRenamed("count", "transaction_count")
transaction_counts.show()

In [None]:
average_transactions = transactions.groupBy("store_location").agg(avg("transaction_amount").alias("average_amount"))
average_transactions.show()