# Databricks Certified Associate Developer for Apache Spark 3.0

https://spark.apache.org/docs/latest/


Content: 
* Spark Architecture (30%)
* SQL - DF (40%) https://spark.apache.org/docs/latest/sql-programming-guide.html
* RDD low API (10%)
* Streaming (10%) https://spark.apache.org/docs/latest/streaming-programming-guide.html
* ML (5%) https://spark.apache.org/docs/latest/ml-guide.html
* graphX (5%) https://spark.apache.org/docs/latest/graphx-programming-guide.html

Courses from https://academy.databricks.com/instructor-led-training/apache-spark-programming
* Day 1: DataFrames
 * __1 Introduction__ Course overview, Databricks ecosystem, Spark overview, Case study, Knowledge check
 * __2 Databricks Platform__ Databricks concepts, Workspace UI, Notebooks, Lab
 * __3 Spark SQL__ Spark SQL module, Documentation, DataFrame concepts, Lab
 * __4 Reader & Writer__ Data Sources, DataFrameReader & Writer, Schemas, Performance, Lab
 * __5 DataFrame & Column__ Columns and expressions, Transformations, Actions, Rows, Lab
 
* Day 2: Transformations
 * __1 Aggregation__ Groupby, Grouped data methods, Aggregate functions, Math functions, Lab
 * __2 Datetimes__ Dates & Timestamps, Datetime patterns, Datetime functions, Lab
 * __3 Complex Types__ String functions, Collection functions
 * __4 Additional Functions__ Non-aggregate functions, NaFunctions, Lab
 * __5 User-Defined Functions__ User-defined functions, Vectorized UDFs, Performance, Lab
 
* Day 3: Spark Optimization
 * __1 Spark Architecture__ Spark Cluster, Spark Execution, Shuffling, Lab
 * __2 Shuffles & Caching__ Lineage, Shuffle files, Caching, Caching recommendations, Spark UI: Storage, Lab
 * __3 Query Optimization__ Catalyst Optimizer, Adaptive Query Execution, Best practices, Lab
 * __4 Spark UI__ Spark UI navigation, Spark UI: Jobs, Stages, SQL
 * __5 Partitioning__ Partitions vs cores, Default shuffle partitions, Repartition, Best practices, AQE, Lab
 
* Day 4: Structured Streaming
 * __1 Review__ DataFrames and Transformations, Lab
 * __2 Streaming Query__ Streaming concepts, Sources and Sinks, Streaming Query, Transformations, Lab
 * __3 Processing Streams__ Monitoring Streams, Lab
 * __4 Aggregating Streams__ Streaming aggregations, Windows, Watermarking, Lab
 * __5 Delta Lake__ Delta Lake concepts, Batch and streaming, Lab

Videos:

https://www.youtube.com/watch?v=7ooZ4S7Ay6Y

https://www.youtube.com/watch?v=tFRPeU5HemU

# 1) Spark Architecture (30%)

# 2) SQL Df (40%)

# 3) RDD low API (10%)

# 4) Streaming (10%)
Most of the examples are taken from \
_https://spark.apache.org/docs/latest/streaming-programming-guide.html_ \
I tried them and completed them so that they can be executed as they are.

### Principle

"Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams."

<img src="https://spark.apache.org/docs/latest/img/streaming-arch.png"
     alt="Spark Streaming architecture"
     style="height:200px;"/>
_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

First, we import StreamingContext, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second.

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local SparkContext with two worker threads; the StreamingContext
# (batch interval of 1 second) is built from it in the next cell
sc = SparkContext("local[2]", "NetworkWordCount")

In [2]:
ssc = StreamingContext(sc, 1)

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. 9999).

In [3]:
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space into words.

In [4]:
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the words DStream. Next, we want to count these words.

In [5]:
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

The words DStream is further mapped (one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. Finally, wordCounts.pprint() will print a few of the counts generated every second.
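
To see what the flatMap, map and reduceByKey chain computes for each batch, here is a minimal sketch in plain Python, without Spark. The function name `word_count_per_batch` and the sample lines are assumptions made for illustration; each inner list stands for the lines received during one batch interval.

```python
from collections import Counter


def word_count_per_batch(batches):
    """Count words separately in each batch (flatMap -> map -> reduceByKey)."""
    results = []
    for lines in batches:
        # flatMap: each line yields several words
        words = [word for line in lines for word in line.split(" ")]
        # map to (word, 1) then reduceByKey with +, expressed here as a Counter
        results.append(dict(Counter(words)))
    return results


# Hypothetical input: two one-second batches of text lines
batches = [["hello world", "hello spark"], ["spark streaming"]]
counts = word_count_per_batch(batches)
print(counts)
```

The counts are not carried over from one batch to the next: "spark" is counted once in each batch.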

Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call

In [6]:
ssc.start()             # Start the computation
#ssc.awaitTermination()  # Wait for the computation to terminate due to manual exit or error
import time
time.sleep(3)
ssc.stop(stopSparkContext=False)  # Stop listening

-------------------------------------------
Time: 2020-11-12 11:13:45
-------------------------------------------

-------------------------------------------
Time: 2020-11-12 11:13:46
-------------------------------------------

-------------------------------------------
Time: 2020-11-12 11:13:47
-------------------------------------------

-------------------------------------------
Time: 2020-11-12 11:13:48
-------------------------------------------



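In another terminal, start a data server with netcat before starting the stream. The port must match the one passed to socketTextStream (9999 here). As long as nothing is typed into netcat, each batch is empty, as in the output above.

nc -lk 9999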

"After a context is defined, you have to do the following.

* Define the input sources by creating input DStreams.
* Define the streaming computations by applying transformation and output operations to DStreams.
* Start receiving data and processing it using streamingContext.start().
* Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
* The processing can be manually stopped using streamingContext.stop().

Points to remember:

* Once a context has been started, no new streaming computations can be set up or added to it.
* Once a context has been stopped, it cannot be restarted.
* Only one StreamingContext can be active in a JVM at the same time.
* stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
* A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

### Discretized stream

A DStream (discretized stream) is represented as a continuous series of RDDs, one per batch interval. Any transformation applied to the DStream is applied to each of these RDDs.

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png"
     alt="Operations applied to each RDD of a DStream"
     style="height:200px;"/>
_image from https://spark.apache.org/docs/latest/streaming-programming-guide.html_
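
The idea of cutting a stream into one collection per batch interval can be sketched in plain Python. This is only an illustration under assumptions: the function `discretize`, the timestamps (non-negative seconds) and the values are hypothetical, and plain lists stand in for RDDs.

```python
from collections import defaultdict


def discretize(records, batch_interval):
    """Group (timestamp_seconds, value) records into consecutive batches."""
    batches = defaultdict(list)
    for timestamp, value in records:
        # Index of the batch interval this record falls into
        batch_index = int(timestamp // batch_interval)
        batches[batch_index].append(value)
    if not batches:
        return []
    # Intervals without any record become empty batches
    return [batches.get(i, []) for i in range(max(batches) + 1)]


# Hypothetical records arriving over four seconds
records = [(0.2, "a"), (0.9, "b"), (1.1, "c"), (3.5, "d")]
stream = discretize(records, batch_interval=1.0)

# A transformation on the stream is applied to every batch in turn
upper = [[value.upper() for value in batch] for batch in stream]
print(stream)
print(upper)
```

The third interval received nothing, so it yields an empty batch, which matches the empty outputs printed by pprint() when no data arrives.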

### Input DStreams and Receivers

"Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing.

Spark Streaming provides two categories of built-in streaming sources.

* Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, and socket connections.
* Advanced sources: Sources like Kafka, Kinesis, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section.

We are going to discuss some of the sources present in each category later in this section.

Note that, if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This will create multiple receivers which will simultaneously receive multiple data streams. But note that a Spark worker/executor is a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Therefore, it is important to remember that a Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data, as well as to run the receiver(s).

Points to remember

* When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
* Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

### Basic sources

A file stream picks up new files (for example text logs) as they arrive in a monitored directory.

In [7]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local SparkContext with two worker threads, unless one is already running
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:  # raised when a SparkContext already exists
    print("Context already setup")

Context already setup


In [8]:
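path = "C:\\Utilisateurs\\Millet\\AppData\\Roaming\\IBM Watson Studio\\logs\\"
# textFileStream is a method of a StreamingContext instance and returns a DStream
#ssc = StreamingContext(sc, 1)
#lines = ssc.textFileStream(path)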

All files must be in the same data format.

A file is considered part of a time period based on its modification time, not its creation time.
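
As a rough illustration of the modification-time rule, the sketch below assigns files to a batch by comparing their modification time with the batch boundaries. The function `files_in_batch`, the file names and the times are assumptions; Spark's real file selection applies further rules that are not reproduced here.

```python
def files_in_batch(file_mtimes, batch_start, batch_end):
    """Return the files whose modification time lies in [batch_start, batch_end)."""
    return sorted(
        path
        for path, mtime in file_mtimes.items()
        if batch_start <= mtime < batch_end
    )


# Hypothetical files: name -> modification time in seconds
file_mtimes = {"app-1.log": 10.2, "app-2.log": 10.9, "app-3.log": 11.4}

first_batch = files_in_batch(file_mtimes, 10, 11)
second_batch = files_in_batch(file_mtimes, 11, 12)
print(first_batch, second_batch)
```

Only the modification time is looked at, so a file created earlier but modified at 11.4 would still belong to the second batch.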

### Queue of RDDs for testing purposes

In [10]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local SparkContext with two worker threads, unless one is already running
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:  # raised when a SparkContext already exists
    print("Context already setup")

Context already setup


In [11]:
ssc = StreamingContext(sc, 1)

In [12]:
queueOfRDDs = [sc.range(0, 1000),
               sc.range(1000, 3000),
               sc.range(2000, 4000)]

In [13]:
rdd_received = ssc.queueStream(queueOfRDDs)

In [14]:
rdd_received.count().pprint()

In [15]:
ssc.start()
#ssc.awaitTermination()

import time

# Note: changes to the queue after the stream is created are not recognized,
# so the RDD appended below is never processed (the last batches print 0).
# https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html?highlight=queue
queueOfRDDs.append(sc.range(0, 100))

time.sleep(5)
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2020-11-03 23:27:24
-------------------------------------------
1000

-------------------------------------------
Time: 2020-11-03 23:27:25
-------------------------------------------
2000

-------------------------------------------
Time: 2020-11-03 23:27:26
-------------------------------------------
2000

-------------------------------------------
Time: 2020-11-03 23:27:27
-------------------------------------------
0

-------------------------------------------
Time: 2020-11-03 23:27:28
-------------------------------------------
0



### Receiver Reliability

There are two kinds of receivers:

* Reliable Receiver - A reliable receiver correctly sends acknowledgment to a reliable source when the data has been received and stored in Spark with replication (e.g. Kafka).
* Unreliable Receiver - An unreliable receiver does not send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want or need to go into the complexity of acknowledgment.

### Transformations on DStreams

"Transformations and their meaning:
* __map(func)__ 	    Return a new DStream by passing each element of the source DStream through a function func.
* __flatMap(func)__ 	Similar to map, but each input item can be mapped to 0 or more output items.
* __filter(func)__ 	Return a new DStream by selecting only the records of the source DStream on which func returns true.
* __repartition(numPartitions)__ 	Changes the level of parallelism in this DStream by creating more or fewer partitions.
* __union(otherStream)__ 	Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
* __count()__ 	Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
* __reduce(func)__ 	Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel.
* __countByValue()__ 	When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
* __reduceByKey(func, [numTasks])__ 	When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
* __join(otherStream, [numTasks])__ 	When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
* __cogroup(otherStream, [numTasks])__ 	When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
* __transform(func)__ 	Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
* __updateStateByKey(func)__ 	Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_
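
The difference between join and cogroup is easier to see on a small example. The sketch below reproduces their semantics for a single batch in plain Python, with lists of pairs standing in for the RDDs of two DStreams. The helper names `join_pairs` and `cogroup_pairs` and the sample data are assumptions for illustration.

```python
from collections import defaultdict


def join_pairs(left, right):
    """All (k, (v, w)) combinations for keys present on both sides."""
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]


def cogroup_pairs(left, right):
    """For every key on either side: (values from left, values from right)."""
    grouped = defaultdict(lambda: ([], []))
    for key, v in left:
        grouped[key][0].append(v)
    for key, w in right:
        grouped[key][1].append(w)
    return dict(grouped)


# Hypothetical content of one batch in each stream
left = [("a", 1), ("b", 2), ("a", 3)]
right = [("a", "x"), ("c", "y")]

joined = join_pairs(left, right)
cogrouped = cogroup_pairs(left, right)
print(joined)
print(cogrouped)
```

join keeps only the keys found on both sides, while cogroup keeps every key and returns an empty list for the side where it is missing.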

#### Classical transformations example
Toy example: compute the total number of letters in each document (one document per batch).

In [16]:
# Example of map/reduce/count on StreamingContext

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:  # raised when a SparkContext already exists
    print("Context already setup")
ssc = StreamingContext(sc, 1)

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

                     
# Split each line into words
words = rdd_received.flatMap(lambda line: line.split(" "))
# Map each word to its length
word_lengths = words.map(lambda word: len(word))
# Sum the word lengths of each batch
total_length = word_lengths.reduce(lambda a, b: a + b)
# Display result
total_length.pprint()

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-03 23:27:29
-------------------------------------------
21

-------------------------------------------
Time: 2020-11-03 23:27:30
-------------------------------------------
27

-------------------------------------------
Time: 2020-11-03 23:27:31
-------------------------------------------
44

-------------------------------------------
Time: 2020-11-03 23:27:32
-------------------------------------------

-------------------------------------------
Time: 2020-11-03 23:27:33
-------------------------------------------



#### updateStateByKey transformation


Example: count the occurrences of each word across the whole stream rather than per document. \
A single running result is kept and updated each time a new RDD arrives. \
After the last RDD, the same result is printed every second. \
Checkpointing is required for this operation.

In [17]:
# Example of updateStateByKey use
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

def update_word_count(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount) 


# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:  # raised when a SparkContext already exists
    print("Context already setup")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint_directory")

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)
                    
# Split each line into words
words = rdd_received.flatMap(lambda line: line.split(" "))

# Format as (word, 1) key-value pairs
word_pairs = words.map(lambda word: (word, 1))

# Count the words in each batch
word_count = word_pairs.reduceByKey(lambda a, b: a + b)
# word_count.pprint() # If the occurrences of words per document are needed

# Update the running count of each key as new RDDs arrive
total_word_count = word_count.updateStateByKey(update_word_count)

# Display result
total_word_count.pprint()

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-03 23:27:35
-------------------------------------------
('is', 1)
('This', 1)
('my', 1)
('first', 1)
('document', 1)

-------------------------------------------
Time: 2020-11-03 23:27:36
-------------------------------------------
('is', 2)
('longer', 1)
('This', 2)
('my', 1)
('first', 1)
('document', 2)
('a', 1)
('second', 1)

-------------------------------------------
Time: 2020-11-03 23:27:37
-------------------------------------------
('is', 4)
('longer', 1)
("doesn't", 1)
('know', 1)
('This', 3)
('my', 1)
('first', 1)
('document', 4)
('a', 3)
('second', 1)
...

-------------------------------------------
Time: 2020-11-03 23:27:38
-------------------------------------------
('is', 4)
('longer', 1)
("doesn't", 1)
('know', 1)
('This', 3)
('my', 1)
('first', 1)
('document', 4)
('a', 3)
('second', 1)
...

-------------------------------------------
Time: 2020-11-03 23:27:39
-------------------------------
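
The way the update function is applied batch after batch can be traced without Spark. The sketch below reuses the same update logic as the cell above, while the driver function `run_stateful_count` and the short sample batches are assumptions for illustration. The state is a plain dictionary here, whereas Spark keeps it in checkpointed RDDs.

```python
def update_word_count(new_values, running_count):
    """Same update logic as above: add the new values to the running count."""
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)


def run_stateful_count(batches):
    """Apply the update function to every key, batch after batch."""
    state = {}
    snapshots = []
    for batch in batches:
        # Group the new values of this batch by key
        new_values = {}
        for key, value in batch:
            new_values.setdefault(key, []).append(value)
        # The update runs for each key that has a previous state or new values
        for key in set(state) | set(new_values):
            state[key] = update_word_count(new_values.get(key, []), state.get(key))
        snapshots.append(dict(state))
    return snapshots


# Hypothetical batches of (word, count) pairs; the last batch is empty
batches = [[("This", 1), ("is", 1)], [("is", 1), ("a", 1)], []]
snapshots = run_stateful_count(batches)
print(snapshots)
```

The empty third batch still produces the full state, which is why the stream above keeps printing the same counts once the queue is exhausted.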

#### transform(func) operations

"The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it." 

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

Example: count the occurrences of each word in the stream, \
using a transformation to filter out unwanted words. \
The RDD of unwanted words is joined with each batch, and the matching words are filtered out.

In [18]:
# On a plain RDD, outside a stream, the code would be
from pyspark import SparkContext

# launch spark context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:  # raised when a SparkContext already exists
    print("Context already setup")

# Initialize document and filter
rdd_document = sc.parallelize(["This document is my first document"])
rdd_unwanted_words = sc.parallelize(["This", "is", "a", "my"])


# Map the words 
words = rdd_document.flatMap(lambda line: line.split(" "))

# format the RDD with key value pairs
lines = words.map(lambda word : (word, 1))
unwanted = rdd_unwanted_words.map(lambda word : (word, 1))

#  Outer Join on the 2 rdds 
doc_clnd = lines.fullOuterJoin(unwanted)
display(doc_clnd.collect())
doc_clnd2 = doc_clnd.filter(lambda x: x[1][0] == 1 and  x[1][1] is None)
display(doc_clnd2.collect())
doc_clnd3 = doc_clnd2.map(lambda x: (x[0], 1))
display(doc_clnd3.collect())

# reduce to get the count of word occurrences
word_count = doc_clnd3.reduceByKey(lambda a, b : a + b)
word_count.collect()

Context already setup


[('first', (1, None)),
 ('a', (None, 1)),
 ('is', (1, 1)),
 ('This', (1, 1)),
 ('document', (1, None)),
 ('document', (1, None)),
 ('my', (1, 1))]

[('first', (1, None)), ('document', (1, None)), ('document', (1, None))]

[('first', 1), ('document', 1), ('document', 1)]

[('first', 1), ('document', 2)]
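
The full outer join followed by the filter keeps only the words that have no match in the unwanted list. For a single document, the same result can be checked in plain Python with a set lookup. This is a sketch for verification only, not the Spark approach; the function name `count_wanted_words` is an assumption.

```python
from collections import Counter


def count_wanted_words(line, unwanted_words):
    """Count the words of a line, ignoring those in the unwanted list."""
    unwanted = set(unwanted_words)
    return Counter(word for word in line.split(" ") if word not in unwanted)


word_count = count_wanted_words(
    "This document is my first document", ["This", "is", "a", "my"]
)
print(sorted(word_count.items()))
```

The counts agree with the last output above: 'document' appears twice and 'first' once.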

In [19]:
# The same logic applied inside a stream with transform

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:  # raised when a SparkContext already exists
    print("Context already setup")
ssc = StreamingContext(sc, 1)

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

# RDD to filter
rdd_unwanted_words = sc.parallelize(["This", "is", "a", "my", "it"])

# Split each line into words
words = rdd_received.flatMap(lambda line: line.split(" "))
unwanted_words = rdd_unwanted_words.map(lambda word : (word, 1))

# Format as (word, 1) key-value pairs
word_pairs = words.map(lambda word : (word, 1))

#word_pairs.pprint()
# Join each batch with the unwanted words and keep only the words without a match
cleanedDStream = word_pairs.transform(lambda rdd: rdd.fullOuterJoin(unwanted_words) \
                                                     .filter(lambda x: x[1][0] == 1 and  x[1][1] is None) \
                                                     .map(lambda x: (x[0], 1)))
cleanedDStream.pprint()

# Count the words
word_count = cleanedDStream.reduceByKey(lambda a, b : a + b)
word_count.pprint() 

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-03 23:27:41
-------------------------------------------
('first', 1)
('document', 1)
('document', 1)

-------------------------------------------
Time: 2020-11-03 23:27:41
-------------------------------------------
('first', 1)
('document', 2)

-------------------------------------------
Time: 2020-11-03 23:27:42
-------------------------------------------
('longer', 1)
('second', 1)
('document', 1)

-------------------------------------------
Time: 2020-11-03 23:27:42
-------------------------------------------
('longer', 1)
('second', 1)
('document', 1)

-------------------------------------------
Time: 2020-11-03 23:27:43
-------------------------------------------
("doesn't", 1)
('which', 1)
('know', 1)
('document', 1)
('document', 1)

-------------------------------------------
Time: 2020-11-03 23:27:43
-------------------------------------------
("doesn't", 1)
('know', 1)
('which', 1)
('document', 2)

#### Window Operations

"Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window."

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-window.png"
     alt="Sliding window over a DStream"
     style="height:200px;"/>
     
"Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.
* __window(windowLength, slideInterval)__ 	Return a new DStream which is computed based on windowed batches of the source DStream.
* __countByWindow(windowLength, slideInterval)__ 	Return a sliding window count of elements in the stream.
* __reduceByWindow(func, windowLength, slideInterval)__ 	Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
* __reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])__ 	When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
* __reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])__ A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
* __countByValueAndWindow(windowLength, slideInterval, [numTasks])__ 	When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_
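
The incremental variant with an inverse function can be sketched in plain Python: the counts of the batch entering the window are added, and the counts of the batch leaving it are subtracted. The function `windowed_counts` is an assumption for illustration, and the three documents are the ones used in the examples of these notes, one per batch.

```python
from collections import Counter, deque


def windowed_counts(batches, window_length):
    """Word counts over the last `window_length` batches, updated incrementally."""
    window = deque()      # per-batch counts currently inside the window
    running = Counter()   # counts for the whole window
    results = []
    for batch in batches:
        entering = Counter(batch)
        running.update(entering)        # reduce: add the batch entering the window
        window.append(entering)
        if len(window) > window_length:
            leaving = window.popleft()
            running.subtract(leaving)   # inverse reduce: remove the batch leaving
        # Keep only the words still present in the window
        results.append({word: n for word, n in running.items() if n > 0})
    return results


documents = [
    "This document is my first document",
    "This is a second longer document",
    "This is a document which doesn't know it is a document",
]
batches = [doc.split(" ") for doc in documents]
results = windowed_counts(batches, window_length=2)
for counts in results:
    print(counts)
```

With a window of 2 batches, the second result covers documents 1 and 2 and the third covers documents 2 and 3, so 'my' and 'first' disappear once the first document has left the window.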

Example: count the occurrences of each word over a sliding window. \
Each result reduces the last 2 batches (documents) into one RDD, recomputed every second. \
Only the first window and the last non-empty window (n+1) are computed from a single RDD.

In [20]:
# Example with reduceByKeyAndWindow in a stream

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:  # raised when a SparkContext already exists
    print("Context already setup")
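ssc = StreamingContext(sc, 1)
# Checkpointing must be enabled for reduceByKeyAndWindow with an inverse function
ssc.checkpoint("checkpoint_directory")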

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

# Split each line into words
words = rdd_received.flatMap(lambda line: line.split(" "))
# Format as (word, 1) key-value pairs
word_pairs = words.map(lambda word : (word, 1))
# Count the words by window: reduce the last 2 seconds, every second
word_count = word_pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 2, 1)
word_count.pprint() 

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-03 23:27:47
-------------------------------------------
('is', 1)
('This', 1)
('document', 2)
('my', 1)
('first', 1)

-------------------------------------------
Time: 2020-11-03 23:27:48
-------------------------------------------
('is', 2)
('longer', 1)
('This', 2)
('document', 3)
('my', 1)
('first', 1)
('a', 1)
('second', 1)

-------------------------------------------
Time: 2020-11-03 23:27:49
-------------------------------------------
('is', 3)
('longer', 1)
("doesn't", 1)
('know', 1)
('This', 2)
('a', 3)
('second', 1)
('document', 3)
('which', 1)
('it', 1)

-------------------------------------------
Time: 2020-11-03 23:27:50
-------------------------------------------
('is', 2)
("doesn't", 1)
('know', 1)
('This', 1)
('a', 2)
('document', 2)
('which', 1)
('it', 1)

-------------------------------------------
Time: 2020-11-03 23:27:51
-------------------------------------------



### Join Stream operation

#### Join all the RDDs of the stream

stream1 = ... \
stream2 = ... \
joinedStream = stream1.join(stream2)

#### Join the RDDs of specific windows

windowed_stream1 = stream1.window(20) \
windowed_stream2 = stream2.window(60) \
joined_stream = windowed_stream1.join(windowed_stream2)

#### transform: join a stream with an RDD

dataset = ... # some RDD \
windowed_stream = stream.window(20) \
joinedStream = windowed_stream.transform(lambda rdd: rdd.join(dataset))

### Output Operations on DStreams

* __print()__ 	Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. In the Python API this is called pprint().
* __saveAsTextFiles(prefix, [suffix])__ 	Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
* __saveAsObjectFiles(prefix, [suffix])__ 	Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". This is not available in the Python API.
* __saveAsHadoopFiles(prefix, [suffix])__ 	Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". This is not available in the Python API.
* __foreachRDD(func)__ 	The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
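
The naming pattern "prefix-TIME_IN_MS[.suffix]" used by the save operations can be illustrated with a small helper. The function `batch_file_name` and the sample values are assumptions for illustration; it only builds the name and writes nothing.

```python
def batch_file_name(prefix, time_ms, suffix=None):
    """Build the output name of one batch: prefix-TIME_IN_MS[.suffix]."""
    name = f"{prefix}-{time_ms}"
    if suffix:
        name += f".{suffix}"
    return name


# Hypothetical prefix and batch time in milliseconds
with_suffix = batch_file_name("counts", 1604442444000, "txt")
without_suffix = batch_file_name("counts", 1604442444000)
print(with_suffix)
print(without_suffix)
```

Since the batch time is part of the name, every batch interval gets its own output instead of overwriting the previous one.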

### Design Patterns for using foreachRDD

Apply a function to each RDD. Here each partition of the RDD is sent through its own connection:

In [21]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time


def sendPartition(records):
    # createNewConnection() is a placeholder for your own connection factory
    connection = createNewConnection()
    for record in records:
        connection.send(record)
    connection.close()

# Initialize spark context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")

queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"])]

# Initialize the StreamingContext and execute the function for each RDD
ssc = StreamingContext(sc, 1)
rdd_received = ssc.queueStream(queueOfRDDs)
rdd_received.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

# Launch, wait and stop
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
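
The reason for calling `foreachPartition` inside `foreachRDD` is to open one connection per partition instead of one per record. A minimal plain-Python sketch of that difference follows; `FakeConnection` and the helper functions are hypothetical stand-ins, and the "partitions" are ordinary lists rather than Spark partitions.

```python
class FakeConnection:
    """Hypothetical stand-in for a real client connection; counts how many are opened."""
    opened = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.sent = []

    def send(self, record):
        self.sent.append(record)

    def close(self):
        pass


def send_per_record(partition):
    # Anti-pattern: a new connection for every single record.
    for record in partition:
        connection = FakeConnection()
        connection.send(record)
        connection.close()


def send_per_partition(partition):
    # Pattern used above: one connection shared by all records of a partition.
    connection = FakeConnection()
    for record in partition:
        connection.send(record)
    connection.close()


def count_connections(send_func, partitions):
    """Run send_func over every partition and return how many connections were opened."""
    FakeConnection.opened = 0
    for partition in partitions:
        send_func(iter(partition))
    return FakeConnection.opened


partitions = [["a", "b", "c"], ["d", "e"]]
print(count_connections(send_per_record, partitions))     # 5 connections, one per record
print(count_connections(send_per_partition, partitions))  # 2 connections, one per partition
```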


### DataFrame and SQL Operations

In [22]:
import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


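def process(batch_time, rdd):
    print("========= %s =========" % str(batch_time))
    # here you can do things about RDD or SQL...
    rdd_count = rdd.map(lambda x: len(x))
    # RDDs have no show() method (that is a DataFrame method); collect and print instead
    print(rdd_count.collect())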
    

# Initialize spark context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")

queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"])]

# Initialize the StreamingContext and execute the function for each RDD
ssc = StreamingContext(sc, 1)
rdd_received = ssc.queueStream(queueOfRDDs)
rdd_received.foreachRDD(process)

# Launch, wait and stop
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup


In the run recorded above nothing was printed: the queueStream is fed manually, and the function waits for new data to arrive.

### Caching / Persistence

Similar to RDDs, DStreams also allow developers to persist the stream’s data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().

For input streams that receive data over the network (such as, Kafka, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance.

Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on different persistence levels can be found in the Spark Programming Guide.

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html#caching--persistence_

### Checkpointing

"A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault- tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed.

* Metadata checkpointing - Saving of the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later). Metadata includes:
 * Configuration - The configuration that was used to create the streaming application.
 * DStream operations - The set of DStream operations that define the streaming application.
 * Incomplete batches - Batches whose jobs are queued but have not completed yet.

* Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing_
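
To make the point about growing dependency chains concrete, here is a toy plain-Python model (not Spark code). It assumes that each batch of a stateful transformation adds one link to the lineage and that a data checkpoint resets the chain; `lineage_lengths` and `checkpoint_every` are names invented for this sketch.

```python
def lineage_lengths(num_batches, checkpoint_every=None):
    """Return the dependency-chain length after each batch of a stateful stream."""
    lengths = []
    chain = 0
    for batch in range(1, num_batches + 1):
        chain += 1  # the new state RDD depends on the previous batch's state
        if checkpoint_every and batch % checkpoint_every == 0:
            chain = 0  # state saved to reliable storage: the lineage is cut here
        lengths.append(chain)
    return lengths


print(lineage_lengths(6))                      # [1, 2, 3, 4, 5, 6]
print(lineage_lengths(6, checkpoint_every=3))  # [1, 2, 0, 1, 2, 0]
```

Without checkpoints the chain, and with it the recovery time, grows with every batch; with periodic checkpoints it stays bounded by the checkpoint interval.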

In [23]:
# Example of use of checkpoints

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")
ssc = StreamingContext(sc, 1)

# set checkpoint directory
ssc.checkpoint("checkpointDirectory")  

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

# set the checkpoint interval to 3 seconds
rdd_received.checkpoint(3)

# Execute an action
rdd_received.pprint()

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-03 23:28:02
-------------------------------------------
This document is my first document

-------------------------------------------
Time: 2020-11-03 23:28:03
-------------------------------------------
This is a second longer document

-------------------------------------------
Time: 2020-11-03 23:28:04
-------------------------------------------
This is a document which doesn't know it is a document

-------------------------------------------
Time: 2020-11-03 23:28:05
-------------------------------------------

-------------------------------------------
Time: 2020-11-03 23:28:06
-------------------------------------------



# 5) ML (5%)

### Load data for part 5

In [24]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Create spark context
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession.builder.getOrCreate()

# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

from IPython.display import clear_output
clear_output()

## 5.1) ML fit, transform, pipelines, estimator

In [25]:
df.show(5)

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
+---+---+---+--------------------+-----------+
only showing top 5 rows



### Use fit, transform on StringIndexer estimator

In [26]:
from pyspark.ml.feature import StringIndexer
est_indexer = StringIndexer(inputCol="class", outputCol="classIndex")
model = est_indexer.fit(df)
df_indexed = model.transform(df)
df_indexed.show(5)

+---+---+---+--------------------+-----------+----------+
|  x|  y|  z|              source|      class|classIndex|
+---+---+---+--------------------+-----------+----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|
+---+---+---+--------------------+-----------+----------+
only showing top 5 rows
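
With its default ordering (`frequencyDesc`), StringIndexer gives index 0.0 to the most frequent label, so an index of 6.0 for `Brush_teeth` would make it the seventh most frequent class here. The plain-Python sketch below mimics that fit/transform split on a made-up toy list of labels. The alphabetical tie-break is an assumption of this sketch, and the function names are invented for illustration.

```python
from collections import Counter


def fit_string_indexer(labels):
    """'fit' step: learn a label -> index mapping from the data."""
    counts = Counter(labels)
    # Descending frequency first; alphabetical order breaks ties (assumed here).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


def transform_labels(mapping, labels):
    """'transform' step: apply the learned mapping to a column of labels."""
    return [mapping[label] for label in labels]


# Toy labels, not the real class distribution of the dataset.
labels = ["Walk", "Brush_teeth", "Walk", "Climb_stairs", "Walk", "Climb_stairs"]
mapping = fit_string_indexer(labels)
print(mapping)
print(transform_labels(mapping, ["Brush_teeth", "Walk"]))
```

The estimator learns the mapping once in `fit`, and the resulting model applies it in `transform`, which mirrors the `est_indexer.fit(df)` and `model.transform(df)` calls above.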



### Use a second fit transform on OneHotEncoder estimator

In [27]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
est_indexer = StringIndexer(inputCol="class", outputCol="classIndex")
est_encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
df_indexed = est_indexer.fit(df).transform(df)
df_encoded = est_encoder.fit(df_indexed).transform(df_indexed)
df_encoded.show(5)

+---+---+---+--------------------+-----------+----------+--------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|
+---+---+---+--------------------+-----------+----------+--------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
+---+---+---+--------------------+-----------+----------+--------------+
only showing top 5 rows
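
The `categoryVec` values are printed as sparse vectors: `(13,[6],[1.0])` means a vector of size 13 whose only non-zero entry is 1.0 at position 6. A rough plain-Python sketch of that encoding follows. It assumes the encoder's default `dropLast=True` was in effect, which would imply 14 distinct classes for a vector of size 13; `one_hot_sparse` is a name invented for this illustration.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values), the layout of a printed sparse vector."""
    size = num_categories - 1 if drop_last else num_categories
    position = int(index)
    if position < size:
        return (size, [position], [1.0])
    # With drop_last, the last category is encoded as the all-zeros vector.
    return (size, [], [])


print(one_hot_sparse(6.0, 14))   # (13, [6], [1.0])
print(one_hot_sparse(13.0, 14))  # (13, [], [])
```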



### Use a pipeline to chain several transformations

In [28]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Normalizer, MinMaxScaler
from pyspark.ml import Pipeline

# Define transformations
indexer = StringIndexer(inputCol="class", outputCol="classIndex")
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
# Define pipeline
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer])
model = pipeline.fit(df)
df_result = model.transform(df)
df_result.show(5)

+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
only showing top 5 rows
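
The `features_norm` column can be checked by hand. With `p=1.0` the Normalizer divides each vector by the sum of the absolute values of its components (the L1 norm). A minimal plain-Python sketch, with `l1_normalize` as an illustrative helper name:

```python
def l1_normalize(vector):
    """Divide every component by the L1 norm (sum of absolute values)."""
    norm = sum(abs(v) for v in vector)
    if norm == 0:
        return list(vector)  # leave an all-zero vector unchanged
    return [v / norm for v in vector]


# First row of the table above: x=22, y=49, z=35, so the norm is 106.
normalized = l1_normalize([22.0, 49.0, 35.0])
print(normalized)
print(sum(normalized))
```

The first component is 22/106, which starts with 0.2075 like the first row of `features_norm`, and the components of each normalized vector sum to 1.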

In [29]:
# Clean dataframe for the next part
df_cleaned = df_result.drop("x", "y", "z", "source", "class", "categoryVec", "features")
df_cleaned = df_cleaned.withColumnRenamed("classIndex", "target")
df_cleaned = df_cleaned.withColumnRenamed("features_norm", "features")
df_cleaned.show(5)

+------+--------------------+
|target|            features|
+------+--------------------+
|   6.0|[0.20754716981132...|
|   6.0|[0.20754716981132...|
|   6.0|[0.20183486238532...|
|   6.0|[0.20183486238532...|
|   6.0|[0.19626168224299...|
+------+--------------------+
only showing top 5 rows



## 5.2) Model Selection, evaluator, parameter grid

### Split train and validation datasets 

In [30]:
# Split into training and validation data.
df_learn, df_validation = df_cleaned.randomSplit([0.8, 0.2], seed=12345)

### Train and evaluate a model with fixed parameters

In [31]:
# Train model
from pyspark.ml.regression import LinearRegression
estim_lr = LinearRegression(featuresCol = 'features', labelCol='target',
                            maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_lr = estim_lr.fit(df_learn)

In [32]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(model_lr.coefficients))
print("Intercept: %s" % str(model_lr.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = model_lr.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [9.276474699807274,0.0,-2.7557906328884165]
Intercept: 3.2530328439765346
RMSE: 3.509002
r2: 0.086105
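
For reference, the two metrics printed above follow the standard definitions of RMSE and R². The plain-Python sketch below computes them on a made-up toy example rather than on the dataset used here.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: square root of the mean squared residual."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(y_true))


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# Toy values for illustration only.
y_true = [0.0, 1.0, 2.0, 3.0]
y_pred = [0.5, 1.0, 2.0, 2.5]
print(rmse(y_true, y_pred))
print(r2(y_true, y_pred))
```

An r2 close to 1 means the predictions explain most of the variance of the target. A value around 0.09, as in the output above, means the model explains only about 9% of it.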


In [33]:
# Test model
df_prediction = model_lr.transform(df_validation)
df_prediction.show(5)

# Evaluate model
from pyspark.ml.evaluation import RegressionEvaluator
eval_lr = RegressionEvaluator(predictionCol="prediction", \
                              labelCol="target",metricName="r2")
print("R Squared (R2) on test data = %g" % eval_lr.evaluate(df_prediction))

+------+--------------------+------------------+
|target|            features|        prediction|
+------+--------------------+------------------+
|   0.0|[0.0,0.4852941176...|1.8346111946957322|
|   0.0|[0.0,0.4933333333...|1.8567655899797368|
|   0.0|[0.0,0.4941176470...|1.8589269943976887|
|   0.0|       [0.0,0.5,0.5]|1.8751375275323263|
|   0.0|       [0.0,0.5,0.5]|1.8751375275323263|
+------+--------------------+------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.0862592


In [34]:
# The prediction and evaluation can be done in one step
test_result = model_lr.evaluate(df_validation)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 3.51257


In [35]:
# To summarize:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Train model
estim_lr = LinearRegression(featuresCol = 'features', labelCol='target',
                            maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_lr = estim_lr.fit(df_learn)
# Evaluate model
df_prediction = model_lr.transform(df_validation)
eval_lr = RegressionEvaluator(predictionCol="prediction", \
                              labelCol="target", metricName="r2")
print("R Squared (R2) on test data = %g" % eval_lr.evaluate(df_prediction))

R Squared (R2) on test data = 0.0862592


### Cross validation selection to find best parameters

In [36]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
temp_df = spark.createDataFrame([Row(V4366=0.0, V4460=0.232, V4916=-0.017, V1495=-0.104, V1639=0.005, V1967=-0.008, V3049=0.177, V3746=-0.675, V3869=-3.451, V524=0.004, V5409=0), Row(V4366=0.0, V4460=0.111, V4916=-0.003, V1495=-0.137, V1639=0.001, V1967=-0.01, V3049=0.01, V3746=-0.867, V3869=-2.759, V524=0.0, V5409=0), Row(V4366=0.0, V4460=-0.391, V4916=-0.003, V1495=-0.155, V1639=-0.006, V1967=-0.019, V3049=-0.706, V3746=0.166, V3869=0.189, V524=0.001, V5409=0), Row(V4366=0.0, V4460=0.098, V4916=-0.012, V1495=-0.108, V1639=0.005, V1967=-0.002, V3049=0.033, V3746=-0.787, V3869=-0.926, V524=0.002, V5409=0), Row(V4366=0.0, V4460=0.026, V4916=-0.004, V1495=-0.139, V1639=0.003, V1967=-0.006, V3049=-0.045, V3746=-0.208, V3869=-0.782, V524=0.001, V5409=0)])
trainingData=temp_df.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])
trainingData.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.232,-0.017...|    0|
|[0.0,0.111,-0.003...|    0|
|[0.0,-0.391,-0.00...|    0|
|[0.0,0.098,-0.012...|    0|
|[0.0,0.026,-0.004...|    0|
+--------------------+-----+



In [37]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize estimators, pipeline and evaluators
estim_lr = LinearRegression(featuresCol = 'features', labelCol='target', 
                            maxIter=10) #, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[estim_lr])
evaluator = RegressionEvaluator(predictionCol="prediction", 
                                labelCol="target", metricName="r2")

# Define the different parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(estim_lr.regParam, [0.3, 0.5]) \
    .addGrid(estim_lr.elasticNetParam, [0.5, 0.8]) \
    .build()

# Run cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
cvModel = crossval.fit(df_learn)
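
The cost of this search is easy to estimate. The grid is the Cartesian product of the listed values, and each combination is fitted once per fold before a final refit of the best combination on the whole training set. A plain-Python sketch of that count, using dictionaries instead of Spark's param maps:

```python
from itertools import product

reg_params = [0.3, 0.5]
elastic_net_params = [0.5, 0.8]
num_folds = 5

# Every combination of the two parameter lists, as the grid builder would enumerate them.
param_maps = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_params, elastic_net_params)
]

fits_during_cv = len(param_maps) * num_folds  # one fit per combination and fold
total_fits = fits_during_cv + 1               # plus the final refit of the best combination

print(len(param_maps), fits_during_cv, total_fits)  # 4 20 21
```

Adding values to a grid multiplies the work, so grids are best kept small when the training set is large.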

In [38]:
# print regression result and the best parameters found
print(cvModel.bestModel.stages[0].intercept)
print(cvModel.bestModel.stages[0].coefficients)
print(cvModel.bestModel.stages[0].getRegParam())
print(cvModel.bestModel.stages[0].getElasticNetParam())

3.6854132664170263
[9.779454982679272,0.0,-4.111892840696731]
0.3
0.5


In [39]:
# CrossValidator scores each parameter combination on the k folds, then refits the best one
# on the whole training set, so cvModel can be applied directly to the validation set
df_prediction = cvModel.transform(df_validation)
eval_lr = RegressionEvaluator(predictionCol="prediction", 
                              labelCol="target", 
                              metricName="r2")
print("R Squared (R2) on test data = %g" % eval_lr.evaluate(df_prediction))

R Squared (R2) on test data = 0.0862592
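
The selection step itself can be sketched in plain Python: average the metric over the folds for each parameter combination, then keep the best average. For r2 larger is better, whereas for rmse the smallest value would win. The scores below are made up for illustration and `pick_best` is a hypothetical helper; in PySpark the per-combination averages are exposed as `cvModel.avgMetrics`.

```python
def pick_best(avg_metrics, param_maps, larger_is_better=True):
    """Return the parameter combination with the best average metric."""
    chooser = max if larger_is_better else min
    best_index = chooser(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return param_maps[best_index], avg_metrics[best_index]


param_maps = [
    {"regParam": 0.3, "elasticNetParam": 0.5},
    {"regParam": 0.3, "elasticNetParam": 0.8},
    {"regParam": 0.5, "elasticNetParam": 0.5},
    {"regParam": 0.5, "elasticNetParam": 0.8},
]
# Made-up average r2 per combination, in the same order as param_maps.
avg_r2 = [0.086, 0.071, 0.080, 0.064]

best_params, best_score = pick_best(avg_r2, param_maps)
print(best_params, best_score)
```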


# 6) Graph (5%)