In [1]:
import findspark
import json
import os
# Must be set before the SparkContext is created (by getOrCreate below) so the
# JVM is launched with the Kafka source/sink package.
# NOTE(review): the artifact pins Scala 2.11 / Spark 2.4.0 — presumably it has
# to match the locally installed Spark; confirm when upgrading.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'
# Makes the local Spark installation's `pyspark` importable; has to run before
# the pyspark imports below.
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
# NOTE(review): `struct` is rebound to the RSVP schema in cell [3]; the later
# cells build structs via F.struct instead.
from pyspark.sql.functions import col, struct
# NOTE(review): Window, datetime, calendar, Counter (and json above) are not
# used in the visible cells.
from pyspark.sql.window import Window
import datetime
import calendar
from collections import Counter
# Session shared by every cell below (reuses an existing one if present).
spark = SparkSession.builder.getOrCreate()

In [2]:
# Streaming source over the Kafka topic "raw-meetups". "earliest" only applies
# when a query starts without an existing checkpoint.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "raw-meetups",
        "startingOffsets": "earliest",
    })
    .load()
)

In [3]:
# Schema of one RSVP message on "raw-meetups"; passed to from_json to parse the
# Kafka `value`. Each StructType.add call appends a nullable StructField.
# NOTE(review): this rebinds `struct`, shadowing pyspark.sql.functions.struct
# imported in cell [1]; the later cells build structs with F.struct.
# NOTE(review): ids are IntegerType (max 2**31 - 1) — confirm the feed's ids fit.
struct = (
    T.StructType()
    .add("venue", T.StructType()
         .add("venue_name", T.StringType())
         .add("lon", T.FloatType())
         .add("lat", T.FloatType())
         .add("venue_id", T.IntegerType()))
    .add("visibility", T.StringType())
    .add("response", T.StringType())
    .add("guests", T.IntegerType())
    .add("member", T.StructType()
         .add("member_id", T.IntegerType())
         .add("photo", T.StringType())
         .add("member_name", T.StringType()))
    .add("rsvp_id", T.IntegerType())
    .add("mtime", T.LongType())
    .add("event", T.StructType()
         .add("event_name", T.StringType())
         .add("event_id", T.StringType())
         .add("time", T.LongType())
         .add("event_url", T.StringType()))
    .add("group", T.StructType()
         .add("group_topics", T.ArrayType(
             T.StructType()
             .add("urlkey", T.StringType())
             .add("topic_name", T.StringType())))
         .add("group_city", T.StringType())
         .add("group_country", T.StringType())
         .add("group_id", T.IntegerType())
         .add("group_name", T.StringType())
         .add("group_lon", T.FloatType())
         .add("group_urlname", T.StringType())
         .add("group_state", T.StringType())
         .add("group_lat", T.FloatType()))
)

In [4]:
# Static lookup table (relative path, schema inferred). Cell [5] joins on its
# `code` column and cells [6]/[8] read its `name` column — presumably state
# code -> full state name; confirm against USstate.json.
states_names = spark.read.format("json").load("USstate.json")

In [5]:
# Parse each Kafka record's value with the RSVP schema, keep only US groups,
# flatten the fields used downstream, and attach the matching row of
# `states_names` (joined on the group's state code).
json_parsed_df = (
    df
    .select(
        "timestamp",
        F.from_json(F.col("value").cast("string"), struct).alias("json_parsed"),
    )
    .where(F.col("json_parsed.group.group_country") == "us")
    .select(
        "timestamp",
        "json_parsed.event.event_name",
        "json_parsed.event.event_id",
        # event.time is in epoch milliseconds; from_unixtime expects seconds
        # and yields a formatted string.
        F.from_unixtime(F.col("json_parsed.event.time") / 1000).alias("time"),
        "json_parsed.group.group_topics.topic_name",
        "json_parsed.group.group_city",
        "json_parsed.group.group_country",
        "json_parsed.group.group_id",
        "json_parsed.group.group_name",
        "json_parsed.group.group_state",
    )
    .join(states_names, F.col("group_state") == states_names["code"])
)

