<span style="font-size:2em;"> Databricks Certified Associate Developer for Apache Spark 3.0 </span>

There is no official documentation indicating what we are supposed to know to pass the Databricks Certified Associate Developer for Apache Spark 3.0 exam. However, the Databricks website advises taking 4 courses, which cost $1500 each. As I don't want to take them, I recorded their table of contents and try to learn all the points by reading the documentation: https://spark.apache.org/docs/latest/ \
The purpose of this notebook is to test all the code of the official documentation, to try the functions and options when no examples are provided, and to see when and why they work or don't.

__Table of contents of the Databricks courses__

Courses from https://academy.databricks.com/instructor-led-training/apache-spark-programming
* Day 1: DataFrames
 * __1 Introduction__ Course overview, Databricks ecosystem, Spark overview, Case study, Knowledge check
 * __2 Databricks Platform__ Databricks concepts, Workspace UI, Notebooks, Lab
 * __3 Spark SQL__ Spark SQL module, Documentation, DataFrame concepts, Lab
 * __4 Reader & Writer__ Data Sources, DataFrameReader & Writer, Schemas, Performance, Lab
 * __5 DataFrame & Column__ Columns and expressions, Transformations, Actions, Rows, Lab
 
* Day 2: Transformations
 * __1 Aggregation__ Groupby, Grouped data methods, Aggregate functions, Math functions, Lab
 * __2 Datetimes__ Dates & Timestamps, Datetime patterns, Datetime functions, Lab
 * __3 Complex Types__ String functions, Collection functions
 * __4 Additional Functions__ Non-aggregate functions, NaFunctions, Lab
 * __5 User-Defined Functions__ User-defined functions, Vectorized UDFs, Performance, Lab
 
* Day 3: Spark Optimization
 * __1 Spark Architecture__ Spark Cluster, Spark Execution, Shuffling, Lab
 * __2 Shuffles & Caching__ Lineage, Shuffle files, Caching, Caching recommendations, Spark UI: Storage, Lab
 * __3 Query Optimization__ Catalyst Optimizer, Adaptive Query Execution, Best practices, Lab
 * __4 Spark UI__ Spark UI navigation, Spark UI: Jobs, Stages, SQL
 * __5 Partitioning__ Partitions vs cores, Default shuffle partitions, Repartition, Best practices, AQE, Lab
 
* Day 4: Structured Streaming
 * __1 Review__ DataFrames and Transformations, Lab
 * __2 Streaming Query__ Streaming concepts, Sources and Sinks, Streaming Query, Transformations, Lab
 * __3 Processing Streams__ Monitoring Streams, Lab
 * __4 Aggregating Streams__ Streaming aggregations, Windows, Watermarking, Lab
 * __5 Delta Lake__ Delta Lake concepts, Batch and streaming, Lab

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Spark-Architecture" data-toc-modified-id="Spark-Architecture-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Spark Architecture</a></span><ul class="toc-item"><li><span><a href="#Dataframes-and-RDD" data-toc-modified-id="Dataframes-and-RDD-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Dataframes and RDD</a></span><ul class="toc-item"><li><span><a href="#Create-Dataframes" data-toc-modified-id="Create-Dataframes-1.1.1"><span class="toc-item-num">1.1.1&nbsp;&nbsp;</span>Create Dataframes</a></span></li><li><span><a href="#Dataframe-to-pandas-and-pandas-to-df" data-toc-modified-id="Dataframe-to-pandas-and-pandas-to-df-1.1.2"><span class="toc-item-num">1.1.2&nbsp;&nbsp;</span>Dataframe to pandas and pandas to df</a></span></li><li><span><a href="#RDD-Distributed-dataframe" data-toc-modified-id="RDD-Distributed-dataframe-1.1.3"><span class="toc-item-num">1.1.3&nbsp;&nbsp;</span>RDD Distributed dataframe</a></span></li></ul></li><li><span><a href="#Parallelization-principle" data-toc-modified-id="Parallelization-principle-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Parallelization principle</a></span><ul class="toc-item"><li><span><a href="#Transformations,-Actions,-Shuffling" data-toc-modified-id="Transformations,-Actions,-Shuffling-1.2.1"><span class="toc-item-num">1.2.1&nbsp;&nbsp;</span>Transformations, Actions, Shuffling</a></span></li><li><span><a href="#RDD-lazyness" data-toc-modified-id="RDD-lazyness-1.2.2"><span class="toc-item-num">1.2.2&nbsp;&nbsp;</span>RDD lazyness</a></span></li><li><span><a href="#Caching" data-toc-modified-id="Caching-1.2.3"><span class="toc-item-num">1.2.3&nbsp;&nbsp;</span>Caching</a></span></li><li><span><a href="#Broadcast" data-toc-modified-id="Broadcast-1.2.4"><span class="toc-item-num">1.2.4&nbsp;&nbsp;</span>Broadcast</a></span></li><li><span><a href="#Accumulator" data-toc-modified-id="Accumulator-1.2.5"><span 
class="toc-item-num">1.2.5&nbsp;&nbsp;</span>Accumulator</a></span></li></ul></li><li><span><a href="#Partitioning-Partitions-vs-cores" data-toc-modified-id="Partitioning-Partitions-vs-cores-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Partitioning Partitions vs cores</a></span><ul class="toc-item"><li><span><a href="#Nodes-architecture" data-toc-modified-id="Nodes-architecture-1.3.1"><span class="toc-item-num">1.3.1&nbsp;&nbsp;</span>Nodes architecture</a></span></li><li><span><a href="#Task,-partition,-executor-optimization" data-toc-modified-id="Task,-partition,-executor-optimization-1.3.2"><span class="toc-item-num">1.3.2&nbsp;&nbsp;</span>Task, partition, executor optimization</a></span></li></ul></li></ul></li><li><span><a href="#DataFrames" data-toc-modified-id="DataFrames-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>DataFrames</a></span><ul class="toc-item"><li><span><a href="#Reader-&amp;-Writer" data-toc-modified-id="Reader-&amp;-Writer-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Reader &amp; Writer</a></span><ul class="toc-item"><li><span><a href="#Read-one-text-file-or-the-whole-directory" data-toc-modified-id="Read-one-text-file-or-the-whole-directory-2.1.1"><span class="toc-item-num">2.1.1&nbsp;&nbsp;</span>Read one text file or the whole directory</a></span></li><li><span><a href="#Read-and-save-a-RDD-as-squence-file" data-toc-modified-id="Read-and-save-a-RDD-as-squence-file-2.1.2"><span class="toc-item-num">2.1.2&nbsp;&nbsp;</span>Read and save a RDD as squence file</a></span></li><li><span><a href="#Read-and-save-a-dataframe---csv" data-toc-modified-id="Read-and-save-a-dataframe---csv-2.1.3"><span class="toc-item-num">2.1.3&nbsp;&nbsp;</span>Read and save a dataframe - csv</a></span></li><li><span><a href="#Read-and-save-a-json" data-toc-modified-id="Read-and-save-a-json-2.1.4"><span class="toc-item-num">2.1.4&nbsp;&nbsp;</span>Read and save a json</a></span></li><li><span><a href="#Read-and-save-a-parquet" 
data-toc-modified-id="Read-and-save-a-parquet-2.1.5"><span class="toc-item-num">2.1.5&nbsp;&nbsp;</span>Read and save a parquet</a></span></li></ul></li><li><span><a href="#DataFrame-&amp;-Column-Columns-and-expressions,-Transformations,-Actions,-Rows," data-toc-modified-id="DataFrame-&amp;-Column-Columns-and-expressions,-Transformations,-Actions,-Rows,-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>DataFrame &amp; Column Columns and expressions, Transformations, Actions, Rows,</a></span><ul class="toc-item"><li><span><a href="#Select-columns" data-toc-modified-id="Select-columns-2.2.1"><span class="toc-item-num">2.2.1&nbsp;&nbsp;</span>Select columns</a></span></li><li><span><a href="#DataFrame-filter-rows" data-toc-modified-id="DataFrame-filter-rows-2.2.2"><span class="toc-item-num">2.2.2&nbsp;&nbsp;</span>DataFrame filter rows</a></span></li><li><span><a href="#Drop,-rename,-replace-columns" data-toc-modified-id="Drop,-rename,-replace-columns-2.2.3"><span class="toc-item-num">2.2.3&nbsp;&nbsp;</span>Drop, rename, replace columns</a></span></li><li><span><a href="#Replace-value,-replace-na" data-toc-modified-id="Replace-value,-replace-na-2.2.4"><span class="toc-item-num">2.2.4&nbsp;&nbsp;</span>Replace value, replace na</a></span></li><li><span><a href="#Create-dataframe" data-toc-modified-id="Create-dataframe-2.2.5"><span class="toc-item-num">2.2.5&nbsp;&nbsp;</span>Create dataframe</a></span></li><li><span><a href="#Schemas" data-toc-modified-id="Schemas-2.2.6"><span class="toc-item-num">2.2.6&nbsp;&nbsp;</span>Schemas</a></span></li><li><span><a href="#Dataframe-rows" data-toc-modified-id="Dataframe-rows-2.2.7"><span class="toc-item-num">2.2.7&nbsp;&nbsp;</span>Dataframe rows</a></span></li></ul></li><li><span><a href="#Aggregation-Groupby,-Grouped-data-methods,-Aggregate-functions" data-toc-modified-id="Aggregation-Groupby,-Grouped-data-methods,-Aggregate-functions-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>Aggregation Groupby, Grouped data 
methods, Aggregate functions</a></span><ul class="toc-item"><li><span><a href="#Merge,-joins" data-toc-modified-id="Merge,-joins-2.3.1"><span class="toc-item-num">2.3.1&nbsp;&nbsp;</span>Merge, joins</a></span></li><li><span><a href="#Aggregation" data-toc-modified-id="Aggregation-2.3.2"><span class="toc-item-num">2.3.2&nbsp;&nbsp;</span>Aggregation</a></span></li></ul></li><li><span><a href="#Datetimes-Dates,-string" data-toc-modified-id="Datetimes-Dates,-string-2.4"><span class="toc-item-num">2.4&nbsp;&nbsp;</span>Datetimes Dates, string</a></span><ul class="toc-item"><li><span><a href="#Datetime" data-toc-modified-id="Datetime-2.4.1"><span class="toc-item-num">2.4.1&nbsp;&nbsp;</span>Datetime</a></span></li><li><span><a href="#Complex-Types-String-functions" data-toc-modified-id="Complex-Types-String-functions-2.4.2"><span class="toc-item-num">2.4.2&nbsp;&nbsp;</span>Complex Types String functions</a></span></li><li><span><a href="#Lit" data-toc-modified-id="Lit-2.4.3"><span class="toc-item-num">2.4.3&nbsp;&nbsp;</span>Lit</a></span></li></ul></li><li><span><a href="#User-Defined-Functions-User-defined-functions,-Vectorized-UDFs,-Performance" data-toc-modified-id="User-Defined-Functions-User-defined-functions,-Vectorized-UDFs,-Performance-2.5"><span class="toc-item-num">2.5&nbsp;&nbsp;</span>User-Defined Functions User-defined functions, Vectorized UDFs, Performance</a></span><ul class="toc-item"><li><span><a href="#User-defined-functions" data-toc-modified-id="User-defined-functions-2.5.1"><span class="toc-item-num">2.5.1&nbsp;&nbsp;</span>User-defined functions</a></span></li><li><span><a href="#Vectorized-UDFs" data-toc-modified-id="Vectorized-UDFs-2.5.2"><span class="toc-item-num">2.5.2&nbsp;&nbsp;</span>Vectorized UDFs</a></span></li><li><span><a href="#Performance" data-toc-modified-id="Performance-2.5.3"><span class="toc-item-num">2.5.3&nbsp;&nbsp;</span>Performance</a></span></li></ul></li><li><span><a href="#Spark-SQL" 
data-toc-modified-id="Spark-SQL-2.6"><span class="toc-item-num">2.6&nbsp;&nbsp;</span>Spark SQL</a></span></li></ul></li><li><span><a href="#Streaming" data-toc-modified-id="Streaming-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Streaming</a></span><ul class="toc-item"><li><span><a href="#Principle" data-toc-modified-id="Principle-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Principle</a></span><ul class="toc-item"><li><span><a href="#Example" data-toc-modified-id="Example-3.1.1"><span class="toc-item-num">3.1.1&nbsp;&nbsp;</span>Example</a></span></li><li><span><a href="#Discretized-stream" data-toc-modified-id="Discretized-stream-3.1.2"><span class="toc-item-num">3.1.2&nbsp;&nbsp;</span>Discretized stream</a></span></li></ul></li><li><span><a href="#Input-/-output" data-toc-modified-id="Input-/-output-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>Input / output</a></span><ul class="toc-item"><li><span><a href="#Input-DStreams-and-Receivers" data-toc-modified-id="Input-DStreams-and-Receivers-3.2.1"><span class="toc-item-num">3.2.1&nbsp;&nbsp;</span>Input DStreams and Receivers</a></span></li><li><span><a href="#Basic-sources" data-toc-modified-id="Basic-sources-3.2.2"><span class="toc-item-num">3.2.2&nbsp;&nbsp;</span>Basic sources</a></span></li><li><span><a href="#Queue-of-RDD-for-test-purpose" data-toc-modified-id="Queue-of-RDD-for-test-purpose-3.2.3"><span class="toc-item-num">3.2.3&nbsp;&nbsp;</span>Queue of RDD for test purpose</a></span></li><li><span><a href="#Receiver-Reliability" data-toc-modified-id="Receiver-Reliability-3.2.4"><span class="toc-item-num">3.2.4&nbsp;&nbsp;</span>Receiver Reliability</a></span></li></ul></li><li><span><a href="#Transformations-on-DStreams" data-toc-modified-id="Transformations-on-DStreams-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>Transformations on DStreams</a></span><ul class="toc-item"><li><span><a href="#Classical-transformations-example" 
data-toc-modified-id="Classical-transformations-example-3.3.1"><span class="toc-item-num">3.3.1&nbsp;&nbsp;</span>Classical transformations example</a></span></li><li><span><a href="#updateStateByKey-transformation" data-toc-modified-id="updateStateByKey-transformation-3.3.2"><span class="toc-item-num">3.3.2&nbsp;&nbsp;</span>updateStateByKey transformation</a></span></li><li><span><a href="#transform(func)-operations" data-toc-modified-id="transform(func)-operations-3.3.3"><span class="toc-item-num">3.3.3&nbsp;&nbsp;</span>transform(func) operations</a></span></li><li><span><a href="#Window-Operations" data-toc-modified-id="Window-Operations-3.3.4"><span class="toc-item-num">3.3.4&nbsp;&nbsp;</span>Window Operations</a></span></li></ul></li><li><span><a href="#Join-Stream-operation" data-toc-modified-id="Join-Stream-operation-3.4"><span class="toc-item-num">3.4&nbsp;&nbsp;</span>Join Stream operation</a></span><ul class="toc-item"><li><span><a href="#join-all-the-rdds-of-the-stream" data-toc-modified-id="join-all-the-rdds-of-the-stream-3.4.1"><span class="toc-item-num">3.4.1&nbsp;&nbsp;</span>join all the rdds of the stream</a></span></li><li><span><a href="#join-the-rdds-of-specific-windows" data-toc-modified-id="join-the-rdds-of-specific-windows-3.4.2"><span class="toc-item-num">3.4.2&nbsp;&nbsp;</span>join the rdds of specific windows</a></span></li><li><span><a href="#transform:-stream-and-rdd-join" data-toc-modified-id="transform:-stream-and-rdd-join-3.4.3"><span class="toc-item-num">3.4.3&nbsp;&nbsp;</span>transform: stream and rdd join</a></span></li></ul></li><li><span><a href="#Output-Operations-on-DStreams" data-toc-modified-id="Output-Operations-on-DStreams-3.5"><span class="toc-item-num">3.5&nbsp;&nbsp;</span>Output Operations on DStreams</a></span></li><li><span><a href="#Design-Patterns-for-using-foreachRDD" data-toc-modified-id="Design-Patterns-for-using-foreachRDD-3.6"><span class="toc-item-num">3.6&nbsp;&nbsp;</span>Design Patterns for using 
foreachRDD</a></span></li><li><span><a href="#DataFrame-and-SQL-Operations" data-toc-modified-id="DataFrame-and-SQL-Operations-3.7"><span class="toc-item-num">3.7&nbsp;&nbsp;</span>DataFrame and SQL Operations</a></span></li><li><span><a href="#Caching-/-Persistence" data-toc-modified-id="Caching-/-Persistence-3.8"><span class="toc-item-num">3.8&nbsp;&nbsp;</span>Caching / Persistence</a></span></li><li><span><a href="#Checkpointing" data-toc-modified-id="Checkpointing-3.9"><span class="toc-item-num">3.9&nbsp;&nbsp;</span>Checkpointing</a></span></li></ul></li><li><span><a href="#ML" data-toc-modified-id="ML-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>ML</a></span><ul class="toc-item"><li><span><a href="#ML-fit,-transform,-pipelines,-estimator" data-toc-modified-id="ML-fit,-transform,-pipelines,-estimator-4.1"><span class="toc-item-num">4.1&nbsp;&nbsp;</span>ML fit, transform, pipelines, estimator</a></span><ul class="toc-item"><li><span><a href="#Use-fit,-transform-on-StringIndexer-estimator" data-toc-modified-id="Use-fit,-transform-on-StringIndexer-estimator-4.1.1"><span class="toc-item-num">4.1.1&nbsp;&nbsp;</span>Use fit, transform on StringIndexer estimator</a></span></li><li><span><a href="#Use-a-second-fit-transform-on-OneHotEncoder-estimator" data-toc-modified-id="Use-a-second-fit-transform-on-OneHotEncoder-estimator-4.1.2"><span class="toc-item-num">4.1.2&nbsp;&nbsp;</span>Use a second fit transform on OneHotEncoder estimator</a></span></li><li><span><a href="#Use-a-pipeline-to-make-different-transformation-in-a-row" data-toc-modified-id="Use-a-pipeline-to-make-different-transformation-in-a-row-4.1.3"><span class="toc-item-num">4.1.3&nbsp;&nbsp;</span>Use a pipeline to make different transformation in a row</a></span></li></ul></li><li><span><a href="#Model-Selection,-evaluator,-parameter-grid" data-toc-modified-id="Model-Selection,-evaluator,-parameter-grid-4.2"><span class="toc-item-num">4.2&nbsp;&nbsp;</span>Model Selection, evaluator, parameter 
grid</a></span><ul class="toc-item"><li><span><a href="#Split-train-and-validation-datasets" data-toc-modified-id="Split-train-and-validation-datasets-4.2.1"><span class="toc-item-num">4.2.1&nbsp;&nbsp;</span>Split train and validation datasets</a></span></li><li><span><a href="#Train-and-estimate-model-with-fixed-parameters" data-toc-modified-id="Train-and-estimate-model-with-fixed-parameters-4.2.2"><span class="toc-item-num">4.2.2&nbsp;&nbsp;</span>Train and estimate model with fixed parameters</a></span></li><li><span><a href="#Cross-validation-selection-to-find-best-parameters" data-toc-modified-id="Cross-validation-selection-to-find-best-parameters-4.2.3"><span class="toc-item-num">4.2.3&nbsp;&nbsp;</span>Cross validation selection to find best parameters</a></span></li></ul></li></ul></li></ul></div>

# Spark Architecture

Code taken from this documentation: \
https://spark.apache.org/docs/latest/rdd-programming-guide.html

## Dataframes and RDD

### Create Dataframes

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Create a local SparkContext if it has not been launched previously
try:
    conf = SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    sc = SparkContext(conf=conf)
except ValueError:
    # SparkContext raises ValueError when a context is already running
    print("Context already setup")

# A SparkSession wrapping the SparkContext is needed to create DataFrames
spark = SparkSession(sc)

In [2]:
# Creates an empty dataframe
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

schema = StructType([
    StructField("column1", StringType(), True),
    StructField("column2", StringType(), True)
])
df = spark.createDataFrame(sc.emptyRDD(), schema)
df.show()

+-------+-------+
|column1|column2|
+-------+-------+
+-------+-------+



In [3]:
from pyspark.sql.types import ArrayType
# Creates a dataframe from list
cSchema = StructType([StructField("WordList", ArrayType(StringType()))])

# Each row is a list holding a single value (the word list), hence the extra square brackets
test_list = [['Hello', 'world']], [['I', 'am', 'fine']]

df = spark.createDataFrame(test_list, schema=cSchema) 
df.show()

+--------------+
|      WordList|
+--------------+
|[Hello, world]|
| [I, am, fine]|
+--------------+



### Dataframe to pandas and pandas to df

In [4]:
# converts the dataframe into pandas dataframe
df_pd = df.toPandas()
df_pd

         WordList
0  [Hello, world]
1   [I, am, fine]


In [5]:
# Works even though pandas was never imported: toPandas() imports it internally
df_pd.loc[0]

WordList    [Hello, world]
Name: 0, dtype: object

In [6]:
# converts the pandas dataframe into spark dataframe 
import pandas as pd
df_pd = pd.DataFrame([[0, 1, 2], [3, 4, 5]], columns=["a", "b", "c"])
df_pd

   a  b  c
0  0  1  2
1  3  4  5


In [7]:
df = spark.createDataFrame(df_pd)
df

DataFrame[a: bigint, b: bigint, c: bigint]

In [8]:
df.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  0|  1|  2|
|  3|  4|  5|
+---+---+---+



### RDD Distributed dataframe

In [9]:
# Instantiation
data = [[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]
distData = sc.parallelize(data)
distData

ParallelCollectionRDD[23] at readRDDFromFile at PythonRDD.scala:262

In [10]:
# Action on the RDD
distData.collect()

[[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]

In [11]:
# Create an RDD from a Spark DataFrame built from pandas
df_pd = pd.DataFrame([[0, 1, 2], [3, 4, 5]], columns=["a", "b", "c"])
df = spark.createDataFrame(df_pd)
rdd = df.rdd
rdd

MapPartitionsRDD[32] at javaToPython at NativeMethodAccessorImpl.java:0

In [12]:
rdd.collect()

[Row(a=0, b=1, c=2), Row(a=3, b=4, c=5)]

In [13]:
# RDD to DF
df = rdd.toDF()
df.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  0|  1|  2|
|  3|  4|  5|
+---+---+---+



In [14]:
# RDD to pandas, through a Spark DataFrame
df_pd = df.toPandas()
df_pd

   a  b  c
0  0  1  2
1  3  4  5


## Parallelization principle

### Transformations, Actions, Shuffling

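The main principle of parallelisation is to split an RDD into many partitions that are processed in parallel. Operations such as `reduceByKey` work on pair RDDs, whose elements are key/value tuples. Transformations are applied to each partition, then a shuffle moves the values that share a key to the same partition, where they are reduced, and an action finally returns the result.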

<img src="https://miro.medium.com/max/700/1*dlsSJ3LQ1l70S_VfGzeM0Q.png"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_from https://medium.com/@millet.david.julien/how-to-use-pyspark-to-get-opencv-images-descriptors-with-aws-emr-dd3875503a75_

In [15]:
# Create an input
text = "we have a string with words and we want to get the number of occurence \
of the words"
lst_text = text.split(" ")

In [16]:
# Mapping
rdd_text = sc.parallelize(lst_text)
rdd_text_map = rdd_text.map(lambda w: (w, 1))
rdd_text_map.collect()

[('we', 1),
 ('have', 1),
 ('a', 1),
 ('string', 1),
 ('with', 1),
 ('words', 1),
 ('and', 1),
 ('we', 1),
 ('want', 1),
 ('to', 1),
 ('get', 1),
 ('the', 1),
 ('number', 1),
 ('of', 1),
 ('occurence', 1),
 ('of', 1),
 ('the', 1),
 ('words', 1)]

In [17]:
# Shuffling and reducing
rdd_text_red = rdd_text_map.reduceByKey(lambda a, b: a + b)
rdd_text_red.collect()

[('have', 1),
 ('of', 2),
 ('a', 1),
 ('string', 1),
 ('words', 2),
 ('and', 1),
 ('to', 1),
 ('the', 2),
 ('number', 1),
 ('occurence', 1),
 ('we', 2),
 ('with', 1),
 ('want', 1),
 ('get', 1)]

In [18]:
# Result in a dataframe
df = rdd_text_red.toDF(["words", "unique count"])
df.show()

+---------+------------+
|    words|unique count|
+---------+------------+
|     have|           1|
|       of|           2|
|        a|           1|
|   string|           1|
|    words|           2|
|      and|           1|
|       to|           1|
|      the|           2|
|   number|           1|
|occurence|           1|
|       we|           2|
|     with|           1|
|     want|           1|
|      get|           1|
+---------+------------+



In [19]:
# Result in a pandas df
df_pd = df.toPandas()
df_pd

       words  unique count
0       have             1
1         of             2
2          a             1
3     string             1
4      words             2
5        and             1
6         to             1
7        the             2
8     number             1
9  occurence             1


### RDD lazyness

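Transformations are lazy: they only record how the RDD should be computed (its lineage). The RDD is actually computed only when an action such as `collect()` is called, which is why the transformation step below takes almost no time.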

In [20]:
import time
from loremipsum import get_sentences

# Create an input
begin = time.time()
str_text = " ".join(get_sentences(50000))
lst_text = str_text.split(" ")
rdd_text = sc.parallelize(lst_text)
print("Initialization duration = {:.4f} seconds".format(time.time() - begin))

# Transformation, mapping and reducing
begin = time.time()
rdd_text_map = rdd_text.map(lambda w: (w, 1))
rdd_text_red = rdd_text_map.reduceByKey(lambda a, b: a + b)
print("Transformations duration = {:.4f} seconds".format(time.time() - begin))

# Action to collect
begin = time.time()
rdd_text_red.collect()
print("Action duration = {:.4f} seconds".format(time.time() - begin))

# Second action, on a DataFrame built from the same RDD
df = rdd_text_red.toDF(["words", "unique count"])
begin = time.time()
df.collect()
print("Second action duration = {:.4f} seconds".format(time.time() - begin))

Initialization duration = 3.4442 seconds
Transformations duration = 0.0252 seconds
Action duration = 0.5362 seconds
Second action duration = 0.2229 seconds


### Caching

Calling `.cache()` after the transformations and before the action keeps the computed partitions in memory once the first action has run. The next actions reuse them instead of recomputing the whole lineage, so the second action is much faster.

<img src="https://miro.medium.com/max/700/1*toxoeQ4Rec4aAAcYKw55ow.jpeg"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_from https://medium.com/@millet.david.julien/how-to-use-pyspark-to-get-opencv-images-descriptors-with-aws-emr-dd3875503a75_

<img src="https://miro.medium.com/max/700/1*WQaW52PWPiUKJ8tcIfb8vw.jpeg"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_from https://medium.com/@millet.david.julien/how-to-use-pyspark-to-get-opencv-images-descriptors-with-aws-emr-dd3875503a75_

In [21]:
# Create an input
begin = time.time()
str_text = " ".join(get_sentences(50000))
lst_text = str_text.split(" ")
rdd_text = sc.parallelize(lst_text)
print("Initialization duration = {:.4f} seconds".format(time.time() - begin))

# Transformation, mapping and reducing
begin = time.time()
rdd_text_map = rdd_text.map(lambda w: (w, 1))
rdd_text_red = rdd_text_map.reduceByKey(lambda a, b: a + b).cache()
print("Transformations duration = {:.4f} seconds".format(time.time() - begin))

# Action to collect
begin = time.time()
rdd_text_red.collect()
print("First action duration = {:.4f} seconds".format(time.time() - begin))

# Action to collect
begin = time.time()
rdd_text_red.collect()
print("Second same action duration = {:.4f} seconds".format(time.time() - begin))


Initialization duration = 3.4269 seconds
Transformations duration = 0.0374 seconds
First action duration = 0.5840 seconds
Second same action duration = 0.0494 seconds


`.cache()` on an RDD is equivalent to `.persist()` with the default level `StorageLevel.MEMORY_ONLY`; on a DataFrame, `.cache()` uses `MEMORY_AND_DISK` instead. \
The other options are:

* __MEMORY_ONLY__ Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
* __MEMORY_AND_DISK__ Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
* __MEMORY_ONLY_SER__ (Java and Scala) Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
* __MEMORY_AND_DISK_SER__ (Java and Scala) Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
* __DISK_ONLY__ Store the RDD partitions only on disk.
* __MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.__ Same as the levels above, but replicate each partition on two cluster nodes.
* __OFF_HEAP (experimental)__ Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

In Python, stored objects are always serialized with the Pickle library, so choosing a serialized level makes no difference.

In [22]:
from pyspark import StorageLevel

# Create an input
begin = time.time()
str_text = " ".join(get_sentences(50000))
lst_text = str_text.split(" ")
rdd_text = sc.parallelize(lst_text)
print("Initialization duration = {:.4f} seconds".format(time.time() - begin))

# Transformation, mapping and reducing
begin = time.time()
rdd_text_map = rdd_text.map(lambda w: (w, 1))
rdd_text_red = rdd_text_map.reduceByKey(lambda a, b: a + b)\
.persist(StorageLevel.MEMORY_ONLY)
print("Transformations duration = {:.4f} seconds".format(time.time() - begin))

# Action to collect
begin = time.time()
rdd_text_red.collect()
print("First action duration = {:.4f} seconds".format(time.time() - begin))

# Action to collect
begin = time.time()
rdd_text_red.collect()
print("Second action duration = {:.4f} seconds".format(time.time() - begin))

Initialization duration = 3.5446 seconds
Transformations duration = 0.0743 seconds
First action duration = 0.5398 seconds
Second action duration = 0.0402 seconds


`.unpersist()` removes the RDD's cached blocks from memory and disk.

### Broadcast

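A broadcast variable is a read-only value (not an RDD) that Spark sends once to each executor and keeps there, instead of shipping a copy with every task. As the documentation puts it, "creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important."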

In [23]:
broadcast_var = sc.broadcast([1, 2, 3])
broadcast_var.value

[1, 2, 3]

In [24]:
# broadcast_var.map(lambda s: s + 1)
# ---------------------------------------------------------------------------
# AttributeError                            Traceback (most recent call last)
# <ipython-input-65-7ac03e92578c> in <module>
# ----> 1 broadcast_var.map(lambda s: s + 1)
# 
# AttributeError: 'Broadcast' object has no attribute 'map'

### Accumulator

An accumulator serves as a shared variable that tasks can only add to. Because tasks run on different machines, incrementing an ordinary variable only changes each worker's local copy, and the driver never sees those updates.

"An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on a cluster can then add to it using the add method or the += operator. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method."

In [25]:
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value

10

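It doesn't work with a plain Python variable: each task modifies its own copy of `var`, serialized with the function, and the driver's `var` stays at 0. Even if something like this seemed to work in local mode, it would not work on a cluster.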

In [26]:
def inc_var(x):
    global var
    var = var + x
    return x

var = 0
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: inc_var(x))
var

0

For comparison, the same function works with pandas, because everything runs in the driver's single Python process.

In [27]:
import pandas as pd
var = 0
pd.DataFrame([1, 2, 3, 4]).applymap(lambda x: inc_var(x))
var

10

## Partitioning Partitions vs cores

### Nodes architecture

<img src="https://miro.medium.com/max/700/1*8kQ2nvNThU19D6ejFYS4uQ.jpeg"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_from https://medium.com/@millet.david.julien/how-to-use-pyspark-to-get-opencv-images-descriptors-with-aws-emr-dd3875503a75_

<img src="https://miro.medium.com/max/700/1*z6nry1dMtIiaCzlpxgxagA.jpeg"
     alt="Markdown Monster icon"
     style="height:300px;"/>
_from https://medium.com/@millet.david.julien/how-to-use-pyspark-to-get-opencv-images-descriptors-with-aws-emr-dd3875503a75_

### Task, partition, executor optimization

__Cores per instances__ \
A common rule of thumb is that, for each instance of n cores, only n-1 are used for computation, as one core is kept to manage the node (OS and cluster-management daemons). \
4 instances of 4 cores therefore give 4 × (4 − 1) = 12 effective computing cores.

__Tasks per core__ \
If only one task (partition) runs per core, the total duration equals the time of the slowest partition: cores that finish early sit idle until it completes (fig. 1). To smooth out these differences, it is better to have several tasks per core, executed one after another (fig. 2). The right number depends on the project, but the order of magnitude is 3 to 5 tasks per core. \
With 4 instances of 4 cores, there are 12 effective cores, so 3 tasks per core gives 36 partitions.

__Cores per executor__ \
When an instance has many cores, it can host several executors. There are 3 main strategies:
* __One executor per core__ : there are many executors, and sometimes not enough memory for each of them.
* __One executor per node__ : a single executor per node does not make the most of the parallelism when nodes have many cores.
* __Five cores per executor__: the middle ground, most often chosen.

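With 4 instances of 4 cores, there are 12 effective computing cores, so 12 ÷ 5 = 2.4, rounded down to 2 executors of 5 cores. Note however that an executor runs on a single node: with only 3 usable cores per instance, a 5-core executor cannot fit, so for such small instances one 3-core executor per instance (4 executors) is more realistic.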

<img src="https://miro.medium.com/max/687/1*7EcqpIX04bss2V3DgPcloQ.jpeg"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_from https://medium.com/@millet.david.julien/how-to-use-pyspark-to-get-opencv-images-descriptors-with-aws-emr-dd3875503a75_

<img src="https://miro.medium.com/max/682/1*GbK9baZf0AJUKBKnyBkqPw.jpeg"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_from https://medium.com/@millet.david.julien/how-to-use-pyspark-to-get-opencv-images-descriptors-with-aws-emr-dd3875503a75_

__The spark-submit options used to size the application are__
* __num-executors__ : Total number of executors.
* __executor-cores__ : Number of cores per executor.
* __executor-memory__ : Memory per executor.

Take as an example an architecture of 5 instances of 11 cores, with 64 GB per instance. \
With this strategy we choose __--executor-cores=5__. The number of executors is computed from the number of usable cores: one core per instance is used for cluster management, which leaves 5∗(11−1)=50 cores. \
With five cores per executor, the number of executors is 50÷5=10, so __--num-executors=10__. \
The memory per executor is the total memory divided by the number of executors: 64∗5÷10=32 GB. About 7% is generally removed from this for memory overhead: 32∗(1−0.07)=29.76, so we can set __--executor-memory=29GB__.
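The same sizing steps as a Python sketch. It only reproduces the rules of thumb stated above (one reserved core per instance, 5 cores per executor, about 7% of memory removed); `size_executors` is a hypothetical helper, not a Spark API, and the printed flags simply follow the notation used in this text.

```python
import math


def size_executors(instances, cores_per_instance, memory_gb_per_instance,
                   cores_per_executor=5, reserved_cores=1, overhead=0.07):
    """Return (executor_cores, num_executors, executor_memory_gb) from the rules of thumb above."""
    # Usable cores: one core per instance is kept for cluster management
    usable_cores = instances * (cores_per_instance - reserved_cores)
    num_executors = usable_cores // cores_per_executor
    # Total memory of the cluster shared between all executors
    memory_per_executor = instances * memory_gb_per_instance / num_executors
    # Remove the overhead fraction and round down to whole gigabytes
    executor_memory_gb = math.floor(memory_per_executor * (1 - overhead))
    return cores_per_executor, num_executors, executor_memory_gb


cores, executors, memory = size_executors(5, 11, 64)
print(f"--executor-cores={cores} --num-executors={executors} --executor-memory={memory}GB")
# --executor-cores=5 --num-executors=10 --executor-memory=29GB
```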

Another point to consider when choosing the number of cores is the size of the partitions. A partition is usually at most 128 MB (the default value of spark.sql.files.maxPartitionBytes when reading files). In the example above with 36 partitions, the dataset should therefore not exceed 36 × 128 MB = 4.5 GB. \
Regarding memory use, shuffle blocks cannot be larger than 2 GB, which limits the amount of data an executor can reduce in one block. This is one reason why no more than 5 cores per executor are chosen.

Second example: I have a 100 GB dataset. How should the resources be sized? \
First, compute the number of 128 MB partitions: 100 000 MB ÷ 128 MB ≈ 782. \
Then the number of cores, with 3 partitions per core: 782 ÷ 3 ≈ 261 cores. \
With 5 cores per executor, this gives 261 ÷ 5 = 52.2, so 52 executors. \
Assume 1 GB is needed per executor. \
One solution is then to use 26 instances of 11 cores, 10 of which are used on each instance, so 2 executors of 5 cores per instance (26 × 2 = 52 executors). The memory per instance has to be 2 GB + 7%. The parameters to pass are __--executor-cores=5__, __--num-executors=52__, __--executor-memory=1GB__.
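The second example can also be sketched in Python. It assumes 1 GB = 1000 MB (which gives the 782 partitions above), 128 MB partitions, 3 tasks per core, 5 cores per executor, 11-core instances with one reserved core, 1 GB per executor and a 7% memory margin; `size_for_dataset` is a hypothetical helper.

```python
import math


def size_for_dataset(dataset_mb, partition_mb=128, tasks_per_core=3,
                     cores_per_executor=5, cores_per_instance=11,
                     reserved_cores=1, executor_memory_gb=1, overhead=0.07):
    """Derive partitions, cores, executors and instances from the dataset size."""
    partitions = math.ceil(dataset_mb / partition_mb)          # 100000 / 128 -> 782
    cores = math.ceil(partitions / tasks_per_core)             # 782 / 3 -> 261
    executors = cores // cores_per_executor                    # 261 / 5 = 52.2 -> 52
    # Executors that fit on one instance once the reserved core is removed: 10 // 5 -> 2
    executors_per_instance = (cores_per_instance - reserved_cores) // cores_per_executor
    instances = math.ceil(executors / executors_per_instance)  # 52 / 2 -> 26
    memory_per_instance_gb = executors_per_instance * executor_memory_gb * (1 + overhead)
    return {
        "partitions": partitions,
        "cores": cores,
        "num_executors": executors,
        "instances": instances,
        "memory_per_instance_gb": round(memory_per_instance_gb, 2),
    }


print(size_for_dataset(100_000))
```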

In [28]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

# DataFrames

Code taken from this documentation: \
https://spark.apache.org/docs/latest/sql-programming-guide.html

In [29]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Create a local SparkContext if it has not been launched previously
conf = SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
try:
    sc = SparkContext(conf=conf)
except ValueError:
    # Only one SparkContext can run at a time: reuse the existing sc
    print("Context already setup")

# A SparkSession built on sc is needed to create DataFrames
spark = SparkSession(sc)

Context already setup


## Reader & Writer

### Read one text file or the whole directory

In [30]:
# Read one file
dist_file = sc.textFile("data/first_document.txt")
dist_file.collect()

['This is the line one',
 'This is the second line',
 "Like the spoon the third line doesn't exists"]

In [31]:
# Apply an action on the RDD: count the characters of all lines
line_lengths = dist_file.map(lambda s: len(s))
line_lengths.sum()

87

In [32]:
# Read many files: textFile accepts a glob pattern and stacks the lines of all matching files
dist_file = sc.textFile("./data/*.txt")
dist_file.collect()

['This is the first line of the second document',
 'This is the second line',
 'This is the line one',
 'This is the second line',
 "Like the spoon the third line doesn't exists"]

In [33]:
line_lengths = dist_file.map(lambda s: len(s))
line_lengths.sum()

155

### Read and save an RDD as a sequence file

In [34]:
# Create rdd
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))

In [35]:
# Delete the saved RDD if it exists: saveAsSequenceFile cannot overwrite an existing folder
import os
import shutil

if os.path.isdir("./data/rdd_saved"):
    shutil.rmtree("./data/rdd_saved")

In [36]:
# Save RDD
rdd.saveAsSequenceFile("./data/rdd_saved")

In [37]:
# Read RDD
sc.sequenceFile("data/rdd_saved").collect()

[(3, 'aaa'), (2, 'aa'), (1, 'a')]

### Read and save a dataframe - csv

In [38]:
# Create an RDD and convert it to a DataFrame
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
df = rdd.toDF()

In [39]:
# csv specific function
df \
.repartition(1) \
.write \
.mode('overwrite') \
.option("header", "true") \
.csv("data/csv_saved")

In [40]:
# Generic save function
df \
.repartition(1) \
.write \
.format('csv')\
.mode('overwrite')\
.option("header", "true") \
.save("data/csv_saved")

The CSV is saved as a folder containing one file per partition. With repartition(1) there is only one partition, so the resulting folder contains a single CSV file.

In [41]:
# read with the generic load function
df = spark\
.read\
.format('csv')\
.load("data/csv_saved")
df.show()

+---+---+
|_c0|_c1|
+---+---+
| _1| _2|
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [42]:
df = spark \
.read \
.format('csv') \
.option("header", "true") \
.load("data/csv_saved")
df.show()

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [43]:
# Read with the specific csv function
df = spark \
.read \
.option("header", "true") \
.csv("data/csv_saved")
df.show()

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



### Read a json

In [44]:
json_strings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(json_strings)
otherPeopleRDD.collect()

['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']

In [45]:
# df = otherPeopleRDD.toDF()
# df.show()
# generates an error: the RDD contains plain strings, not rows or tuples, so no schema can be inferred

In [46]:
# read a serialized json string
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



In [47]:
otherPeople.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- name: string (nullable = true)



### Read and save a parquet

In [48]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
df = rdd.toDF(["a", "b"])
df.show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [49]:
df.select("a") \
.write \
.format('parquet')\
.mode('overwrite')\
.save("data/pq_saved.parquet")

In [50]:
df = spark \
.read \
.format('parquet')\
.load("data/pq_saved.parquet")
df.show()

+---+
|  a|
+---+
|  2|
|  3|
|  1|
+---+



The advantage of Parquet is that it saves space, and the data can also be partitioned when writing so that only the needed partitions are read back.

In [51]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
df = rdd.toDF(["a", "b"])
df.show()

df.select("a", "b") \
.write \
.partitionBy("a") \
.format('parquet')\
.mode('overwrite')\
.save("data/pq_saved.parquet")

df = spark.read.parquet("data/pq_saved.parquet/a=2")
df.show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+

+---+
|  b|
+---+
| aa|
+---+



In [52]:
# Another solution: load the whole dataset as a DataFrame, then query it with SQL
df = spark.read.format('parquet').load("data/pq_saved.parquet")
df.createOrReplaceTempView("parquetFile")
a = spark.sql("SELECT a FROM parquetFile WHERE a=2")
a.show()

+---+
|  a|
+---+
|  2|
+---+



## DataFrame & Column Columns and expressions, Transformations, Actions, Rows

### Select columns

In [53]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
df = rdd.toDF(["a", "b"])
df.show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [54]:
df.select("a").show()

+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+



In [56]:
# Equivalent
from pyspark.sql.functions import col
df.select(["a", "b"]).show()
df.select("a", "b").show()
df.select(col("a"), col("b")).show()

from pyspark.sql.functions import column
df.select(column("a"), column("b")).show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [57]:
# Select with expression
from pyspark.sql.functions import length
df.select(length(df.b)).show()
df.select(length(col("b"))).show()
# or
df.selectExpr("length(b)").show()

+---------+
|length(b)|
+---------+
|        1|
|        2|
|        3|
+---------+

+---------+
|length(b)|
+---------+
|        1|
|        2|
|        3|
+---------+

+---------+
|length(b)|
+---------+
|        1|
|        2|
|        3|
+---------+



In [58]:
df.select("*").show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [59]:
from pyspark.sql.functions import upper
df.select(upper("b")).show()
df.select(upper(col("b"))).show()
df.select(upper(column("b"))).show()

+--------+
|upper(b)|
+--------+
|       A|
|      AA|
|     AAA|
+--------+

+--------+
|upper(b)|
+--------+
|       A|
|      AA|
|     AAA|
+--------+

+--------+
|upper(b)|
+--------+
|       A|
|      AA|
|     AAA|
+--------+



### DataFrame filter rows

In [60]:
df.filter(col("a") > 2).show()

+---+---+
|  a|  b|
+---+---+
|  3|aaa|
+---+---+



In [61]:
df[df.a > 2].show()

+---+---+
|  a|  b|
+---+---+
|  3|aaa|
+---+---+



In [62]:
df.filter(col("a").isNotNull()).show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [63]:
df.filter((col("a").isNotNull()) & (col("a") > 2)).show()

+---+---+
|  a|  b|
+---+---+
|  3|aaa|
+---+---+



In [64]:
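# Chained filters: only rows where the condition is true are kept, so a comparison on a null value drops the row; isNotNull() removes the missing name explicitly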
df = spark.createDataFrame([['Joe',25], ['Smith',27], [None,28], ['Sharon',33]],
                           ('name','age'))
df = df.filter(col("age") < 30).filter(col("age") > 20)
df = df.filter(col("name").isNotNull())
df.show()

+-----+---+
| name|age|
+-----+---+
|  Joe| 25|
|Smith| 27|
+-----+---+



### Drop, rename, replace columns

In [65]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
df = rdd.toDF(["a", "b"])
df.show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [66]:
# Drop column
df.drop("c").show() # Dropping a column that does not exist is a no-op, no error is raised
df.drop("a").show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+

+---+
|  b|
+---+
|  a|
| aa|
|aaa|
+---+



In [67]:
df = spark.createDataFrame([['John','Doe','25','2800'],
                            ['Joe','Smith','24.0','2600']],
                           ('fname','lname','age','sales'))

In [68]:
# Add a column, possibly computed with an expression
df.withColumn("age2", col("age") * 0.5).show()

+-----+-----+----+-----+----+
|fname|lname| age|sales|age2|
+-----+-----+----+-----+----+
| John|  Doe|  25| 2800|12.5|
|  Joe|Smith|24.0| 2600|12.0|
+-----+-----+----+-----+----+



In [69]:
from pyspark.sql.types import IntegerType
df.withColumn("age", col("age").cast(IntegerType())).show()

+-----+-----+---+-----+
|fname|lname|age|sales|
+-----+-----+---+-----+
| John|  Doe| 25| 2800|
|  Joe|Smith| 24| 2600|
+-----+-----+---+-----+



In [70]:
# Rename a column. 
df.withColumnRenamed("age", "Age").show()

+-----+-----+----+-----+
|fname|lname| Age|sales|
+-----+-----+----+-----+
| John|  Doe|  25| 2800|
|  Joe|Smith|24.0| 2600|
+-----+-----+----+-----+



### Replace value, replace na

In [71]:
df = spark.createDataFrame([['1','a'], ['2', 'aa'], ['3', None]],
                           ['a','b'])
df.show()

+---+----+
|  a|   b|
+---+----+
|  1|   a|
|  2|  aa|
|  3|null|
+---+----+



In [72]:
# Replace a value. df.na.replace() and df.replace() are aliases of each other
df.na.replace("aa", "bb", subset=["b"]).show()

+---+----+
|  a|   b|
+---+----+
|  1|   a|
|  2|  bb|
|  3|null|
+---+----+



In [73]:
# replace na
df.na.fill("ab", subset=["b"]).show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3| ab|
+---+---+



In [74]:
# Same result: df.fillna() is an alias of df.na.fill()
df.fillna("ab", subset=["b"]).show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3| ab|
+---+---+



### Create dataframe

In [75]:
# Keep the DataFrame in df, then display it (show() returns None)
df = spark.createDataFrame([['John','Doe','25','2800'],
                            ['Joe','Smith','24.0','2600']],
                           ['fname','lname','age','sales'])
df.show()

+-----+-----+----+-----+
|fname|lname| age|sales|
+-----+-----+----+-----+
| John|  Doe|  25| 2800|
|  Joe|Smith|24.0| 2600|
+-----+-----+----+-----+



In [76]:
df = spark.createDataFrame([['John','Doe','25','2800'],
                            ['Joe','Smith','24.0','2600']],
                           ('fname','lname','age','sales'))
df.show()

+-----+-----+----+-----+
|fname|lname| age|sales|
+-----+-----+----+-----+
| John|  Doe|  25| 2800|
|  Joe|Smith|24.0| 2600|
+-----+-----+----+-----+



In [77]:
df = spark.createDataFrame([['John','Doe','25','2800'],
                            ['Joe','Smith','24.0','2600']])\
.toDF('fname','lname','age','sales')
df.show()

+-----+-----+----+-----+
|fname|lname| age|sales|
+-----+-----+----+-----+
| John|  Doe|  25| 2800|
|  Joe|Smith|24.0| 2600|
+-----+-----+----+-----+



In [78]:
df = spark.createDataFrame([['John','Doe','25','2800'],
                            ['Joe','Smith','24.0','2600']],
                           ('fname','lname','age','sales'))\
.withColumnRenamed('fname', 'test')
df.show()

+----+-----+----+-----+
|test|lname| age|sales|
+----+-----+----+-----+
|John|  Doe|  25| 2800|
| Joe|Smith|24.0| 2600|
+----+-----+----+-----+



### Schemas

In [79]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

SCHEMA = StructType([
            StructField("name",StringType(),True),
            StructField("dept",StringType(),True),
            StructField("salary",IntegerType(),True)
])

dfEmp = spark.createDataFrame([
                    ['Tony',"HR",8000],
                    ['Mona',None,10000],
                    ['Jill','Sls',5000],
                    ['Tim','Admin',7000]],SCHEMA)

SCHEMA

StructType(List(StructField(name,StringType,true),StructField(dept,StringType,true),StructField(salary,IntegerType,true)))

In [80]:
dfEmp.schema

StructType(List(StructField(name,StringType,true),StructField(dept,StringType,true),StructField(salary,IntegerType,true)))

In [81]:
SCHEMA.fieldNames()

['name', 'dept', 'salary']

In [82]:
# add() modifies SCHEMA in place (and returns it): "salary" now appears twice
SCHEMA.add(StructField("salary",IntegerType(),True))
SCHEMA

StructType(List(StructField(name,StringType,true),StructField(dept,StringType,true),StructField(salary,IntegerType,true),StructField(salary,IntegerType,true)))

In [83]:
# add() also accepts the field name, type and nullable flag directly
SCHEMA.add("salary",IntegerType(),True)
SCHEMA

StructType(List(StructField(name,StringType,true),StructField(dept,StringType,true),StructField(salary,IntegerType,true),StructField(salary,IntegerType,true),StructField(salary,IntegerType,true)))

### Dataframe rows

In [85]:
from pyspark.sql import Row
cust = Row("fname","lname")
custRow = cust("John","Doe")

In [86]:
custRow[:]

('John', 'Doe')

## Aggregation Groupby, Grouped data methods, Aggregate functions

### Merge, joins

In [87]:
rdd1 = sc.parallelize(range(1, 4)).map(lambda x: (x, "b" * x))
df1 = rdd1.toDF(["a", "b"])
df1.show()

rdd2 = sc.parallelize(range(1, 4)).map(lambda x: (x, "c" * x))
df2 = rdd2.toDF(["a", "c"])
df2.show()

+---+---+
|  a|  b|
+---+---+
|  1|  b|
|  2| bb|
|  3|bbb|
+---+---+

+---+---+
|  a|  c|
+---+---+
|  1|  c|
|  2| cc|
|  3|ccc|
+---+---+



In [88]:
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
df_res = spark.sql("SELECT df1.a, b, c FROM df1 INNER JOIN df2 ON df1.a=df2.a")
df_res.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  3|bbb|ccc|
|  2| bb| cc|
|  1|  b|  c|
+---+---+---+



In [89]:
df_res = df1.join(df2, ["a"], "inner")
df_res.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  3|bbb|ccc|
|  2| bb| cc|
|  1|  b|  c|
+---+---+---+



In [90]:
df_res = df1.join(df2, df1.a == df2.a, "inner")
df_res.show()

+---+---+---+---+
|  a|  b|  a|  c|
+---+---+---+---+
|  3|bbb|  3|ccc|
|  2| bb|  2| cc|
|  1|  b|  1|  c|
+---+---+---+---+



### Aggregation

In [91]:
rdd1 = sc.parallelize([1, 2, 3, 1]).map(lambda x: (x, "b" * x))
df1 = rdd1.toDF(["a", "b"])
df1.show()

+---+---+
|  a|  b|
+---+---+
|  1|  b|
|  2| bb|
|  3|bbb|
|  1|  b|
+---+---+



In [92]:
df1.groupBy(["a"]).count().show()

+---+-----+
|  a|count|
+---+-----+
|  3|    1|
|  2|    1|
|  1|    2|
+---+-----+



In [93]:
from pyspark.sql.functions import col
df1.groupBy(col("a")).count().show()

+---+-----+
|  a|count|
+---+-----+
|  3|    1|
|  2|    1|
|  1|    2|
+---+-----+



In [94]:
from pyspark.sql.functions import col
df1.groupBy(col("a"), col("b")).count().show()

+---+---+-----+
|  a|  b|count|
+---+---+-----+
|  3|bbb|    1|
|  1|  b|    2|
|  2| bb|    1|
+---+---+-----+



In [95]:
df1.groupBy(col("a"), col("b")).count().select("count").show()

+-----+
|count|
+-----+
|    1|
|    2|
|    1|
+-----+



In [96]:
df1.groupBy(col("a"), col("b")).count().selectExpr("(count * 2) as double").show()

+------+
|double|
+------+
|     2|
|     4|
|     2|
+------+



## Datetimes Dates, string

### Datetime

In [97]:
df = sc.parallelize([(' a   ','2018-01-01','yyyy-MM-dd'),
                      ('b','2018-02-02','yyyy-MM-dd'),
                      ('c','02-02-2018','dd-MM-yyyy')]).toDF(
                    ["col_name","value","format"])
df.show()

+--------+----------+----------+
|col_name|     value|    format|
+--------+----------+----------+
|    a   |2018-01-01|yyyy-MM-dd|
|       b|2018-02-02|yyyy-MM-dd|
|       c|02-02-2018|dd-MM-yyyy|
+--------+----------+----------+



In [98]:
from pyspark.sql.functions import expr
df.withColumn("test3", expr("to_date(value, format)")).show()

+--------+----------+----------+----------+
|col_name|     value|    format|     test3|
+--------+----------+----------+----------+
|    a   |2018-01-01|yyyy-MM-dd|2018-01-01|
|       b|2018-02-02|yyyy-MM-dd|2018-02-02|
|       c|02-02-2018|dd-MM-yyyy|2018-02-02|
+--------+----------+----------+----------+



In [99]:
from pyspark.sql.functions import to_date
from pyspark.sql.functions import lit
df.withColumn("test3", to_date(lit("20180101"), "yyyyMMdd")).show()

+--------+----------+----------+----------+
|col_name|     value|    format|     test3|
+--------+----------+----------+----------+
|    a   |2018-01-01|yyyy-MM-dd|2018-01-01|
|       b|2018-02-02|yyyy-MM-dd|2018-01-01|
|       c|02-02-2018|dd-MM-yyyy|2018-01-01|
+--------+----------+----------+----------+



In [100]:
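# The second argument of to_date is a pattern string, not a column name:
# "format" is taken as the pattern itself, so parsing fails and returns null.
# To read the pattern from a column, use expr("to_date(value, format)") as above.
df.withColumn("test3", to_date("value", "format")).show()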

+--------+----------+----------+-----+
|col_name|     value|    format|test3|
+--------+----------+----------+-----+
|    a   |2018-01-01|yyyy-MM-dd| null|
|       b|2018-02-02|yyyy-MM-dd| null|
|       c|02-02-2018|dd-MM-yyyy| null|
+--------+----------+----------+-----+



### Complex Types String functions

In [101]:
from pyspark.sql.functions import trim
df.withColumn("a trimed", trim("col_name")).show()

+--------+----------+----------+--------+
|col_name|     value|    format|a trimed|
+--------+----------+----------+--------+
|    a   |2018-01-01|yyyy-MM-dd|       a|
|       b|2018-02-02|yyyy-MM-dd|       b|
|       c|02-02-2018|dd-MM-yyyy|       c|
+--------+----------+----------+--------+



In [102]:
from pyspark.sql.functions import ltrim
df.withColumn("a left trimed", ltrim("col_name")).show()

+--------+----------+----------+-------------+
|col_name|     value|    format|a left trimed|
+--------+----------+----------+-------------+
|    a   |2018-01-01|yyyy-MM-dd|         a   |
|       b|2018-02-02|yyyy-MM-dd|            b|
|       c|02-02-2018|dd-MM-yyyy|            c|
+--------+----------+----------+-------------+



In [103]:
from pyspark.sql.functions import rtrim
df.withColumn("a right trimed", rtrim("col_name")).show()

+--------+----------+----------+--------------+
|col_name|     value|    format|a right trimed|
+--------+----------+----------+--------------+
|    a   |2018-01-01|yyyy-MM-dd|             a|
|       b|2018-02-02|yyyy-MM-dd|             b|
|       c|02-02-2018|dd-MM-yyyy|             c|
+--------+----------+----------+--------------+



In [104]:
df = spark.createDataFrame([
                    ['William T',"HR",8000],
                    ['Bob H',"HR",8000],
                    ['Lisa M','HR',5000],
                    ['Marie A','ADMIN',7000]],('name','dept','salary'))

In [105]:
from pyspark.sql.functions import split
df = df.withColumn("splitName", split(col("name"), " "))
df.show()

+---------+-----+------+------------+
|     name| dept|salary|   splitName|
+---------+-----+------+------------+
|William T|   HR|  8000|[William, T]|
|    Bob H|   HR|  8000|    [Bob, H]|
|   Lisa M|   HR|  5000|   [Lisa, M]|
|  Marie A|ADMIN|  7000|  [Marie, A]|
+---------+-----+------+------------+



In [106]:
from pyspark.sql.functions import explode
df = df.withColumn("explodeName", explode(col("splitName")))
df.show()

+---------+-----+------+------------+-----------+
|     name| dept|salary|   splitName|explodeName|
+---------+-----+------+------------+-----------+
|William T|   HR|  8000|[William, T]|    William|
|William T|   HR|  8000|[William, T]|          T|
|    Bob H|   HR|  8000|    [Bob, H]|        Bob|
|    Bob H|   HR|  8000|    [Bob, H]|          H|
|   Lisa M|   HR|  5000|   [Lisa, M]|       Lisa|
|   Lisa M|   HR|  5000|   [Lisa, M]|          M|
|  Marie A|ADMIN|  7000|  [Marie, A]|      Marie|
|  Marie A|ADMIN|  7000|  [Marie, A]|          A|
+---------+-----+------+------------+-----------+
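To make the row multiplication of `explode` explicit, here is a plain-Python analogue (not the PySpark API) of `split` followed by `explode` on the same four employees: each element of the array becomes its own row and the other columns are copied. It assumes a single-space separator, for which Spark's regex-based `split` and Python's `str.split` behave the same.

```python
employees = [
    ("William T", "HR", 8000),
    ("Bob H", "HR", 8000),
    ("Lisa M", "HR", 5000),
    ("Marie A", "ADMIN", 7000),
]


def split_and_explode(rows, sep=" "):
    """Mimic withColumn('splitName', split(...)) then withColumn('explodeName', explode(...))."""
    exploded = []
    for name, dept, salary in rows:
        split_name = name.split(sep)   # like split(col("name"), " ")
        for part in split_name:        # like explode(col("splitName")): one row per element
            exploded.append((name, dept, salary, split_name, part))
    return exploded


for row in split_and_explode(employees):
    print(row)
```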



### Lit

In [107]:
from pyspark.sql.functions import create_map, map_values
df = df.select(create_map(lit("1"), lit("John"), lit("2"),lit("Marie")).\
               alias("empDetails")).limit(1)
df.show()

+--------------------+
|          empDetails|
+--------------------+
|[1 -> John, 2 -> ...|
+--------------------+



In [108]:
# map_values returns the values of the map; "keys" is only the name given to the result column
df.select(map_values("empDetails").alias("keys")).show()

+-------------+
|         keys|
+-------------+
|[John, Marie]|
+-------------+



In [109]:
from pyspark.sql.functions import map_keys
df.select(map_keys("empDetails").alias("ids")).show()

+------+
|   ids|
+------+
|[1, 2]|
+------+



## User-Defined Functions User-defined functions, Vectorized UDFs, Performance

### User-defined functions

In [110]:
def myFunc(s):
    # Count the words of a line
    words = s.split(" ")
    return len(words)

sc.textFile("data/first_document.txt").map(myFunc).collect()

[5, 5, 8]

## Spark SQL 

In [111]:
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
df = rdd.toDF(["a", "b"])
df.show()

+---+---+
|  a|  b|
+---+---+
|  1|  a|
|  2| aa|
|  3|aaa|
+---+---+



In [112]:
df.createOrReplaceTempView("sql_view")
a = spark.sql("SELECT a FROM sql_view WHERE a=2")
a.show()

+---+
|  a|
+---+
|  2|
+---+



# Streaming 
Most of the examples are taken from \
_https://spark.apache.org/docs/latest/streaming-programming-guide.html_ \
I tried them and completed them so that they can be executed.

## Principle

"Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams."

<img src="https://spark.apache.org/docs/latest/img/streaming-arch.png"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

First, we import StreamingContext, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second.

### Example

In [113]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local SparkContext with two working threads; the StreamingContext
# with a batch interval of 1 second is built on top of it in the next cell
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:
    # A SparkContext already exists in this notebook: reuse it
    print("Context already setup")

Context already setup


In [114]:
ssc = StreamingContext(sc, 1)

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. 9999).

In [115]:
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space into words.

In [116]:
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words and the stream of words is represented as the words DStream. Next, we want to count these words.

In [117]:
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

The words DStream is further mapped (one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. Finally, wordCounts.pprint() will print a few of the counts generated every second.
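To see what each step produces within one batch, here is a plain-Python analogue (not the Spark API) of flatMap, map and reduceByKey applied to a hypothetical batch of two lines received on the socket. `reduce_by_key` and `word_count_batch` are illustrative names only.

```python
def reduce_by_key(pairs, func):
    """Combine the values of identical keys with func, like reduceByKey does within a batch."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def word_count_batch(lines):
    # flatMap: each line produces several words
    words = [word for line in lines for word in line.split(" ")]
    # map: each word becomes a (word, 1) pair
    pairs = [(word, 1) for word in words]
    # reduceByKey(lambda x, y: x + y): the counts of identical words are summed
    return reduce_by_key(pairs, lambda x, y: x + y)


batch = ["hello world", "hello spark"]  # hypothetical lines received during one batch
print(word_count_batch(batch))  # {'hello': 2, 'world': 1, 'spark': 1}
```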

Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call

In [118]:
ssc.start()             # Start the computation
#ssc.awaitTermination()  # Wait for the computation to terminate due to manual exit or error
import time
time.sleep(3)
ssc.stop(stopSparkContext=False)  # Stop listening

-------------------------------------------
Time: 2020-11-18 16:41:06
-------------------------------------------

-------------------------------------------
Time: 2020-11-18 16:41:07
-------------------------------------------

-------------------------------------------
Time: 2020-11-18 16:41:08
-------------------------------------------

-------------------------------------------
Time: 2020-11-18 16:41:09
-------------------------------------------



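To actually send lines to the stream, run Netcat in another terminal before starting the context, on the same port as socketTextStream (9999), and type some text:

nc -lk 9999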

"After a context is defined, you have to do the following.

* Define the input sources by creating input DStreams.
* Define the streaming computations by applying transformation and output operations to DStreams.
* Start receiving data and processing it using streamingContext.start().
* Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
* The processing can be manually stopped using streamingContext.stop().

Points to remember:

* Once a context has been started, no new streaming computations can be set up or added to it.
* Once a context has been stopped, it cannot be restarted.
* Only one StreamingContext can be active in a JVM at the same time.
* stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
* A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

### Discretized stream

A DStream is represented as a continuous series of RDDs, one per batch interval, and every transformation applied to the DStream is applied to each of these RDDs.

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png"
     alt="Markdown Monster icon"
     style="height:200px;"/>
_image from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

## Input / output

### Input DStreams and Receivers

" Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing.

Spark Streaming provides two categories of built-in streaming sources.

* Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, and socket connections.
* Advanced sources: Sources like Kafka, Kinesis, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section.

We are going to discuss some of the sources present in each category later in this section.

Note that, if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This will create multiple receivers which will simultaneously receive multiple data streams. But note that a Spark worker/executor is a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Therefore, it is important to remember that a Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data, as well as to run the receiver(s).

Points to remember

* When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
* Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it. "

_by https://spark.apache.org/docs/latest/streaming-programming-guide.html_

### Basic sources

To process files, such as text logs, as they arrive in a directory:

In [119]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local SparkContext with two working threads, if none exists yet
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:
    # A SparkContext already exists in this notebook: reuse it
    print("Context already setup")

Context already setup


In [120]:
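path = "C:\\Utilisateurs\\Millet\\AppData\\Roaming\\IBM Watson Studio\\logs\\"
# textFileStream is a method of a StreamingContext instance and returns a DStream.
# The previous StreamingContext was stopped and cannot be restarted, so a new one is needed:
# ssc = StreamingContext(sc, 1)
# lines = ssc.textFileStream(path)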

All files must be in the same data format.

A file is considered part of a time period based on its modification time, not its creation time.
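A plain-Python sketch of this rule, assuming a batch covers the time window [start, end) in seconds since the epoch: a file belongs to the batch when its modification time falls in the window, whatever its creation time. It only illustrates the principle stated above and is not Spark's implementation; `files_in_batch` and the log file names are hypothetical.

```python
import os
import tempfile


def files_in_batch(directory, start, end):
    """Names of the files in directory whose modification time falls in [start, end)."""
    selected = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and start <= os.path.getmtime(path) < end:
            selected.append(name)
    return selected


with tempfile.TemporaryDirectory() as log_dir:
    # Two hypothetical log files, both created now but with forced modification times
    for name, mtime in [("old.log", 100.0), ("new.log", 205.0)]:
        path = os.path.join(log_dir, name)
        with open(path, "w") as f:
            f.write("a log line\n")
        os.utime(path, (mtime, mtime))  # (access time, modification time)
    print(files_in_batch(log_dir, 200.0, 210.0))  # ['new.log']
```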

### Queue of RDDs for testing purposes

In [121]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local SparkContext with two working threads, if none exists yet
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except ValueError:
    # A SparkContext already exists in this notebook: reuse it
    print("Context already setup")

Context already setup


In [122]:
ssc = StreamingContext(sc, 1)

In [123]:
queueOfRDDs = [sc.range(0, 1000),
               sc.range(1000, 3000),
               sc.range(2000, 4000)]

In [124]:
rdd_received = ssc.queueStream(queueOfRDDs)

In [125]:
rdd_received.count().pprint()

In [126]:
ssc.start()
#ssc.awaitTermination()

import time

# Note: changes to the queue after the stream is created will not be recognized.
# https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html?highlight=queue
queueOfRDDs.append(sc.range(0, 100))

time.sleep(5)
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2020-11-18 16:41:10
-------------------------------------------
1000

-------------------------------------------
Time: 2020-11-18 16:41:11
-------------------------------------------
2000

-------------------------------------------
Time: 2020-11-18 16:41:12
-------------------------------------------
2000

-------------------------------------------
Time: 2020-11-18 16:41:13
-------------------------------------------
0

-------------------------------------------
Time: 2020-11-18 16:41:14
-------------------------------------------
0
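The counts above (1000, 2000, 2000, then 0) can be reproduced with a plain-Python model of queueStream, assuming its default behavior of consuming one RDD per batch (oneAtATime=True) and that the queue is captured when the stream is created. `QueueStreamModel` is a hypothetical class for illustration, not a Spark API; Python ranges stand in for the RDDs.

```python
class QueueStreamModel:
    """Toy model of ssc.queueStream(rdds): one queued dataset is consumed per batch interval."""

    def __init__(self, queue):
        # The queue is copied at creation: later changes to the original list are not seen
        self._pending = list(queue)

    def next_batch(self):
        # An empty batch is produced once the queue is exhausted
        return self._pending.pop(0) if self._pending else []


queue_of_ranges = [range(0, 1000), range(1000, 3000), range(2000, 4000)]
stream = QueueStreamModel(queue_of_ranges)
queue_of_ranges.append(range(0, 100))  # appended after creation: ignored, as in the notebook

counts = [len(stream.next_batch()) for _ in range(5)]
print(counts)  # [1000, 2000, 2000, 0, 0]
```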



### Receiver Reliability

There are two kinds of receivers:

Reliable Receiver - A reliable receiver correctly sends acknowledgment to a reliable source when the data has been received and stored in Spark with replication (e.g. Kafka).

Unreliable Receiver - An unreliable receiver does not send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want or need to go into the complexity of acknowledgment.

## Transformations on DStreams

"Some of the common ones are as follows.
* __map(func)__ 	    Return a new DStream by passing each element of the source DStream through a function func.
* __flatMap(func)__ 	Similar to map, but each input item can be mapped to 0 or more output items.
* __filter(func)__ 	Return a new DStream by selecting only the records of the source DStream on which func returns true.
* __repartition(numPartitions)__ 	Changes the level of parallelism in this DStream by creating more or fewer partitions.
* __union(otherStream)__ 	Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
* __count()__ 	Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
* __reduce(func)__ 	Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel.
* __countByValue()__ 	When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
* __reduceByKey(func, [numTasks])__ 	When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
* __join(otherStream, [numTasks])__ 	When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
* __cogroup(otherStream, [numTasks])__ 	When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
* __transform(func)__ 	Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
* __updateStateByKey(func)__ 	Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key." \

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

### Classical transformations example
Toy example: count the characters of each document, spaces excluded

In [127]:
# Example of map/reduce/count on StreamingContext

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")
ssc = StreamingContext(sc, 1)

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

                     
# Split each line into words
words = rdd_received.flatMap(lambda line: line.split(" "))
# Map each word to its length
word_lengths = words.map(lambda word : len(word))
# Sum the word lengths of each batch
total_length = word_lengths.reduce(lambda a, b: a + b)
# Display result
total_length.pprint()

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-18 16:41:15
-------------------------------------------
21

-------------------------------------------
Time: 2020-11-18 16:41:16
-------------------------------------------
27

-------------------------------------------
Time: 2020-11-18 16:41:17
-------------------------------------------
44

-------------------------------------------
Time: 2020-11-18 16:41:18
-------------------------------------------

-------------------------------------------
Time: 2020-11-18 16:41:19
-------------------------------------------
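
The three totals can be checked without Spark. In this plain-Python sketch each string stands for one micro-batch, and `flatMap`, `map` and `reduce` are replaced by their list equivalents:

```python
from functools import reduce

batches = [
    "This is my first document",
    "This is a second longer document",
    "This is a document which doesn't know it is a document",
]

totals = []
for line in batches:
    words = line.split(" ")                        # flatMap(lambda line: line.split(" "))
    word_lengths = [len(word) for word in words]   # map(lambda word: len(word))
    totals.append(reduce(lambda a, b: a + b, word_lengths))  # reduce(lambda a, b: a + b)

print(totals)  # [21, 27, 44]
```

The last two batches of the stream are empty, so `reduce` has nothing to emit and only the headers are printed.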



### updateStateByKey transformation


Example: count the occurrences of each word over the whole stream, not per document. \
A single running result is kept and updated each time a new RDD arrives. \
After the last RDD, the same result keeps being printed every second. \
Checkpointing is required here, because updateStateByKey is a stateful transformation.

In [128]:
# Example of updateStateByKey use
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

def update_word_count(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount) 


# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("data/checkpoint_directory")

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)
                    
# Split each line into words
words = rdd_received.flatMap(lambda line: line.split(" "))

# Build (word, 1) key-value pairs
word_pairs = words.map(lambda word : (word, 1))

# Count the words of each batch
word_count = word_pairs.reduceByKey(lambda a, b : a + b)
# word_count.pprint() # If the occurrences of words per document are needed

# Update the running count of each word every time a new RDD arrives
total_word_count = word_count.updateStateByKey(update_word_count)

# Display result
total_word_count.pprint()

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-18 16:41:20
-------------------------------------------
('first', 1)
('is', 1)
('This', 1)
('my', 1)
('document', 1)

-------------------------------------------
Time: 2020-11-18 16:41:21
-------------------------------------------
('first', 1)
('a', 1)
('is', 2)
('longer', 1)
('This', 2)
('my', 1)
('document', 2)
('second', 1)

-------------------------------------------
Time: 2020-11-18 16:41:22
-------------------------------------------
("doesn't", 1)
('first', 1)
('a', 3)
('which', 1)
('is', 4)
('longer', 1)
('know', 1)
('This', 3)
('my', 1)
('document', 4)
...

-------------------------------------------
Time: 2020-11-18 16:41:23
-------------------------------------------
("doesn't", 1)
('first', 1)
('a', 3)
('which', 1)
('is', 4)
('longer', 1)
('know', 1)
('This', 3)
('my', 1)
('document', 4)
...

-------------------------------------------
Time: 2020-11-18 16:41:24
---------------------------------
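
To see why the counts keep being printed after the last document, here is a plain-Python model of one `updateStateByKey` step. `update_state_by_key` is a hypothetical helper written for illustration, not part of PySpark; it follows the behaviour described in the Spark Streaming guide: the update function is applied to every key that has state, even when the batch brings no new value for it (an empty list), and a key whose update returns `None` is removed from the state.

```python
from collections import Counter


def update_word_count(newValues, runningCount):
    # Same update function as in the notebook cell above
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


def update_state_by_key(state, batch_pairs, update_func):
    """Plain-Python model of one updateStateByKey step (not Spark's code)."""
    new_values = {}
    for key, value in batch_pairs:
        new_values.setdefault(key, []).append(value)
    new_state = {}
    for key in set(state) | set(new_values):
        result = update_func(new_values.get(key, []), state.get(key))
        if result is not None:  # None removes the key from the state
            new_state[key] = result
    return new_state


documents = [
    "This is my first document",
    "This is a second longer document",
    "This is a document which doesn't know it is a document",
]
state = {}
for doc in documents + ["", ""]:  # two trailing empty batches
    # per-batch reduceByKey, like word_count above
    batch = Counter(word for word in doc.split(" ") if word)
    state = update_state_by_key(state, batch.items(), update_word_count)
    print(sorted(state.items()))
```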

### transform(func) operations

"The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it." 

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_

Example: count the word occurrences in each batch, using transform to filter out unwanted words. \
Each batch RDD is joined with an RDD of unwanted words, \
and only the words absent from that RDD are kept.

In [129]:
# Without streaming, on plain RDDs, the code would be:
from pyspark import SparkContext

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")

# Initialize the document and the words to filter out
rdd_document = sc.parallelize(["This document is my first document"])
rdd_unwanted_words = sc.parallelize(["This", "is", "a", "my"])


# Split the document into words
words = rdd_document.flatMap(lambda line: line.split(" "))

# Build (word, 1) key-value pairs
lines = words.map(lambda word : (word, 1))
unwanted = rdd_unwanted_words.map(lambda word : (word, 1))

# Full outer join of the 2 RDDs
doc_clnd = lines.fullOuterJoin(unwanted)
display(doc_clnd.collect())
doc_clnd2 = doc_clnd.filter(lambda x: x[1][0] == 1 and  x[1][1] is None)
display(doc_clnd2.collect())
doc_clnd3 = doc_clnd2.map(lambda x: (x[0], 1))
display(doc_clnd3.collect())

# reduce to get the count of word occurence
word_count = doc_clnd3.reduceByKey(lambda a, b : a + b)
word_count.collect()

Context already setup


[('a', (None, 1)),
 ('This', (1, 1)),
 ('document', (1, None)),
 ('document', (1, None)),
 ('my', (1, 1)),
 ('first', (1, None)),
 ('is', (1, 1))]

[('document', (1, None)), ('document', (1, None)), ('first', (1, None))]

[('document', 1), ('document', 1), ('first', 1)]

[('document', 2), ('first', 1)]
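
The full outer join also emits keys that exist only in the unwanted list (here `('a', (None, 1))`), which the filter then discards. Keeping only the pairs whose key is absent from the other RDD is an anti-join, and the RDD API provides `subtractByKey` for it, so `lines.subtractByKey(unwanted)` should give the same pairs as the join-and-filter chain (not run in this notebook). The semantics can be modeled in plain Python; `full_outer_join` and `subtract_by_key` are illustrative helpers, not Spark functions:

```python
def full_outer_join(left, right):
    """Plain-Python model of RDD.fullOuterJoin on lists of (key, value) pairs."""
    left_keys = {k for k, _ in left}
    joined = []
    for k, v in left:
        matches = [w for rk, w in right if rk == k]
        if matches:
            joined.extend((k, (v, w)) for w in matches)
        else:
            joined.append((k, (v, None)))  # key only on the left
    for k, w in right:
        if k not in left_keys:
            joined.append((k, (None, w)))  # key only on the right
    return joined


def subtract_by_key(left, right):
    """Plain-Python model of RDD.subtractByKey: keep pairs whose key is not in right."""
    right_keys = {k for k, _ in right}
    return [(k, v) for k, v in left if k not in right_keys]


lines = [(w, 1) for w in "This document is my first document".split(" ")]
unwanted = [(w, 1) for w in ["This", "is", "a", "my"]]

via_join = [(k, 1) for k, (v, w) in full_outer_join(lines, unwanted)
            if v == 1 and w is None]
via_subtract = subtract_by_key(lines, unwanted)
print(via_join)      # [('document', 1), ('first', 1), ('document', 1)]
print(via_subtract)  # [('document', 1), ('first', 1), ('document', 1)]
```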

In [130]:
# The same logic inside a stream, using transform

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")
ssc = StreamingContext(sc, 1)

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

# RDD of words to filter out
rdd_unwanted_words = sc.parallelize(["This", "is", "a", "my", "it"])

# Split the lines into words
words = rdd_received.flatMap(lambda line: line.split(" "))
unwanted_words = rdd_unwanted_words.map(lambda word : (word, 1))

# Build (word, 1) key-value pairs
word_pairs = words.map(lambda word : (word, 1))

#word_pairs.pprint()
# Apply the RDD-to-RDD join and filter to every batch
cleanedDStream = word_pairs.transform(lambda rdd: rdd.fullOuterJoin(unwanted_words)
                                                  .filter(lambda x: x[1][0] == 1 and x[1][1] is None)
                                                  .map(lambda x: (x[0], 1)))
cleanedDStream.pprint()

# Count the words of each batch
word_count = cleanedDStream.reduceByKey(lambda a, b : a + b)
word_count.pprint() 

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-18 16:41:26
-------------------------------------------
('document', 1)
('document', 1)
('first', 1)

-------------------------------------------
Time: 2020-11-18 16:41:26
-------------------------------------------
('first', 1)
('document', 2)

-------------------------------------------
Time: 2020-11-18 16:41:27
-------------------------------------------
('longer', 1)
('document', 1)
('second', 1)

-------------------------------------------
Time: 2020-11-18 16:41:27
-------------------------------------------
('longer', 1)
('document', 1)
('second', 1)

-------------------------------------------
Time: 2020-11-18 16:41:28
-------------------------------------------
("doesn't", 1)
('which', 1)
('document', 1)
('document', 1)
('know', 1)

-------------------------------------------
Time: 2020-11-18 16:41:28
-------------------------------------------
("doesn't", 1)
('which', 1)
('know', 1)
('document', 2)
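
Unlike the updateStateByKey example, these counts are per batch: `reduceByKey` only sees the RDD of the current batch. The three results can be reproduced with a plain-Python sketch where each string is one batch and the set `unwanted` plays the role of `rdd_unwanted_words`:

```python
from collections import Counter

batches = [
    "This document is my first document",
    "This is a second longer document",
    "This is a document which doesn't know it is a document",
]
unwanted = {"This", "is", "a", "my", "it"}

per_batch_counts = []
for line in batches:
    # transform(...): drop the unwanted words from this batch only
    kept = [word for word in line.split(" ") if word not in unwanted]
    # reduceByKey(lambda a, b: a + b) on the (word, 1) pairs of the same batch
    per_batch_counts.append(dict(Counter(kept)))

for counts in per_batch_counts:
    print(counts)
```

To count over the whole stream instead, the cleaned pairs could be passed to `updateStateByKey`, as in the previous section.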

### Window Operations

"Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window."

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-window.png"
     alt="Sliding window operations on a DStream"
     style="height:200px;"/>
     
"Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.
* __window(windowLength, slideInterval)__ 	Return a new DStream which is computed based on windowed batches of the source DStream.
* __countByWindow(windowLength, slideInterval)__ 	Return a sliding window count of elements in the stream.
* __reduceByWindow(func, windowLength, slideInterval)__ 	Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
* __reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])__ 	When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
* __reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])__ A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
* __countByValueAndWindow(windowLength, slideInterval, [numTasks])__ 	When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html_
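
Both parameters must be multiples of the batch interval of the source DStream. The plain-Python sketch below (an illustration, with durations counted in batches rather than seconds) lists which batches fall into each window, assuming a window is emitted after every slide interval and covers the last `window_length` batches:

```python
def windows(num_batches, window_length, slide_interval):
    """Return, for each window, the indices of the batches it covers.

    Durations are counted in batches. A window ends after every
    slide_interval batches and covers the previous window_length batches
    (fewer at the start of the stream).
    """
    result = []
    for end in range(slide_interval, num_batches + 1, slide_interval):
        start = max(0, end - window_length)
        result.append(list(range(start, end)))
    return result


# windowLength=3, slideInterval=2, as in the figure of the Spark guide
print(windows(5, 3, 2))  # [[0, 1], [1, 2, 3]]
# windowLength=2, slideInterval=1, as in the reduceByKeyAndWindow example of this notebook
print(windows(5, 2, 1))  # [[0], [0, 1], [1, 2], [2, 3], [3, 4]]
```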

Example: count the occurrences of each word over a 2-second window that slides every second. \
With one document per 1-second batch, each result combines the current document with the previous one. \
Only the first window and the window right after the last document contain a single document.

In [131]:
# Example with reduceByKeyAndWindow in a stream

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")
ssc = StreamingContext(sc, 1)
# The invFunc variant of reduceByKeyAndWindow requires checkpointing
ssc.checkpoint("data/checkpoint_directory")

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

# Split the lines into words
words = rdd_received.flatMap(lambda line: line.split(" "))
# Build (word, 1) key-value pairs
word_pairs = words.map(lambda word : (word, 1))
# Count the words over a 2-second window sliding every second
# (lambda x, y: x - y is the inverse function removing the batch that leaves the window)
word_count = word_pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 2, 1)
word_count.pprint() 

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-18 16:41:31
-------------------------------------------
('first', 1)
('is', 1)
('This', 1)
('document', 2)
('my', 1)

-------------------------------------------
Time: 2020-11-18 16:41:32
-------------------------------------------
('first', 1)
('a', 1)
('is', 2)
('longer', 1)
('This', 2)
('document', 3)
('my', 1)
('second', 1)

-------------------------------------------
Time: 2020-11-18 16:41:33
-------------------------------------------
("doesn't", 1)
('a', 3)
('which', 1)
('is', 3)
('longer', 1)
('know', 1)
('This', 2)
('second', 1)
('document', 3)
('it', 1)

-------------------------------------------
Time: 2020-11-18 16:41:34
-------------------------------------------
("doesn't", 1)
('a', 2)
('which', 1)
('is', 2)
('know', 1)
('This', 1)
('document', 2)
('it', 1)

-------------------------------------------
Time: 2020-11-18 16:41:35
-------------------------------------------
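
The `invFunc` argument enables the incremental computation described in the window operations list: the new window value is the previous one, plus the batch entering the window, minus the batch leaving it. A plain-Python sketch with per-key counters (`Counter` addition and subtraction standing in for `func` and `invFunc`) shows that it gives the same result as recomputing each window from scratch:

```python
from collections import Counter

batches = [
    Counter("This document is my first document".split(" ")),
    Counter("This is a second longer document".split(" ")),
    Counter("This is a document which doesn't know it is a document".split(" ")),
    Counter(),  # empty batches once the queue is exhausted
    Counter(),
]
window_length = 2  # in batches, with a slide of one batch

incremental = []
current = Counter()
for i, batch in enumerate(batches):
    current = current + batch  # "reduce" the batch entering the window
    if i >= window_length:
        # "inverse reduce" the batch leaving the window; Counter subtraction also
        # drops keys whose count reaches zero (in Spark, the optional filterFunc
        # argument can be used to remove such keys)
        current = current - batches[i - window_length]
    incremental.append(current)

# Recompute every window from scratch for comparison
recomputed = [sum(batches[max(0, i - window_length + 1): i + 1], Counter())
              for i in range(len(batches))]
print(incremental == recomputed)   # True
print(incremental[2]["document"])  # 3
```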



## Join Stream operation

### Join two streams, batch by batch

stream1 = ... \
stream2 = ... \
joinedStream = stream1.join(stream2)

### Join two windowed streams

windowed_stream1 = stream1.window(20) \
windowed_stream2 = stream2.window(60) \
joined_stream = windowed_stream1.join(windowed_stream2)

### Join a stream with an RDD using transform

dataset = ... # some RDD \
windowed_stream = stream.window(20) \
joinedStream = windowed_stream.transform(lambda rdd: rdd.join(dataset))
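
For two DStreams, `join` works batch by batch: in each batch interval, the RDD generated by `stream1` is joined with the RDD generated by `stream2`, and windowing only makes each of those RDDs cover several batches. A plain-Python sketch of that pairing, where each stream is a list of batches of `(key, value)` pairs (`inner_join` and `join_streams` are hypothetical helpers):

```python
def inner_join(left, right):
    """Plain-Python model of RDD.join on lists of (key, value) pairs."""
    return [(k, (v, w)) for k, v in left for rk, w in right if rk == k]


def join_streams(stream1, stream2):
    """Join the two streams batch by batch, like DStream.join."""
    return [inner_join(b1, b2) for b1, b2 in zip(stream1, stream2)]


stream1 = [[("a", 1)], [("b", 2)], [("a", 3)]]
stream2 = [[("a", "x")], [("a", "y")], [("a", "z"), ("b", "w")]]
print(join_streams(stream1, stream2))
# [[('a', (1, 'x'))], [], [('a', (3, 'z'))]]
```

The key `"b"` of the second batch of `stream1` never meets the `"b"` of the third batch of `stream2`; a window covering both batches would be needed for them to match.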

## Output Operations on DStreams

* __print()__ 	Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. In the Python API, this is called pprint().
* __saveAsTextFiles(prefix, [suffix])__ 	Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
* __saveAsObjectFiles(prefix, [suffix])__ 	Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Not available in the Python API.
* __saveAsHadoopFiles(prefix, [suffix])__ 	Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Not available in the Python API.
* __foreachRDD(func)__ 	The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
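
The `prefix-TIME_IN_MS[.suffix]` naming of the `saveAs...Files` operations can be illustrated with a small helper. `batch_output_name` is hypothetical and only reproduces the pattern quoted above, with the batch time in milliseconds since the epoch; in practice each such name is a directory of part files written for that batch.

```python
def batch_output_name(prefix, time_in_ms, suffix=None):
    """Build the per-batch output name: prefix-TIME_IN_MS[.suffix]."""
    name = "%s-%d" % (prefix, time_in_ms)
    if suffix:
        name += "." + suffix
    return name


print(batch_output_name("data/word_counts", 1605714070000))         # data/word_counts-1605714070000
print(batch_output_name("data/word_counts", 1605714070000, "txt"))  # data/word_counts-1605714070000.txt
```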

## Design Patterns for using foreachRDD

Apply a function to each RDD, opening one connection per partition rather than one per record:

In [132]:
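import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


class PrintConnection:
    """Hypothetical stand-in for a connection to an external system."""
    def send(self, record):
        # Runs on the executors: the output may appear in the terminal
        # that launched Jupyter rather than in the notebook
        print("sending:", record)

    def close(self):
        pass


def createNewConnection():
    # Hypothetical helper: replace with the client of the real external system
    return PrintConnection()


def sendPartition(iter):
    # One connection per partition, not per record
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

# Initialize spark context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")

queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"])]

# Initialize the StreamingContext and execute the function for each RDD
ssc = StreamingContext(sc, 1)
rdd_received = ssc.queueStream(queueOfRDDs)
rdd_received.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

# Launch, wait and stop
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)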

Context already setup
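
The Spark Streaming guide goes one step further and suggests reusing connections across partitions and batches through a static, lazily initialized pool. A minimal plain-Python sketch of that idea, where `FakeConnection`, `ConnectionPool` and `send_partition_pooled` are hypothetical names and the connection only records what it receives (the pool is not thread-safe; in a real job it would live in each executor's Python worker process):

```python
class FakeConnection:
    """Hypothetical connection that stores the records it is sent."""
    opened = 0  # number of connections ever created

    def __init__(self):
        FakeConnection.opened += 1
        self.sent = []

    def send(self, record):
        self.sent.append(record)


class ConnectionPool:
    """Static, lazily initialized pool: connections are created on demand
    and handed back for reuse instead of being closed."""
    _idle = []

    @classmethod
    def get_connection(cls):
        return cls._idle.pop() if cls._idle else FakeConnection()

    @classmethod
    def return_connection(cls, connection):
        cls._idle.append(connection)


def send_partition_pooled(records):
    connection = ConnectionPool.get_connection()
    for record in records:
        connection.send(record)
    ConnectionPool.return_connection(connection)  # keep it for the next partition


# Three partitions processed one after the other
partitions = [["This", "document"], ["is", "my"], ["first", "document"]]
for partition in partitions:
    send_partition_pooled(partition)
print(FakeConnection.opened)  # 1
```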


## DataFrame and SQL Operations

In [133]:
import time
from pyspark import SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming import StreamingContext


def process(batch_time, rdd):
    print("========= %s =========" % str(batch_time))
    # show() and SQL need a DataFrame: convert the RDD of lines first
    if rdd.isEmpty():
        return
    spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()
    df_lines = spark.createDataFrame(rdd.map(lambda line: Row(line=line, length=len(line))))
    # df_lines.createOrReplaceTempView("lines") would then allow spark.sql(...) queries
    df_lines.show()
    

# Initialize spark context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")

queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"])]

# Initialize the StreamingContext and execute the function for each RDD
ssc = StreamingContext(sc, 1)
rdd_received = ssc.queueStream(queueOfRDDs)
rdd_received.foreachRDD(process)

# Launch, wait and stop
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup


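Nothing was printed when this cell ran. Note that `show()` is a DataFrame method and RDDs do not have it: each micro-batch RDD must be converted to a DataFrame (for example with `spark.createDataFrame`) before calling `show()` or running SQL queries.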

## Caching / Persistence

Similar to RDDs, DStreams also allow developers to persist the stream’s data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().

For input streams that receive data over the network (such as, Kafka, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance.

Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on different persistence levels can be found in the Spark Programming Guide.

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations_

## Checkpointing

"A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed.

* Metadata checkpointing - Saving of the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later). Metadata includes:
 * Configuration - The configuration that was used to create the streaming application.
 * DStream operations - The set of DStream operations that define the streaming application.
 * Incomplete batches - Batches whose jobs are queued but have not completed yet.

* Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used."

_from https://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations_

In [134]:
# Example of use of checkpoints

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

# launch spark and streaming context
try:
    sc = SparkContext("local[2]", "NetworkWordCount")
except:
    print("Context already setup")
ssc = StreamingContext(sc, 1)

# set checkpoint directory
ssc.checkpoint("data/checkpoint_Directory2")  

# RDD reading different text documents
queueOfRDDs = [sc.parallelize(["This document is my first document"]),
               sc.parallelize(["This is a second longer document"]),
               sc.parallelize(["This is a document which doesn't know it is a document"])]
rdd_received = ssc.queueStream(queueOfRDDs)

# Checkpoint the RDDs of this DStream every 3 seconds
rdd_received.checkpoint(3)

# Register an output operation: print each batch
rdd_received.pprint()

# launch wait and terminate
ssc.start()
time.sleep(5)
ssc.stop(stopSparkContext=False)

Context already setup
-------------------------------------------
Time: 2020-11-18 16:41:46
-------------------------------------------
This document is my first document

-------------------------------------------
Time: 2020-11-18 16:41:47
-------------------------------------------
This is a second longer document

-------------------------------------------
Time: 2020-11-18 16:41:48
-------------------------------------------
This is a document which doesn't know it is a document

-------------------------------------------
Time: 2020-11-18 16:41:49
-------------------------------------------

-------------------------------------------
Time: 2020-11-18 16:41:50
-------------------------------------------

-------------------------------------------
Time: 2020-11-18 16:41:51
-------------------------------------------



# ML

In [148]:
# Load data for the part 5
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Create spark context
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession.builder.getOrCreate()

# delete files from previous runs
!rm -f data/hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
!mv hmp.parquet data/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('data/hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

from IPython.display import clear_output
clear_output()

## ML fit, transform, pipelines, estimator

In [149]:
df.show(5)

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
+---+---+---+--------------------+-----------+
only showing top 5 rows



### Use fit, transform on StringIndexer estimator

In [150]:
from pyspark.ml.feature import StringIndexer
est_indexer = StringIndexer(inputCol="class", outputCol="classIndex")
model = est_indexer.fit(df)
df_indexed = model.transform(df)
df_indexed.show(5)

+---+---+---+--------------------+-----------+----------+
|  x|  y|  z|              source|      class|classIndex|
+---+---+---+--------------------+-----------+----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|
+---+---+---+--------------------+-----------+----------+
only showing top 5 rows
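
`Brush_teeth` gets index 6.0 because `StringIndexer` orders labels by descending frequency by default (`stringOrderType="frequencyDesc"`), so the most frequent class gets 0.0; this suggests Brush_teeth is the seventh most frequent class here. A plain-Python sketch of that mapping (`string_index` is a hypothetical helper, and breaking ties alphabetically is an assumption of this sketch):

```python
from collections import Counter


def string_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # frequency descending, ties broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return [mapping[label] for label in labels], mapping


indices, mapping = string_index(["Walk", "Eat_soup", "Walk", "Brush_teeth", "Walk", "Eat_soup"])
print(mapping)  # {'Walk': 0.0, 'Eat_soup': 1.0, 'Brush_teeth': 2.0}
print(indices)  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```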



### Chain a second fit/transform with the OneHotEncoder estimator

In [151]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
est_indexer = StringIndexer(inputCol="class", outputCol="classIndex")
est_encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
df_indexed = est_indexer.fit(df).transform(df)
df_encoded = est_encoder.fit(df_indexed).transform(df_indexed)
df_encoded.show(5)

+---+---+---+--------------------+-----------+----------+--------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|
+---+---+---+--------------------+-----------+----------+--------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|
+---+---+---+--------------------+-----------+----------+--------------+
only showing top 5 rows
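
The vectors are printed in sparse form `(size, [indices], [values])`. Their size is 13, one less than the number of categories, because `OneHotEncoder` has `dropLast=True` by default: the last category is encoded as the all-zero vector. A plain-Python sketch of that encoding (`one_hot_sparse` is a hypothetical helper):

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values) like Spark's sparse vector display."""
    size = num_categories - 1 if drop_last else num_categories
    index = int(index)
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])  # dropped last category: all zeros


print(one_hot_sparse(6.0, 14))                    # (13, [6], [1.0])
print(one_hot_sparse(13.0, 14))                   # (13, [], [])
print(one_hot_sparse(13.0, 14, drop_last=False))  # (14, [13], [1.0])
```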



### Use a pipeline to chain several transformations

In [152]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Normalizer, MinMaxScaler
from pyspark.ml import Pipeline

# Define transformations
indexer = StringIndexer(inputCol="class", outputCol="classIndex")
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
vectorAssembler = VectorAssembler(inputCols=["x","y","z"], outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
# Define pipeline
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer])
model = pipeline.fit(df)
df_result = model.transform(df)
df_result.show(5)

+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
only showing top 5 rows

In [153]:
# Clean dataframe for the next part
df_cleaned = df_result.drop("x", "y", "z", "source", "class", "categoryVec", "features")
df_cleaned = df_cleaned.withColumnRenamed("classIndex", "target")
df_cleaned = df_cleaned.withColumnRenamed("features_norm", "features")
df_cleaned.show(5)

+------+--------------------+
|target|            features|
+------+--------------------+
|   6.0|[0.20754716981132...|
|   6.0|[0.20754716981132...|
|   6.0|[0.20183486238532...|
|   6.0|[0.20183486238532...|
|   6.0|[0.19626168224299...|
+------+--------------------+
only showing top 5 rows
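
With `p=1.0`, `Normalizer` divides each vector by its L1 norm (the sum of absolute values), which is where the values of the `features` column (formerly `features_norm`) come from: for the first row, 22 / (22 + 49 + 35) = 22 / 106 ≈ 0.2075. A plain-Python check on the three distinct rows shown (`normalize` is an illustrative helper, not a Spark function):

```python
def normalize(vector, p=1.0):
    """Divide a vector by its p-norm, as Normalizer does for each row."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    return [x / norm for x in vector]


for row in ([22.0, 49.0, 35.0], [22.0, 52.0, 35.0], [21.0, 52.0, 34.0]):
    print(normalize(row)[0])  # first component, compare with the table above
```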



## Model Selection, evaluator, parameter grid

### Split train and validation datasets 

In [154]:
# Prepare training and test data.
df_learn, df_validation = df_cleaned.randomSplit([0.8, 0.2], seed=12345)

### Train and estimate model with fixed parameters

In [155]:
# Train model
from pyspark.ml.regression import LinearRegression
estim_lr = LinearRegression(featuresCol = 'features', labelCol='target',
                            maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_lr = estim_lr.fit(df_learn)

In [156]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(model_lr.coefficients))
print("Intercept: %s" % str(model_lr.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = model_lr.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [9.276474699807274,0.0,-2.7557906328884165]
Intercept: 3.2530328439765346
RMSE: 3.509002
r2: 0.086105
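
The zero coefficient for `y` most likely comes from the L1 part of the penalty. Based on my reading of the Spark ML documentation (not checked against the source code), `LinearRegression` with `regParam` $\lambda$ and `elasticNetParam` $\alpha$ minimizes a least-squares loss plus an elastic-net penalty, sketched below; Spark also standardizes the features by default, so the penalty applies to the standardized coefficients.

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2
+ \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

With $\lambda = 0.3$ and $\alpha = 0.8$ the penalty is $0.24\,\lVert w\rVert_1 + 0.03\,\lVert w\rVert_2^2$; the L1 term is the one that can set a coefficient exactly to zero.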


In [157]:
# Test the model on the validation set
df_prediction = model_lr.transform(df_validation)
df_prediction.show(5)

# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator
eval_lr = RegressionEvaluator(predictionCol="prediction",
                              labelCol="target", metricName="r2")
print("R Squared (R2) on test data = %g" % eval_lr.evaluate(df_prediction))

+------+--------------------+------------------+
|target|            features|        prediction|
+------+--------------------+------------------+
|   0.0|[0.0,0.4852941176...|1.8346111946957322|
|   0.0|[0.0,0.4933333333...|1.8567655899797368|
|   0.0|[0.0,0.4941176470...|1.8589269943976887|
|   0.0|       [0.0,0.5,0.5]|1.8751375275323263|
|   0.0|       [0.0,0.5,0.5]|1.8751375275323263|
+------+--------------------+------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.0862592


In [158]:
# LinearRegressionModel.evaluate() predicts on a DataFrame and computes the metrics in one step
test_result = model_lr.evaluate(df_validation)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 3.51257


In [159]:
# To summarize:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Train model
estim_lr = LinearRegression(featuresCol='features', labelCol='target',
                            maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_lr = estim_lr.fit(df_learn)
# Evaluate model
df_prediction = model_lr.transform(df_validation)
eval_lr = RegressionEvaluator(predictionCol="prediction",
                              labelCol="target", metricName="r2")
print("R Squared (R2) on test data = %g" % eval_lr.evaluate(df_prediction))

R Squared (R2) on test data = 0.0862592


### Cross-validation to find the best parameters

In [160]:
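# Standalone example (not used by the cross-validation below): build a (features, label) DataFrame from Rows
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row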
temp_df = spark.createDataFrame([Row(V4366=0.0, V4460=0.232, V4916=-0.017, V1495=-0.104, V1639=0.005, V1967=-0.008, V3049=0.177, V3746=-0.675, V3869=-3.451, V524=0.004, V5409=0), Row(V4366=0.0, V4460=0.111, V4916=-0.003, V1495=-0.137, V1639=0.001, V1967=-0.01, V3049=0.01, V3746=-0.867, V3869=-2.759, V524=0.0, V5409=0), Row(V4366=0.0, V4460=-0.391, V4916=-0.003, V1495=-0.155, V1639=-0.006, V1967=-0.019, V3049=-0.706, V3746=0.166, V3869=0.189, V524=0.001, V5409=0), Row(V4366=0.0, V4460=0.098, V4916=-0.012, V1495=-0.108, V1639=0.005, V1967=-0.002, V3049=0.033, V3746=-0.787, V3869=-0.926, V524=0.002, V5409=0), Row(V4366=0.0, V4460=0.026, V4916=-0.004, V1495=-0.139, V1639=0.003, V1967=-0.006, V3049=-0.045, V3746=-0.208, V3869=-0.782, V524=0.001, V5409=0)])
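# Since Spark 3.0 (Python 3.6+), Row keeps the keyword-argument order instead of sorting field names,
# so x[0:-1] are the V* features in the order given and x[-1] is V5409, used as the label
trainingData = temp_df.rdd.map(lambda x: (Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])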
trainingData.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.232,-0.017...|    0|
|[0.0,0.111,-0.003...|    0|
|[0.0,-0.391,-0.00...|    0|
|[0.0,0.098,-0.012...|    0|
|[0.0,0.026,-0.004...|    0|
+--------------------+-----+



In [161]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the estimator, pipeline and evaluator
estim_lr = LinearRegression(featuresCol='features', labelCol='target',
                            maxIter=10)  # regParam and elasticNetParam are set by the grid below
pipeline = Pipeline(stages=[estim_lr])
evaluator = RegressionEvaluator(predictionCol="prediction", 
                                labelCol="target", metricName="r2")

# Define the different parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(estim_lr.regParam, [0.3, 0.5]) \
    .addGrid(estim_lr.elasticNetParam, [0.5, 0.8]) \
    .build()

# Run cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
cvModel = crossval.fit(df_learn)
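`ParamGridBuilder` builds the Cartesian product of the values added with `addGrid`, here 2 × 2 = 4 parameter maps. With `numFolds=5`, `CrossValidator` fits one model per (parameter map, fold) pair, i.e. 20 fits, averages the evaluator metric of each map over the folds (exposed as `cvModel.avgMetrics`), keeps the map with the best average (larger is better for r2), and refits it on the whole `df_learn`. The plain-Python sketch below reproduces that loop under simplifying assumptions: `build_param_grid`, `k_fold_indices`, `cross_validate` and the one-feature ridge model in the usage are hypothetical illustrations, and the folds are dealt from shuffled indices, which is simpler than the sampling-based fold split Spark uses as far as I understand.

```python
import itertools
import random
from statistics import mean


def build_param_grid(grid):
    """Cartesian product of parameter values, similar in spirit to ParamGridBuilder().build()."""
    names = list(grid)
    return [dict(zip(names, values))
            for values in itertools.product(*(grid[name] for name in names))]


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them into k folds of (almost) equal size."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def cross_validate(rows, fit, score, param_grid, k=5, seed=0, larger_is_better=True):
    """Average the score of each parameter map over k folds, then refit the best map on all rows."""
    folds = k_fold_indices(len(rows), k, seed)
    avg_metrics = []
    for params in param_grid:
        fold_scores = []
        for test_idx in folds:
            held_out = set(test_idx)
            train = [rows[j] for j in range(len(rows)) if j not in held_out]
            test = [rows[j] for j in test_idx]
            fold_scores.append(score(fit(train, params), test))
        avg_metrics.append(mean(fold_scores))
    pick = max if larger_is_better else min
    best = pick(range(len(param_grid)), key=lambda i: avg_metrics[i])
    return fit(rows, param_grid[best]), param_grid[best], avg_metrics


# Usage with a hypothetical one-feature ridge model through the origin:
# slope = sum(x * y) / (sum(x * x) + reg)
rows = [(float(x), 2.0 * x) for x in range(1, 21)]


def fit_ridge(train, params):
    sxy = sum(x * y for x, y in train)
    sxx = sum(x * x for x, _ in train)
    return sxy / (sxx + params["reg"])


def mse(slope, test):
    return mean((y - slope * x) ** 2 for x, y in test)


grid = build_param_grid({"reg": [0.0, 100.0]})
best_slope, best_params, avg_metrics = cross_validate(
    rows, fit_ridge, mse, grid, k=5, larger_is_better=False)
print(best_params, best_slope)
```

MSE is "smaller is better", hence `larger_is_better=False` here; with the r2 evaluator used above, Spark keeps the map with the largest average instead.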

In [162]:
# Print the regression results and the best parameters found
print(cvModel.bestModel.stages[0].intercept)
print(cvModel.bestModel.stages[0].coefficients)
print(cvModel.bestModel.stages[0].getRegParam())
print(cvModel.bestModel.stages[0].getElasticNetParam())

3.6854132664170263
[9.779454982679272,0.0,-4.111892840696731]
0.3
0.5


In [163]:
# The CrossValidatorModel wraps the best model, refit on the whole df_learn after the k-fold search,
# so it can be applied directly to the validation set
df_prediction = cvModel.transform(df_validation)
eval_lr = RegressionEvaluator(predictionCol="prediction", 
                              labelCol="target", 
                              metricName="r2")
print("R Squared (R2) on test data = %g" % eval_lr.evaluate(df_prediction))

R Squared (R2) on test data = 0.0898366
