In [1]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from settings import BOOTSTRAP_SERVERS, CONSUME_TOPIC_RIDES_CSV, RIDE_SCHEMA, TOPIC_WINDOWED_VENDOR_ID_COUNT

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'

In [2]:
spark = SparkSession.builder.appName('Spark-Notebook').getOrCreate()

:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.5.0/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/iamraphson/.ivy2/cache
The jars for the packages stored in: /Users/iamraphson/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-07cd9df9-3c9b-48c2-8a53-a245487d4061;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central


In [3]:
df_kafka_raw = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', BOOTSTRAP_SERVERS) \
    .option('subscribe', CONSUME_TOPIC_RIDES_CSV) \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()

In [4]:
df_kafka_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
def parse_ride_from_kafka_message(df_raw, schema):
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = df_raw.selectExpr('CAST(key as STRING)', 'CAST(value as STRING)')

    col = F.split(df['value'],', ')
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))

    return df.select([field.name for field in schema])

In [6]:
rides_df = parse_ride_from_kafka_message(df_kafka_raw, RIDE_SCHEMA)
rides_df.printSchema()

root
 |-- vendor_id: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- total_amount: float (nullable = true)



In [7]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    return df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()

In [8]:
sink_console(rides_df, output_mode='append')

24/03/19 03:01:24 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/wm/00zdb_c90lj1zjg999tvvklh0000gn/T/temporary-eed606cd-0d8b-47a2-b5a9-3cb51b0b75a5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 03:01:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x121b68a90>

24/03/19 03:01:24 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/03/19 03:01:24 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/03/19 03:01:24 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/03/19 03:01:24 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
24/03/19 03:01:24 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.


In [9]:
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .format('memory') \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------------------+---------------------+---------------+-------------+------------+------------+
|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|
+---------+--------------------+---------------------+---------------+-------------+------------+------------+
|1        |2020-07-01 00:25:32 |2020-07-01 00:33:39  |1              |1.5          |2           |9.3         |
|1        |2020-07-01 00:03:19 |2020-07-01 00:25:43  |1              |9.5          |1           |27.8        |
|2        |2020-07-01 00:15:11 |2020-07-01 00:29:24  |1              |5.85         |2           |22.3        |
|2        |2020-07-01 00:30:49 |2020-07-01 00:38:26  |1              |1.9          |1           |14.16       |
|2        |2020-07-01 00:31:26 |2020-07-01 00:38:02  |1              |1.25         |2           |7.8         |
|1        |2020

In [10]:
query_name = 'vendor_id_counts'
query_template = 'select count(distinct(vendor_id)) from {table_name}'
write_query, df_vendor_id_counts = sink_memory(df=rides_df, query_name=query_name, query_template=query_template)

24/03/19 03:01:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/wm/00zdb_c90lj1zjg999tvvklh0000gn/T/temporary-eb1c0a24-d6c1-45c2-b6e8-66e27ad57c41. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 03:01:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/19 03:01:29 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/03/19 03:01:29 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/03/19 03:01:29 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/03/19 03:01:29 WARN AdminClientConfig: The configuration '

In [11]:
print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
write_query.status

<class 'pyspark.sql.streaming.query.StreamingQuery'>


