# Lecture 3: Spark Streaming

_Spark Streaming_ is an extension of the Spark API that enables scalable stream processing.

The continuous stream of input data can be ingested from many data sources, such as **Kafka**, **Amazon S3**, or **TCP sockets**. 

The Spark API lets us process data with high-level functions such as *map* and *reduce*. As we are going to see, it is also possible to use DataFrame operations. 

Processed data can be exported to an external database to feed live dashboards or offline analyses, stored in files, or passed on to a further stage of a Kafka pipeline. 

Overall, the practice of reading data from a set of sources, pre-processing it, and then storing it in a different format for later analysis is extremely common, and has its own name: **real-time ETL pipelines**.
- **E**xtract
- **T**ransform
- **L**oad

Spark Streaming works by dividing the input data into _micro-batches_ that can be treated as static datasets. In Spark, this sequence of micro-batches is referred to as a *discretized stream* (*DStream*). Internally, a DStream is represented as a sequence of RDDs, one per batch.
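To make the micro-batch idea concrete, here is a minimal pure-Python sketch (not Spark code) that groups timestamped records into fixed-width batches. The `discretize` helper and the sample records are hypothetical and chosen only for illustration; Spark performs this grouping internally.

```python
from collections import defaultdict

def discretize(records, batch_interval):
    """Group (timestamp, value) pairs into micro-batches of fixed duration.

    Hypothetical helper for illustration only: Spark does this internally.
    """
    batches = defaultdict(list)
    for timestamp, value in records:
        # Records arriving within the same interval end up in the same batch
        batch_index = int(timestamp // batch_interval)
        batches[batch_index].append(value)
    # Return the non-empty batches ordered by time
    # (intervals without data are simply skipped in this sketch)
    return [batches[i] for i in sorted(batches)]

# Arrival times in seconds, with a 2-second batch interval
records = [(0.3, "a"), (1.9, "b"), (2.1, "c"), (5.0, "d")]
micro_batches = discretize(records, batch_interval=2)
print(micro_batches)  # [['a', 'b'], ['c'], ['d']]
```

Each inner list plays the role of one static dataset that can be processed with the usual batch tools.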

![DStream](imgs/lecture3/DStream.png)

Any transformation applied to the DStream, such as `DStream.map()`, acts independently on each batch. For example, in the image below, we filter each RDD of the original DStream to remove some data and produce a new stream. 

![DStream_filter](imgs/lecture3/Dstream_filter.png)
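The per-batch behaviour can be mimicked in plain Python by treating a stream as a list of batches. This is only an analogy: `apply_to_each_batch` and the sample data are hypothetical and not part of the Spark API.

```python
def apply_to_each_batch(batches, transformation):
    """Apply the same transformation to every batch, one batch at a time."""
    return [transformation(batch) for batch in batches]

# A toy "stream" made of three micro-batches of integers
stream = [[1, 8, 3], [10, 2], [7]]

# Keep only values greater than 5; each batch is filtered on its own,
# without looking at the content of the other batches
filtered_stream = apply_to_each_batch(
    stream, lambda batch: [x for x in batch if x > 5]
)
print(filtered_stream)  # [[8], [10], [7]]
```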

In this lecture we will see how to set up a simple stream using a TCP socket as a data source.

## Create and Start a Spark Session

In [11]:
# import the python libraries to create/connect to a Spark Session
from pyspark.sql import SparkSession

# build a SparkSession 
#   connect to the master node on the port where the master node is listening (7077)
#   declare the app name 
#   configure the executor memory to 512 MB
#   either *connect* or *create* a new Spark Context
spark = SparkSession.builder \
    .master("spark://spark-master:7077")\
    .appName("My streaming spark application")\
    .config("spark.executor.memory", "512m")\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")\
    .getOrCreate()

In [12]:
spark

In [13]:
# get the SparkContext associated with the session
sc = spark.sparkContext

# print its status
sc

## Spark _Streaming_ context

The first step of a Spark streaming application is the creation of a `StreamingContext`. 

The `StreamingContext` is a crucial component in Spark Streaming. It's responsible for initializing the Spark Streaming application and specifying how to handle micro-batches of data. 

The `StreamingContext` is similar in concept to the `SparkContext`, but it must be initialized with additional information on how to handle the micro-batches.

To create a `StreamingContext`, you can use the `StreamingContext(SparkContext, batch_interval)` constructor. The `SparkContext` object provides the necessary environment for Spark Streaming, while the `batch_interval` parameter determines the (wall-time) duration of each batch in seconds.

It's important to note that at most **one** `StreamingContext` can be active in a Spark application at any given time. Attempting to start multiple `StreamingContext` objects in a single application will result in errors.

Create a Spark `StreamingContext` with a batch interval of 2 seconds

In [14]:
from pyspark.streaming import StreamingContext

# create a streaming context with a batch interval of 2 seconds
ssc = StreamingContext(sc, 2) 

### Starting and Stopping Spark Streaming

To process data in real-time using Spark, we need to create a `StreamingContext`, define the operations to perform on the data, and specify the data source and sink to connect to.

Once the streaming operations are defined, we can start processing the stream by calling the `.start()` method of the `StreamingContext` object (`ssc` in our case). Similarly, we can stop the streaming processing by calling the `.stop()` method.

**NOTE:** By default, stopping the `StreamingContext` also stops the `SparkContext`, which closes the entire Spark application. To prevent this, we can pass the `stopSparkContext=False` option when stopping the `StreamingContext`.
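As a minimal sketch of this start/stop cycle, the hypothetical helper below (`run_for` is not part of PySpark) starts a streaming context, lets it run for a fixed time, and then stops it while keeping the `SparkContext` alive. It assumes that all stream operations have already been defined on `ssc`.

```python
def run_for(ssc, seconds):
    """Start a streaming context, let it run for a while, then stop it.

    `ssc` is expected to be a pyspark.streaming.StreamingContext on which
    the sources and operations have already been declared.
    """
    ssc.start()
    try:
        # Block until the timeout elapses or the stream terminates
        ssc.awaitTerminationOrTimeout(seconds)
    finally:
        # Keep the SparkContext alive so that a new StreamingContext
        # can be created later in the same application
        ssc.stop(stopSparkContext=False)

# Usage (assuming `ssc` is a configured StreamingContext):
# run_for(ssc, 10)
```

Wrapping the stop call in `finally` is a design choice of this sketch: it ensures the stream is stopped even if the wait is interrupted.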

### TCP Socket Source
For this example, Spark will read data from a TCP socket using Spark Streaming.

A TCP socket is a communication endpoint used to establish a connection between two devices over a network.
You can think of it as a one-to-one telephone call: two endpoints have to establish a connection; once the connection is established, data can be exchanged; as soon as either end hangs up, the whole communication is lost. 
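The telephone analogy can be tried out with Python's standard `socket` module. The following is a minimal sketch, not the code of `producer.py`: a server listens on the loopback interface, a client connects and reads newline-delimited JSON until the server closes the connection. The `serve_once` helper and the sample messages are hypothetical. In the Spark setup, `socketTextStream` plays the client role, connecting to the given host and port.

```python
import json
import socket
import threading

def serve_once(server_socket, messages):
    """Accept a single client and send it newline-delimited JSON messages."""
    client_socket, _ = server_socket.accept()
    with client_socket:
        for msg in messages:
            client_socket.sendall((json.dumps(msg) + "\n").encode("utf-8"))
    # Leaving the `with` block closes the connection ("hanging up")

# Listen on the loopback interface; port 0 lets the OS pick a free port
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("127.0.0.1", 0))
server_socket.listen(1)
host, port = server_socket.getsockname()

messages = [{"id": 1}, {"id": 2}]
server_thread = threading.Thread(target=serve_once, args=(server_socket, messages))
server_thread.start()

# The client connects, then reads lines until the server hangs up
with socket.create_connection((host, port)) as client:
    with client.makefile("r", encoding="utf-8") as stream:
        received = [json.loads(line) for line in stream]

server_thread.join()
server_socket.close()
print(received)  # [{'id': 1}, {'id': 2}]
```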

We will generate a dummy data stream representing fake credit card transactions.

A simple Python program will be used to create this data stream.
You will be able to find it in `utils/producer.py`. 
When executed, the producer will try to establish a TCP connection and send data on port `5555` of a given `host` (`spark-master` in our case). 

Before executing the producer program, take a moment to review the `producer.py` code to understand how it works. It's important to understand the logic of the program before using it to generate the streaming data.

In [15]:
! cat utils/producer.py

import socket
import json
import time
import random
import argparse

# Define some lists of first and last names to use for generating random messages
first_names=('John','Andy','Joe','Alice','Jill')
last_names=('Johnson','Smith','Jones', 'Millers','Darby')

# Define a function for sending messages over the socket
def send_messages(client_socket):
    try:
        while 1:
            # Generate a random message with a random name, surname, amount, delta_t, and flag
            msg = {
                'name': random.choice(first_names),
                'surname': random.choice(last_names),
                'amount': '{:.2f}'.format(random.random()*1000),
                'delta_t': '{:.2f}'.format(random.random()*10),
                'flag': random.choices([0,1], weights=[0.8, 0.2])[0]
            }
            # Encode the message as JSON and send it over the socket
            client_socket.send((json.dumps(msg)+"\n").encode('utf-8'))
            # Sleep for a s

The producer will generate new records in the form of a random combination of:
- `name`
- `surname`
- `amount`: amount of the credit card transaction
- `delta_t`: time between transactions
- `flag`: random flag indicating whether the transaction is potentially fraudulent

This information is serialized as JSON, one record per line.
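The following sketch builds a single record with the same expressions used in the producer and round-trips it through JSON. The fixed name and surname are arbitrary sample values. It shows that `amount` and `delta_t` reach the receiver as strings, because the producer formats them with `'{:.2f}'.format(...)`, while `flag` stays an integer.

```python
import json
import random

# One record built the same way as in the producer
record = {
    "name": "Alice",
    "surname": "Smith",
    "amount": "{:.2f}".format(random.random() * 1000),
    "delta_t": "{:.2f}".format(random.random() * 10),
    "flag": random.choices([0, 1], weights=[0.8, 0.2])[0],
}

# What travels over the socket: one JSON document per line
line = json.dumps(record) + "\n"

# What the receiver recovers after parsing the line
parsed = json.loads(line)
print(type(parsed["amount"]).__name__, type(parsed["flag"]).__name__)  # str int
```

This is why the numeric fields need an explicit cast before they can be used in computations.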

### Declaring the `StreamingContext` data source as a TCP socket

To inform Spark that the StreamingContext data source will be a TCP socket located at a specific `hostname` and `port`, we can use the `socketTextStream(hostname, port)` method.

Refer to the [StreamingContext documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.html) for additional available options.


In [16]:
# the hostname and port number
hostname = "spark-master"
portnumber = 5555

# declare the Spark Streaming source as TCP socket 
socket_stream = ssc.socketTextStream(hostname, portnumber)

### Start the python producer.py script

From a terminal/WSL, connect to the `spark-master` Docker container using the command
```bash
docker exec -it spark-master bash
``` 

From inside the Docker container, move to the `/mapd-workspace` folder and execute the Python script with the option `--hostname spark-master`:

```bash
python notebooks/utils/producer.py --hostname spark-master
```

## Exploring the data stream

The first thing we need to do is load the data describing each transaction, formatted as `json`.

In [17]:
import json

# use the map() transformation to apply the same function to every element of each RDD
# the function we want to run is json.loads(), which parses a JSON string
# (json.load() would expect a file-like object instead of a string)
#json_stream = socket_stream.map(""" --- """)
json_stream = socket_stream.map(lambda msg: json.loads(msg))
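A single malformed line is enough to make the JSON parser raise an exception inside the mapped function. A defensive variant, sketched below in plain Python, returns `None` for unparsable input so that such lines can be filtered out. The helper name `parse_or_none` is hypothetical; with a DStream, the equivalent chain would presumably be `socket_stream.map(parse_or_none).filter(lambda m: m is not None)`.

```python
import json

def parse_or_none(line):
    """Parse one JSON line, returning None instead of raising on bad input."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None

lines = ['{"name": "Jill", "flag": 1}', "not json", '{"name": "Joe", "flag": 0}']

# Parse every line, then drop the ones that could not be decoded
parsed = [m for m in map(parse_or_none, lines) if m is not None]
print(parsed)  # [{'name': 'Jill', 'flag': 1}, {'name': 'Joe', 'flag': 0}]
```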

It is possible to print some elements of each batch with `pprint()`. This can be used to explore the RDDs.

In [19]:
json_stream.pprint()

**Start the computations with `ssc.start()` and stop with `ssc.stop(stopSparkContext=False)`.** 

_Remember that once the StreamingContext has been stopped, it cannot be restarted: a new one must be created if we want to resume the streaming computations._

In [14]:
ssc.start()

23/05/10 07:43:29 WARN StreamingContext: StreamingContext has already been started


[Stage 2:>                  (0 + 1) / 1][Stage 3:>                  (0 + 0) / 1]

In [15]:
ssc.stop(stopSparkContext=False)

23/05/10 07:45:52 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
23/05/10 07:45:53 ERROR TaskSchedulerImpl: Lost executor 0 on 172.18.0.3: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/10 07:45:53 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71) (172.18.0.3 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/10 07:45:53 WARN BlockManagerMasterEndpoint: No more replicas available for input-0-1683704701200 !
23/05/10 07:45:53 WARN BlockManagerMasterEndpoint: No more replicas available for input-0-1683704572400 !
23/05/10 07:45:53 WARN BlockManagerMasterEndpoint: No more replicas available for input-0-1683704714800 !
23/05/10 07:45:53 WARN BlockManagerMasterEndpoint: No more

[Stage 3:>                                                          (0 + 1) / 1]

23/05/10 07:45:55 WARN TaskSetManager: Lost task 0.1 in stage 3.0 (TID 72) (172.18.0.3 executor 1): java.lang.Exception: Could not compute split, block input-0-1683704523600 of RDD 4 not found
	at org.apache.spark.errors.SparkCoreErrors$.rddBlockNotFoundError(SparkCoreErrors.scala:51)
	at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.ex

## Working with Streaming data

Now that we know how to stream data into Spark, let's explore how we can perform basic distributed operations on the data.

However, before we can proceed, we need to make sure that we have properly restarted the `StreamingContext` object, as the connection between the socket and Spark will be lost when the context is stopped.

To restart the streaming context, we need to:
1. Create a new `StreamingContext` object (we can reuse the `ssc` variable name in our case).
2. Point it to the correct TCP socket and port where the data is being streamed from.
3. Restart the Python producer application.

Once the `StreamingContext` is properly set up and running, we can start applying distributed operations to the streaming data. 

In [None]:
# create a new Spark StreamingContext with a batch wall-time of 2 seconds
ssc = """ --- """

In [None]:
# define the socket stream using the appropriate endpoint and port
socket_stream = """ --- """

In [None]:
# start the python producer script
### from the terminal/WSL shell

We now start listening on the TCP socket, interpreting each line of the input data stream as a JSON document.

**Remember to get rid of the `pprint()` action, which would otherwise be performed continuously, dumping the input data into the Jupyter cells.**

In [None]:
# create a new json_stream object by reading the json loads from the socket
json_stream = socket_stream.map(""" --- """)

#### Converting Streaming Data to a DataFrame

To make use of Spark's higher-level APIs, we can convert each batch of streaming data into a DataFrame. 

To do so, we'll first need to convert the numeric features of the incoming JSON data into Python floats and integers. This is a simple type cast operation that can be easily parallelized.

After casting the data, we can create a `Row` object for each transaction using the resulting Python dictionary. These `Row` objects can then be used to create a DataFrame, allowing us to use Spark's higher-level APIs for data processing.

In [None]:
from pyspark.sql import Row

# create a row for each message 
#   convert each numerical value to the proper python type
#   create a row from each message
def create_row_rdd(t):
    t['amount'] = float(t['amount'])
    t['delta_t'] = float(t['delta_t'])
    t['flag'] = int(t['flag'])
    
    return Row(**t)
# Row(**t) unpacks the dictionary into keyword arguments,
# so that each key of the message becomes a field of the Row

# apply the transformation to the json_stream rdd
row_stream = json_stream.map(create_row_rdd)

The method `DStream.foreachRDD` can be used to apply custom transformations to each *batch* of data. 

In our case, we are interested in converting each batch of data into a Spark DataFrame and performing operations on it, such as counting the number of transactions for each user. 

In this specific use case, a user who performs more than one transaction with the `flag` field equal to one within the same batch is treated as suspicious. For simplicity, we will assume that this pattern represents fraudulent activity.

In reality, such a flag might be set on the fly using static rules or an ML-based model.

**NOTE**: If left unconstrained, Spark may create a very large number of partitions for this streaming application (the default value of `spark.sql.shuffle.partitions` is 200).

Using many more partitions than necessary adds scheduling and shuffle overhead, which can easily dominate the processing time of small batches.

We can force Spark to use a small yet reasonable number of partitions (given the problem and the resources we have), thus making it more efficient in the case of small workloads and few executors.

In [None]:
# this line is a trick to force Spark to use a small number of partitions (4 in this example)
spark.conf.set("spark.sql.shuffle.partitions", 4)

### Process each batch to identify possibly fraudulent transactions


1. convert the RDD into a DataFrame (provide the schema if necessary)
2. compute the _number of flagged transactions per batch per user_ (create a unique `userID` field as the combination of _FirstLastname_ to identify individual users)
3. identify all the "suspicious" transactions per user: all users with more than one flagged transaction per batch will be assigned an `isFraud` boolean variable
4. format the resulting `userID` and `isFraud` information in a DataFrame to mimic a "live report" of the suspicious transactions
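Before writing the DataFrame version, it may help to see the intended logic on a single batch in plain Python. The sketch below is not the Spark solution: it uses a `Counter` instead of DataFrame operations, and the `suspicious_users` helper and the sample batch are hypothetical.

```python
from collections import Counter

def suspicious_users(batch):
    """Return the userIDs with more than one flagged transaction in a batch."""
    flagged_per_user = Counter(
        t["name"] + t["surname"]  # userID built as FirstLastname
        for t in batch
        if t["flag"] == 1         # only flagged transactions are counted
    )
    return sorted(user for user, n in flagged_per_user.items() if n > 1)

batch = [
    {"name": "John", "surname": "Smith", "amount": 12.5, "delta_t": 1.0, "flag": 1},
    {"name": "John", "surname": "Smith", "amount": 80.0, "delta_t": 0.2, "flag": 1},
    {"name": "Alice", "surname": "Jones", "amount": 5.0, "delta_t": 3.1, "flag": 1},
    {"name": "Jill", "surname": "Darby", "amount": 9.9, "delta_t": 2.4, "flag": 0},
]

# The "live report": one entry per suspicious user
report = [{"userID": user, "isFraud": True} for user in suspicious_users(batch)]
print(report)  # [{'userID': 'JohnSmith', 'isFraud': True}]
```

Only `JohnSmith` is reported: `AliceJones` has a single flagged transaction, and `JillDarby` has none.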

In [None]:
from pyspark.sql.functions import concat, col, lit, countDistinct

def process_batch(rdd):
    # convert rdd to df
    #   check the documentation and/or the Lecture2 notebook 
    #   for details on how to create and pass a schema to a dataframe   
    df = """ --- """
    #df = rdd.toDF(
            #schema = ...)
    
    # find number of transactions for each user when flag = 1 
    #    declare a new column to create a unique user identifier 
    #    this can be easily done by concatenating first- and last-name fields
    #    check the concat function from pyspark.sql.functions 
    num_transactions = """ --- """
    #num_transactions = df.where(col('flag') == 1)\
    #                     .

    
    # find suspicious transactions
    #    filter only users with more than one transaction per batch
    #    create a "fraud" column with a value of 1 for the selected users (check the lit function)
    #    from the dataframe, project only the unique id and fraud columns
    sus_transactions = """ --- """
    
    # (trigger an automatic alert)
    # print the first 5 items of the resulting dataframe
    sus_transactions.show(5)

Finally, instruct Spark to execute the `process_batch` function **for each RDD** in the DStream.

In [None]:
row_stream.foreachRDD(process_batch)
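A batch interval may contain no records, for instance while the producer is not yet running, and converting an empty RDD to a DataFrame without an explicit schema is likely to fail because there is nothing to infer the schema from. One possible guard, sketched below, wraps the per-batch function so that empty batches are skipped. The `skip_empty` helper is hypothetical, while `RDD.isEmpty()` is part of the PySpark RDD API.

```python
def skip_empty(batch_function):
    """Wrap a per-batch function so that it is not called on empty RDDs."""
    def wrapper(rdd):
        # RDD.isEmpty() is True when the micro-batch received no records
        if rdd.isEmpty():
            return None
        return batch_function(rdd)
    return wrapper

# Usage (assuming `row_stream` and `process_batch` are defined as in this notebook):
# row_stream.foreachRDD(skip_empty(process_batch))
```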

Now you should be ready to start the Spark streaming context.

In [None]:
ssc.start()

In [None]:
# stop streaming context
ssc.stop(stopSparkContext=False)

## Stop worker and master

In [10]:
sc.stop()
spark.stop()

Finally, use `docker compose down` to stop and clear all running containers.