<a href="https://colab.research.google.com/github/esraguzel/dsc-resilient-distributed-datasets-rdd-lab-onl01-dtsc-ft-012120/blob/master/index.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Resilient Distributed Datasets (RDDs) - Lab

Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. An RDD is essentially Spark's representation of a dataset spread across multiple machines, with APIs that let you act on it. An RDD can come from any data source, e.g. text files, a database, or a JSON file.


## Objectives

You will be able to:

- Apply the `map(func)` transformation to run a given function on all elements of an RDD across its partitions
- Apply a map transformation for all elements of an RDD
- Compare the difference between a transformation and an action within RDDs
- Use the `collect()`, `count()`, and `take()` actions to trigger Spark transformations
- Use filter to select data that meets certain specifications within an RDD 
- Set number of partitions for parallelizing RDDs 
- Create RDDs from Python collections 


### RDD Transformations vs Actions

In Spark, we first create a __base RDD__ and then apply one or more transformations to that base RDD, following our processing needs. RDDs are immutable, which means that **once an RDD is created, it cannot be changed**. As a result, **each transformation of an RDD creates a new RDD**. Finally, we can apply one or more **actions** to the RDDs. Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.


<img src="./images/rdd_diagram.png" width=500>

### Transformations

Transformations create a new dataset from an existing one, for example by passing each element through a function, and return a new RDD representing the results. In short, creating an RDD from an existing RDD is a 'transformation'.
All transformations in Spark are lazy: they do not compute their results right away. Instead, Spark just remembers the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
A transformation is an operation on an RDD that returns another RDD, such as `map`, `flatMap`, `filter`, `reduceByKey`, `join`, or `cogroup`.

### Actions
Actions return the final results of RDD computations. An action triggers execution: Spark uses the lineage graph to load the data into the original RDD, carries out all intermediate transformations, and then either returns the final result to the driver program (the user program) or writes it out to the file system.
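The split between lazy transformations and eager actions can be sketched in plain Python. The block below is only an analogy: `TinyRDD` is a hypothetical toy class invented for illustration, not part of Spark, and it ignores partitions and distribution entirely. It shows transformations merely recording steps, with the recorded steps running only when an action (`collect`) is called.

```python
# Plain-Python analogy for lazy evaluation. TinyRDD is a hypothetical toy
# class for illustration only; it is NOT a Spark API.
class TinyRDD:
    def __init__(self, source, steps=()):
        self._source = source   # the base data
        self._steps = steps     # recorded transformations (a simple "lineage")

    def map(self, func):
        # Transformation: return a NEW object that remembers one more step.
        return TinyRDD(self._source, self._steps + (("map", func),))

    def filter(self, func):
        # Transformation: also just recorded, never run here.
        return TinyRDD(self._source, self._steps + (("filter", func),))

    def collect(self):
        # Action: only now are the recorded steps actually executed.
        items = list(self._source)
        for kind, func in self._steps:
            if kind == "map":
                items = [func(x) for x in items]
            else:
                items = [x for x in items if func(x)]
        return items


calls = []

def double(x):
    calls.append(x)   # record every time the function really runs
    return x * 2

base = TinyRDD([1, 2, 3, 4])
doubled = base.map(double)               # nothing is computed yet
big = doubled.filter(lambda x: x > 4)    # still nothing computed
calls_before_action = len(calls)         # 0: double() has not run
result = big.collect()                   # the action triggers the work
print(calls_before_action, result)
```

Note that `base` is never modified: each transformation returns a new object, mirroring the immutability of RDDs.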

Here are some key transformations and actions that we will explore.


| Transformations   | Actions       |
|-------------------|---------------|
| map(func)         | reduce(func)  |
| filter(func)      | collect()     |
| groupByKey()      | count()       |
| reduceByKey(func) | first()       |
| mapValues(func)   | take()        |
| sample()          | countByKey()  |
| distinct()        | foreach(func) |
| sortByKey()       |               |


Let's see how transformations and actions work through a simple example. In this example, we will perform several actions and transformations on RDDs in order to obtain a better understanding of Spark processing. 

### Create a Python collection 

We need some data to start experimenting with RDDs. Let's create some sample data and see how RDDs handle it. To practice working with RDDs, we're going to use a simple Python list.

- Create a Python list `nums` of the integers from 1 to 1000 using the `range()` function.
- Sanity check: confirm the length of the list (it should be 1000)

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [4]:
!pip install pyspark


In [5]:
nums = list(range(1,1001))
len(nums)

1000

### Initialize an RDD

When using Spark to make computations, datasets are treated as lists of entries. Those lists are split into partitions that are spread across different cores or different computers. Each chunk of the data held in memory is a partition of the RDD. A major reason Spark can be much faster than disk-based big data frameworks is that it can keep data __in-memory__, which allows quick access to the data and, in turn, high-speed processing. Here is an example of how the alphabet might be split into different partitions and held across a distributed collection of nodes:

<img src ="./images/partitions_1.png" width ="500">  
To initialize an RDD, first import `pyspark` and then create a SparkContext assigned to the variable `sc`. Use `'local[*]'` as the master.

In [0]:

import pyspark
sc = pyspark.SparkContext('local[*]')

Once you've created the SparkContext, you can use the `.parallelize()` method to create an RDD that will distribute the list of numbers across multiple cores. Here, create one called `rdd` with 10 partitions using `nums` as the collection you are parallelizing.

In [7]:
rdd = sc.parallelize(nums, numSlices=10)
print(type(rdd))
# <class 'pyspark.rdd.RDD'>

<class 'pyspark.rdd.RDD'>


Determine how many partitions are being used with this RDD with the `.getNumPartitions()` method.

In [8]:
rdd.getNumPartitions()
# 10

10
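To get a feel for what 10 partitions of 1,000 numbers could look like, here is a plain-Python sketch that cuts a list into contiguous, near-equal slices. The helper `split_into_slices` is hypothetical and shows just one reasonable way to split a collection evenly. It is not taken from Spark's source, and Spark may assign elements to partitions differently.

```python
# Hypothetical helper: one simple way to split a list into near-equal chunks.
# This is an illustration, not Spark's actual partitioning code.
def split_into_slices(items, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

nums = list(range(1, 1001))
slices = split_into_slices(nums, 10)
sizes = [len(s) for s in slices]
print(len(slices), sizes)
```

With 1,000 elements and 10 slices, every slice holds 100 elements. With a length that does not divide evenly, the slice sizes differ by at most one.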

### Basic descriptive RDD actions

Let's perform some basic operations on our RDD. In the cell below, use the methods:
* `count`: returns the total count of items in the RDD 
* `first`: returns the first item in the RDD
* `take`: returns the first `n` items in the RDD
* `top`: returns the `n` largest items, in descending order
* `collect`: returns everything from your RDD


It's important to note that in a big data context, `collect` pulls every element of the RDD back to the driver, so it can take a very long time to execute (or exhaust the driver's memory) and should be used with care!

In [9]:
# count
rdd.count()

1000

In [10]:
# first
rdd.first()

1

In [11]:
# take
rdd.take(5)

[1, 2, 3, 4, 5]

In [12]:
# top
rdd.top(4)

[1000, 999, 998, 997]
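The difference between `take` and `top` is easy to miss: `take(n)` returns the first `n` elements in their existing order, while `top(n)` returns the `n` largest. As a rough plain-Python analogy on an ordinary list (no Spark involved), the actions used so far behave like this:

```python
import heapq

nums = list(range(1, 1001))

count = len(nums)                     # analogous to rdd.count()
first = nums[0]                       # analogous to rdd.first()
first_five = nums[:5]                 # analogous to rdd.take(5): first n, existing order
top_four = heapq.nlargest(4, nums)    # analogous to rdd.top(4): n largest, descending
print(count, first, first_five, top_four)
```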

In [13]:
# collect
rdd.collect()

[1,
 2,
 3,
 4,
 5,
 ...
 996,
 997,
 998,
 999,
 1000]

## Map functions

Now that you've been working a little bit with RDDs, let's make this a little more interesting. Imagine you're running a hot new e-commerce startup called BuyStuff, and you're trying to keep track of how much it charges customers for each item sold. In the next cell, we're going to create simulated data by multiplying the values 1-1000 by a random number between 0 and 1.

In [14]:
import numpy as np

# simulate a sale price for each item: scale 1..1000 by a random factor in [0, 1)
nums = np.array(range(1, 1001))
sales_figures = nums * np.random.rand(1000)
sales_figures

array([3.14440921e-01, 7.76822851e-01, 1.84427418e+00, 3.12244947e+00,
       4.82956389e+00, 1.34833699e+00, 4.18646922e+00, 3.68138504e+00,
       6.49616190e+00, 5.89542746e+00, 2.62590706e+00, 4.87051619e+00,
       7.00750896e+00, 2.82994398e+00, 2.68855025e+00, 1.47012212e+01,
       2.67853720e+00, 4.52369871e+00, 1.06898918e+01, 1.77217967e+01,
       5.83255321e+00, 3.35764714e+00, 8.24987118e+00, 1.43574782e+01,
       6.29765695e-01, 6.58837920e+00, 5.47305366e-01, 2.75276681e+01,
       5.44970285e+00, 1.65548831e+01, 1.65077601e+01, 5.54925727e+00,
       1.11511271e+01, 2.74123018e+01, 2.80182660e+00, 1.58552663e+01,
       1.97637091e-01, 3.55385501e+01, 1.51006280e+01, 2.31085455e+01,
       3.12983437e+01, 3.58953104e+01, 1.97019269e+01, 7.22237248e+00,
       2.89784193e+00, 1.44933094e+01, 7.05738771e+00, 2.24486269e+00,
       3.61934386e+01, 3.13454314e+01, 5.00856549e+01, 3.80904302e+00,
       2.17143858e+01, 1.16453056e+01, 4.99473128e+01, 2.66099075e+01,
      

We now have sales prices for 1000 items currently for sale at BuyStuff. Now create an RDD called `price_items` using the newly created data with 10 slices. After you create it, use one of the basic actions to see what's in the RDD.

In [15]:
price_items = sc.parallelize(sales_figures, numSlices=10)
price_items.take(5)

[0.3144409213184508,
 0.7768228510148258,
 1.844274182291589,
 3.122449471369494,
 4.829563893779002]

Now let's perform some operations on this simple dataset. To begin with, create a function called `sales_tax()` that returns the amount of money BuyStuff keeps from an item after sales tax has been taken out (assume a sales tax of 8%, so the company keeps 92% of the price). The function takes one parameter:

* `num`: (float) the price to be reduced by the sales tax.


Apply that function to the RDD by using the `.map()` method and assign the result to a variable called `revenue_minus_tax`.

In [0]:
def sales_tax(num):
    return num * 0.92

revenue_minus_tax = price_items.map(sales_tax)

Remember, Spark has __lazy evaluation__, which means that the `.map(sales_tax)` transformation is not executed until you call an action. Use one of the actions to execute the transformation that is now part of the RDD's lineage and observe the contents of the `revenue_minus_tax` RDD.

In [17]:
# perform action to retrieve rdd values
revenue_minus_tax.take(10)

[0.28928564761297476,
 0.7146770229336398,
 1.6967322477082618,
 2.8726535136599347,
 4.443198782276682,
 1.2404700271246216,
 3.8515516859310015,
 3.3868742322361176,
 5.9764689465355065,
 5.423793262062068]

### Lambda Functions

Note that you can also use lambda functions if you want to quickly perform simple operations on data without defining a named function. Let's assume that BuyStuff has also decided to offer a 10% discount on the pre-tax price of every item. Use a lambda function within a `.map()` method to apply this additional 10% reduction in revenue and assign the transformed RDD to a new variable called `discounted`.

In [0]:
discounted = revenue_minus_tax.map(lambda x: x*0.9)

In [19]:
discounted.take(10)

[0.2603570828516773,
 0.6432093206402758,
 1.5270590229374357,
 2.5853881622939414,
 3.9988789040490142,
 1.1164230244121596,
 3.4663965173379014,
 3.0481868090125057,
 5.378822051881956,
 4.8814139358558615]

## Chaining Methods

You are also able to chain methods together with Spark. In one line, remove the tax and discount from the revenue of BuyStuff and use an action to see the 15 costliest items.

In [20]:
price_items.map(sales_tax).map(lambda x : x*0.9).top(15)

[791.1289211688274,
 749.835502270556,
 738.7087622945373,
 735.1750587149869,
 733.8268760808336,
 731.2232547218744,
 720.7815821621178,
 719.4225715862655,
 716.6511494124562,
 708.9567557897885,
 704.9499286996534,
 702.8574139580586,
 702.2648212914446,
 701.9653645566381,
 689.8205352597582]
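Chaining two `.map()` calls applies the functions one after the other to each element, so the chain above is equivalent to a single map with the combined factor 0.92 × 0.9 = 0.828. A small plain-Python check of that arithmetic follows. The sample `prices` are hypothetical, and `math.isclose` is used because floating-point results can differ in the last digit.

```python
import math

def sales_tax(num):
    return num * 0.92

prices = [10.0, 250.0, 999.5]   # hypothetical sample prices

# two steps, like .map(sales_tax).map(lambda x: x * 0.9)
two_steps = [sales_tax(p) * 0.9 for p in prices]

# one step with the combined factor 0.92 * 0.9 = 0.828
one_step = [p * 0.828 for p in prices]

same = all(math.isclose(a, b) for a, b in zip(two_steps, one_step))
print(same)
```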

## RDD Lineage


We are able to see the full lineage of all the operations that have been performed on an RDD by using the `RDD.toDebugString()` method. As your transformations become more complex, you are encouraged to call this method to get a better understanding of the dependencies between RDDs. Try calling it on the `discounted` RDD to see what RDDs it is dependent on.

In [21]:
discounted.toDebugString()

b'(10) PythonRDD[10] at RDD at PythonRDD.scala:53 []\n |   ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:195 []'
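In the output above, `toDebugString()` returned a `bytes` object, so the newline appears as a literal `\n`. A minimal sketch for making it readable is to decode it and print it line by line. The value below is copied from the output above; if your Spark version already returns a `str`, the decode step is unnecessary.

```python
# The bytes value is copied from the toDebugString() output shown above.
debug = (b'(10) PythonRDD[10] at RDD at PythonRDD.scala:53 []\n'
         b' |   ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:195 []')

# decode to text and print one RDD of the lineage per line
lineage_lines = debug.decode("utf-8").splitlines()
for line in lineage_lines:
    print(line)
```

Read from top to bottom, the first line is the RDD you called the method on, and the indented line beneath it is the RDD it depends on.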

### Map vs. Flatmap

Depending on how you want your output to be structured, you might want to use `.flatMap()` rather than a simple `.map()`. Let's take a look at how it behaves compared with the standard map. Let's say we wanted to keep the original amount BuyStuff receives for each item as well as the new amount after the tax and discount are applied. Create a map function that will return a tuple with (original price, post-discount price).

In [22]:
mapped = price_items.map(lambda x: (x, x*0.92 *0.9))
print(mapped.count())
print(mapped.take(10))

1000
[(0.3144409213184508, 0.2603570828516773), (0.7768228510148258, 0.6432093206402758), (1.844274182291589, 1.5270590229374357), (3.122449471369494, 2.5853881622939414), (4.829563893779002, 3.9988789040490142), (1.3483369860050234, 1.1164230244121596), (4.186469223838045, 3.4663965173379014), (3.6813850350392583, 3.0481868090125057), (6.496161898408159, 5.378822051881956), (5.895427458763117, 4.8814139358558615)]


Note that we have 1000 tuples created to our specification. Let's take a look at how `.flatMap()` differs in its implementation. Use the `.flatMap()` method with the same function you created above.

In [23]:
flat_mapped = price_items.flatMap(lambda x: (x, x*0.92 *0.9))
print(flat_mapped.count())
print(flat_mapped.take(10))

2000
[0.3144409213184508, 0.2603570828516773, 0.7768228510148258, 0.6432093206402758, 1.844274182291589, 1.5270590229374357, 3.122449471369494, 2.5853881622939414, 4.829563893779002, 3.9988789040490142]


Rather than being grouped into tuples, all of the values are now on the same level: `.flatMap()` flattens each returned tuple into individual elements, which is why the count doubled to 2000. When we are trying to combine different items together, it is sometimes necessary to use `.flatMap()` rather than `.map()` in order to properly reduce to our specifications. This is not one of those instances, but in the upcoming lab, you just might have to use it.
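The same contrast can be reproduced on an ordinary Python list. In this sketch, a list comprehension stands in for `.map()` and `itertools.chain.from_iterable` stands in for the flattening that `.flatMap()` performs. The sample `prices` and the helper name `with_discount` are hypothetical.

```python
from itertools import chain

prices = [10.0, 20.0, 30.0]   # hypothetical sample prices

def with_discount(x):
    # returns (original price, price after tax and discount)
    return (x, x * 0.92 * 0.9)

# like .map(): one tuple per input element
mapped = [with_discount(x) for x in prices]

# like .flatMap(): each returned tuple is flattened into individual elements
flat_mapped = list(chain.from_iterable(with_discount(x) for x in prices))

print(len(mapped), len(flat_mapped))
```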

## Filter
After meeting with some external consultants, BuyStuff has determined that its business will be more profitable if it focuses on higher ticket items. Now, use the `.filter()` method to select items that bring in more than $300 after tax and discount have been removed. The `.filter()` transformation returns a new RDD containing only the elements for which the supplied function returns `True`. In the cell below:
* use a lambda function within a `.filter()` method to meet the consultants' specification, and assign the resulting RDD to `selected_items`
* calculate the total number of items remaining in BuyStuff's inventory

In [24]:
# use the filter function
selected_items = discounted.filter(lambda x: x > 300)

# calculate total remaining in inventory 
selected_items.count()

278
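As a plain-Python analogy, `.filter()` followed by `.count()` behaves like a list comprehension with a condition followed by `len()`. The sample values below are hypothetical. Note that the elements themselves are left unchanged, and that the strict `>` excludes a value of exactly 300.

```python
# hypothetical post-tax, post-discount values
discounted = [120.5, 305.0, 299.99, 450.25, 300.0]

# like discounted.filter(lambda x: x > 300): keep only elements passing the test
selected_items = [x for x in discounted if x > 300]

# like selected_items.count()
remaining = len(selected_items)
print(selected_items, remaining)
```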

## Reduce

Reduce functions are where you combine, in some way, all of the values that you have mapped out. Here is an example of how a reduce function works when the task is to sum all values:

<img src = "./images/reduce_function.png" width = "600">  


As you can see, the operation is performed within each partition first, after which, the results of the computations in each partition are combined to come up with one final answer.  
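The two-stage process described above can be sketched with `functools.reduce` on a hypothetical three-partition layout (no Spark involved). The sketch also shows why the function passed to `.reduce()` should be commutative and associative: addition gives the same answer however the data is partitioned, while subtraction does not.

```python
from functools import reduce
from operator import add, sub

# hypothetical layout: the numbers 1..9 spread over three partitions
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
flat = [x for part in partitions for x in part]

# step 1: reduce within each partition
partials = [reduce(add, part) for part in partitions]
# step 2: combine the per-partition results into one final answer
total = reduce(add, partials)

# subtraction is neither commutative nor associative, so the partitioned
# result no longer matches a single left-to-right pass over the data
sub_partitioned = reduce(sub, [reduce(sub, part) for part in partitions])
sub_flat = reduce(sub, flat)

print(partials, total, sub_partitioned, sub_flat)
```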

Now it's time to figure out how much money BuyStuff would make from selling one of each of its items after it has reduced its inventory. Use the `.reduce()` method with a lambda function to add up all of the values in the RDD. Your lambda function should take two parameters.

In [25]:
selected_items.reduce(lambda x,y: x + y)

126501.77540399716

The time has come for BuyStuff to open up shop and start selling its goods. It only has one of each item, but it's allowing 50 lucky users to buy as many items as they want while they remain in stock. Within seconds, BuyStuff is sold out. Below, you'll find the sales data in an RDD with tuples of (user, item bought).

In [26]:
import random
random.seed(42)
# generating simulated users that have bought each item
sales_data = selected_items.map(lambda x: (random.randint(1, 50), x))

sales_data.take(7)

[(14, 303.1650854222643),
 (10, 319.0303027039256),
 (14, 332.58541132595974),
 (10, 319.00445256792455),
 (33, 332.1195381908729),
 (14, 310.2743537803713),
 (27, 358.76149040961513)]

It's time to determine some basic statistics about BuyStuff users.

Let's start off by creating an RDD that determines how much each user spent in total.
To do this we can use a method called `.reduceByKey()` to perform reducing operations while grouping by keys. After you have calculated the total, use the `.sortBy()` method on the RDD to rank the users from the highest spending to the least spending. 

In [27]:
# calculate how much each user spent
total_spent = sales_data.reduceByKey(lambda x,y: x+y)
total_spent.take(5)

[(10, 2871.56857274321),
 (50, 3799.3406594613216),
 (40, 4995.921313783161),
 (30, 2628.870502317258),
 (41, 3827.491382409101)]
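Conceptually, `.reduceByKey()` merges all values that share a key using the supplied two-argument function. A plain-Python sketch of that idea with a dictionary is shown below. The helper `reduce_by_key` and the sample `sales` pairs are hypothetical, and the sketch ignores partitions and the shuffle that Spark performs.

```python
# Hypothetical helper illustrating the idea behind reduceByKey on a plain list.
def reduce_by_key(pairs, func):
    totals = {}
    for key, value in pairs:
        # first value for a key is stored as-is; later ones are merged with func
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

# hypothetical (user, amount) pairs
sales = [(14, 300.0), (10, 320.0), (14, 330.0), (33, 310.0), (10, 305.0)]

total_spent = reduce_by_key(sales, lambda x, y: x + y)

# rank users from highest to lowest total, like sortBy(..., ascending=False)
ranked = sorted(total_spent.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```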

In [28]:
# sort the users from highest to lowest spenders
total_spent.sortBy(lambda x: x[1],ascending = False).collect()

[(42, 6733.984886563208),
 (29, 5274.955823921339),
 (40, 4995.921313783161),
 (27, 4903.011894113405),
 (46, 4730.245013657799),
 (23, 4625.372861242738),
 (17, 4438.3226020728225),
 (8, 3879.904911951525),
 (41, 3827.491382409101),
 (50, 3799.3406594613216),
 (35, 3655.3509932699008),
 (4, 3652.4454445774113),
 (13, 3587.4110998768074),
 (14, 3584.295802942165),
 (5, 3536.5419966190248),
 (9, 3486.515223856313),
 (44, 3321.0389497153146),
 (49, 3209.540468589899),
 (38, 3190.6816875880013),
 (3, 2989.708772673705),
 (12, 2887.1390288424345),
 (10, 2871.56857274321),
 (1, 2713.8801553661165),
 (28, 2680.377718191896),
 (30, 2628.870502317258),
 (6, 2552.6273518872545),
 (11, 2421.8199221028617),
 (18, 2405.5019221181306),
 (45, 2372.6368907231617),
 (31, 2183.2762172591106),
 (34, 1873.4915442153613),
 (2, 1871.0429168550813),
 (15, 1823.419981256786),
 (36, 1538.1476141336393),
 (33, 1508.175481670356),
 (37, 1508.1347829727833),
 (43, 1479.6149017413604),
 (16, 1479.440251191072),
 

Next, let's determine how many items were bought per user. This can be solved in one line using an RDD method. Note that `.countByKey()` is an action that returns a Python dictionary to the driver rather than an RDD, so the result is sorted with Python's built-in `sorted()`. After you've counted the total number of items bought per person, sort the users from most items bought to fewest. Time to start a customer loyalty program!

In [29]:
total_items = sales_data.countByKey()
sorted(total_items.items(),key=lambda kv:kv[1],reverse=True)

[(42, 14),
 (29, 12),
 (41, 11),
 (17, 11),
 (14, 10),
 (27, 10),
 (46, 10),
 (50, 9),
 (4, 9),
 (13, 9),
 (9, 9),
 (10, 8),
 (38, 8),
 (40, 8),
 (3, 8),
 (35, 8),
 (23, 8),
 (8, 7),
 (31, 6),
 (49, 6),
 (5, 6),
 (44, 6),
 (15, 6),
 (18, 6),
 (25, 6),
 (2, 5),
 (28, 5),
 (12, 5),
 (6, 5),
 (34, 5),
 (11, 5),
 (30, 5),
 (33, 4),
 (1, 4),
 (37, 4),
 (43, 3),
 (45, 3),
 (19, 3),
 (36, 3),
 (16, 2),
 (21, 2),
 (24, 2),
 (7, 1),
 (32, 1)]
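For comparison, counting occurrences of each key in plain Python is what `collections.Counter` does. The sketch below uses hypothetical `(user, amount)` pairs and is only an analogy for `.countByKey()`, not Spark code.

```python
from collections import Counter

# hypothetical (user, amount) pairs
sales = [(14, 300.0), (10, 320.0), (14, 330.0), (33, 310.0), (14, 305.0)]

# count how many items each user bought, ignoring the amounts
items_per_user = Counter(user for user, _ in sales)

# users ordered from most items bought to fewest
ranked = items_per_user.most_common()
print(ranked)
```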

### Additional Reading

- [The original paper on RDDs](https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf)
- [RDDs in Apache Spark](https://data-flair.training/blogs/create-rdds-in-apache-spark/)
- [Programming with RDDs](https://runawayhorse001.github.io/LearningApacheSpark/rdd.html)
- [RDD Transformations and Actions Summary](https://www.analyticsvidhya.com/blog/2016/10/using-pyspark-to-perform-transformations-and-actions-on-rdd/)

## Summary

In this lab we went through a brief introduction to RDD creation from a Python collection, setting a number of logical partitions for an RDD and extracting lineage. We also used transformations and actions to perform calculations across RDDs on a distributed setup. In the next lab, you'll get the chance to apply these transformations on different books to calculate word counts and various statistics.
