# (Py)Spark Exercises
@stravanni



## Spark vs PySpark?

Spark is written in Scala. The 'native' API is in Scala.

PySpark is a very lightweight wrapper around the native API. (You can see its implementation [here](https://github.com/apache/spark/tree/master/python/pyspark).)

---

![](http://i.imgur.com/YlI8AqEl.png)

[source](https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals)

---

### Introduction to (Py)Spark

#### Warm up by creating an RDD (Resilient Distributed Dataset) named `pagecounts` from the input files. In the Spark shell, the SparkContext is already created for you as variable `sc`.

In [11]:
from pyspark import SparkConf, SparkContext
PATH = ""

In [12]:
conf = SparkConf()
# getOrCreate reuses the SparkContext that the shell or notebook already started,
# which avoids "ValueError: Cannot run multiple SparkContexts at once"
sc = SparkContext.getOrCreate(conf=conf)

### PageCounts dataset
Wikipedia page-view logs: page counts collected over a few days in May 2009.

https://dumps.wikimedia.org/other/pagecounts-raw/

In [13]:
pagecounts = sc.textFile(PATH+"data/pagecounts.txt")
pagecounts

MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:-2

#### Let's take a peek at the data. You can use the take operation of an RDD to get the first K records. Here, K = 10.

In [14]:
pagecounts.take(10)

['20090505-000000 aa Main_Page 2 9980',
 '20090505-000000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 465',
 '20090505-000000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 1 16086',
 '20090505-000000 af.b Tuisblad 1 36236',
 '20090505-000000 af.d Tuisblad 4 189738',
 '20090505-000000 af.q Tuisblad 2 56143',
 '20090505-000000 af Afrika 1 46833',
 '20090505-000000 af Afrikaans 2 53577',
 '20090505-000000 af Australi%C3%AB 1 132432',
 '20090505-000000 af Barack_Obama 1 23368']
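
Each record is a single space-separated line. The following is a minimal plain-Python sketch (no Spark required) of how one such line splits into fields. The names `PageCount` and `parse_record` are hypothetical and chosen here for illustration, and reading the fifth field as a size in bytes is an assumption based on the Wikimedia pagecounts format. Page titles are percent-encoded, so `urllib.parse.unquote` is used to make them readable.

```python
from collections import namedtuple
from urllib.parse import unquote

# Hypothetical field names; the tutorial itself only names the date,
# the project code, the page name and the page-view count.
PageCount = namedtuple("PageCount", "timestamp project title views size")


def parse_record(line):
    """Split one space-separated record into typed fields."""
    timestamp, project, title, views, size = line.split(" ")
    # Titles are percent-encoded UTF-8; decode them for display.
    return PageCount(timestamp, project, unquote(title), int(views), int(size))


record = parse_record("20090505-000000 af Australi%C3%AB 1 132432")
print(record.project, record.title, record.views)
```

The same function could be passed to `map` on the RDD, but that is left out here to keep the sketch independent of a running cluster.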

#### Let's see how many records in total are in this data set

In [15]:
pagecounts.count()

117712

#### Cache in memory
In this data set the second field is the "project code" and contains information about the language of the pages.

For example, the project code "en" indicates an English page.

Let's derive an RDD containing only English pages from `pagecounts`.
- This can be done by applying a filter function to `pagecounts`.
- For each record, we can split it by the field delimiter (i.e. a space), get the second field, and then compare it with the string "en".

To avoid reading from disk each time we perform an operation on the RDD, we also __cache the RDD into memory__.
**This is where Spark really starts to shine**.

In [16]:
enPages = pagecounts.filter(lambda x: x.split(" ")[1] == "en").cache()

When you type this command into the Spark shell, Spark defines the RDD, but because of lazy evaluation, no computation is done yet.

The next time any action is invoked on `enPages`, Spark will cache the data set in memory across the workers in your cluster.
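
Lazy evaluation can be illustrated without Spark: Python's built-in `filter` also returns immediately and only runs the predicate once the result is consumed. This is an analogy for the behaviour described above, not a picture of how Spark is implemented. The helper `is_english`, the list `evaluated` and the English record are made up for illustration.

```python
evaluated = []  # records which lines the predicate has actually inspected


def is_english(line):
    evaluated.append(line)
    return line.split(" ")[1] == "en"


lines = [
    "20090505-000000 af Afrika 1 46833",
    "20090505-000000 en Example_Page 3 1000",  # made-up record for illustration
]

en_lines = filter(is_english, lines)  # defines the computation only
print(len(evaluated))                 # 0: nothing has been evaluated yet

result = list(en_lines)               # consuming the iterator does the work
print(len(evaluated), result)
```

In this analogy, `list(...)` plays the role of a Spark action such as `count` or `collect`.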

#### How many records are there for English pages?

In [17]:
enPages.count()

82683

The first time this command is run, similar to the last count we did, it will take a while, because Spark scans through the entire data set on disk.

**But since enPages was marked as "cached" in the previous step, if you run count on the same RDD again, it should return an order of magnitude faster**.
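
The effect of caching can be sketched in plain Python as "scan once, then answer from memory". The class `CachedDataset` and the function `scan_disk` below are hypothetical stand-ins, not part of the Spark API; the sketch only counts how often the expensive scan runs.

```python
class CachedDataset:
    """Hypothetical stand-in for a cached RDD: loads once, then reuses."""

    def __init__(self, load):
        self._load = load  # function that performs the expensive scan
        self._data = None  # filled in by the first action
        self.scans = 0     # how many times the expensive scan ran

    def count(self):
        if self._data is None:  # first action: scan and keep in memory
            self._data = list(self._load())
            self.scans += 1
        return len(self._data)  # later actions: served from memory


def scan_disk():
    # Stand-in for reading and filtering the input file.
    yield from ["en Main_Page", "en Spark", "en Python"]


en_pages = CachedDataset(scan_disk)
first = en_pages.count()
second = en_pages.count()
print(first, second, en_pages.scans)
```

Both calls return the same count, but only the first one pays for the scan.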

#### Let's try something fancier.
Generate a histogram of total page views on Wikipedia English pages for the date range represented in our dataset (May 5 to May 7, 2009).

The high level idea of what we'll be doing is as follows:
- First, we generate a key-value pair for each line;
- the key is the date (the first eight characters of the first field), and the value is the number of page views recorded on that line (the fourth field).

In [18]:
enTuples = enPages.map(lambda x: x.split(" "))
enKeyValuePairs = enTuples.map(lambda x: (x[0][:8], int(x[3])))
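
As a quick check of the indexing, the two `map` steps above can be mirrored in plain Python on a few records taken from the `take(10)` output. The helper name `to_key_value` is hypothetical, and the sample records are not English pages; they only illustrate the field positions.

```python
lines = [
    "20090505-000000 aa Main_Page 2 9980",
    "20090505-000000 af Afrikaans 2 53577",
    "20090505-000000 af Barack_Obama 1 23368",
]


def to_key_value(line):
    fields = line.split(" ")
    date = fields[0][:8]    # first eight characters of the timestamp
    views = int(fields[3])  # fourth field: number of page views
    return (date, views)


pairs = [to_key_value(line) for line in lines]
print(pairs)
```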

- Next, we shuffle the data and group all values of the same key together.
- Finally, we sum up the values for each key.

There is a convenient method called `reduceByKey` in Spark for exactly this pattern.

**Note** that the second argument to `reduceByKey` determines the number of reducers to use.

By default, Spark assumes that the reduce function is commutative and associative, and applies combiners on the mapper side: values that share a key are partially merged within each partition before the shuffle. (see foldLeft)
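
Mapper-side combining can be sketched in plain Python as two stages: merge values per key inside each partition, then merge the partial results per key. This is a simplified model under that assumption, not Spark's implementation; `combine_partition`, `reduce_by_key` and the two partitions are made up for illustration.

```python
from collections import defaultdict
from functools import reduce


def combine_partition(pairs, func):
    """Map-side combine: merge values that share a key within one partition."""
    combined = {}
    for key, value in pairs:
        combined[key] = func(combined[key], value) if key in combined else value
    return combined


def reduce_by_key(partitions, func):
    """Merge the per-partition partial results for each key."""
    shuffled = defaultdict(list)
    for partition in partitions:
        for key, partial in combine_partition(partition, func).items():
            shuffled[key].append(partial)
    return {key: reduce(func, partials) for key, partials in shuffled.items()}


# Two made-up partitions of (date, views) pairs.
partitions = [
    [("20090505", 2), ("20090506", 4), ("20090505", 1)],
    [("20090506", 3), ("20090505", 5)],
]
totals = reduce_by_key(partitions, lambda x, y: x + y)
print(totals)
```

Because the partial results are merged in whatever grouping the partitioning happens to produce, the function only gives a well-defined answer when it is associative and commutative.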

Since we know there is a very limited number of keys in this case (because there are only 3 unique dates in our data set), let's use only one reducer.

In [19]:
enKeyValuePairs.reduceByKey(lambda x, y: x + y, 1).collect()

[('20090505', 1557798)]

The `collect` method at the end brings the result back from the RDD to the driver as a Python list.

We can combine the previous three commands into one:

In [23]:
enPages.map(lambda x: x.split(" "))\
        .map(lambda x: (x[0][:8], int(x[3])))\
        .reduceByKey(lambda x, y: x + y, 1)\
        .collect()

[('20090505', 1557798)]

#### Suppose we want to find pages that were viewed more than 200,000 times during the three days covered by our dataset.

Conceptually, this task is similar to the previous query.

We are doing an expensive group-by with a lot of network shuffling of data.

To recap:
- first we split each line of data into its respective fields;
- next, we extract the fields for page name and number of page views;
- we reduce by key again, this time with 2 reducers;
- then we keep only the pages with more than 200,000 total views over the time window covered by our dataset;
- finally, we swap each pair to (views, page) before collecting the results.

In [24]:
enPages.map(lambda x: x.split(" "))\
        .map(lambda x: (x[2], int(x[3])))\
        .reduceByKey(lambda x, y: x + y, 2)\
        .filter(lambda x: x[1] > 200000)\
        .map(lambda x: (x[1], x[0]))\
        .collect()

[(558329, '404_error/')]
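
The same sequence of steps can be mirrored in plain Python with a `Counter`, which may help when checking the logic on a small sample before running it on a cluster. The records and the name `THRESHOLD` below are made up for illustration.

```python
from collections import Counter

# Made-up (page, views) records standing in for the English page counts.
records = [
    ("Main_Page", 150000),
    ("Main_Page", 120000),
    ("Apache_Spark", 900),
    ("Python", 180000),
    ("Python", 30000),
]
THRESHOLD = 200000

totals = Counter()
for page, views in records:
    totals[page] += views  # plays the role of reduceByKey

# Keep pages above the threshold and swap each pair to (views, page).
popular = [(views, page) for page, views in totals.items() if views > THRESHOLD]
print(sorted(popular, reverse=True))
```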

#### There is no hard and fast way to calculate the optimal number of reducers for a given problem; you will build up intuition over time by experimenting with different values.
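
One way to build that intuition is to look at how keys spread over reducers. By default, keys are assigned to reducers by hashing, so a data set with only three distinct keys can keep at most three reducers busy. The sketch below uses `zlib.crc32` as a reproducible stand-in hash; it is not the hash function Spark uses, and `assign_reducer` is a hypothetical helper.

```python
import zlib
from collections import Counter


def assign_reducer(key, num_reducers):
    # zlib.crc32 is a stand-in hash chosen for reproducibility;
    # it is NOT the hash function Spark uses.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


keys = ["20090505", "20090506", "20090507"]
for num_reducers in (1, 2, 4):
    # Number of keys landing on each reducer index.
    load = Counter(assign_reducer(key, num_reducers) for key in keys)
    print(num_reducers, dict(load))
```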


#### You can explore the full RDD API by browsing the [Java/Scala](http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/index.html#spark.RDD) or [Python](http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/index.html) API docs.

---
## PySpark API

Let's look at some of the PySpark API.

#### Click on an image to go directly to the documentation.

<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.map">
<img align=left src="pyspark-pictures-master/images/pyspark-page3.svg" width=500 height=500 />
</a>

In [26]:
# map
x = sc.parallelize([1,2,3]) # sc = spark context, parallelize creates an RDD from the passed object
print(x.collect())  # collect copies RDD elements to a list on the driver

y = x.map(lambda x: (x,x**2))
print(y.collect())

[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.flatMap">
<img align=left src="pyspark-pictures-master/images/pyspark-page4.svg" width=500 height=500 />
</a>

In [27]:
# flatMap
x = sc.parallelize([1,2,3])
print(x.collect())

def f(x):
    return (x, 100*x, x**2)  # three output elements per input element
print(x.map(f).collect()) # map keeps one tuple per input element

y = x.flatMap(f) # flatMap flattens the tuples: 3 elements for each element of x
print(y.collect())
print(y.collect())

[1, 2, 3]
[(1, 100, 1), (2, 200, 4), (3, 300, 9)]
[1, 100, 1, 2, 200, 4, 3, 300, 9]
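
For comparison, the difference between `map` and `flatMap` can be reproduced on a local list with `itertools.chain.from_iterable`. This is a plain-Python analogy rather than Spark code; the helper name `expand` is hypothetical.

```python
from itertools import chain

x = [1, 2, 3]


def expand(n):
    return (n, 100 * n, n ** 2)


mapped = [expand(n) for n in x]                              # like map
flattened = list(chain.from_iterable(expand(n) for n in x))  # like flatMap
print(mapped)
print(flattened)
```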


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.filter">
<img align=left src="pyspark-pictures-master/images/pyspark-page8.svg" width=500 height=500 />
</a>

In [28]:
# filter
x = sc.parallelize([1,2,3,4,5,6,7,8,9])
print(x.collect())

y = x.filter(lambda x: x%2 == 1)  # filters out even elements
print(y.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 3, 5, 7, 9]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.reduce">
<img align=left src="pyspark-pictures-master/images/pyspark-page23.svg" width=500 height=500 />
</a>

In [29]:
# reduce
x = sc.parallelize([1,2,3])
print(x.collect())

y = x.reduce(lambda accumulated, obj: accumulated + obj)  # sums all the elements
print(y)

[1, 2, 3]
6
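
The function passed to `reduce` should be commutative and associative, because each partition is reduced separately before the partial results are merged. The plain-Python sketch below models this with a hypothetical helper `reduce_partitioned` and shows that addition is insensitive to how the data is split, whereas subtraction is not.

```python
from functools import reduce
from operator import add, sub


def reduce_partitioned(partitions, func):
    """Reduce each partition first, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


# The same elements [1, 2, 3, 4], split in two different ways.
split_a = [[1, 2], [3, 4]]
split_b = [[1], [2, 3, 4]]

print(reduce_partitioned(split_a, add), reduce_partitioned(split_b, add))
print(reduce_partitioned(split_a, sub), reduce_partitioned(split_b, sub))
```

With `add`, both splits give 10; with `sub`, the result depends on the split, so a function like this would give unpredictable results in a distributed reduce.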


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.reduceByKey">
<img align=left src="pyspark-pictures-master/images/pyspark-page44.svg" width=500 height=500 />
</a>

In [30]:
# reduceByKey
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
print(x.collect())

y = x.reduceByKey(lambda agg, obj: agg + obj)
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('B', 3), ('A', 12)]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.sortBy">
<img align=left src="pyspark-pictures-master/images/pyspark-page15.svg" width=500 height=500 />
</a>

In [31]:
# sortBy
x = sc.parallelize(['Cat','Apple','Bat'])
print(x.collect())

def keyGen(val):
    return val[0]

y = x.sortBy(keyGen)

print(y.collect())

['Cat', 'Apple', 'Bat']
['Apple', 'Bat', 'Cat']


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.take">
<img align=left src="pyspark-pictures-master/images/pyspark-page39.svg" width=500 height=500 />
</a>

In [32]:
# take
# Get the first N elements of an RDD
x = sc.parallelize([1,3,1,2,3])
print(x.collect())

y = x.take(num = 3)
print(y)

[1, 3, 1, 2, 3]
[1, 3, 1]


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.top">
<img align=left src="pyspark-pictures-master/images/pyspark-page37.svg" width=500 height=500 />
</a>

In [33]:
# top
# Get the N largest elements of an RDD, in descending order
x = sc.parallelize([1,3,1,2,3,1,1,1])
print(x.collect())

y = x.top(num = 3)
print(y)

[1, 3, 1, 2, 3, 1, 1, 1]
[3, 3, 2]
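
The contrast between `take` and `top` can be seen on a local list: slicing returns the first elements in their existing order, while `heapq.nlargest` returns the largest elements in descending order. This is a plain-Python analogy, not Spark code.

```python
import heapq

x = [1, 3, 1, 2, 3, 1, 1, 1]

first_three = x[:3]                   # like take(3): first elements, in order
largest_three = heapq.nlargest(3, x)  # like top(3): largest elements, descending
print(first_three)
print(largest_three)
```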


<a href="http://spark.apache.org/docs/1.2.0/api/python/pyspark.html#pyspark.RDD.distinct">
<img align=left src="pyspark-pictures-master/images/pyspark-page9.svg" width=500 height=500 />
</a>

In [34]:
# distinct
x = sc.parallelize(['A','A','B','A','B'])
print(x.collect())

y = x.distinct()
print(y.collect())

['A', 'A', 'B', 'A', 'B']
['B', 'A']
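
Note that the output above is not in first-seen order: like a Python `set`, `distinct` should not be relied on to preserve the order of the input. The plain-Python sketch below shows two ways to get a predictable order on a local list; on an RDD, a `sortBy` after `distinct` would play a similar role.

```python
x = ['A', 'A', 'B', 'A', 'B']

unique_unordered = set(x)                 # like distinct(): order not guaranteed
unique_in_order = list(dict.fromkeys(x))  # keeps first-seen order
unique_sorted = sorted(set(x))            # deterministic, sorted order
print(unique_in_order, unique_sorted)
```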
