In [None]:
pip install nltk

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql.functions import *
from pyspark.sql.types import (
    StructType,
    StructField,
    LongType,
    StringType,
    DoubleType,
    IntegerType,
    BooleanType,
    FloatType,
)
from time import sleep

from nltk.corpus import stopwords
import nltk
import numpy as np

# Spark application settings: standalone master, application name and a small
# driver/executor footprint (SparkConf setters return the conf, so they chain).
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("StreamReviews")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Entry point to the Spark SQL engine (reuses a running session if there is one).
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the GCS connector classes so Hadoop can resolve gs:// paths; this is
# needed whenever GCS is accessed.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for _hadoop_key, _hadoop_impl in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(_hadoop_key, _hadoop_impl)

# Cloud Storage bucket the BigQuery connector uses for temporary export data.
bucket = "temp_de2023_2065718"
spark.conf.set("temporaryGcsBucket", bucket)


# Expected layout of each JSON review message read from Kafka.
# Every field is declared nullable; only isTopCritic is typed as boolean.
dataSchema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("id", StringType()),
            ("reviewId", StringType()),
            ("creationDate", StringType()),
            ("criticName", StringType()),
            ("isTopCritic", BooleanType()),
            ("originalScore", StringType()),
            ("reviewState", StringType()),
            ("publicatioName", StringType()),
            ("reviewText", StringType()),
            ("scoreSentiment", StringType()),
            ("reviewUrl", StringType()),
        )
    ]
)

# Subscribe to the "review" Kafka topic as an unbounded *streaming* source,
# starting from the earliest offset still retained by the broker.
kafkaStream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("subscribe", "review")
    .option("startingOffsets", "earliest")
    .load()
)

# The Kafka source exposes the message payload as binary; decode it to text.
df = kafkaStream.selectExpr("CAST(value AS STRING)")

# Parse the JSON payload with the declared schema. The struct gets an explicit
# alias so the flattening step below does not depend on Spark's auto-generated
# column name (which differs between Spark versions).
df1 = df.select(from_json(df.value, dataSchema).alias("review"))

df1.printSchema()

# Flatten the parsed struct into one top-level column per schema field.
sdf = df1.select(col("review.*"))

# Convert creationDate from a "y-M-d" string into a DateType column.
review_df = sdf.withColumn(
    "creationDate", to_date(col("creationDate"), "y-M-d")
)  # our dataframe with date column

# Letter grade -> numeric score on a 0-1 scale. The table is broadcast so the
# rating UDF can read it on executors through grade_dct.value.
_letter_grade_scores = {
    "A+": 0.985,
    "A": 0.945,
    "A-": 0.91,
    "B+": 0.88,
    "B": 0.845,
    "B-": 0.81,
    "C+": 0.78,
    "C": 0.745,
    "C-": 0.71,
    "D+": 0.68,
    "D": 0.645,
    "D-": 0.61,
    "F": 0.295,
}
grade_dct = spark.sparkContext.broadcast(_letter_grade_scores)


# Define the UDF for fixing the rating
def fix_rating_udf(x, grades=None):
    """Convert a raw ``originalScore`` string into a numeric score.

    Letter grades present in ``grades`` map to their fixed value. Any other
    value is parsed as ``"numerator/denominator"``. Before ``float()`` is
    applied, double quotes, single quotes, asterisks and surrounding whitespace
    are removed from each part.

    Args:
        x: The raw score text, or ``None``.
        grades: Optional mapping of letter grade -> score. Defaults to the
            broadcast ``grade_dct`` table, which is what the Spark UDF uses.

    Returns:
        The score as a float, or ``None`` when ``x`` is missing or cannot be
        parsed. That covers a value with no "/", a non-numeric part, or a zero
        denominator.
    """
    if grades is None:
        grades = grade_dct.value
    if x is None:
        return None
    if x in grades:
        return grades[x]
    try:
        parts = [
            float(part.replace('"', "").replace("'", "").replace("*", "").strip())
            for part in x.split("/")
        ]
        return parts[0] / parts[1]
    # Only the parse failures are mapped to None. Unexpected errors (e.g. a
    # missing grade table) are no longer silently swallowed.
    except (AttributeError, TypeError, ValueError, IndexError, ZeroDivisionError):
        return None


