In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -q

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=dcc7fc65333487e1efcebc6868196cc3627397f0dae13932880c3910c8d6de5b
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Reading package lists...
Building dependency tree...
Reading state information...
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum
  fonts-ip

In [2]:
import random
import re

## PySpark Python API

PySpark can be used from standalone Python scripts by creating a `SparkContext`. You can set configuration properties by passing a `SparkConf` object to `SparkContext`.

Documentation: [pyspark package](https://spark.apache.org/docs/latest/api/python/pyspark.html)

In [3]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [4]:
# cannot run multiple SparkContexts at once (so stop one just in case)
sc = SparkContext.getOrCreate()
sc.stop()

In [5]:
# spark conf
conf = SparkConf()

In [6]:
sc = SparkContext(conf=conf)

## RDD - Resilient Distributed Datasets

resilient:
- (of a person or animal) able to withstand or recover quickly from difficult conditions
- (of a substance or object) able to recoil or spring back into shape after bending, stretching, or being compressed

Spark is RDD-centric!
- RDDs are immutable
- RDDs are computed lazily
- RDDs can be cached
- RDDs know who their parents are
- RDDs that contain only tuples of two elements are “pair RDDs”

## RDD Actions

**RDD** - Resilient Distributed Datasets

Some useful actions:
- take(n) – return the first n elements in the RDD as an array.
- collect() – return all elements of the RDD as an array. Use with caution.
- count() – return the number of elements in the RDD as an int.
- saveAsTextFile(‘path/to/dir’) – save the RDD to files in a directory. Will create the directory if it doesn’t exist and will fail if it does.
- foreach(func) – execute the function against every element in the RDD, but don’t keep any results.

#### Demo files

```
file1.txt:
    Apple,Amy
    Butter,Bob
    Cheese,Chucky
    Dinkel,Dieter
    Egg,Edward
    Oxtail,Oscar
    Anchovie,Alex
    Avocado,Adam
    Apple,Alex
    Apple,Adam
    Dinkel,Dieter
    Doughboy,Pilsbury
    McDonald,Ronald

file2.txt:
    Wendy,
    Doughboy,Pillsbury
    McDonald,Ronald
    Cheese,Chucky
```

In [7]:
# input files
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')


Mounted at /content/drive


In [8]:
data1 = sc.textFile("/content/drive/My Drive/Colab Notebooks/MMDS/file1.txt")
data2 = sc.textFile("/content/drive/My Drive/Colab Notebooks/MMDS/file2.txt")

In [9]:
data1.collect()

['Apple,Amy',
 'Butter,Bob',
 'Cheese,Chucky',
 'Dinkel,Dieter',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam',
 'Dinkel,Dieter',
 'Doughboy,Pilsbury',
 'McDonald,Ronald']

In [10]:
print("file1: %d lines" % data1.count())

file1: 13 lines


In [21]:
data1.take(5)

['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky', 'Dinkel,Dieter', 'Egg,Edward']

In [12]:
data2.collect()

['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']

In [13]:
print("file2: %d lines" % data2.count())

file2: 4 lines


In [14]:
data2.take(3)

['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald']

## RDD Operations

### map()
Return a new RDD by applying a function to each element of this RDD.
- apply an operation to every element of an RDD
- return a new RDD that contains the results

In [15]:
data1.map(lambda line: line.split(',')).take(3)

[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]

### flatMap()
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results after dropping the outermost container

In [16]:
data1.flatMap(lambda line: line.split(',')).take(7)

['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']

### mapValues()
Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results

Only works with pair RDDs.

In [17]:
data3 = data1.map(lambda line: line.split(','))

In [18]:
data3.take(3)

[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]

In [19]:
data4 = data3.map(lambda pair: (pair[0], pair[1]))

In [20]:
data4.take(3)

[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]

In [None]:
data4.mapValues(lambda name: name.lower()).take(3)

[('Apple', 'amy'), ('Butter', 'bob'), ('Cheese', 'chucky')]

In [None]:
data4.mapValues(lambda name: name.upper()).take(3)

[('Apple', 'AMY'), ('Butter', 'BOB'), ('Cheese', 'CHUCKY')]

### flatMapValues()
Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results after removing the outermost container

Only works with pair RDDs.

In [None]:
data3 = data1.map(lambda line: line.split(','))

In [None]:
data4 = data3.map(lambda pair: (pair[0], pair[1]))

In [None]:
data3.take(3)


[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]

In [None]:
data4.take(3)

[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]

In [None]:
data4.flatMapValues(lambda name: name.lower()).take(9)

[('Apple', 'a'),
 ('Apple', 'm'),
 ('Apple', 'y'),
 ('Butter', 'b'),
 ('Butter', 'o'),
 ('Butter', 'b'),
 ('Cheese', 'c'),
 ('Cheese', 'h'),
 ('Cheese', 'u')]

In [None]:
data4.flatMapValues(lambda name: name.upper()).take(9)

[('Apple', 'A'),
 ('Apple', 'M'),
 ('Apple', 'Y'),
 ('Butter', 'B'),
 ('Butter', 'O'),
 ('Butter', 'B'),
 ('Cheese', 'C'),
 ('Cheese', 'H'),
 ('Cheese', 'U')]

### filter()
Return a new RDD containing only the elements that satisfy a predicate.
- return a new RDD that contains only the elements that pass a **filter operation**

In [None]:
#sc = SparkContext(conf=conf)
data1 = sc.textFile("/content/drive/My Drive/Colab Notebooks/MMDS/file1.txt")
data1.filter(lambda line: re.match(r'^[AEIOU]', line)).take(12) # come back later after we learn regex


['Apple,Amy',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam']

In [None]:
data1.filter(lambda line: re.match(r'.+[y]$', line)).take(6)#later after regex

['Apple,Amy', 'Cheese,Chucky', 'Doughboy,Pilsbury']

### groupByKey()
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results after removing the outermost container

Only works with pair RDDs.

In [None]:
data3 = data1.map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))

