### 0. Spark Setup

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'

In [2]:
import dotenv
dotenv.load_dotenv("/home/adrian/dtc-de-zoomcamp/dtc-de-zoomcamp/week6/.envrc", override=True)

True

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Spark-Notebook") \
    .getOrCreate()

:: loading settings :: url = jar:file:/home/adrian/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/adrian/.ivy2/cache
The jars for the packages stored in: /home/adrian/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-51c182d1-08d0-497a-894c-3c044c71121f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org

23/03/24 13:04:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### 1. Reading from Kafka Stream

through `readStream`

#### 1.1 Raw Kafka Stream

In [4]:
# default for startingOffsets is "latest"
df_kafka_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:19092") \
    .option("subscribe", "adrian.fhv_rides.json") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()

In [5]:
df_kafka_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



#### 1.2 Encoded Kafka Stream

In [6]:
df_kafka_encoded = df_kafka_raw.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

In [7]:
df_kafka_encoded.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



#### 1.3 Structure Streaming DataFrame

In [8]:
def parse_ride_from_kafka_message(df_raw, schema):
    """ take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = df_raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # split attributes to nested array in one Column
    col = F.split(df['value'], ', ')

    # expand col to multiple top-level columns
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    return df.select([field.name for field in schema])

In [9]:
ride_schema = T.StructType(
    [T.StructField("dispatching_base_num", T.StringType()),
     T.StructField('pickup_datetime', T.TimestampType()),
     T.StructField('dropOff_datetime', T.TimestampType()),
     T.StructField("PUlocationID", T.StringType()),
     T.StructField("DOlocationID", T.StringType()),
     T.StructField("SR_Flag", T.StringType()),
     T.StructField("Affiliated_base_number", T.StringType()),
     ])

In [10]:
df_rides = parse_ride_from_kafka_message(df_raw=df_kafka_raw, schema=ride_schema)

In [11]:
df_rides.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: string (nullable = true)
 |-- DOlocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



### 2 Sink Operation & Streaming Query

through `writeStream`

---
**Output Sinks**
- File Sink: stores the output to the directory
- Kafka Sink: stores the output to one or more topics in Kafka
- Foreach Sink:
- (for debugging) Console Sink, Memory Sink

Further details can be found in [Output Sinks](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)

---
There are three types of **Output Modes**:
- Complete: The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.
- Append (default): Only new rows are added to the Result Table
- Update: Only updated rows are outputted

[Output Modes](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) differs based on the set of transformations applied to the streaming data. 

--- 
**Triggers**

The [trigger settings](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) of a streaming query define the timing of streaming data processing. Spark streaming support micro-batch streamings schema and you can select following options based on requirements.

- default-micro-batch-mode
- fixed-interval-micro-batch-mode
- one-time-micro-batch-mode
- available-now-micro-batch-mode


#### Console and Memory Sink

In [12]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    write_query = df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()
    return write_query # pyspark.sql.streaming.StreamingQuery

In [13]:
write_query = sink_console(df_rides, output_mode='append')

23/03/24 13:04:45 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-25a6663b-7143-4d30-821d-63ef60bba248. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/24 13:04:45 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


23/03/24 13:04:45 WARN ClientUtils: Couldn't resolve server broker:19092 from bootstrap.servers as DNS resolution failed for broker
23/03/24 13:04:48 WARN ClientUtils: Couldn't resolve server broker:19092 from bootstrap.servers as DNS resolution failed for broker


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B00001"|null           |null            |"PUlocationID": ""|"DOlocationID": ""   |"SR_Flag": ""|"Affiliated_base_number": "B00001"}|
|{"dispatching_base_num": "B00001"|null           |null            |"PUlocationID": ""|"DOlocationID": ""   |"SR_Flag": ""|"Affiliated_base_number": "B00001"}|
|{"dispatching_base_num": "B00001"|null           |null            |"PUlocationID": ""|"DOlocationID": 

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01079"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01079"}|
|{"dispatching_base_num": "B01079"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01079"}|
|{"dispatching_base_num": "B01079"|null           |null            |"PUlocationID": ""|"DOlocationID": 

In [14]:
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .format('memory') \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results

-------------------------------------------
Batch: 8
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01145"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01145"}|
|{"dispatching_base_num": "B01145"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01145"}|
|{"dispatching_base_num": "B01145"|null           |null            |"PUlocationID": ""|"DOlocationID": 

In [16]:
query_name = 'dispatch_counts'
query_template = 'select count(distinct(dispatching_base_num)) from {table_name}'
write_query, df_vendor_id_counts = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)

23/03/24 13:06:14 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3d1b1487-8552-4616-a5a9-40d4311c4f39. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/24 13:06:14 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/24 13:06:14 WARN ClientUtils: Couldn't resolve server broker:19092 from bootstrap.servers as DNS resolution failed for broker


23/03/24 13:06:15 WARN ClientUtils: Couldn't resolve server broker:19092 from bootstrap.servers as DNS resolution failed for broker
-------------------------------------------
Batch: 18
-------------------------------------------
+------------------------------------------+---------------+----------------+---------------------+---------------------+-------------+--------------------------------------------+
|dispatching_base_num                      |pickup_datetime|dropOff_datetime|PUlocationID         |DOlocationID         |SR_Flag      |Affiliated_base_number                      |
+------------------------------------------+---------------+----------------+---------------------+---------------------+-------------+--------------------------------------------+
|{"dispatching_base_num": "B01233         "|null           |null            |"PUlocationID": "254"|"DOlocationID": "81" |"SR_Flag": ""|"Affiliated_base_number": "B01233         "}|
|{"dispatching_base_num": "B01233         "|nu

In [17]:
print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
write_query.status

<class 'pyspark.sql.streaming.StreamingQuery'>


{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [18]:
df_vendor_id_counts.show()

-------------------------------------------
Batch: 22
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01239"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01239"}|
|{"dispatching_base_num": "B01239"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01239"}|
|{"dispatching_base_num": "B01239"|null           |null            |"PUlocationID": ""|"DOlocationID":

In [19]:
write_query.stop()

23/03/24 13:06:38 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@6604f168 is aborting.
23/03/24 13:06:38 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@6604f168 aborted.


23/03/24 13:06:38 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException
	at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:217)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:435)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
	at org.apache.spark.sql.execution.data

                                                                                

-------------------------------------------
Batch: 71
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID      |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01452"|null           |null            |"PUlocationID": ""|"DOlocationID": ""|"SR_Flag": ""|"Affiliated_base_number": "B01452"}|
|{"dispatching_base_num": "B01452"|null           |null            |"PUlocationID": ""|"DOlocationID": ""|"SR_Flag": ""|"Affiliated_base_number": "B01452"}|
|{"dispatching_base_num": "B01452"|null           |null            |"PUlocationID": ""|"DOlocationID": ""|"SR_Flag": 

                                                                                

-------------------------------------------
Batch: 72
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01455"}|
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01455"}|
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID":

                                                                                

-------------------------------------------
Batch: 73
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01455"}|
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01455"}|
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID":

                                                                                

-------------------------------------------
Batch: 74
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01455"}|
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01455"}|
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID":

                                                                                

-------------------------------------------
Batch: 75
-------------------------------------------
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|dispatching_base_num             |pickup_datetime|dropOff_datetime|PUlocationID      |DOlocationID         |SR_Flag      |Affiliated_base_number             |
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+
|{"dispatching_base_num": "B01455"|null           |null            |"PUlocationID": ""|"DOlocationID": "265"|"SR_Flag": ""|"Affiliated_base_number": "B01455"}|
+---------------------------------+---------------+----------------+------------------+---------------------+-------------+-----------------------------------+



#### Kafka Sink

To write stream results to `kafka-topic`, the stream dataframe has at least a column with name `value`.

Therefore before starting `writeStream` in kafka format, dataframe needs to be updated accordingly.

More information regarding kafka sink expected data structure [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka)


In [21]:
def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):
    columns = df.columns
    df = df.withColumn("value", F.concat_ws(', ',*value_columns))    
    if key_column:
        df = df.withColumnRenamed(key_column,"key")
        df = df.withColumn("key",df.key.cast('string'))
    return df.select(['key', 'value'])
    
def sink_kafka(df, topic, output_mode='append'):
    write_query = df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:19092") \
        .outputMode(output_mode) \
        .option("topic", topic) \
        .option("checkpointLocation", "checkpoint") \
        .start()
    return write_query