# Wrap fix_rating_udf as a Spark UDF producing FloatType values.
fix_rating_spark_udf = udf(fix_rating_udf, FloatType())

# Replace the raw originalScore text with its numeric score.
_normalised_score = fix_rating_spark_udf(col("originalScore"))
review_df = review_df.withColumn("originalScore", _normalised_score)

# Clip the score into [0, 1]. Nulls fall through to `otherwise` unchanged.
_clipped_score = (
    when(col("originalScore") > 1, 1)
    .when(col("originalScore") < 0, 0)
    .otherwise(col("originalScore"))
)
review_df = review_df.withColumn("originalScore", _clipped_score)

# Split each review into tokens on single spaces. No lower-casing happens here;
# case is only ignored later, when tokens are matched against the stopwords.
_review_text = col("reviewText")
word_df = review_df.withColumn("tokenized_review", split(_review_text, " "))

# Fetch NLTK's English stopword corpus and keep it as a set for fast lookups.
nltk.download("stopwords")
_english_stopwords = stopwords.words("english")
stop_wrds = set(_english_stopwords)

# Define a UDF to remove stopwords
def remove_stopwords_udf(tokens, stop_words=None):
    """Return ``tokens`` with stopwords and empty tokens removed.

    Matching is case-insensitive, because each token is lower-cased before the
    lookup. Kept tokens retain their original case.

    Args:
        tokens: List of token strings, or ``None``.
        stop_words: Optional collection of stopwords, compared against the
            lower-cased tokens. Defaults to the module-level ``stop_wrds``.

    Returns:
        A new list of the remaining tokens; ``[]`` when ``tokens`` is None.
    """
    if stop_words is None:
        stop_words = stop_wrds
    if tokens is None:
        return []
    # Splitting on a single space yields "" for repeated or leading spaces.
    # Those are not words and would otherwise show up in the word counts.
    return [word for word in tokens if word and word.lower() not in stop_words]


# Spark UDF wrapper around remove_stopwords_udf: array<string> -> array<string>.
remove_stopwords_spark_udf = udf(remove_stopwords_udf, ArrayType(StringType()))

# Pipeline:
# 1. drop rows without tokens;
# 2. strip stopwords;
# 3. explode to one row per remaining word;
# 4. count each word's occurrences per review id.
word_df = (
    word_df.dropna(subset="tokenized_review")
    .withColumn(
        "filtered_review", remove_stopwords_spark_udf(col("tokenized_review"))
    )
    .withColumn("word", explode("filtered_review"))
    .groupBy("id", "word")
    .count()
)


# TODO: get the critic with the harshest reviews (nothing below computes this yet)


# Average normalised score per 1-day window over creationDate.
# The aggregate is aliased because its default name, "avg(originalScore)",
# contains parentheses, which BigQuery does not accept in column names. This
# frame is written to BigQuery below.
avgratingf = review_df.groupBy(window(col("creationDate"), "1 day")).agg(
    avg("originalScore").alias("avg_originalScore")
)

# Per-review (id, originalScore) pairs.
rating_df = review_df.select(col("id"), col("originalScore"))

def _start_memory_query(frame, table_name, mode):
    """Start a streaming query that keeps *frame* in an in-memory table.

    The table can be read back with ``spark.sql("SELECT * FROM <table_name>")``.
    """
    return (
        frame.writeStream.queryName(table_name)
        .format("memory")
        .outputMode(mode)
        .start()
    )


