# Evil Net - Streaming Example



The intention of this notebook is to showcase how to:

1. Instantiate Spark
    1. Connect Spark to Kafka
1. Create a Streaming Context
1. Parse Data Fetched from Kafka
1. Produce a Sample Count for Each Key

## In Addition, the Following Function Will Be Used
### updateStateByKey
The `updateStateByKey` operation allows you to maintain arbitrary state while continuously updating it with new information. Using it takes two steps:

1. Define the state - The state can be an arbitrary data type.
2. Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.

In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns None then the key-value pair will be eliminated.

Let’s illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state and it is an integer. We define the update function as:
```python
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count
```
This is applied on a DStream containing words (say, the `pairs` DStream containing `(word, 1)` pairs in the earlier [example](https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example)).
```python
runningCounts = pairs.updateStateByKey(updateFunction)
```
The update function will be called for each word, with `newValues` having a sequence of 1’s (from the (word, 1) pairs) and the `runningCount` having the previous count. For the complete Python code, take a look at the example [stateful_network_wordcount.py](https://github.com/apache/spark/blob/v2.2.0/examples/src/main/python/streaming/stateful_network_wordcount.py).
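To make these semantics concrete without a cluster, here is a minimal plain-Python sketch (not Spark code) of what a single batch of `updateStateByKey` does, assuming the state is held in a dict. `simulate_update_state_by_key` is a hypothetical name; `updateFunction` is redefined from the example above so the block runs on its own. The update function is called for every known key, including keys with no new values in the batch, and a key is dropped when the function returns `None`.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

UpdateFn = Callable[[List[int], Optional[int]], Optional[int]]


def simulate_update_state_by_key(
    state: Dict[str, int],
    batch: Iterable[Tuple[str, int]],
    update: UpdateFn,
) -> Dict[str, int]:
    """Fold one batch of (key, value) pairs into the state, updateStateByKey-style."""
    # Group this batch's values by key.
    new_values: Dict[str, List[int]] = {}
    for key, value in batch:
        new_values.setdefault(key, []).append(value)

    next_state: Dict[str, int] = {}
    # Call the update function for every existing key and every new key,
    # even if a key received no values in this batch.
    for key in set(state) | set(new_values):
        result = update(new_values.get(key, []), state.get(key))
        if result is not None:  # returning None removes the key from the state
            next_state[key] = result
    return next_state


def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


state: Dict[str, int] = {}
for batch in ([("a", 1), ("b", 1), ("a", 1)], [("b", 1)], []):
    state = simulate_update_state_by_key(state, batch, updateFunction)
    print(sorted(state.items()))
```

This prints `[('a', 2), ('b', 1)]`, then `[('a', 2), ('b', 2)]` twice: key `"a"` keeps its count of 2 even in batches that bring no new `"a"` values.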

Note that using `updateStateByKey` requires the checkpoint directory to be configured, which is discussed in detail in the [checkpointing](https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing) section.


# Library Versions Used

```python
from IPython import __version__ as ipython_version
from pyspark import __version__ as pyspark_version

VERSION = " Versioning "
IPYTHON = " IPython: %s " % ipython_version
PYSPARK = " PySpark: %s " % pyspark_version

print(VERSION.center(40, '#'))
print(IPYTHON.center(40, "-"))
print(PYSPARK.center(40, "-"))
print("".center(40, '#'))
```

```
############## Versioning ##############
----------- IPython: 7.23.1 ------------
------------ PySpark: 2.4.7 ------------
########################################
```


# Import Python Modules

```python
from pyspark import SparkConf, SparkContext
from json import loads as js_loads
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
```

# Create a Spark Conf

## Adding the Kafka Package
## Set the Spark Master

```python
# The Kafka connector version should match the Spark version in use
# (PySpark 2.4.7, Scala 2.11 above).
spark_conf = (SparkConf()
              .set("spark.jars.packages",
                   "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7")
              .setMaster("spark://spark-master:7077")
              .setAppName("JupyterSpark"))
spark_context = SparkContext.getOrCreate(spark_conf)
spark_context
```
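The value of `spark.jars.packages` is a Maven coordinate of the form `groupId:artifactId:version`; for this connector the artifact name carries the Scala version (`_2.11`), and the version is normally expected to match the Spark release. A small helper can keep the two in sync. `kafka_08_package` below is a hypothetical name and is not part of PySpark.

```python
def kafka_08_package(spark_version: str, scala_version: str = "2.11") -> str:
    """Hypothetical helper: Maven coordinate of the Kafka 0.8 streaming connector."""
    return ("org.apache.spark:spark-streaming-kafka-0-8_"
            f"{scala_version}:{spark_version}")


print(kafka_08_package("2.4.7"))
# org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7
```

In the notebook it could be fed the `pyspark_version` imported in the first cell, e.g. `SparkConf().set("spark.jars.packages", kafka_08_package(pyspark_version))`.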

### Create Streaming Context

```python
# Micro-batch interval of 1 second
ssc = StreamingContext(spark_context, 1)
brokers, topics = 'by-kafka-broker:9092', ['stream-tweets']
kvs = KafkaUtils.createDirectStream(
    ssc, topics, {
        'bootstrap.servers': brokers,
        'group.id': 'video-group',
        'auto.offset.reset': 'smallest'
    })
# updateStateByKey requires a checkpoint directory
ssc.checkpoint("./checkpoint-tweet")
```

## Supporting Functions

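```python
def shrink(x):
    """Return the tweet language (data.lang), or "unknown" if it is missing."""
    return x["data"].get("lang", "unknown")


def state_full_sum(new_values, global_sum):
    """State update function: add this batch's values to the running total."""
    return sum(new_values) + (global_sum or 0)
```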
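A quick local check of both helpers, outside Spark, can make their behaviour explicit. The sample message values below are hypothetical; they only assume the `data` -> `lang` layout that `shrink` reads. `state_full_sum` treats a missing state (`None`) as 0, which is what happens the first time a key appears.

```python
from json import loads as js_loads


def shrink(x):
    """Return the tweet language (data.lang), or "unknown" if it is missing."""
    return x["data"].get("lang", "unknown")


def state_full_sum(new_values, global_sum):
    """Add this batch's values to the running total (None means no state yet)."""
    return sum(new_values) + (global_sum or 0)


# Hypothetical message values; only the "data" -> "lang" layout is assumed.
sample_values = ['{"data": {"lang": "en"}}', '{"data": {}}']
print([shrink(js_loads(v)) for v in sample_values])  # ['en', 'unknown']

print(state_full_sum([1, 1], None))  # 2: first batch for a key
print(state_full_sum([1], 2))        # 3: adds to the previous total
```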

### Kafka Data
Kafka records are key-value pairs; Evil Net only uses the values.

```python
lines = kvs.map(lambda x: x[1])
```
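With the 0.8 direct stream, PySpark delivers each record as a `(key, value)` tuple, decoded as UTF-8 strings by default, so `x[1]` is the message value and the key is often `None`. The local sketch below replays the same per-record steps on a plain list, with no Spark involved, to show what one micro-batch contributes per language; the records are hypothetical and only assume the `data.lang` layout that `shrink` reads.

```python
from collections import Counter
from json import loads as js_loads


def shrink(x):
    return x["data"].get("lang", "unknown")


# Hypothetical (key, value) records for a single micro-batch; keys may be None.
batch = [
    (None, '{"data": {"lang": "en"}}'),
    (None, '{"data": {"lang": "en"}}'),
    ("k1", '{"data": {"lang": "ja"}}'),
]

values = [x[1] for x in batch]                      # kvs.map(lambda x: x[1])
pairs = [(shrink(js_loads(v)), 1) for v in values]  # .map(js_loads).map(shrink).map(...)
batch_counts = Counter(lang for lang, _ in pairs)   # per-language totals for this batch
print(pairs)
print(batch_counts)
```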

## Transform Data over 20-Second Windows Sliding Every 4 Seconds

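```python
langs_count = (lines
               .window(windowDuration=20, slideDuration=4)  # last 20 s, every 4 s
               .map(js_loads)                      # JSON string -> dict
               .map(shrink)                        # dict -> language code
               .map(lambda word: (word, 1))        # language -> (language, 1)
               .updateStateByKey(state_full_sum))  # running total per language
langs_count.pprint()
```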
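Because `updateStateByKey` is applied to the windowed stream, every 4-second slide feeds the whole last 20 seconds into the running total, so a record that stays inside the window is likely added once per slide, up to 20 / 4 = 5 times. The plain-Python sketch below (not Spark code; `windowed_running_count` is a hypothetical name) imitates that combination for a single `"en"` record, assuming one-second batches and a window that is emitted every `SLIDE` seconds and covers the last `WINDOW` seconds.

```python
from collections import Counter
from typing import Dict, List

BATCH = 1    # seconds, as in StreamingContext(spark_context, 1)
WINDOW = 20  # windowDuration
SLIDE = 4    # slideDuration


def windowed_running_count(batches: List[List[str]]) -> Dict[str, int]:
    """Local imitation of .window(WINDOW, SLIDE) followed by a running sum."""
    state: Counter = Counter()
    per_window = WINDOW // BATCH
    # A window is emitted every SLIDE seconds and covers the last WINDOW seconds.
    for end in range(SLIDE, len(batches) + 1, SLIDE):
        for batch in batches[max(0, end - per_window):end]:
            state.update(batch)  # what the state update adds for this slide
    return dict(state)


# One "en" tweet arrives in second 0, followed by 39 empty one-second batches.
batches = [["en"]] + [[] for _ in range(39)]
print(windowed_running_count(batches))  # {'en': 5}
```

If the intent is a count per window, `countByValueAndWindow(20, 4)` on the language stream, or `reduceByKeyAndWindow(lambda a, b: a + b, None, 20, 4)` on the `(lang, 1)` pairs, avoids the repeated additions; if the intent is an all-time total, the `.window(...)` step can be dropped.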

# Start Streaming

```python
ssc.start()
```

```
-------------------------------------------
Time: 2021-05-14 19:54:26
-------------------------------------------
('en', 72)
('und', 4)
('tl', 1)
('ar', 1)
('ja', 1)
('es', 1)

-------------------------------------------
Time: 2021-05-14 19:54:30
-------------------------------------------
('en', 145)
('und', 8)
('tl', 2)
('ar', 2)
('ja', 3)
('es', 2)
```



# Stop Context

```python
ssc.stop(stopSparkContext=True, stopGraceFully=True)
```