In [1]:
# Notebook bootstrap: set the PySpark submit arguments, run findspark, import
# the PySpark helpers used below, and obtain the SparkSession shared by every cell.
import findspark
import json
import os
# Extra packages requested on the PySpark command line: the Spark SQL Kafka
# source, the DataStax Cassandra connector, and jffi.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.5.0,com.github.jnr:jffi:1.2.19 pyspark-shell'
# NOTE(review): findspark.init() is called before the pyspark imports below,
# presumably so that they resolve -- keep this ordering.
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
# NOTE(review): the `struct` name imported here is rebound to the RSVP schema
# in a later cell; the notebook calls F.struct(...) for the function instead.
from pyspark.sql.functions import col, struct, lit
import datetime
import calendar
from collections import Counter
# One session, created (or reused if it already exists) for the whole notebook.
spark = SparkSession.builder.getOrCreate()
from pyspark.sql import Window

In [2]:
# Streaming source: subscribe to the `raw-meetups` Kafka topic on the local
# broker, starting from the earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "raw-meetups")
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
# Schema applied by from_json() to the Kafka `value` of each RSVP message.
# Built with StructType.add(); every field keeps the default nullability.
# NOTE(review): binding the schema to `struct` hides the `struct` function
# imported from pyspark.sql.functions at the top of the notebook.
struct = (
    T.StructType()
    .add(
        'venue',
        T.StructType()
        .add("venue_name", T.StringType())
        .add("lon", T.FloatType())
        .add("lat", T.FloatType())
        .add("venue_id", T.IntegerType()),
    )
    .add("visibility", T.StringType())
    .add("response", T.StringType())
    .add("guests", T.IntegerType())
    .add(
        'member',
        T.StructType()
        .add("member_id", T.IntegerType())
        .add("photo", T.StringType())
        .add("member_name", T.StringType()),
    )
    .add("rsvp_id", T.IntegerType())
    .add("mtime", T.LongType())
    .add(
        'event',
        T.StructType()
        .add("event_name", T.StringType())
        .add("event_id", T.StringType())
        .add("time", T.LongType())
        .add("event_url", T.StringType()),
    )
    .add(
        'group',
        T.StructType()
        .add(
            "group_topics",
            T.ArrayType(
                T.StructType()
                .add("urlkey", T.StringType())
                .add("topic_name", T.StringType())
            ),
        )
        .add("group_city", T.StringType())
        .add("group_country", T.StringType())
        .add("group_id", T.IntegerType())
        .add("group_name", T.StringType())
        .add("group_lon", T.FloatType())
        .add("group_urlname", T.StringType())
        .add("group_state", T.StringType())
        .add("group_lat", T.FloatType()),
    )
)

In [4]:
# Static (batch) lookup tables loaded from local JSON files; later cells join
# them against the stream to turn state / country codes into names.
states_names = spark.read.format("json").load("data/USstate.json")
countries_names = spark.read.format("json").load("data/Countries.json")

In [5]:
# Parse the Kafka payload with the RSVP schema, keep the Kafka `timestamp`
# plus the event/group fields used by the tasks below, and join the static
# country table on the group's country code (default join type, i.e. inner).
# `topic_name` is the array of topic names taken from `group_topics`.
json_parsed_df = (
    df.select(
        F.col('timestamp'),
        F.from_json(F.col("value").cast("string"), struct).alias("json_parsed"),
    )
    .select(
        'timestamp',
        'json_parsed.event.event_id',
        'json_parsed.event.event_name',
        'json_parsed.event.time',
        'json_parsed.group.group_id',
        'json_parsed.group.group_country',
        'json_parsed.group.group_state',
        'json_parsed.group.group_city',
        'json_parsed.group.group_topics.topic_name',
        'json_parsed.group.group_name',
    )
    .join(
        countries_names,
        F.col("group_country") == countries_names["country_code"],
    )
)