# Started in the same order as before: token counts, daily averages, ratings.
query2 = _start_memory_query(word_df.orderBy(col("id")), "token_count", "complete")
query1 = _start_memory_query(avgratingf, "avg_score", "complete")
query3 = _start_memory_query(rating_df, "rating", "append")


def my_foreach_batch_function_1(df, batch_id):
    """Write one micro-batch to the BigQuery ``tokencount`` table.

    The table is overwritten on every call, so after each batch it holds
    exactly that batch's rows. ``batch_id`` is part of the foreachBatch
    callback signature and is not used.
    """
    target_table = "de23-398309.assignment2dataset.tokencount"
    writer = df.write.format("bigquery").option("table", target_table)
    writer.mode("overwrite").save()


def my_foreach_batch_function_2(df, batch_id):
    """Write one micro-batch to the BigQuery ``avgrating`` table.

    The table is overwritten on every call, so after each batch it holds
    exactly that batch's rows. ``batch_id`` is part of the foreachBatch
    callback signature and is not used.
    """
    target_table = "de23-398309.assignment2dataset.avgrating"
    writer = df.write.format("bigquery").option("table", target_table)
    writer.mode("overwrite").save()


def my_foreach_batch_function_3(df, batch_id):
    """Append one micro-batch of (id, originalScore) rows to BigQuery ``dist``.

    This callback serves a query that runs in "append" output mode, so each
    micro-batch contains only the rows that arrived since the previous batch.
    Overwriting the table here would discard every earlier batch and leave
    only the latest rows. Appending keeps the full history.

    NOTE(review): foreachBatch is at-least-once, so a retried batch can insert
    duplicate rows. ``batch_id`` could be used to de-duplicate if that matters.
    """
    df.write.format("bigquery").option(
        "table", "de23-398309.assignment2dataset.dist"
    ).mode("append").save()


def _start_bigquery_query(frame, mode, batch_writer):
    """Start a streaming query that hands each micro-batch to *batch_writer*.

    Uses a ProcessingTime trigger with a two-second micro-batch interval.
    """
    return (
        frame.writeStream.outputMode(mode)
        .trigger(processingTime="2 seconds")
        .foreachBatch(batch_writer)
        .start()
    )


# Streaming queries whose micro-batches are written to BigQuery tables by the
# foreachBatch callbacks above. Started in the same order as before.
token_countQuery = _start_bigquery_query(
    word_df.orderBy(col("id")), "complete", my_foreach_batch_function_1
)
avg_ratingQuery = _start_bigquery_query(
    avgratingf, "complete", my_foreach_batch_function_2
)
rating_distQuery = _start_bigquery_query(
    rating_df, "append", my_foreach_batch_function_3
)

try:
    # Block until any active streaming query terminates. If one of them fails,
    # its exception is raised here. Awaiting the queries one after another
    # would hide a failure in a later query behind the first query's wait.
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    # Stop every active query, including the in-memory sink queries, not only
    # the three BigQuery queries.
    for active_query in spark.streams.active:
        active_query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

# Print the in-memory sink tables 50 times, 10 seconds apart.
# NOTE(review): when this runs in the same cell as the await call above, that
# call blocks first, and its interrupt handler stops the session. This loop may
# therefore never see a live session; consider running it before the await or
# in its own cell.
try:
    for _ in range(50):
        for table_name in ("token_count", "avg_score", "rating"):
            spark.sql(f"SELECT * FROM {table_name}").show()
        sleep(10)
except KeyboardInterrupt:
    # The handler previously called `query.stop()`, but no `query` is defined
    # (NameError). Stop every active streaming query instead.
    for active_query in spark.streams.active:
        active_query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

KeyboardInterrupt: 

root
 |-- id: string (nullable = true)
 |-- reviewId: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- criticName: string (nullable = true)
 |-- isTopCritic: string (nullable = true)
 |-- originalScore: string (nullable = true)
 |-- reviewState: string (nullable = true)
 |-- publicatioName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- scoreSentiment: string (nullable = true)
 |-- reviewUrl: string (nullable = true)