In [1]:
# Infer one schema per platform by reading that platform's sample JSON file.
# The four schemas are consumed further down by from_json() when parsing the
# Kafka record values.
twitter_schema, instagram_schema, facebook_schema, youtube_schema = [
    spark.read.json(sample_path).schema
    for sample_path in (
        "twitter_sample.json",
        "instagram_sample.json",
        "facebook_sample.json",
        "youtube_sample.json",
    )
]

23/04/27 11:47:21 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.


In [None]:
from pyspark.sql.types import MapType,StringType, ArrayType
from pyspark.sql.functions import from_json, from_unixtime, unix_timestamp, floor,window, approx_count_distinct, current_timestamp
from pyspark.sql.functions import col

# Streaming source: subscribe to the Kafka topic "mytopic" on the local broker.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "mytopic")
    .load()
)

# Width of the time bucket that event timestamps are floored to, in seconds.
_BUCKET_SECONDS = 300


def _platform_events(stream, platform, schema, event_seconds, user_field):
    """Project one platform's Kafka records to (social_media, time, username).

    Args:
        stream: DataFrame exposing ``key`` and ``value`` columns.
        platform: Record key to keep; it is also emitted as ``social_media``.
        schema: Schema handed to ``from_json`` to parse the record value.
        event_seconds: Column expression over ``parsed_value`` that yields the
            event time in epoch seconds.
        user_field: Column path (inside ``parsed_value``) emitted as
            ``username``.

    Returns:
        DataFrame with columns ``social_media``, ``time`` (the event time
        floored to a multiple of ``_BUCKET_SECONDS`` and rendered by
        ``from_unixtime``) and ``username``.
    """
    bucket_start = from_unixtime(
        floor(event_seconds / _BUCKET_SECONDS) * _BUCKET_SECONDS
    )
    return (
        stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        # Filter on the string key projected just above rather than on the
        # parent frame's original (un-cast) key column.
        .filter(col("key") == platform)
        # value is already a string here, so no second cast is needed.
        .select(col("key"), from_json(col("value"), schema).alias("parsed_value"))
        .select(
            col("key").alias("social_media"),
            bucket_start.alias("time"),
            col(user_field).alias("username"),
        )
    )


# NOTE(review): in the facebook and youtube patterns below, '+0000' and 'Z'
# are quoted literals, so the offset is matched as text rather than parsed as
# a zone. Presumably the session time zone is UTC; otherwise these two sources
# would be shifted relative to twitter/instagram -- TODO confirm.
df_twitter = _platform_events(
    df,
    "twitter",
    twitter_schema,
    unix_timestamp(col("parsed_value.created_at"), "EEE MMM dd HH:mm:ss ZZZZ yyyy"),
    "parsed_value.user_id_str",
)

df_instagram = _platform_events(
    df,
    "instagram",
    instagram_schema,
    # created_time is used directly as epoch seconds for this platform.
    col("parsed_value.created_time"),
    "parsed_value.user.id",
)

df_facebook = _platform_events(
    df,
    "facebook",
    facebook_schema,
    unix_timestamp(col("parsed_value.created_time"), format="yyyy-MM-dd'T'HH:mm:ss'+0000'"),
    "parsed_value.from.id",
)

df_youtube = _platform_events(
    df,
    "youtube",
    youtube_schema,
    unix_timestamp(col("parsed_value.snippet.publishedAt"), format="yyyy-MM-dd'T'HH:mm:ss'Z'"),
    "parsed_value.snippet.channelId",
)

# Merge the four per-platform streams and turn the bucketed time string back
# into a timestamp so it can serve as the event-time column.
df_all = (
    df_twitter
    .union(df_instagram)
    .union(df_facebook)
    .union(df_youtube)
    .selectExpr('social_media', 'CAST(time as timestamp)', 'username')
)

# Group by (5-minute event-time window, platform) under a 15-second watermark.
# Note that df_all is rebound to the grouped object from here on.
df_all = (
    df_all
    .withWatermark("time", "15 seconds")
    .groupBy(window(col('time'), "5 minutes"), col('social_media'))
)

# An approx_count_distinct("username") aggregation (aliased unique_count) was
# sketched here but is currently disabled; only the row count is computed.
df_count = df_all.count()
# df_res = df_count.withColumn("unique_count",col("count"))\
#     .select(col('window.start').alias('time'), 'social_media', 'count', 'unique_count') \
#     .writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .start() \
#     .awaitTermination()

def foreach_batch_function(df, epoch_id) -> None:
    """Append one micro-batch to the ``social_media_dataframe`` table via JDBC.

    Args:
        df: The batch to persist (anything exposing the ``.write`` API used
            below).
        epoch_id: Batch identifier supplied by the caller; not used here.
    """
    jdbc_options = {
        "url": "jdbc:postgresql://localhost:5432/pyspark",
        "driver": "org.postgresql.Driver",
        "dbtable": "social_media_dataframe",
        "user": "bigdata",
    }
    # NOTE(review): no password option is set -- presumably the database
    # accepts this user without one; confirm.
    writer = df.write.format("jdbc").mode("append").options(**jdbc_options)
    writer.save()

# Shape the aggregated rows for the sink: window start as `time`, the platform,
# the row count, and the current timestamp as `created_time`.
# NOTE(review): unique_count is currently just a copy of count, not a
# distinct-user count -- confirm whether that is intended.
df_res = (
    df_count
    .withColumn("unique_count", col("count"))
    .withColumn('created_time', current_timestamp())
    .select(
        col('window.start').alias('time'),
        'social_media',
        'count',
        'unique_count',
        'created_time',
    )
)

# Start the streaming query, handing every micro-batch to
# foreach_batch_function, and block until the query terminates.
# NOTE(review): no checkpointLocation is configured here -- confirm that the
# resulting restart behaviour is acceptable.
streaming_query = df_res.writeStream.foreachBatch(foreach_batch_function).start()
streaming_query.awaitTermination()
# df_res = df_count.withColumn("unique_count",col("count"))\
#     .select(col('window.start').alias('time'), 'social_media', 'count', 'unique_count') \
#     .writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .start() \
#     .awaitTermination()

                                                                                