In [None]:
import os
# Ask spark-submit to fetch the Kafka and Avro connectors (Scala 2.12 builds, version 3.3.1).
# Presumably this has to run before the SparkSession below launches the JVM.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    ','.join([
        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1',
        'org.apache.spark:spark-avro_2.12:3.3.1',
    ]),
    'pyspark-shell',
])


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

# The single SparkSession used by every cell below (getOrCreate reuses an
# already-running session in this kernel instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("Spark-Notebook")
    .getOrCreate()
)

In [None]:
def parse_ride_from_kafka_message(df_raw, schema, sep=', '):
    """Parse the delimited ``value`` of a Kafka streaming DataFrame into typed columns.

    Each message value is decoded to a string and split on ``sep``; the i-th
    piece is cast to the data type of the i-th field of ``schema`` and named
    after it. Missing trailing pieces become null.

    :param df_raw: streaming DataFrame from the Kafka source (has a ``value`` column).
    :param schema: ``StructType`` listing the message fields in order.
    :param sep: separator between fields inside ``value``. It is passed to
        ``pyspark.sql.functions.split``, so it is a (Java) regular expression.
        Defaults to ``', '``, the format used throughout this notebook.
    :return: streaming DataFrame whose columns are exactly the fields of ``schema``.
    :raises AssertionError: if ``df_raw`` is not a streaming DataFrame.
    """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    # Kafka delivers ``value`` as binary; decode it before splitting.
    fields = F.split(F.col('value').cast('string'), sep)

    # One projection for all fields instead of a withColumn per field: calling
    # withColumn in a loop stacks one projection per call onto the plan.
    return df_raw.select([
        fields.getItem(idx).cast(field.dataType).alias(field.name)
        for idx, field in enumerate(schema)
    ])

# %%
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    """Start a streaming query that prints each micro-batch of ``df`` to stdout.

    :param df: streaming DataFrame to display.
    :param output_mode: Structured Streaming output mode. The default ``'complete'``
        is only accepted for aggregated streams; pass ``'append'`` otherwise.
    :param processing_time: trigger interval between micro-batches.
    :return: the started ``pyspark.sql.streaming.StreamingQuery``.
    """
    writer = df.writeStream.outputMode(output_mode)
    writer = writer.trigger(processingTime=processing_time)
    # Show full cell contents rather than truncated strings.
    writer = writer.format("console").option("truncate", False)
    return writer.start()

# %%
def sink_memory(df, query_name, query_template):
    """Stream ``df`` into an in-memory table and run a SQL query over that table.

    :param df: streaming DataFrame to capture.
    :param query_name: streaming query name; also substituted for ``{table_name}``
        in ``query_template``.
    :param query_template: SQL text containing a ``{table_name}`` placeholder.
    :return: ``(streaming_query, result_df)`` where ``result_df`` comes from running
        the formatted SQL through the module-level ``spark`` session.
    """
    streaming_query = (
        df.writeStream
        .queryName(query_name)
        .format('memory')
        .start()
    )
    sql_text = query_template.format(table_name=query_name)
    return streaming_query, spark.sql(sql_text)

In [None]:
# Streaming source over the green_rides_csv topic, replayed from the earliest offset.
# No checkpointLocation here: checkpointing is configured on the writing query
# (writeStream), and the Kafka source does not use that option.
df_kafka_green_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092")
    .option("subscribe", "green_rides_csv")
    .option("startingOffsets", "earliest")
    .load()
)

# %%
df_kafka_green_raw.printSchema()

# %%
# Field layout of a green_rides_csv message value; parse_ride_from_kafka_message
# maps the split pieces onto these fields by position, so order matters.
GREEN_RIDES_SCHEMA = T.StructType([
    T.StructField(column_name, column_type)
    for column_name, column_type in (
        ("vendor_id", T.IntegerType()),
        ("pickup_datetime", T.TimestampType()),
        ("dropoff_datetime", T.TimestampType()),
        ("store_and_fwd_flag", T.StringType()),
        ("RatecodeID", T.IntegerType()),
        ("PULocationID", T.IntegerType()),
        ("DOLocationID", T.IntegerType()),
        ("passenger_count", T.IntegerType()),
    )
])