In [None]:
data3.take(10)

[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam')]

In [None]:
data3.groupByKey().take(12)

[('Apple', <pyspark.resultiterable.ResultIterable at 0x7f36a8df5950>),
 ('Butter', <pyspark.resultiterable.ResultIterable at 0x7f36a8dfb2d0>),
 ('Dinkel', <pyspark.resultiterable.ResultIterable at 0x7f36a8d6f790>),
 ('Doughboy', <pyspark.resultiterable.ResultIterable at 0x7f36a8df5b50>),
 ('Cheese', <pyspark.resultiterable.ResultIterable at 0x7f36a9684c90>),
 ('Egg', <pyspark.resultiterable.ResultIterable at 0x7f36a8d7b3d0>),
 ('Oxtail', <pyspark.resultiterable.ResultIterable at 0x7f36a8de5dd0>),
 ('Anchovie', <pyspark.resultiterable.ResultIterable at 0x7f36a8d7b690>),
 ('Avocado', <pyspark.resultiterable.ResultIterable at 0x7f36a8de5250>),
 ('McDonald', <pyspark.resultiterable.ResultIterable at 0x7f36a8de58d0>)]

In [None]:
for pair in data3.groupByKey().take(5):
  print("%s: %s" % (pair[0], ",".join([n for n in pair[1]])))

Apple: Amy,Alex,Adam
Butter: Bob
Dinkel: Dieter,Dieter
Doughboy: Pilsbury
Cheese: Chucky


### reduceByKey()
Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
- combine elements of an RDD by key and then
- apply a reduce operation to pairs of keys
- until only a single key remains.
- return the result in a new RDD

In [None]:
data = data1.map(lambda line: line.split(","))
data.take(5)

[['Apple', 'Amy'],
 ['Butter', 'Bob'],
 ['Cheese', 'Chucky'],
 ['Dinkel', 'Dieter'],
 ['Egg', 'Edward']]

In [None]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [None]:
data.take(20)

[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]

In [None]:
data.reduceByKey(lambda v1, v2: v1 + ":" + v2).take(6)

[('Apple', 'Amy:Alex:Adam'),
 ('Butter', 'Bob'),
 ('Dinkel', 'Dieter:Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('Cheese', 'Chucky'),
 ('Egg', 'Edward')]

### sortBy()
Sorts this RDD by the given keyfunc.
- sort an RDD according to a sorting function
- return the results in a new RDD

In [None]:
data3 = data1.map(lambda line: line.split(","))

In [None]:
data4 = data3.map(lambda pair: (pair[0], pair[1]))

In [None]:
data4.collect()

[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]

In [None]:
data4.sortBy(lambda pair: pair[1]).take(10)

[('Avocado', 'Adam'),
 ('Apple', 'Adam'),
 ('Anchovie', 'Alex'),
 ('Apple', 'Alex'),
 ('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward')]

In [None]:
data4.sortBy(lambda pair: pair[0]).take(10)

[('Anchovie', 'Alex'),
 ('Apple', 'Amy'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Avocado', 'Adam'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury')]

### sortByKey()
Sorts this RDD, which is assumed to consist of (key, value) pairs.
- sort an RDD according to the natural ordering of the keys
- return the results in a new RDD

In [None]:
data4.sortByKey().take(6)

[('Anchovie', 'Alex'),
 ('Apple', 'Amy'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Avocado', 'Adam'),
 ('Butter', 'Bob')]

### subtract()
Return each value in self that is not contained in other.
- return a new RDD that contains all the elements from the original RDD
- that do not appear in a target RDD

In [None]:
data1.subtract(data2).collect()

['Egg,Edward',
 'Doughboy,Pilsbury',
 'Oxtail,Oscar',
 'Apple,Alex',
 'Apple,Amy',
 'Butter,Bob',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Dinkel,Dieter',
 'Dinkel,Dieter',
 'Apple,Adam']

In [None]:
data1.subtract(data2).count()

11

### join()
Return an RDD containing all pairs of elements with matching keys in self and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.
- return a new RDD that contains all the elements from the original RDD
- joined (inner join) with elements from the target RDD

In [None]:
data3 = data1.map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))

In [None]:
data3.collect()

[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]

In [None]:
data3.count()

13

In [None]:
data4 = data2.map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))

