
# Spark Tutorial. Part 1: Learning Apache Spark

## Preface

Notebooks in this course consist of theoretical information, practical examples, and exercises. Exercises are cells with Python code in which some pieces of code are missing. Your task is to fill in these cells and then submit the completed notebook to the verification system. Each exercise is followed by a test cell that can help you check the correctness of your code.

If you have any complaints or suggestions, contact us: [info@datascience-school.com](mailto:info@datascience-school.com)

Good luck!

## An introduction to using Apache Spark with the PySpark API running in the browser

[Apache Spark](http://spark.apache.org/) is a cluster computing platform designed to be fast and general purpose.

Every Spark application contains a driver program that launches parallel operations on a cluster. Driver programs access Spark through a [`SparkContext`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext) object, which represents a connection to a computing cluster.

To use Spark and its API, we create an instance of the `SparkContext` class, which is usually named `sc`. The interactive `pyspark` shell creates `sc` for you automatically; in this notebook we create it ourselves a couple of cells below.

In [1]:
import os

# Tell PySpark which Python interpreter its worker processes should use.
# Adjust the path for your system; it must be set before the SparkContext is created.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6"

# Optional settings, depending on how Spark is installed and launched:
# os.environ["PYSPARK_DRIVER_PYTHON"] = "ipython3"
# os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
# os.environ["SPARK_HOME"] = "/usr/local/spark/spark-2.4.0-bin-hadoop2.7"

In [2]:
from pyspark import SparkContext

# "local" runs Spark in a single local thread; "First App" is the application name.
sc = SparkContext("local", "First App")

In [3]:
# Display the type of the SparkContext object sc.
print(type(sc))

<class 'pyspark.context.SparkContext'>


You can use Python's [dir()](https://docs.python.org/3/library/functions.html#dir) function to get a list of all the attributes and methods accessible through the `sc` object.

In [4]:
# Print a slice (items 50 to 59) of the list of sc's attributes and methods.
print(dir(sc)[50:60])

['_repr_html_', '_serialize_to_jvm', '_temp_dir', '_unbatched_serializer', 'accumulator', 'addFile', 'addPyFile', 'appName', 'applicationId', 'binaryFiles']


### Resilient Distributed Datasets (RDDs)

An RDD in Spark is an immutable, distributed collection of objects.
- *Immutability* means that once an RDD is created, it cannot be changed.
- *Distribution* means that different parts of an RDD are stored in different partitions.

Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

### Transformations and Actions

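Because RDDs are immutable, an existing RDD is never modified; instead, a new one is created. There are two ways to create an RDD:

- By loading an external dataset, or by distributing a collection of objects from the driver program (for example, with `sc.parallelize()`).
- By applying a transformation to an existing RDD.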

**Transformations** are lazy operations on an RDD that return a new RDD. Spark only records them and computes nothing until an action is called.

**Actions** are RDD operations that produce non-RDD values. They compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g. HDFS). 
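Plain Python 3 offers a rough analogy: the built-in `map()` also returns a lazy iterator that computes nothing until something consumes it. The sketch below is only an analogy, not how Spark works internally. The `calls` counter is a hypothetical helper that records when the work actually happens.

```python
calls = 0

def square(x):
    """Square a number and record that the function was called."""
    global calls
    calls += 1
    return x * x

data = [1, 2, 3, 4]

# Like an RDD transformation, map() only describes the computation.
lazy_squares = map(square, data)
print(calls)          # 0: nothing has been computed yet

# Like an action, list() forces the computation and returns concrete values.
result = list(lazy_squares)
print(result, calls)  # [1, 4, 9, 16] 4
```

One difference: a Python iterator can be consumed only once. An RDD, by contrast, can be evaluated again by every later action, which recomputes its transformations unless the RDD is cached.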


### Creating your first RDD   
First, we generate dummy data: a `range` of odd numbers from 1 up to (but not including) 4561, with a step of 2.

In [5]:
dummy_data = range(1, 4561, 2)

Let's check the size of the range and print its first and last elements.

In [6]:
print("The size is {}".format(len(dummy_data)))
print("The elements are [{}...{}]".format(dummy_data[0], dummy_data[-1]))

The size is 2280
The elements are [1...4559]


To create an RDD, we use the [`sc.parallelize()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.SparkContext.parallelize) method. It distributes the local Python collection passed in as its argument across the cluster as a new RDD. An optional second argument sets the number of partitions.

There are many different types of RDDs. The base class is [`pyspark.RDD`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD). Other RDD classes, such as the `PipelinedRDD` that `map()` returns, inherit from it, so they share the same API.

*Note: Creating an RDD does not compute anything. Spark records the operations and runs them only when an action requests a result. This is called "lazy evaluation", and it lets Spark avoid unnecessary computation. Cached RDDs can be inspected in the "Storage" tab of the Spark web UI (by default at http://localhost:4040), but they appear there only after an action has actually computed them.*

In [7]:
# Distribute the data across 4 partitions.
# Creating an RDD is lazy as well, so no Spark job runs here.
dummy_rdd = sc.parallelize(dummy_data, 4)

# We can name each newly created RDD using the setName() method.
dummy_rdd.setName('Learning about RDDs')

# Let's view the lineage (the sequence of transformations) of the RDD using toDebugString().
print(dummy_rdd.toDebugString())

b'(4) Learning about RDDs PythonRDD[1] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []'


### Transformations
One of the most common transformations is [`map()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.map). It applies a function to each element of the dataset and returns a new RDD with the same number of elements.

Let's use `map()` to increment each value in the previously created **`dummy_rdd`** dataset.

Remember that most transformations take a function as an argument. In the following cell we pass a lambda function to `map()`. Lambda functions are usually preferred over named functions (defined with `def`) when their body is small and simple.

In [8]:
# A lambda function can be assigned to a variable to make it reusable:
increment_lambda = lambda x: x + 1

# The equivalent named function:
def increment(x):
    return x + 1

# These two transformations are equivalent
dummy_incremented = dummy_rdd.map(increment_lambda)
dummy_incremented = dummy_rdd.map(increment)

# We can't see the content of an RDD directly, because
# no transformations are performed on it until an action is called.
# take() is such an action: it returns the first 10 elements to the driver.
print(dummy_incremented.take(10))

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]


###  Actions  
To see the resulting list we can use the [`collect()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.collect) function.

Like `take()`, which we used above, `collect()` is an action. It returns all the elements of the dataset to the driver program as a Python list. Use it only when the result is small enough to fit in the driver's memory.

All actions in Spark cause the transformations applied to an RDD to be computed. Therefore, in our example, workers are launched to perform the `parallelize`, `map`, and `collect` operations.

In [10]:
collected_data = dummy_incremented.collect()

# We incremented the initial dataset earlier, so its first element is 2
print(collected_data[0:5])

[2, 4, 6, 8, 10]


The [`takeSample()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.takeSample) action retrieves a random sample of elements from the dataset. The first argument specifies whether sampling is done with replacement, that is, whether the same element can be picked more than once. The second argument specifies the number of elements to return. An optional third argument, `seed`, makes the sample reproducible.
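The two sampling modes correspond to Python's standard `random.choices()` (with replacement) and `random.sample()` (without replacement). This local sketch only illustrates the difference on an ordinary list; it is not how Spark draws its samples. The fixed seed `42` is an arbitrary choice that makes the output reproducible.

```python
import random

data = list(range(2, 4562, 2))   # the same values as dummy_incremented
rng = random.Random(42)          # a fixed seed makes the samples reproducible

# With replacement: the same element may be drawn more than once.
with_replacement = rng.choices(data, k=10)

# Without replacement: every element is drawn at most once.
without_replacement = rng.sample(data, 10)

print(with_replacement)
print(without_replacement)
```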

In [11]:
# Get 10 random elements from the RDD. Elements can be repeated.
# Note: the result is NOT an RDD but a plain Python list, because takeSample() is an action.
print(dummy_incremented.takeSample(True, 10))

[342, 2408, 1046, 174, 2592, 2166, 232, 866, 58, 3892]


Another useful action is [`count()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.count). It returns the number of elements in an RDD.


In [12]:
print(dummy_incremented.count())

2280


## Exercise 1

Create a **`dummy_plus_5`** dataset by adding 5 to each element of the **`dummy_rdd`**. You should use the `map()` transformation to achieve the result.

In [13]:
# Exercise 1

# Add 5 to each element of the dummy_rdd using map() and lambda function.
dummy_plus_5 = dummy_rdd.map(lambda x: x + 5)

print(dummy_plus_5.take(10))

[6, 8, 10, 12, 14, 16, 18, 20, 22, 24]


In [None]:
# TEST
import sys, os
sys.path.append("/usr/local/lib/python2.7/site-packages")
dir_path = os.path.dirname(os.path.realpath('Lab_1/Lab_1_SPARK.ipynb'))
sys.path.append('/home/vagrant/.local/lib/python2.7/site-packages')

from test_helper_spark import *

In [None]:
# Test

lab1_test_ex_1(dummy_plus_5)

### Filtering

Let's create a new RDD containing values from the **`dummy_incremented`** dataset that are less than 100.

Here we use the [`filter()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.filter) transformation. It creates a new RDD by applying a predicate function to each element of the input RDD and keeping only the elements for which the predicate returns `True`. All other elements are dropped.

In [14]:
# A predicate function that checks if value is less than 100.
# It will be applied to each element of the dataset.
def is_less_than_100(value):
    return value < 100
    
dummy_lt_100 = dummy_incremented.filter(is_less_than_100)

# Since filter() is a transformation, we use the collect() action
# to perform the computation and get the resulting data.
print(dummy_lt_100.collect())

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]


## Exercise 2

1. Implement an **`is_divisible_by_3()`** function that checks if the number is divisible by 3.
2. Create a **`dummy_divisible_by_3`** dataset containing values from the **`dummy_plus_5`** dataset that are divisible by 3.

In [15]:
# Exercise 2

# Implement a function that checks if value is divisible by 3
def is_divisible_by_3(value):
    return value % 3 == 0

# RDD with elements divisible by 3
dummy_divisible_by_3 = dummy_plus_5.filter(is_divisible_by_3)

print(dummy_divisible_by_3.take(10))

[6, 12, 18, 24, 30, 36, 42, 48, 54, 60]


In [None]:
# Test
lab1_test_ex_2(dummy_divisible_by_3)

### More actions

There are a few other frequently used actions: [first()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.first), [take()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.take), [top()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.top), [takeOrdered()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.takeOrdered), and [reduce()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.reduce).

To get a rough understanding of the data through visual inspection, use the `first()`, `take()`, `top()`, and `takeOrdered()` actions. Note that for `first()` and `take()`, the elements that are returned depend on how the RDD is *partitioned*.

* The `take(n)` action returns the first n elements of the RDD.
* The `first()` action returns the first element of an RDD; it behaves like `take(1)[0]`.
* The `takeOrdered(n)` action returns the first n elements of an RDD in ascending order, or in the order given by an optional `key` function.
* The `top(n)` action is similar to `takeOrdered(n)`, except that it returns the elements in *descending order*.
* The `reduce()` action reduces the elements of an RDD to a single value. It does so by applying a function that takes two parameters and returns a single value. The function should be commutative and associative, because `reduce()` is applied first within each partition and then again to combine the results from the partitions. If these properties don't hold, the result of `reduce()` may depend on how the data is partitioned. Reducing locally within partitions is what makes `reduce()` efficient.
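Why must the function be associative and commutative? The plain-Python model below needs no Spark. It mimics reducing each partition locally and then reducing the partial results. The contiguous chunking in the hypothetical `split_into_partitions()` helper is a simplification, not Spark's exact partitioning scheme. Addition gives the same answer for any number of partitions, while subtraction does not.

```python
from functools import reduce
from operator import add, sub

def split_into_partitions(data, num_partitions):
    """Split a list into contiguous chunks (a simplified model of partitioning)."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

def partitioned_reduce(func, data, num_partitions):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partitions = [p for p in split_into_partitions(data, num_partitions) if p]
    partial_results = [reduce(func, p) for p in partitions]
    return reduce(func, partial_results)

data = list(range(1, 11))
sums = [partitioned_reduce(add, data, n) for n in (1, 2, 4)]
differences = [partitioned_reduce(sub, data, n) for n in (1, 2, 4)]

print(sums)         # [55, 55, 55]
print(differences)  # [-53, 15, 17]
```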

In [16]:
# Get the first element
print(dummy_rdd.first())

1


In [17]:
# Get the first 10 elements
print(dummy_rdd.take(10))

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]


In [18]:
# Retrieve the three smallest elements
print(dummy_rdd.takeOrdered(3))

[1, 3, 5]


In [19]:
# Retrieve the five largest elements
print(dummy_rdd.top(5))

[4559, 4557, 4555, 4553, 4551]


In [20]:
# Pass a key function to takeOrdered() to reverse the order
print(dummy_rdd.takeOrdered(4, key=lambda s: -s))

[4559, 4557, 4555, 4553]


In [21]:
# Import the add() function from Python's standard operator module
from operator import add

# Sum the RDD using the reduce() function
print(dummy_rdd.reduce(add))

# Sum the RDD using reduce() with a lambda function
print(dummy_rdd.reduce(lambda a, b: a + b))

5198400
5198400


## Exercise 3
1. Create a **`dummy_less_than_50`** dataset with the numbers from **`dummy_divisible_by_3`** that are less than 50.
2. Create a **`dummy_top_5`** list with the 5 largest numbers from **`dummy_less_than_50`**.
3. Compute **`dummy_product`**, the product of all numbers in **`dummy_less_than_50`**.

In [22]:
# Exercise 3

# RDD with numbers that are less than 50
dummy_less_than_50 = dummy_divisible_by_3.filter(lambda x: x < 50)

# Get top 5 largest numbers from dummy_less_than_50
dummy_top_5 = dummy_less_than_50.top(5)

# Multiply all numbers from dummy_less_than_50
dummy_product = dummy_less_than_50.reduce(lambda x, y: x * y)

print(dummy_less_than_50.collect())
print(dummy_top_5)
print(dummy_product)

[6, 12, 18, 24, 30, 36, 42, 48]
[48, 42, 36, 30, 24]
67722117120


In [None]:
# Test
lab1_test_ex_3(dummy_less_than_50, dummy_top_5, dummy_product)

### Count by value

The [`countByValue()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.countByValue) action returns the count of each unique value in the RDD as a dictionary that maps values to counts.
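On a small local list, Python's `collections.Counter` builds the same value-to-count mapping. `countByValue()` computes the mapping for data spread across the cluster. However, the resulting dictionary is returned to the driver, so it suits data with a modest number of distinct values.

```python
from collections import Counter

symbols = ["d", "s", "d", "d", "r", "ee", "rr", "r", "r"]

# Counter maps each distinct value to the number of times it occurs.
counts = Counter(symbols)
print(counts["d"], counts["r"], counts["ee"])  # 3 3 1
```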

In [23]:
symbols = sc.parallelize(["d", "s", "d", "d", "r", "ee", "rr", "r", "r"])
print(symbols.countByValue())

defaultdict(<class 'int'>, {'d': 3, 's': 1, 'r': 3, 'ee': 1, 'rr': 1})


## Exercise 4

1. Create a **`letters`** dataset containing all letters and symbols from `text` except spaces, converted to lower case.
2. Count the occurrences of each symbol in the **`letters`** RDD (the result is a dictionary) and assign it to a variable **`occurrence`**.

*Note: You can use Python's built-in `map()` function and the `str.replace()` method to complete the first task. The better approach, however, is to use Spark's `map()` and `filter()` transformations. Unlike Python's built-ins, they are designed to work with huge datasets that do not fit in the memory of a single machine.*

In [27]:
# Exercise 4

text = "I have a cat. The cat is very nice. I love my cat very much."

to_lower = lambda x: x.lower()

# The first approach, using Python's functions
python_letters = list(map(to_lower, text.replace(' ', '')))

# Create an RDD containing all letters and symbols from the text, except whitespaces.
letters = sc.parallelize(text) \
            .filter(lambda x: x != ' ') \
            .map(to_lower)

# Both approaches must produce the same list
assert python_letters == letters.collect()

# Count the occurrence of each symbol
occurrence = letters.countByValue()

print(letters.collect(), '\n')
print(occurrence)

['i', 'h', 'a', 'v', 'e', 'a', 'c', 'a', 't', '.', 't', 'h', 'e', 'c', 'a', 't', 'i', 's', 'v', 'e', 'r', 'y', 'n', 'i', 'c', 'e', '.', 'i', 'l', 'o', 'v', 'e', 'm', 'y', 'c', 'a', 't', 'v', 'e', 'r', 'y', 'm', 'u', 'c', 'h', '.'] 

defaultdict(<class 'int'>, {'i': 4, 'h': 3, 'a': 5, 'v': 4, 'e': 6, 'c': 5, 't': 4, '.': 3, 's': 1, 'r': 2, 'y': 3, 'n': 1, 'l': 1, 'o': 1, 'm': 2, 'u': 1})


In [None]:
# Test
lab1_test_ex_4(letters, occurrence)

### Flat map

The [flatMap()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.flatMap) transformation is similar to `map()`, except that with `flatMap()` each input item can be mapped to zero or more output elements.
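In plain Python, the difference between `map()` and `flatMap()` is the difference between a nested list and a list flattened with `itertools.chain.from_iterable()`. The helper `flat_map()` below is a hypothetical name for this local model. It also shows that the function may return an empty list, in which case the element disappears from the output.

```python
from itertools import chain

def flat_map(func, items):
    """Apply func to each item and concatenate the returned iterables."""
    return list(chain.from_iterable(func(x) for x in items))

numbers = [1, 2, 3, 4]
duplicate = lambda x: [x, x * 10]
keep_even = lambda x: [x] if x % 2 == 0 else []

print(list(map(duplicate, numbers)))   # [[1, 10], [2, 20], [3, 30], [4, 40]]
print(flat_map(duplicate, numbers))    # [1, 10, 2, 20, 3, 30, 4, 40]
print(flat_map(keep_even, numbers))    # [2, 4]
```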

In [29]:
# flatMap() flattens any iterable returned by the function
to_list = lambda x: [x * 10, x * 10]
to_tuple = lambda x: (x * 10, x * 10)
to_set = lambda x: {x * 10, x * 10}  # a set drops the duplicate value

numbers = sc.parallelize([1, 2, 3])

print("From lists", numbers.flatMap(to_list).collect())
print("From tuples", numbers.flatMap(to_tuple).collect())
print("From sets", numbers.flatMap(to_set).collect())

From lists [10, 10, 20, 20, 30, 30]
From tuples [10, 10, 20, 20, 30, 30]
From sets [10, 20, 30]


## Exercise 5

Given a **`sample`** dataset:
1. Map each element of the **`sample`** to a pair: (original data, original data + letter 's') and assign it to a variable **`pairs`**.
    >For example, `["data", "science"]` becomes `[("data", "datas"), ("science", "sciences")]`.
2. Apply `flatMap()` directly to **`sample`** to create a flat RDD of pairs and assign it to a variable **`flat_pairs`**.
    >For example, `["data", "science"]` becomes `["data", "datas", "science", "sciences"]`.
3. Using the `flatMap()` function, flatten the **`pairs`** dataset and assign it to a variable **`flattened_pairs`**.
    >For example, `[("data", "datas"), ("science", "sciences")]` becomes `["data", "datas", "science", "sciences"]`.

In [31]:
# Exercise 5

sample = sc.parallelize([str(x) for x in range(1, 40)], 4)

# Create an RDD of pairs, e.g. ('1', '1s').
pairs = sample.map(lambda x: (x, x + 's'))

# Create a flat RDD by applying flatMap() directly to sample, e.g. '1', '1s', '2', '2s'.
flat_pairs = sample.flatMap(lambda x: (x, x + 's'))

# Create a flat RDD by flattening the pairs RDD, e.g. '1', '1s', '2', '2s'.
flattened_pairs = pairs.flatMap(lambda x: x)

print(pairs.take(5))
print(flat_pairs.take(10))
print(flattened_pairs.take(10))

[('1', '1s'), ('2', '2s'), ('3', '3s'), ('4', '4s'), ('5', '5s')]
['1', '1s', '2', '2s', '3', '3s', '4', '4s', '5', '5s']
['1', '1s', '2', '2s', '3', '3s', '4', '4s', '5', '5s']


In [None]:
# Test
lab1_test_ex_5(sample, flat_pairs, flattened_pairs)

### Key-value pairs

Key-value pairs are usually created with the `map()` transformation, by returning a `(key, value)` tuple for each element.

Let's examine transformations that are used with key-value pairs: [groupByKey()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.groupByKey) and [reduceByKey()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.reduceByKey).         

The `groupByKey()` transformation groups the values for each key in the RDD into a single sequence.

The `reduceByKey()` transformation gathers together pairs that have the same key and combines their values, two at a time, with a reducer function. It applies the function first within each partition on a per-key basis and then across the partitions.

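`groupByKey()` and `reduceByKey()` can often solve the same problem and produce the same answer, but `reduceByKey()` scales much better on large distributed datasets. It combines values locally in each partition before the shuffle, so far less data is sent over the network. `groupByKey()`, in contrast, ships every key-value pair across the network and can require holding all values for a key in memory.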
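The difference is easiest to see in a plain-Python model that needs no Spark and counts how many records would be shuffled between partitions. The helpers `group_by_key()` and `reduce_by_key()` are hypothetical names for this simplified model. Real Spark also serializes, hashes, and spills data, which is not modeled here.

```python
from collections import defaultdict
from operator import add

def group_by_key(partitions):
    """Model of groupByKey(): every (key, value) pair is shuffled as-is."""
    shuffled = [pair for partition in partitions for pair in partition]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return dict(groups), len(shuffled)

def reduce_by_key(func, partitions):
    """Model of reduceByKey(): values are combined per key inside each partition first."""
    shuffled = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    result = {}
    for key, value in shuffled:
        result[key] = func(result[key], value) if key in result else value
    return result, len(shuffled)

# Two partitions of (word, 1) pairs.
partitions = [[("a", 1), ("a", 1), ("b", 1)], [("a", 1), ("b", 1), ("b", 1)]]

groups, moved_by_group = group_by_key(partitions)
sums_by_group = {key: sum(values) for key, values in groups.items()}
sums_by_reduce, moved_by_reduce = reduce_by_key(add, partitions)

print(sums_by_group, moved_by_group)    # {'a': 3, 'b': 3} 6
print(sums_by_reduce, moved_by_reduce)  # {'a': 3, 'b': 3} 4
```

Both approaches give the same sums. In the `reduceByKey()` model, only 4 records cross partition boundaries instead of 6, and the gap grows with the number of repeated keys per partition.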

Here are more transformations to prefer over `groupByKey()`:
  * [combineByKey()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.combineByKey) can be used when you are combining elements but your return type differs from your input value type.
  * [foldByKey()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.foldByKey) merges the values for each key using an associative function and a neutral "zero value".
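`combineByKey()` takes three functions:

* one that creates a combiner from the first value of a key in a partition,
* one that merges another value into that combiner,
* one that merges combiners from different partitions.

The plain-Python model below uses them to compute per-key averages, where the combiner is a `(sum, count)` tuple rather than a number. The helper `combine_by_key()` is a hypothetical name, not Spark's implementation.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of the three functions that combineByKey() expects."""
    merged = {}
    for partition in partitions:
        # Inside a partition: start a combiner for a new key, or fold a value into it.
        local = {}
        for key, value in partition:
            if key in local:
                local[key] = merge_value(local[key], value)
            else:
                local[key] = create_combiner(value)
        # Across partitions: merge the per-partition combiners for each key.
        for key, combiner in local.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged

# Values are numbers, but the combiner is a (sum, count) tuple: a different type.
partitions = [[("x", 2), ("y", 10), ("x", 4)], [("x", 6), ("y", 20)]]
sum_count = combine_by_key(
    partitions,
    lambda value: (value, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
averages = {key: total / count for key, (total, count) in sum_count.items()}
print(averages)  # {'x': 4.0, 'y': 15.0}
```

On a real pair RDD, the same three lambdas would be passed positionally, as in `pairs_rdd.combineByKey(create_combiner, merge_value, merge_combiners)`; `pairs_rdd` and the function names are placeholders.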

Now let's go through a simple `groupByKey()` and `reduceByKey()` example.

In [35]:
fruits = sc.parallelize(["apple", "banana", "apple", "papaya", "mango", "prune", "mango"], 4)

fruit_pairs = fruits.map(lambda x: (x, 1))

# Three different ways to sum by key:

# 1. Using map() on the grouped pairs (works, but mapValues() is simpler).
# print(fruit_pairs.groupByKey().map(lambda kv: (kv[0], sum(kv[1]))).collect())

# 2. Using mapValues(), which is recommended when the key doesn't change.
print(fruit_pairs.groupByKey().mapValues(sum).collect())

# 3. reduceByKey() is more efficient and scalable.
print(fruit_pairs.reduceByKey(add).collect())

# countByKey() is an action that returns a dict with the number of elements for each key
print(fruit_pairs.countByKey())

[('mango', 2), ('banana', 1), ('apple', 2), ('papaya', 1), ('prune', 1)]
[('mango', 2), ('banana', 1), ('apple', 2), ('papaya', 1), ('prune', 1)]
defaultdict(<class 'int'>, {'apple': 2, 'banana': 1, 'papaya': 1, 'mango': 2, 'prune': 1})


The [`distinct()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.distinct) transformation returns a new RDD containing only the unique elements:

In [36]:
print(fruits.collect())
print(fruits.distinct().collect())

['apple', 'banana', 'apple', 'papaya', 'mango', 'prune', 'mango']
['mango', 'banana', 'apple', 'papaya', 'prune']


## Exercise 6
1. Create a **`numbers`** dataset from the existing collection: `[1, 3, 4, 1, 4, 7, 12, 3, 4, 2, 2, 6, 2, 1, 9]`.
2. Create a **`number_pairs`** dataset of pairs, in which the key is an element from the **`numbers`** RDD and the value is `1`.
3. Create a **`number_duplicates`** dataset of pairs, in which keys are unique and values contain the number of occurrences of the corresponding key.

In [37]:
# Exercise 6

numbers_raw = [1, 3, 4, 1, 4, 7, 12, 3, 4, 2, 2, 6, 2, 1, 9]

# Create an RDD with numbers
numbers = sc.parallelize(numbers_raw)

# Create an RDD of key-value pairs,
# where first element is original data from numbers and second element is 1
number_pairs = numbers.map(lambda x: (x, 1))

# Count the number of duplicates
number_duplicates = number_pairs.reduceByKey(add)

print(number_pairs.collect())
print(number_duplicates.collect())

[(1, 1), (3, 1), (4, 1), (1, 1), (4, 1), (7, 1), (12, 1), (3, 1), (4, 1), (2, 1), (2, 1), (6, 1), (2, 1), (1, 1), (9, 1)]
[(1, 3), (3, 2), (4, 3), (7, 1), (12, 1), (2, 3), (6, 1), (9, 1)]


In [None]:
# Test
lab1_test_ex_6(number_pairs, number_duplicates)

### Caching RDDs

If you need to use the same RDD more than once, it can be useful to cache it with the [`cache()`](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html#pyspark.RDD.cache) method. Caching is lazy: the RDD is stored only when an action first computes it. If you cache too many RDDs and Spark runs out of memory, it evicts cached partitions, least recently used first. Evicted partitions are recomputed from the RDD's lineage when they are accessed again.

In [38]:
# Name the RDD.
numbers.setName('My Numbers RDD')

# Cache the RDD.
numbers.cache()

# Is it cached?
print(numbers.is_cached)

True


## Modeling a simple inventory management system

In [39]:
# These are the lists of inventories of the 2 warehouses.
inventory_raw_1 = ['Hammer', 'nail', 'Nail', 'screwdriver', 'Backpack',
                   'Bolt D9', 'Nut D9', 'Bolt D9', 'Nut D9', 'Bolt D9', 'nut D12',
                   'Bolt D12', 'nut D9', 'Bolt D9', 'Nut D12']
inventory_raw_2 = ['Bolt D8', 'nut D8','Screwdriver', 'Backpack',
                   'Bolt D9', 'screwdriver', 'backpack', 'Bolt D9', 'First Aid Kit']

Let's create an RDD for each warehouse. Some item names are written with uppercase letters, so we normalize them with Python's `str.lower()` method.

In [40]:
# inventory_1 contains the inventory of the first warehouse.
inventory_1 = sc.parallelize(inventory_raw_1, 4).map(lambda x: x.lower())

# inventory_2 contains the inventory of the second warehouse.
inventory_2 = sc.parallelize(inventory_raw_2, 4).map(lambda x: x.lower())

print(inventory_1.collect())
print(inventory_2.collect())

['hammer', 'nail', 'nail', 'screwdriver', 'backpack', 'bolt d9', 'nut d9', 'bolt d9', 'nut d9', 'bolt d9', 'nut d12', 'bolt d12', 'nut d9', 'bolt d9', 'nut d12']
['bolt d8', 'nut d8', 'screwdriver', 'backpack', 'bolt d9', 'screwdriver', 'backpack', 'bolt d9', 'first aid kit']


Use the [union()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html?highlight=union#pyspark.RDD.union) transformation to combine two datasets into one. Unlike a mathematical set union, it keeps duplicate elements.

In [41]:
# Combine two inventories into one.
inventory = inventory_1.union(inventory_2)

# union() is a transformation, so printing the RDD shows its description, not its data.
print(inventory)

UnionRDD[72] at union at NativeMethodAccessorImpl.java:0


Count how many units of each item there are using `map()` and `reduceByKey()`.

In [42]:
inventory_pairs = inventory.map(lambda x: (x, 1)).reduceByKey(add)
print(inventory_pairs.collect())

[('nail', 2), ('bolt d12', 1), ('nut d8', 1), ('backpack', 3), ('nut d9', 3), ('screwdriver', 3), ('bolt d9', 6), ('nut d12', 2), ('bolt d8', 1), ('first aid kit', 1), ('hammer', 1)]


## Exercise 7

1. Find out which goods are stored in warehouse 1 but not in warehouse 2, and count them. Save the result as an RDD of `(item, count)` pairs in an **`inventory_diff`** variable. You can use the [subtract()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html?highlight=subtract#pyspark.RDD.subtract) function.
2. Create an **`inventory_diff_sorted`** dataset, in which elements of the **`inventory_diff`** are sorted by key. Use the [sortByKey()](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html?highlight=sortbykey#pyspark.RDD.sortByKey) function to complete the task.

*Note: `sortByKey()` accepts an `ascending` parameter that specifies the sort order. The default, `True`, sorts in ascending order.*
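`subtract()` keeps duplicates from the first RDD: every occurrence of a value survives unless that value appears anywhere in the other RDD. The small plain-Python model below illustrates this; the helper `subtract_like_spark()` and the sample lists are hypothetical. Spark itself does not guarantee the order of the elements that `subtract()` returns.

```python
from collections import Counter

def subtract_like_spark(left, right):
    """Keep every element of left, duplicates included, that does not occur in right."""
    excluded = set(right)
    return [item for item in left if item not in excluded]

warehouse_1 = ["nail", "nail", "hammer", "backpack", "bolt d9"]
warehouse_2 = ["backpack", "bolt d9", "bolt d9"]

only_in_1 = subtract_like_spark(warehouse_1, warehouse_2)
print(only_in_1)                            # ['nail', 'nail', 'hammer']

# Counting and sorting by key, as reduceByKey() and sortByKey() would do.
print(sorted(Counter(only_in_1).items()))   # [('hammer', 1), ('nail', 2)]
```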

In [43]:
# Exercise 7

# Subtract inventory_2 from inventory_1.
# Create an RDD of pairs (goods, number of goods).
# You may want to use map and reduceByKey.
inventory_diff = inventory_1.subtract(inventory_2) \
                            .map(lambda x: (x, 1)) \
                            .reduceByKey(add)

# Sort items alphabetically
inventory_diff_sorted = inventory_diff.sortByKey()

print(inventory_diff.collect())
print(inventory_diff_sorted.collect())

[('nail', 2), ('bolt d12', 1), ('nut d9', 3), ('nut d12', 2), ('hammer', 1)]
[('bolt d12', 1), ('hammer', 1), ('nail', 2), ('nut d12', 2), ('nut d9', 3)]


In [None]:
# Test
lab1_test_ex_7(inventory_diff, inventory_diff_sorted)

<hr>

**Congratulations!** Now you have a basic understanding of Apache Spark's transformations and actions.

Remember that Spark's API is very large, so if you want to learn more about its capabilities, check the [official documentation](http://spark.apache.org/docs/1.6.3/api/python/pyspark.html).

In the next practical lesson we will examine the `pyspark.sql` module.

<center><h3>Presented by <a target="_blank" rel="noopener noreferrer nofollow" href="http://datascience-school.com">datascience-school.com</a></h3></center>