# ID2221: Data Intensive Computing
# Lab 2 - Spark and Spark SQL
## (Updated 2017-09-23)

In this part of the lab you will practice the basic operations of Spark (RDDs) and Spark SQL (DataFrames). You will then use what you learned to do some interactive Spark analytics.


## The entry point: SparkSession
This is the main entry point to all Spark functionality.

In [173]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row

In [174]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark Basics') \
    .getOrCreate()

We can check which version of Spark we are using.

In [175]:
spark.version

'2.2.0'

To access the RDD API, we can get the SparkContext from our SparkSession.

In [176]:
# The SparkContext is used to create RDDs, accumulators, and broadcast variables.
sc = spark.sparkContext

## Basic Operations

## Create an RDD from Python list

We start by creating a "local" Python list, in this example `myList`. This list is a normal Python collection stored in the memory of your machine; it is not in Spark yet.

In [177]:
myList = [1, 2, 3, 4, 5, 6]

In [178]:
myList

[1, 2, 3, 4, 5, 6]


Then we can create an RDD from a Python collection using `parallelize()`. This divides the collection into a number of partitions, distributes them across the servers in your Spark cluster, and returns a reference to the result (`nums` in this example), which is an RDD (Resilient Distributed Dataset).

In [179]:
nums = sc.parallelize(myList)

In [180]:
nums

ParallelCollectionRDD[203] at parallelize at PythonRDD.scala:480

Optionally, you can set the number of partitions manually by passing it as the second argument, as in `sc.parallelize(myList, 2)`. You can also repartition an existing RDD with `nums2 = nums.repartition(2)`, and check the number of partitions with `nums.getNumPartitions()`.

In [181]:
nums.getNumPartitions()

2
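To build intuition for what dividing a collection into partitions means, here is a plain-Python sketch (no Spark needed) that cuts a list into contiguous slices. The helper `split_into_partitions` is hypothetical, and the exact boundaries Spark picks are an implementation detail, so treat this as a model rather than Spark's own algorithm.

```python
def split_into_partitions(items, num_partitions):
    """Cut a list into num_partitions contiguous, near-equal slices (illustrative model)."""
    n = len(items)
    return [items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
            for i in range(num_partitions)]


my_list = [1, 2, 3, 4, 5, 6]
two_parts = split_into_partitions(my_list, 2)
four_parts = split_into_partitions(my_list, 4)

print(two_parts)   # one inner list per partition
print(four_parts)  # slices may differ in size when the length is not divisible
```

On a real RDD, `nums.glom().collect()` returns the actual contents of each partition as a list of lists, which is a handy way to check how the data was split.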

You can get help on any Python function using `help()`

In [182]:
help(nums.getNumPartitions)

Help on method getNumPartitions in module pyspark.rdd:

getNumPartitions() method of pyspark.rdd.RDD instance
    Returns the number of partitions in RDD
    
    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> rdd.getNumPartitions()
    2



You can execute Linux shell commands by prepending an exclamation mark (!) to the command.

In [183]:
!ls -a

.				data		    nums.txt
..				.DS_Store	    Spark+Basics.ipynb
Apache+Log+File+Analysis.ipynb	.ipynb_checkpoints  spark-warehouse


## Some Actions

Remember, actions trigger computation and produce output (e.g., a result printed on your screen). Transformations, by contrast, are "lazy": they only describe a computation, which runs when an action needs its result.
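Laziness can be seen without Spark, because Python's built-in `map` is also lazy. The sketch below is only an analogy: `map` plays the role of a transformation and `list()` plays the role of an action such as `collect()`. The `calls` list and the `square` function are made up to record when work actually happens.

```python
calls = []  # records every element that square() actually processes


def square(x):
    calls.append(x)
    return x ** 2


lazy_squares = map(square, [1, 2, 3])  # like a transformation: nothing runs yet
calls_before = list(calls)             # snapshot taken before any "action"

result = list(lazy_squares)            # like an action: forces the computation
calls_after = list(calls)

print(calls_before, result, calls_after)
```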

You can convert an RDD back to a Python collection (e.g., to print it) using `collect()`. If it is too large, you can get the first few elements with `take()` or a random sample with `takeSample()`.

In [184]:
nums.collect()

[1, 2, 3, 4, 5, 6]

In [185]:
nums.take(2)

[1, 2]

In [186]:
# The first argument is withReplacement (True or False); the second is the sample size.
nums.takeSample(False, 3)

[6, 1, 3]
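The difference between sampling with and without replacement can be modelled with Python's `random` module. This is a sketch of the concept, not of how Spark samples; the fixed seed is an assumption made only so the example is repeatable.

```python
import random

population = [1, 2, 3, 4, 5, 6]
rng = random.Random(42)  # fixed seed, chosen arbitrarily for repeatability

# Without replacement: each element can be picked at most once,
# so the sample can never be larger than the population.
without_replacement = rng.sample(population, min(3, len(population)))

# With replacement: the same element may be picked several times,
# so the sample can be larger than the population.
with_replacement = rng.choices(population, k=10)

print(without_replacement)
print(with_replacement)
```

`takeSample()` also accepts an optional third `seed` argument if you need the same sample on every run.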

Count the number of elements

In [187]:
nums.count()

6

## Map
Apply a function to all elements of an RDD

In [188]:
squares = nums.map(lambda x: x**2)

In [189]:
squares.collect()

[1, 4, 9, 16, 25, 36]

Filter using a boolean function

In [190]:
even = nums.filter(lambda x: x%2 == 0)

In [191]:
even.collect()

[2, 4, 6]

## Map vs. FlatMap
FlatMap generates zero or more elements for each input

In [192]:
many_nums = nums.flatMap(lambda x: list(range(0, x)))

In [193]:
many_nums.collect()

[0, 0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5]

Map is one to one

In [194]:
lists = nums.map(lambda x: list(range(0, x)))

In [195]:
lists.collect()

[[0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4], [0, 1, 2, 3, 4, 5]]
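The two results above differ only in whether the per-element lists are kept or concatenated. A plain-Python sketch of the same idea follows, using a shorter, made-up input that includes `0` to show an element producing no output at all.

```python
from itertools import chain

inputs = [0, 1, 2, 3]

# map: exactly one output per input, here a list (possibly empty)
mapped = [list(range(0, x)) for x in inputs]

# flatMap: the per-input lists are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(mapped))

print(mapped)
print(flat_mapped)
```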

## Key-Value Pairs

RDDs support a simple key-value data structure in the form (k, v). For example:

In [196]:
users = sc.parallelize([('A', 20), ('B', 30), ('C', 60)])  # (Name, Age)

In [197]:
users.keys().collect()

['A', 'B', 'C']

In [198]:
users.values().collect()

[20, 30, 60]

In [199]:
users.map(lambda x : (x[0]+'_wiser', x[1]+1)).collect()  # Assuming that you get wiser when you get older :D

[('A_wiser', 21), ('B_wiser', 31), ('C_wiser', 61)]

In [200]:
# apply map to values only
users.mapValues(lambda x : x**2).collect()

[('A', 400), ('B', 900), ('C', 3600)]

In [201]:
users.collectAsMap()

{'A': 20, 'B': 30, 'C': 60}
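A Python dict holds one value per key, so `collectAsMap()` is only a faithful copy of the RDD when keys are unique. The sketch below shows the effect in plain Python, assuming the result is built like `dict(pairs)`, where a later pair overwrites an earlier one with the same key. The `pairs_with_duplicates` list is made up for illustration.

```python
unique_pairs = [('A', 20), ('B', 30), ('C', 60)]
pairs_with_duplicates = [('cat', 1), ('dog', 2), ('cat', 3)]

as_map_unique = dict(unique_pairs)              # nothing is lost
as_map_duplicates = dict(pairs_with_duplicates)  # only one 'cat' entry survives

print(as_map_unique)
print(as_map_duplicates)
```

When keys can repeat, grouping or reducing by key first keeps all the information.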

## Reduce

In [202]:
pets = sc.parallelize([('cat',1), ('dog',2), ('cat',3)])

In [203]:
pets.collect()

[('cat', 1), ('dog', 2), ('cat', 3)]

In [204]:
pets.lookup('cat')

[1, 3]

In [205]:
pets.groupByKey().mapValues(list).collect()

[('cat', [1, 3]), ('dog', [2])]

In [206]:
pets.reduceByKey(lambda x, y: x+y).collect()

[('cat', 4), ('dog', 2)]

In [207]:
from operator import add
pets.reduceByKey(add).collect()

[('cat', 4), ('dog', 2)]
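The relationship between `groupByKey()` and `reduceByKey()` can be sketched in plain Python. The helpers `group_by_key` and `reduce_by_key` are hypothetical single-machine models; real Spark also combines values within each partition before shuffling, which is why the function passed to `reduceByKey()` should be associative and commutative.

```python
from collections import defaultdict
from functools import reduce
from operator import add

pets = [('cat', 1), ('dog', 2), ('cat', 3)]


def group_by_key(pairs):
    """Collect all values that share a key into one list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def reduce_by_key(pairs, func):
    """Fold the values of each key into a single value using func."""
    return {key: reduce(func, values)
            for key, values in group_by_key(pairs).items()}


grouped = group_by_key(pets)
reduced = reduce_by_key(pets, add)

print(sorted(grouped.items()))
print(sorted(reduced.items()))
```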

## Join

In [208]:
visits = sc.parallelize([('index.html','1.2.3.4'), ('about.html','3.2.3.4'), ('index.html','5.4.3.2'), ('help.html','7.6.1.2')])

In [209]:
visits.collect()

[('index.html', '1.2.3.4'),
 ('about.html', '3.2.3.4'),
 ('index.html', '5.4.3.2'),
 ('help.html', '7.6.1.2')]

In [210]:
pageName = sc.parallelize([('index.html','Home'), ('about.html','About'), ('prod.html','Products')])

In [211]:
visits.join(pageName).collect()

[('about.html', ('3.2.3.4', 'About')),
 ('index.html', ('1.2.3.4', 'Home')),
 ('index.html', ('5.4.3.2', 'Home'))]
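Note that `help.html` and `prod.html` are missing from the result: `join()` is an inner join, so only keys present in both RDDs survive. A plain-Python sketch of that behaviour follows; `inner_join` is a hypothetical helper, not Spark's implementation. The RDD API also offers `leftOuterJoin()`, `rightOuterJoin()`, and `fullOuterJoin()` for keeping unmatched keys.

```python
from collections import defaultdict

visits = [('index.html', '1.2.3.4'), ('about.html', '3.2.3.4'),
          ('index.html', '5.4.3.2'), ('help.html', '7.6.1.2')]
page_names = [('index.html', 'Home'), ('about.html', 'About'),
              ('prod.html', 'Products')]


def inner_join(left, right):
    """Pair every left value with every right value that has the same key."""
    right_index = defaultdict(list)
    for key, value in right:
        right_index[key].append(value)
    return [(key, (left_value, right_value))
            for key, left_value in left
            for right_value in right_index.get(key, [])]


joined = sorted(inner_join(visits, page_names))
print(joined)
```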

### Save

If your RDD has multiple partitions, the output will be split into an equal number of files. This allows writing in parallel into a distributed file system and keeps the data distributed in a cluster.

First, delete the output if it already exists.

In [212]:
! rm -rf nums.txt

In [213]:
nums.saveAsTextFile('nums.txt')
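Despite its name, `nums.txt` ends up as a directory containing one `part-NNNNN` file per partition (Spark also writes a `_SUCCESS` marker), which is why it is removed with `rm -rf`. The plain-Python sketch below imitates that layout in a temporary directory. The two-way split in `partitions` is an assumption, and the code is a model of the result, not of how Spark writes files.

```python
import os
import tempfile

partitions = [[1, 2, 3], [4, 5, 6]]  # assumed contents of the two partitions

# Use a temporary location so the sketch does not touch the lab directory.
out_dir = os.path.join(tempfile.mkdtemp(), 'nums.txt')
os.makedirs(out_dir)

# One file per partition, one element per line.
for index, part in enumerate(partitions):
    with open(os.path.join(out_dir, 'part-%05d' % index), 'w') as f:
        f.writelines('%s\n' % item for item in part)

written = sorted(os.listdir(out_dir))
with open(os.path.join(out_dir, written[0])) as f:
    first_part = f.read()

print(written)
```

In Spark, passing the directory name to `sc.textFile('nums.txt')` reads all the part files back as a single RDD.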

# Word Count

In [214]:
lines = sc.textFile('data/shakespeare.txt')

In [215]:
lines.count()

122395

In [216]:
counts = lines.flatMap(lambda x: [y.strip('.,;:?!-') for y in x.split()]) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda x,y:x+y)
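To see what each stage of this pipeline produces, here is a plain-Python trace on a two-line sample. The `sample_lines` list is a made-up stand-in for `data/shakespeare.txt`, and a `Counter` stands in for `reduceByKey`.

```python
from collections import Counter

sample_lines = ['To be, or not to be:', 'that is the question.']

# flatMap stage: split every line into words and strip surrounding punctuation
words = [w.strip('.,;:?!-') for line in sample_lines for w in line.split()]

# map stage: pair every word with the count 1
pairs = [(w, 1) for w in words]

# reduceByKey stage: add up the counts of identical words
word_counts = Counter()
for word, one in pairs:
    word_counts[word] += one

print(words)
print(dict(word_counts))
```

The count is case-sensitive, so `To` and `to` are different words here, just as `And` and `and` are counted separately in the full data set.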

In [217]:
output = counts.collect()

In [218]:
counts.count()

37706

That is a lot of words to print on the screen! Let's take a sample to view.

In [219]:
counts.takeSample(False, 10)

[('vanquisheth', 1),
 ('wrong-incensed', 1),
 ('school', 23),
 ('burns', 25),
 ('featly', 2),
 ('alarums', 6),
 ('reader', 1),
 ('guardian', 4),
 ('deep', 131),
 ('egal', 1)]

What about the most frequently used words? We can use `top()` for that. `top()` takes an optional key function that defines the value used for sorting.

In [220]:
counts.top(20, key=lambda x : x[1])  # Sort using the word count

[('the', 23202),
 ('I', 20251),
 ('and', 18546),
 ('to', 15787),
 ('of', 15666),
 ('a', 12524),
 ('you', 11995),
 ('my', 10828),
 ('in', 9838),
 ('is', 8271),
 ('that', 8049),
 ('not', 8007),
 ('me', 7727),
 ('And', 7445),
 ('with', 6762),
 ('it', 6707),
 ('be', 6366),
 ('his', 6316),
 ('your', 6004),
 ('for', 5765)]

What about top words having 5 or more characters?

We can modify the key function to return 0 if the word is shorter than 5 characters and the word count otherwise, so that every short word ranks below every long word.

In [221]:
counts.top(20, key=lambda x : 0 if len(x[0]) < 5 else x[1])

[('shall', 3115),
 ('would', 2016),
 ('Enter', 1971),
 ('their', 1933),
 ('should', 1419),
 ('which', 1408),
 ('there', 1315),
 ('these', 1098),
 ('heart', 972),
 ('Exeunt', 971),
 ('speak', 965),
 ('think', 946),
 ('Which', 902),
 ('never', 896),
 ('great', 808),
 ('death', 800),
 ('father', 793),
 ('again', 747),
 ('cannot', 717),
 ('SCENE', 651)]
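One caveat of the key-function trick is that short words are ranked last rather than removed, so they can still show up when fewer than N long words exist. The plain-Python sketch below uses `heapq.nlargest` as a stand-in for `top()` on a tiny, made-up list (`sample_counts` is hypothetical) and compares the trick with filtering first.

```python
import heapq

sample_counts = [('the', 5), ('shall', 3), ('I', 4), ('would', 2)]


def long_word_key(pair):
    """Rank words shorter than 5 characters as 0, others by their count."""
    return 0 if len(pair[0]) < 5 else pair[1]


# Key-function trick: asking for 3 items when only 2 long words exist
top3_by_key = heapq.nlargest(3, sample_counts, key=long_word_key)

# Filter first, then rank: short words can never appear
long_only = [pair for pair in sample_counts if len(pair[0]) >= 5]
top3_filtered = heapq.nlargest(3, long_only, key=lambda pair: pair[1])

print(top3_by_key)
print(top3_filtered)
```

On the RDD itself, the filter-first variant could be written as `counts.filter(lambda x: len(x[0]) >= 5).top(20, key=lambda x: x[1])`.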

# Spark SQL

Try the examples we discussed on the slides during the lecture.

In [222]:
l = [('Alice', 1)]
kids = spark.createDataFrame(l, ['name', 'age'])

In [223]:
kids.show()

+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+



In [224]:
kids.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [225]:
# Line continuations are not needed inside parentheses or brackets.
schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)])
users2 = spark.createDataFrame(users, schema)

In [226]:
users2.show()

+----+---+
|name|age|
+----+---+
|   A| 20|
|   B| 30|
|   C| 60|
+----+---+



In [227]:
users2.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [228]:
df = spark.read.json('data/people.json')

In [229]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [230]:
df.select('name').show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [231]:
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [232]:
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [233]:
df.groupBy('age').count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
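The outputs above show how missing values behave: `age + 1` stays `null` for Michael, and `age > 21` does not match his row. The sketch below models that behaviour in plain Python with `None` standing in for `null`. The helpers `plus_one` and `older_than` are hypothetical and only imitate the SQL rule that an operation on an unknown value yields an unknown result.

```python
people = [
    {'name': 'Michael', 'age': None},
    {'name': 'Andy', 'age': 30},
    {'name': 'Justin', 'age': 19},
]


def plus_one(age):
    """Arithmetic on a missing value stays missing."""
    return None if age is None else age + 1


def older_than(age, limit):
    """A comparison with a missing value is unknown, so the row is not selected."""
    return age is not None and age > limit


ages_plus_one = [(p['name'], plus_one(p['age'])) for p in people]
over_21 = [p['name'] for p in people if older_than(p['age'], 21)]

print(ages_plus_one)
print(over_21)
```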



### Interoperating with RDDs

In [234]:
# Load a text file and convert each line to a Row.
lines = sc.textFile('data/people.txt')
parts = lines.map(lambda l: l.split(','))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView('people')

In [235]:
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql('SELECT name FROM people WHERE age >= 13 AND age <= 19')

teenagers.show()

+------+
|  name|
+------+
|Justin|
+------+
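The query text itself is ordinary SQL, so it can be tried without a Spark installation. The sketch below runs the same statement against an in-memory SQLite table using Python's standard `sqlite3` module. SQLite is only a stand-in here and says nothing about how Spark executes the query, and the three rows are assumed sample data, since the contents of `data/people.txt` are not shown in this lab.

```python
import sqlite3

# Assumed sample rows (name, age); not taken from data/people.txt
rows = [('Michael', 29), ('Andy', 30), ('Justin', 19)]

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE people (name TEXT, age INTEGER)')
conn.executemany('INSERT INTO people VALUES (?, ?)', rows)

# Same query text as the spark.sql() call
teenagers_sqlite = conn.execute(
    'SELECT name FROM people WHERE age >= 13 AND age <= 19').fetchall()
conn.close()

print(teenagers_sqlite)
```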



In [236]:
# The results of SQL queries are DataFrame objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: 'Name: ' + p.name).collect()
for name in teenNames:
    print(name)

Name: Justin
