# PySpark Tutorial - Joins
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("pyspark tutorial").setMaster("local[*]")
sc = SparkContext(conf=conf)

## Key Value Data, Grouping and Joins

The key-value, or (k,v), datatype is fundamental to many operations, including grouping and joins. We need to be able to:
* Create keys from non-KV data
* Group or organize data according to keys
* Operate on data according to keys
* Form standard joins

### Creating Key-Pair Values

We can create K-V pairs directly from a list of pairs:

In [3]:
visits = sc.parallelize([ ("index.html", "1.2.3.4"),
                         ("about.html", "3.4.5.6"),
                         ("index.html", "1.3.3.1") ])
pageNames = sc.parallelize([ ("index.html", "Home"),
                            ("about.html", "About") ])

When we `collect()` the K-V pairs, we get the full list of pairs. This pulls the data to the front-end machine:

In [4]:
visits.collect()

[('index.html', '1.2.3.4'),
 ('about.html', '3.4.5.6'),
 ('index.html', '1.3.3.1')]

The first position of the K-V pair is the key. We'll cover grouping in more detail later, but let's take a look at grouping visits by the keys:

In [5]:
visits.groupByKey().collect()

[('about.html', <pyspark.resultiterable.ResultIterable at 0x7f1939b0c4f0>),
 ('index.html', <pyspark.resultiterable.ResultIterable at 0x7f1939ac64c0>)]

The KV pairs have now been *grouped*: we have an RDD with one entry per key, and the value for each key is a "list" of the values that had that key in the original KV RDD. Each group is wrapped in a `ResultIterable`, which is why the output shows object addresses rather than the values themselves.

We can reify the results by converting each group to a list. You wouldn't do this in practice, because `collect()` then brings all the values back to the front-end machine across the network, but let's see what this produces to understand what `groupByKey` is doing.

We'll `map` a lambda function that simply "flattens" the items by converting the `ResultIterable` into a list.

In [6]:
visits.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('about.html', ['3.4.5.6']), ('index.html', ['1.2.3.4', '1.3.3.1'])]

When we `map` across a grouped KV RDD, the mapped function takes a single argument, which is the pair `(key, value)`. That's why we refer to `x[0]` for the key and `x[1]` for the value in the sample `map`.

It's more typical to `map` across just the values, and there is a corresponding `mapValues` function that does precisely this. In the example below, we map `list` across the values in the groups. The results should be the same as in the previous example.

In [7]:
visits.groupByKey().mapValues(list).collect()

[('about.html', ['3.4.5.6']), ('index.html', ['1.2.3.4', '1.3.3.1'])]
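To make the semantics concrete without a cluster, here is a minimal plain-Python sketch of what `groupByKey` and `mapValues` compute. The helper names `group_by_key` and `map_values` are hypothetical stand-ins written for illustration; they are not part of PySpark and ignore partitioning entirely.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Local stand-in for groupByKey: collect the values of each key into a list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())

def map_values(pairs, func):
    """Local stand-in for mapValues: apply func to each value, leave keys alone."""
    return [(key, func(value)) for key, value in pairs]

# Same data as the `visits` RDD, held in an ordinary list.
visits_local = [("index.html", "1.2.3.4"),
                ("about.html", "3.4.5.6"),
                ("index.html", "1.3.3.1")]

grouped = group_by_key(visits_local)
visit_counts = map_values(grouped, len)

print(sorted(grouped))
print(sorted(visit_counts))
```

Passing `len` instead of `list` shows why `mapValues` is convenient: the key is carried along untouched while only the grouped values are transformed.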

`keyBy` creates key-value pairs from an RDD. The elements of the original RDD become the values, and a function supplies the key for each one.

For example, assume we want $K = V^2$ for values $V$ in an RDD:

In [8]:
r = sc.parallelize( [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3] )
r.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]

In [9]:
rsq = r.keyBy(lambda x : x*x)
rsq.collect()

[(0, 0),
 (1, 1),
 (4, 2),
 (9, 3),
 (16, 4),
 (25, 5),
 (36, 6),
 (49, 7),
 (64, 8),
 (81, 9),
 (0, 0),
 (1, 1),
 (4, 2),
 (9, 3)]

As another example, we might want to key the **passwd** data by the shell name.

In [10]:
passwd = sc.textFile("/etc/passwd")

The shell is the 7th field (index 6) in each line of the `/etc/passwd` file:

In [11]:
passwd.take(1)[0].split(':')

['root', 'x', '0', '0', 'root', '/root', '/bin/bash']

Thus, if we want the key to be the element at index 6, we provide a function that extracts it from each entry:

In [12]:
byShell = passwd.keyBy( lambda x : x.split(':')[6] )
byShell.take(3)

[('/bin/bash', 'root:x:0:0:root:/root:/bin/bash'),
 ('/usr/sbin/nologin', 'daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin'),
 ('/usr/sbin/nologin', 'bin:x:2:2:bin:/bin:/usr/sbin/nologin')]

This is more or less equivalent to a `map` that returns (key, value) pairs. For example:

In [13]:
passwd.map( lambda x : (x.split(':')[6], x) ).take(3)

[('/bin/bash', 'root:x:0:0:root:/root:/bin/bash'),
 ('/usr/sbin/nologin', 'daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin'),
 ('/usr/sbin/nologin', 'bin:x:2:2:bin:/bin:/usr/sbin/nologin')]
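Indexing with `[6]` is easy to get wrong, so a small sketch with named fields may help. It assumes each line has exactly the seven colon-separated fields seen in the output above. The field names in `PasswdEntry` and the helpers `parse_passwd` and `key_by` are illustrative choices, not part of PySpark.

```python
from collections import namedtuple

# Assumed layout: seven colon-separated fields per line; the names are illustrative.
PasswdEntry = namedtuple(
    "PasswdEntry", ["login", "password", "uid", "gid", "gecos", "home", "shell"])

def parse_passwd(line):
    """Split one passwd line into named fields (fails if there are not 7 fields)."""
    return PasswdEntry(*line.split(":"))

def key_by(items, key_func):
    """Local stand-in for keyBy: pair every item with the key computed from it."""
    return [(key_func(item), item) for item in items]

lines = ["root:x:0:0:root:/root:/bin/bash",
         "daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin"]

by_shell_local = key_by(lines, lambda line: parse_passwd(line).shell)
for shell, line in by_shell_local:
    print(shell, "<-", line)
```

The same `lambda` could presumably be handed to `passwd.keyBy(...)`, since it only depends on the text of a single line.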

### Using Key-Value Pairs

We can return a dictionary containing the number of logins that use each shell with the `countByKey` function:

In [14]:
byShell.countByKey()

defaultdict(int, {'/bin/bash': 2, '/usr/sbin/nologin': 21, '/bin/sync': 1})
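As a rough local analogue, counting by key only looks at the first element of each pair. The sketch below uses `collections.Counter` on a small hand-made list; `count_by_key` is a hypothetical helper and the sample pairs are invented for illustration.

```python
from collections import Counter

def count_by_key(pairs):
    """Local stand-in for countByKey: count how often each key occurs."""
    return Counter(key for key, _ in pairs)

# Invented sample of (shell, login) pairs.
sample = [("/bin/bash", "root"),
          ("/usr/sbin/nologin", "daemon"),
          ("/usr/sbin/nologin", "bin"),
          ("/bin/bash", "jovyan"),
          ("/bin/sync", "sync")]

shell_counts = count_by_key(sample)
print(dict(shell_counts))
```

Like `countByKey`, this returns an ordinary dictionary-like object rather than a distributed collection, so it is only sensible when the number of distinct keys is small.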

And we can extract keys and values in parallel across the cluster:

In [15]:
shellKeys = byShell.keys()
shellKeys.take(3)

['/bin/bash', '/usr/sbin/nologin', '/usr/sbin/nologin']

In [16]:
byShell.values().take(3)

['root:x:0:0:root:/root:/bin/bash',
 'daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin',
 'bin:x:2:2:bin:/bin:/usr/sbin/nologin']

## Grouping and Joins

Once you have (k,v) pairs, you can group by key; the value for each key is then an iterable of the values that had that key. Earlier, we showed that you can `map` a function over the `ResultIterable`. It's unlikely that you want to convert the groups to native `list`s and collect them, because that pulls all the values back to the front-end machine.

In [17]:
byShell.groupByKey().take(3)

[('/bin/bash', <pyspark.resultiterable.ResultIterable at 0x7f1939a63b20>),
 ('/usr/sbin/nologin',
  <pyspark.resultiterable.ResultIterable at 0x7f1939a635b0>),
 ('/bin/sync', <pyspark.resultiterable.ResultIterable at 0x7f1939a638e0>)]

We can also group KV pairs by other attributes using the `groupBy()` method. For example, here we group the password data by the user name (field 0 of each value):

In [18]:
byShell.take(3)

[('/bin/bash', 'root:x:0:0:root:/root:/bin/bash'),
 ('/usr/sbin/nologin', 'daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin'),
 ('/usr/sbin/nologin', 'bin:x:2:2:bin:/bin:/usr/sbin/nologin')]

In [19]:
def getLogin(x):
    return x.split(':')[0]

In [20]:
byShell.groupBy (lambda x : getLogin(x[1]) ).take(3)

[('root', <pyspark.resultiterable.ResultIterable at 0x7f1939a63d90>),
 ('daemon', <pyspark.resultiterable.ResultIterable at 0x7f1939a63610>),
 ('sync', <pyspark.resultiterable.ResultIterable at 0x7f1939a63f10>)]
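The output above hides what each group contains. The following plain-Python sketch suggests the answer: with `groupBy`, the grouped items are the whole original elements, here the complete `(shell, line)` pairs. `group_by` and `get_login` are local stand-ins written for this illustration.

```python
from collections import defaultdict

def get_login(line):
    """Return the login name, i.e. the text before the first ':'."""
    return line.split(":")[0]

def group_by(items, key_func):
    """Local stand-in for groupBy: group whole items under key_func(item)."""
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return list(groups.items())

by_shell_local = [("/bin/bash", "root:x:0:0:root:/root:/bin/bash"),
                  ("/usr/sbin/nologin", "daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin")]

by_login = group_by(by_shell_local, lambda kv: get_login(kv[1]))
for login, members in by_login:
    print(login, members)
```

Each group keeps the old key inside its members, which differs from `groupByKey`, where only the values are collected.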

Once items are grouped, you can iterate over the values associated with a key.

Let's group the `/etc/passwd` entries by the shell.

In [21]:
grpdShell = byShell.groupByKey()
grpdShell.take(3)

[('/bin/bash', <pyspark.resultiterable.ResultIterable at 0x7f1939a7c310>),
 ('/usr/sbin/nologin',
  <pyspark.resultiterable.ResultIterable at 0x7f1939a7c370>),
 ('/bin/sync', <pyspark.resultiterable.ResultIterable at 0x7f1939a7c340>)]

Now the keys are the different Unix shells (*e.g.* `/bin/bash`) and the values are `ResultIterable`s of the values having that key in the original KV list.

Again, we typically wouldn't want to pull in all the values using `list`, because this brings everything to the front-end machine. But let's convert the `ResultIterable` to a list just to see the structure:

In [22]:
grpdShell.map(lambda x: (x[0], list(x[1])) ).take(3)

[('/bin/bash',
  ['root:x:0:0:root:/root:/bin/bash',
   'jovyan:x:1000:100::/home/jovyan:/bin/bash']),
 ('/usr/sbin/nologin',
  ['daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin',
   'bin:x:2:2:bin:/bin:/usr/sbin/nologin',
   'sys:x:3:3:sys:/dev:/usr/sbin/nologin',
   'games:x:5:60:games:/usr/games:/usr/sbin/nologin',
   'man:x:6:12:man:/var/cache/man:/usr/sbin/nologin',
   'lp:x:7:7:lp:/var/spool/lpd:/usr/sbin/nologin',
   'mail:x:8:8:mail:/var/mail:/usr/sbin/nologin',
   'news:x:9:9:news:/var/spool/news:/usr/sbin/nologin',
   'uucp:x:10:10:uucp:/var/spool/uucp:/usr/sbin/nologin',
   'proxy:x:13:13:proxy:/bin:/usr/sbin/nologin',
   'www-data:x:33:33:www-data:/var/www:/usr/sbin/nologin',
   'backup:x:34:34:backup:/var/backups:/usr/sbin/nologin',
   'list:x:38:38:Mailing List Manager:/var/list:/usr/sbin/nologin',
   'irc:x:39:39:ircd:/var/run/ircd:/usr/sbin/nologin',
   'gnats:x:41:41:Gnats Bug-Reporting System (admin):/var/lib/gnats:/usr/sbin/nologin',
   'nobody:x:65534:65534:nobody:/

The better way to process this would be to `map` a function over each KV pair in the grouped list. The key is `x[0]` and the `ResultIterable` is `x[1]`. We can then map a function over each `ResultIterable` to extract some field, such as the login information.

In [23]:
grpdShell.mapValues(list).take(3)

[('/bin/bash',
  ['root:x:0:0:root:/root:/bin/bash',
   'jovyan:x:1000:100::/home/jovyan:/bin/bash']),
 ('/usr/sbin/nologin',
  ['daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin',
   'bin:x:2:2:bin:/bin:/usr/sbin/nologin',
   'sys:x:3:3:sys:/dev:/usr/sbin/nologin',
   'games:x:5:60:games:/usr/games:/usr/sbin/nologin',
   'man:x:6:12:man:/var/cache/man:/usr/sbin/nologin',
   'lp:x:7:7:lp:/var/spool/lpd:/usr/sbin/nologin',
   'mail:x:8:8:mail:/var/mail:/usr/sbin/nologin',
   'news:x:9:9:news:/var/spool/news:/usr/sbin/nologin',
   'uucp:x:10:10:uucp:/var/spool/uucp:/usr/sbin/nologin',
   'proxy:x:13:13:proxy:/bin:/usr/sbin/nologin',
   'www-data:x:33:33:www-data:/var/www:/usr/sbin/nologin',
   'backup:x:34:34:backup:/var/backups:/usr/sbin/nologin',
   'list:x:38:38:Mailing List Manager:/var/list:/usr/sbin/nologin',
   'irc:x:39:39:ircd:/var/run/ircd:/usr/sbin/nologin',
   'gnats:x:41:41:Gnats Bug-Reporting System (admin):/var/lib/gnats:/usr/sbin/nologin',
   'nobody:x:65534:65534:nobody:/

In [24]:
shellAndLogins = grpdShell.map( 
    lambda x: (x[0], ",".join( [ getLogin(y) for y in x[1] ]) ))

In [25]:
shellAndLogins.take(3)

[('/bin/bash', 'root,jovyan'),
 ('/usr/sbin/nologin',
  'daemon,bin,sys,games,man,lp,mail,news,uucp,proxy,www-data,backup,list,irc,gnats,nobody,_apt,systemd-timesync,systemd-network,systemd-resolve,messagebus'),
 ('/bin/sync', 'sync')]

Rather than first grouping by key and then combining the values into the string above, we can use **foldByKey** to do more or less the same thing. This folds the mapping step, implicit in the list comprehension above, into the combining function.

In [26]:
byShell.foldByKey( "", lambda x,y: x + getLogin(y) + ',' ).collect()

[('/bin/bash', 'root,jovyan,,'),
 ('/usr/sbin/nologin',
  'daemon,bin,sys,games,man,lp,mail,news,uucp,proxy,www-data,backup,list,irc,gnats,nobody,_apt,systemd-timesync,systemd-network,systemd-resolve,messagebus,,'),
 ('/bin/sync', 'sync,')]
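The doubled commas in the output deserve a note. A plausible explanation is that the same function is used twice: once to fold values within a partition, and again to merge the per-partition results, where the "value" it receives is already a folded string. The plain-Python sketch below models this for a single key; `fold_one_key` is a hypothetical helper, and the split of entries across partitions is an assumption made for illustration.

```python
from functools import reduce

def get_login(entry):
    """Return the text before the first ':' (the login name of a passwd line)."""
    return entry.split(":")[0]

def append_login(acc, entry):
    """The folding function from the cell above: append a login and a comma."""
    return acc + get_login(entry) + ","

def fold_one_key(partitions, zero, func):
    """Model a fold for one key: fold each (non-empty) partition from zero,
    then merge the partial results with the very same function."""
    partials = [reduce(func, part, zero) for part in partitions]
    return reduce(func, partials)

root = "root:x:0:0:root:/root:/bin/bash"
jovyan = "jovyan:x:1000:100::/home/jovyan:/bin/bash"

same_partition = fold_one_key([[root, jovyan]], "", append_login)
split_partitions = fold_one_key([[root], [jovyan]], "", append_login)
print(same_partition)    # root,jovyan,
print(split_partitions)  # root,jovyan,,

# One way around it: extract the logins first, then merge with a function
# that behaves the same on raw values and on partial results.
def join_logins(left, right):
    return left + "," + right

logins = [[get_login(e) for e in part] for part in [[root], [jovyan]]]
fixed = reduce(join_logins, [reduce(join_logins, part) for part in logins])
print(fixed)             # root,jovyan
```

In Spark terms the second variant would presumably correspond to mapping the values to logins and then reducing by key. The stray commas are therefore a property of the folding function used here, not of `foldByKey` itself.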

Internally, `foldByKey` is built on `combineByKey(createCombiner, mergeValue, mergeCombiners)`, which turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different: for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

This example takes the `byShell` (K,V) list and constructs a new value for each key: the names of the users of that shell. Entries within the same RDD partition are joined by a comma, and results from different partitions are joined by "AND".

In [27]:
byShell.combineByKey( getLogin,
                        lambda xs, x: xs + ',' + getLogin(x),
                        lambda xs, ys: xs + ' AND ' + ys ).collect()

[('/bin/bash', 'root AND jovyan'),
 ('/usr/sbin/nologin',
  'daemon,bin,sys,games,man,lp,mail,news,uucp,proxy,www-data,backup,list AND irc,gnats,nobody,_apt,systemd-timesync,systemd-network,systemd-resolve,messagebus'),
 ('/bin/sync', 'sync')]
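The roles of the three functions can be sketched in plain Python. The model below is a simplification under stated assumptions: partitions are given explicitly as lists, they are merged in list order (Spark makes no such ordering promise), and `combine_by_key` is a hypothetical helper rather than PySpark code.

```python
def get_login(entry):
    """Return the login name, i.e. the text before the first ':'."""
    return entry.split(":")[0]

def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Model of combineByKey over explicitly listed partitions."""
    partials = []
    for part in partitions:
        combined = {}
        for key, value in part:
            if key in combined:
                # Key already seen in this partition: fold the value in.
                combined[key] = merge_value(combined[key], value)
            else:
                # First value for this key in this partition.
                combined[key] = create_combiner(value)
        partials.append(combined)

    result = {}
    for combined in partials:
        for key, comb in combined.items():
            # Merge per-partition combiners that share a key.
            result[key] = merge_combiners(result[key], comb) if key in result else comb
    return result

# Assumed split of a few passwd entries over two partitions.
partitions = [
    [("/bin/bash", "root:x:0:0:root:/root:/bin/bash"),
     ("/usr/sbin/nologin", "daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin"),
     ("/usr/sbin/nologin", "bin:x:2:2:bin:/bin:/usr/sbin/nologin")],
    [("/bin/bash", "jovyan:x:1000:100::/home/jovyan:/bin/bash"),
     ("/usr/sbin/nologin", "sys:x:3:3:sys:/dev:/usr/sbin/nologin")],
]

names = combine_by_key(partitions,
                       get_login,
                       lambda xs, x: xs + "," + get_login(x),
                       lambda xs, ys: xs + " AND " + ys)
print(names)

# V and C may differ: integers in, lists of integers out.
as_lists = combine_by_key([[(1, 2), (2, 3), (1, 99)], [(3, 44), (2, 1)]],
                          lambda v: [v],
                          lambda acc, v: acc + [v],
                          lambda a, b: a + b)
print(as_lists)
```

In this model, a comma appears only between entries that share a partition, and "AND" appears only where two partitions both held the key, which matches the pattern in the output above.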

**combineByKey** is also the basis for `reduceByKey`-style functions that can, e.g., sum up the items associated with a key. The result is effectively that of a **groupByKey** followed by a **reduce** on each list of values.

In [28]:
c = sc.parallelize([ (1,2), (2,3), (1, 99), (3, 44), (2, 1), (4,5), (3, 19) ] )

The `groupByKey` operator gives us `iterable` items.

In [29]:
c.groupByKey().collect()

[(2, <pyspark.resultiterable.ResultIterable at 0x7f1939a8bf10>),
 (4, <pyspark.resultiterable.ResultIterable at 0x7f1939b0cd00>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7f1939b0cc40>),
 (3, <pyspark.resultiterable.ResultIterable at 0x7f1939b0c910>)]

We can reify (make concrete) the iterable items by converting them to lists:

In [30]:
c.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()

[(2, [3, 1]), (4, [5]), (1, [2, 99]), (3, [44, 19])]

The `reduceByKey` operator groups identical keys and then applies a function over the values for each key.

In [31]:
c.reduceByKey( operator.add ).collect()

[(2, 4), (4, 5), (1, 101), (3, 63)]
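The "group, then reduce" reading can be checked locally. The sketch below is a plain-Python model with a hypothetical `reduce_by_key` helper; it reproduces the sums above but says nothing about how Spark schedules the work.

```python
import operator
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Local model of reduceByKey: group values by key, then reduce each group."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

# Same data as the RDD `c`.
c_local = [(1, 2), (2, 3), (1, 99), (3, 44), (2, 1), (4, 5), (3, 19)]

sums = reduce_by_key(c_local, operator.add)
maxima = reduce_by_key(c_local, max)
print(sums)
print(maxima)
```

Because Spark combines values partition by partition, the function passed to `reduceByKey` should be associative and commutative; `operator.add` and `max` both qualify.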

## CoGroup - the basis for joins

As in Pig, joins are done by performing "co-groups", where multiple data sets are grouped by the same key.

In [32]:
s1 = c
s1.collect()

[(1, 2), (2, 3), (1, 99), (3, 44), (2, 1), (4, 5), (3, 19)]

In [33]:
s2 = sc.parallelize( [ (2, -99), ( 4, 199), (19, 23) ] )
s2.collect()

[(2, -99), (4, 199), (19, 23)]

In [34]:
co = s1.cogroup(s2)
co.collect()

[(4,
  (<pyspark.resultiterable.ResultIterable at 0x7f1939ac6520>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a7c4c0>)),
 (1,
  (<pyspark.resultiterable.ResultIterable at 0x7f1939a7cee0>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a7cf70>)),
 (2,
  (<pyspark.resultiterable.ResultIterable at 0x7f1939a7cfa0>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a7ca00>)),
 (3,
  (<pyspark.resultiterable.ResultIterable at 0x7f1939a7c8b0>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a7c8e0>)),
 (19,
  (<pyspark.resultiterable.ResultIterable at 0x7f1939a7cb80>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a7cdc0>))]

Let's use our "convert it to a list" trick to see what's in each cogroup:

In [35]:
co.map(lambda x: (x[0], list(x[1][0]), list(x[1][1])) ).collect()

[(4, [5], [199]),
 (1, [2, 99], []),
 (2, [3, 1], [-99]),
 (3, [44, 19], []),
 (19, [], [23])]
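A cogroup can be modelled locally as a dictionary that maps each key to a pair of lists, one per input. This is a minimal sketch with a hypothetical `cogroup` function on ordinary lists; it mirrors the structure shown above.

```python
from collections import defaultdict

def cogroup(left, right):
    """Local model of cogroup: key -> (values from left, values from right)."""
    groups = defaultdict(lambda: ([], []))
    for key, value in left:
        groups[key][0].append(value)
    for key, value in right:
        groups[key][1].append(value)
    return dict(groups)

# Same data as the RDDs s1 and s2.
s1_local = [(1, 2), (2, 3), (1, 99), (3, 44), (2, 1), (4, 5), (3, 19)]
s2_local = [(2, -99), (4, 199), (19, 23)]

co_local = cogroup(s1_local, s2_local)
for key in sorted(co_local):
    print(key, co_local[key])
```

Keys that appear in only one input still get an entry, with an empty list on the other side. That empty list is what distinguishes the different join types.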

Cogroups are used to build the different kinds of joins:

In [36]:
s1.join(s2).collect()

[(4, (5, 199)), (2, (3, -99)), (2, (1, -99))]

In [37]:
s1.leftOuterJoin(s2).collect()

[(4, (5, 199)),
 (1, (2, None)),
 (1, (99, None)),
 (2, (3, -99)),
 (2, (1, -99)),
 (3, (44, None)),
 (3, (19, None))]

In [38]:
s1.rightOuterJoin(s2).collect()

[(4, (5, 199)), (2, (3, -99)), (2, (1, -99)), (19, (None, 23))]
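The three joins above differ only in how they treat an empty side of a cogroup. The plain-Python sketch below illustrates that idea; all function names are hypothetical, and it is a model of the semantics rather than of Spark's implementation.

```python
from collections import defaultdict

def cogroup(left, right):
    """Local model of cogroup: key -> (values from left, values from right)."""
    groups = defaultdict(lambda: ([], []))
    for key, value in left:
        groups[key][0].append(value)
    for key, value in right:
        groups[key][1].append(value)
    return dict(groups)

def join(left, right):
    """Inner join: keys with an empty side produce nothing."""
    return [(k, (a, b))
            for k, (lefts, rights) in cogroup(left, right).items()
            for a in lefts for b in rights]

def left_outer_join(left, right):
    """Keep every left value; pad a missing right side with None."""
    return [(k, (a, b))
            for k, (lefts, rights) in cogroup(left, right).items()
            for a in lefts for b in (rights or [None])]

def right_outer_join(left, right):
    """Keep every right value; pad a missing left side with None."""
    return [(k, (a, b))
            for k, (lefts, rights) in cogroup(left, right).items()
            for a in (lefts or [None]) for b in rights]

s1_local = [(1, 2), (2, 3), (1, 99), (3, 44), (2, 1), (4, 5), (3, 19)]
s2_local = [(2, -99), (4, 199), (19, 23)]

inner = join(s1_local, s2_local)
left_outer = left_outer_join(s1_local, s2_local)
right_outer = right_outer_join(s1_local, s2_local)
print(inner)
print(left_outer)
print(right_outer)
```

The order of the results differs from the Spark output, which depends on partitioning, but the set of joined pairs is the same.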

In [39]:
x = sc.parallelize( [ ("NY", 10), ("OH", 20), ("OH", 99), ("CO", 88) ] )
y = sc.parallelize( [ ("NY", 30), ("CO", 40), ("NY", 22 )] )

In [40]:
x.join(y).collect()

[('NY', (10, 30)), ('NY', (10, 22)), ('CO', (88, 40))]

In [41]:
x.join(y).flatMap(lambda kv: ((kv[0],x) for x in kv[1])).collect()

[('NY', 10), ('NY', 30), ('NY', 10), ('NY', 22), ('CO', 88), ('CO', 40)]

In [42]:
x.cogroup(y).collect()

[('OH',
  (<pyspark.resultiterable.ResultIterable at 0x7f1939a23970>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a23d00>)),
 ('NY',
  (<pyspark.resultiterable.ResultIterable at 0x7f1939a23f70>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a23670>)),
 ('CO',
  (<pyspark.resultiterable.ResultIterable at 0x7f1939a23790>,
   <pyspark.resultiterable.ResultIterable at 0x7f1939a23550>))]

In [43]:
x.cogroup(y).flatMap(lambda kv: ((kv[0],x,y) for x in kv[1][0] for y in kv[1][1])).collect()

[('NY', 10, 30), ('NY', 10, 22), ('CO', 88, 40)]