<div style="background:black">
    <center>
<img src="./images/session1/title.png" alt="Title"/>
    </center>
</div>

<div class="alert alert-block alert-success">

<center>At the end of this session, you will know:</center>

&#x25a2; _when_ to use Spark

&#x25a2; _how_ to write Spark programs

&#x25a2; how Spark relates to _Hadoop_
</div>

<p style="float: left; font-size: 9pt; text-align: center; width: 30%; margin-right: 30%; margin-bottom: 0.5em;"><a href="http://spark.apache.org"><img src="http://spark.apache.org/images/spark-logo-trademark.png" width=150></a></p>
<p style="float: left; font-size: 9pt; text-align: center; width: 30%; margin-right: 1%; margin-bottom: 0.5em;"><a href="http://hadoop.apache.org"><img src="https://hadoop.apache.org/hadoop-logo.jpg" width=290></a></p>
<p style="clear: both;">



<center>
<div class="alert alert-block alert-info">
    <b>Goal:</b> process large datasets easily and efficiently 
</div>
    </center>

# Key concept 1: Resilient Distributed Datasets (RDDs)

<div class="alert alert-block alert-info">

<center>Apache Spark is built around a data structure: the <b>Resilient Distributed Dataset (RDD)</b></center>
</div>

<center>

## Why distributed?

<a href="https://www.google.com/about/datacenters/gallery/">
<img src="https://lh3.googleusercontent.com/_T5y2eKUusOWBn44MkgTDc1EQVsiGkvWXDDgbNZxeKOp1aHKYpIMS56JhU3esg6F_V6sbmGmxmThuk5ugETygfPdv2ssbVRjHD3fcw=w800-l80-sg-rj-c0xffffff" alt="google data center"/></a>

Photo (c) Google
    
```
Our Council Bluffs, Iowa data center provides over 115,000 square feet of space. We make the best out of every inch, so you can use services like Search and YouTube in the most efficient way possible.
```
   </center>


<center>
    <b>Cluster architecture</b>
    <img src="./images/session1/cluster-architecture.png" alt="cluster architecture"/>
    adapted from <a href="http://mmds.org">http://mmds.org</a>
</center>

<div class="alert alert-block alert-info">

<center>An RDD can be distributed among many cluster nodes, aggregating CPU, memory and disk resources.</center>
</div>
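The following toy model makes the idea of distribution concrete. It is pure Python, not Spark code. It cuts a list into contiguous partitions, and each partition could then be stored and processed on a different node. The function name `split_into_partitions` is hypothetical. In Spark, `sc.parallelize` accepts an optional `numSlices` argument that sets the number of partitions, and `rdd.glom().collect()` shows the resulting partitions. The slicing below is similar in spirit, but it is not Spark's internal code.

```python
def split_into_partitions(data, num_partitions):
    """Cut `data` into `num_partitions` contiguous slices of near-equal size."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


# Each partition could live on (and be processed by) a different cluster node
toy_partitions = split_into_partitions(list(range(1, 11)), 3)
print(toy_partitions)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
```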

<center>
    
## Why resilient?

<a href="https://www.google.com/about/datacenters/gallery/">
    <img src="https://lh3.googleusercontent.com/0N2MQjMcGLlmHE4p-0FRdsKUyKieC-ga0UtW5HVDB1HHBbmiRj2kA2TCDgyWaK2ZzvrKpWoI4c1djmRi0I-ujnNCo43qpQp7e4q3TA=w800-l80-sg-rj-c0xffffff" alt="data center 1"/>
 </a>

Photo (c) Google
    
```
Blue LEDs on this row of servers in our Douglas County, Georgia data center tell us everything is running smoothly. We use LEDs because they are energy efficient, long lasting and bright.
```

</center>

<center>
    <b>The Hadoop Distributed File System (HDFS)</b>
<img src="images/session1/hdfs.png"/>
    adapted from <a href="http://mmds.org">http://mmds.org</a>
    </center>

<div class="alert alert-block alert-info">
    <center>An RDD can be stored on HDFS, where data chunks are <b>replicated</b> across nodes.</center>
</div>
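Here is a toy model of replication. It is pure Python and does not reproduce the real HDFS placement policy, which is rack-aware. It splits a file into fixed-size blocks and places each block's replicas on distinct nodes in round-robin order. The block size of 128 MB and the replication factor of 3 mirror common HDFS defaults (`dfs.blocksize`, `dfs.replication`). The node names and the round-robin policy are assumptions chosen for illustration. The last line checks that losing any single node leaves every block readable.

```python
import math


def place_blocks(file_size_mb, nodes, block_size_mb=128, replication=3):
    """Return {block_id: [nodes holding a replica]} using round-robin placement."""
    num_blocks = math.ceil(file_size_mb / block_size_mb)
    placement = {}
    for block in range(num_blocks):
        # Consecutive nodes (modulo the cluster size) are distinct
        # as long as replication <= len(nodes)
        placement[block] = [nodes[(block + r) % len(nodes)] for r in range(replication)]
    return placement


def available_after_failure(placement, failed_node):
    """True if every block still has at least one replica on a live node."""
    return all(any(n != failed_node for n in replicas) for replicas in placement.values())


nodes = ["node1", "node2", "node3", "node4"]
placement = place_blocks(300, nodes)  # a 300 MB file -> 3 blocks
print(placement)
# {0: ['node1', 'node2', 'node3'], 1: ['node2', 'node3', 'node4'], 2: ['node3', 'node4', 'node1']}
print(all(available_after_failure(placement, n) for n in nodes))  # True
```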

<center>
    <b>File reads</b>
<img src="images/session1/hdfs2.png"/>
    Source: Hadoop: The Definitive Guide, Tom White, 4th edition.
    </center>

<center>
    <b>File writes</b>
<img src="images/session1/hdfs1.png"/>
    Source: Hadoop: The Definitive Guide, Tom White, 4th edition.
    </center>

<div class="alert alert-block alert-info">
<center>An RDD can also be <b>recomputed</b>, as it stores its own lineage.</center>
</div>
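The following pure-Python sketch illustrates lineage-based recovery. Instead of replicating the result, we keep the recipe: the source partitions plus the ordered list of transformations. When a partition is lost, we replay that recipe for the lost partition only. The names `SOURCE`, `LINEAGE` and `compute_partition` are hypothetical. In PySpark, the actual lineage of an RDD can be displayed with `rdd.toDebugString()`.

```python
# Source partitions, e.g. as read from stable storage such as HDFS
SOURCE = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# Lineage: the ordered list of transformations that defines the result
LINEAGE = [
    ("map", lambda x: x * 10),
    ("filter", lambda x: x % 20 == 0),
]


def compute_partition(i):
    """Recompute partition i by replaying the lineage on its source partition."""
    data = SOURCE[i]
    for kind, f in LINEAGE:
        if kind == "map":
            data = [f(x) for x in data]
        elif kind == "filter":
            data = [x for x in data if f(x)]
    return data


results = {i: compute_partition(i) for i in range(len(SOURCE))}
del results[1]                     # simulate losing the node that held partition 1
results[1] = compute_partition(1)  # only the lost partition is recomputed
print(results[1])                  # [40, 60]
```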

<center>
<img src="images/session1/lineage.png"/>
<a href="https://dl.acm.org/doi/fullHtml/10.1145/2934664">[Zaharia <i>et al.</i>, 2016]</a>
    </center>

## Creating RDDs

Before we can create and manipulate RDDs, let's make sure that Apache Spark is properly installed. Spark comes with APIs in four languages: Java, Scala, R and Python. Here we will use the Python API.

`pyspark` is the Python package that provides Spark's Python API. Make sure that version 3.0.0 is properly installed on your computer:

In [1]:
# ! pip install pyspark

In [2]:
# ! pip freeze | grep pyspark

In [3]:
# Environment-specific configuration (here, a Windows machine at Ericsson): adjust these paths to your own Python and Java installations
import os
os.environ["IPYTHON"]="1"
#os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_PYTHON"]="C:\\ProgramData\\Anaconda3\\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"]="ipython3"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook"
#os.environ["JAVA_HOME"]="/usr/lib/jvm/default-java"
os.environ["JAVA_HOME"]="C:\\Program Files\\Java\\jdk1.8.0_202"

In Spark, many functions are provided through a helper object called the Spark context. A Spark context can be created as follows:

In [4]:
# Create a Spark context
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

RDDs can be created in three ways:

(1) From an existing Python list, using function <code>parallelize</code>:

In [5]:
# Create "numbers", an RDD defined from a Python list
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
numbers

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

The content of an RDD can be inspected using <b>actions</b> such as `collect`:

In [6]:
# Convert the RDD back to a Python list, for inspection
numbers.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

<center>
<div class="alert alert-block alert-danger">
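    <b>Warning:</b> actions can be very dangerous! Actions such as <code>collect</code> bring the entire content of an RDD back to the driver program, which may run out of memory on a large dataset. To inspect a large RDD, prefer <code>take(n)</code> or <code>count()</code>.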
    </div>
    </center>
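Why the warning? `collect` has to read every partition and send all of it to the driver, while `take(n)` can stop as soon as it has `n` elements. The pure-Python model below counts how many partitions each approach reads. It is not Spark's implementation, which grows the number of partitions it scans step by step. The function names are hypothetical.

```python
def collect_all(partitions):
    """Model of collect: every partition is read and concatenated on the driver."""
    result = []
    for part in partitions:
        result.extend(part)
    return result, len(partitions)


def take_from_partitions(partitions, n):
    """Model of take(n): read partitions in order, stop once n elements are found."""
    result, scanned = [], 0
    for part in partitions:
        scanned += 1
        result.extend(part[: n - len(result)])
        if len(result) >= n:
            break
    return result, scanned


toy_partitions = [[1, 2, 3], [4, 5], [6, 7, 8], [9, 10]]
print(collect_all(toy_partitions))              # ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4)
print(take_from_partitions(toy_partitions, 4))  # ([1, 2, 3, 4], 2)
```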

(2) RDDs can also be created from a file, using `textFile`:

In [7]:
# Read a text file in an RDD where each element is a line of the file
words = sc.textFile('data/session1/words.txt')
words.collect()

['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight']

<center>
<div class="alert alert-block alert-info">
<b>Note:</b> <code>textFile</code> can also read data from HDFS, for instance from an <code>hdfs://</code> URI.
</div>
        </center>

(3) Or from an existing RDD, using a <b>transformation</b>:

In [8]:
# Create a random sample containing approximately 50% of the data (the exact size and content vary between runs)
sample = numbers.sample(fraction=0.5, withReplacement=False)
sample.collect()

[1, 3, 5, 6, 8, 9]

## Manipulating RDDs

RDDs are manipulated using <b>transformations</b> and <b>actions</b>:

<a href="http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations"><img src="images/session1/transformations-actions.png" alt="Transformations and actions"/></a>

Several transformations, such as `filter`, take a function as argument. This function can be either explicitly defined, or defined in the argument of the transformation as an anonymous (aka "lambda") function.

Example:

In [9]:
# Function definition
def even(x):
    '''
    Return True if x is an even integer
    '''
    return x % 2 == 0

# Extracts the even numbers from RDD "numbers"
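# (the result gets a new name, so that function "even" is not overwritten)
evens = numbers.filter(even)

# Note: filter processes each partition of an RDD independently, without exchanging data between partitions

evens.collect()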

[2, 4, 6, 8, 10]

In [10]:
# Same operation, now with the function declared as a lambda function
even = numbers.filter(lambda x: x % 2 == 0)
even.collect()

[2, 4, 6, 8, 10]
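Because `filter` looks at one element at a time, it can run on each partition independently. The pure-Python sketch below checks this: filtering partition by partition and then concatenating gives the same result as filtering the whole list. The partition layout is an assumption, and `is_even` is renamed so that it does not overwrite the notebook's `even` function.

```python
def is_even(x):
    """Return True if x is an even integer."""
    return x % 2 == 0


toy_numbers = list(range(1, 11))
# Assumed layout of the 10 numbers over 3 partitions
toy_partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# Each partition is filtered on its own, as a worker would do
per_partition = [[x for x in part if is_even(x)] for part in toy_partitions]
# Concatenating the per-partition results gives the global result
combined = [x for part in per_partition for x in part]

print(per_partition)                                       # [[2], [4, 6], [8, 10]]
print(combined == [x for x in toy_numbers if is_even(x)])  # True
```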

## Exercise

<ul style="list-style-image: url('images/do.png');">
<li>Create an RDD that contains the words in <code>words</code> that have four letters or more.</li>
</ul>


In [11]:
long_words = words.filter(lambda x: len(x) >= 4)
long_words.collect()

['three', 'four', 'five', 'seven', 'eight']

## Use-case (part 1): Open Data from the City of Montreal

We will study <a href="http://donnees.ville.montreal.qc.ca/dataset/frenes-publics-proteges-injection-agrile-du-frene">this dataset</a>, provided by the City of Montreal. 

The dataset contains the list of trees treated against the <a href="https://en.wikipedia.org/wiki/Emerald_ash_borer">emerald ash borer</a> in Montreal.

We will use the 2015 and 2016 datasets, which are available in directory `data/session1` distributed with this notebook.

<div class="alert alert-block alert-info">
<center>Our goal is to use Spark to extract basic statistics about the dataset.</center>
</div>

As CSV files, these datasets can be read by many libraries, such as `Pandas`:

In [12]:
import pandas as pd
df = pd.read_csv('data/session1/frenepublicinjection2015.csv')
df

| | Nom_arrond | Invent | No_Civiq | Rue | Rue_De | Rue_a | Nom_parc | Sigle | Injections | x | y | longitude | latitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Ahuntsic-Cartierville | R | 10265 | Rue Saint-Hubert | SAURIOL E RUE | FLEURY E RUE | | FRPESU | 2015 | 292338.7500 | 5.046268e+06 | -73.659611 | 45.556217 |
| 1 | Ahuntsic-Cartierville | R | 9451 | Rue Foucher | CHABANEL E RUE | LOUVAIN E RUE DE | | FRAMKL | 2015 | 293090.7190 | 5.045716e+06 | -73.649967 | 45.551254 |
| 2 | Ahuntsic-Cartierville | R | 10435 | Rue Taché | TACHÉ PLACE | PRIEUR E RUE | | FRPE | 2015 | 292521.6820 | 5.047302e+06 | -73.657294 | 45.565520 |
| 3 | Ahuntsic-Cartierville | R | 1475 | Rue Prieur Est | OLYMPIA BOULE | HAMEL AVENU | | FRPE | 2015 | 292325.8971 | 5.047060e+06 | -73.659796 | 45.563336 |
| 4 | Ahuntsic-Cartierville | R | 9905 | Avenue du Sacré-Coeur | SAUVÉ E RUE | SAURIOL E RUE | | FRPE | 2015 | 292839.2650 | 5.046594e+06 | -73.653208 | 45.559155 |
| 5 | Ahuntsic-Cartierville | R | 9345 | Avenue Millen | CHABANEL E RUE | LOUVAIN E RUE DE | | FRPE | 2015 | 293251.9380 | 5.045722e+06 | -73.647902 | 45.551320 |
| 6 | Ahuntsic-Cartierville | R | 10125 | Rue Parthenais | SAURIOL E RUE | FLEURY E RUE | | FRAM | 2015 | 293277.3130 | 5.047976e+06 | -73.647630 | 45.571602 |
| 7 | Ahuntsic-Cartierville | R | 9905 | Avenue Papineau | SAUVÉ E RUE | SAURIOL E RUE | | FRPE | 2015 | 293163.5320 | 5.047406e+06 | -73.649074 | 45.566471 |
| 8 | Ahuntsic-Cartierville | R | 9131 | Rue Basile-Routhier | LEGENDRE E RUE | CHABANEL E RUE | | FRPE | 2015 | 293449.0310 | 5.045493e+06 | -73.645372 | 45.549258 |
| 9 | Ahuntsic-Cartierville | R | 9147 | Rue Basile-Routhier | LEGENDRE E RUE | CHABANEL E RUE | | FRPE | 2015 | 293438.2500 | 5.045497e+06 | -73.645510 | 45.549294 |


<div class="alert alert-block alert-warning">
    <center><b>Note:</b> We could use Pandas instead of Spark to analyze this dataset, but Pandas processes data in the memory of a single machine, so it would not scale to much larger datasets.</center>
</div>

To start with, let's load this dataset into Spark, using Python's CSV module:

In [13]:
filename2015 = 'data/session1/frenepublicinjection2015.csv'

def read_data_file(filename):
    import csv
    # Load the text file as an RDD, where each element is a line
    rdd = sc.textFile(filename)
    # Use Python's CSV reader to map every line of the text file to a Python list
    trees = rdd.mapPartitions(lambda x: csv.reader(x))
    return trees
    
# Show the first two elements of the RDD, with action "take"
read_data_file(filename2015).take(2)

[['Nom_arrond',
  'Invent',
  'No_Civiq',
  'Rue',
  'Rue_De',
  'Rue_a',
  'Nom_parc',
  'Sigle',
  'Injections',
  'x',
  'y',
  'longitude',
  'latitude'],
 ['Ahuntsic-Cartierville',
  'R',
  '10265',
  'Rue Saint-Hubert',
  'SAURIOL E RUE',
  'FLEURY E RUE',
  '',
  'FRPESU',
  '2015',
  '292338.75000000000',
  '5046268.50000000000',
  '-73.6596113879546',
  '45.5562169952192']]
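As the output above shows, the header line of the CSV file is returned as a regular element of the RDD. It is therefore also counted by `count`, and it can show up in later results, for instance as a park called `Nom_parc`. One way to drop it is sketched below in pure Python with the `csv` module. The helper name `parse_without_header` is hypothetical. In Spark, the same idea could be expressed with `header = rdd.first()`, followed by `rdd.filter(lambda line: line != header)` before parsing.

```python
import csv


def parse_without_header(lines):
    """Parse CSV lines into lists, skipping every line equal to the first one (the header)."""
    lines = list(lines)
    if not lines:
        return []
    header = lines[0]
    return list(csv.reader(line for line in lines if line != header))


sample_lines = [
    "Nom_arrond,Invent,No_Civiq",
    "Ahuntsic-Cartierville,R,10265",
    'Ahuntsic-Cartierville,R,"9451"',
]
print(parse_without_header(sample_lines))
# [['Ahuntsic-Cartierville', 'R', '10265'], ['Ahuntsic-Cartierville', 'R', '9451']]
```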

<ul style="list-style-image: url('images/do.png');">
    <li>Print the number of trees that were treated in 2015 and 2016. Tip: use the <code>count</code> action.</li>
</ul>

In [14]:
filename2016 = 'data/session1/frenepublicinjection2016.csv'

trees2015 = read_data_file(filename2015)
trees2016 = read_data_file(filename2016)

print(trees2015.count())
print(trees2016.count())

21915
27245


<ul style="list-style-image: url('images/do.png');">
    <li>Print the number of trees in park <code>BEAUBIEN</code> that were treated in 2015. Tip: use the <code>filter</code> transformation on column <code>Nom_parc</code> (index: 6), then the <code>count</code> action.</li>
</ul>

In [15]:
trees2015.filter(lambda x: x[6] == 'BEAUBIEN').count()

24

## Quiz

<div class="alert alert-block alert-warning">
A Spark implementation of a data analysis will always be faster than a Pandas implementation of the same analysis:
    
    
&#x25a2; True
    
&#x25a2; False
</div>

<div class="alert alert-block alert-warning">
A Spark implementation of a data analysis will always be faster than a Pandas implementation of the same analysis:
    
    
&#x25a2; True
    
&#x2611; False
</div>

Spark has overheads, due to:
- The need to transfer data between computing nodes
- The Spark framework itself

Pandas will typically be faster when:
- the data fits in the memory of a single machine
- the amount of processing is modest

<div class="alert alert-block alert-info">
Writing a Spark program requires detailed knowledge of cluster computing:
    
&#x25a2; True
    
&#x25a2; False
</div>

<div class="alert alert-block alert-info">
Writing a Spark program requires detailed knowledge of cluster computing:
    
&#x25a2; True
    
&#x2611; False
</div>

Spark's APIs decouple the analysis logic from its distributed execution on a cluster.

<div class="alert alert-block alert-warning">
Spark analyses are limited to basic data queries:  
    
&#x25a2; True
    
&#x25a2; False
</div>

<div class="alert alert-block alert-warning">
Spark analyses are limited to basic data queries:  
    
&#x25a2; True
    
&#x2611; False
</div>

Spark has rich APIs for data analysis, data mining, machine learning and data stream analysis. 


# Key concept 2: MapReduce


MapReduce is arguably the most famous and powerful <b>programming model</b> for Big Data analysis. 

Introduced by Google in 2004 (Dean and Ghemawat, OSDI; see also their <a href="https://dl.acm.org/doi/abs/10.1145/1327452.1327492">2008 Communications of the ACM article</a>), it is available in Spark through the `map` and `reduceByKey` transformations.

## Map

`map` is a transformation that evaluates a function on each element of an RDD:

In [16]:
upper = words.map(lambda x: x.upper())
upper.collect()

['ONE', 'TWO', 'THREE', 'FOUR', 'FIVE', 'SIX', 'SEVEN', 'EIGHT']

## Exercise

<ul style="list-style-image: url('images/do.png');">
    <li>Create an RDD that contains the square roots of the numbers in RDD <code>numbers</code>.</li>
</ul>


In [17]:
from math import sqrt
sqrts = numbers.map(sqrt)
sqrts.collect()

[1.0,
 1.4142135623730951,
 1.7320508075688772,
 2.0,
 2.23606797749979,
 2.449489742783178,
 2.6457513110645907,
 2.8284271247461903,
 3.0,
 3.1622776601683795]

## Reduce

Reduce operations such as `groupByKey` and `reduceByKey` work on RDDs of __key-value pairs__, i.e. 2-tuples `(key, value)`.

Transformation `groupByKey` groups key-value pairs by key.

For instance, when applied to:

`[ ('apple', 3), ('banana', 2), ('apple', 7), ('banana', 5) ]`, 

`groupByKey` returns:

`[ ('apple', [ 3, 7 ]), ('banana', [2, 5]) ]`.

In [18]:
# Create RDD
fruits = [ ('apple', 3), ('banana', 2), ('apple', 7), ('banana', 5) ]
fruits_rdd = sc.parallelize(fruits)

# Apply groupByKey
group = fruits_rdd.groupByKey()

# Map values to list to display
group.mapValues(list).collect()

[('apple', [3, 7]), ('banana', [2, 5])]

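Spark transformation `reduceByKey` groups key-value pairs by key and combines the values associated with each key using a binary function, such as addition. Spark first combines values within each partition and then merges the partial results across partitions, so this function should be associative and commutative.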

For instance:

In [19]:
fruits_rdd.collect()

[('apple', 3), ('banana', 2), ('apple', 7), ('banana', 5)]

In [20]:
# Add all the values associated with a key
fruits_rdd.reduceByKey(lambda x, y: x+y).collect()

[('apple', 10), ('banana', 7)]
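The pure-Python model below shows why the aggregation function should be associative and commutative. It is not Spark's implementation, and the function name is hypothetical. Values are first combined inside each partition, and then the partial results of all partitions are merged. With a non-associative function such as subtraction, the result therefore depends on how the data happens to be partitioned.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(partitions, func):
    """Model of reduceByKey: combine within each partition, then merge partial results."""
    # Step 1: local combine, one dict per partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    # Step 2: merge the partial results of all partitions, key by key
    merged = defaultdict(list)
    for local in partials:
        for key, value in local.items():
            merged[key].append(value)
    return {key: reduce(func, values) for key, values in merged.items()}


fruits_partitions = [[("apple", 3), ("banana", 2)], [("apple", 7), ("banana", 5)]]
print(reduce_by_key(fruits_partitions, lambda x, y: x + y))  # {'apple': 10, 'banana': 7}

# Subtraction is not associative: the result depends on the partitioning
print(reduce_by_key([[("a", 10), ("a", 3), ("a", 2)]], lambda x, y: x - y))   # {'a': 5}
print(reduce_by_key([[("a", 10)], [("a", 3), ("a", 2)]], lambda x, y: x - y))  # {'a': 9}
```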

<center>
<div class="alert alert-block alert-info">
MapReduce is an extremely <b>versatile</b> programming model: a wide range of distributed computations can be expressed as sequences of map and reduce operations.
</div>
    </center>

## Exercise: Word Count

<center>
<div class="alert alert-block alert-info">
WordCount is the historical MapReduce example, aiming at counting the occurrences of words in a document. For instance, if the document contains ten occurrences of the word "house", the result returned by WordCount will contain ("house", 10).
    </div>
    </center>
    
The MapReduce WordCount implementation is based on the following principle:
1. Create an RDD where each element is a word in the input document
2. Use the `map` transformation to create an RDD containing `(word, 1)` for every word in the document
3. Use the `reduceByKey` transformation to sum the 1s associated with a given word

<ul style="list-style-image: url('images/do.png');">
<li>Implement a WordCount program in Spark to count the occurrences of words in file <code>data/session1/places.txt</code>.</li>
</ul>

In [21]:
filename = 'data/session1/places.txt'
places = sc.textFile(filename)

places.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).collect()

[('Mercier-Hochelaga-Maisonneuve', 3102),
 ('Rivière-des-Prairies - Pointe-aux-Trembles', 752),
 ('Verdun', 183),
 ('Ville-Marie', 159),
 ('Lachine', 216),
 ('Île-Bizard-Sainte-Geneviève', 28),
 ('Montréal-Nord', 124),
 ('Outremont', 252),
 ('PARCS-NATURE', 780),
 ('Ahuntsic-Cartierville', 1748),
 ('Côte-des-Neiges-Notre-Dame-de-Grâce', 1888),
 ('Lasalle', 149),
 ('Le Sud-Ouest', 3005),
 ('Pierrefonds - Roxboro', 358),
 ('Le Plateau-Mont-Royal', 1806),
 ('Rosemont - La Petite-Patrie', 2618),
 ('Saint-Laurent', 819),
 ('Saint-Léonard', 354),
 ('Villeray-Saint-Michel - Parc-Extension', 3284),
 ('Anjou', 289)]
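For comparison, here are the three phases of WordCount (map, shuffle, reduce) written out in plain Python on a small made-up input. This is a sketch of the programming model, not of how Spark executes it, and the helper names are hypothetical. In `places.txt`, each line is already a single key, which is why the Spark solution above maps whole lines instead of splitting them into words.

```python
from collections import defaultdict


def map_phase(lines):
    """Map: emit a (word, 1) pair for every word."""
    return [(word, 1) for line in lines for word in line.split()]


def shuffle_phase(pairs):
    """Shuffle: group all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce: sum the 1s associated with each word."""
    return {word: sum(ones) for word, ones in groups.items()}


toy_lines = ["the house is red", "the house is big"]
counts = reduce_phase(shuffle_phase(map_phase(toy_lines)))
print(counts)  # {'the': 2, 'house': 2, 'is': 2, 'red': 1, 'big': 1}
```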

## Exercise (optional): Inverted Index


<center>
<div class="alert alert-block alert-info">
    An <b>inverted index</b> is a data structure that, given a set of text documents, returns for each word the set of documents in which it occurs. It is the basic structure used in a <b>Web search engine</b>, and a key historical motivation for Google to develop MapReduce.
    </div>
    </center>


Assume that an RDD <code>docs_and_words</code> exists that contains pairs <code>doc,word</code> where <code>doc</code> identifies a document and <code>word</code> is a word contained in this document. The following code generates such an RDD:

In [22]:
# Countries for which a document exists in data/session1/docs
countries = ('canada', 'usa', 'mexico')
# Initialize an empty RDD
docs_and_words = sc.emptyRDD()
# Iterate over documents
for country in countries:
    doc_name = f'{country}.txt'
    path = f'data/session1/docs/{doc_name}'
    
    # Get all the words in the document, in the form of (doc, word) pairs
    # (the default argument binds the current document name to the lambda)
    doc_words = sc.textFile(path)\
                  .flatMap(lambda x: x.split(' '))\
                  .map(lambda x, doc=doc_name: (doc, x.lower()))
    
    # Add to the list of docs_and_words
    docs_and_words = docs_and_words.union(doc_words)
# Check result
docs_and_words.take(3)

[('canada.txt', 'canada'), ('canada.txt', 'is'), ('canada.txt', 'a')]

<ul style="list-style-image: url('images/do.png');">
    <li>Transform <code>docs_and_words</code> into an inverted index containing pairs of the form <code>word,[docs]</code>, where <code>[docs]</code> is the list of documents that contain <code>word</code>. Sort the inverted index by key (word). Tip: to remove duplicates in a Python list, use a <a href="https://docs.python.org/3/library/stdtypes.html#set-types-set-frozenset"><code>set</code></a> instead.</li>
</ul>

In [23]:
docs_and_words.map(lambda x: (x[1], {x[0]}))\
              .reduceByKey(lambda x, y: x.union(y))\
              .sortByKey()\
              .mapValues(sorted).take(500)

[('', ['canada.txt', 'mexico.txt', 'usa.txt']),
 ('(3.85', ['canada.txt']),
 ('(5,525', ['canada.txt']),
 ('(761,610', ['mexico.txt']),
 ('(9.8', ['usa.txt']),
 ('(about', ['mexico.txt']),
 ('(gdp).[25]', ['usa.txt']),
 ('(oas),', ['usa.txt']),
 ('(spanish:', ['mexico.txt']),
 ('(u.s.', ['usa.txt']),
 ('(un),', ['mexico.txt']),
 ('(usa),', ['usa.txt']),
 ('(wto),', ['mexico.txt']),
 ('1,972,550', ['mexico.txt']),
 ('10th-most', ['mexico.txt']),
 ('11', ['usa.txt']),
 ('11th-largest', ['mexico.txt']),
 ('12,000', ['usa.txt']),
 ('120,000', ['mexico.txt']),
 ('128,649,565', ['mexico.txt']),
 ('13th-largest', ['mexico.txt']),
 ('1521,', ['mexico.txt']),
 ('15th-largest', ['mexico.txt']),
 ('16th', ['canada.txt', 'usa.txt']),
 ('1763.', ['canada.txt']),
 ('1775', ['usa.txt']),
 ('1783,', ['usa.txt']),
 ('1821.[16]', ['mexico.txt']),
 ('1836', ['mexico.txt']),
 ('1848,', ['usa.txt']),
 ('1857.', ['mexico.txt']),
 ('1867,', ['canada.txt']),
 ('18th', ['usa.txt']),
 ('1910', ['mexico.txt']),
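The same map/reduce logic can be written in plain Python on a few toy documents, which makes each step easier to inspect. This is an illustrative sketch: the documents and the function name are made up.

```python
from collections import defaultdict


def inverted_index(docs_and_words):
    """Map each word to the sorted list of documents that contain it."""
    index = defaultdict(set)
    for doc, word in docs_and_words:  # map: (word, {doc})
        index[word].add(doc)          # reduce: union of the document sets
    # Sort by word, and list each word's documents in sorted order
    return sorted((word, sorted(docs)) for word, docs in index.items())


pairs = [
    ("a.txt", "canada"), ("a.txt", "is"),
    ("b.txt", "mexico"), ("b.txt", "is"), ("b.txt", "is"),
]
print(inverted_index(pairs))
# [('canada', ['a.txt']), ('is', ['a.txt', 'b.txt']), ('mexico', ['b.txt'])]
```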


## Use case (part 2)

<ul style="list-style-image: url('images/do.png');">
<li>Print the list of unique parks where trees were treated, ordered alphabetically. Tip: use the <code>distinct</code> and <code>sortBy</code> transformations described in the <a href="https://spark.apache.org/docs/latest/rdd-programming-guide.html">Spark documentation</a>.</li>
</ul>


In [24]:
trees2015.filter(lambda x: x[6])\
         .map(lambda x: x[6])\
         .distinct().sortBy(lambda x: x).take(8)

['60E AVENUE / SUD 4E RUE',
 '<Null>',
 'AHUNTSIC',
 'AHUNTSIC, PONT',
 'ALBERT-PERRAS',
 'ALEXANDER (S12)',
 'ALEXIS-CARREL',
 'ALEXIS-NIHON']

<ul style="list-style-image: url('images/do.png');">
    <li>Create an RDD containing elements in the form <code>park,count</code>, where <code>count</code> is the number of trees treated in park <code>park</code>. Tip: use a logic similar to WordCount's.</li>
</ul>

In [25]:
trees2015.filter(lambda x: x[6])\
         .map(lambda x: (x[6], 1))\
         .reduceByKey(lambda x, y: x+y).take(8)

[('GATIEN-CLAUDE, RUE', 2),
 ('BEAU-BOIS (OUEST DE BEAU-BOIS)', 3),
 ('BASILE-ROUTHIER', 12),
 ('J.-J.-GAGNIER', 2),
 ('GABRIEL-LALEMANT', 3),
 ('AHUNTSIC', 32),
 ('AHUNTSIC, PONT', 2),
 ('SAINT-SIMON-APÔTRE', 5)]

__Optional__
<ul style="list-style-image: url('images/do.png');">
<li>Print the list of the 10 parks with the highest number of treated trees, ordered by decreasing count of treated trees. Tip: start from the solution to the previous question.</li></ul>


In [26]:
trees2015.filter(lambda x: x[6])\
         .map(lambda x: (x[6], 1))\
         .reduceByKey(lambda x, y: x+y)\
         .sortBy(lambda x: x[1], ascending=False)\
         .take(10)

[('JEAN-DRAPEAU', 453),
 ('MAISONNEUVE, DE', 296),
 ('MONT-ROYAL', 235),
 ('JARRY', 170),
 ('COMPLEXE ENVIRONNEM. ST-MICHEL', 128),
 ('NOËL-SUD', 121),
 ('PROMENADE-BELLERIVE, DE LA', 104),
 ('ANGRIGNON', 103),
 ('Alexis-Carrel', 83),
 ('PHILIPPE-LAHEURTE', 79)]
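Sorting the whole RDD just to keep ten elements can be wasteful. The RDD API also offers `takeOrdered(num, key=...)`, for instance `takeOrdered(10, key=lambda x: -x[1])`, which returns the first elements according to `key` without fully sorting the dataset. The idea can be sketched in pure Python with `heapq.nlargest`: keep a bounded number of candidates per partition, then merge the candidates. The partitions and counts below are made up.

```python
import heapq
from operator import itemgetter


def top_k(partitions, k):
    """Top-k by count: keep k candidates per partition, then merge the candidates."""
    by_count = itemgetter(1)
    candidates = [pair for part in partitions
                  for pair in heapq.nlargest(k, part, key=by_count)]
    return heapq.nlargest(k, candidates, key=by_count)


# Made-up (park, count) pairs spread over two partitions
toy_partitions = [[("A", 12), ("B", 453), ("C", 3)],
                  [("D", 235), ("E", 296), ("F", 79)]]
print(top_k(toy_partitions, 3))  # [('B', 453), ('E', 296), ('D', 235)]
```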

__Optional__
<ul style="list-style-image: url('images/do.png');">
    <li>Print the alphabetically sorted list of parks that had trees treated in both 2015 and 2016. Tip: use the <code>intersection</code> transformation.</li>
</ul>

In [27]:
parks2015 = trees2015.filter(lambda x: x[6])\
                     .map(lambda x: x[6])
parks2016 = trees2016.filter(lambda x: x[6])\
                     .map(lambda x: x[6])
both = parks2015.intersection(parks2016).sortBy(lambda park: park)
both.take(10)

['ATWATER, MARCHÉ PUBLIC',
 'BERTRAND, PARC',
 'Espace Vert Bertrand',
 'Nom_parc']

## Quiz

<div class="alert alert-block alert-info">
The MapReduce programming model was first introduced in Apache Spark:  
    
    
&#x25a2; True
    
&#x25a2; False
</div>

<div class="alert alert-block alert-info">
The MapReduce programming model was first introduced in Apache Spark:  
    
&#x25a2; True
    
&#x2611; False
</div>

The MapReduce programming model was first published by Google in 2004 (Dean and Ghemawat, OSDI; see also their <a href="https://dl.acm.org/doi/abs/10.1145/1327452.1327492">2008 Communications of the ACM article</a>), and the first widely used open-source implementation was made available in <a href="https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html">Apache Hadoop</a>. Compared to the Hadoop implementation, Spark provides:
- An easier programming model, including APIs in Python, Java, Scala and R
- Lazy evaluation (see next key concept)
- In-memory computing (see next key concept)

<div class="alert alert-block alert-warning">
The map transformation is parallel: provided that enough computers are available in the cluster, partitions of an RDD will be processed concurrently by the map function:

&#x25a2; True
    
&#x25a2; False
</div>

<div class="alert alert-block alert-warning">
The map transformation is parallel: provided that enough computers are available in the cluster, partitions of an RDD will be processed concurrently by the map function:
    
&#x2611; True
    
&#x25a2; False
</div>

Together with filter, the map transformation is the main parallelization construct in Spark.

<div class="alert alert-block alert-info">
A reduce transformation may result in significant overheads due to data transfers:
    
&#x25a2; True
    
&#x25a2; False
</div>

<div class="alert alert-block alert-info">
A reduce transformation may result in significant overheads due to data transfers:
    
&#x2611; True
    
&#x25a2; False
</div>

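A <code>reduceByKey</code> or <code>groupByKey</code> transformation combines data from many partitions of the RDD, which can require large data transfers between nodes (a <i>shuffle</i>). These transformations should be used with care! When only an aggregate per key is needed, prefer <code>reduceByKey</code>, which combines values within each partition before the shuffle, over <code>groupByKey</code>.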

<img src="images/session1/wide-narrow.png" alt="narrow-wide"/>
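A shuffle is expensive because every record must be sent to the partition responsible for its key, and that partition usually lives on another node. The toy model below is pure Python and counts how many records have to move. The character-sum hash and the partition layout are assumptions chosen so that the output is deterministic. Spark uses its own partitioner.

```python
def partition_for(key, num_partitions):
    """Deterministic toy hash partitioner: the same key always gets the same target."""
    return sum(ord(c) for c in key) % num_partitions


def shuffle(partitions):
    """Send every (key, value) record to the partition that owns its key."""
    n = len(partitions)
    output = [[] for _ in range(n)]
    moved = 0
    for source, part in enumerate(partitions):
        for key, value in part:
            target = partition_for(key, n)
            output[target].append((key, value))
            moved += target != source  # records that cross partition boundaries
    return output, moved


fruits_partitions = [[("apple", 3), ("banana", 2)], [("apple", 7), ("banana", 5)]]
output, moved = shuffle(fruits_partitions)
print(output)  # [[('apple', 3), ('apple', 7)], [('banana', 2), ('banana', 5)]]
print(moved)   # 2
```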

<div class="alert alert-block alert-warning">
Spark is limited to the processing of textual data. Binary data types, such as signals or images, are not supported:
    
&#x25a2; True
    
&#x25a2; False
</div>

<div class="alert alert-block alert-warning">
Spark is limited to the processing of textual data. Binary data types, such as signals or images, are not supported:
    
&#x25a2; True
    
&#x2611; False
</div>

RDDs can also be created from binary data, for instance using `sc.binaryFiles`, which reads each file as a (path, content) pair. Complete frameworks have also been built on top of Spark to process images or signals.

# Key concept 3: Data locality, Lazy evaluation, In-memory computing

## Data locality

Data transfers can be a major source of overhead in a distributed environment. 

To limit data movement, Spark:
- preferentially schedules tasks on the nodes where their input data is located (HDFS helps, as it exposes the location of data blocks)
- leaves output data on the nodes where it was produced
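Here is a minimal sketch of locality-aware scheduling under simplifying assumptions: one task per block, one free slot per node, at least as many free nodes as blocks, and made-up node names. Spark's real scheduler also has several locality levels and waits before giving up on locality (see `spark.locality.wait`). Each task goes to a free node that holds its input block if there is one. Otherwise it goes to any free node, which implies a network transfer.

```python
def schedule(block_locations, free_nodes):
    """Assign each task (one per block) to a node, preferring nodes that hold its block.

    Assumes at least as many free nodes as blocks. Returns {block: (node, is_local)}.
    """
    free = list(free_nodes)
    assignment = {}
    for block, replicas in block_locations.items():
        local = [n for n in free if n in replicas]
        node = local[0] if local else free[0]  # fall back to a remote node
        free.remove(node)
        assignment[block] = (node, bool(local))
    return assignment


blocks = {"b0": ["node1", "node2"], "b1": ["node2"], "b2": ["node1"]}
print(schedule(blocks, ["node1", "node2", "node3"]))
# {'b0': ('node1', True), 'b1': ('node2', True), 'b2': ('node3', False)}
```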

<center>
    <b>The effect of data locality <a href="https://arxiv.org/pdf/1812.06492">[Hayot-Sasson <i>et al.</i>, 2019]</a></b>
<img src="images/session1/data-locality.png"/>
    </center>

<center>
<div class="alert alert-block alert-info">
The impact of data locality is greatest for <b>data-intensive</b> applications.
</div>
</center>

## Lazy evaluation

<center>
<div class="alert alert-block alert-info">
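Transformations are <i>lazy</i>: they are only recorded, and nothing is computed until an action is called. At that point, Spark combines the transformations to avoid unnecessary computations.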
    </div>


<img src="images/session1/lazy.png" alt="lazy evaluation" width=800/>
(diagram by Valérie Hayot-Sasson)
</center>
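Python generators offer a rough analogy for lazy evaluation, although this is not how Spark is implemented. Building the pipeline below computes nothing. Asking for the first two results then evaluates only as many elements as needed. Each element also goes through the map step and then the filter step before the next element is read. This is similar to the way Spark pipelines narrow transformations within a stage.

```python
from itertools import islice

evaluated = []  # every element the "map" function is actually applied to


def square(x):
    evaluated.append(x)
    return x * x


toy_numbers = range(1, 1_000_001)

# "Transformations": generator expressions only describe the computation
squares = (square(x) for x in toy_numbers)
even_squares = (s for s in squares if s % 2 == 0)
print(len(evaluated))  # 0

# "Action": ask for the first two results
print(list(islice(even_squares, 2)))  # [4, 16]
print(len(evaluated))  # 4
```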


## In-memory computing


<center>
<div class="alert alert-block alert-info">
Spark keeps data in memory as much as possible, which makes it faster than Hadoop MapReduce, in particular for iterative algorithms that reuse the same data many times.
    </div>

    
<img src="images/session1/memory.png" alt="in memory benchmark"/>

<a href="https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia">[Zaharia <i>et al.</i>, 2012]</a>

</center>
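The benefit for iterative algorithms can be sketched in pure Python with a counter. Without caching, each iteration recomputes its input from scratch. With caching, the input is computed once and then reused. In Spark, this reuse is what `rdd.cache()` (or `persist()`) requests. The function names below are hypothetical, and the "expensive" step is only a stand-in.

```python
# Counts how many times the expensive step runs
calls = {"load_and_parse": 0}


def load_and_parse():
    """Stand-in for an expensive step, e.g. reading and parsing a large file."""
    calls["load_and_parse"] += 1
    return [x * 0.5 for x in range(1000)]


def run_iterations(num_iterations, cache):
    """Run an iterative computation; return (times the input was computed, result)."""
    calls["load_and_parse"] = 0
    cached_data = load_and_parse() if cache else None
    total = 0.0
    for _ in range(num_iterations):
        # Without caching, the input is recomputed at every iteration
        data = cached_data if cache else load_and_parse()
        total += sum(data)
    return calls["load_and_parse"], total


print(run_iterations(5, cache=False))  # (5, 1248750.0)
print(run_iterations(5, cache=True))   # (1, 1248750.0)
```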

# Key concept 4: DataFrames

## Creating and manipulating DataFrames


Similar to R or Pandas, Spark can also manipulate data through DataFrames, i.e. tables organized into named columns. 

The DataFrame API requires initializing a Spark session from Spark's SQL API:

In [28]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

The DataFrame API is particularly well suited to processing CSV files, such as the ones in our use case. CSV files can be loaded as follows (unless option `inferSchema` is enabled, all columns are read as strings):


In [29]:
filename = 'data/session1/frenepublicinjection2015.csv'
df = spark.read.option("header","true").csv(filename)
df.show(3)

+--------------------+------+--------+----------------+--------------+----------------+--------+------+----------+------------------+-------------------+-----------------+----------------+
|          Nom_arrond|Invent|No_Civiq|             Rue|        Rue_De|           Rue_a|Nom_parc| Sigle|Injections|                 x|                  y|        longitude|        latitude|
+--------------------+------+--------+----------------+--------------+----------------+--------+------+----------+------------------+-------------------+-----------------+----------------+
|Ahuntsic-Cartierv...|     R|   10265|Rue Saint-Hubert| SAURIOL E RUE|    FLEURY E RUE|    null|FRPESU|      2015|292338.75000000000|5046268.50000000000|-73.6596113879546|45.5562169952192|
|Ahuntsic-Cartierv...|     R|    9451|     Rue Foucher|CHABANEL E RUE|LOUVAIN E RUE DE|    null|FRAMKL|      2015|293090.71899999998|5045715.50000000000|-73.6499665032798|45.5512539761596|
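|Ahuntsic-Cartierv...|     R|   10435|       Rue Taché|   TACHÉ PLACE|    PRIEUR E RUE|    null|  FRPE|      2015|292521.68199999997|5047302.00000000000|-73.6572942384626|45.5655199930171|
+--------------------+------+--------+----------------+--------------+----------------+--------+------+----------+------------------+-------------------+-----------------+----------------+
only showing top 3 rows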

Pandas data frames are more nicely rendered in Jupyter than Spark's, so for the sake of visualization we can convert our Spark DataFrame to Pandas:

In [30]:
df.toPandas()

| | Nom_arrond | Invent | No_Civiq | Rue | Rue_De | Rue_a | Nom_parc | Sigle | Injections | x | y | longitude | latitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Ahuntsic-Cartierville | R | 10265 | Rue Saint-Hubert | SAURIOL E RUE | FLEURY E RUE | | FRPESU | 2015 | 292338.75000000000 | 5046268.50000000000 | -73.6596113879546 | 45.5562169952192 |
| 1 | Ahuntsic-Cartierville | R | 9451 | Rue Foucher | CHABANEL E RUE | LOUVAIN E RUE DE | | FRAMKL | 2015 | 293090.71899999998 | 5045715.50000000000 | -73.6499665032798 | 45.5512539761596 |
| 2 | Ahuntsic-Cartierville | R | 10435 | Rue Taché | TACHÉ PLACE | PRIEUR E RUE | | FRPE | 2015 | 292521.68199999997 | 5047302.00000000000 | -73.6572942384626 | 45.5655199930171 |
| 3 | Ahuntsic-Cartierville | R | 1475 | Rue Prieur Est | OLYMPIA BOULE | HAMEL AVENU | | FRPE | 2015 | 292325.89710000000 | 5047059.65600000040 | -73.6597961939526 | 45.5633358284252 |
| 4 | Ahuntsic-Cartierville | R | 9905 | Avenue du Sacré-Coeur | SAUVÉ E RUE | SAURIOL E RUE | | FRPE | 2015 | 292839.26500000001 | 5046594.00000000000 | -73.6532084656865 | 45.5591547231261 |
| 5 | Ahuntsic-Cartierville | R | 9345 | Avenue Millen | CHABANEL E RUE | LOUVAIN E RUE DE | | FRPE | 2015 | 293251.93800000002 | 5045722.50000000000 | -73.6479018646843 | 45.5513196564192 |
| 6 | Ahuntsic-Cartierville | R | 10125 | Rue Parthenais | SAURIOL E RUE | FLEURY E RUE | | FRAM | 2015 | 293277.31300000002 | 5047976.50000000000 | -73.6476299844672 | 45.5716022627822 |
| 7 | Ahuntsic-Cartierville | R | 9905 | Avenue Papineau | SAUVÉ E RUE | SAURIOL E RUE | | FRPE | 2015 | 293163.53200000001 | 5047406.44900000000 | -73.6490741850349 | 45.5664708823277 |
| 8 | Ahuntsic-Cartierville | R | 9131 | Rue Basile-Routhier | LEGENDRE E RUE | CHABANEL E RUE | | FRPE | 2015 | 293449.03100000002 | 5045493.00000000000 | -73.6453722795055 | 45.5492577808012 |
| 9 | Ahuntsic-Cartierville | R | 9147 | Rue Basile-Routhier | LEGENDRE E RUE | CHABANEL E RUE | | FRPE | 2015 | 293438.25000000000 | 5045497.00000000000 | -73.6455104445954 | 45.5492935983228 |


Spark DataFrames are built on top of RDDs: the rows of a DataFrame can be accessed as an RDD of `Row` objects, as follows:

In [31]:
df.rdd.take(3)

[Row(Nom_arrond='Ahuntsic-Cartierville', Invent='R', No_Civiq='10265', Rue='Rue Saint-Hubert', Rue_De='SAURIOL E RUE', Rue_a='FLEURY E RUE', Nom_parc=None, Sigle='FRPESU', Injections='2015', x='292338.75000000000', y='5046268.50000000000', longitude='-73.6596113879546', latitude='45.5562169952192'),
 Row(Nom_arrond='Ahuntsic-Cartierville', Invent='R', No_Civiq='9451', Rue='Rue Foucher', Rue_De='CHABANEL E RUE', Rue_a='LOUVAIN E RUE DE', Nom_parc=None, Sigle='FRAMKL', Injections='2015', x='293090.71899999998', y='5045715.50000000000', longitude='-73.6499665032798', latitude='45.5512539761596'),
 Row(Nom_arrond='Ahuntsic-Cartierville', Invent='R', No_Civiq='10435', Rue='Rue Taché', Rue_De='TACHÉ PLACE', Rue_a='PRIEUR E RUE', Nom_parc=None, Sigle='FRPE', Injections='2015', x='292521.68199999997', y='5047302.00000000000', longitude='-73.6572942384626', latitude='45.5655199930171')]

Many functions are available to process DataFrames; they are documented in the [PySpark DataFrame API reference](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame). 

Here are a few examples:

In [32]:
# Counting
df.count()

21914

In [33]:
# Filtering
df.filter(df['Nom_parc'] == 'BEAUBIEN').toPandas()

| | Nom_arrond | Invent | No_Civiq | Rue | Rue_De | Rue_a | Nom_parc | Sigle | Injections | x | y | longitude | latitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Outremont | H | | PARC BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296260.1416 | 5041800.169 | -73.6093059696596 | 45.5160684743449 |
| 1 | Outremont | H | | PARC BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296319.743 | 5041754.9795 | -73.6085423208117 | 45.515662568851 |
| 2 | Outremont | H | | | | | BEAUBIEN | FRPE | 2015 | 296272.3588 | 5041816.3586 | -73.6091498774581 | 45.5162143042545 |
| 3 | Outremont | H | | BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296288.9806 | 5041831.0962 | -73.608937382347 | 45.5163471220592 |
| 4 | Outremont | H | | BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296306.8159 | 5041836.7305 | -73.6087091962244 | 45.5163980391792 |
| 5 | Outremont | H | | BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296412.5015 | 5041706.0556 | -73.6073542313963 | 45.5152234542308 |
| 6 | Outremont | H | | BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296420.5279 | 5041710.4348 | -73.607251574014 | 45.5152629565036 |
| 7 | Outremont | H | | BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296442.08 | 5041716.7066 | -73.6069758291288 | 45.5153196513007 |
| 8 | Outremont | H | | BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296472.8789 | 5041736.8187 | -73.6065819659723 | 45.515500996407 |
| 9 | Outremont | H | | BEAUBIEN | | | BEAUBIEN | FRPE | 2015 | 296457.0071 | 5041746.408 | -73.6067852779769 | 45.5155870949897 |


## Use case (part 3): with DataFrames

We now repeat the analyses performed earlier on the dataset, this time using the DataFrame API.

<ul style="list-style-image: url('images/do.png');">
<li>Using the DataFrame API, print the list of unique parks where trees were treated, ordered alphabetically. Tip: use the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.select"><code>select</code></a> function.</li>
</ul>


In [34]:
df.filter(df['Nom_parc'] != '')\
  .select('Nom_parc')\
  .distinct()\
  .orderBy('Nom_parc').show()

+--------------------+
|            Nom_parc|
+--------------------+
|60E AVENUE / SUD ...|
|              <Null>|
|            AHUNTSIC|
|      AHUNTSIC, PONT|
|       ALBERT-PERRAS|
|     ALEXANDER (S12)|
|       ALEXIS-CARREL|
|        ALEXIS-NIHON|
| ALLÉE DES TANNERIES|
|ALPHONSE-TELESHPO...|
|ALPHONSE-TELESPHO...|
|ANCIENNE-COUR-DE-...|
|  ANCIENNE-PÉPINIÈRE|
|           ANGRIGNON|
|      APPRENTIS, DES|
| AQUEDUC DE MONTRÉAL|
|      ARMAND-VANASSE|
|ATWATER, MARCHÉ P...|
|     AUTOPARC NO 088|
|     AUTOPARC NO 266|
+--------------------+
only showing top 20 rows
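A subtlety of the filter above: empty CSV fields were read as nulls (the `Row` output earlier shows `Nom_parc=None`), and in Spark SQL comparing a null with `!=` yields null rather than true. Since `filter` keeps only rows whose predicate is true, `df['Nom_parc'] != ''` silently drops the null names too. The string `'<Null>'` in the output, however, is a literal value in the data, not a null, so it survives. The plain-Python sketch below mimics this three-valued logic on a few hypothetical rows, with `None` standing in for SQL null; it does not use Spark. If `'<Null>'` should also count as missing (an assumption about the data), one possible PySpark filter is `df.filter(df['Nom_parc'].isNotNull() & (df['Nom_parc'] != '<Null>'))`.

```python
from typing import Optional

def sql_not_equal(value: Optional[str], other: str) -> Optional[bool]:
    """Emulate SQL '!=': a null operand gives null (None), not True or False."""
    if value is None:
        return None
    return value != other

def spark_like_filter(rows, column, other):
    """Keep only rows whose predicate is exactly True, as DataFrame.filter does."""
    return [row for row in rows if sql_not_equal(row[column], other) is True]

# Hypothetical sample rows, not read from the dataset file.
rows = [
    {"Nom_parc": "AHUNTSIC"},
    {"Nom_parc": None},       # empty CSV field, read as null
    {"Nom_parc": "<Null>"},   # a literal string, not a null
    {"Nom_parc": "ANGRIGNON"},
]

kept = spark_like_filter(rows, "Nom_parc", "")
print([row["Nom_parc"] for row in kept])  # ['AHUNTSIC', '<Null>', 'ANGRIGNON']
```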



<ul style="list-style-image: url('images/do.png');">
    <li>Using the DataFrame API, print a list of elements in the form <code>park,count</code>, where <code>count</code> is the number of trees treated in park <code>park</code>. Tip: use the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy"><code>groupBy</code></a> function.</li>
</ul>

In [35]:
df.filter(df['Nom_parc'] != '')\
  .select('Nom_parc')\
  .groupBy('Nom_parc')\
  .count()\
  .orderBy('Nom_parc').show()

+--------------------+-----+
|            Nom_parc|count|
+--------------------+-----+
|60E AVENUE / SUD ...|    2|
|              <Null>|   14|
|            AHUNTSIC|   32|
|      AHUNTSIC, PONT|    2|
|       ALBERT-PERRAS|   11|
|     ALEXANDER (S12)|   22|
|       ALEXIS-CARREL|    1|
|        ALEXIS-NIHON|   23|
| ALLÉE DES TANNERIES|    1|
|ALPHONSE-TELESHPO...|    1|
|ALPHONSE-TELESPHO...|    1|
|ANCIENNE-COUR-DE-...|    5|
|  ANCIENNE-PÉPINIÈRE|    8|
|           ANGRIGNON|  103|
|      APPRENTIS, DES|    5|
| AQUEDUC DE MONTRÉAL|    9|
|      ARMAND-VANASSE|    7|
|ATWATER, MARCHÉ P...|    8|
|     AUTOPARC NO 088|    6|
|     AUTOPARC NO 266|    5|
+--------------------+-----+
only showing top 20 rows
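Conceptually, `groupBy('Nom_parc').count()` groups the rows by park name and counts each group, and `orderBy('Nom_parc')` sorts the groups by name. Spark does this in parallel: each partition first counts its own rows, then the partial counts are merged after a shuffle. The sketch below reproduces that two-step logic in plain Python with `collections.Counter`; the park names and the split into two "partitions" are hypothetical.

```python
from collections import Counter
from functools import reduce

# Hypothetical park names, split into two "partitions" as Spark might distribute them.
partitions = [
    ["ANGRIGNON", "AHUNTSIC", "ANGRIGNON"],
    ["AHUNTSIC", "ANGRIGNON", "ALBERT-PERRAS"],
]

# Step 1: each partition computes partial counts locally.
partial_counts = [Counter(partition) for partition in partitions]

# Step 2: the partial counts are merged into final per-park counts.
total_counts = reduce(lambda a, b: a + b, partial_counts, Counter())

# Step 3: sort by park name, like orderBy('Nom_parc'), and print park,count.
for park, count in sorted(total_counts.items()):
    print(f"{park},{count}")
```

Merging per-partition counts, rather than shipping every row to one place, is what keeps the amount of data exchanged between nodes small.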



__Optional__
<ul style="list-style-image: url('images/do.png');">
<li>Print the list of the 10 parks with the highest number of treated trees, ordered by decreasing count of treated trees. Tip: start from the solution to the previous question.</li></ul>


In [36]:
df.filter(df['Nom_parc'] != '')\
  .select('Nom_parc')\
  .groupBy('Nom_parc')\
  .count()\
  .orderBy('count', ascending=False).show(10)

+--------------------+-----+
|            Nom_parc|count|
+--------------------+-----+
|        JEAN-DRAPEAU|  453|
|     MAISONNEUVE, DE|  296|
|          MONT-ROYAL|  235|
|               JARRY|  170|
|COMPLEXE ENVIRONN...|  128|
|            NOËL-SUD|  121|
|PROMENADE-BELLERI...|  104|
|           ANGRIGNON|  103|
|       Alexis-Carrel|   83|
|   PHILIPPE-LAHEURTE|   79|
+--------------------+-----+
only showing top 10 rows
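Park names are not always spelled consistently: the alphabetical listing earlier contains `ALEXIS-CARREL` (1 tree), while this ranking contains `Alexis-Carrel` (83 trees), and the two are counted as separate groups. Also, `orderBy('count', ascending=False)` gives no guaranteed order among parks with equal counts. One possible refinement, sketched below in plain Python, normalizes letter case before counting and breaks ties alphabetically; in PySpark, the `upper` function from `pyspark.sql.functions` could play the same role. Whether both spellings really denote the same park is an assumption to verify against the data, and the names in the example are hypothetical.

```python
from collections import Counter
import heapq

def top_parks(names, k):
    """Return the k parks with the most treated trees, ignoring letter case.

    Missing names (None or empty) are skipped. Ties on the count are broken
    alphabetically so that the result is deterministic.
    """
    counts = Counter(name.upper() for name in names if name)
    # Smallest (-count, name) first means: largest count first, then names in order.
    return heapq.nsmallest(k, counts.items(), key=lambda item: (-item[1], item[0]))

# Hypothetical input mixing two spellings of the same park name.
names = ["Alexis-Carrel", "ALEXIS-CARREL", "Alexis-Carrel", "JARRY", "JARRY", "ANGRIGNON", None]
print(top_parks(names, 2))  # [('ALEXIS-CARREL', 3), ('JARRY', 2)]
```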



 __Optional__
<ul style="list-style-image: url('images/do.png');">
    <li>
 Print the alphabetically sorted list
    of parks that had trees treated both in 2016 and 2015. Tip: use the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join"><code>join</code></a> function.
    </li>
    </ul>

In [37]:
# Create DataFrames

filename1 = 'data/session1/frenepublicinjection2015.csv'
df1 = spark.read.option("header","true").csv(filename1).select('Nom_parc').distinct()

filename2 = 'data/session1/frenepublicinjection2016.csv'
df2 = spark.read.option("header","true").csv(filename2).select('Nom_parc').distinct()

# Join them on Nom_parc, keep a single Nom_parc column and sort alphabetically
df1.join(df2, df1['Nom_parc'] == df2['Nom_parc'])\
   .select(df1['Nom_parc'])\
   .orderBy('Nom_parc').show()

+--------------------+
|            Nom_parc|
+--------------------+
|ATWATER, MARCHÉ P...|
|      BERTRAND, PARC|
|Espace Vert Bertrand|
+--------------------+
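Because both DataFrames were first reduced to distinct park names, this inner join amounts to a set intersection. Rows whose `Nom_parc` is null never match in an equality join, so they are excluded. PySpark's `DataFrame.intersect` is a close alternative, although its handling of nulls differs from an equality join, so it is worth checking against the data. The plain-Python sketch below shows the same logic on hypothetical sets of names, with `None` standing in for a missing name.

```python
def parks_in_both_years(parks_a, parks_b):
    """Sorted park names present in both collections.

    None (a null name) is discarded first, mirroring the fact that an
    equality join never matches nulls.
    """
    return sorted((set(parks_a) - {None}) & (set(parks_b) - {None}))

# Hypothetical distinct park names per year, not read from the CSV files.
parks_2015 = {"AHUNTSIC", "BERTRAND, PARC", "JARRY", None}
parks_2016 = {"BERTRAND, PARC", "AHUNTSIC", "ANGRIGNON", None}

for park in parks_in_both_years(parks_2015, parks_2016):
    print(park)
```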



# Recap

<div class="alert alert-block alert-success">
    &#x2611; <i>when</i> to use Spark
</div>

<i>Large datasets, substantial processing, independent tasks, batch processing</i>


<div class="alert alert-block alert-success">
    &#x2611; <i>how</i> to write Spark programs
</div>

<i>Two APIs: Resilient Distributed Datasets, DataFrames</i>

<div class="alert alert-block alert-success">
    &#x2611; <i>what</i> is the relation between Spark and <i>Hadoop</i>
</div>

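<i>Hadoop MapReduce was the first widely used open-source implementation of MapReduce. Spark typically outperforms it, especially on iterative workloads, because it can keep intermediate data in memory instead of writing it to disk between steps, and it provides a simpler API.</i>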

<i>The Hadoop Distributed File System (HDFS) can serve as Spark's storage layer; Spark then schedules tasks on the nodes that hold the data blocks, which improves data locality.</i>

<center>
    <b>The Apache Spark ecosystem <a href="https://dl.acm.org/doi/fullHtml/10.1145/2934664">[Zaharia <i>et al.</i>, 2016]</a></b>
<img src="images/session1/ecosystem.png" alt="ecosystem" width=600/>