In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'

In [None]:
! rm -r /opt/workspace/checkpoint/*

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Spark-Notebook") \
    .getOrCreate()

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-43fa8264-32ac-4d80-8222-72732b27bd98;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.common

23/03/13 21:09:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [28]:
def parse_ride_from_kafka_message(df_raw, schema):
    """ take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = df_raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # split attributes to nested array in one Column
    col = F.split(df['value'], ', ')

    # expand col to multiple top-level columns
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    return df.select([field.name for field in schema])

# %%
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    write_query = df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()
    return write_query # pyspark.sql.streaming.StreamingQuery

# %%
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .format('memory') \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results

In [29]:
df_kafka_green_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "green_rides_csv") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()

# %%
df_kafka_green_raw.printSchema()

# %%
GREEN_RIDES_SCHEMA = T.StructType(
    [
     T.StructField("vendor_id", T.IntegerType()),
     T.StructField('pickup_datetime', T.TimestampType()),
     T.StructField('dropoff_datetime', T.TimestampType()),
     T.StructField("store_and_fwd_flag", T.StringType()),
     T.StructField("RatecodeID", T.IntegerType()),
     T.StructField("PULocationID", T.IntegerType()),
     T.StructField("DOLocationID", T.IntegerType()),
     T.StructField("passenger_count", T.IntegerType())
     ])

# %%
df_green_rides = parse_ride_from_kafka_message(df_raw=df_kafka_green_raw, schema=GREEN_RIDES_SCHEMA)
df_green_rides.printSchema()

# %%
# query_name = 'green_PUlocationID_popularity'
# query_template = 'select PUlocationID,count(*) as count_rides from {table_name} group by PUlocationID order by count_rides desc'
# write_query, df_green_PUlocationID_popularity = sink_memory(df=df_green_rides, query_name=query_name, query_template=query_template)
# print(type(write_query)) #

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [30]:
df_kafka_fhv_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "fhv_rides_csv") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()

# %%
df_kafka_fhv_raw.printSchema()

# %%
FHV_RIDES_SCHEMA =T.StructType(
    [
     T.StructField("dispatching_base_num", T.StringType()),
     T.StructField('pickup_datetime', T.TimestampType()),
     T.StructField('dropoff_datetime', T.TimestampType()),
     T.StructField("PUlocationID", T.IntegerType()),
     T.StructField("DOlocationID", T.IntegerType()),
     T.StructField("SR_Flag", T.StringType()),
     T.StructField("Affiliated_base_number", T.StringType()),
     ])

# %%
df_fhv_rides = parse_ride_from_kafka_message(df_raw=df_kafka_fhv_raw, schema=FHV_RIDES_SCHEMA)
df_fhv_rides.printSchema()

# %%
# query_name = 'fhv_PUlocationID_popularity'
# query_template = 'select PUlocationID,count(*) as count_rides from {table_name} group by PUlocationID order by count_rides desc'
# write_query, df_fhv_PUlocationID_popularity = sink_memory(df=df_fhv_rides, query_name=query_name, query_template=query_template)
# print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
# write_query.status

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [37]:
def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):
    columns = df.columns
    df = df.withColumn("value", F.concat_ws(', ',*value_columns))    
    if key_column:
        df = df.withColumnRenamed(key_column,"key")
        df = df.withColumn("key",df.key.cast('string'))
    return df.select(['key', 'value'])
    
def sink_kafka(df,checkpointname ,topic, output_mode='append'):
    write_query = df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
        .outputMode(output_mode) \
        .option("topic", topic) \
        .option("checkpointLocation", f"checkpoint{checkpointname}") \
        .start()
    return write_query

In [38]:
TOPIC_RIDES_ALL = "rides_all"

In [39]:
df_green_messages = prepare_dataframe_to_kafka_sink(df=df_green_rides,
                                                      value_columns=['PUlocationID','DOLocationID'], key_column='PULocationID')

In [40]:
kafka_sink_green_query = sink_kafka(df=df_green_messages,checkpointname='green', topic=TOPIC_RIDES_ALL)

23/03/13 21:19:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/13 21:19:01 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-8a64f53d-3304-43ef-8f90-6ecf53d5e60e-400859102-executor-5, groupId=spark-kafka-source-8a64f53d-3304-43ef-8f90-6ecf53d5e60e-400859102-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/13 21:19:01 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-8a64f53d-3304-43ef-8f90-6ecf53d5e60e-400859102-executor-5, groupId=spark-kafka-source-8a64f53d-3304-43ef-8f90-6ecf53d5e60e-400859102-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected


                                                                                

In [42]:
kafka_sink_green_query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [43]:
df_fhv_messages = prepare_dataframe_to_kafka_sink(df=df_fhv_rides,
                                                      value_columns=['PUlocationID','DOLocationID'], key_column='PULocationID')

In [44]:
kafka_sink_fhv_query = sink_kafka(df=df_fhv_messages,checkpointname='fhv', topic=TOPIC_RIDES_ALL)

23/03/13 21:19:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

In [46]:
kafka_sink_fhv_query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [47]:
df_kafka_all_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "rides_all") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpointrides") \
    .load()

# %%
df_kafka_all_raw.printSchema()

# %%
all_RIDES_SCHEMA =T.StructType(
    [
     T.StructField("PUlocationID", T.IntegerType()),
     T.StructField("DOlocationID", T.IntegerType())
     ])


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [48]:
# %%
df_all_rides = parse_ride_from_kafka_message(df_raw=df_kafka_all_raw, schema=all_RIDES_SCHEMA)
df_all_rides.printSchema()

root
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)



In [49]:
# %%
query_name = 'PUlocationID_popularity'
query_template = 'select PUlocationID,count(*) as count_rides from {table_name} group by PUlocationID order by count_rides desc'
write_query, df_PUlocationID_popularity = sink_memory(df=df_all_rides, query_name=query_name, query_template=query_template)

23/03/13 21:20:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c91a2564-4f25-4871-8676-4ee587592dd5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/13 21:20:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/13 21:20:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-e3b9747f-e48d-4bbd-abf2-341e0d13cc38-1062345481-driver-0-8, groupId=spark-kafka-source-e3b9747f-e48d-4bbd-abf2-341e0d13cc38-1062345481-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
23/03/13 21:20:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-e3b9747f-e48d-4bbd-abf2-341e0d13cc38-1062345481-driv

                                                                                

In [65]:
print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
write_query.status

<class 'pyspark.sql.streaming.StreamingQuery'>


{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [66]:
df_PUlocationID_popularity.show()

+------------+-----------+
|PUlocationID|count_rides|
+------------+-----------+
|        null|        116|
|         265|         54|
|          49|         30|
|         256|         27|
|          85|         24|
|         255|         24|
|         146|         18|
|         129|         18|
|          76|         15|
|          82|         15|
|         264|         15|
|         189|         15|
|          97|         15|
|          25|         15|
|         223|          9|
|           7|          9|
|          80|          9|
|          74|          9|
|          71|          9|
|          66|          9|
+------------+-----------+



In [20]:
write_query.stop()