
# Tutorial 2: Programming with RDDs
In this tutorial, you will learn how to create an RDD. RDD stands for **Resilient Distributed Dataset**. An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions (smaller subsets of the same data), which may be computed on different nodes of the cluster.

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=4851ed2d0855c7108ccca26e359d47490c5a74d278b4684b0a77c031809d5532
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


## Create SparkContext and SparkSession
Remember to create a session, which is the entry point to Spark.

In [3]:
# create entry points to spark
from pyspark.sql import SparkSession

ss  = SparkSession.builder \
                            .master("local[1]")\
                            .appName("SparkByExamples.com")\
                            .getOrCreate()
# note: `spark` holds the SparkContext (not the SparkSession); it is used to create RDDs below
spark = ss.sparkContext

## 2.1 Create RDD
The class `pyspark.SparkContext` creates a client which connects to a Spark cluster. This client can be used to create an RDD object. There are two methods from this class for directly creating RDD objects:
* `parallelize()`
* `textFile()`

## 2.1.1 `parallelize()`

`parallelize()` distributes a local **Python collection** to form an RDD. Common built-in Python collections include `dict`, `list`, `tuple` and `set`.

Examples:

**Important**: Use `collect()` to bring the elements of an RDD back to the driver as a Python list.

In [None]:
# from a list
rdd_data = spark.parallelize([1,2,3])
rdd_data

In [None]:
rdd_data.collect()

In [None]:
# from a tuple
rdd_data = spark.parallelize(('cat', 'dog', 'fish'))
rdd_data.collect()

In [None]:
# from a list of tuples
list_t = [('cat', 'dog', 'fish'), ('orange', 'apple')]
rdd_data = spark.parallelize(list_t)
rdd_data.collect()

In [None]:
# from a set
s = {'cat', 'dog', 'fish', 'cat', 'dog', 'dog'}
rdd_data = spark.parallelize(s)
rdd_data.collect()

## 2.1.2 `textFile()`

`textFile()` reads a text file into an RDD, with one element per line.

Examples:

In [5]:
# these lines mount Google Drive in the notebook
from google.colab import drive
drive.mount('/content/drive/')
data_path = "/content/drive/MyDrive/Colab Notebooks/"  # this is your drive


Mounted at /content/drive/


In [7]:
textFile = spark.textFile(data_path + "wordCount.txt") # use this if you are using Colab
# textFile = spark.textFile("wordCount.txt")
textFile.collect()

['I am Billy studying in Computer', 'Ada is studying in Computer']

## 2.2 RDD transformation (Single RDD)
An RDD transformation is an **operation** that is applied to a source RDD and constructs a new RDD.



## 2.2.1 `map()`

`map()` returns a new RDD formed by passing each element of the source RDD through a function.

Examples:

In [None]:
# define a function
def addOne(x):
    return x+1

In [None]:
rdd_data = spark.parallelize([1,2,3])
results = rdd_data.map(addOne)
results.collect()

### 2.2.1.1 : map() example 1

Use ``map()`` to perform a **map** transformation that adds one to each element in the RDD, and use ``collect()`` to get the results.

Instead of a named function, you can pass a **lambda** (anonymous function).

In [None]:
results1 = rdd_data.map(lambda x: x+1)
results1.collect()

### 2.2.1.2 : map() example 2

``map()`` can also be applied to **String** elements.

In [None]:
list_t1 = ['cat', 'dog', 'fish', 'orange', 'apple']
rdd_data1 = spark.parallelize(list_t1)

results2 = rdd_data1.map(lambda x: 'object: '+ x)
results2.collect()

### 2.2.1.3: map() example 3

``map()`` can also be used to create **(String, Int)** pairs.

In [None]:
list_t1 = ['cat', 'cat', 'dog', 'fish', 'orange', 'apple']
rdd_data1 = spark.parallelize(list_t1)

results2 = rdd_data1.map(lambda x: (x, 1))
results2.collect()

## 2.2.2 ``filter()``
``filter()`` returns a new RDD formed by selecting those elements of the source for which the function returns **true**.

### 2.2.2.1 : filter() example 1

In [None]:
rdd_data2 = spark.parallelize([3,1,2,5,5])
result3 = rdd_data2.filter(lambda x: x > 2)
result3.collect()

### 2.2.2.2 : filter() example 2

Let's try a filter with **multiple** conditions.

In [None]:
rdd_data2 = spark.parallelize([3,1,2,6,5])
result3 = rdd_data2.filter(lambda x: x > 2 and x % 2 == 0) # larger than 2 and divisible by 2
result3.collect()

## 2.2.3 ``distinct()``
``distinct()`` removes duplicate elements from an RDD.

### 2.2.3.1: distinct() example 1

In [None]:
rdd_data2 = spark.parallelize([3,1,2,5,5,5])
result4 = rdd_data2.distinct()
result4.collect()

### 2.2.3.2 : distinct() example 2
Similar to example 2.2.3.1, but this time we use **strings** instead of **integers**.

In [None]:
rdd_data2 = spark.parallelize(['cat', 'cat', 'dog', 'fish', 'orange', 'apple'])
result5 = rdd_data2.distinct()
result5.collect()

## 2.2.4 ``randomSplit()``
``randomSplit()`` divides an RDD into multiple segments by randomly splitting it according to
the provided weights.

### 2.2.4.1: randomSplit() example 1
Split the RDD into 60/40

In [None]:
rdd_data3 = spark.parallelize(range(10), 1) # create an RDD of 0 to 9 in a single partition
print('full data: %s'% rdd_data3.collect()) 

res = rdd_data3.randomSplit([0.6, 0.4])  # here, two segments are returned

## obtain the first segment
print('first half: %s'% res[0].collect())

## obtain the second segment
print('second half: %s'% res[1].collect())

### 2.2.4.2 (Important): randomSplit() example 2 (reproducible split with a fixed seed)
Often, you need to split the dataset in a way that lets you **reproduce** the result.
To do so, fix a seed so that the splitting pattern is identical each time.
You can re-run this example to check that the split is the same on every run.

In [None]:
rdd_data3 = spark.parallelize(range(10), 1)
res = rdd_data3.randomSplit([0.6, 0.4], seed=1234)  # a fixed seed ensures the split is the same every time

## obtain the first segment
print('first half: %s'% res[0].collect())

## obtain the second segment
print('second half: %s'% res[1].collect())
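
To see why a fixed seed makes the split repeatable, here is a plain-Python sketch that needs no Spark. It assumes each element is assigned to a segment by an independent random draw against the cumulative weights; `seeded_split` is a hypothetical helper, and Spark's own sampler differs in detail, so the segments it produces will not match those of `randomSplit()`.

```python
import random

def seeded_split(items, weights, seed):
    """Assign each item to one segment using a seeded random draw (illustration only)."""
    rng = random.Random(seed)            # same seed -> same sequence of draws
    total = float(sum(weights))
    # cumulative upper bounds, e.g. [0.6, 1.0] for weights [0.6, 0.4]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    segments = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            # the last segment catches any rounding error in the bounds
            if draw < upper or index == len(bounds) - 1:
                segments[index].append(item)
                break
    return segments

first_run = seeded_split(range(10), [0.6, 0.4], seed=1234)
second_run = seeded_split(range(10), [0.6, 0.4], seed=1234)
print(first_run == second_run)  # True: identical seed, identical split
```

Because each element is drawn independently, the segment sizes only approximate the 60/40 weights on a small dataset.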

## 2.2.5 ``groupBy()``
Return an RDD of items grouped by the key that a pre-defined **function** computes for each item.

### 2.2.5.1 : groupBy() example 1
Group numbers by parity (even or odd)

In [None]:
rdd_data4 = spark.parallelize([1, 1, 2, 3, 5, 8])
result = rdd_data4.groupBy(lambda x: x % 2)  # the key is the remainder modulo 2 (0 = even, 1 = odd)

# groupBy returns (key, iterable of values) pairs
res = result.collect()

# to view it, we need to convert each iterable of values to a list
print([(x, list(y)) for (x, y) in res])
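
The grouping logic can be sketched in plain Python without Spark. This is only an illustration on a local list; `local_group_by` is a hypothetical helper, not a PySpark function.

```python
from collections import defaultdict

def local_group_by(key_func, items):
    """Collect items into lists keyed by key_func(item)."""
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return dict(groups)

grouped = local_group_by(lambda x: x % 2, [1, 1, 2, 3, 5, 8])
print(grouped)  # {1: [1, 1, 3, 5], 0: [2, 8]}
```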

## 2.2.6 ``flatMap()``
Sometimes we want to produce multiple output elements for each input element. The operation to do this is called
``flatMap()``.

### 2.2.6.1: flatMap() example 1
For example, we can apply a ``split()`` function on each phrase in the RDD to split them by space

In [9]:
rdd_data7 = spark.parallelize(['hello world', 'hi', 'dog', 'fish', 'orange', 'apple'])
result7 = rdd_data7.flatMap(lambda x: x.split(' ')) # split each phrase by space
result7.collect()

['hello', 'world', 'hi', 'dog', 'fish', 'orange', 'apple']

### 2.2.6.2: ``flatMap()`` vs ``map()``
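``map()`` does not **flatten** the results: each input element produces exactly one output element (here, one list of words per phrase), so 6 inputs give 6 outputs.
In contrast, ``flatMap()`` flattens the lists into a single sequence of words, so the 6 inputs below give 7 outputs.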

In [10]:
rdd_data7 = spark.parallelize(['hello world', 'hi', 'dog', 'fish', 'orange', 'apple'])
result7 = rdd_data7.flatMap(lambda x: x.split(' ')) # split each phrase by space
print("Flatmap(): %s" % result7.collect())

result8 = rdd_data7.map(lambda x: x.split(' '))
print("map(): %s" % result8.collect())

Flatmap(): ['hello', 'world', 'hi', 'dog', 'fish', 'orange', 'apple']
map(): [['hello', 'world'], ['hi'], ['dog'], ['fish'], ['orange'], ['apple']]
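
The same difference can be reproduced on an ordinary Python list. A minimal sketch, assuming lists stand in for RDDs; `local_map` and `local_flat_map` are hypothetical helpers, not part of PySpark.

```python
from itertools import chain

def local_map(func, items):
    """One output element per input element, like map()."""
    return [func(item) for item in items]

def local_flat_map(func, items):
    """Concatenate the iterables returned by func, like flatMap()."""
    return list(chain.from_iterable(func(item) for item in items))

phrases = ['hello world', 'hi', 'dog']
mapped = local_map(lambda s: s.split(' '), phrases)
flattened = local_flat_map(lambda s: s.split(' '), phrases)
print(mapped)     # [['hello', 'world'], ['hi'], ['dog']]
print(flattened)  # ['hello', 'world', 'hi', 'dog']
```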


## 2.3 RDD transformation (Multiple RDD)
Here, we introduce transformation operators that apply to multiple RDDs (e.g., union, intersection).

## 2.3.1 ``union()``
Return the **combination** of all input RDDs. Duplicates are kept.

### 2.3.1.1 ``union()`` example 1

In [None]:
rddData1 = spark.parallelize([1, 1, 2, 3])
rddData2 = spark.parallelize([4, 5, 6, 7])
rddData3 = spark.parallelize([21, 22, 23, 25])
rddData1.union(rddData2).union(rddData3).collect()

## 2.3.2 ``intersection()``
Return the **overlap** of the input RDDs, without duplicates. For example, {1,2,3} intersection {3,4,5} = {3}

### 2.3.2.1 intersection example 1

In [None]:
rddData1 = spark.parallelize([1, 1, 2, 3])
rddData2 = spark.parallelize([3, 4, 5, 7])
rddData1.intersection(rddData2).collect()

## 2.3.3 ``subtract()``
Return each value in this RDD that is not contained in the other RDD. For example, {1,2,3} subtract {3,4,5} = {1,2}

### 2.3.3.1 subtract example 1

In [None]:
rddData1 = spark.parallelize([1, 1, 2, 3])
rddData2 = spark.parallelize([3, 4, 5, 7])
sorted(rddData1.subtract(rddData2).collect())

## 2.3.4 ``cartesian()``
Return the Cartesian product of this RDD and another one, i.e. every pair (a, b) with a from this RDD and b from the other.
For example, {1,2,3} cartesian {3,4,5} = {(1,3), (1,4), ..., (3,5)}

### 2.3.4.1 cartesian() example 1

In [None]:
rddData1 = spark.parallelize([1, 1, 2, 3])
rddData2 = spark.parallelize([3, 4, 5, 7])
rddData1.cartesian(rddData2).collect()

## 2.4 RDD Actions
Action operators return non-RDD values to the driver. For example, ``collect()`` is an action operator.

## 2.4.1 ``first()`` 
Return the first element in the RDD

### 2.4.1.1: first() example 1

In [None]:
rddData1 = spark.parallelize([21, 1, 2, 3])
rddData1.first()

## 2.4.2 ``take()``
Get the first N elements in the RDD 

### 2.4.2.1: take() example 1

In [None]:
rddData1 = spark.parallelize([21, 1, 2, 3])
rddData1.take(2)

## 2.4.3 ``takeOrdered()``
Get the first N elements of an RDD in ascending order, or in the order given by the optional key function.

### 2.4.3.1: takeOrdered() example 1
You can take the first N items in ascending order

In [None]:
spark.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)

### 2.4.3.2: takeOrdered() example 2
You can take the first N items in **descending** order by negating each element in the key function

In [None]:
spark.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6, lambda x: -1*x)
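
The key function behaves like the `key` argument of Python's `heapq.nsmallest()`: elements are ranked by the value the function returns, so negating each number reverses the order. A plain-Python sketch on a local list, with no Spark involved:

```python
import heapq

data = [10, 1, 2, 9, 3, 4, 5, 6, 7]

# rank by the element itself: the six smallest values
smallest_six = heapq.nsmallest(6, data)

# rank by the negated element: the six largest values, largest first
largest_six = heapq.nsmallest(6, data, key=lambda x: -x)

print(smallest_six)  # [1, 2, 3, 4, 5, 6]
print(largest_six)   # [10, 9, 7, 6, 5, 4]
```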

## 2.5 Basic Statistics in RDD
Here, we will learn a few RDD operators for computing statistics over an RDD.

## 2.5.1 ``min()``
Get the minimum value in the RDD

### 2.5.1.1: min() example 1

In [None]:
spark.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).min()

## 2.5.2 ``max()``
Get the maximum value in the RDD

### 2.5.2.1: max() example 1

In [None]:
spark.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).max()

## 2.5.3 ``stdev()``
Get the (population) standard deviation of the elements in the RDD

### 2.5.3.1: stdev() example 1

In [None]:
spark.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).stdev()

## 2.5.4 ``count()``
Find the number of elements in this RDD.

### 2.5.4.1: count() example 1

In [None]:
spark.parallelize([2, 3, 4]).count()

## 2.5.5 ``sum()``
Find the sum of elements in this RDD.

### 2.5.5.1: sum() example 1

In [None]:
spark.parallelize([1, 2, 3, 4]).sum()

## 2.5.6 ``mean()``
Find the average of elements in this RDD.

### 2.5.6.1: mean() example 1

In [None]:
spark.parallelize([1, 2, 3, 4]).mean()

## 2.6 Single *key-value* pair transformations
A key-value pair RDD is a specific type of RDD in which each element is stored as a *(key, value)* tuple.
The *key* identifies the value for operations such as grouping and joining; keys need not be unique.

### 2.6.1 Get elements

#### 2.6.1.1: get all key

In [16]:
kvRDD = spark.parallelize([(3,4), (3,6), (5,6), (1,2)])
kvRDD.keys().collect()

[3, 3, 5, 1]

#### 2.6.1.2: get all values

In [17]:
kvRDD = spark.parallelize([(3,4), (3,6), (5,6), (1,2)])
kvRDD.values().collect()

[4, 6, 6, 2]

### 2.6.2 Filter elements (``filter()``)

#### 2.6.2.1: filter by keys
Keep all key-value pairs whose key is less than 5

In [None]:
kvRDD = spark.parallelize([(3,4), (3,6), (5,6), (1,2)])
kvRDD.filter(lambda kv: kv[0] < 5).collect()  #kv[0] = key

#### 2.6.2.2: filter by values
Keep all key-value pairs whose value is less than 5

In [None]:
kvRDD = spark.parallelize([(3,4), (3,6), (5,6), (1,2)])
kvRDD.filter(lambda kv: kv[1] < 5).collect()  #kv[1] = value

### 2.6.3 Sort elements (``sortBy()``)

#### 2.6.3.1: Sort by keys

In [None]:
kvRDD = spark.parallelize([(3,4), (3,1), (5,6), (1,2)])
kvRDD.sortBy(lambda kv: kv[0], ascending=True).collect()

#### 2.6.3.2: Sort by values

In [None]:
kvRDD = spark.parallelize([(3,4), (3,1), (5,6), (1,2)])
kvRDD.sortBy(lambda kv: kv[1], ascending=True).collect()

### 2.6.4 Map elements (``map()``)

#### 2.6.4.1: apply map on each value
A key-value map works like the earlier example 2.2.1, except that the function passed to ``map()`` receives a *(key, value)* tuple.
Here, we replace each key with its value.

In [None]:
kvRDD = spark.parallelize([(3,4), (3,1), (5,6), (1,2)])
kvRDD.map(lambda kv: (kv[1], kv[1])).collect()

#### 2.6.4.2: map(key value swap)
This time, we swap the key and value

In [None]:
kvRDD = spark.parallelize([(3,4), (3,1), (5,6), (1,2)])
kvRDD.map(lambda kv: (kv[1], kv[0])).collect()

#### 2.6.4.3 ``mapValues()``
We can use ``mapValues()`` to apply a function to the values only, leaving the keys unchanged

In [None]:
kvRDD = spark.parallelize([(3,4), (3,1), (5,6), (1,2)])
kvRDD.mapValues(lambda v: v**2).collect()  # v = value; the key is passed through untouched

### 2.6.5 reduceByKey()
``reduceByKey()`` groups elements that share a key and merges their values with the given function.
For example, we can group all pairs with key=3 and add their values.

#### 2.6.5.1 group and add values

In [14]:
from operator import add
kvRDD = spark.parallelize([(3,4), (3,1), (5,6), (1,2)])
kvRDD.reduceByKey(add).collect()

[(3, 5), (5, 6), (1, 2)]

#### 2.6.5.2 group and add values (another way)

In [15]:
kvRDD = spark.parallelize([(3,4), (3,1), (5,6), (1,2)])
kvRDD.reduceByKey(lambda x, y: x+y).collect()  # here, x = value accumulated so far, y = next value for the same key

[(3, 5), (5, 6), (1, 2)]
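
To make the merging step explicit, here is a plain-Python sketch of the same idea on a local list. `local_reduce_by_key` is a hypothetical helper, not a PySpark function; it processes pairs in list order, whereas Spark merges values across partitions in no fixed order, which is why the function passed to `reduceByKey()` should be associative and commutative.

```python
def local_reduce_by_key(func, pairs):
    """Merge the values of pairs that share a key using func."""
    merged = {}
    for key, value in pairs:
        if key in merged:
            # combine the value accumulated so far with the next value
            merged[key] = func(merged[key], value)
        else:
            merged[key] = value
    return list(merged.items())

pairs = [(3, 4), (3, 1), (5, 6), (1, 2)]
summed = local_reduce_by_key(lambda x, y: x + y, pairs)
print(summed)  # [(3, 5), (5, 6), (1, 2)]
```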

## 2.7 Multiple *key-value* pair transformations
Here, we try operators for **joining** multiple key-value pair RDDs.

### 2.7.1 ``join()``
We can use ``join()`` to combine two key-value pair RDDs by key.

#### 2.7.1.1 example 
We will **inner join** two RDDs: two elements are joined only if they share the same key.
For example {(1,2), (3,4), (3,6)} join {(3,7)} = {(3,(4,7)), (3,(6,7))}

In [None]:
kvRDD1 = spark.parallelize([(1,2), (3,4), (3,6)])
kvRDD2 = spark.parallelize([(3,7)])
kvRDD1.join(kvRDD2).collect()

### 2.7.2 ``leftOuterJoin()``
``leftOuterJoin()`` keeps every key of the **left** RDD; where the right RDD has no matching key, the right value is ``None``.

In [None]:
kvRDD1 = spark.parallelize([(1,2), (3,4), (3,6)])
kvRDD2 = spark.parallelize([(3,7)])
result = kvRDD1.leftOuterJoin(kvRDD2) 
result.collect()
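
The difference between the inner join and the left outer join can be sketched in plain Python on local lists. Both helpers below are hypothetical illustrations, not PySpark functions, and Spark may return the pairs in a different order.

```python
def local_join(left, right):
    """Inner join: keep only keys present on both sides."""
    return [(lk, (lv, rv)) for lk, lv in left for rk, rv in right if lk == rk]

def local_left_outer_join(left, right):
    """Left outer join: keep every left pair, using None when there is no match."""
    result = []
    for lk, lv in left:
        matches = [rv for rk, rv in right if rk == lk]
        if matches:
            result.extend((lk, (lv, rv)) for rv in matches)
        else:
            result.append((lk, (lv, None)))
    return result

left = [(1, 2), (3, 4), (3, 6)]
right = [(3, 7)]
inner = local_join(left, right)
left_outer = local_left_outer_join(left, right)
print(inner)       # [(3, (4, 7)), (3, (6, 7))]
print(left_outer)  # [(1, (2, None)), (3, (4, 7)), (3, (6, 7))]
```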

### Exercise:
In example 2.7.2, the result contains *None* for keys that have no match. How can we replace *None* with "error"?

In [None]:
result.map(lambda x: (x[0], ('error' if x[1][0] is None else x[1][0], 'error' if x[1][1] is None else x[1][1]))).collect()

### 2.7.3 ``rightOuterJoin()``
``rightOuterJoin()`` keeps every key of the **right** RDD; where the left RDD has no matching key, the left value is ``None``.

In [None]:
kvRDD1 = spark.parallelize([(1,2), (3,4), (3,6)])
kvRDD2 = spark.parallelize([(3,7)])
result = kvRDD1.rightOuterJoin(kvRDD2) 
result.collect()

### 2.7.4 ``subtractByKey()``
``subtractByKey()`` returns the pairs of the left RDD whose key does not appear in the right RDD.

In [None]:
kvRDD1 = spark.parallelize([(1,2), (3,4), (3,6)])
kvRDD2 = spark.parallelize([(3,7)])
result = kvRDD1.subtractByKey(kvRDD2) 
result.collect()

## 2.8 Key Value RDD actions
Here, we introduce a few action operators that apply to key-value pair RDDs.

### 2.8.1 ``first()``
Similar to example 2.4.1, we use ``first()`` to get the first key-value pair, the first key, or the first value.

In [None]:
kvRDD1 = spark.parallelize([(1,2), (3,4), (3,6)])

# Get first element
print("First key-value: %s %s" % kvRDD1.first())

# Get first key
print("First key: %s" % kvRDD1.keys().first()) 

# Get first value
print("First value: %s" % kvRDD1.values().first()) 


### 2.8.2 ``take()``
Similar to example 2.4.2, we use ``take()`` to get the first N key-value pairs, keys, or values.

In [None]:
kvRDD1 = spark.parallelize([(1,2), (3,4), (3,6), (4,1), (5,6)])

# Get the first 2 key-value pairs
print("First 2 key-value:", kvRDD1.take(2))

# Get the first 2 keys
print("First 2 key:", kvRDD1.keys().take(2)) 

# Get the first 2 values
print("First 2 value:", kvRDD1.values().take(2)) 

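### 2.8.3 ``countByKey()`` and ``countByValue()``
``countByKey()`` counts how often each **key** occurs. ``countByValue()`` counts how often each distinct **element** of the RDD occurs; on a key-value RDD an element is a whole (key, value) pair, so call it on ``values()`` to count the values alone.
Both return the counts as a **defaultdict**.

In [None]:
kvRDD1 = spark.parallelize([(1,2), (3,4), (3,6), (4,1), (5,6)])

# Get frequency of each key
print("Key Frequency:", kvRDD1.countByKey())

# Get frequency of each value (countByValue() on kvRDD1 itself would count whole pairs)
print("Value Frequency:", kvRDD1.values().countByValue())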

## 2.9 RDD Transformation (laziness)
Transformations are **not executed** until we call an **action**.
For example, if we load a file that does not exist, **no error** is raised.


In [None]:
input_test = spark.textFile("../data/no_such_file.txt")

The error appears only when we call an action (e.g., ``collect()``)

In [None]:
input_test.collect()

### 2.9.1 Advantage of Laziness
Lazy evaluation means that if you tell Spark to operate on a set of data, it listens to what you ask it to do, writes down some **shorthand** for it so it doesn’t forget, and then does absolutely nothing. 
It will continue to do nothing, until you ask it for the final answer.
It waits until you’re done giving it operators, and only when you ask it to give you the final answer does it evaluate, and it always looks to limit how much work it has to do.

ref: https://stackoverflow.com/questions/38027877/spark-transformation-why-is-it-lazy-and-what-is-the-advantage
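
Python generators offer a rough analogy for this behavior. The sketch below is plain Python, not Spark: Spark records a lineage of transformations rather than a generator, but the effect of deferring work until a result is requested is similar. The names `log`, `add_one`, and `pipeline` are illustrative only.

```python
log = []

def add_one(x):
    log.append(x)        # record when the function actually runs
    return x + 1

numbers = [1, 2, 3]

# like a transformation: the work is described, but add_one is not called yet
pipeline = (add_one(x) for x in numbers)
calls_before = len(log)

# like an action: asking for the result forces the evaluation
result = list(pipeline)
calls_after = len(log)

print(calls_before, calls_after, result)  # 0 3 [2, 3, 4]
```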

## 2.10 Exercise: Word Count
Write a program that reads the "wordCount.txt" file, splits each line into words on spaces, counts the occurrences of each word, and sorts the result in descending order of count.

Hints: You may need to use the following functions:

* `textFile()`
* `flatMap()`
* `reduceByKey()`
* `sortBy()`

In [21]:
# Your Answer


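from operator import add

textFile = spark.textFile(data_path + "wordCount.txt")
data = textFile.flatMap(lambda x: x.split(' '))  # one element per word

# pair each word with 1, sum the 1s per word, then sort by count (descending)
counts = (data.map(lambda x: (x, 1))
              .reduceByKey(add)
              .sortBy(lambda x: x[1], ascending=False)
              .collect())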

for (word, count) in counts:
    print("{}: {}".format(word, count))





studying: 2
in: 2
Computer: 2
I: 1
am: 1
Billy: 1
Ada: 1
is: 1
