# Exercise 3: Spark accumulators and visualization tools.

### Part 1: Spark SQL - Joins

During the exercises, the following resources might come in handy:
* The [PySpark API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)
* The [Python documentation](https://docs.python.org/3.7/)
* The [Spark SQL API documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html)

To run code in Jupyter, press: 
* `Ctrl-Enter` to run the code in the currently selected cell
* `Shift-Enter` to run the code in the currently selected cell and jump to the next cell

If you want to run this code in a standalone Python script, you first need to create a Spark context:

In [None]:
# import os
# os.environ["SPARK_OPTS"] = "--driver-java-options=-Xms1024M --driver-java-options=-Xmx1536M --driver-java-options=-Dlog4j.logLevel=info"

# from pyspark import SparkContext, StorageLevel
# from pyspark.sql import SQLContext

# sc = SparkContext(master="local[*]")
# sqlContext = SQLContext(sc)


Since we are working in the notebook, where `sc` and `sqlContext` are already available, these lines are not needed here.

The following code recreates the `displayRows()` helper function that we saw in Lab 2.1.

In [None]:
from IPython.display import display, HTML
from pyspark.sql import DataFrame
import warnings

def displayRows(rowDf):
    headers = []
    rows = []
    if isinstance(rowDf, DataFrame):
        rows = rowDf.limit(10000).collect() #Let's limit the output just in case!
        if(len(rows) == 10000):
            if(rowDf.limit(10001).count() == 10001):
                warnings.warn("More than 10 000 rows were returned, only showing the first 10 000.")
                
        headers = list(rowDf.columns)
    else:
        rows = rowDf
        if(len(rows) > 10000):
            warnings.warn("Rows has {0} elements, only showing the first 10 000.".format(len(rows)))
            rows = rows[0:10000]
            
        #Computes the unique set of keys
        headers = sorted(set().union(*(row.asDict().keys() for row in rows)))
            
    tableHead = ["<th>{0}</th>".format(key) for key in headers]
    tableBody = ["<tr>{0}</tr>".format(
                    "".join(["<td>{0}</td>".format(rowDict.get(header)) 
                            for rowDict 
                            in (row.asDict(),) 
                            for header 
                            in headers])
                    ) for row in rows]
    
    display(HTML(
    u"""<table>
    <thead><tr>{0}</tr></thead>
    <tbody>{1}</tbody>
    </table>
    """.format("".join(tableHead), "".join(tableBody))))

## Part 1: Spark SQL - Joins

This part will introduce Spark SQL.

The cell below generates the data which you will write queries for.

In [None]:
#Top 20 boy and girl names 2014 in random order.
names = ["Caden", "Kaylee", "Lucas", "Ethan", "Alexander", "Jackson", 
         "Aiden", "Madelyn", "Michael", "Avery", "Luke", "Isabella", 
         "Chloe", "Elijah", "Abigail", "Madison", "Jacob", "Zoe", "Emily", 
         "Jayden", "Liam", "Mason", "Mia", "Sophia", "Benjamin", "Layla", 
         "Emma", "Lily", "Charlotte", "Caleb", "James", "Noah", "Ella", 
         "Jack", "Jayce", "Aubrey", "Olivia", "Harper", "Logan", "Ava"]

#A-G in phonetic alphabet
groups = ["Alpha","Bravo", "Charlie", "Delta", "Echo", "Foxtrot", "Golf"]

#Some numeric magic to generate not so uniform random data.
tblUserRdd = sc.parallelize(map(lambda i: (i, ((i*104729)^131) % 7, 26500 + ((i*104729)^96587) % 6367), range(1,51)))
tblNamesRdd = sc.parallelize(enumerate(names, 1), 4)
tblGroupNamesRdd = sc.parallelize(enumerate(groups), 2)

#Create dataframes from the RDDs
tblNames      = sqlContext.createDataFrame(tblNamesRdd,      ["userId", "name"])
tblUsers      = sqlContext.createDataFrame(tblUserRdd,       ["id", "groupId", "salary"])
tblGroupNames = sqlContext.createDataFrame(tblGroupNamesRdd, ["id", "name"])

#Register them for use.
sqlContext.registerDataFrameAsTable(tblGroupNames, "tblGroupNames")
sqlContext.registerDataFrameAsTable(tblUsers, "tblUsers")
sqlContext.registerDataFrameAsTable(tblNames, "tblNames")
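
The "numeric magic" above can be inspected without Spark. The following is a minimal pure-Python sketch of the same tuple expression; the variable names `user_rows`, `group_ids` and `salaries` are introduced here for illustration only. Note that `^` is bitwise XOR in Python, not exponentiation.

```python
# Pure-Python sketch (no Spark needed) of the tuples passed to sc.parallelize.
# In Python, ^ is bitwise XOR, not a power operator.
user_rows = [
    (i, ((i * 104729) ^ 131) % 7, 26500 + ((i * 104729) ^ 96587) % 6367)
    for i in range(1, 51)
]

# Collect the distinct group ids and the salaries for a quick sanity check.
group_ids = {group_id for _, group_id, _ in user_rows}
salaries = [salary for _, _, salary in user_rows]

print(len(user_rows))                # number of generated users
print(sorted(group_ids))             # group ids that actually occur
print(min(salaries), max(salaries))  # observed salary range
```

Because `groupId` is computed modulo 7 and `groups` is enumerated starting at 0, every generated `groupId` has a matching row in `tblGroupNames`. The names, in contrast, are enumerated starting at 1 and there are only 40 of them for 50 users.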

The next three cells display the contents of each dataframe using the helper function `displayRows()`.

In [None]:
displayRows(tblUsers)

In [None]:
displayRows(tblNames)

In [None]:
displayRows(tblGroupNames)

### 1.a) Merging user and group names of the users

You will use two inner joins to combine `tblUsers` with `tblNames` and `tblGroupNames`.

Some users will be lost due to the inner join because they have no matching name, but we will find them later.

The result should have the following columns:

1. **id**: User id
2. **name**: The user name
3. **groupName**: The group name

Sort the result by **name**.

In [None]:
q1a = sqlContext.sql("""
    SELECT 
        tblUsers.id, 
        tblNames.name AS name,
        tblGroupNames.name AS groupName
    FROM tblUsers 
    INNER JOIN tblNames ON tblUsers.id=tblNames.userId
    INNER JOIN tblGroupNames ON tblUsers.groupId=tblGroupNames.id
    ORDER BY name
""")

displayRows(q1a)

In [None]:
q1aResult = q1a.collect()
assert len(q1aResult) == 40
assert all(map(lambda i: q1aResult[i].name <= q1aResult[i+1].name, range(0,39)))
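
To see why the inner join drops rows, here is a minimal sketch on a three-user toy dataset. It uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, which is an assumption made only so the example runs without a cluster; the toy rows and the variable `inner_rows` are hypothetical. The join semantics illustrated are standard SQL.

```python
import sqlite3

# In-memory toy database; sqlite3 stands in for Spark SQL here (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tblUsers (id INTEGER, groupId INTEGER);
    CREATE TABLE tblNames (userId INTEGER, name TEXT);
    CREATE TABLE tblGroupNames (id INTEGER, name TEXT);
    INSERT INTO tblUsers VALUES (1, 0), (2, 1), (3, 0);
    INSERT INTO tblNames VALUES (1, 'Zoe'), (2, 'Ava');   -- user 3 has no name
    INSERT INTO tblGroupNames VALUES (0, 'Alpha'), (1, 'Bravo');
""")

# Same query shape as q1a: two inner joins, sorted by the user name.
inner_rows = conn.execute("""
    SELECT tblUsers.id, tblNames.name AS name, tblGroupNames.name AS groupName
    FROM tblUsers
    INNER JOIN tblNames ON tblUsers.id = tblNames.userId
    INNER JOIN tblGroupNames ON tblUsers.groupId = tblGroupNames.id
    ORDER BY name
""").fetchall()

print(inner_rows)  # user 3 is missing because it has no row in tblNames
conn.close()
```

Only users 1 and 2 appear in the output. An inner join keeps a row only when both sides match, so the unnamed user disappears from the result.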

### 1.b) Find only the users that do not have any name

A left outer join between `tblUsers` and `tblNames` keeps every user (`LEFT JOIN` is shorthand for `LEFT OUTER JOIN`). We can then find the users that do not have a name by checking whether the name `IS NULL`.

Without this check we would get all users, with `NULL` in the name column for those users that have no name.

In [None]:
q1b = sqlContext.sql("""
    SELECT 
        tblUsers.id, 
        tblGroupNames.name AS groupName,
        tblNames.name
    FROM tblUsers 
    LEFT JOIN tblNames ON tblUsers.id=tblNames.userId
    INNER JOIN tblGroupNames ON tblUsers.groupId = tblGroupNames.id
    WHERE tblNames.name IS NULL
""")

displayRows(q1b)

In [None]:
q1bresult = q1b.collect()
assert len(q1bresult) == 10
assert set(map(lambda row: row.id, q1bresult)) == set(range(41,51))
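
The effect of the `IS NULL` check can be sketched on a toy dataset. As an assumption made purely for illustration, the example uses Python's built-in `sqlite3` module instead of Spark SQL, and the rows and the variable names `all_users` and `unnamed` are hypothetical.

```python
import sqlite3

# In-memory toy database; sqlite3 stands in for Spark SQL here (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tblUsers (id INTEGER, groupId INTEGER);
    CREATE TABLE tblNames (userId INTEGER, name TEXT);
    INSERT INTO tblUsers VALUES (1, 0), (2, 1), (3, 0);
    INSERT INTO tblNames VALUES (1, 'Zoe'), (2, 'Ava');   -- user 3 has no name
""")

# Left join without the check: every user is kept, missing names become NULL.
all_users = conn.execute("""
    SELECT tblUsers.id, tblNames.name
    FROM tblUsers
    LEFT JOIN tblNames ON tblUsers.id = tblNames.userId
    ORDER BY tblUsers.id
""").fetchall()

# Left join with the check: only users without a matching name remain.
unnamed = conn.execute("""
    SELECT tblUsers.id
    FROM tblUsers
    LEFT JOIN tblNames ON tblUsers.id = tblNames.userId
    WHERE tblNames.name IS NULL
""").fetchall()

print(all_users)  # NULL shows up as None in Python
print(unnamed)
conn.close()
```

The first query returns all three users, the second only the unnamed one. This pattern (left join followed by an `IS NULL` filter) is commonly called an anti-join.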

### 1.c) Counting name initials

It is often useful to query the result of another query, or to compute a derived column first and then aggregate over it, which forces you to split the query into multiple parts.

In this exercise you will use a powerful mechanism that lets you query the result of a subquery by treating it as a table. We have provided the subquery, which uses the string function `SUBSTR` to extract the first character of each name.

The result should have the following columns:

1. Initial: The extracted initial
2. Counts: The number of names that start with *Initial*

Order by the computed `Counts` descending, then by `Initial`, and return the top 10 results.

**Hint:** Use `COUNT`, `GROUP BY`, `ORDER BY`, `LIMIT`

In [None]:
q1c = sqlContext.sql("""
SELECT 
    Initial, 
    COUNT(tblInitials.Initial) AS Counts
FROM
(
    SELECT SUBSTR(name, 1, 1) AS Initial FROM tblNames
) AS tblInitials
GROUP BY Initial
ORDER BY Counts DESC, Initial
LIMIT 10
""")
                     
displayRows(q1c)

In [None]:
q1cresult = q1c.collect()
assert len(q1cresult) == 10
assert q1cresult[0].Initial == "A" and q1cresult[1].Initial == "J" and q1cresult[2].Initial == "L"
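
As a cross-check that does not depend on Spark, the same aggregation can be sketched in plain Python with `collections.Counter`. The variable names `initial_counts` and `top10` are introduced here for illustration; the `names` list is copied from the data generation cell.

```python
from collections import Counter

# Same list as in the data generation cell.
names = ["Caden", "Kaylee", "Lucas", "Ethan", "Alexander", "Jackson",
         "Aiden", "Madelyn", "Michael", "Avery", "Luke", "Isabella",
         "Chloe", "Elijah", "Abigail", "Madison", "Jacob", "Zoe", "Emily",
         "Jayden", "Liam", "Mason", "Mia", "Sophia", "Benjamin", "Layla",
         "Emma", "Lily", "Charlotte", "Caleb", "James", "Noah", "Ella",
         "Jack", "Jayce", "Aubrey", "Olivia", "Harper", "Logan", "Ava"]

# Equivalent of the subquery plus GROUP BY: count names per first letter.
initial_counts = Counter(name[0] for name in names)

# Equivalent of ORDER BY Counts DESC, Initial followed by LIMIT 10.
top10 = sorted(initial_counts.items(), key=lambda kv: (-kv[1], kv[0]))[:10]

for initial, count in top10:
    print(initial, count)
```

The initials A, J and L each occur six times, so the secondary sort on the initial decides their order. This is why the query needs the second `ORDER BY` column for the assertion above to hold deterministically.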