# %%
# Decode the raw green messages into the typed columns of GREEN_RIDES_SCHEMA.
df_green_rides = parse_ride_from_kafka_message(
    schema=GREEN_RIDES_SCHEMA,
    df_raw=df_kafka_green_raw,
)
df_green_rides.printSchema()

# %%
# query_name = 'green_PUlocationID_popularity'
# query_template = 'select PUlocationID,count(*) as count_rides from {table_name} group by PUlocationID order by count_rides desc'
# write_query, df_green_PUlocationID_popularity = sink_memory(df=df_green_rides, query_name=query_name, query_template=query_template)
# print(type(write_query)) #

In [None]:
# Streaming source over the fhv_rides_csv topic, replayed from the earliest offset.
# No checkpointLocation here: checkpointing is configured on the writing query
# (writeStream), and the Kafka source does not use that option.
df_kafka_fhv_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092")
    .option("subscribe", "fhv_rides_csv")
    .option("startingOffsets", "earliest")
    .load()
)

# %%
df_kafka_fhv_raw.printSchema()

# %%
# Field layout of an fhv_rides_csv message value; fields are matched by position
# when parsed, so order matters.
FHV_RIDES_SCHEMA = T.StructType([
    T.StructField(column_name, column_type)
    for column_name, column_type in (
        ("dispatching_base_num", T.StringType()),
        ("pickup_datetime", T.TimestampType()),
        ("dropoff_datetime", T.TimestampType()),
        ("PUlocationID", T.IntegerType()),
        ("DOlocationID", T.IntegerType()),
        ("SR_Flag", T.StringType()),
        ("Affiliated_base_number", T.StringType()),
    )
])

# %%
# Decode the raw FHV messages into the typed columns of FHV_RIDES_SCHEMA.
df_fhv_rides = parse_ride_from_kafka_message(
    schema=FHV_RIDES_SCHEMA,
    df_raw=df_kafka_fhv_raw,
)
df_fhv_rides.printSchema()

# %%
# query_name = 'fhv_PUlocationID_popularity'
# query_template = 'select PUlocationID,count(*) as count_rides from {table_name} group by PUlocationID order by count_rides desc'
# write_query, df_fhv_PUlocationID_popularity = sink_memory(df=df_fhv_rides, query_name=query_name, query_template=query_template)
# print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
# write_query.status

In [None]:
def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None, sep=', '):
    """Build the string ``key``/``value`` columns that the Kafka sink writes.

    :param df: input DataFrame (streaming or batch).
    :param value_columns: column names packed, in order, into ``value``.
    :param key_column: optional column whose value, cast to string, becomes the
        message ``key``. When falsy, only ``value`` is returned (the Kafka sink
        treats ``key`` as optional).
    :param sep: separator placed between packed fields; the default ``', '``
        matches the split used by parse_ride_from_kafka_message.
    :return: DataFrame with columns ``key`` and ``value`` (or only ``value``).
    """
    # Cast explicitly and turn nulls into empty strings. concat_ws skips null
    # inputs, which would shift every later field one slot left and break the
    # positional parsing done by the consumer.
    packed_fields = [
        F.coalesce(F.col(name).cast('string'), F.lit(''))
        for name in value_columns
    ]
    value = F.concat_ws(sep, *packed_fields).alias('value')
    if key_column:
        return df.select(F.col(key_column).cast('string').alias('key'), value)
    return df.select(value)
    
def sink_kafka(df, topic, output_mode='append', checkpoint_location='checkpoint',
               bootstrap_servers='localhost:9092,broker:29092'):
    """Start a streaming query that publishes ``df`` to a Kafka topic.

    ``df`` must expose the columns the Kafka sink reads: ``value`` and,
    optionally, ``key`` (see prepare_dataframe_to_kafka_sink).

    :param df: streaming DataFrame to publish.
    :param topic: destination Kafka topic.
    :param output_mode: Structured Streaming output mode (``'append'`` by default).
    :param checkpoint_location: checkpoint directory for this query. Each query
        that runs at the same time needs its own directory, so pass a distinct
        path when starting more than one Kafka sink. Defaults to ``'checkpoint'``
        to keep the previous behavior.
    :param bootstrap_servers: comma-separated Kafka broker list.
    :return: the started ``pyspark.sql.streaming.StreamingQuery``.
    """
    return (
        df.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .outputMode(output_mode)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_location)
        .start()
    )

