# Start Spark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 27 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 15.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=6b51aba46c878f261c936ee37e9d88f7a5c75d39fb619f928e8061e20ca70b02
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
from pyspark import SparkConf, SparkContext

# Run Spark locally, using every available core ("local[*]").
conf = SparkConf().setAppName("Spark-Core").setMaster("local[*]")

# getOrCreate() reuses an already-running context, so this cell can be
# re-executed without "Cannot run multiple SparkContexts at once" errors.
sc = SparkContext.getOrCreate(conf=conf)

# Definitions

## RDD
- Resilient Distributed Dataset
- Collection of elements partitioned across the nodes of the cluster that can be operated on in parallel
- is immutable

## DAG
- Directed Acyclic Graph
- The graph of "lazy" transformations
- Thanks to the DAG, each RDD knows how it was built
- Allows recovery in case of failure

## Transformations
- Filter
- Map
- GroupBy


## Actions
- Collect, count, reduce, ...
- Trigger the DAG execution

# Data Sources

## Memory

### parallelize

In [3]:
# glom() coalesces all elements within each partition into a list, so after
# collect() we hold a plain Python list with one sub-list per partition (7 here).
rdd = (
    sc.parallelize(range(0, 100), 7)
    .glom()
    .collect()
)
rdd

[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
 [14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27],
 [28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41],
 [42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56],
 [57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70],
 [71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84],
 [85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]

### range

In [4]:
# Build the numbers 0..99 directly with sc.range, spread over 7 slices, then
# gather each partition's content as a list to inspect the split.
rdd = (
    sc.range(start=0, end=100, step=1, numSlices=7)
    .glom()
    .collect()
)
rdd

[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
 [14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27],
 [28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41],
 [42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56],
 [57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70],
 [71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84],
 [85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]

## File Systems
- Local / cloud drive
- [Hadoop HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html)
- [Amazon S3](https://aws.amazon.com/s3/)

In [5]:
from google.colab import drive
import os

# Mount Google Drive and derive every path from the absolute mount point, so
# the data location does not depend on the notebook's current working directory.
MOUNT_POINT = '/content/gdrive'
drive.mount(MOUNT_POINT, force_remount=True)

# `course_dir` rather than `dir`, which would shadow the built-in dir().
course_dir = os.path.join(MOUNT_POINT, 'My Drive', 'Eurostat', '05 - Data Science for Big Data')
data_dir = os.path.join(course_dir, 'data')

Mounted at /content/gdrive


### textFile

In [6]:
# Read the text file into an RDD of strings and peek at the first record.
rdd = sc.textFile(
    os.path.join(data_dir, 'bible.txt')
)
rdd.first()

'In the beginning God created the heaven and the earth. And the earth was without form, and void; and darkness was upon the face of the deep. And the Spirit of God moved upon the face of the waters. '

### wholeTextFiles

In [7]:
# Read every file of the directory as one record each; the records are tuples
# whose first item is the file's URI (see the output below).
rdd = sc.wholeTextFiles(
    path=os.path.join(data_dir, 'bible_word_count')
)
rdd.collect()

[('file:/content/gdrive/My Drive/Eurostat/05 - Data Science for Big Data/data/bible_word_count/part-00001',
 ('file:/content/gdrive/My Drive/Eurostat/05 - Data Science for Big Data/data/bible_word_count/part-00000',

### sequenceFile

In [8]:
import shutil

powers_path = os.path.join(data_dir, 'powers_of_two')

# Hadoop output formats refuse to write into an existing directory, so remove
# the result of any previous run first; this keeps the cell re-runnable.
shutil.rmtree(powers_path, ignore_errors=True)

# Pairs (x, 2**x) for x = 1..9, written as a Hadoop SequenceFile.
rdd = sc.parallelize(range(1, 10)).map(lambda x: (x, 2**x))
rdd.saveAsSequenceFile(powers_path)

In [9]:
# Load the SequenceFile back without naming the key/value classes.
rdd = sc.sequenceFile(
    path=os.path.join(data_dir, 'powers_of_two')
)
rdd.collect()

[(5, 32),
 (6, 64),
 (7, 128),
 (8, 256),
 (9, 512),
 (1, 2),
 (2, 4),
 (3, 8),
 (4, 16)]

In [10]:
# Same read, this time stating the Hadoop Writable classes of key and value.
rdd = sc.sequenceFile(
    path=os.path.join(data_dir, 'powers_of_two'),
    keyClass='org.apache.hadoop.io.IntWritable',
    valueClass='org.apache.hadoop.io.IntWritable',
)
rdd.collect()

[(5, 32),
 (6, 64),
 (7, 128),
 (8, 256),
 (9, 512),
 (1, 2),
 (2, 4),
 (3, 8),
 (4, 16)]

In [11]:
# Read the (int, int) pairs and convert both members of each pair to strings.
rdd = (
    sc.sequenceFile(
        path=os.path.join(data_dir, 'powers_of_two'),
        keyClass='org.apache.hadoop.io.IntWritable',
        valueClass='org.apache.hadoop.io.IntWritable',
    )
    .map(lambda pair: (str(pair[0]), str(pair[1])))
)
rdd.collect()

[('5', '32'),
 ('6', '64'),
 ('7', '128'),
 ('8', '256'),
 ('9', '512'),
 ('1', '2'),
 ('2', '4'),
 ('3', '8'),
 ('4', '16')]

### hadoopFile and newAPIHadoopFile

In [12]:
# Read the text file through the "new" Hadoop MapReduce API.
# TextInputFormat produces (LongWritable, Text) records: the key is the byte
# offset of the line in the file (0 for the first line, as the output shows),
# the value is the line itself. Declare the key class accordingly.
rdd = sc.newAPIHadoopFile(
    path=os.path.join(data_dir, 'bible.txt'),
    inputFormatClass='org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    keyClass='org.apache.hadoop.io.LongWritable',
    valueClass='org.apache.hadoop.io.Text',
)
rdd.first()

(0,
 'In the beginning God created the heaven and the earth. And the earth was without form, and void; and darkness was upon the face of the deep. And the Spirit of God moved upon the face of the waters. ')

### hadoopRDD and newAPIHadoopRDD

## Databases
- [MongoDB](https://www.mongodb.com/) (JSON format)
- [Cassandra DB](https://cassandra.apache.org/_/index.html) (column-oriented DB)

## File Formats
- [Parquet](https://parquet.apache.org/) (columnar storage)
- [Avro](https://avro.apache.org/docs/1.2.0/) (row-based storage)

# Transformations & Actions

## Collect

In [13]:
# One tuple per Eurostat director-general: (name, country, start year, end year).
# The current director's end year is the string 'present'.
directors = sc.parallelize(
    [
        ('Rolf Wagenführ', 'West Germany', 1952, 1966),
        ('Raymond Dumas', 'France', 1966, 1973),
        ('Jacques Mayer', 'France', 1973, 1977),
        ('Aage Dornonville de la Cour', 'Denmark', 1977, 1982),
        ('Pieter de Geus', 'Netherlands', 1982, 1984),
        ('Silvio Ronchetti', 'Italy', 1984, 1987),
        ('Yves Franchet', 'France', 1987, 2003),
        ('Michel Vanden Abeele', 'Belgium', 2003, 2004),
        ('Günther Hanreich', 'Austria', 2004, 2006),
        ('Hervé Carré', 'France', 2006, 2008),
        ('Walter Radermacher', 'Germany', 2008, 2016),
        ('Mariana Kotzeva', 'Bulgaria', 2017, 'present'),
    ]
)

# collect() is an action: it brings every element back as a Python list.
directors.collect()

[('Rolf Wagenführ', 'West Germany', 1952, 1966),
 ('Raymond Dumas', 'France', 1966, 1973),
 ('Jacques Mayer', 'France', 1973, 1977),
 ('Aage Dornonville de la Cour', 'Denmark', 1977, 1982),
 ('Pieter de Geus', 'Netherlands', 1982, 1984),
 ('Silvio Ronchetti', 'Italy', 1984, 1987),
 ('Yves Franchet', 'France', 1987, 2003),
 ('Michel Vanden Abeele', 'Belgium', 2003, 2004),
 ('Günther Hanreich', 'Austria', 2004, 2006),
 ('Hervé Carré', 'France', 2006, 2008),
 ('Walter Radermacher', 'Germany', 2008, 2016),
 ('Mariana Kotzeva', 'Bulgaria', 2017, 'present')]

## Count

In [14]:
# Same data set as before, with a numeric end year for the last entry.
directors = sc.parallelize(
    [
        ('Rolf Wagenführ', 'West Germany', 1952, 1966),
        ('Raymond Dumas', 'France', 1966, 1973),
        ('Jacques Mayer', 'France', 1973, 1977),
        ('Aage Dornonville de la Cour', 'Denmark', 1977, 1982),
        ('Pieter de Geus', 'Netherlands', 1982, 1984),
        ('Silvio Ronchetti', 'Italy', 1984, 1987),
        ('Yves Franchet', 'France', 1987, 2003),
        ('Michel Vanden Abeele', 'Belgium', 2003, 2004),
        ('Günther Hanreich', 'Austria', 2004, 2006),
        ('Hervé Carré', 'France', 2006, 2008),
        ('Walter Radermacher', 'Germany', 2008, 2016),
        ('Mariana Kotzeva', 'Bulgaria', 2017, 2021),
    ]
)

# Print the content of each partition on its own line.
for partition_rows in directors.glom().collect():
    print(partition_rows)

# count() is an action returning the number of elements in the RDD.
print(f'Eurostat had a total of {directors.count()} director-generals.')

[('Rolf Wagenführ', 'West Germany', 1952, 1966), ('Raymond Dumas', 'France', 1966, 1973), ('Jacques Mayer', 'France', 1973, 1977), ('Aage Dornonville de la Cour', 'Denmark', 1977, 1982), ('Pieter de Geus', 'Netherlands', 1982, 1984), ('Silvio Ronchetti', 'Italy', 1984, 1987)]
[('Yves Franchet', 'France', 1987, 2003), ('Michel Vanden Abeele', 'Belgium', 2003, 2004), ('Günther Hanreich', 'Austria', 2004, 2006), ('Hervé Carré', 'France', 2006, 2008), ('Walter Radermacher', 'Germany', 2008, 2016), ('Mariana Kotzeva', 'Bulgaria', 2017, 2021)]
Eurostat had a total of 12 director-generals.


#### ❓ Exercise
What happens if you split the RDD over 15 partitions?

## Filter

In [15]:
# Keep the rows whose country field (index 1) contains 'Germany'; the substring
# test also matches 'West Germany'.
german_directors = directors.filter(
    lambda director: 'Germany' in director[1]
)
german_directors.collect()

[('Rolf Wagenführ', 'West Germany', 1952, 1966),
 ('Walter Radermacher', 'Germany', 2008, 2016)]

#### ❓ Exercise
How many director-generals acted for 5 years or more?

## Foreach

In [16]:
def print_presentation(tup):
    """Print a one-sentence presentation of a single director tuple."""
    print(f'{tup[0]} from {tup[1]} directed Eurostat from {tup[2]} until {tup[3]}.')


# foreach applies the function to every element for its side effect only.
directors.foreach(print_presentation)

In [17]:
# Where is the output?

## Map

In [18]:
def present_director(tup):
    """Return a one-sentence presentation of a single director tuple."""
    return f'{tup[0]} from {tup[1]} directed Eurostat from {tup[2]} until {tup[3]}.'


# map is a transformation: it only describes the new RDD of sentences.
director_presentations = directors.map(present_director)

In [19]:
# collect() is the action that returns the mapped sentences as a Python list.
director_presentations.collect()

['Rolf Wagenführ from West Germany directed Eurostat from 1952 until 1966.',
 'Raymond Dumas from France directed Eurostat from 1966 until 1973.',
 'Jacques Mayer from France directed Eurostat from 1973 until 1977.',
 'Aage Dornonville de la Cour from Denmark directed Eurostat from 1977 until 1982.',
 'Pieter de Geus from Netherlands directed Eurostat from 1982 until 1984.',
 'Silvio Ronchetti from Italy directed Eurostat from 1984 until 1987.',
 'Yves Franchet from France directed Eurostat from 1987 until 2003.',
 'Michel Vanden Abeele from Belgium directed Eurostat from 2003 until 2004.',
 'Günther Hanreich from Austria directed Eurostat from 2004 until 2006.',
 'Hervé Carré from France directed Eurostat from 2006 until 2008.',
 'Walter Radermacher from Germany directed Eurostat from 2008 until 2016.',
 'Mariana Kotzeva from Bulgaria directed Eurostat from 2017 until 2021.']

## Reduce

In [20]:
# Project each row onto its name, then fold the names pairwise into one
# comma-separated string with reduce.
concat = (
    directors
    .map(lambda row: row[0])
    .reduce(lambda left, right: left + ', ' + right)
)
print("List of all director-generals:\n%s." % concat)

List of all director-generals:
Rolf Wagenführ, Raymond Dumas, Jacques Mayer, Aage Dornonville de la Cour, Pieter de Geus, Silvio Ronchetti, Yves Franchet, Michel Vanden Abeele, Günther Hanreich, Hervé Carré, Walter Radermacher, Mariana Kotzeva.


## Join

In [21]:
# Two key/value RDDs sharing the keys "spark" and "hadoop".
x = sc.parallelize([
    ("spark", 1),
    ("hadoop", 4),
])
y = sc.parallelize([
    ("spark", 2),
    ("hadoop", 5),
])

# join matches the pairs by key and groups their values in a tuple.
joined = x.join(y)

In [22]:
# Bring the joined pairs back as a list; note the result is not in input order.
joined.collect()

[('hadoop', (4, 5)), ('spark', (1, 2))]

## Cache

In [23]:
# Inspect the RDD's cache flag before caching has been requested.
print("Director-generals got cached: %s" % directors.is_cached)

Director-generals got cached: False


In [27]:
# cache() is shorthand for persist() with the default storage level and returns
# the RDD itself, so a single call is enough to flag it as cached.
caching = directors.cache().is_cached
print("Director-generals got cached: %s" % caching)

Director-generals got cached: True


# RDD Combiners

## Union

In [29]:
# Directors whose term started in the 1900s.
directors_1900 = sc.parallelize(
    [
        ('Rolf Wagenführ', 'West Germany', 1952, 1966),
        ('Raymond Dumas', 'France', 1966, 1973),
        ('Jacques Mayer', 'France', 1973, 1977),
        ('Aage Dornonville de la Cour', 'Denmark', 1977, 1982),
        ('Pieter de Geus', 'Netherlands', 1982, 1984),
        ('Silvio Ronchetti', 'Italy', 1984, 1987),
        ('Yves Franchet', 'France', 1987, 2003),
    ]
)

# Directors whose term reached into the 2000s; Yves Franchet is in both lists.
directors_2000 = sc.parallelize(
    [
        ('Yves Franchet', 'France', 1987, 2003),
        ('Michel Vanden Abeele', 'Belgium', 2003, 2004),
        ('Günther Hanreich', 'Austria', 2004, 2006),
        ('Hervé Carré', 'France', 2006, 2008),
        ('Walter Radermacher', 'Germany', 2008, 2016),
        ('Mariana Kotzeva', 'Bulgaria', 2017, 2021),
    ]
)

# union keeps duplicates: the shared row appears twice in the result.
(
    directors_1900
    .union(directors_2000)
    .collect()
)

[('Rolf Wagenführ', 'West Germany', 1952, 1966),
 ('Raymond Dumas', 'France', 1966, 1973),
 ('Jacques Mayer', 'France', 1973, 1977),
 ('Aage Dornonville de la Cour', 'Denmark', 1977, 1982),
 ('Pieter de Geus', 'Netherlands', 1982, 1984),
 ('Silvio Ronchetti', 'Italy', 1984, 1987),
 ('Yves Franchet', 'France', 1987, 2003),
 ('Yves Franchet', 'France', 1987, 2003),
 ('Michel Vanden Abeele', 'Belgium', 2003, 2004),
 ('Günther Hanreich', 'Austria', 2004, 2006),
 ('Hervé Carré', 'France', 2006, 2008),
 ('Walter Radermacher', 'Germany', 2008, 2016),
 ('Mariana Kotzeva', 'Bulgaria', 2017, 2021)]

## Intersection

In [31]:
# Keep only the rows present in both RDDs (here the single shared director).
directors_1900.intersection(directors_2000).collect()

[('Yves Franchet', 'France', 1987, 2003)]

## Subtract

In [33]:
# Rows of directors_1900 that do not appear in directors_2000.
directors_1900.subtract(directors_2000).collect()

[('Jacques Mayer', 'France', 1973, 1977),
 ('Pieter de Geus', 'Netherlands', 1982, 1984),
 ('Silvio Ronchetti', 'Italy', 1984, 1987),
 ('Raymond Dumas', 'France', 1966, 1973),
 ('Aage Dornonville de la Cour', 'Denmark', 1977, 1982),
 ('Rolf Wagenführ', 'West Germany', 1952, 1966)]

## Cartesian

In [34]:
rdd = sc.parallelize([1, 2, 3])

# Pair every element with every element of the same RDD, then sort the
# collected pairs on the driver for a stable display.
sorted(
    rdd.cartesian(rdd).collect()
)

[(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3), (3, 1), (3, 2), (3, 3)]

## Zip

In [35]:
# Two RDDs of five elements each.
x = sc.parallelize(range(5))
y = sc.parallelize(range(1000, 1005))

# zip pairs the elements of x and y position by position.
(
    x.zip(y)
    .collect()
)

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

In [25]:
# Shut down the SparkContext now that the notebook is finished.
sc.stop()