In [6]:
# TASK A_1
# Count rows per country in one-minute windows (1 minute watermark on the
# Kafka timestamp) and publish each (window, {country: count}) record as JSON
# to the `tt8-meetups` topic.
# NOTE(review): the checkpoint path has no leading "/", so it is relative to
# the working directory -- confirm that is intended.
(
    json_parsed_df
    .withWatermark("timestamp", "1 minute")
    .groupBy(
        F.window("timestamp", "1 minute", "1 minute"),
        'country_name',
    )
    .agg(F.count('country_name').alias('count'))
    .select(
        F.struct(
            F.col('window.start').alias("datetime_start"),
            F.col('window.end').alias("datetime_end"),
            F.create_map("country_name", "count").alias("map_item"),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "tt8-meetups")
    .option("checkpointLocation", "home/mmatsi/tmp/US-meetups_topick_ckp_aaaaaaa")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f1a296cc4e0>

In [7]:
def sum_maps(r):
    """Merge a list of ``{key: number}`` maps into per-key totals.

    Parameters
    ----------
    r : iterable of dict, or None
        Maps to merge. ``None`` entries and empty maps are skipped, every key
        of a multi-key map is counted, and ``None`` values are ignored.

    Returns
    -------
    list of dict
        One single-entry ``{key: total}`` dict per distinct key, in the order
        the keys were first seen.
    """
    totals = dict()
    for item in r or ():
        if not item:
            # None or {}: nothing to add (previously raised on next(iter(...))).
            continue
        for key, value in item.items():
            if value is None:
                # A null count cannot be summed; skip it instead of failing.
                continue
            totals[key] = totals.get(key, 0) + value

    return [{key: total} for key, total in totals.items()]

# Spark UDF wrapper around sum_maps; declared result type is
# array<map<string,int>>.
sum_maps_udf = F.udf(
    sum_maps,
    returnType=T.ArrayType(T.MapType(T.StringType(), T.IntegerType())),
)

In [15]:
# TASK A_1_res
# Schema of the JSON records read back from the `tt8-meetups` topic (the
# record shape written by TASK A_1).
a1_struct = T.StructType([
    T.StructField("datetime_start", T.TimestampType()),
    T.StructField("datetime_end", T.TimestampType()),
    T.StructField("map_item", T.MapType(T.StringType(), T.IntegerType())),
])

# Parse every Kafka value with a1_struct and flatten it to top-level columns.
a1_json_parsed_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "tt8-meetups")
    .option("startingOffsets", "earliest")
    .load()
    .select(
        F.from_json(col("value").cast("string"), a1_struct).alias("json_parsed")
    )
    .select("json_parsed.*")
)

