In [7]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType

from pyspark.sql.functions import *
from time import sleep

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("StreamingReviews")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

#Review_ID,Movie_ID,Reviewer_Name,Review_Rating,Reviewer_Nationality,Reviewer_Age,Review_Date,Sex
dataSchema = StructType(
    [StructField("Review_ID", StringType(), True),
     StructField("Movie_ID", StringType(), True),
     StructField("Reviewer_Name", StringType(), True),
     StructField("Review_Rating", IntegerType(), True),
     StructField("Reviewer_Nationality", StringType(), True),
     StructField("Reviewer_Age", IntegerType(), True),
     StructField("Review_Date", StringType(), True),
     StructField("Sex", StringType(), True)
     ])

# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# We need to set the following configuration whenever we need to use GCS.
# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set(
    "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)

# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
bucket = "temp_de2024_2069997"
spark.conf.set("temporaryGcsBucket", bucket)

# Read the whole dataset as a batch
kafkaStream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("subscribe", "reviews") \
    .option("startingOffsets", "latest") \
    .load()

df = kafkaStream.selectExpr("CAST(value AS STRING)")

df1 = df.select(from_json(df.value, dataSchema.simpleString()))

df1.printSchema()

sdf = df1.select(col("from_json(value).*"))

# Converting releaseDateTheaters to datetime
sdf = sdf.withColumn(
    "Review_Date", to_date(col("Review_Date"), "M/d/yyyy")
)

sdf.printSchema()

# Step 1: Calculate the daily average rating for each movie
daily_movie_ratings = (
    sdf
    .groupBy("Movie_ID", "Review_Date")
    .agg(avg("Review_Rating").alias("Daily_Average_Rating"))
)

# Step 2: Filter movies with very low average ratings
low_rated_movies = (
    daily_movie_ratings
    .filter(col("Daily_Average_Rating") < 4.0)  # Threshold for low ratings
)

def my_foreach_batch_function(df, batch_id):
   # Saving the data to BigQuery as batch processing sink -see, use write(), save(), etc.
    df.write.format('bigquery') \
      .option('table', 'dataengineering-439112.labdataset.low_rated_movies') \
      .mode("overwrite") \
      .save()

# Output to the console or sink
query = (
    low_rated_movies
    .writeStream
    .outputMode("complete") 
    .trigger(processingTime="2 seconds")
    .foreachBatch(my_foreach_batch_function)
    .start()
)

try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- from_json(value): struct (nullable = true)
 |    |-- Review_ID: string (nullable = true)
 |    |-- Movie_ID: string (nullable = true)
 |    |-- Reviewer_Name: string (nullable = true)
 |    |-- Review_Rating: integer (nullable = true)
 |    |-- Reviewer_Nationality: string (nullable = true)
 |    |-- Reviewer_Age: integer (nullable = true)
 |    |-- Review_Date: string (nullable = true)
 |    |-- Sex: string (nullable = true)

root
 |-- Review_ID: string (nullable = true)
 |-- Movie_ID: string (nullable = true)
 |-- Reviewer_Name: string (nullable = true)
 |-- Review_Rating: integer (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Reviewer_Age: integer (nullable = true)
 |-- Review_Date: string (nullable = true)
 |-- Sex: string (nullable = true)



AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
GlobalLimit 10
+- LocalLimit 10
   +- Sort [Movie_Average_Rating#262 DESC NULLS LAST], true
      +- Aggregate [Movie_ID#198], [Movie_ID#198, avg(Review_Rating#200) AS Movie_Average_Rating#262]
         +- Join Inner, (Reviewer_Nationality#201 = Reviewer_Nationality#230)
            :- Project [from_json(value)#195.Review_ID AS Review_ID#197, from_json(value)#195.Movie_ID AS Movie_ID#198, from_json(value)#195.Reviewer_Name AS Reviewer_Name#199, from_json(value)#195.Review_Rating AS Review_Rating#200, from_json(value)#195.Reviewer_Nationality AS Reviewer_Nationality#201, from_json(value)#195.Reviewer_Age AS Reviewer_Age#202, from_json(value)#195.Review_Date AS Review_Date#203, from_json(value)#195.Sex AS Sex#204]
            :  +- Project [from_json(StructField(Review_ID,StringType,true), StructField(Movie_ID,StringType,true), StructField(Reviewer_Name,StringType,true), StructField(Review_Rating,IntegerType,true), StructField(Reviewer_Nationality,StringType,true), StructField(Reviewer_Age,IntegerType,true), StructField(Review_Date,StringType,true), StructField(Sex,StringType,true), value#193, Some(Etc/UTC)) AS from_json(value)#195]
            :     +- Project [cast(value#180 as string) AS value#193]
            :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@1aebc2b3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@27cf614b, [startingOffsets=latest, kafka.bootstrap.servers=kafka1:9093, subscribe=reviews], [key#179, value#180, topic#181, partition#182, offset#183L, timestamp#184, timestampType#185], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@59d8c1ad,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> kafka1:9093, subscribe -> reviews, startingOffsets -> latest),None), kafka, [key#172, value#173, topic#174, partition#175, offset#176L, timestamp#177, timestampType#178]
            +- SubqueryAlias Lowest_Nationality
               +- Project [Reviewer_Nationality#230]
                  +- GlobalLimit 1
                     +- LocalLimit 1
                        +- Sort [Average_Rating#222 ASC NULLS FIRST], true
                           +- Aggregate [Reviewer_Nationality#230], [Reviewer_Nationality#230, avg(Review_Rating#229) AS Average_Rating#222]
                              +- Project [from_json(value)#195.Review_ID AS Review_ID#226, from_json(value)#195.Movie_ID AS Movie_ID#227, from_json(value)#195.Reviewer_Name AS Reviewer_Name#228, from_json(value)#195.Review_Rating AS Review_Rating#229, from_json(value)#195.Reviewer_Nationality AS Reviewer_Nationality#230, from_json(value)#195.Reviewer_Age AS Reviewer_Age#231, from_json(value)#195.Review_Date AS Review_Date#232, from_json(value)#195.Sex AS Sex#233]
                                 +- Project [from_json(StructField(Review_ID,StringType,true), StructField(Movie_ID,StringType,true), StructField(Reviewer_Name,StringType,true), StructField(Review_Rating,IntegerType,true), StructField(Reviewer_Nationality,StringType,true), StructField(Reviewer_Age,IntegerType,true), StructField(Review_Date,StringType,true), StructField(Sex,StringType,true), value#193, Some(Etc/UTC)) AS from_json(value)#195]
                                    +- Project [cast(value#180 as string) AS value#193]
                                       +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@1aebc2b3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@27cf614b, [startingOffsets=latest, kafka.bootstrap.servers=kafka1:9093, subscribe=reviews], [key#179, value#180, topic#181, partition#182, offset#183L, timestamp#184, timestampType#185], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@59d8c1ad,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> kafka1:9093, subscribe -> reviews, startingOffsets -> latest),None), kafka, [key#172, value#173, topic#174, partition#175, offset#176L, timestamp#177, timestampType#178]
