# Lecture 3: Spark Streaming

Spark streaming is an extension of the Spark API that enables scalabe stream processing. The stream of data can be ingested from many data sources such as *Kafka*, *s3* or *TCP socket*. The Spark API allows to process data via high-level functions such as *map* and *reduce*. As we are going to se, it is also possible to use dataframe operations. 

Processed data can be exported to an external database and used to make live dashboards or offline analysis. 

Spark streaming works by dividing the input data into micro-batches that can be treated as static dataset. In Spark this is called *discretized stream* (*DStream*). The DStream is represented using RDDs.

![DStream](imgs/lecture3/DStream.png)

A transformation of the DStream, i.e. `Dstream.map` will act independently on each batch. For example, in the image below, we can filter the original RDD to remove some data and produce a new stream. 

![DStream_filter](imgs/lecture3/DStream_filter.png)

In this lecture we will see how to setup a simple stream using as source a socket.

## Setup

In [1]:
# set this variable with one of the following values
# -> 'local'
# -> 'docker_container'
# -> 'docker_cluster'
CLUSTER_TYPE ='docker_container'

In [2]:
# set env variable
%env CLUSTER_TYPE $CLUSTER_TYPE

env: CLUSTER_TYPE=docker_container


## Start the cluster 

Environment variables need to be set only in the case of a local cluster

In [3]:
if CLUSTER_TYPE=='local':
    import findspark
    findspark.init('/Users/matteo/Work/MAPD/spark-3.1.1-bin-hadoop3.2/')

In [4]:
%%script bash --no-raise-error

if [[ "$CLUSTER_TYPE" != "docker_cluster" ]]; then
    echo "Launching master and worker"
    
    # start master 
    $SPARK_HOME/sbin/start-master.sh --host localhost \
        --port 7077 --webui-port 8080
    
    # start worker
    $SPARK_HOME/sbin/start-worker.sh spark://localhost:7077 \
        --cores 4 --memory 2g
fi

Launching master and worker
org.apache.spark.deploy.master.Master running as process 1073.  Stop it first.
org.apache.spark.deploy.worker.Worker running as process 1133.  Stop it first.


## Create the Spark session

Later on we will explain what is the role of [Apache Arrow](), but first we need to install it and create the spark session with it.

In [5]:
from pyspark.sql import SparkSession

if CLUSTER_TYPE in ['local', 'docker_container']:
    
    spark = SparkSession.builder \
        .master("spark://localhost:7077")\
        .appName("Spark streaming application")\
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
        .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")\
        .getOrCreate()

elif CLUSTER_TYPE == 'docker_cluster':
    
    # use the provided master
    spark = SparkSession.builder \
        .master("spark://spark-master:7077")\
        .appName("Spark streaming application")\
        .config("spark.executor.memory", "512m")\
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
        .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")\
        .getOrCreate()

In [6]:
sc = spark.sparkContext
sc

## Streaming context

The first step of a spark streaming application is the creation of a StreamingContext. The context can be initialized using `StreamingContext(SparkContext, batch_interval`). There could be at most one StreamingContext for each spark application. The processing start when `streamingContext.start()` is called and it can be stopped with `streamingContext.stop()`. If it is stoped without passing `stop(stopSparkContext=False)` the sparkContext is also stopped.

In [30]:
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 2)

For this example spark will read data from a TCP socket. Data are generated by a simple python program that can be found in `utils/producer.py`. The producer will write data on port `5555` of `localhost`. 

The dataset consists of fake credit card transactions.

In [31]:
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)

The first thing we need to to is load the json describing each transaction.

In [32]:
import json

json_stream = socket_stream.map(lambda msg: json.loads(msg))

It is possible to print some elements of each batch with `pprint()`. This can be used to explore the RDDs.

In [10]:
json_stream.pprint()

Start the computations with `ssc.start()` and stop with `ssc.stop(stopSparkContext=False)`. Remember that after this last instruction the streaming context must be defined again.

In [11]:
ssc.start()

-------------------------------------------
Time: 2021-05-04 13:35:34
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:36
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:38
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:40
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:42
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:44
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:46
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:48
-------------------------------------------

-------------------------------------------
Time: 2021-05-04 13:35:50
----------

-------------------------------------------
Time: 2021-05-04 13:36:14
-------------------------------------------
{'name': 'John', 'surname': 'Millers', 'amount': '998.55', 'delta_t': '2.28', 'flag': 0}
{'name': 'John', 'surname': 'Smith', 'amount': '592.64', 'delta_t': '3.71', 'flag': 1}
{'name': 'Joe', 'surname': 'Smith', 'amount': '486.15', 'delta_t': '4.24', 'flag': 0}
{'name': 'Andy', 'surname': 'Smith', 'amount': '972.44', 'delta_t': '4.30', 'flag': 0}
{'name': 'Joe', 'surname': 'Jones', 'amount': '411.29', 'delta_t': '7.20', 'flag': 0}
{'name': 'John', 'surname': 'Jones', 'amount': '665.96', 'delta_t': '2.71', 'flag': 0}
{'name': 'Joe', 'surname': 'Jones', 'amount': '778.86', 'delta_t': '8.03', 'flag': 0}
{'name': 'Joe', 'surname': 'Millers', 'amount': '448.57', 'delta_t': '6.16', 'flag': 0}
{'name': 'John', 'surname': 'Millers', 'amount': '354.80', 'delta_t': '1.94', 'flag': 0}
{'name': 'Andy', 'surname': 'Millers', 'amount': '275.73', 'delta_t': '9.59', 'flag': 1}
...

-------