# Collect the per-minute maps over 6-minute windows that slide every minute,
# sum them per key with sum_maps_udf, and publish the result as JSON to the
# `meetups-by-country1` topic.
# The output struct carries exactly one `time_end` field ("H:M" text); the
# earlier version also aliased the raw timestamp to `time_end`, which put a
# duplicate key into every JSON record.
# NOTE(review): the checkpoint path is relative (no leading "/") -- confirm.
(
    a1_json_parsed_df
    .withWatermark("datetime_end", "1 minute")
    .groupBy(F.window("datetime_end", "6 minute", "1 minute"))
    .agg(
        F.first("window.start").alias("timestamp_start"),
        F.first("window.end").alias("timestamp_end"),
        F.collect_list("map_item").alias("statistics"),
    )
    .select(
        F.struct(
            F.concat(F.hour('timestamp_start'), lit(":"), F.minute('timestamp_start')).alias("time_start"),
            F.concat(F.hour('timestamp_end'), lit(":"), F.minute('timestamp_end')).alias("time_end"),
            sum_maps_udf(col('statistics')).alias("statistics"),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "meetups-by-country1")
    .option("checkpointLocation", "home/mmatsi/tmp/US-meetups_topick_ckp_bbb")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f1a297315f8>

In [6]:
# TASK A_2
# For rows whose group_country is 'us': explode the topic array, collect the
# distinct topics per state in one-minute windows, join the static state table
# on the state code, and publish (window, {state_name: topics}) as JSON to the
# `topics-by-state_preparation1` topic.
# NOTE(review): the checkpoint path is relative (no leading "/") -- confirm.
(
    json_parsed_df
    .filter(F.col('group_country') == 'us')
    .withColumn('topic_exploded', F.explode('topic_name'))
    .withWatermark("timestamp", "1 minute")
    .groupBy(
        F.window("timestamp", "1 minute", "1 minute"),
        'group_state',
    )
    .agg(F.collect_set('topic_exploded').alias('topic_list'))
    .join(states_names, F.col("group_state") == states_names["code"])
    .select(
        F.struct(
            F.col('window.start').alias("datetime_start"),
            F.col('window.end').alias("datetime_end"),
            F.create_map("state_name", "topic_list").alias("map_topics"),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topics-by-state_preparation1")
    .option("checkpointLocation", "home/mmatsi/tmp/topics-by-stlate_preparation")
    .start()
)


<pyspark.sql.streaming.StreamingQuery at 0x7f3a33c952b0>

In [7]:
def concat_maps(r):
    """Merge a list of ``{key: [values]}`` maps into per-key unique value lists.

    Parameters
    ----------
    r : iterable of dict, or None
        Maps to merge. ``None`` entries and empty maps are skipped, every key
        of a multi-key map is merged, and a ``None`` value list is treated as
        empty. The input lists are never modified.

    Returns
    -------
    list of dict
        One single-entry ``{key: unique_values}`` dict per distinct key. Keys
        and values appear in first-seen order (relies on insertion-ordered
        dicts), so the result is deterministic for a given input.
    """
    merged = dict()
    for item in r or ():
        if not item:
            # None or {}: nothing to merge (previously raised on next(iter(...))).
            continue
        for key, values in item.items():
            # A dict is used as an ordered set; building a fresh container
            # also avoids extending the caller's list in place.
            seen = merged.setdefault(key, dict())
            for value in values or ():
                seen[value] = None

    return [{key: list(seen)} for key, seen in merged.items()]

# Spark UDF wrapper around concat_maps; declared result type is
# array<map<string,array<string>>>.
concat_maps_udf = F.udf(
    concat_maps,
    returnType=T.ArrayType(
        T.MapType(T.StringType(), T.ArrayType(T.StringType()))
    ),
)

In [8]:
# TASK A_2_res
# Schema of the JSON records read back from `topics-by-state_preparation1`
# (the record shape written by TASK A_2).
a2_struct = T.StructType([
    T.StructField("datetime_start", T.TimestampType()),
    T.StructField("datetime_end", T.TimestampType()),
    T.StructField("map_topics", T.MapType(
        T.StringType(),
        T.ArrayType(T.StringType())
    ))
])

# Parse every Kafka value with a2_struct and flatten it to top-level columns.
a2_json_parsed_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topics-by-state_preparation1")
    .option("startingOffsets", "earliest")
    .load()
    .select(
        F.from_json(col("value").cast("string"), a2_struct).alias("json_parsed")
    )
    .select("json_parsed.*")
)

# Collect the per-minute topic maps over 3-minute windows that slide every
# minute, merge them per key with concat_maps_udf, and publish the result as
# JSON to the `topics-by-state2` topic.
# The chain is wrapped in parentheses: the previous backslash-continued form
# had a comment line in the middle of the chain, which ends the statement and
# makes the following `.select(...)` a syntax error.
# For debugging, the Kafka sink can be swapped for
# `.writeStream.format("console").start()` after the struct select.
# NOTE(review): the checkpoint path is relative (no leading "/") -- confirm.
(
    a2_json_parsed_df
    .withWatermark("datetime_end", "1 minute")
    .groupBy(F.window("datetime_end", "3 minute", "1 minute"))
    .agg(
        F.first("window.start").alias("timestamp_start"),
        F.first("window.end").alias("timestamp_end"),
        F.collect_list("map_topics").alias("statistics"),
    )
    .select(
        F.struct(
            F.concat(F.hour('timestamp_start'), lit(":"), F.minute('timestamp_start')).alias("time_start"),
            F.concat(F.hour('timestamp_end'), lit(":"), F.minute('timestamp_end')).alias("time_end"),
            concat_maps_udf(col('statistics')).alias("statistics"),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topics-by-state2")
    .option("checkpointLocation", "home/mmatsi/tmp/US-meetupsl_topkklick_cfkp_bbsdfb")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f3a33cb34a8>

In [7]:
# TASK A_3
# Explode the topic array and count occurrences per (country, topic) in
# one-minute windows; publish (window end, country, topic, count) as JSON to
# the `topics-by-country_prefetching` topic.
# NOTE(review): the checkpoint path is relative (no leading "/") -- confirm.
(
    json_parsed_df
    .withColumn('topic_name_exp', F.explode('topic_name'))
    .withWatermark("timestamp", "1 minute")
    .groupBy(
        F.window("timestamp", "1 minute", "1 minute"),
        'country_name',
        'topic_name_exp',
    )
    .agg(F.count('topic_name_exp').alias('topic_count'))
    .select(
        F.struct(
            F.col('window.end').alias("datetime_end"),
            F.col('country_name'),
            F.col('topic_name_exp'),
            F.col('topic_count'),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topics-by-country_prefetching")
    .option("checkpointLocation", "home/mmatsi/tmp/topics-by-stlaste_preparation")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f4293d37940>

In [9]:
# TASK A_3_proc
# Schema of the JSON records read back from `topics-by-country_prefetching`
# (the record shape written by TASK A_3).
a3_struct = (
    T.StructType()
    .add("datetime_end", T.TimestampType())
    .add("country_name", T.StringType())
    .add("topic_name_exp", T.StringType())
    .add("topic_count", T.IntegerType())
)

# Parse every Kafka value with a3_struct and flatten it to top-level columns.
a3_json_parsed_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topics-by-country_prefetching")
    .option("startingOffsets", "earliest")
    .load()
    .select(
        F.from_json(F.col("value").cast("string"), a3_struct).alias("json_parsed")
    )
    .select("json_parsed.*")
)

# Sum the per-minute counts per (country, topic) over 6-minute windows that
# slide every minute and publish them as JSON to `topics-by-country_processed`.
# The field names "timetamp_start" / "timetamp_end" are read back under the
# same spelling by the postprocessing cell, so they are kept as they are.
# NOTE(review): the checkpoint path is relative (no leading "/") -- confirm.
(
    a3_json_parsed_df
    .withWatermark("datetime_end", "1 minute")
    .groupBy(
        F.window("datetime_end", "6 minute", "1 minute"),
        'country_name',
        'topic_name_exp',
    )
    .agg(F.sum('topic_count').alias("topic_sum"))
    .select(
        F.struct(
            F.col("window.start").alias("timetamp_start"),
            F.col("window.end").alias("timetamp_end"),
            F.col("country_name"),
            F.col("topic_name_exp"),
            F.col("topic_sum"),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topics-by-country_processed")
    .option("checkpointLocation", "home/mmatsi/tmp/topics-by-stlastke_preparation")
    .start()
)


<pyspark.sql.streaming.StreamingQuery at 0x7f4293c9c3c8>

In [12]:
# TASK A_3_postprocessing
# Schema of the JSON records read back from `topics-by-country_processed`
# (the record shape written by TASK A_3_proc).
a3_struct_res = (
    T.StructType()
    .add("timetamp_start", T.TimestampType())
    .add("timetamp_end", T.TimestampType())
    .add("country_name", T.StringType())
    .add("topic_name_exp", T.StringType())
    .add("topic_sum", T.IntegerType())
)

# Parse every Kafka value with a3_struct_res and flatten it to top-level columns.
a3_res_json_parsed_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topics-by-country_processed")
    .option("startingOffsets", "earliest")
    .load()
    .select(
        F.from_json(F.col("value").cast("string"), a3_struct_res).alias("json_parsed")
    )
    .select("json_parsed.*")
)

# For each (window start, window end, country) keep the maximum
# (topic_sum, topic_name_exp) struct -- topic_sum is the first struct field,
# so it is compared first -- and publish it as JSON to
# `topics-by-country_postprocess2`.
# NOTE(review): the checkpoint path is relative (no leading "/") -- confirm.
(
    a3_res_json_parsed_df
    .withColumn("topic_map", F.struct('topic_sum', 'topic_name_exp'))
    .withWatermark("timetamp_start", "1 minute")
    .groupBy("timetamp_start", "timetamp_end", "country_name")
    .agg(F.max("topic_map").alias("max_map"))
    .select(
        F.struct(
            F.col('timetamp_start'),
            F.col('timetamp_end'),
            F.col('country_name'),
            F.col('max_map.topic_name_exp').alias("topic_name_exp"),
            F.col('max_map.topic_sum').alias("topic_sum"),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topics-by-country_postprocess2")
    .option("checkpointLocation", "home/mmatsi/tmp/topics-by-stllkastke_preparation")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f4293f84cf8>

In [21]:
# Final TASK A_3 stage: read `topics-by-country_postprocess2` (parsed with the
# a3_struct_res schema defined in the postprocessing cell).
# NOTE(review): this rebinds a3_res_json_parsed_df, which the postprocessing
# cell bound to a different source topic.
a3_res_json_parsed_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topics-by-country_postprocess2")
    .option("startingOffsets", "earliest")
    .load()
    .select(
        F.from_json(F.col("value").cast("string"), a3_struct_res).alias("json_parsed")
    )
    .select("json_parsed.*")
)

# Per (window start, window end): collect one {country_name: topic_sum} map
# per row and publish the list with "H:M" start/end labels as JSON to the
# `topics-by-country` topic.
# NOTE(review): topic_name_exp is not part of the emitted map -- confirm the
# topic name is really meant to be dropped here.
# NOTE(review): the checkpoint path is relative (no leading "/") -- confirm.
(
    a3_res_json_parsed_df
    .withWatermark("timetamp_start", "1 minute")
    .groupBy("timetamp_start", "timetamp_end")
    .agg(
        F.collect_list(F.create_map("country_name", "topic_sum")).alias("statistics")
    )
    .select(
        F.struct(
            F.concat(F.hour('timetamp_start'), lit(":"), F.minute('timetamp_start')).alias("time_start"),
            F.concat(F.hour('timetamp_end'), lit(":"), F.minute('timetamp_end')).alias("time_end"),
            F.col('statistics'),
        ).alias("res")
    )
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topics-by-country")
    .option("checkpointLocation", "home/mmatsi/tmp/topics-bykk-stlkastke_preparation")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f4293ca30f0>

In [9]:
# Connection options passed to every Cassandra sink in the TASK_B cells.
# Each value can be overridden through an environment variable
# (CASSANDRA_HOST, CASSANDRA_USERNAME, CASSANDRA_PASSWORD); the fallbacks are
# the values that used to be hard-coded here, so an unconfigured run behaves
# as before.
# NOTE(review): credentials should not live in the notebook -- once the
# environment variables are set where this runs, drop the fallbacks.
hosts = {
    "spark.cassandra.connection.host": os.environ.get("CASSANDRA_HOST", '34.240.57.255'),
    "spark.cassandra.auth.username": os.environ.get("CASSANDRA_USERNAME", 'cassandra'),
    "spark.cassandra.auth.password": os.environ.get("CASSANDRA_PASSWORD", 'cassandra'),
}

In [10]:
# TASK_B_1
# Stream the country_name column into the Cassandra table
# meetup_net_project.countries_set, using the connection options in `hosts`.
(
    json_parsed_df
    .select('country_name')
    .writeStream
    .format("org.apache.spark.sql.cassandra")
    .option("checkpointLocation", "kkk1")
    .options(**hosts)
    .option("keyspace", "meetup_net_project")
    .option("table", "countries_set")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7fb557ce5278>

In [11]:
# TASK_B_2
# Stream (country_name, group_city) pairs into the Cassandra table
# meetup_net_project.cities_by_country, using the connection options in `hosts`.
# The chain is parenthesised; the previous form ended with a dangling "\"
# after .start(), which continued the statement onto nothing.
(
    json_parsed_df
    .select(
        col('country_name'),
        col('group_city'),
    )
    .writeStream
    .format("org.apache.spark.sql.cassandra")
    .option("checkpointLocation", "kkk3")
    .options(**hosts)
    .option("keyspace", "meetup_net_project")
    .option("table", "cities_by_country")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7fb577d3a4a8>

In [13]:
# TASK_B_3
# Stream event details into the Cassandra table meetup_net_project.event_by_id,
# using the connection options in `hosts`.
# `time` is divided by 1000 before from_unixtime -- presumably epoch
# milliseconds converted to seconds; confirm against the producer.
(
    json_parsed_df
    .select(
        'event_id',
        'event_name',
        F.from_unixtime(F.col('time') / 1000).alias('event_time'),
        F.col('topic_name').alias("topics"),
        F.col('group_name'),
        F.col('country_name'),
        F.col('group_city'),
    )
    .writeStream
    .format("org.apache.spark.sql.cassandra")
    .option("checkpointLocation", "kkk2")
    .options(**hosts)
    .option("keyspace", "meetup_net_project")
    .option("table", "event_by_id")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7fb557cee940>

In [17]:
# TASK_B_4
# Stream (group_city, group_name, group_id) into the Cassandra table
# meetup_net_project.groups_by_city, using the connection options in `hosts`.
(
    json_parsed_df
    .select('group_city', 'group_name', 'group_id')
    .writeStream
    .format("org.apache.spark.sql.cassandra")
    .option("checkpointLocation", "kkk5")
    .options(**hosts)
    .option("keyspace", "meetup_net_project")
    .option("table", "groups_by_city")
    .start()
)


<pyspark.sql.streaming.StreamingQuery at 0x7fb557ce5fd0>

In [19]:
# TASK_B_5
# Stream (group_id, event_id) pairs into the Cassandra table
# meetup_net_project.events_by_group, using the connection options in `hosts`.
(
    json_parsed_df
    .select('group_id', 'event_id')
    .writeStream
    .format("org.apache.spark.sql.cassandra")
    .option("checkpointLocation", "kkk6")
    .options(**hosts)
    .option("keyspace", "meetup_net_project")
    .option("table", "events_by_group")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7fb577d3af98>