# Spark Structured Streaming (Consumer 6)
This is a basic streaming application that demonstrates [Spark/Kafka integration](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) when using [Spark structured streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) for stream processing. In practical terms, Spark can read a Kafka topic as a streaming DataFrame and transform it with most of the DataFrame APIs used for static data, so streaming code looks much like batch code.
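Conceptually, Structured Streaming treats the topic as an unbounded table and, in its default micro-batch mode, runs the same query on each newly arrived chunk of rows. The plain-Python sketch below (no Spark involved; the function names are hypothetical) shows why a stateless, row-wise query gives the same result whether it runs once over all the data or incrementally over micro-batches.

```python
# Plain-Python model of micro-batch processing (no Spark involved).
from typing import Callable, Iterable, List


def run_batch(transform: Callable[[str], str], records: Iterable[str]) -> List[str]:
    """Batch job: apply a row-wise transformation to every record at once."""
    return [transform(r) for r in records]


def run_stream(transform: Callable[[str], str],
               micro_batches: Iterable[List[str]]) -> List[str]:
    """Streaming job: apply the same transformation to each micro-batch as it
    arrives and append only the new results to the sink."""
    sink: List[str] = []
    for batch in micro_batches:
        sink.extend(run_batch(transform, batch))
    return sink


# Hypothetical records arriving in two separate triggers.
batches = [["a:1", "b:"], ["c:3"]]
all_records = [r for batch in batches for r in batch]

# For a stateless query, incremental and one-shot processing agree.
assert run_stream(str.upper, batches) == run_batch(str.upper, all_records)
```

Aggregations and other stateful queries need Spark to carry state between micro-batches, so this simple equivalence only illustrates the stateless case used in this demo.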

**To run this demo code, you must have your MongoDB server, ZooKeeper and Kafka containers running.**  
First, set the Kafka connector packages for Spark, then import our database client, the Spark session entry point, and some basic Spark SQL functions for data transformation.

In [None]:
import os
# Must be set before the first SparkSession is created, because it is read when the JVM launches
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
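The `--packages` coordinates follow the Maven pattern `group:artifact_scalaVersion:sparkVersion`, and both versions should match your PySpark installation. The hypothetical helper below rebuilds the string used above from those two versions. Note that `spark-sql-kafka-0-10` is the connector Structured Streaming uses; `spark-streaming-kafka-0-10` belongs to the older DStream API and is not strictly needed for this demo.

```python
def kafka_submit_args(spark_version: str = "3.3.0", scala_version: str = "2.12") -> str:
    """Hypothetical helper: build a PYSPARK_SUBMIT_ARGS value for the Kafka connectors."""
    artifacts = ["spark-streaming-kafka-0-10", "spark-sql-kafka-0-10"]
    coords = ",".join(
        f"org.apache.spark:{artifact}_{scala_version}:{spark_version}"
        for artifact in artifacts
    )
    # PySpark expects the value to end with 'pyspark-shell'
    return f"--packages {coords} pyspark-shell"


print(kafka_submit_args())
```

For example, `os.environ['PYSPARK_SUBMIT_ARGS'] = kafka_submit_args(pyspark.__version__)` would track the installed Spark version, assuming a Scala 2.12 build.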

Set global variables to store the topic name and the IP address of the host running Kafka and MongoDB.

In [None]:
topic_name = 'Scenario06'
hostip = "10.192.45.88" # change it to your IP

Initialize our Spark session with one worker thread per logical CPU core (`local[*]`) and the given application name.

In [None]:
spark = (
    SparkSession.builder
    .master('local[*]') # local[*]: run Spark locally with one worker thread per logical core
    .appName('[Demo] Spark Streaming from Kafka into MongoDB')
    .getOrCreate()
)
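`local[*]` is one of several local master URLs: `local` runs Spark with a single worker thread, `local[K]` with K threads, and `local[*]` with as many threads as logical cores. The hypothetical helper below decodes these three forms; it approximates Spark's core count (which comes from the JVM) with Python's `os.cpu_count()` and ignores rarer forms such as `local[K,F]`.

```python
import os
import re
from typing import Optional


def local_threads(master: str) -> Optional[int]:
    """Worker threads requested by a local master URL, or None for non-local masters."""
    if master == "local":
        return 1  # a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        return None  # e.g. 'yarn' or 'spark://host:7077'
    if match.group(1) == "*":
        return os.cpu_count()  # one thread per logical core
    return int(match.group(1))


print(local_threads("local[4]"))  # 4
```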

Create a streaming DataFrame, passing the bootstrap server(s) and the topic name as options.

In [None]:
# Define a streaming source that reads from the Kafka topic (no data is read until the query starts)
topic_stream_df = (
    spark.readStream.format('kafka') # use Spark's built-in Kafka source
    .option('kafka.bootstrap.servers', f'{hostip}:9092')
    .option('subscribe', topic_name)
    .load()
)
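The reader options are plain key/value strings. The sketch below (the helper name is hypothetical) shows how several brokers or topics would be passed as comma-separated lists, and makes the `startingOffsets` option explicit: for streaming queries it defaults to `'latest'`, so records produced before the query starts are not read unless you choose `'earliest'`.

```python
from typing import Dict, Iterable


def kafka_source_options(brokers: Iterable[str], topics: Iterable[str],
                         starting_offsets: str = "latest") -> Dict[str, str]:
    """Hypothetical helper: collect Kafka source options into one dict."""
    return {
        # comma-separated host:port list of bootstrap brokers
        "kafka.bootstrap.servers": ",".join(brokers),
        # 'subscribe' accepts a comma-separated list of topic names
        "subscribe": ",".join(topics),
        # 'latest' skips records produced before the query starts; 'earliest' replays the topic
        "startingOffsets": starting_offsets,
    }


print(kafka_source_options(["10.0.0.1:9092", "10.0.0.2:9092"], ["Scenario06"]))
```

In the notebook this could be used as `spark.readStream.format('kafka').options(**kafka_source_options([f'{hostip}:9092'], [topic_name])).load()`.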

We can print the schema of this DataFrame to see which columns we have to work with.

In [None]:
topic_stream_df.printSchema()
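The Kafka source always produces the same fixed set of columns, listed in the Spark/Kafka integration guide; `key` and `value` arrive as raw bytes, which is why the next step casts `value` to a string. The snippet below records that layout and renders it the way `printSchema()` does, so you can compare it with the cell's output. The `render_schema` helper is hypothetical, and extra columns such as `headers` appear only if enabled with the `includeHeaders` option.

```python
# Fixed column layout of the Kafka source (per the Spark/Kafka integration guide).
KAFKA_SOURCE_SCHEMA = [
    ("key", "binary"),
    ("value", "binary"),
    ("topic", "string"),
    ("partition", "integer"),
    ("offset", "long"),
    ("timestamp", "timestamp"),
    ("timestampType", "integer"),
]


def render_schema(fields) -> str:
    """Render (name, type) pairs in the tree layout used by printSchema()."""
    lines = ["root"]
    lines += [f" |-- {name}: {dtype} (nullable = true)" for name, dtype in fields]
    return "\n".join(lines)


print(render_schema(KAFKA_SOURCE_SCHEMA))
```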

Generate our output stream.
1. We cast `col('value')` to a string, split it on the separator `':'`, and name the resulting array column `'data'`.
1. We select the element at index 2 (1-based indexing) of `col('data')`. This will be the number (if present) or an empty string otherwise; if the message contains no `':'` at all, `element_at` returns null (with Spark's default, non-ANSI settings).
1. We perform a conditional transformation of `col('data')`:
   1. **if `col('data')` is the empty string**, we replace it with `'*'`
   1. **else** we leave it unchanged
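To check the logic without a running cluster, the three steps can be mirrored in plain Python on a single Kafka `value`. This is a sketch, not Spark code: `transform_value` is a hypothetical name, the `Scenario06:42` message layout is an assumption about what the producer sends, and the null behaviour of `element_at` assumes Spark's default (non-ANSI) SQL mode.

```python
from typing import Optional


def transform_value(value: bytes) -> Optional[str]:
    """Plain-Python mirror of the three DataFrame steps for one Kafka value."""
    # Step 1: cast('string') decodes the bytes as UTF-8; split(..., ':') breaks it
    # into an array (':' has no special meaning as a regex, so str.split matches)
    parts = value.decode("utf-8").split(":")
    # Step 2: element_at(..., 2) is 1-based; past the end it yields null (None here)
    data = parts[1] if len(parts) >= 2 else None
    # Step 3A: an empty string becomes '*'
    if data == "":
        return "*"
    # Step 3B: anything else, including null, passes through unchanged
    return data


print(transform_value(b"Scenario06:42"))  # 42
print(transform_value(b"Scenario06:"))    # *
print(transform_value(b"Scenario06"))     # None
```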

In [None]:
output_stream_df = (
    topic_stream_df
    .select(                                      # 1
        split(
            topic_stream_df.value.cast('string'),
            ':'
        )
        .alias('data')
    )
    .withColumn('data', element_at('data', 2))    # 2
    .withColumn(
        'data',
        (
            when( col('data') == '', '*' )        # 3A
            .otherwise( col('data') )             # 3B
        )
    )
)

Show the schema of the resulting DataFrame.

In [None]:
output_stream_df.printSchema()

Define a utility class that writes each row of our streaming DataFrame to MongoDB.

In [None]:
class DbWriter:
    # called at the start of processing each partition in each output micro-batch
    def open(self, partition_id, epoch_id):
        self.mongo_client = MongoClient(
            host=hostip,
            port=27017
        )
        self.db = self.mongo_client['fit3182_db']
        return True
    
    # called once per row of the result dataframe
    # the current code DOES NOT handle duplicate processing
    #   e.g., query fails and restarts just before current micro-batch was fully inserted
    def process(self, row):
        self.db[topic_name].insert_one(row.asDict())
    
    # called once all rows have been processed (possibly with error)
    def close(self, err):
        self.mongo_client.close()
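The comment in `DbWriter` notes that a restarted query may re-insert rows. The Spark documentation suggests using `partition_id` and `epoch_id` to deduplicate; one way to do that (a sketch under assumptions, not the method used in this demo) is to derive a deterministic `_id` for each row and upsert instead of insert, which with pymongo would be `replace_one({'_id': doc_id}, doc, upsert=True)`. To keep the example runnable without a server, `FakeCollection` is a hypothetical in-memory stand-in for a MongoDB collection and `run_epoch` imitates how Spark calls `open`, `process` and `close`. It assumes a replayed epoch delivers the same rows in the same order within each partition, which holds for a replayable source such as Kafka combined with a deterministic query.

```python
from typing import Any, Dict, Iterable, Optional


class FakeCollection:
    """Hypothetical in-memory stand-in for a pymongo Collection."""

    def __init__(self) -> None:
        self.docs: Dict[str, Dict[str, Any]] = {}

    def replace_one(self, filter: Dict[str, Any], replacement: Dict[str, Any],
                    upsert: bool = False) -> None:
        # Mimics replace_one for an exact _id match: replace if present, insert if upsert
        key = filter["_id"]
        if key in self.docs or upsert:
            self.docs[key] = {"_id": key, **replacement}


class IdempotentDbWriter:
    """ForeachWriter-style class that gives every row a deterministic _id."""

    def __init__(self, collection: FakeCollection) -> None:
        # A real writer would create its MongoClient in open(), as DbWriter does,
        # because Spark serializes the writer object and ships it to executors.
        self.collection = collection

    def open(self, partition_id: int, epoch_id: int) -> bool:
        self.prefix = f"{epoch_id}-{partition_id}"
        self.seq = 0  # position of the row within this partition of this epoch
        return True

    def process(self, row: Dict[str, Any]) -> None:
        doc_id = f"{self.prefix}-{self.seq}"
        self.seq += 1
        # Upsert: a replayed row overwrites its earlier copy instead of duplicating it
        self.collection.replace_one({"_id": doc_id}, row, upsert=True)

    def close(self, err: Optional[Exception]) -> None:
        pass  # a real writer would close its MongoClient here


def run_epoch(writer: IdempotentDbWriter, partition_id: int, epoch_id: int,
              rows: Iterable[Dict[str, Any]]) -> None:
    """Imitate how Spark drives a foreach writer for one partition of one epoch."""
    if writer.open(partition_id, epoch_id):
        for row in rows:
            writer.process(row)
    writer.close(None)


coll = FakeCollection()
demo_writer = IdempotentDbWriter(coll)
demo_rows = [{"data": "42"}, {"data": "*"}]
run_epoch(demo_writer, partition_id=0, epoch_id=7, rows=demo_rows)
run_epoch(demo_writer, partition_id=0, epoch_id=7, rows=demo_rows)  # simulated replay
print(len(coll.docs))  # 2, not 4
```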

Now, define our stream writer for the MongoDB database sink.

In [None]:
db_writer = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .foreach(DbWriter())
)

For debugging, we also define a stream writer for the console sink, which prints each micro-batch to the standard output of the Spark driver.

In [None]:
console_logger = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .format('console')
)

The variable `writer` points to the stream writer we wish to use.

In [None]:
# change this variable to either db_writer or console_logger to get output to the desired sink
writer = db_writer

Now we can start our streaming query. The workflow is straightforward:
1. Call the `DataStreamWriter` object's `start` method to begin executing the query; it returns a `StreamingQuery` handle.
1. The `query` variable holds that handle, which lets us manage the running query. This is a simple demo, so we call `awaitTermination` to block the driver program until the streaming query stops because of a failure or a user interrupt.

You can use Jupyter's "Interrupt the kernel" command to stop the running query at any time.

In [None]:
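from pyspark.sql.utils import StreamingQueryException

query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    # stop the query only if start() succeeded
    if query is not None:
        query.stop()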