In [6]:
# Publish every US RSVP to the Kafka topic "US-meetups" as a JSON object with a
# nested `event` struct; `group_state` carries the `name` column joined from
# `states_names` rather than the state code.
us_meetups_query = (
    json_parsed_df
    .select(
        F.struct(
            F.struct(
                col('event_name'),
                col('event_id'),
                col('time'),
            ).alias('event'),
            col('group_city'),
            col('group_country'),
            col('group_id'),
            col('group_name'),
            col('name').alias("group_state"),
        ).alias('res')
    )
    # The Kafka sink writes the `value` column as the message payload.
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "US-meetups")
    # Absolute path under the user's home. A path without a leading slash is
    # resolved against the working directory, not the home dir. A new location
    # has no stored offsets, so the query restarts from startingOffsets.
    .option(
        "checkpointLocation",
        os.path.join(os.path.expanduser("~"), "tmp", "US-meetups_topick_ckp_2"),
    )
    .start()
)
# Keep the handle so the query can be inspected or stopped (.status, .stop()).
us_meetups_query

<pyspark.sql.streaming.StreamingQuery at 0x7f0a82261780>

In [7]:
# Bucket US RSVPs into 1-minute windows of Kafka record time and publish, per
# window, the window end's month/day/hour/minute plus the list of group cities
# (duplicates kept) to the Kafka topic "US-cities-every-minute".
us_cities_query = (
    json_parsed_df
    # Late records may be dropped once they fall more than 1 minute behind the
    # max timestamp seen. In the default append output mode a window's row is
    # emitted only after the watermark passes the window end, i.e. once later
    # records have arrived.
    .withWatermark("timestamp", "1 minute")
    # Tumbling window: slide duration equals window duration.
    .groupBy(F.window("timestamp", "1 minute", "1 minute"))
    .agg(
        F.struct(
            F.month('window.end').alias('month'),
            F.dayofmonth('window.end').alias('day_of_the_month'),
            F.hour('window.end').alias('hour'),
            F.minute('window.end').alias("minute"),
            F.collect_list('group_city').alias('cities'),
        ).alias('res')
    )
    # The Kafka sink writes the `value` column as the message payload.
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "US-cities-every-minute")
    # Absolute path under the user's home. A path without a leading slash is
    # resolved against the working directory, not the home dir. A new location
    # has no stored offsets or state, so the query restarts from startingOffsets.
    .option(
        "checkpointLocation",
        os.path.join(os.path.expanduser("~"), "tmp", "US-meetups_topick_ckp2_2"),
    )
    .start()
)
# Keep the handle so the query can be inspected or stopped (.status, .stop()).
us_cities_query

<pyspark.sql.streaming.StreamingQuery at 0x7f0a82224630>

In [8]:
# Group topics that mark a meetup as programming-related. Matching is exact
# and case-sensitive string equality.
INTERESTING_TOPIC_NAMES = [
    'Computer programming',
    'Big Data',
    'Machine Learning',
    'Python',
    'Java',
    'Web Development',
]
interesting_topics = F.array([F.lit(name) for name in INTERESTING_TOPIC_NAMES])

# Publish US RSVPs whose group has at least one interesting topic to the Kafka
# topic "Programming-meetups"; same payload shape as "US-meetups" plus the
# group's topic names.
programming_meetups_query = (
    json_parsed_df
    .select(
        F.struct(
            F.struct(
                col('event_name'),
                col('event_id'),
                col('time'),
            ).alias('event'),
            col("topic_name").alias("group_topics"),
            col('group_city'),
            col('group_country'),
            col('group_id'),
            col('group_name'),
            col('name').alias("group_state"),
        ).alias('res')
    )
    # A missing topic list makes arrays_overlap return null, which the filter drops.
    .filter(F.arrays_overlap('res.group_topics', interesting_topics))
    # The Kafka sink writes the `value` column as the message payload.
    .select(F.to_json('res').alias('value'))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "Programming-meetups")
    # Absolute path under the user's home. A path without a leading slash is
    # resolved against the working directory, not the home dir. A new location
    # has no stored offsets, so the query restarts from startingOffsets.
    .option(
        "checkpointLocation",
        os.path.join(os.path.expanduser("~"), "tmp", "US-meetups_topick_ckp3_2"),
    )
    .start()
)
# Keep the handle so the query can be inspected or stopped (.status, .stop()).
programming_meetups_query

<pyspark.sql.streaming.StreamingQuery at 0x7f0a82208710>