## RDD manipulation with Key-Value Pairs

#### Group By Key

This operation allows you to manipulate key-value pair RDDs according to their **key**.

This operation places all of the values with the same key into an **iterable**.

In [1]:
kv_rdd = sc.parallelize([("Ibrahim", 21), ("Juan", 32), ("Andrew", 200), ("Ibrahim", 12), ("Ibrahim", -45), ("Ibrahim", "Chocolate")], 
                        numSlices=6) # 6 partitions

In [2]:
kv_rdd.groupByKey().collect()

[('Andrew', <pyspark.resultiterable.ResultIterable at 0x114506908>),
 ('Juan', <pyspark.resultiterable.ResultIterable at 0x114506048>),
 ('Ibrahim', <pyspark.resultiterable.ResultIterable at 0x114506978>)]

We have generated a list of tuples, with the key as the first entry of each tuple and an **iterable** object (a `ResultIterable`) holding that key's values as the second entry.

In order to view the values in the iterable, we do the following:

In [3]:
results = kv_rdd.groupByKey().collect()

for entry in results:
    print(f"This entry corresponds to {entry[0]}. The contents of the iterable are: ")
    for contents in entry[1]:
        print(contents)

This entry corresponds to Andrew. The contents of the iterable are: 
200
This entry corresponds to Juan. The contents of the iterable are: 
32
This entry corresponds to Ibrahim. The contents of the iterable are: 
21
12
-45
Chocolate


This is a great technique if you need to view the results by key. However, should you wish to perform calculations on the values, it would be much better to use `reduceByKey` or `aggregateByKey`.

Also, in the above code snippet, I used a nested for-loop to show all results. In reality, you may wish to inspect one particular result. This can be done more concisely with a list comprehension:

In [4]:
[x for x in results[-1][1]] # results for Ibrahim (the last entry in results)

[21, 12, -45, 'Chocolate']
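Indexing with `results[-1]` only works because Ibrahim happened to be the last entry in the collected list. As a minimal sketch, assuming you would rather look values up by key, you can turn the collected results into a dictionary. The `results` list below is a hand-written stand-in for the output of `kv_rdd.groupByKey().collect()`, with plain lists in place of the `ResultIterable` objects, and `grouped` is a name I made up for illustration.

```python
# Stand-in for `kv_rdd.groupByKey().collect()`:
# plain lists replace the ResultIterable objects.
results = [
    ("Andrew", [200]),
    ("Juan", [32]),
    ("Ibrahim", [21, 12, -45, "Chocolate"]),
]

# Build a key -> values lookup so we do not depend on the position of a key.
grouped = {key: list(values) for key, values in results}

print(grouped["Ibrahim"])
```

The same dictionary comprehension should work on the real collected results, since a `ResultIterable` can be passed to `list`.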

`groupByKey` relies on a reduce-style shuffle behind the scenes - as such, with large RDDs you may see some performance lag. This is because Spark is moving the data from the executor that is reading the data to the executor that is collecting the values for a given key. Recall that we split our data into 6 partitions.

All of this data movement leads to some performance lag. As mentioned in the previous notebook, using `reduce` is an expensive operation. As such, working to **reduce** the number of `reduce` operations will generally lead to a performance improvement.
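To make the cost of that data movement a little more concrete, here is a rough pure-Python sketch (no Spark involved). It counts how many records would have to leave their partition when grouping versus when reducing, under the assumption that a reduce-by-key first combines values locally on each partition. The data and both helper functions are hypothetical and exist only for illustration.

```python
from operator import add

# Hypothetical data: three partitions of (key, value) pairs.
partitions = [
    [("a", 1), ("a", 1), ("b", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
    [("a", 1), ("a", 1), ("a", 1)],
]

def records_moved_when_grouping(parts):
    """Grouping: every (key, value) pair is moved as-is."""
    return sum(len(part) for part in parts)

def records_moved_when_reducing(parts, func):
    """Reducing: each partition first combines its own values per key,
    so at most one record per key leaves each partition."""
    moved = 0
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        moved += len(local)
    return moved

grouping_cost = records_moved_when_grouping(partitions)
reducing_cost = records_moved_when_reducing(partitions, add)
print(grouping_cost, reducing_cost)
```

On this toy data, grouping moves all nine records while reducing moves five. The gap grows as the number of repeated keys per partition grows.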

#### Reduce By Key

An excellent operation for calculating aggregates over your entire RDD.

Recall that if you just wanted to calculate a single aggregate metric, you could just use the simple [`reduce`](https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#reduce) command.

However, it is more common, within the context of big data, to aggregate over a subset of your data - e.g. total users per IP address, total revenue per demographic. This is where `reduceByKey` shines!


In [5]:
from random import randint, seed

def generate_random_ips(amount):
    lst = []
    for _ in range(amount):
        lst.append(".".join(str(randint(0, 255)) for _ in range(4)))
    return lst

In [6]:
seed(318)
ips = generate_random_ips(5)
ips

['24.232.205.214',
 '84.155.214.192',
 '183.99.209.84',
 '3.46.212.177',
 '234.255.0.244']

In [7]:
from operator import add

ip_rdd = sc.parallelize(ips)
ip_rdd.map(lambda x:(x,1)).reduceByKey(add).collect()

[('183.99.209.84', 1),
 ('24.232.205.214', 1),
 ('84.155.214.192', 1),
 ('3.46.212.177', 1),
 ('234.255.0.244', 1)]

First and foremost, if you are not familiar with the [`operator`](https://docs.python.org/3.6/library/operator.html) library - I highly suggest you read the documentation. It is incredibly useful! Those of you with exposure to functional programming will particularly appreciate it.

I first generated 5 random IP addresses and made them into an RDD. Using a `lambda` function, I changed each IP address into a tuple containing the IP and the value 1 - (ip address, 1). We have now created key-value pairs!

As such, we can employ `reduceByKey`.

`reduceByKey` requires a **function** to be passed as an argument. This is the function that will be used to do the aggregation. I passed the `add` function, which simply adds up all the 1's for each key and gives us a result. Furthermore, the function that is passed must be [**associative**](https://en.wikipedia.org/wiki/Associative_property) and [**commutative**](https://en.wikipedia.org/wiki/Commutative_property). This is because the data is split up behind the scenes and sent to different executors - there is no guarantee about the order in which the data will be processed/returned.
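Here is a small pure-Python sketch of why the ordering requirement matters. It applies a function over every possible ordering of the same values and collects the distinct results. Note that this only exercises ordering, not the different ways the data could be grouped across executors. The helper `results_over_orderings` is a name I made up for illustration.

```python
from functools import reduce
from itertools import permutations
from operator import add, sub

values = [1, 2, 3, 4]

def results_over_orderings(func, data):
    """Apply `func` left to right over every ordering of `data`
    and return the set of distinct results."""
    return {reduce(func, ordering) for ordering in permutations(data)}

add_results = results_over_orderings(add, values)
sub_results = results_over_orderings(sub, values)

print(add_results)       # a single distinct result
print(len(sub_results))  # several distinct results
```

`add` gives the same answer no matter the order, so it is safe to pass to `reduceByKey`. `sub` does not, so its result would depend on how the data happened to be processed.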

Note: If you just want to collect all of the values for each key, rather than aggregate them, it is better to use `groupByKey`!

From the above, we see that there was only 1 of each IP address - that is to say, they are all unique. Below is an example with duplicate IPs.

A more interesting use case comes from text analysis - after you tokenize words, you may wish to calculate word counts in a corpus. `reduceByKey` is an excellent method for this task.

In [8]:
ips = ['24.232.205.214',
 '24.232.205.214',
 '183.99.209.84',
 '24.232.205.214',
 '234.255.0.244']

ip_rdd = sc.parallelize(ips)
ip_rdd.map(lambda x:(x,1)).reduceByKey(add).collect()

[('183.99.209.84', 1), ('24.232.205.214', 3), ('234.255.0.244', 1)]
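The word count use case mentioned above follows exactly the same pattern: map each token to `(token, 1)` and then reduce by key with `add`. The sketch below uses a pure-Python stand-in, `reduce_by_key`, which is my own hypothetical helper and not part of PySpark. The sample sentence is made up as well.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Pure-Python stand-in for reduceByKey:
    merge the values that share a key using `func`."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

text = "the quick brown fox jumps over the lazy dog the end"
tokens = text.split()  # a very naive tokenizer, good enough for a sketch

word_counts = reduce_by_key(((word, 1) for word in tokens), add)
print(word_counts["the"])
```

With a real RDD of tokens, the equivalent would presumably look just like the IP example: `.map(lambda x: (x, 1)).reduceByKey(add)`.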

#### Aggregate By Key

This method is very similar to `reduceByKey`; however, it is **significantly** more flexible in terms of its use case. This increased flexibility comes at a cost: increased complexity! In fact, the main difference between the two is that `aggregateByKey` allows you to specify an initial value for the aggregation.

Again, those of you with exposure to functional programming - specifically Haskell - will breeze through this. You should think of `aggregateByKey` as a higher-order function, e.g. `foldl` or `foldr`, only instead of operating on lists, you are operating on RDDs. However, take this description with a grain of salt, as Spark also has a `foldByKey` method which is closer to `foldl` and `foldr`.

`aggregateByKey` takes 3 parameters:

- The Zero Value: This is the starting value that you wish to pass to your first function!

- The Sequencing Function: This is the function that is applied to the accumulator and each value **per** partition.

- The Combining Function: This is the function that enables you to combine the results of the sequencing function from each partition together.
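To get a feel for how the three pieces fit together, here is a pure-Python sketch for the values of a single key, using `functools.reduce` in place of Spark. It also shows one reason the extra flexibility is useful: the accumulator can have a different shape from the values. Here it is a `(sum, count)` tuple that lets us compute a mean. The data and the names `seq_func` and `comb_func` are hypothetical.

```python
from functools import reduce

# Hypothetical values for a single key, spread over two partitions.
partitions = [[4, 6], [10]]

zero = (0, 0)  # the Zero Value: (running sum, running count)

def seq_func(acc, val):
    """Sequencing function: fold one value into a partition's accumulator."""
    return (acc[0] + val, acc[1] + 1)

def comb_func(acc1, acc2):
    """Combining function: merge the accumulators of two partitions."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

# Step 1: within each partition, start from the Zero Value and fold in the values.
per_partition = [reduce(seq_func, part, zero) for part in partitions]

# Step 2: merge the per-partition accumulators.
total, count = reduce(comb_func, per_partition)
mean = total / count
print(per_partition, total, count)
```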

In [14]:
rdd = sc.parallelize([("Ibrahim",1),("Juan",2),("Ibrahim",2),("Juan",3)], numSlices=4)

In [15]:
rdd.getNumPartitions() # 4 partitions

4

From the above, we can see that each tuple is on its own partition.

In [19]:
rdd.glom().collect() # this is what our partitions look like in list form

[[('Ibrahim', 1)], [('Juan', 2)], [('Ibrahim', 2)], [('Juan', 3)]]

In [20]:
rdd.aggregateByKey(1,lambda acc,val: acc+val, lambda acc1, acc2: acc1+acc2).collect()

[('Ibrahim', 5), ('Juan', 7)]

Let's walk through how we get the above result:

The Zero Value (1) is used as the starting accumulator on **each** partition. Thus, each tuple, on its own partition, is passed to the sequencing function together with the Zero Value.

It is important to remember that, since we are aggregating by key, the `val` referenced in the lambda functions refers to the numeric element in the tuples!

For example, on the first partition we have lambda 1,1 = 1 + 1 = 2. At this point, the accumulator value on the first partition is 2. Across all four partitions:

- Value of acc on 1st partition: lambda 1,1 = 1 + 1 = 2
- Value of acc on 2nd partition: lambda 1,2 = 1 + 2 = 3
- Value of acc on 3rd partition: lambda 1,2 = 1 + 2 = 3
- Value of acc on 4th partition: lambda 1,3 = 1 + 3 = 4

These values are now passed on to the combining function, which combines the accumulator values on a **key** basis. Thus, for the "Ibrahim" key, we have lambda 2,3 = 2 + 3 = 5 (equivalent to adding the results of the 1st and 3rd partitions).

For the "Juan" key, we have lambda 3,4 = 3 + 4 = 7 (the results of the 2nd and 4th partitions).

It is important to realize here that the Zero Value is applied on each partition in order to kickstart the sequencing function. As such, if the Zero Value is not neutral for the operation (here we used 1 with addition, rather than 0) and we change the number of partitions, then we will get a different result!

In [21]:
rdd = sc.parallelize([("Ibrahim",1),("Juan",2),("Ibrahim",2),("Juan",3)], numSlices=1)
rdd.getNumPartitions() # 1 partition

1

In [22]:
rdd.glom().collect() # this is what our partitions look like in list form

[[('Ibrahim', 1), ('Juan', 2), ('Ibrahim', 2), ('Juan', 3)]]

In [23]:
rdd.aggregateByKey(1,lambda acc,val: acc+val, lambda acc1, acc2: acc1+acc2).collect()

[('Ibrahim', 4), ('Juan', 6)]

The reason we get the above result is that the Zero Value is now applied on a single partition, once per key, and the accumulator then gets passed along through all of the values for that key:

Ibrahim key:

Sequencing function:

- lambda 1,1 = 1 + 1 = 2
- lambda 2,2 = 2 + 2 = 4

Combining function: there is only one accumulator (4) for this key, so there is nothing to combine and the result is 4.

Juan key:

Sequencing function:

- lambda 1,2 = 1 + 2 = 3
- lambda 3,3 = 3 + 3 = 6

Combining function: again, there is only one accumulator (6) for this key, so the result is 6.

Since we have only 1 partition, there will be only one accumulator per key. As such, the combining function has nothing to merge, and the Zero Value is counted only once per key.
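Both walkthroughs can be reproduced without a cluster. The following is a pure-Python sketch of how I understand `aggregateByKey` to behave, with partitions written as plain lists (the same shape that `glom().collect()` returned above). The function `aggregate_by_key` is my own stand-in, not the PySpark implementation.

```python
def aggregate_by_key(partitions, zero, seq_func, comb_func):
    """Pure-Python stand-in for aggregateByKey over a list of partitions."""
    # Step 1: on each partition, every key starts from the Zero Value.
    per_partition = []
    for part in partitions:
        accs = {}
        for key, val in part:
            accs[key] = seq_func(accs.get(key, zero), val)
        per_partition.append(accs)

    # Step 2: merge the accumulators for each key across partitions.
    # A key with a single accumulator is kept as-is.
    merged = {}
    for accs in per_partition:
        for key, acc in accs.items():
            merged[key] = comb_func(merged[key], acc) if key in merged else acc
    return sorted(merged.items())

def seq(acc, val):
    return acc + val

def comb(acc1, acc2):
    return acc1 + acc2

four_partitions = [[("Ibrahim", 1)], [("Juan", 2)], [("Ibrahim", 2)], [("Juan", 3)]]
one_partition = [[("Ibrahim", 1), ("Juan", 2), ("Ibrahim", 2), ("Juan", 3)]]

result_four = aggregate_by_key(four_partitions, 1, seq, comb)
result_one = aggregate_by_key(one_partition, 1, seq, comb)
print(result_four)
print(result_one)

# With a neutral Zero Value (0 for addition) the partitioning no longer matters.
neutral_four = aggregate_by_key(four_partitions, 0, seq, comb)
neutral_one = aggregate_by_key(one_partition, 0, seq, comb)
```

The takeaway is that a Zero Value of 0 would have given the same answer under both partitionings, which is usually what you want for a sum.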

You can find another great example of `aggregateByKey` [here](http://www.learnbymarketing.com/618/pyspark-rdd-basics-examples/)

Here are some graphics that may help visualize what is happening:

4 partitions:

![4 partitions](imgs/aggbykey_1.png)

1 Partition:

![1 Partition](imgs/aggbykey_2.png)

#### Sort By Key

Allows you to sort an RDD of key-value pairs by the key. It also allows you to set the sort order, the number of partitions to output and a pre-processing function called a `keyfunc`.

The pre-processing function will not change the data that is present in the result. Rather, it transforms the way `sortByKey` "views" the keys when comparing them.

In [30]:
rdd = sc.parallelize([("a",2), ("d",17), ("A",3), ("C",1)])

In [31]:
rdd.sortByKey().collect() # default sorting

[('A', 3), ('C', 1), ('a', 2), ('d', 17)]

In [32]:
rdd.sortByKey(ascending=False).collect() # reverse order

[('d', 17), ('a', 2), ('C', 1), ('A', 3)]

In [33]:
rdd.sortByKey(numPartitions=3).glom().collect() # 3 partitions

[[('A', 3), ('C', 1)], [('a', 2)], [('d', 17)]]

In [34]:
rdd.sortByKey(numPartitions=1).glom().collect() # 1 partition

[[('A', 3), ('C', 1), ('a', 2), ('d', 17)]]

Now, let's say we wanted to normalize our sorting - that is to say, ignore the fact that some keys are capital letters. This is where the `keyfunc` comes into play.

In [35]:
rdd.sortByKey(keyfunc=lambda x: x.lower()).collect()

[('a', 2), ('A', 3), ('C', 1), ('d', 17)]

Each key was passed through the lambda function and then sorted on the basis of that result. As such, we can see from the above that we have a normalized sorting of our key-value pairs, while the keys themselves are unchanged.
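If you have used the `key` argument of Python's built-in `sorted`, the `keyfunc` idea should feel familiar. Here is the plain-Python analogue on the same data. It is a sketch of the concept rather than of what Spark does internally.

```python
pairs = [("a", 2), ("d", 17), ("A", 3), ("C", 1)]

# Default ordering: uppercase letters sort before lowercase ones.
default_sorted = sorted(pairs, key=lambda kv: kv[0])

# Normalized ordering: compare the lowercased key, but keep the original key in the output.
normalized_sorted = sorted(pairs, key=lambda kv: kv[0].lower())

print(default_sorted)
print(normalized_sorted)
```

Python's `sorted` is stable, so `('a', 2)` stays ahead of `('A', 3)` because it came first in the input.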

#### Join

This function allows you to combine RDDs. It works by taking 2 key-value RDDs and pairing up the values across the RDDs by key!


In [36]:
rdd1 = sc.parallelize([(1,"a"), (2,"a")])
rdd2 = sc.parallelize([(2,"b"), (3,"b")])

In [38]:
rdd1.join(rdd2).collect()

[(2, ('a', 'b'))]

Recall that join only works by operating on keys - thus we only get output where keys have matched. However, what if the same key appears more than once in one of the RDDs?

In [41]:
rdd3 = sc.parallelize([(2,"b"), (3,"b"), (2,"c")])

In [42]:
rdd1.join(rdd3).collect()

[(2, ('a', 'b')), (2, ('a', 'c'))]

Interesting, we now get a pair for every combination of matching values from rdd1 and rdd3. When you have identical keys, you may wish to keep every combination as its own entry, as above, or you could use the `cogroup` function to bring it all together in one result per key.

Also, notice how you are **losing** data, that is, any key that was only present in 1 RDD is lost.

If you wish to retain data, then you can use the following:

- Left Outer Join: Ensures all data from the original RDD makes it to the end result. If the second RDD doesn't have a matching key, then Spark will insert a `None` value.

- Right Outer Join: Same as above, but ensures data from the second RDD makes it to the end result.

- Full Outer Join: Preserves all values in both RDDs.

It is important to mention here that when using these joins, you must be prepared to deal with the `None` values. Furthermore, all of these join functions allow you to specify the number of partitions. Keep in mind that when using joins, you could potentially be moving a large amount of data around the cluster. This is almost always a performance bottleneck.

You should think carefully about whether you really need a join, and whether you can filter your data before joining.

In [43]:
rdd1.leftOuterJoin(rdd2).collect()

[(1, ('a', None)), (2, ('a', 'b'))]
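To compare the four join flavours side by side, here is a pure-Python sketch that mimics their semantics on lists of pairs. `join_pairs` and its `how` argument are hypothetical names I chose for illustration. In PySpark the corresponding methods are `join`, `leftOuterJoin`, `rightOuterJoin` and `fullOuterJoin`.

```python
from collections import defaultdict

def join_pairs(left, right, how="inner"):
    """Pure-Python stand-in for the RDD join family on lists of (key, value) pairs."""
    left_groups, right_groups = defaultdict(list), defaultdict(list)
    for key, value in left:
        left_groups[key].append(value)
    for key, value in right:
        right_groups[key].append(value)

    if how == "inner":
        keys = left_groups.keys() & right_groups.keys()
    elif how == "left":
        keys = left_groups.keys()
    elif how == "right":
        keys = right_groups.keys()
    elif how == "full":
        keys = left_groups.keys() | right_groups.keys()
    else:
        raise ValueError(f"unknown join type: {how}")

    joined = []
    for key in sorted(keys):
        # A missing side contributes a single None, as in the outer joins.
        left_values = left_groups.get(key, [None])
        right_values = right_groups.get(key, [None])
        for lv in left_values:
            for rv in right_values:
                joined.append((key, (lv, rv)))
    return joined

rdd1 = [(1, "a"), (2, "a")]
rdd2 = [(2, "b"), (3, "b")]

inner = join_pairs(rdd1, rdd2, "inner")
left = join_pairs(rdd1, rdd2, "left")
right = join_pairs(rdd1, rdd2, "right")
full = join_pairs(rdd1, rdd2, "full")
print(inner, left, right, full, sep="\n")
```

Notice that only the inner join drops keys 1 and 3. The outer joins keep them and pad the missing side with `None`.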

#### CoGroup

`cogroup` also allows you to combine two RDDs, but it is slightly different from `join`!

In [44]:
rdd1 = sc.parallelize([(1,"a"), (2,"a")])
rdd2 = sc.parallelize([(2,"b"), (3,"d"), (2,"c")])

In [45]:
rdd1.join(rdd2).collect() # just as we saw before

[(2, ('a', 'b')), (2, ('a', 'c'))]

In [48]:
rdd1.cogroup(rdd2).collect() # we get result iterables.

[(1,
  (<pyspark.resultiterable.ResultIterable at 0x10d956a90>,
   <pyspark.resultiterable.ResultIterable at 0x10d956b38>)),
 (2,
  (<pyspark.resultiterable.ResultIterable at 0x10d956f60>,
   <pyspark.resultiterable.ResultIterable at 0x10d956e48>)),
 (3,
  (<pyspark.resultiterable.ResultIterable at 0x10d956b00>,
   <pyspark.resultiterable.ResultIterable at 0x10d956dd8>))]

Let's see what is inside those iterables by casting them to lists!

In [50]:
rdd1.cogroup(rdd2).mapValues(lambda x: [list(x[0]), list(x[1])]).collect()

[(1, [['a'], []]), (2, [['a'], ['b', 'c']]), (3, [[], ['d']])]

From the above, we can see that `cogroup` makes a **group** for each key! For every key, it provides one list of the values seen in the first RDD and one list of the values seen in the second RDD. A list is empty when that RDD does not contain the key.
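Here is a pure-Python sketch of the same grouping, which also shows how the inner join result can be rebuilt from the groups. `cogroup_pairs` is a hypothetical helper of mine, not a PySpark function.

```python
def cogroup_pairs(left, right):
    """Pure-Python stand-in for cogroup: for each key, collect the values
    seen on the left and the values seen on the right."""
    groups = {}
    for key, value in left:
        groups.setdefault(key, ([], []))[0].append(value)
    for key, value in right:
        groups.setdefault(key, ([], []))[1].append(value)
    return [(key, [lefts, rights]) for key, (lefts, rights) in sorted(groups.items())]

rdd1 = [(1, "a"), (2, "a")]
rdd2 = [(2, "b"), (3, "d"), (2, "c")]

grouped = cogroup_pairs(rdd1, rdd2)
print(grouped)

# An inner join is every left/right combination within each group.
inner_join = [
    (key, (lv, rv))
    for key, (lefts, rights) in grouped
    for lv in lefts
    for rv in rights
]
print(inner_join)
```

Keys 1 and 3 disappear from the join because one of their two lists is empty, so there are no combinations to emit.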

# Input and Output

#### Whole Text Files

This function allows you to read in an entire directory of text files and returns them as pairs.

Each pair is of the form: (filename, content)

This function is similar to the previous function we used: `textFile`. However, `textFile` returns an RDD with an element for each line in the file.

`wholeTextFiles` also returns an RDD; however, each element is one **entire** file, in the form (filename, content).

In [1]:
sc.textFile("samples").collect() #samples is a folder

['This is some test data.',
 'That takes more than one line.',
 'This is a second test file.',
 'Is it also',
 'a multiline file.']

So the above loaded every file in the samples folder and gave each line in each file its own element in the RDD.

In [2]:
sc.wholeTextFiles("samples").collect()

[('file:/Users/ibrahimgabr/spv/samples/sample_1.txt',
  'This is some test data.\nThat takes more than one line.\n'),
 ('file:/Users/ibrahimgabr/spv/samples/sample_2.txt',
  'This is a second test file.\nIs it also\na multiline file.\n')]

So when do you use which? It depends on what you are trying to do!

Use `wholeTextFiles` if you are, for example, generating total word counts per file. In this case, it would be important to keep each file separate!

However, if you were generating a term frequency table that needs to take into account the entire corpus of text, then `textFile` is the way to go.

In essence, you need to decide if the level of fidelity should be every line in a file, or simply the file itself.

Also, recall that **parallelism can never be higher than the number of elements in your RDD**. As such, if you used `wholeTextFiles` to load up two 5 GB text files, then even if you were on a 100-node cluster, only 2 nodes would be busy working while the other 98 would sit idle. So, whenever you can, using `textFile` is the way to go, as it allows you to exploit the power of parallelism.
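To make the difference in granularity concrete, here is a pure-Python sketch that mimics both loaders on a temporary folder holding the same two sample files. `text_file` and `whole_text_files` are hypothetical stand-ins of mine. One simplification to be aware of: the real `wholeTextFiles` returns the full file URI as the key, whereas I only keep the file name.

```python
import tempfile
from pathlib import Path

def text_file(directory):
    """Stand-in for textFile: one element per line, across all files."""
    lines = []
    for path in sorted(Path(directory).glob("*.txt")):
        lines.extend(path.read_text().splitlines())
    return lines

def whole_text_files(directory):
    """Stand-in for wholeTextFiles: one (filename, content) pair per file."""
    return [(path.name, path.read_text()) for path in sorted(Path(directory).glob("*.txt"))]

with tempfile.TemporaryDirectory() as samples:
    Path(samples, "sample_1.txt").write_text(
        "This is some test data.\nThat takes more than one line.\n")
    Path(samples, "sample_2.txt").write_text(
        "This is a second test file.\nIs it also\na multiline file.\n")
    lines = text_file(samples)
    files = whole_text_files(samples)

# Word counts per file only make sense at the file level.
words_per_file = {name: len(content.split()) for name, content in files}

print(len(lines), len(files))
print(words_per_file)
```

Five elements versus two elements is also the difference in how much parallelism is available for this tiny example.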

#### Pickle Files

This is a way to store and load up python objects in spark.

Let's say you have finished training an ML model, you could pickle it and then load it back up later.

While I don't have a model on hand, let's run through a simple example of making a pickle file and loading it back up.

In [3]:
sc.parallelize(["ML Model", "Logistic Regression", {"alpha": 0.05}]).saveAsPickleFile("model")

In [4]:
sc.pickleFile("model").collect() # reload the pickle file, returned as an RDD

['ML Model', 'Logistic Regression', {'alpha': 0.05}]
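As far as I know, these Spark methods serialize each element with Python's own pickling machinery, so the round trip behaves much like the standard `pickle` module. Here is a minimal sketch of that round trip in plain Python, assuming in-memory bytes rather than files on disk. The `records` list simply mirrors the toy data above.

```python
import pickle

records = ["ML Model", "Logistic Regression", {"alpha": 0.05}]

# Serialize the Python objects to bytes...
payload = pickle.dumps(records)

# ...and rebuild equal objects from those bytes later on.
restored = pickle.loads(payload)

print(restored)
```

As with any pickle, only load data that you trust, since unpickling can execute arbitrary code.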

# Conclusion

That's all I have for today. Plenty of information for you to play with and get your hands dirty!! I really hope you enjoy the notebook!

The next notebook will cover performance boosting and cluster configuration, in addition to exposing you to some advanced Spark features like MLlib and Spark Streaming.

As always, feel free to reach out: igabr@uchicago.edu or [@Gabr\_Ibrahim](https://twitter.com/Gabr_Ibrahim)