{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [12]:
df_vendor_id_counts.show()

+-------------------------+
|count(DISTINCT vendor_id)|
+-------------------------+
|                        2|
+-------------------------+



In [13]:
def op_groupby(df, column_names):
    df_aggregation = df.groupBy(column_names).count()
    return df_aggregation

def op_windowed_groupby(df, window_duration, slide_duration):
    return df.groupBy(
        F.window(timeColumn=df.tpep_pickup_datetime, windowDuration=window_duration, slideDuration=slide_duration),
        df.vendor_id
    ).count()

In [14]:
df_trip_count_by_vendor_id = op_groupby(rides_df, ['vendor_id'])
df_trip_count_by_pickup_date_vendor_id = op_windowed_groupby(
        rides_df, 
        window_duration="10 minutes",
        slide_duration='5 minutes'
)

In [15]:
sink_console(df_trip_count_by_vendor_id)

24/03/19 03:01:38 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/wm/00zdb_c90lj1zjg999tvvklh0000gn/T/temporary-8ef24528-1652-4482-b3ee-ccf5c18976f5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 03:01:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x121b77a90>

24/03/19 03:01:38 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/03/19 03:01:38 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/03/19 03:01:38 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/03/19 03:01:38 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
24/03/19 03:01:38 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.


In [16]:
sink_console(df_trip_count_by_pickup_date_vendor_id)

24/03/19 03:01:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/wm/00zdb_c90lj1zjg999tvvklh0000gn/T/temporary-2b3b7cf2-b17a-4b27-bf10-dac00b1c2431. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/19 03:01:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x1219d8dd0>

24/03/19 03:01:39 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/03/19 03:01:39 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/03/19 03:01:39 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/03/19 03:01:39 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
24/03/19 03:01:39 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.

In [17]:
def prepare_df_to_kafka_sink(df, value_columns, key_column=None):
    columns = df.columns

    df = df.withColumn("value", F.concat_ws(', ', *value_columns))
    if key_column:
        df = df.withColumnRenamed(key_column, "key")
        df = df.withColumn("key", df.key.cast('string'))
    return df.select(['key', 'value'])

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|vendor_id|count|
+---------+-----+
|1        |8    |
|2        |12   |
+---------+-----+



24/03/19 03:01:45 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 6433 milliseconds


-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+---------+-----+
|window                                    |vendor_id|count|
+------------------------------------------+---------+-----+
|{2020-07-01 00:25:00, 2020-07-01 00:35:00}|1        |4    |
|{2020-07-01 00:30:00, 2020-07-01 00:40:00}|2        |8    |
|{2020-06-30 23:55:00, 2020-07-01 00:05:00}|1        |4    |
|{2020-07-01 00:15:00, 2020-07-01 00:25:00}|2        |4    |
|{2020-07-01 00:25:00, 2020-07-01 00:35:00}|2        |8    |
|{2020-07-01 00:00:00, 2020-07-01 00:10:00}|1        |4    |
|{2020-07-01 00:10:00, 2020-07-01 00:20:00}|2        |4    |
|{2020-07-01 00:20:00, 2020-07-01 00:30:00}|1        |4    |
+------------------------------------------+---------+-----+



In [18]:
df_trip_count_messages = prepare_df_to_kafka_sink(
     df=df_trip_count_by_pickup_date_vendor_id,
     value_columns=['count'], 
     key_column='vendor_id'
)

In [19]:
def sink_kafka(df, topic):
    print('topic', topic, BOOTSTRAP_SERVERS)
    return df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
        .outputMode('complete') \
        .option("topic", topic) \
        .option("checkpointLocation", "checkpoint") \
        .start()

In [22]:
df_trip_count_messages

DataFrame[key: string, value: string]

In [26]:
sink_kafka(df=df_trip_count_messages, topic=TOPIC_WINDOWED_VENDOR_ID_COUNT)

topic vendor_counts_windowed.via.redpanda localhost:9093


24/03/19 03:04:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x121b51110>

24/03/19 03:04:46 ERROR MicroBatchExecution: Query [id = 7544c618-aae4-497a-ba37-e7e877610d72, runId = 46e5a129-e62e-4932-bab7-38f9eb001751] terminated with error
java.lang.NoSuchMethodError: 'scala.collection.Seq org.apache.spark.sql.types.StructType.toAttributes()'
	at org.apache.spark.sql.kafka010.KafkaStreamingWrite.<init>(KafkaStreamingWrite.scala:42)
	at org.apache.spark.sql.kafka010.KafkaWrite.toStreaming(KafkaWrite.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:44)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apa