## Academic exercise for study

***Summary***

This notebook introduces you to the use of Resilient Distributed Datasets (RDDs) in Spark. RDDs are the foundation of Spark's "low-level" API.
Though we will mostly use the higher-level DataFrame API, RDDs are more convenient for introducing a number of core concepts in Spark and for expressing MapReduce-like computations.

__References__:
  - [RDD API documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html).

## Install pyspark and customize Colab configuration

We need to install and set up pyspark first because it is not installed by default in the Colab runtime. The pyspark installation will persist until the runtime is recycled.

In [1]:
# Python interface to Spark
!pip install pyspark --quiet
# Installation and update of the PyDrive library, for interacting with Google Drive using Python.
!pip install -U -q PyDrive --quiet
# Install OpenJDK 8
!apt install openjdk-8-jdk-headless &> /dev/null
# Download the ngrok zip file to access the local server over the internet
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
# Unzip the ngrok zip file
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
# Starts ngrok, allowing HTTP traffic on port 4050
get_ipython().system_raw('./ngrok http 4050 &')
# Import the Python os module
import os
# Sets the JAVA_HOME environment variable to the location of Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

     310.8/310.8 MB 2.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
  Building wheel for pyspark (setup.py) ... done


## Initialize Spark

This is the typical bootstrap to get Spark working in local mode (`local[2]` runs Spark on this machine with two worker threads). We obtain:

- `spark`: a `SparkSession` object - this notebook acts as the __driver program__ that communicates with Spark through the Spark session (we will talk about Spark's internal architecture in a future class).

- `sc`: a `SparkContext` object that exposes Spark's low-level API.

In [2]:
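from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Run locally with 2 worker threads and serve the Spark UI on port 4050 (exposed via ngrok)
conf = SparkConf().set('spark.ui.port', '4050').setAppName("shakespeare").setMaster("local[2]")
# The SparkSession is the driver program's entry point to Spark
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Its SparkContext exposes the low-level (RDD) API
sc = spark.sparkContext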

## Load data


In [3]:
# Download from http to local file
!wget --quiet --show-progress https://ocw.mit.edu/ans7870/6/6.006/s08/lecturenotes/files/t8.shakespeare.txt



## What is an RDD?

An RDD is an "immutable, partitioned collection of elements that can be operated on in parallel". This means:

- It is immutable, because you cannot change the contents of an RDD after creating it.

- Its data is split into several partitions.

- The partitions can be processed in parallel.

RDD processing involves [__transformations__](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) and [__actions__](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions):

- __Transformations__: create a new RDD from a data source or from a previously existing RDD. The directed acyclic graph (DAG) of transformations that defines an RDD (in many cases just a linear sequence) is called the RDD __lineage__.

- __Actions__: return a value to the driver program. Transformations are evaluated __lazily__, meaning that they execute only when triggered by an action.
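Lazy evaluation can be illustrated without Spark. The sketch below is only an analogy, assuming plain Python generators as a stand-in for a transformation and `list()` as a stand-in for an action; it is not how Spark schedules work. The names `tracked_upper` and `calls` are hypothetical.

```python
# Spark-free analogy: Python generators are also evaluated lazily.
calls = []

def tracked_upper(word):
    # Record every call so we can see when the work actually happens
    calls.append(word)
    return word.upper()

words = ["to", "be", "or", "not"]

# Like a transformation: defining the pipeline processes no element yet
upper_words = (tracked_upper(w) for w in words)
calls_before_action = len(calls)

# Like an action: consuming the pipeline triggers the evaluation
result = list(upper_words)

print(calls_before_action)  # 0
print(result)               # ['TO', 'BE', 'OR', 'NOT']
print(len(calls))           # 4
```

One difference: a generator can be consumed only once, whereas an RDD can be evaluated by several actions, with Spark recomputing its lineage each time unless the RDD is cached.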

## The word count program

The `wordCount` function definition and its usage below are an example of RDD processing:

- The RDD is specified in `wordCount` by a chain of transformations:  `textFile` (defining the data source), `map`, `flatMap`, `reduceByKey`, and `sortBy`.
- After the call to `wordCount`, `collect` is an action that yields back the RDD contents as a list (of key-value pairs).
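Because the lambdas in `wordCount` are ordinary Python functions, the same pipeline can be mirrored locally on a couple of lines to check the normalization and counting logic without starting Spark. This is a minimal sketch: `local_word_count` and the sample lines are hypothetical, and `collections.Counter` stands in for `map` followed by `reduceByKey`.

```python
import string
from collections import Counter

# Same translation table as in wordCount: punctuation and digits become spaces
punctFilter = str.maketrans(string.punctuation + '0123456789',
                            ' ' * (len(string.punctuation) + 10))

def local_word_count(lines):
    """Plain-Python mirror of the RDD pipeline (hypothetical helper)."""
    # flatMap: each line becomes several normalized words
    words = (w for line in lines
             for w in line.translate(punctFilter).upper().split())
    # map + reduceByKey: count occurrences of each word
    counts = Counter(words)
    # sortBy(count, ascending=False)
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

sample = ["To be, or not to be: that is the question.",
          "Whether 'tis nobler in the mind to suffer"]
top3 = local_word_count(sample)[:3]
print(top3)  # [('TO', 3), ('BE', 2), ('THE', 2)]
```

Python's `sorted` is stable, so ties keep their first-seen order here; the order of words with equal counts in Spark's `sortBy` output should not be relied on.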

In [4]:
from pyspark.sql import SparkSession
import string

def wordCount(file):
    # getOrCreate() returns the session created earlier
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext

    # Translation table that replaces every punctuation character and digit with a space.
    # Example:
    #   text = "Hello! This is a sentence with some numbers 123 and punctuation!!!"
    #   text.translate(punctFilter).split()
    #   -> ['Hello', 'This', 'is', 'a', 'sentence', 'with', 'some', 'numbers', 'and', 'punctuation']
    punctFilter = str.maketrans(string.punctuation + '0123456789', ' ' * (len(string.punctuation) + 10))

    # Load the text file into an RDD of lines
    rdd = sc.textFile(file)

    # Word count pipeline:
    # - flatMap: replace punctuation and digits with spaces, strip surrounding whitespace,
    #   convert to uppercase and split each line into words -> RDD of individual words
    # - map: pair each word with an initial count of 1 -> (word, 1)
    # - reduceByKey: add up the counts of each word -> (word, total count)
    # - sortBy: order by count (second tuple element), highest first
    word_counts = rdd.flatMap(lambda line: line.translate(punctFilter).strip().upper().split()) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda x, y: x + y) \
                    .sortBy(lambda kv: kv[1], ascending=False)
    return word_counts

# Path to the Shakespeare text file
data_file = '/content/t8.shakespeare.txt'
# Define the word count RDD
words_rdd = wordCount(data_file)
# collect() is an action: it returns the RDD contents to the driver as a list
data = words_rdd.collect()
# Display the first 20 elements in the list
print('First 20 elements in the list with %d elements:' % len(data))
print(data[:20])

First 20 elements in the list with 23683 elements:
[('THE', 27660), ('AND', 26784), ('I', 22538), ('TO', 19819), ('OF', 18191), ('A', 14746), ('YOU', 13860), ('MY', 12489), ('THAT', 11549), ('IN', 11123), ('IS', 9784), ('D', 8960), ('NOT', 8740), ('FOR', 8341), ('WITH', 8016), ('ME', 7777), ('IT', 7737), ('S', 7723), ('BE', 7130), ('YOUR', 6885)]
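The list contains `'D'` and `'S'` as separate words. A likely explanation is that the apostrophe belongs to `string.punctuation`, so `punctFilter` splits possessives such as "king's" and elided forms common in Shakespeare such as "learn'd" into two tokens. The sketch below checks this on two made-up phrases (not taken from the corpus); `normalize` is a hypothetical helper reproducing the `flatMap` step for one line.

```python
import string

# Same translation table as in wordCount
punctFilter = str.maketrans(string.punctuation + '0123456789',
                            ' ' * (len(string.punctuation) + 10))

def normalize(line):
    # Hypothetical helper: the flatMap step applied to a single line
    return line.translate(punctFilter).upper().split()

possessive = normalize("The king's crown")
elision = normalize("I have learn'd")
print(possessive)  # ['THE', 'KING', 'S', 'CROWN']
print(elision)     # ['I', 'HAVE', 'LEARN', 'D']
```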


In [6]:
# View the first 20 elements using slicing
data[:20]

[('THE', 27660),
 ('AND', 26784),
 ('I', 22538),
 ('TO', 19819),
 ('OF', 18191),
 ('A', 14746),
 ('YOU', 13860),
 ('MY', 12489),
 ('THAT', 11549),
 ('IN', 11123),
 ('IS', 9784),
 ('D', 8960),
 ('NOT', 8740),
 ('FOR', 8341),
 ('WITH', 8016),
 ('ME', 7777),
 ('IT', 7737),
 ('S', 7723),
 ('BE', 7130),
 ('YOUR', 6885)]

## Derive relative word frequencies

We now use `words_rdd` and `total_words` (the total number of words, computed in the next cell) to define an RDD holding relative rather than absolute word frequencies. That is, for each `(word,n)` in `words_rdd` we obtain `(word, n / total_words)` in the new RDD.

In [10]:
# reduce() is another action (not a transformation like reduceByKey!)
# - words_rdd.values() returns a new RDD with only the word counts, without the words
# - reduce(lambda x, y: x + y) repeatedly combines pairs of values with the given
#   function (here, a simple sum) until a single value remains
# At the end, total_words holds the sum of all word counts in words_rdd
total_words = words_rdd.values().reduce(lambda x, y: x + y)
print('Total words [1]: %d' % total_words)
# In this case we can simply use the sum() utility action
total_words = words_rdd.values().sum()
print('Total words [2]: %d' % total_words)

Total words [1]: 928012
Total words [2]: 928012
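Spark's `reduce()` combines the elements inside each partition first and then merges the partial results, which is why the PySpark documentation asks for a commutative and associative operator. The plain-Python sketch below imitates that two-level evaluation. It assumes the first five counts from the word count output, split into two hypothetical partitions (the real partitioning of `words_rdd` is not shown); `partitioned_reduce` and `sequential_reduce` are illustrative helpers, not Spark API.

```python
from functools import reduce

# First five word counts from the output above, in two hypothetical partitions
partitions = [[27660, 26784, 22538], [19819, 18191]]

def add(x, y):
    return x + y

def subtract(x, y):
    return x - y

def partitioned_reduce(f, parts):
    # Reduce each partition on its own, then combine the partial results
    return reduce(f, [reduce(f, part) for part in parts])

def sequential_reduce(f, parts):
    # Reduce all elements one after another, ignoring partitions
    return reduce(f, [x for part in parts for x in part])

sum_partitioned = partitioned_reduce(add, partitions)
sum_sequential = sequential_reduce(add, partitions)
sub_partitioned = partitioned_reduce(subtract, partitions)
sub_sequential = sequential_reduce(subtract, partitions)

print(sum_partitioned, sum_sequential)  # 114992 114992
print(sub_partitioned, sub_sequential)  # -23290 -59672
```

Addition gives the same total either way, while subtraction (not associative) depends on how the data happens to be partitioned.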


## Compare relative frequencies with Zipf's law

Now derive an RDD with tuples of the form `(word,(freq,zipfEstimate))`.

You can start by mapping `(word,freq)` to `((word,freq), index)` using the `zipWithIndex()` transformation, as shown below.

Then use the index to map `((word,freq),index)` onto `(word,(freq,zipf_law(index)))`.
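Written out, Zipf's law states that the relative frequency $f(r)$ of the word of rank $r$ is roughly inversely proportional to $r$. The `zipf_law` function used in this notebook takes the simplest form, with exponent $s = 1$ and constant $C = 0.1$ (a simplifying choice, not a universal constant). Since `zipWithIndex()` produces zero-based indices, rank $r$ corresponds to index $i = r - 1$:

```latex
f(r) \approx \frac{C}{r^{s}}, \qquad s \approx 1
\qquad\Longrightarrow\qquad
\hat{f}(i) = \frac{0.1}{i + 1}, \qquad i = r - 1 \in \{0, 1, 2, \dots\}
```

For example, $\hat{f}(0) = 0.1$ for the most frequent word and $\hat{f}(1) = 0.05$ for the second.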


In [11]:
freq_rdd = words_rdd.map(lambda kv: (kv[0], kv[1] / total_words))
freq_rdd.take(10)

[('THE', 0.029805649064882783),
 ('AND', 0.028861695753934217),
 ('I', 0.024286323883742883),
 ('TO', 0.021356404874074905),
 ('OF', 0.019602117214001544),
 ('A', 0.015889880734300848),
 ('YOU', 0.01493515170062456),
 ('MY', 0.01345780011465369),
 ('THAT', 0.012444882178247695),
 ('IN', 0.01198583639004668)]

In [12]:
# Simpler alternative
freq_rdd = words_rdd.mapValues(lambda count: count / total_words)
freq_rdd.take(10)

[('THE', 0.029805649064882783),
 ('AND', 0.028861695753934217),
 ('I', 0.024286323883742883),
 ('TO', 0.021356404874074905),
 ('OF', 0.019602117214001544),
 ('A', 0.015889880734300848),
 ('YOU', 0.01493515170062456),
 ('MY', 0.01345780011465369),
 ('THAT', 0.012444882178247695),
 ('IN', 0.01198583639004668)]
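The two cells above produce the same pairs. The plain-Python sketch below mirrors both on three `(word, count)` pairs and the total copied from the outputs above; `map_values` is a hypothetical helper written only for illustration.

```python
# A few (word, count) pairs and the corpus total, copied from the outputs above
counts = [('THE', 27660), ('AND', 26784), ('I', 22538)]
total_words = 928012

# Like words_rdd.map(lambda kv: (kv[0], kv[1] / total_words)):
# the function receives the whole (key, value) tuple and must rebuild it
via_map = [(kv[0], kv[1] / total_words) for kv in counts]

def map_values(pairs, f):
    # Like RDD.mapValues: f only sees the value; the key is kept unchanged
    return [(k, f(v)) for k, v in pairs]

via_map_values = map_values(counts, lambda count: count / total_words)
print(via_map_values[0])  # ('THE', 0.029805649064882783)
```

Besides being shorter, `mapValues` tells Spark that keys are unchanged, so the RDD keeps its parent's partitioner; with `map`, Spark cannot assume that.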

In [14]:
# (word,freq) -> ((word,freq),index)
# zipWithIndex() pairs each element of freq_rdd with its position in the RDD.
# freq_rdd is sorted by decreasing frequency, so the index is the word's rank minus 1
# (0 for the most frequent word).
rdd_with_index = freq_rdd.zipWithIndex()
rdd_with_index.take(10)

[(('THE', 0.029805649064882783), 0),
 (('AND', 0.028861695753934217), 1),
 (('I', 0.024286323883742883), 2),
 (('TO', 0.021356404874074905), 3),
 (('OF', 0.019602117214001544), 4),
 (('A', 0.015889880734300848), 5),
 (('YOU', 0.01493515170062456), 6),
 (('MY', 0.01345780011465369), 7),
 (('THAT', 0.012444882178247695), 8),
 (('IN', 0.01198583639004668), 9)]
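Assigning consecutive indices to partitioned data requires knowing how many elements precede each partition, which is why the PySpark documentation notes that `zipWithIndex()` triggers a Spark job when the RDD has more than one partition. The plain-Python sketch below roughly mirrors that approach, assuming three hypothetical in-memory partitions; `zip_with_index` is an illustrative helper, not the Spark implementation.

```python
from itertools import accumulate

# Hypothetical partitions of an already sorted RDD
partitions = [["THE", "AND"], ["I", "TO", "OF"], ["A"]]

def zip_with_index(parts):
    """Sketch: assign global indices to partitioned data."""
    # Step 1: count the elements of each partition (in Spark, a separate job)
    sizes = [len(p) for p in parts]
    # Step 2: first index of each partition = number of elements before it
    offsets = [0] + list(accumulate(sizes))[:-1]
    # Step 3: number the elements inside each partition, shifted by its offset
    return [[(x, start + i) for i, x in enumerate(p)]
            for p, start in zip(parts, offsets)]

indexed = zip_with_index(partitions)
for part in indexed:
    print(part)
# [('THE', 0), ('AND', 1)]
# [('I', 2), ('TO', 3), ('OF', 4)]
# [('A', 5)]
```

Indices follow partition order and then element order within each partition. Because `sortBy` range-partitions the data in sorted order, the index of each word in `freq_rdd` matches its rank minus one.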

In [15]:
# Zipf's law - simpler approximation: expected relative frequency
# of the word at zero-based index i (i.e., rank i + 1)
def zipf_law(i):
    return 0.1 / (i + 1)

# ((word,freq),index) -> (word, (freq,zipf_law(index)))
# x[0][0] is the word, x[0][1] its relative frequency and x[1] its index.
# The result pairs each word's actual relative frequency with Zipf's estimate,
# so the two can be compared.
rdd_cmp_zipf = rdd_with_index \
    .map(lambda x: (x[0][0], (x[0][1], zipf_law(x[1]))))

rdd_cmp_zipf.take(10)

[('THE', (0.029805649064882783, 0.1)),
 ('AND', (0.028861695753934217, 0.05)),
 ('I', (0.024286323883742883, 0.03333333333333333)),
 ('TO', (0.021356404874074905, 0.025)),
 ('OF', (0.019602117214001544, 0.02)),
 ('A', (0.015889880734300848, 0.016666666666666666)),
 ('YOU', (0.01493515170062456, 0.014285714285714287)),
 ('MY', (0.01345780011465369, 0.0125)),
 ('THAT', (0.012444882178247695, 0.011111111111111112)),
 ('IN', (0.01198583639004668, 0.01))]

In [16]:
# Suppose we want the maximum absolute error between actual frequencies and the Zipf's estimate
# (it's easy to guess that it will be the difference for the word "THE")
max_error = rdd_cmp_zipf\
          .values()\
          .map(lambda x: abs(x[0]-x[1]))\
          .max()
max_error

0.07019435093511722
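The guess in the comment above can be checked with the ten rows displayed for `rdd_cmp_zipf`. The sketch below copies them into a plain Python list and, besides the maximum error, keeps the word attached to show where it occurs; `abs_error` is a hypothetical helper. On the RDD itself, the same should be possible with `rdd_cmp_zipf.max(key=abs_error)`, since `RDD.max()` accepts an optional `key` function.

```python
# The ten (word, (freq, zipf_estimate)) rows displayed by rdd_cmp_zipf.take(10)
rows = [
    ('THE', (0.029805649064882783, 0.1)),
    ('AND', (0.028861695753934217, 0.05)),
    ('I', (0.024286323883742883, 0.03333333333333333)),
    ('TO', (0.021356404874074905, 0.025)),
    ('OF', (0.019602117214001544, 0.02)),
    ('A', (0.015889880734300848, 0.016666666666666666)),
    ('YOU', (0.01493515170062456, 0.014285714285714287)),
    ('MY', (0.01345780011465369, 0.0125)),
    ('THAT', (0.012444882178247695, 0.011111111111111112)),
    ('IN', (0.01198583639004668, 0.01)),
]

def abs_error(row):
    # row = (word, (freq, zipf_estimate))
    freq, estimate = row[1]
    return abs(freq - estimate)

# Same computation as .values().map(...).max(), but keeping the word
worst = max(rows, key=abs_error)
worst_word = worst[0]
max_error = abs_error(worst)
print(worst_word, max_error)
```

Within these rows the largest error belongs to `'THE'` (0.1 - 0.0298...), which matches the maximum computed over the whole RDD above.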