PySpark Tutorial
================

_Djoerd Hiemstra, University of Twente_

In this tutorial (made for the SIKS/CBS DataCamp, 6 December 2016) we go over some basic Spark examples in Python, following the Scala examples in the paper by ([Zaharia et al. 2012][1]).

[1]: https://amplab.cs.berkeley.edu/wp-content/uploads/2012/01/nsdi_spark.pdf "Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael Franklin, Scott Shenker, Ion Stoica. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. In Proceedings of the USENIX Symposium on Networked Systems Design and Implementation (NSDI), 2012."


## 1. Spark using RDDs

Suppose we would like to analyse a huge log file, for instance a search engine's query log. The following line reads the contents of the file `log.txt` into an RDD (Resilient Distributed Dataset).

In [5]:
# necessary prolog: get the Spark context (reusing the one a PySpark notebook may already provide)
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
lines = sc.textFile("log.txt")



> NB The variable called `spark` in the paper by Zaharia et al. (2012) is called `sc` in our case (for 'Spark context').
> 
> Note that the paper uses Scala examples, not Python.

An RDD is a read-only collection of records that may be partitioned over many machines in your cluster. RDDs can only be created through a limited number of operations called _transformations_, either from other RDDs or from data in stable storage such as the distributed file system. Table 2 of Zaharia et al. (2012) contains 13 transformations of RDDs. Why these transformations? Together they support many algorithms, and they can be executed efficiently in parallel on clusters of machines.

As an example of a transformation, `map()` takes as input a function and applies this function to each record in the RDD. The functions passed to `map()` must not have any _side effects_: they take a record of the RDD as input and output a transformed record, but they cannot read or write files, nor can they update an internal state. If functions without side effects are used, each machine in a large cluster can apply the function to its part of the data without needing to know anything about the results of the other machines on other parts of the data. As an example, take the following function that takes a line and outputs the line with all letters in lower case:

    lambda line: line.lower()
    
This is called a _lambda_ function in Python. Lambda functions are anonymous functions, because they have no name. The paper uses Scala syntax for anonymous functions, i.e. `line => line.toLowerCase()`.
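
The no-side-effects rule can be illustrated without Spark. The following plain-Python sketch (not Spark code; the helpers `partition` and `map_partitions` are hypothetical names made up for this illustration) splits three lines of `log.txt` into two "partitions", maps each partition independently, once front-to-back and once back-to-front as if different machines finished in a different order, and checks that the combined result equals an ordinary sequential map.

```python
# Plain-Python sketch (not Spark): a side-effect-free function can be applied
# to each partition independently, in any order, and still give the same
# result as a sequential map. The helper names are hypothetical.

def partition(records, n):
    """Split a list into at most n contiguous chunks of (almost) equal size."""
    size = max(1, -(-len(records) // n))  # ceiling division
    return [records[i:i + size] for i in range(0, len(records), size)]

def map_partitions(f, partitions):
    """Apply f to every record, one partition at a time."""
    return [[f(record) for record in part] for part in partitions]

lines = [
    "INFO\t2016-12-05 18:00:00\tnl.utwente.santa\tHeerlijk avondje is gekomen",
    "INFO\t2016-12-05 18:00:01\tnl.utwente.santa\tAvondje van santa",
    "ERROR\t2016-12-05 18:00:08\tnl.utwente.santa\tError Fire place on Aborting chimney descend",
]

parts = partition(lines, 2)

# process the partitions front-to-back and back-to-front ("different machines")
forward = map_partitions(lambda line: line.lower(), parts)
backward = map_partitions(lambda line: line.lower(), parts[::-1])[::-1]

combined = [record for part in forward for record in part]
print(combined == [line.lower() for line in lines])  # True
print(forward == backward)                           # True
```

In Spark the partitioning and the distribution over machines are done for you: `lines.map(lambda line: line.lower())` applies the function to every partition in parallel.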

In [8]:
lowerLines = lines.map(lambda line: line.lower())
lowerLines.collect()

['info\t2016-12-05 18:00:00\tnl.utwente.santa\theerlijk avondje is gekomen',
 'info\t2016-12-05 18:00:01\tnl.utwente.santa\tavondje van santa',
 'info\t2016-12-05 18:00:02\tnl.utwente.santa\tvol verwachting klopt ons hart',
 'info\t2016-12-05 18:00:04\tnl.utwente.santa\tvol verwachting klopt ons hart',
 'info\t2016-12-05 18:00:07\tnl.utwente.santa\tsinging approved by saint nicolas',
 'error\t2016-12-05 18:00:08\tnl.utwente.santa\terror fire place on aborting chimney descend',
 'info\t2016-12-05 18:01:00\tnl.utwente.santa\theerlijk avondje is gekomen',
 'info\t2016-12-05 18:01:01\tnl.utwente.santa\tavondje van santa',
 'info\t2016-12-05 18:01:02\tnl.utwente.santa\tvol verwachting klopt ons hart',
 'info\t2016-12-05 18:01:04\tnl.utwente.santa\tvol verwachting klopt ons hart',
 'info\t2016-12-05 18:01:05\tnl.utwente.santa\tsinging approved by saint nicolas',
 'error\t2016-12-05 18:01:06\tnl.utwente.santa\terror fire place on aborting chimney descend',
 'error\t2016-12-05 18:01:06\tnl.utw

Anonymous functions are a concise way to use a function only once (maybe they should have been called _disposable functions_). The same line with a named function would be:


    def toLower(line):
        return line.lower()
    
    lowerLines2 = lines.map(toLower)

Spark runs all your operations on RDDs in parallel. If you want to do something in linear order in plain old Python, for instance printing the contents of the RDD, the action `collect()` returns the contents of your RDD as an ordinary Python list. Because `collect()` sends all records to a single machine, use it on small RDDs only.

> NB Beware, the Jupyter notebook might only show part of your result.

> The `\t` strings in the output denote tabs in the original file.
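
Because the fields of a line are separated by tabs, `split('\t')` turns a line into a list of fields. A plain-Python sketch on one of the error lines of `log.txt`; the names `level`, `timestamp`, `source` and `message` are our own labels for the four fields, not names used in the file.

```python
# Plain-Python sketch: split one log line into its four tab-separated fields.
# The variable names are our own labels for the fields.
line = "ERROR\t2016-12-05 18:00:08\tnl.utwente.santa\tError Fire place on Aborting chimney descend"

level, timestamp, source, message = line.split("\t")
print(level)      # ERROR
print(timestamp)  # 2016-12-05 18:00:08
print(message)    # Error Fire place on Aborting chimney descend

# The same indices work inside a Spark transformation, for instance
# line.split("\t")[1] for the time field and line.split("\t")[3] for the message.
```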

In [9]:
lines.collect()

['INFO\t2016-12-05 18:00:00\tnl.utwente.santa\tHeerlijk avondje is gekomen',
 'INFO\t2016-12-05 18:00:01\tnl.utwente.santa\tAvondje van santa',
 'INFO\t2016-12-05 18:00:02\tnl.utwente.santa\tVol verwachting klopt ons hart',
 'INFO\t2016-12-05 18:00:04\tnl.utwente.santa\tVol verwachting klopt ons hart',
 'INFO\t2016-12-05 18:00:07\tnl.utwente.santa\tSinging approved by Saint Nicolas',
 'ERROR\t2016-12-05 18:00:08\tnl.utwente.santa\tError Fire place on Aborting chimney descend',
 'INFO\t2016-12-05 18:01:00\tnl.utwente.santa\tHeerlijk avondje is gekomen',
 'INFO\t2016-12-05 18:01:01\tnl.utwente.santa\tAvondje van santa',
 'INFO\t2016-12-05 18:01:02\tnl.utwente.santa\tVol verwachting klopt ons hart',
 'INFO\t2016-12-05 18:01:04\tnl.utwente.santa\tVol verwachting klopt ons hart',
 'INFO\t2016-12-05 18:01:05\tnl.utwente.santa\tSinging approved by Saint Nicolas',
 'ERROR\t2016-12-05 18:01:06\tnl.utwente.santa\tError Fire place on Aborting chimney descend',
 'ERROR\t2016-12-05 18:01:06\tnl.utw

### Exercise 1.1, count the number of errors in the log

Follow the examples from Zaharia et al. (2012), and print the number of lines that start with "ERROR". Your solution should print: 4.


> Comments in Python are preceded by a hash `'#'`

> To divide a long statement over multiple lines in Python, end a line with a backslash `'\'`.
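
Besides a trailing backslash, Python also continues a statement over several lines when it is enclosed in parentheses, which the Python style guide PEP 8 prefers. A small plain-Python comparison on one message from the log:

```python
message = "Error Fire place on Aborting chimney descend"

# explicit line continuation with a trailing backslash
words1 = message.lower() \
                .split(" ")

# implicit line continuation: a statement inside (...) may span several lines
words2 = (message.lower()
                 .split(" "))

print(words1 == words2)  # True
print(words1[:2])        # ['error', 'fire']
```

Both styles work for chains of Spark transformations as well.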

In [10]:
# INSERT ANSWER HERE
# END ANSWER
errors = lines.filter(lambda w: w.startswith("ERROR"))
errors.count()

4

### Exercise 1.2, count errors mentioning chimney

Follow the examples from Zaharia et al. (2012), and print the number of lines that start with "ERROR" and that contain "chimney". Your solution should print: 3. Tip: you might build on the result of the previous question.


In [21]:
# INSERT ANSWER HERE
# END ANSWER
results = errors.filter(lambda w: "chimney" in w)
results.collect()

['ERROR\t2016-12-05 18:00:08\tnl.utwente.santa\tError Fire place on Aborting chimney descend',
 'ERROR\t2016-12-05 18:01:06\tnl.utwente.santa\tError Fire place on Aborting chimney descend',
 'ERROR\t2016-12-05 18:01:06\tnl.utwente.santa\tError Retry chimney descend failed']

### Exercise 1.3, return the time fields of the errors mentioning 'chimney'

Follow the examples from Zaharia et al. (2012), and print the time fields of the lines that start with "ERROR" and that contain "chimney". Your solution should print: `2016-12-05 18:00:08`, `2016-12-05 18:01:06`, and `2016-12-05 18:01:06`.


In [12]:
# INSERT ANSWER HERE
# END ANSWER
results.map(lambda w: (w.split('\t')[1])).collect()

['2016-12-05 18:00:08', '2016-12-05 18:01:06', '2016-12-05 18:01:06']

### Exercise 1.4, the seminal MapReduce word count

In 2004, Google employees Jeff Dean and Sanjay Ghemawat proposed a framework for distributed data processing that supports only two operations: `map()` and `reduce()`. They appropriately called their framework MapReduce ([Dean and Ghemawat 2004][4]).

The seminal example they introduce in their paper is _word count_: given a large text corpus, output every word together with its count, i.e. the total number of times it occurs in the corpus. A naive implementation might update a global data structure for each word that it encounters, adding 1 for that particular word. However, remember that we need functions *without* side effects to be able to distribute computations over many machines (so no updating a data structure!). Dean and Ghemawat therefore propose a solution that splits the text into words and outputs pairs (_word_, 1) in the "map phase"; the framework then groups all pairs with the same word together, and the "reduce phase" adds up the 1's. Spark can process every MapReduce algorithm (and many more complex algorithms) using its transformations. Study the word count solution by Dean and Ghemawat, and come up with the equivalent Spark solution. See also the remarks by Zaharia et al. (2012), for instance in Section 7.1.

Execute _word count_ on the message fields of the lines that start with "ERROR". Your solution should find that the three most frequent words are "error", "descend" and "chimney", which occur 5, 3 and 3 times respectively.

> Tip: build your solution one transformation at a time: start from the lines that start with "ERROR", then take the message field, then split the (lower-cased) fields on a space " " to get the words, then transform each word to (word, 1), etc. Test your solution after adding each transformation.

[4]: http://research.google.com/archive/mapreduce-osdi04.pdf "Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In Proceedings of the 6th Symposium on Operating Systems Design and Implementation (OSDI), 2004."
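
To see what the map phase, the grouping, and the reduce phase each do, here is a plain-Python sketch (neither Spark nor Dean and Ghemawat's code; the function names are our own) on the three error messages shown in full in Exercise 1.2. Because the fourth error line is cut off in this document, the counts differ from the expected answer of the exercise.

```python
from collections import defaultdict

# The three error messages shown in full in Exercise 1.2.
messages = [
    "Error Fire place on Aborting chimney descend",
    "Error Fire place on Aborting chimney descend",
    "Error Retry chimney descend failed",
]

def map_phase(message):
    """Map: emit a (word, 1) pair for every (lower-cased) word."""
    return [(word, 1) for word in message.lower().split(" ")]

def group_by_key(pairs):
    """Shuffle: group all values that have the same key (word)."""
    groups = defaultdict(list)
    for word, one in pairs:
        groups[word].append(one)
    return groups

def reduce_phase(word, ones):
    """Reduce: add up the 1's of a single word."""
    return word, sum(ones)

pairs = [pair for message in messages for pair in map_phase(message)]
counts = dict(reduce_phase(word, ones) for word, ones in group_by_key(pairs).items())

# sort by count (descending), then alphabetically to break ties
top3 = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:3]
print(top3)  # [('chimney', 3), ('descend', 3), ('error', 3)]
```

In Spark, `flatMap()` followed by `map()` plays the role of the map phase, and `reduceByKey()` does both the grouping and the adding. Unlike this sketch, `reduceByKey()` already adds up pairs locally on each machine before data is sent over the network.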




In [78]:
# INSERT ANSWER HERE
# END ANSWER
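# take the message field (the 4th tab-separated field) of each error line
content = errors.map(lambda line: line.split('\t')[3])

# split the messages into words, emit (word, 1) pairs, and add up the 1's per word
# (lower-casing the messages first, as the tip suggests, counts 'Error' and 'error' as one word)
counts = content.flatMap(lambda line: line.split(" ")) \
                .map(lambda word: (word, 1)) \
                .reduceByKey(lambda a, b: a + b)
# sort by count (descending) and show the three most frequent words
sortedcount = counts.sortBy(lambda x: x[1], False).collect()
sortedcount[0:3]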

[('Error', 4), ('descend', 3), ('chimney', 3)]

### Further exercises

Zaharia et al. (2012) cover two more elegant examples of iterative algorithms that Spark can process efficiently: _logistic regression_ and Google's _PageRank_. Similar solutions exist using MapReduce, but they are less efficient, because MapReduce writes all intermediate data to disk in each iteration of the algorithm, whereas Spark tries to keep data in memory where possible.
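
A plain-Python sketch of the iterative structure of PageRank, loosely following the RDD formulation in Zaharia et al. (2012): in each iteration every page sends its rank divided by its number of links to each page it links to, and the contributions are summed per page. The three-page graph, the random-jump probability of 0.15 and the 20 iterations are assumptions for illustration; dictionaries stand in for the RDDs of links and ranks.

```python
from collections import defaultdict

# Hypothetical toy web graph: page -> pages it links to (no dead ends).
links = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
N = len(links)
jump = 0.15  # assumed random-jump probability

# every page starts with rank 1/N
ranks = {page: 1.0 / N for page in links}
for _ in range(20):
    # like joining links with ranks and a flatMap: each page sends rank/|links| to its targets
    contribs = [(dest, ranks[page] / len(dests))
                for page, dests in links.items() for dest in dests]
    # like reduceByKey: add up the contributions per page
    sums = defaultdict(float)
    for dest, contrib in contribs:
        sums[dest] += contrib
    # like mapValues: mix the random jump with the summed contributions
    ranks = {page: jump / N + (1 - jump) * total for page, total in sums.items()}

for page in sorted(ranks):
    print(page, round(ranks[page], 3))
```

Note that `links` is read again, unchanged, in every iteration; this is the kind of data Spark can keep in memory, whereas MapReduce would read it from disk each time.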

## 2.  Spark using DataFrames

In the following examples we use Spark with DataFrames. In Spark, a DataFrame is a distributed collection of data organized into _named_ columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but it is built on top of RDDs and benefits from the optimizations of Spark SQL (its Catalyst optimizer) under the hood. We base this part of the tutorial on the SIGMOD 2015 paper by ([Armbrust et al. 2015][2]).

Let's read some example data into a DataFrame using the SQL context `sqlContext` (created in the prolog below). We read data from a JSON file, because JSON is _self-describing_, that is, it contains the schema information needed for DataFrames (except for the table name).
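
What _self-describing_ means can be sketched in plain Python. We assume that `people.json` holds one JSON object per line (the JSON Lines layout that `read.json()` expects by default); the three records below are taken from the table shown further down, but the exact file contents are an assumption. The hypothetical helper `infer_schema` collects every field name with the types of its values, which is roughly the information Spark derives when it infers a schema.

```python
import json

# Assumed contents of people.json: one self-describing JSON object per line.
people_json = """\
{"gender": "male", "name": "Arjen", "organization": "RU"}
{"gender": "male", "name": "Djoerd", "organization": "UT"}
{"gender": "female", "name": "Doina", "organization": "UT"}
"""

def infer_schema(json_lines):
    """Map every field name to the set of Python type names seen for it."""
    schema = {}
    for line in json_lines.splitlines():
        record = json.loads(line)
        for field, value in record.items():
            schema.setdefault(field, set()).add(type(value).__name__)
    # sort the fields by name, as in the printSchema() output below
    return dict(sorted(schema.items()))

print(infer_schema(people_json))
# {'gender': {'str'}, 'name': {'str'}, 'organization': {'str'}}
```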

[2]: https://amplab.cs.berkeley.edu/wp-content/uploads/2015/03/SparkSQLSigmod2015.pdf "Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, Matei Zaharia. Spark SQL: Relational Data Processing in Spark. In Proceedings of the International Conference on Management of Data (SIGMOD), 2015."

In [32]:
# necessary prolog for DataFrames
from pyspark import sql
sqlContext = sql.SQLContext(sc)

In [35]:
people = sqlContext.read.json("people.json") 

people.printSchema()
people.show()

root
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- organization: string (nullable = true)

+------+---------+------------+
|gender|     name|organization|
+------+---------+------------+
|  male|    Arjen|          RU|
|  male|   Djoerd|          UT|
|  male|    Robin|          UT|
|  male|     Yuri|          UT|
|female|    Doina|          UT|
|female|     Anna|          UT|
|  male|     Piet|         CBS|
|  male|  Barteld|         CBS|
|female|Jacobiene|         CBS|
|  male|    Marco|         CBS|
|female|  Claudia|         TUD|
+------+---------+------------+



DataFrames support special functions like `printSchema()` and `show()` and the common [relational algebra operations](https://en.wikipedia.org/wiki/Relational_algebra): projection, called `select()`; selection, called `where()`; and join, called `join()`, as well as aggregations: (`groupBy()` and `agg()`).

> Yes, you are right: Someone really messed up naming the relational algebra operations! The naming of relational algebra operations differs in some unfortunate ways from the naming used in SQL statements. The Spark algebra operations use the SQL conventions, to make the confusion complete. 

So, like RDDs, DataFrames support a limited number of transformations (but now based on relational algebra), and that's what we will have to work with. However, we might also work directly in SQL (see Exercise 2.4 below).

### Exercise 2.1, select all names

Show the list of names of all people (the projection of the column 'name').


In [46]:
# INSERT ANSWER HERE
people.select(people.name).show()

+---------+
|     name|
+---------+
|    Arjen|
|   Djoerd|
|    Robin|
|     Yuri|
|    Doina|
|     Anna|
|     Piet|
|  Barteld|
|Jacobiene|
|    Marco|
|  Claudia|
+---------+



### Exercise 2.2, select all names of people from CBS

Show the list of names of people who work at CBS. (Can we first do the projection of the column 'name' and then the selection of the rows for which `organization="CBS"`?)


In [56]:
# INSERT ANSWER HERE
# END ANSWER
people.where(people.organization=="CBS").select(people.name).show()

+---------+
|     name|
+---------+
|     Piet|
|  Barteld|
|Jacobiene|
|    Marco|
+---------+
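
The question in Exercise 2.2 is about relational algebra, so it can be explored in plain Python on a few rows of the `people` table (lists of dictionaries stand in for the DataFrame; `project` and `select_rows` are hypothetical helpers). In pure relational algebra, after projecting on `name` alone the column `organization` is gone, so the selection has nothing to test; the order can only be swapped if the projection keeps `organization`.

```python
# a few rows of the people table as plain Python dictionaries
people = [
    {"gender": "male", "name": "Arjen", "organization": "RU"},
    {"gender": "male", "name": "Piet", "organization": "CBS"},
    {"gender": "female", "name": "Jacobiene", "organization": "CBS"},
    {"gender": "female", "name": "Claudia", "organization": "TUD"},
]

def project(rows, *columns):
    """Projection (Spark: select): keep only the given columns of every row."""
    return [{column: row[column] for column in columns} for row in rows]

def select_rows(rows, predicate):
    """Selection (Spark: where): keep only the rows for which predicate holds."""
    return [row for row in rows if predicate(row)]

def is_cbs(row):
    return row["organization"] == "CBS"

# selection first, then projection
print(project(select_rows(people, is_cbs), "name"))
# [{'name': 'Piet'}, {'name': 'Jacobiene'}]

# projection on 'name' first: the column 'organization' is gone
try:
    select_rows(project(people, "name"), is_cbs)
except KeyError as missing:
    print("missing column:", missing)  # missing column: 'organization'

# swapping only works if the projection keeps the column the selection needs
swapped = project(select_rows(project(people, "name", "organization"), is_cbs), "name")
print(swapped)  # [{'name': 'Piet'}, {'name': 'Jacobiene'}]
```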



----

Let's introduce the following `organizations` DataFrame. Interestingly, this DataFrame contains two structured columns (`address`, a struct, and `attractions`, an array), a feature known from object-relational databases.

In [57]:
organizations = sqlContext.read.json("organizations.json") 
organizations.printSchema()
organizations.show()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- number: long (nullable = true)
 |    |-- street: string (nullable = true)
 |-- attractions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- color: string (nullable = true)
 |-- organization: string (nullable = true)

+--------------------+--------------------+-----+------------+
|             address|         attractions|color|organization|
+--------------------+--------------------+-----+------------+
|[Enschede,5,drien...|[torentje, hadoop...|black|          UT|
|[Heerlen,11,CBS-weg]|[mijnmuseum, spar...| blue|         CBS|
|[Nijmegen,4,Comen...|       [doornroosje]|  red|          RU|
|   [Delft,5,Postbus]|      [nice cluster]| blue|         TUD|
+--------------------+--------------------+-----+------------+



### Exercise 2.3, people in blue organizations

Show the name and organization of people whose organization's color is blue by joining the two tables. 

> Question: Can we speed up the computation by changing the order of the statements?

> Bonus: Count for each organization the number of people, outputting ("organization", "count"). Note that the Armbrust et al. paper contains an error in Section 3.3: the operation `.agg(count("name"))` should be `.count()`.

In [64]:
# INSERT ANSWER HERE
# END ANSWER
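# join the people with the blue organizations on their common column 'organization'
blue = organizations.where(organizations.color == "blue").select("organization")
people.join(blue, "organization").select("name", "organization").show()

> NB A first attempt, `people.where(people.organization in org)` with `org` the blue organizations, fails with `ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.` Python's `in` operator cannot be used to build a Spark expression; a join does the job instead.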
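
Regarding the question whether the order of the statements matters: a plain-Python sketch of a hash join (our own illustration of the idea, not Spark's implementation) on the `people` and `organizations` data shown above. Selecting the blue organizations _before_ the join means that only two organizations go into the hash table and only five people find a match, instead of all eleven. Query optimizers such as Spark's Catalyst can apply this kind of reordering (pushing a selection below a join) automatically.

```python
# people (name, organization) and the color of each organization,
# copied from the tables shown above
people = [
    ("Arjen", "RU"), ("Djoerd", "UT"), ("Robin", "UT"), ("Yuri", "UT"),
    ("Doina", "UT"), ("Anna", "UT"), ("Piet", "CBS"), ("Barteld", "CBS"),
    ("Jacobiene", "CBS"), ("Marco", "CBS"), ("Claudia", "TUD"),
]
organizations = [("UT", "black"), ("CBS", "blue"), ("RU", "red"), ("TUD", "blue")]

def hash_join(people, organizations):
    """Join on organization: build a hash table on one side, probe with the other."""
    color_of = dict(organizations)          # build phase: organization -> color
    return [(name, org, color_of[org])      # probe phase
            for name, org in people if org in color_of]

# selection after the join: all 11 people find a match, then most are filtered out
late = [(name, org) for name, org, color in hash_join(people, organizations)
        if color == "blue"]

# selection pushed below the join: only the 2 blue organizations are joined
blue = [(org, color) for org, color in organizations if color == "blue"]
early = [(name, org) for name, org, _ in hash_join(people, blue)]

print(late == early)  # True
print(early)
# [('Piet', 'CBS'), ('Barteld', 'CBS'), ('Jacobiene', 'CBS'), ('Marco', 'CBS'), ('Claudia', 'TUD')]
```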

----

If you are familiar with SQL, the following approach also finds the people in blue organizations. Once we have registered the DataFrames `people` and `organizations` as (temporary) tables, we can use `sqlContext.sql()` to run SQL queries on them.

In [65]:
# register the DataFrames as tables that SQL queries can refer to
people.registerTempTable("people")
organizations.registerTempTable("organizations")
sqlContext.sql('''
SELECT P.name, O.organization
FROM people P, organizations O
WHERE P.organization = O.organization
AND O.color = 'blue'
''').show()

+---------+------------+
|     name|organization|
+---------+------------+
|     Piet|         CBS|
|  Barteld|         CBS|
|Jacobiene|         CBS|
|    Marco|         CBS|
|  Claudia|         TUD|
+---------+------------+
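
The same SQL can be tried without a cluster. The following sketch uses Python's built-in `sqlite3` module (SQLite, not Spark SQL, so only standard SQL carries over) with the rows of `people` and `organizations` shown above; SQLite has no struct or array columns, so only the flat columns are copied. We added `ORDER BY` so that the output order is fixed, and a `GROUP BY` query answers the bonus question of Exercise 2.3.

```python
import sqlite3

# In-memory SQLite database with the flat columns of the two tables above.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE people (gender TEXT, name TEXT, organization TEXT)")
db.execute("CREATE TABLE organizations (organization TEXT, color TEXT)")
db.executemany("INSERT INTO people VALUES (?, ?, ?)", [
    ("male", "Arjen", "RU"), ("male", "Djoerd", "UT"), ("male", "Robin", "UT"),
    ("male", "Yuri", "UT"), ("female", "Doina", "UT"), ("female", "Anna", "UT"),
    ("male", "Piet", "CBS"), ("male", "Barteld", "CBS"),
    ("female", "Jacobiene", "CBS"), ("male", "Marco", "CBS"),
    ("female", "Claudia", "TUD"),
])
db.executemany("INSERT INTO organizations VALUES (?, ?)", [
    ("UT", "black"), ("CBS", "blue"), ("RU", "red"), ("TUD", "blue"),
])

# the query from the cell above, plus ORDER BY for a fixed output order
blue_people = db.execute("""
    SELECT P.name, O.organization
    FROM people P, organizations O
    WHERE P.organization = O.organization
    AND O.color = 'blue'
    ORDER BY O.organization, P.name
""").fetchall()
print(blue_people)
# [('Barteld', 'CBS'), ('Jacobiene', 'CBS'), ('Marco', 'CBS'), ('Piet', 'CBS'), ('Claudia', 'TUD')]

# bonus: the number of people per organization
per_org = db.execute("""
    SELECT organization, COUNT(*)
    FROM people
    GROUP BY organization
    ORDER BY organization
""").fetchall()
print(per_org)  # [('CBS', 4), ('RU', 1), ('TUD', 1), ('UT', 5)]
db.close()
```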



### Exercise 2.4, women from Enschede

In SQL, the complex columns `address` and `attractions` can be used as well. The fields of the struct `address` are accessed with a dot; for instance, `organizations.address.city = 'Heerlen'` holds for organizations in Heerlen.
Count the number of female employees in Enschede. Your answer should be: 2.


In [71]:
# INSERT ANSWER HERE
# END ANSWER
sqlContext.sql('''
SELECT P.name, O.organization, O.address.city
FROM people P, organizations O
WHERE O.address.city = 'Enschede'
AND P.organization = O.organization
AND P.gender = 'female'
''').show()

+-----+------------+--------+
| name|organization|    city|
+-----+------------+--------+
|Doina|          UT|Enschede|
| Anna|          UT|Enschede|
+-----+------------+--------+
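
In the JSON data, `address` is a nested object, which is why SQL can reach into it with a dot. A plain-Python sketch of the same query on nested dictionaries; only the `city` field of each address is copied (the street names are cut off in the table above), so the organization records are partial.

```python
# organizations with their nested address; only the city field is filled in
organizations = [
    {"organization": "UT", "address": {"city": "Enschede"}},
    {"organization": "CBS", "address": {"city": "Heerlen"}},
    {"organization": "RU", "address": {"city": "Nijmegen"}},
    {"organization": "TUD", "address": {"city": "Delft"}},
]
# people as (gender, name, organization), copied from the people table
rows = [
    ("male", "Arjen", "RU"), ("male", "Djoerd", "UT"), ("male", "Robin", "UT"),
    ("male", "Yuri", "UT"), ("female", "Doina", "UT"), ("female", "Anna", "UT"),
    ("male", "Piet", "CBS"), ("male", "Barteld", "CBS"),
    ("female", "Jacobiene", "CBS"), ("male", "Marco", "CBS"),
    ("female", "Claudia", "TUD"),
]
people = [dict(zip(("gender", "name", "organization"), row)) for row in rows]

# O.address.city = 'Enschede': look up a field inside the nested address
in_enschede = {o["organization"] for o in organizations
               if o["address"]["city"] == "Enschede"}

# P.organization = O.organization AND P.gender = 'female'
women_in_enschede = [p["name"] for p in people
                     if p["gender"] == "female" and p["organization"] in in_enschede]
print(len(women_in_enschede), women_in_enschede)  # 2 ['Doina', 'Anna']
```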



### Further exercises

New to SQL? Then we recommend additional SQL exercises, for instance [Learn SQL from Codecademy](https://www.codecademy.com/learn/learn-sql).

## 3.  DataCamp sample data

Below you find samples of the data that are available on the DataCamp Hadoop/Spark cluster. Use the sample data to develop and test your Spark scripts before executing them on the cluster.


### Dutch Tweets
We use the Twitter data described by ([Tjong Kim Sang and Van den Bosch 2013][3]), which is available on the Twente Hadoop cluster under `/data/twitterNL`.

[3]: http://ifarm.nl/erikt/papers/clin2013.pdf "Erik Tjong Kim Sang and Antal van den Bosch. Dealing with big data: The case of Twitter. In: Computational Linguistics in the Netherlands Journal 3, ISSN: 2211-4009, pages 121-134, 2013."

In [74]:
tweets = sqlContext.read.json("tweets.json.gz") 

# uncomment to print the (crazy) schema:
# tweets.printSchema()

tweets.select("text").show(10)

+--------------------+
|                text|
+--------------------+
|Pinnen over je wa...|
|@AllesofNooit @He...|
|#dooba #sexdate J...|
|En weer een uur v...|
|Frederik Meulewae...|
|#webcambabes patr...|
|Weer een kwartier...|
|Een hond als huis...|
|Het gebaar van de...|
|[NOS] Politie zet...|
+--------------------+
only showing top 10 rows



### AIS data

The [Automatic identification system](https://en.wikipedia.org/wiki/Automatic_identification_system) (AIS) is an automatic tracking system used on ships and by vessel traffic services (VTS) for identifying and locating vessels by electronically exchanging data with other nearby ships, AIS base stations, and satellites. The data is available on the Twente Hadoop cluster under: `/data/aisUT`.


In [75]:
ais = sqlContext.read.json("ais.json.gz") 
# ais.show(5)
ais.printSchema()

root
 |-- callsign: string (nullable = true)
 |-- cog: long (nullable = true)
 |-- destination: string (nullable = true)
 |-- dimbow: long (nullable = true)
 |-- dimport: long (nullable = true)
 |-- dimstarboard: long (nullable = true)
 |-- dimstern: long (nullable = true)
 |-- draught: long (nullable = true)
 |-- eta_day: long (nullable = true)
 |-- eta_hour: long (nullable = true)
 |-- eta_minute: long (nullable = true)
 |-- eta_month: long (nullable = true)
 |-- heading: long (nullable = true)
 |-- imo: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- lat2: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lon2: string (nullable = true)
 |-- mmsi: long (nullable = true)
 |-- nav_status: long (nullable = true)
 |-- rot_angle: double (nullable = true)
 |-- rot_direction: string (nullable = true)
 |-- shipname: string (nullable = true)
 |-- shiptype: long (nullable = true)
 |-- sog: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- ts: long

### RDW data
The Rijksdienst voor het Wegverkeer (RDW) is a Dutch government agency for road traffic. The data contains measurements from sensors in the roads. It is available as comma-separated values (CSV) files on the Twente Hadoop cluster under `/data/cbs/loopraw`.


In [76]:
rdw = sc.textFile("rdw.csv.gz")
rdw.map(lambda line: line.split(',')[:3]).collect()

[['RWS01_MONIBAS_0091hrl0763ra', '1', '11B'],
 ['RWS01_MONIBAS_0091hrl0763ra', '1', '12B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '1B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '2B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '3B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '4B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '9B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '10B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '11B'],
 ['RWS01_MONIBAS_0091hrl0785ra', '1', '12B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '1B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '2B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '3B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '4B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '9B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '10B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '11B'],
 ['RWS01_MONIBAS_0091hrl0794ra', '1', '12B'],
 ['RWS01_MONIBAS_0091hrr0059ra', '1', '1A'],
 ['RWS01_MONIBAS_0091hrr0059ra', '1', '3A'],
 ['RWS01_MONIBAS_0091hrr0059ra', '1', '5A'],
 ['RWS01_MONIBAS_0091hrr0064ra', '1', '1A'],
 [

### Volkskrant data
The Dutch newspaper [Volkskrant](http://www.volkskrant.nl) has a large archive of its articles online. We downloaded the years 2000 to 2016; they are available on the Twente Hadoop cluster under `/data/volkskrant`.

In [77]:
volkskrant = sqlContext.read.json("volkskrant.json.gz") 

volkskrant.show(5)

volkskrant.printSchema()

+--------+---+--------------------+-----+--------------------+--------------------+---------+--------------------+----+
|category|day|                href|month|                text|                time|timeofday|               title|year|
+--------+---+--------------------+-----+--------------------+--------------------+---------+--------------------+----+
| Archief|  5|http://www.volksk...|    7|Een provocatieve ...|  5 juli 2011, 00:00|    00:00|Mensen uitlachen ...|2011|
|Politiek| 18|http://www.volksk...|    3|Een euforisch gej...|18 maart 2015, 22:39|    22:39|'De boodschap van...|2015|
|Economie| 21|http://www.volksk...|    6|De klant is konin...| 21 juni 2008, 02:47|    02:47|Klant geen koning...|2008|
|Magazine|  4|http://www.volksk...|    1|Het gaat van kwaa...|4 januari 2013, 1...|    12:04|Indiase politicus...|2013|
| Archief| 29|http://www.volksk...|    3|Chris Klomp vindt...|29 maart 2013, 00:00|    00:00|Uitgedaagd op Vol...|2013|
+--------+---+--------------------+-----