In [278]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_csv
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType
from time import sleep

# Spark configuration for this notebook: master URL, application name, and the
# driver memory / driver cores / executor cores settings.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Assignment2_stream")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Build (or reuse) the SparkSession, the entry point to the Spark SQL engine.
session_builder = SparkSession.builder.config(conf=sparkConf)
spark = session_builder.getOrCreate()

# Schema of one CSV record: every field is declared as a nullable string, in the
# order the columns appear in the record.
dataSchema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "title",
        "score",
        "id",
        "Subreddit",
        "url",
        "num_comments",
        "body",
        "timestamp_in_ms",
    )
])

# Streaming source: subscribe to the "reddit" Kafka topic, starting from the
# earliest available offsets.
kafkaStream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("subscribe", "reddit")
    .option("startingOffsets", "earliest")
    .load()
)

# Build a structured DataFrame from the Kafka stream: cast the `value` column to
# a string, parse it as CSV with dataSchema, then expand the parsed struct into
# top-level columns.
df = kafkaStream.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_csv(df["value"], dataSchema.simpleString()))

# Type casting: keep the original string columns and add integer copies of the
# two numeric fields.
sdf = (
    df1.select(col("from_csv(value).*"))
    .withColumn("num_comments_int", col("num_comments").cast("int"))
    .withColumn("score_int", col("score").cast("int"))
)
sdf.printSchema()

# Add an event_time timestamp column; timestamp_in_ms is divided by 1000.0
# before the cast (presumably epoch milliseconds -> seconds; confirm with the producer).
withEventTimedf = sdf.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time",
)



root
 |-- title: string (nullable = true)
 |-- score: string (nullable = true)
 |-- id: string (nullable = true)
 |-- Subreddit: string (nullable = true)
 |-- url: string (nullable = true)
 |-- num_comments: string (nullable = true)
 |-- body: string (nullable = true)
 |-- timestamp_in_ms: string (nullable = true)
 |-- num_comments_int: integer (nullable = true)
 |-- score_int: integer (nullable = true)



In [279]:
# Calculate the correlation between the number of comments and the score of a topic.
# `corr` is imported in this cell because the notebook's first import cell does
# not bring it into scope; without it this cell raises NameError on a fresh kernel.
from pyspark.sql.functions import corr

# Single-row aggregate over the whole stream, labelled with a constant group name.
corr_score = (
    withEventTimedf
    .select(corr("num_comments_int", "score_int").alias("Score_Comment_Corr"))
    .withColumn("Group", lit("All Data Score"))
)

# Add a string copy of the coefficient in a column named "value".
resultsdf = corr_score.withColumn("value", col("Score_Comment_Corr").cast("string"))

In [280]:
# Keep only the two columns written by the streaming sinks: the group label
# renamed to "key", and the string-typed "value".
resultdf = resultsdf.select(
    col("Group").alias("key"),
    col("value"),
)

In [281]:
from pyspark.sql.functions import *

# Stream the key/value result to the Kafka topic "corr_score" in complete output
# mode, checkpointing under /home/jovyan/checkpoint.
kafka_writer = (
    resultdf.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("checkpointLocation", "/home/jovyan/checkpoint")
    .option("topic", "corr_score")
    .outputMode("complete")
)
query = kafka_writer.start()

try:
    # Block this cell until the streaming query terminates.
    query.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the kernel is the way to end the stream from the notebook.
    query.stop()
    # Stop the spark context
    # NOTE(review): this also ends the session that the cells further down use
    # (resultdf.writeStream / spark.sql) - confirm that is intended.
    spark.stop()
    print("Stoped the streaming query and the spark context")

Stoped the streaming query and the spark context


In [277]:
# Write the same key/value result to an in-memory table named "avg_score_window"
# so it can be inspected with Spark SQL from the notebook.
query2 = (
    resultdf.writeStream
    .queryName("avg_score_window")
    .format("memory")
    .outputMode("complete")
    .start()
)

try:
    # Print the current contents of the in-memory table every 10 seconds,
    # at most 100 times.
    for _ in range(100):
        spark.sql("SELECT * FROM avg_score_window").show()
        sleep(10)
except KeyboardInterrupt:
    # Stop the query started in this cell. `query` is the Kafka-sink query that
    # belongs to a different cell and must not be the one stopped here.
    query2.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

+---+-----+
|key|value|
+---+-----+
+---+-----+

+--------------+------------------+
|           key|             value|
+--------------+------------------+
|All Data Score|0.7569652509225021|
+--------------+------------------+

+--------------+------------------+
|           key|             value|
+--------------+------------------+
|All Data Score|0.7569652509225021|
+--------------+------------------+

+--------------+------------------+
|           key|             value|
+--------------+------------------+
|All Data Score|0.7569652509225021|
+--------------+------------------+

+--------------+------------------+
|           key|             value|
+--------------+------------------+
|All Data Score|0.7569652509225021|
+--------------+------------------+

+--------------+------------------+
|           key|             value|
+--------------+------------------+
|All Data Score|0.7569652509225021|
+--------------+------------------+

Stoped the streaming query and the spark conte

In [282]:
# Stop the Spark session; cells that use `spark` cannot run after this one.
spark.stop()

In [260]:
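# The following operations are not used in the streaming pipeline: aggregate functions have only
# limited support on streaming data, and running them there produced an error. However, the
# output of an earlier execution of this cell is kept below for reference.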
from pyspark.sql.functions import corr

# Sort the posts by integer score in both directions.
ordered_desc_df = withEventTimedf.orderBy(col("score_int").desc())
ordered_asc_df = withEventTimedf.orderBy(col("score_int").asc())

# Ten highest-scoring rows, and ten lowest-scoring rows among those with a
# non-null score.
highest_scores = ordered_desc_df.limit(10)
lowest_scores = ordered_asc_df.where("score_int is not null").limit(10)

# The same comments-vs-score correlation expression is applied to each group.
score_comment_corr = corr("num_comments_int", "score_int").alias("Score_Comment_Corr")
corr_score1 = highest_scores.select(score_comment_corr).withColumn("Group", lit("High Score"))
corr_score2 = lowest_scores.select(score_comment_corr).withColumn("Group", lit("Low Score"))

# Stack both groups and add a string copy of the coefficient as "value".
resultsdf = corr_score1.union(corr_score2).withColumn(
    "value", col("Score_Comment_Corr").cast("string")
)

resultsdf.show()

+-------------------+----------+-------------------+
| Score_Comment_Corr|     Group|              value|
+-------------------+----------+-------------------+
|0.44981273791379345|High Score|0.44981273791379345|
|-0.7249747545295777| Low Score|-0.7249747545295777|
+-------------------+----------+-------------------+