In [None]:
data4.collect()

[('Wendy', ''),
 ('Doughboy', 'Pillsbury'),
 ('McDonald', 'Ronald'),
 ('Cheese', 'Chucky')]

In [None]:
data4.count()

4

In [None]:
data3.join(data4).collect()

[('Doughboy', ('Pilsbury', 'Pillsbury')),
 ('McDonald', ('Ronald', 'Ronald')),
 ('Cheese', ('Chucky', 'Chucky'))]

In [None]:
data3.join(data4).count()

3

In [None]:
data3.fullOuterJoin(data4).take(5)

[('Dinkel', ('Dieter', None)),
 ('Dinkel', ('Dieter', None)),
 ('Doughboy', ('Pilsbury', 'Pillsbury')),
 ('Egg', ('Edward', None)),
 ('McDonald', ('Ronald', 'Ronald'))]

In [None]:
data3.fullOuterJoin(data4).collect()

[('Dinkel', ('Dieter', None)),
 ('Dinkel', ('Dieter', None)),
 ('Doughboy', ('Pilsbury', 'Pillsbury')),
 ('Egg', ('Edward', None)),
 ('McDonald', ('Ronald', 'Ronald')),
 ('Wendy', (None, '')),
 ('Apple', ('Amy', None)),
 ('Apple', ('Alex', None)),
 ('Apple', ('Adam', None)),
 ('Butter', ('Bob', None)),
 ('Cheese', ('Chucky', 'Chucky')),
 ('Oxtail', ('Oscar', None)),
 ('Anchovie', ('Alex', None)),
 ('Avocado', ('Adam', None))]

## MapReduce demo

We will now count the occurences of each word. The typical "Hello, world!" app for Spark applications is known as word count. The map/reduce model is particularly well suited to applications like counting words in a document.

The `flatMap()` operation first converts each line into an array of words, and then makes
each of the words an element in the new RDD.

In [None]:
# split the lines into individual words
def get_words(line):
  w = line.split(',')
  return w

ws = data1.flatMap(lambda line: get_words(line))
ws.take(7)

['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']

In [None]:
# as above but with regex
words = data1.flatMap(lambda l: re.split(r'[^\w]+', l))
words.take(7)

['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']

The `map()` operation replaces each word with a tuple of that word and the number 1. The
pairs RDD is a pair RDD where the word is the key, and all of the values are the number 1.

In [None]:
# replace each word with a tuple of that word and the number 1
pairs = words.map(lambda w: (w, 1))
pairs.take(3)

[('Apple', 1), ('Amy', 1), ('Butter', 1)]

The `reduceByKey()` operation keeps adding elements' values together until there are no
more to add for each key (word).

In [None]:
# group the elements of the RDD by key (word) and add up their values
counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)
counts.take(3)

[('Apple', 3), ('Amy', 1), ('Butter', 1)]

In [None]:
# sort the elements by values in descending order
counts.sortBy(lambda pair: pair[1], ascending=False).take(10)

[('Apple', 3),
 ('Dinkel', 2),
 ('Alex', 2),
 ('Dieter', 2),
 ('Adam', 2),
 ('Amy', 1),
 ('Butter', 1),
 ('Chucky', 1),
 ('Edward', 1),
 ('Doughboy', 1)]

#### Simplify chained transformations

It is good to know that the code above can also be written in the following way:

In [None]:
sorted_counts = (data1.flatMap(lambda l: re.split(r'[^\w]+', l))       # words
                      .map(lambda w: (w, 1))                           # pairs
                      .reduceByKey(lambda n1, n2: n1 + n2)             # counts
                      .sortBy(lambda pair: pair[1], ascending=False))  # sorted counts

In [None]:
sorted_counts.take(10)

[('Apple', 3),
 ('Dinkel', 2),
 ('Alex', 2),
 ('Dieter', 2),
 ('Adam', 2),
 ('Amy', 1),
 ('Butter', 1),
 ('Chucky', 1),
 ('Edward', 1),
 ('Doughboy', 1)]

In [None]:
# stop Spark context
sc.stop()