In [1]:
import os
import shutil

# PySpark API and Data Structures

Source: https://realpython.com/pyspark-intro/#next-steps-for-real-big-data-processing

The entry-point of any PySpark program is a `pyspark.SparkContext` (or simply `SparkContext`) module. It allows us to connect to a Spark cluster and create a specialized data structures called **Resilient Distributed Datasets (RDDs)**.

In [2]:
import pyspark
sc = pyspark.SparkContext(
    master='local[*]',
    appName='muict-ds-pyspark')

ModuleNotFoundError: No module named 'pyspark'

where
* `master`: The URL of the cluster it connects to.
* `appName`: Name of your job.

In this example, we use `local[*]` for `master` which is a special string denoting that we want to use a *local* cluster or a *single-machine mode*. The `*` tells Spark to create as many worker threads as logical cores on your machine. Also we can, for instance, use `local` or `local[2]` to only use single or two cores on the machine, respectively.

**Note**: Setting up a Spark cluster is outside the scope of this course. Typically, we need a data engineering team to handle such setup and installation, or we pay for the cluster service from cloud computing providers (e.g., Amazon AWS, Google Cloud, Microsoft Azure, etc.). You can read more information about the cluster setup from this link (https://phoenixnap.com/kb/install-spark-on-ubuntu , https://stackoverflow.com/questions/35835649/reading-a-file-in-hdfs-from-pyspark).

Let's have a look at the `SparkContext`.

In [None]:
sc

Note: the `SparkContext` should be created ONLY ONCE per session.

# Create a RDDs

There are two ways that we can create a RDD.

## Parallelized Collections
The **first approach** is to take an existing collection in our program and pass it to `SparkContext`'s `parallelize` method. All elements in the collection will then be copied to form a *distributed dataset* that can be operated on *in parallel*. This approach is good when you would like to experiment with some commands.

In [None]:
numbers = list(range(1,6))
numbers_rdd = sc.parallelize(numbers)
print(numbers_rdd)

However it is **NOT** typically used in practice because this approach assumes that the size of the entire dataset can be loaded into a single computer memory, which is NOT the case when we are dealing with big data.

## External Datasets

The **second approach**, which is a common way, is to create RDDs from any external storage supported by Hadoop, including local file system, HDFS, Cassandra, HBase, Amazon S3, etc.

Text file RDDs can be created using `textFile` method on `SparkContext`. The `textFile` method reads the file into a RDD with each line in the file being an element in the RDD collection. The exeternal storage can be the local file system.

Let's see how to import a text file, named `big-data-wikipedia.txt` from our local disk into the RDDs.

In [None]:
txt = sc.textFile('file:////home/jovyan/pyspark-data/big-data-wikipedia.txt')
print(txt)

More realistically, the external storage is usually a distributed file system such as *Amazon S3* (i.e., s3a://) or *HDFS* (i.e., hdfs://). There are other data sources which can be integrated with Spark and used to create RDDs including JDBC, Cassandra, Elasticsearch, etc.

In this course, we will only focus on the *local disk* for the external storage.

# What can we do with RDDs?

**Resilient Distributed Datasets (RDDs)** hide all the complexity of transforming and distributing your data, and parallelizing the operations you perform on them *automatically across multiple nodes* by a scheduler if you’re running on a cluster.

In Spark, all works are expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

RDDs support two types of operations:
1. **Transformations**: create a new dataset from an existing one, and
2. **Actions**: return a value to the driver program after running a computation on the dataset

The following sections are examples of the transformations and actions. The full list of available operations can be found here (https://spark.apache.org/docs/latest/api/python/reference/pyspark.html).

Source:
* https://spark.apache.org/docs/latest/rdd-programming-guide.html
* https://www.youtube.com/playlist?list=PLot-YkcC7wZ_2sxmRTZr2c121rjcaleqv

# Transformation

Transformations are operations on RDDs which will return a new RDD.

## filter() transformation

The `filter` function returns a new RDD with a subset of the data in the original RDD. We define the selection criteria using a *lambda* function where it only selects the items that the lambda function return `True`.

In [None]:
# Count the number of lines that contains a `big` word.
big_lines = txt.filter(lambda line: 'big' in line.lower())
print(big_lines)
print(big_lines.count())

## map() transformation

The `map` function applies a function to each item in the input RDD and return the new value of each element. The *return type* of the map function is not necessary the same as its *input type*.

In [None]:
# Count the number of characters in each line
lens = txt.map(lambda line: len(line))
print(lens)

In [None]:
# Select the first and the last words, split by ' ', from each line
def first_last(line):
    words = line.split(' ')
    return f'{words[0]} {words[-1]}'

first_last_words = txt.map(first_last)
print(first_last_words)

Next, we save the output RDD as text files to a directory via the `saveAsTextFile` function. Note that we will discuss more about the `saveAsTextFile` later in this notebook.

In [None]:
# Note: if the directory alreay exists, it will show an error
if os.path.isdir('out.lengths'):
    shutil.rmtree('out.lengths')
lens.saveAsTextFile('out.lengths')
if os.path.isdir('out.first_last'):
    shutil.rmtree('out.first_last')
first_last_words.saveAsTextFile('out.first_last')

When you open the `out.lengths` or the `out.first_last` directory, the output(s) are stored in the `part-00001`, `part-00002`, etc. depending on the number of cores we specified in SparkContext.

## flatMap() transformation

The `flatMap` function is similar to the `map` function, except that the results are *flatten* before being returned.

![flatMap vs map](flatMap_vs_map.png)

Source: https://www.youtube.com/watch?v=xhBnHBvOHwc&list=PLot-YkcC7wZ_2sxmRTZr2c121rjcaleqv&index=9

Here is an example when we apply the `split` to each line using `map` and `flatMap`.

In [None]:
map_out = txt.map(lambda line: line.split(' '))
flatmap_out = txt.flatMap(lambda line: line.split(' '))

We can use the `first` function to get the first element of the RDD.

In [None]:
print(txt.first())

In [None]:
print(map_out.first())

In [None]:
print(flatmap_out.first())

It can be seen that the `map` function returns a list of words of the first line, while the `flatMap` function returns the first word of the first line. This is because after the list of the lists of words are flatten it becomes a single list containing a sequence of all words.

## sample() transformation

The `sample` operation creates a random sample from an RDD. This is useful for testing purpose.

In [None]:
sample_txt = txt.sample(withReplacement=False, fraction=0.2)

## distinct() operation

The distinct operation return a new RDD containing the distinct elements of this RDD.

Note: This transformation is **expensive** because it requires shuffling all the data across partitions to ensure that we receive only one copy of each element

Try to avoid this oepration if it is not necessary.

In [None]:
print(sc.parallelize([1,1,2,2,2,2,3,4,1]).distinct().collect())

The `collect` function is used to retrieve all the elements of the dataset (from all nodes) to the driver node. 

## Set operation

There are several *Set* operations which are performed on two RDDs and produce one resulting RDD.
* union
* intersection
* subtract
* catesian product

Here is an example of the `intersection` operation. This operation removes all duplicates including the duplicates from single RDD before returning the results. This is also an expensive operation since it requires shuffling all the data across partitions to identify common elements.

In [None]:
rdd1 = sc.parallelize([1,1,2,2,2,2,3,4,1])
rdd2 = sc.parallelize([2,2,3,4,5,6,6,7])
intersect_rdd = rdd1.intersection(rdd2)
print(intersect_rdd.collect())

# Action

Actions are the operations which will return a result to the driver node or write the result to an external storage system, and kick off a computation.

## collect() action

The `collect` function is used to retrieve the entire RDD (from all nodes) and returns it to the driver node in the form of a regular collection or value.

**Important**: We should use the `collect()` on smaller dataset usually after `filter()`, `group()`, `count()` etc. Retrieving larger dataset results in out of memory.

It is widely used in unit tests, to compare the value of our RDD with our expected result, as long as the entire contents of the resulting RDD can fit in memory of the driver node.

In [None]:
for l in sample_txt.collect():
    print(l)

## count() action

The `count` function returns the number of elements (or rows) in an RDD.

In [None]:
print(txt.count())

## countByValue() action

The `countByValue` function determines unique item values and returns a map of each unique value to its count. This function is useful when the RDD contains duplicate rows, and you want to count how many of each unique row values you have.

Here is an example of how to count the number of wards in the RDDs.

In [None]:
# Split each line by a space ' ' and flatten the resulting RDD
words = txt.flatMap(lambda line: line.split(' '))

# Count the number of each word
# The function `countByValue` returns a defaultdict
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    print(f'{word}: {count}')

## take() action

The `take` function takes `n` elements from an RDD. This is useful if you would like to take a peek at the RDD for unit tests and quick debugging.

**Caution**: The `take` function will try to reduce the number of partitions it accesses. Thus, it is possible that we will get a biased collection, and it does not necessary return the elements in the order we might expect.

In [None]:
words.take(6)

In [None]:
for w in words.take(6):
    print(w)

## saveAsTextFile() action

The `saveAsTextFile` function can be used to write data out to a distributed storage system such as HDFS or Amazom S3, or even local file system.

The following is the code similar to the previous example above that saves two RDD to the local disk.

In [None]:
# Note: if the directory alreay exists, it will show an error
if os.path.isdir('out.lengths'):
    shutil.rmtree('out.lengths')
lens.saveAsTextFile('out.lengths')
if os.path.isdir('out.first_last'):
    shutil.rmtree('out.first_last')
first_last_words.saveAsTextFile('out.first_last')

## reduce() action

The `reduce` function takes a function that operates on *two* elements of the type in the input RDD and returns *a new element* of the *same type*. This function produces the same results when repetitively applied on the same set of RDD data, and reduces to a single value.

With `reduce` operation, we can perform different types of aggregations.

In [None]:
numbers = sc.parallelize([2,5,6,7,1])
product = numbers.reduce(lambda x, y: x*y)
# 2,    5,    6,    7,    1
# 2*5=10 ,    6*7=42  ,    1
# 10     ,    42*1=42
# 10*42=420
print(f'prod={product}')

summation = numbers.reduce(lambda x, y: x+y)
print(f'sum={summation}')

## reduceByKey() action

The `reduceByKey` function is similar to the `reduce` action, but it first merges the values for each key before using an associative and commutative reduce function.

In [None]:
words = txt.flatMap(lambda line: line.split(' '))

# Convert to (key, value) pairs
kv_words = words.map(lambda word: (word, 1))

# Merge the values for each key using a lambda function.
wordCounts = kv_words.reduceByKey(lambda x, y: x+y)
for word, count in sorted(wordCounts.collect()):
    print(f'{word}: {count}')

# Transformations vs Actions

All transformations in Spark are *lazy*, in that they DO NOT compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. *This design enables Spark to run more efficiently*. 

For example, we can realize that a dataset created through `map` will be used in a `reduce` and return only the result of the `reduce` to the driver, rather than the larger mapped dataset.

<!-- Transformations on RDDs are **lazily evaluated**, meaning that Spark will not begin to execute until it sees an action.

Rather than thinking of an RDD as containing a specific data, think of each RDD as consisting of **sequence of instructions** on how to compute the data that we build up through transformations.

Spark uses laze evaluation to **reduce the number of passes** it has to take over our data by grouping operations together. -->

# Caching and persistence

Sometimes we would like to call actions on the same RDD multiple times.

If we do this naively, RDDs and all of its dependencies are **recomputed** each time an action is called on the RDD. This can be very expensive, especially for some iterative algorithms, which would call actions on the same dataset many times.

If we want to reuse an RDD in multiple actions, we can ask Spark to persist by calling the `persist()` function on the RDD. When we persist an RDD, the first time it is computed in an action, it will be kept in memory across the nodes.

This allows the future action to be much faster. Caching is a critical mechanism for an iterative algorithm or a fast interactive use.

In [None]:
numbers = sc.parallelize([2,5,6,7,1])
numbers.persist(pyspark.StorageLevel.MEMORY_ONLY)  # =numbers.cache()

# At this point, the parallelized transformation will executed
# to distribute the RDD from the driver program to all worker threads
# and call `reduce` on this partition.
# Since this RDD is persisted, so it will be kept in memory across
# the worker threads.
product = numbers.reduce(lambda x, y: x*y)
print(f'prod={product}')

# At this point, Spark will NOT parallelize the transformations again.
# Instead, it can immediately count the number of elements.
count = numbers.count()
print(f'count={count}')

Note that `RDD.cache() = RDD.persist(MEMORY_ONLY)`.

Spark's sotrage levels are meant to provide different trade-offs between memory usage and CPU efficiency.
* If RDD can fit in memory, use `MEMORY_ONLY`. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible
* If not, try to use `MEMORY_ONLY_SER` for more space-efficient, but more CPU intensive to read
* DO NOT save to disk (e.g., `MEMORY_AND_DISK`, `MEMORY_AND_DISK_SER`, `DISK_ONLY`) unless the functions that computed your datasets are expensive, or they filter a significant amount of the data. Otherwise, recomputing the partitions may be as fast as we read it from the disk.

Spark will *evict old partitions automatically* using a *Least Recently Used* cache policy.

**Caution**: caching unnecessary data can cause Spark to evict useful data and lead to longer re-computation time.