# Spark Streaming

[Documentation](https://spark.apache.org/docs/latest/streaming-programming-guide.html)

## Window Operation Types

### window
Returns a new DStream computed from windowed batches of the source DStream.
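To make the semantics concrete, here is a plain-Python model of what `window(windowDuration, slideDuration)` produces, assuming both durations are expressed as a number of batches. The function name `windowed` is hypothetical and this is an illustration of the semantics, not Spark's implementation. Early windows are partial because fewer batches have arrived yet.

```python
def windowed(batches, window_len, slide_len):
    """Model of DStream.window(): return the elements of each sliding window.

    batches:    list of lists, one inner list per batch interval
    window_len: window length, in batches
    slide_len:  slide interval, in batches
    """
    windows = []
    # A window is emitted after every `slide_len` batches
    for end in range(slide_len, len(batches) + 1, slide_len):
        # The window covers at most the last `window_len` batches
        start = max(0, end - window_len)
        windows.append([x for batch in batches[start:end] for x in batch])
    return windows


# Four one-second batches, 3-second window, 2-second slide
print(windowed([["a"], ["b"], ["c"], ["d"]], window_len=3, slide_len=2))
# → [['a', 'b'], ['b', 'c', 'd']]
```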

### countByWindow
Returns a sliding-window count of the elements in the stream.
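A plain-Python model of `countByWindow(windowDuration, slideDuration)`, under the same assumption that durations are counted in batches (the function name is hypothetical). In Spark this operator is built on `reduceByWindow` with an inverse function, so checkpointing must be enabled to use it.

```python
def count_by_window(batches, window_len, slide_len):
    """Model of DStream.countByWindow(): number of elements in each sliding window."""
    counts = []
    for end in range(slide_len, len(batches) + 1, slide_len):
        # Same window bounds as DStream.window(), measured in batches
        start = max(0, end - window_len)
        counts.append(sum(len(batch) for batch in batches[start:end]))
    return counts


print(count_by_window([[1, 2], [3], [], [4, 5, 6]], window_len=2, slide_len=1))
# → [2, 3, 1, 3]
```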

### reduceByKeyAndWindow
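Applied to pair DStreams of (K, V) elements. The values for each key are aggregated with a reduce function over a sliding window. Supplying an inverse reduce function improves performance: instead of re-reducing the whole window on every slide, Spark adds the batches that enter the window and "subtracts" the ones that leave it. This variant requires checkpointing to be enabled.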
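A plain-Python model of the incremental computation, assuming durations are measured in batches and values can be "un-reduced" (here with `operator.sub`). The function name is hypothetical and this is not Spark's implementation. Note how key `a` lingers with a count of 0 after its batches leave the window: Spark behaves the same way when an inverse function is used, and PySpark's `reduceByKeyAndWindow` accepts an optional `filterFunc` (for example `lambda kv: kv[1] > 0`) to drop such pairs.

```python
import operator


def reduce_by_key_and_window(batches, func, inv_func, window_len, slide_len):
    """Model of reduceByKeyAndWindow(func, invFunc, ...) with incremental updates.

    batches: list of lists of (key, value) pairs, one inner list per batch
    Returns one dict per emitted window (a snapshot of the running state).
    """

    def reduce_batches(bs):
        # Per-key reduction of a group of batches with `func`
        acc = {}
        for batch in bs:
            for k, v in batch:
                acc[k] = func(acc[k], v) if k in acc else v
        return acc

    state, results, prev_end = {}, [], 0
    for end in range(slide_len, len(batches) + 1, slide_len):
        start = max(0, end - window_len)
        prev_start = max(0, prev_end - window_len)
        # Batches that left the window since the previous output: apply inv_func
        for k, v in reduce_batches(batches[prev_start:min(start, prev_end)]).items():
            state[k] = inv_func(state[k], v)
        # Batches that entered the window since the previous output: apply func
        for k, v in reduce_batches(batches[max(prev_end, start):end]).items():
            state[k] = func(state[k], v) if k in state else v
        results.append(dict(state))
        prev_end = end
    return results


batches = [[("a", 1), ("b", 1)], [("a", 1)], [("b", 1)], [("c", 1)]]
for snapshot in reduce_by_key_and_window(batches, operator.add, operator.sub, 2, 1):
    print(snapshot)
# {'a': 1, 'b': 1}
# {'a': 2, 'b': 1}
# {'a': 1, 'b': 1}
# {'a': 0, 'b': 1, 'c': 1}
```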

### countByValueAndWindow
Returns a DStream of (K, Long) pairs where each key is a distinct element of the stream and its value is the element's frequency within the sliding window.
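A plain-Python model of `countByValueAndWindow(windowDuration, slideDuration)`, again with durations in batches and a hypothetical function name. Because Spark builds this operator on `reduceByKeyAndWindow` with an inverse function, it also requires checkpointing.

```python
from collections import Counter


def count_by_value_and_window(batches, window_len, slide_len):
    """Model of countByValueAndWindow(): frequency of each distinct element per window."""
    results = []
    for end in range(slide_len, len(batches) + 1, slide_len):
        start = max(0, end - window_len)
        # Each distinct element becomes a key; its value is the occurrence count
        counts = Counter(x for batch in batches[start:end] for x in batch)
        results.append(dict(counts))
    return results


print(count_by_value_and_window([["a", "b", "a"], ["b"], ["c"]], window_len=2, slide_len=1))
# → [{'a': 2, 'b': 1}, {'a': 2, 'b': 2}, {'b': 1, 'c': 1}]
```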

## Quiz:

- Which operation returns a new single-element stream, created by aggregating elements in the stream over a sliding interval using func?
    - reduceByWindow


- What is Spark Streaming?
    - It is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.


- What stateful operations exist?
    - updateStateByKey
    - mapWithState


- You cannot develop custom streaming sources for your own needs in Spark Streaming.
    - False


- SparkContext is the entry point for all Spark Streaming functionality.
    - False: the entry point is `StreamingContext`, which is created from a `SparkContext`.
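The stateful operation `updateStateByKey` keeps per-key state across all batches instead of over a window. The sketch below models its contract in plain Python (hypothetical function names, not Spark's implementation): the update function receives the key's new values from the current batch plus its previous state (`None` for a new key) and returns the new state, or `None` to drop the key. In Spark this operator also requires a checkpoint directory; `mapWithState` is the alternative offered in the Scala and Java APIs.

```python
def update_state_by_key(batches, update_func):
    """Model of updateStateByKey(): carry per-key state across batches.

    update_func(new_values, last_state) is called for every key that has new
    values or existing state; returning None removes the key.
    """
    state, snapshots = {}, []
    for batch in batches:
        # Group this batch's values by key
        grouped = {}
        for k, v in batch:
            grouped.setdefault(k, []).append(v)
        # Visit existing keys first, then keys seen for the first time
        keys = list(state) + [key for key in grouped if key not in state]
        new_state = {}
        for key in keys:
            s = update_func(grouped.get(key, []), state.get(key))
            if s is not None:
                new_state[key] = s
        state = new_state
        snapshots.append(dict(state))
    return snapshots


def running_count(new_values, running):
    # Same shape as the updateFunction example in the Spark Streaming guide
    return sum(new_values) + (running or 0)


print(update_state_by_key([[("spark", 1), ("stream", 1)], [("spark", 1)], []], running_count))
# → [{'spark': 1, 'stream': 1}, {'spark': 2, 'stream': 1}, {'spark': 2, 'stream': 1}]
```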

## Spark Data Streaming

### Quiz:

- What is true about streaming data?
    - Streaming data is generated continuously by data sources
    - It comes from a wide variety of sources
    - Data is processed sequentially


- How is streaming implemented in Spark Streaming?
    - Micro-batching


- What is DStream?
    - It is the basic abstraction (Discretized Stream), which represents a continuous stream of data divided into small batches.


- What data type is a DStream based on?
    - RDD: a DStream is represented as a continuous sequence of RDDs, one per batch.

```python
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Machine-specific paths: adjust to your Spark installation and Python interpreter.
# They need to be set before the SparkContext is created (that is when the JVM starts).
os.environ["SPARK_HOME"] = "/mnt/d/Ubuntu/Programs/spark-3.1.1-bin-hadoop3.2"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"
```

```python
# master = "local"
# spark = SparkSession.builder.master(master).appName("NetworkWordCount").getOrCreate()
```

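```python
sc = SparkContext("local[2]", "NetworkWordCount")  # 2 threads: receiver + processing
ssc = StreamingContext(sc, 1)  # batch interval: 1 second
ssc.checkpoint('./checkpoints')  # required by the inverse-function window below
```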

```python
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
```
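`socketTextStream` connects to a TCP server that sends newline-terminated text; the official guide starts one with Netcat (`nc -lk 9999`) in a separate terminal. Where Netcat is not available, a small Python server can play the same role. The helper below is hypothetical and only a sketch: it serves a fixed list of lines to the first client that connects, pausing `delay` seconds between lines so they land in different batches, and then closes.

```python
import socket
import time


def serve_lines(lines, host="localhost", port=9999, delay=1.0, ready=None):
    """Hypothetical stand-in for `nc -lk 9999`: send `lines` to one client.

    Each line is terminated with a newline, which is how socketTextStream
    splits records. `ready`, if given, is called with the bound port once the
    server is listening (useful with port=0, which picks a free port).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(1)
        if ready is not None:
            ready(srv.getsockname()[1])
        conn, _ = srv.accept()  # blocks until the Spark receiver connects
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
                time.sleep(delay)  # spread lines across several batches
```

For example, it could be started in a background thread before `ssc.start()`: `threading.Thread(target=serve_lines, args=(["hello spark"] * 30,), daemon=True).start()`.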

```python
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
```

```python
# # Count each word in each batch
# pairs = words.map(lambda word: (word, 1))
# wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# # Print the first ten elements of each RDD generated in this DStream to the console
# wordCounts.pprint()
```
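The commented-out cell computes word counts per batch. The same flatMap, map, reduceByKey chain can be modelled on a single micro-batch in plain Python (hypothetical `word_count_batch` helper, not Spark code). The second call shows a side effect of `split(" ")`: consecutive spaces produce an empty-string "word", which `line.split()` with no argument avoids.

```python
from collections import defaultdict


def word_count_batch(lines):
    """Plain-Python model of flatMap -> map -> reduceByKey on one micro-batch."""
    # flatMap: split every line into words (same split(" ") as the notebook)
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey(lambda x, y: x + y): sum the counts per word
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


print(word_count_batch(["hello spark", "hello streaming"]))
# → {'hello': 2, 'spark': 1, 'streaming': 1}
print(word_count_batch(["a  b"]))
# → {'a': 1, '': 1, 'b': 1}
```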

```python
# Window Operations

# Map each word to a (word, 1) pair
pairs = words.map(lambda word: (word, 1))

# Count words over the last 30 seconds of data, every 10 seconds.
# The second function is the inverse reduce: it subtracts the counts of batches
# leaving the window, which is why checkpointing was enabled above.
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

# Print the first ten elements of each RDD generated in this DStream to the console
windowedWordCounts.pprint()
```

```python
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
```

```
-------------------------------------------
Time: 2021-05-12 18:55:03
-------------------------------------------

-------------------------------------------
Time: 2021-05-12 18:55:13
-------------------------------------------

-------------------------------------------
Time: 2021-05-12 18:55:23
-------------------------------------------

KeyboardInterrupt:
```