### 0. Spark Setup

In [1]:
import os
# set the package versions (3.5.1 below) to match your Spark version; _2.12 is the Scala build
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.spark:spark-avro_2.12:3.5.1 pyspark-shell'
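Each package coordinate encodes both the Scala build (`_2.12`) and the Spark version (`3.5.1`), and both have to match the local installation. Below is a minimal sketch that builds the same argument string from those two values. `build_submit_args` is a hypothetical helper and is not part of PySpark.

```python
def build_submit_args(spark_version: str, scala_version: str = "2.12") -> str:
    """Return a PYSPARK_SUBMIT_ARGS value that pulls the Kafka and Avro connectors."""
    packages = [
        f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}",
        f"org.apache.spark:spark-avro_{scala_version}:{spark_version}",
    ]
    # --packages takes a comma-separated list; 'pyspark-shell' stays at the end, as above
    return f"--packages {','.join(packages)} pyspark-shell"

print(build_submit_args("3.5.1"))
```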

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Spark-Notebook") \
    .getOrCreate()

24/03/11 00:45:51 WARN Utils: Your hostname, Cinders resolves to a loopback address: 127.0.1.1; using 172.17.156.62 instead (on interface eth0)
24/03/11 00:45:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ellabelle/spark/spark-3.5.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ellabelle/.ivy2/cache
The jars for the packages stored in: /home/ellabelle/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-155ecb2e-747a-4726-a1b7-8e6d597c7eee;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	

### 1. Reading from Kafka Stream

Streaming data is read from Kafka through `spark.readStream`.

#### 1.1 Raw Kafka Stream

Before running this notebook, check the [Redpanda Console UI](http://localhost:8080/topics) to confirm that the `fhv_taxi_rides` topic already contains data. If it is empty, run `producer.py` in a terminal first.

In [3]:
# the default for startingOffsets is "latest"; "earliest" also reads records already in the topic
# checkpointLocation is a query (writeStream) option, so it is not set on the reader
df_kafka_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "fhv_taxi_rides") \
    .option("startingOffsets", "earliest") \
    .load()
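Besides `"earliest"` and `"latest"`, the Kafka source also accepts `startingOffsets` as a JSON string that gives an offset per partition, where `-2` means earliest and `-1` means latest. Below is a minimal sketch that builds such a string. The helper name and the example offsets are hypothetical.

```python
import json

EARLIEST, LATEST = -2, -1  # special offsets understood by the Kafka source in the JSON form

def starting_offsets_json(topic: str, offsets_by_partition: dict) -> str:
    """Build a startingOffsets value such as {"fhv_taxi_rides": {"0": -2}}."""
    # partition ids become JSON object keys, so they are converted to strings
    return json.dumps({topic: {str(p): o for p, o in offsets_by_partition.items()}})

offsets = starting_offsets_json("fhv_taxi_rides", {0: EARLIEST, 1: 100})
print(offsets)  # {"fhv_taxi_rides": {"0": -2, "1": 100}}
```

The resulting string would be passed as `.option("startingOffsets", offsets)` on the reader. Like the plain `"earliest"`/`"latest"` values, it only applies when a query starts without an existing checkpoint.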

In [4]:
df_kafka_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



#### 1.2 Decoded Kafka Stream

The `key` and `value` columns arrive as binary. Casting them to `STRING` decodes them as text.

In [5]:
df_kafka_encoded = df_kafka_raw.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

In [6]:
df_kafka_encoded.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
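`CAST(... AS STRING)` interprets the raw bytes as UTF-8 text and leaves nulls as nulls. Below is a rough plain-Python equivalent for a single record. The message value is hypothetical, because the real format is whatever `producer.py` writes.

```python
# Kafka delivers key and value as raw bytes; None models a record sent without a key
raw_key = None
raw_value = "2019-10-01 00:26:59, 2019-10-01 00:47:15, 12, 34".encode("utf-8")  # hypothetical

def cast_to_string(raw):
    """Mimic CAST(col AS STRING) for one value: decode UTF-8 bytes, keep nulls as None."""
    return None if raw is None else raw.decode("utf-8")

key, value = cast_to_string(raw_key), cast_to_string(raw_value)
print(key, "|", value)
```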



#### 1.3 Structured Streaming DataFrame

In [7]:
def parse_ride_from_kafka_message(df_raw, schema):
    """ take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = df_raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # split attributes to nested array in one Column
    col = F.split(df['value'], ', ')

    # expand col to multiple top-level columns
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    return df.select([field.name for field in schema])

In [14]:
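ride_schema = T.StructType(
    [
        # NOTE: fields are matched by position in the message value. With the two datetime
        # fields commented out, pu_location_id/do_location_id pick up the pickup/dropoff
        # timestamps instead (visible in the outputs below). Uncomment them, plus any other
        # fields that precede the location ids in the producer's message, to realign.
        # T.StructField("pickup_datetime", T.TimestampType(), True),
        # T.StructField("dropoff_datetime", T.TimestampType(), True),
        T.StructField("pu_location_id", T.StringType()),
        T.StructField("do_location_id", T.StringType()),
    ])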
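`parse_ride_from_kafka_message` assigns each schema field the token at the same position in the value. The plain-Python sketch below reproduces that mapping on a hypothetical message. It assumes the message starts with the pickup and dropoff datetimes followed by the location ids. The sketch shows that leaving the datetime fields out of the schema shifts every later column.

```python
def parse_positional(value: str, field_names: list) -> dict:
    """Mimic F.split(value, ', ') followed by getItem(idx) for each schema field."""
    tokens = value.split(", ")
    # Spark returns null for an out-of-range index (with ANSI mode off); mirror that with None
    return {name: tokens[i] if i < len(tokens) else None
            for i, name in enumerate(field_names)}

# hypothetical message; the real layout is defined by producer.py
message = "2019-10-01 00:26:59, 2019-10-01 00:47:15, 12, 34"

short_schema = ["pu_location_id", "do_location_id"]
full_schema = ["pickup_datetime", "dropoff_datetime", "pu_location_id", "do_location_id"]

print(parse_positional(message, short_schema))
# {'pu_location_id': '2019-10-01 00:26:59', 'do_location_id': '2019-10-01 00:47:15'}
print(parse_positional(message, full_schema)["pu_location_id"])  # 12
```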

In [15]:
df_rides = parse_ride_from_kafka_message(df_raw=df_kafka_raw, schema=ride_schema)

In [16]:
df_rides.printSchema()

root
 |-- pu_location_id: string (nullable = true)
 |-- do_location_id: string (nullable = true)



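In [None]:
# df_rides.show()  # fails: a streaming DataFrame cannot be shown directly; its rows must be
#                  # written out with writeStream.start(), e.g. to the console or memory sink below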

### 2. Sink Operations & Streaming Queries

Results are written out through `writeStream`.

---
**Output Sinks**
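- File Sink: stores the output in a directory
- Kafka Sink: stores the output in one or more Kafka topics
- Foreach Sink: runs arbitrary computation on the records in the output
- Console Sink and Memory Sink (for debugging): print the output to the console, or store it as an in-memory table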

Further details can be found in [Output Sinks](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)
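The linked guide also lists which output modes each built-in sink supports. The small lookup below encodes that table as given in the Spark guide at the time of writing, so check it against your Spark version. `supports_mode` is a hypothetical helper.

```python
# output modes per built-in sink, following the Structured Streaming guide (verify per version)
SUPPORTED_MODES = {
    "file": {"append"},
    "kafka": {"append", "update", "complete"},
    "foreach": {"append", "update", "complete"},
    "console": {"append", "update", "complete"},
    "memory": {"append", "complete"},
}

def supports_mode(sink: str, output_mode: str) -> bool:
    """Return True if the sink accepts the output mode (mode name is case-insensitive)."""
    return output_mode.lower() in SUPPORTED_MODES.get(sink, set())

print(supports_mode("memory", "update"))  # False
print(supports_mode("kafka", "append"))   # True
```

Sink support is only half of the check. The query itself must also allow the mode (for example, `complete` needs an aggregation).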

---
There are three types of **Output Modes**:
- Complete: the whole Result Table is written to the sink after every trigger. This is supported for aggregation queries.
- Append (default): only the rows appended to the Result Table since the last trigger are written to the sink.
- Update: only the rows updated in the Result Table since the last trigger are written to the sink.

The supported [Output Modes](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) differ depending on the transformations applied to the streaming data.
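The plain-Python simulation below makes the difference concrete. It keeps a running `count` per pickup id over two micro-batches and shows what complete and update mode would hand to the sink after each trigger. The batch contents are hypothetical. Append mode is left out because Spark only allows it for aggregations when a watermark is defined.

```python
from collections import Counter

def run_batches(batches):
    """Yield (complete_output, update_output) after each micro-batch of pickup ids."""
    state = Counter()  # the Result Table: pickup id -> running count
    for batch in batches:
        batch_counts = Counter(batch)
        state.update(batch_counts)
        complete = dict(state)                        # whole Result Table
        update = {k: state[k] for k in batch_counts}  # only rows changed by this batch
        yield complete, update

# hypothetical pickup ids arriving in two micro-batches
for i, (complete, update) in enumerate(run_batches([["A", "B", "A"], ["B", "C"]]), 1):
    print(f"batch {i}: complete={complete} update={update}")
```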

--- 
**Triggers**

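The [trigger settings](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) of a streaming query define the timing of streaming data processing. Spark Structured Streaming processes data in micro-batches by default, and you can choose one of the following modes based on your requirements:

- Default micro-batch mode (no trigger set): each micro-batch starts as soon as the previous one has finished.
- Fixed-interval micro-batch mode (`processingTime`): micro-batches are kicked off at a user-specified interval.
- One-time micro-batch mode (`once`, deprecated): all available data is processed in a single micro-batch, then the query stops.
- Available-now micro-batch mode (`availableNow`): all available data is processed, possibly in several micro-batches, then the query stops.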
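In PySpark, each of these modes is selected through a keyword argument of `DataStreamWriter.trigger()` (`processingTime`, `once` or `availableNow`). The default mode is selected by not calling `trigger()` at all. The hypothetical helper below maps the mode names above to those keyword arguments, which could then be unpacked as `writeStream.trigger(**kwargs)`.

```python
def trigger_kwargs(mode: str, interval: str = "5 seconds") -> dict:
    """Map a trigger mode name to keyword arguments for DataStreamWriter.trigger()."""
    if mode == "default":
        return {}  # skip trigger(): the next micro-batch starts when the previous one ends
    if mode == "fixed-interval":
        return {"processingTime": interval}
    if mode == "one-time":
        return {"once": True}  # deprecated in recent Spark releases in favor of availableNow
    if mode == "available-now":
        return {"availableNow": True}
    raise ValueError(f"unknown trigger mode: {mode}")

print(trigger_kwargs("fixed-interval"))  # {'processingTime': '5 seconds'}
```

For the default mode the returned dict is empty, so the caller should simply leave out `.trigger()`.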


#### Console and Memory Sink

In [18]:
def sink_console(df, output_mode: str = 'append', processing_time: str = '5 seconds'):
    # 'complete' is only valid for aggregation queries, so default to 'append' (Spark's own default)
    write_query = df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()
    return write_query # pyspark.sql.streaming.StreamingQuery

In [19]:
write_query = sink_console(df_rides, output_mode='append')

24/03/11 00:52:17 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4cac923f-2346-4fb6-b943-0d572c2ed7a3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/11 00:52:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/03/11 00:52:17 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker
24/03/11 00:52:17 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/03/11 00:52:17 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker

In [24]:
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .format('memory') \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results

In [30]:
query_name = 'most_popular_pickup_id'
query_template = '''
    select 
        pu_location_id as pickup_id,
        count(1) AS number_records 
    from {table_name}
    GROUP BY pickup_id
    ORDER BY number_records DESC
    '''
write_query, df_popular_pickup_id = sink_memory(
    df=df_rides, 
    query_name=query_name, 
    query_template=query_template
)

24/03/11 01:07:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-cadf50dd-8235-4f0c-99d9-eaec4713daa2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/11 01:07:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/11 01:07:47 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker
24/03/11 01:07:47 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


24/03/11 01:07:47 WARN ClientUtils: Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker

In [31]:
print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
write_query.status

<class 'pyspark.sql.streaming.query.StreamingQuery'>


{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [None]:
# df_joined\
#     .groupBy('Zone')\
#     .count()\
#     .orderBy('count', ascending=True)\
#     .first()[0]

In [34]:
df_popular_pickup_id.show()

24/03/11 01:09:49 WARN TaskSetManager: Stage 1575 contains a task of very large size (5578 KiB). The maximum recommended task size is 1000 KiB.

+-------------------+--------------+
|          pickup_id|number_records|
+-------------------+--------------+
|2019-10-01 04:09:21|             1|
|2019-10-01 05:25:25|             1|
|2019-10-01 04:39:08|             1|
|2019-10-01 05:06:23|             1|
|2019-10-01 05:22:37|             1|
|                  2|             1|
|2019-10-01 05:20:24|             1|
|2019-10-01 00:41:28|             1|
|2019-10-01 06:15:11|             1|
|2019-10-01 00:45:55|             1|
|2019-10-01 06:16:17|             1|
|2019-10-01 01:28:45|             1|
|2019-10-01 01:45:28|             1|
|2019-10-01 01:09:07|             1|
|2019-10-01 01:23:08|             1|
|2019-10-01 02:25:57|             1|
|2019-10-01 02:09:05|             1|
|2019-10-01 05:39:25|             1|
|2019-10-01 03:30:23|             1|
|2019-10-01 00:26:59|             1|
+-------------------+--------------+
only showing top 20 rows
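Each time `df_popular_pickup_id.show()` runs, Spark re-evaluates the SQL query against the current contents of the in-memory table. The aggregation itself is an ordinary group-by count. The plain-Python equivalent below runs over a few hypothetical pickup ids and sorts the result with the most frequent id first, which is the order a "most popular" query needs.

```python
from collections import Counter

# hypothetical pu_location_id values already written to the memory sink
pickup_ids = ["132", "138", "132", "161", "132", "138"]

# count(1) per pickup_id, most frequent first
number_records = Counter(pickup_ids).most_common()
for pickup_id, count in number_records:
    print(pickup_id, count)
```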



                                                                                

24/03/11 01:27:04 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.

In [33]:
write_query.stop()

-------------------------------------------
Batch: 7
-------------------------------------------
+-------------------+-------------------+
|pu_location_id     |do_location_id     |
+-------------------+-------------------+
|2019-10-02 13:59:00|2019-10-02 15:07:00|
|2019-10-02 13:15:00|2019-10-02 13:36:00|
|2019-10-02 13:16:38|2019-10-02 13:36:18|
|2019-10-02 13:48:28|2019-10-02 14:41:45|
|2019-10-02 13:34:31|2019-10-02 13:56:15|
|2019-10-02 13:20:12|2019-10-02 13:23:36|
|2019-10-02 13:19:19|2019-10-02 14:05:31|
|2019-10-02 13:33:28|2019-10-02 14:14:39|
|2019-10-02 13:06:45|2019-10-02 13:12:56|
|2019-10-02 13:15:55|2019-10-02 13:46:28|
|2019-10-02 13:49:01|2019-10-02 14:48:17|
|2019-10-02 13:10:08|2019-10-02 13:23:23|
|2019-10-02 13:23:39|2019-10-02 13:49:08|
|2019-10-02 13:41:33|2019-10-02 13:48:16|
|2019-10-02 13:55:08|2019-10-02 14:06:28|
|2019-10-02 13:17:25|2019-10-02 13:32:30|
|2019-10

#### Kafka Sink

To write stream results to a Kafka topic, the streaming DataFrame must have a `value` column (string or binary). A `key` column is optional, and a `topic` column is only needed when the `topic` option is not set on the writer.

Therefore, before starting `writeStream` in Kafka format, the DataFrame needs to be reshaped accordingly.

More information about the data structure the Kafka sink expects is available [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka).
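The column requirements above can be written as a quick check over a DataFrame's column names (for example, `df.columns`). `check_kafka_sink_columns` is a hypothetical pre-flight helper that only looks at column names, not types.

```python
def check_kafka_sink_columns(columns, topic_option_set: bool = True) -> list:
    """Return a list of problems with the columns a Kafka sink would read."""
    problems = []
    if "value" not in columns:
        problems.append("missing required 'value' column")
    if not topic_option_set and "topic" not in columns:
        problems.append("no 'topic' column and no 'topic' option")
    return problems

print(check_kafka_sink_columns(["pu_location_id", "do_location_id"]))
# ["missing required 'value' column"]
print(check_kafka_sink_columns(["key", "value"]))  # []
```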


In [None]:
def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):
    # the Kafka sink reads a string/binary `value` column and an optional `key` column
    df = df.withColumn("value", F.concat_ws(', ', *value_columns))
    if key_column:
        df = df.withColumn("key", F.col(key_column).cast('string'))
        return df.select(['key', 'value'])
    # without a key column there is nothing to select as `key`
    return df.select(['value'])

def sink_kafka(df, topic, output_mode='append'):
    # the Kafka sink requires a checkpoint directory; use one per topic so that
    # separate queries do not share the same checkpoint
    write_query = df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
        .outputMode(output_mode) \
        .option("topic", topic) \
        .option("checkpointLocation", f"checkpoint/{topic}") \
        .start()
    return write_query
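For each row, `F.concat_ws(', ', *value_columns)` joins the chosen columns into one string and skips null values instead of making the whole result null. The plain-Python sketch below shows that per-row behavior on a hypothetical row.

```python
def concat_ws(sep, *values):
    """Mimic Spark's concat_ws for scalar values: join the non-null values with sep."""
    return sep.join(str(v) for v in values if v is not None)

row = {"pu_location_id": "132", "do_location_id": None}  # hypothetical row
value = concat_ws(", ", row["pu_location_id"], row["do_location_id"])
print(repr(value))  # '132'
```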