### Create a streaming analytics job

In [1]:
import redis
import mariadb

#For production systems, use a class instead
#https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.foreach.html
def write_to_redis(row):
    stats_key="last-action-stats"
    redis_conn=redis.Redis(host="localhost", 
                     port=6379, decode_responses=True)
    redis_conn.zincrby(stats_key,
                        row["duration"],row["country"])
    redis_conn.quit()       

def write_to_mariadb(row):
    #Connect to website_stats database
    summary_conn = mariadb.connect(
                user="spark",
                password="spark",
                host="127.0.0.1",
                port=3306,
                database="website_stats",
                autocommit=True
            )
    summary_cursor = summary_conn.cursor()
    
    summary_sql=f"""
            INSERT INTO `website_stats`.`visit_stats` 
                (INTERVAL_TIMESTAMP, LAST_ACTION, DURATION)
            VALUES('{row["window"]["start"]}',
                    '{row["last_action"]}',
                    '{row["duration"]}')
            """
    summary_cursor.execute(summary_sql)
    



In [2]:
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.sql import SparkSession
import os

print("*************Starting Streaming Analytics for Website visits*****************")

schema = StructType([
                    StructField("country", StringType()),
                    StructField("last_action", StringType()),
                    StructField("visit_date", TimestampNTZType()),
                    StructField("duration", IntegerType())
                    ])

#.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1')\
#create spark session
streaming_spark = SparkSession\
            .builder\
            .appName("StreamingWebsiteAnalyticsJob")\
            .config("spark.sql.shuffle.partitions", 2)\
            .config("spark.default.parallelism", 2)\
            .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)\
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version","2")\
            .config("spark.jars", "jars/mysql-connector-j-8.4.0.jar," +\
                                    "jars/commons-pool2-2.12.0.jar," +\
                                    "jars/kafka-clients-3.6.0.jar," + \
                                    "jars/spark-sql-kafka-0-10_2.12-3.5.1.jar," +\
                                    "jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar," +\
                                    "jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar") \
            .config("spark.driver.extraClassPath","jars/*") \
            .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)\
            .config("spark.driver.host", "127.0.0.1") \
            .master("local[2]")\
            .getOrCreate()

print("Reading from Kafka...")
raw_visits_df = streaming_spark\
                .readStream\
                .format("kafka")\
                .option("kafka.bootstrap.servers","localhost:9092")\
                .option("subscribe","spark.streaming.website.visits")\
                .option("startingOffsets","latest")\
                .load()

visits_df = raw_visits_df\
            .selectExpr("CAST(value AS STRING) as value")\
            .select(functions.from_json("value",schema).alias("visits"))\
            .select("visits.*")

#Write abandoned shopping carts to a Kafka topic
shopping_cart_df = visits_df\
                    .filter("last_action == 'ShoppingCart'")

shopping_cart_df.selectExpr("format_string(\"%s,%s,%s,%d\",country,last_action,visit_date,duration) as value")\
                .writeStream\
                .format("kafka")\
                .option("checkpointLocation", "tmp/cp-shoppingcart2")\
                .option("kafka.bootstrap.servers", "localhost:9092")\
                .option("topic", "spark.streaming.carts.abandoned")\
                .start()

#Update countrywise visit duration in real time to Redis
visits_df.select("country","duration")\
        .writeStream\
        .foreach(write_to_redis)\
        .start()

#Create sum(duration) for last action every 5 seconds and write to mariaDB
windowed_summary = visits_df\
                    .withColumn("timestamp",functions.current_timestamp())\
                    .withWatermark("timestamp","5 seconds")\
                    .groupBy(functions.window(\
                            functions.col("timestamp"),"5 seconds"),\
                            functions.col("last_action"))\
                    .agg(functions.sum(functions.col("duration")).alias("duration"))

windowed_summary\
    .writeStream\
.foreach(write_to_mariadb)\
    .start()\
    .awaitTermination()


*************Starting Streaming Analytics for Website visits*****************


25/03/28 12:05:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Reading from Kafka...


25/03/28 12:05:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/28 12:05:09 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/95/n1_0_mn132jf8fhwzdt9_xhc0000gn/T/temporary-2fad4e67-151a-4799-bbf1-1505663f9cd8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/28 12:05:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/28 12:05:09 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/95/n1_0_mn132jf8fhwzdt9_xhc0000gn/T/temporary-a4b80d3e-0f56-4333-a977-1425f0d035e5. If it's required to delete it und

Py4JError: An error occurred while calling o108.awaitTermination

                                                                                