-------------------------------------------
Time: 2021-05-04 13:36:32
-------------------------------------------
{'name': 'Alice', 'surname': 'Jones', 'amount': '850.30', 'delta_t': '5.05', 'flag': 0}
{'name': 'Alice', 'surname': 'Johnson', 'amount': '841.10', 'delta_t': '3.59', 'flag': 0}
{'name': 'Alice', 'surname': 'Millers', 'amount': '349.29', 'delta_t': '2.97', 'flag': 0}
{'name': 'Joe', 'surname': 'Johnson', 'amount': '88.90', 'delta_t': '1.40', 'flag': 0}
{'name': 'John', 'surname': 'Jones', 'amount': '172.81', 'delta_t': '9.59', 'flag': 0}
{'name': 'Joe', 'surname': 'Johnson', 'amount': '403.52', 'delta_t': '0.57', 'flag': 0}
{'name': 'Andy', 'surname': 'Johnson', 'amount': '499.65', 'delta_t': '5.45', 'flag': 1}
{'name': 'Alice', 'surname': 'Jones', 'amount': '633.61', 'delta_t': '3.77', 'flag': 0}
{'name': 'Alice', 'surname': 'Millers', 'amount': '452.61', 'delta_t': '4.43', 'flag': 0}
{'name': 'Joe', 'surname': 'Millers', 'amount': '76.75', 'delta_t': '0.25', 'flag': 1}
..

In [13]:
ssc.stop(stopSparkContext=False)

Before running the following cells, create again the streaming context and run the cells where the input data source is defined. Remember to skip `pprint` otherwise this operation will be appended to the DAG. 

We may want to convert each batch into a Spark DataFrame to have higher level API. To do that, let's first convert numeric features of the json into python floats and integers. The dictionary can then be used to create a `Row`for each transaction.

In [33]:
from pyspark.sql import Row

def create_row_rdd(t):
    t['amount'] = float(t['amount'])
    t['delta_t'] = float(t['delta_t'])
    t['flag'] = int(t['flag'])
    
    return Row(**t)

row_stream = json_stream.map(create_row_rdd)

The method `DStream.foreachRDD` can be used to apply custom transformations to each batch of data. In our case we are insterested in converting each batch of data into a dataframe and perform operations like finding the number of transactions for each user. Transacion where the user performed more than one transaction, with the flag equal to one,  inside the microbatch and can be flagged as fraudulent.

In [34]:
spark.conf.set("spark.sql.shuffle.partitions", 3)

In [35]:
from pyspark.sql.functions import concat, col, lit, countDistinct

def process_batch(rdd):
    # convert to df
    df = rdd.toDF(
        schema='name string, surname string, amount float, delta_t float, flag int'
    )
    
    # find number of transactions for each user when flag = 1 
    num_transactions = (
        df
        .where(col('flag')==1)
        .withColumn('id', concat(col('name'), col('surname')))
        .groupBy('id')
        .count()
    )
    
    # find suspicious transactions
    sus_transactions = (
        num_transactions
        .where(col('count')>1)
        .whitColumnt('fraud', lit(1))
        .select(col('id'), col('fraud'))
    )
    
    # send allarm
    sus_transaction.show(5)

row_stream.foreachRDD(process_batch)

In [36]:
ssc.start()

+---+-----+
| id|count|
+---+-----+
+---+-----+

+----------+-----+
|        id|count|
+----------+-----+
|AliceJones|    1|
+----------+-----+

+---+-----+
| id|count|
+---+-----+
+---+-----+

+-----------+-----+
|         id|count|
+-----------+-----+
|   JoeJones|    1|
|AndyJohnson|    1|
|  JohnSmith|    2|
| JoeJohnson|    1|
+-----------+-----+

+-----------+-----+
|         id|count|
+-----------+-----+
| JoeMillers|    1|
|JohnMillers|    1|
+-----------+-----+

+-----------+-----+
|         id|count|
+-----------+-----+
|JohnMillers|    1|
+-----------+-----+

+-----------+-----+
|         id|count|
+-----------+-----+
| AliceSmith|    2|
|   JoeSmith|    2|
|JohnMillers|    1|
+-----------+-----+

+------------+-----+
|          id|count|
+------------+-----+
|AliceJohnson|    1|
|    JoeSmith|    2|
|  JoeMillers|    1|
| AndyMillers|    1|
+------------+-----+

+----------+-----+
|        id|count|
+----------+-----+
|JoeMillers|    1|
|  JoeSmith|    2|
|JoeJohnson|    1|

+------------+-----+
|          id|count|
+------------+-----+
| JohnJohnson|    1|
|  AliceSmith|    1|
|AliceMillers|    1|
| JohnMillers|    1|
+------------+-----+

+----------+-----+
|        id|count|
+----------+-----+
| JohnSmith|    1|
|  JoeSmith|    1|
|AliceJones|    1|
+----------+-----+

+------------+-----+
|          id|count|
+------------+-----+
| JohnJohnson|    1|
|AliceJohnson|    1|
|   AndyJones|    1|
|    JoeSmith|    1|
+------------+-----+

+------------+-----+
|          id|count|
+------------+-----+
|  AliceSmith|    1|
|AliceJohnson|    1|
| JohnJohnson|    1|
|  AliceJones|    1|
+------------+-----+

+------------+-----+
|          id|count|
+------------+-----+
|AliceJohnson|    1|
+------------+-----+

+------------+-----+
|          id|count|
+------------+-----+
| JohnJohnson|    1|
|AliceMillers|    1|
|  AliceJones|    1|
+------------+-----+



In [37]:
# stop streaming context

ssc.stop(stopSparkContext=False)

+------------+-----+
|          id|count|
+------------+-----+
|AliceJohnson|    2|
| JohnJohnson|    1|
|  AliceSmith|    1|
|  JoeMillers|    1|
|   AndySmith|    1|
+------------+-----+
only showing top 5 rows