In [None]:
# Kafka topic that both the green and the FHV ride streams are published to.
TOPIC_RIDES_ALL = 'rides_all'

In [None]:
# Pack pickup and dropoff zones into the message value, keyed by pickup zone.
# Names are spelled exactly as in GREEN_RIDES_SCHEMA so this also works when
# spark.sql.caseSensitive is enabled (the old 'PUlocationID' only resolved
# through Spark's case-insensitive default).
df_green_messages = prepare_dataframe_to_kafka_sink(
    df=df_green_rides,
    value_columns=['PULocationID', 'DOLocationID'],
    key_column='PULocationID',
)

In [None]:
# Start publishing the green ride messages to TOPIC_RIDES_ALL.
# NOTE(review): sink_kafka checkpoints this query to the 'checkpoint' directory,
# and kafka_sink_fhv_query below is started the same way, so both running queries
# share one checkpoint directory. Spark expects a separate checkpoint per query;
# confirm and give each query its own path.
kafka_sink_green_query = sink_kafka(topic=TOPIC_RIDES_ALL, df=df_green_messages)

In [None]:
# Display the current status of the green Kafka sink query.
kafka_sink_green_query.status

In [None]:
# Pack pickup and dropoff zones into the message value, keyed by pickup zone.
# Names are spelled exactly as in FHV_RIDES_SCHEMA ('PUlocationID', 'DOlocationID')
# so this also works when spark.sql.caseSensitive is enabled; the previous
# 'DOLocationID'/'PULocationID' spelling only resolved case-insensitively.
df_fhv_messages = prepare_dataframe_to_kafka_sink(
    df=df_fhv_rides,
    value_columns=['PUlocationID', 'DOlocationID'],
    key_column='PUlocationID',
)

In [None]:
# Start publishing the FHV ride messages to the same TOPIC_RIDES_ALL topic.
# NOTE(review): sink_kafka checkpoints this query to the 'checkpoint' directory,
# the same directory used by kafka_sink_green_query above. Spark expects a
# separate checkpoint per running query; confirm and give each query its own path.
kafka_sink_fhv_query = sink_kafka(topic=TOPIC_RIDES_ALL, df=df_fhv_messages)

In [None]:
# Display the current status of the FHV Kafka sink query.
kafka_sink_fhv_query.status

In [None]:
# Streaming source over the merged topic that both Kafka sink queries publish to.
# Reuses TOPIC_RIDES_ALL instead of repeating the literal, and sets no
# checkpointLocation (that option belongs to the writing query, not the source).
df_kafka_all_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092")
    .option("subscribe", TOPIC_RIDES_ALL)
    .option("startingOffsets", "earliest")
    .load()
)

# %%
df_kafka_all_raw.printSchema()

# %%
# Layout of a rides_all message value: pickup zone then dropoff zone.
all_RIDES_SCHEMA = T.StructType([
    T.StructField(column_name, T.IntegerType())
    for column_name in ("PUlocationID", "DOlocationID")
])


In [None]:
# %%
# Decode the merged rides_all messages into (PUlocationID, DOlocationID) columns.
df_all_rides = parse_ride_from_kafka_message(
    schema=all_RIDES_SCHEMA,
    df_raw=df_kafka_all_raw,
)
df_all_rides.printSchema()

In [None]:
# %%
# Capture the merged stream in a memory table and rank pickup zones by ride count.
# NOTE(review): Spark documents the memory sink as debug-only, since it keeps the
# whole output in driver memory; this stream is unbounded.
query_name = 'PUlocationID_popularity'
query_template = (
    'select PUlocationID,count(*) as count_rides '
    'from {table_name} '
    'group by PUlocationID order by count_rides desc'
)
write_query, df_PUlocationID_popularity = sink_memory(
    query_template=query_template,
    query_name=query_name,
    df=df_all_rides,
)

In [None]:
# Check the handle returned by sink_memory and show the memory-sink query's status.
print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
write_query.status

In [None]:
# Run the popularity SQL over the rows the memory table holds right now; while
# the query is active, re-running this cell can show different counts.
df_PUlocationID_popularity.show()

In [None]:
# Stop the memory-sink query started by sink_memory.
# NOTE(review): kafka_sink_green_query and kafka_sink_fhv_query are not stopped here.
write_query.stop()