In [31]:
from google.cloud import bigquery

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_json, when, asc, desc, count, expr, date_format, unix_timestamp, max
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, StringType, DoubleType, IntegerType, DateType
from threading import Thread
from time import sleep

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Assignment2_Streaming")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

WINDOW_LENGTH = 15

# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

dataSchemaIMDB = StructType(
    [
    StructField("id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("type", StringType(), True),
    StructField("genres", StringType(), True),
    StructField("averageRating", DoubleType(), True),
    StructField("numVotes", IntegerType(), True),
    StructField("releaseYear", IntegerType(), True)
    ])

# Because csv.DictReader() reads every value in the row as a string, we have to convert them manually in the processing pipeline
dataSchemaService = StructType(
    [StructField("show_id", StringType(), True),
     StructField("type", StringType(), True),
     StructField("title", StringType(), True),
     StructField("director", StringType(), True),
     StructField("cast", StringType(), True),
     StructField("date_added", StringType(), True),
     StructField("release_year", StringType(), True),
     StructField("rating", StringType(), True),
     StructField("duration", StringType(), True),
     StructField("listed_in", StringType(), True),
     StructField("description", StringType(), True),
     StructField("service", StringType(), True),
     StructField("timestamp_in_ms", LongType(), True)
     ])

# Read from BigQuery 
imdb_data = spark.read.format('bigquery') \
  .option('table', 'IMDB_data.score_data') \
  .load()

# Update the 'type' column to change 'movie' to 'Movie'
imdb_data = imdb_data.withColumn(
    "type",
    when(col("type") == "movie", "Movie").otherwise(col("type"))
)

# Update the 'type' column to change 'tvSeries' to 'TV Show'
imdb_data = imdb_data.withColumn(
    "type",
    when(col("type") == "tvSeries", "TV Show").otherwise(col("type"))
)

imdb_data.show(10)

# Read the whole Netflix dataset as a batch
serviceStream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", "scraper") \
    .option("startingOffsets", "latest") \
    .load()

df_services = serviceStream.selectExpr("CAST(value AS STRING)")

df_services_good = df_services.select(from_json(df_services.value, dataSchemaService.simpleString()))

df_services_good.printSchema()

data_services = df_services_good.select(col("from_json(value).*"))

data_services.printSchema()

# Cast 'release_year' from string to integer
data_services = data_services.withColumn(
    "release_year",
    col("release_year").cast(IntegerType())
)

# create the event time column 
timed_data_services = data_services.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time")

timed_data_services.printSchema()

rated_data = timed_data_services.join(imdb_data, ["title", "type"], "inner")

rated_data = rated_data.withColumn(
    "averageRating",
    col("averageRating").cast(DoubleType())
)

top_score = rated_data \
    .where(col("averageRating").isNotNull()) \
    .withWatermark("event_time", "3 minute") \
    .groupBy(window(col("event_time"), "15 seconds"), "service") \
    .agg(
        # If-statement in sql starts with CASE and ends with END, this creates a column where if the averageRating >= 9 gets a 1 and then sums it up
        expr("SUM(CASE WHEN averageRating >= 9.0 THEN 1 ELSE 0 END)").alias("Score >9"),
        expr("SUM(CASE WHEN (averageRating >= 8.0 AND averageRating < 9.0) THEN 1 ELSE 0 END)").alias("Score 8-9")
    ) \

above_9 = top_score.select(concat(date_format(col("window.start"), "yyyy-MM-dd H:mm:ss"), lit(";"), lit(WINDOW_LENGTH), lit(";"), col("service")).alias("key"), col("Score >9").alias("value").cast("string"))

between_8_9 = top_score.select(concat(date_format(col("window.start"), "yyyy-MM-dd H:mm:ss"), lit(";"), lit(WINDOW_LENGTH), lit(";"), col("service")).alias("key"), col("Score 8-9").alias("value").cast("string"))  

query_above_9 = above_9 \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("checkpointLocation", "/home/jovyan/checkpoint/assignment/top_movies_window") \
    .option("topic", "top_movies") \
    .outputMode("update") \
    .start()

query_between_8_9 = between_8_9 \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("checkpointLocation", "/home/jovyan/checkpoint/assignment/subtop_movies_window") \
    .option("topic", "subtop_movies") \
    .outputMode("update") \
    .start()

try:
    query_above_9.awaitTermination()
    query_between_8_9.awaitTermination()
except KeyboardInterrupt:
    query_above_9.stop()
    query_between_8_9.stop()
    
    # Stop the spark context
    spark.stop()
    print("Stopped the streaming query and the spark context")

+---------+--------------------+-----+--------------------+-------------+--------+-----------+
|       id|               title| type|              genres|averageRating|numVotes|releaseYear|
+---------+--------------------+-----+--------------------+-------------+--------+-----------+
|tt0000147|The Corbett-Fitzs...|Movie|Documentary, News...|          5.2|     539|       1897|
|tt0000574|The Story of the ...|Movie|Action, Adventure...|          6.0|     941|       1906|
|tt0000679|The Fairylogue an...|Movie|  Adventure, Fantasy|          5.2|      78|       1908|
|tt0001049|      Gøngehøvdingen|Movie|          Drama, War|          4.2|      19|       1909|
|tt0001184|Don Juan de Serra...|Movie|    Adventure, Drama|          3.8|      22|       1910|
|tt0001285|   The Life of Moses|Movie|Biography, Drama,...|          5.5|      63|       1909|
|tt0001370|Rainha Depois de ...|Movie|      Drama, History|          5.3|      25|       1910|
|tt0001440|       Valdemar Sejr|Movie|      Drama,

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: reentrant call inside <_io.BufferedReader name=56>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip

Py4JError: An error occurred while calling o2857.awaitTermination

In [32]:
# Stop the spark context
spark.stop()