# <center> Introduction to Spark In-memory Computing via Python PySpark </center>

- Spark is an implementation of the MapReduce programming paradigm that operates on in-memory data and allows data reuse across multiple computations.
- When data is reused across computations, Spark performs significantly better than its predecessor, Hadoop MapReduce, which writes intermediate results to disk between jobs. 
- Spark's primary data abstraction is the Resilient Distributed Dataset (RDD):
    - Read-only, partitioned collection of records
    - Created (aka written) through deterministic operations on data:
        - Loading from stable storage
        - Transforming from other RDDs
        - Generating through coarse-grained operations such as map, join, filter ...
    - Do not need to be materialized at all times and are recoverable via **data lineage**

<img src="figures/spark2_arch.png" width="600"/>

In [1]:
import os

def generate_slaves_file(node_file=None,
                         slaves_file='./spark-2.1.0-bin-hadoop2.7/conf/slaves'):
    # Resolve the PBS node file at call time rather than at definition time.
    if node_file is None:
        node_file = os.environ['PBS_NODEFILE']
    # The node file may list a node more than once, so deduplicate and sort.
    with open(node_file) as node_file_fp:
        node_list = sorted(set(node_file_fp.read().split()))
    # The first node hosts the master; the rest are written out as workers.
    print("Host: ", node_list[0])
    print("Slaves: ") 
    with open(slaves_file, 'w') as slave_file_fp:
        for node in node_list[1:]:
            slave_file_fp.write('{}\n'.format(node))
            print("\t", node)
generate_slaves_file()

Host:  node1965.palmetto.clemson.edu
Slaves: 


In [None]:
!more spark-2.1.0-bin-hadoop2.7/conf/slaves

In [2]:
!ssh node1965 '/home/lngo/intro-to-spark/spark-2.1.0-bin-hadoop2.7/sbin/start-all.sh --host node1965'

starting org.apache.spark.deploy.master.Master, logging to /home/lngo/intro-to-spark/spark-2.1.0-bin-hadoop2.7/logs/spark-lngo-org.apache.spark.deploy.master.Master-1-node1965.out


In [None]:
!ssh node1916 '/home/lngo/intro-to-spark/spark-2.1.0-bin-hadoop2.7/sbin/stop-all.sh --host node1916'

## 1. Getting Started

Spark keeps data in memory on the cluster. The connection to that cluster is represented by the variable **sc** (SparkContext), which is used to create RDDs and run jobs. 

In [26]:
import sys
import os

sys.path.insert(0, './spark-2.1.0-bin-hadoop2.7/python')
sys.path.insert(0, './spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip')

os.environ["SPARK_HOME"] = "/home/lngo/intro-to-spark/spark-2.1.0-bin-hadoop2.7/"
os.environ['PYSPARK_PYTHON'] = '/software/anaconda3/4.2.0/bin/python'

import pyspark
conf = pyspark.SparkConf()
conf.setMaster("spark://node1916:7077")
conf.set("spark.driver.memory","4g")
conf.set("spark.executor.memory","60g")
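# "spark.num.executors" is not a documented Spark property; the --num-executors flag sets spark.executor.instances
conf.set("spark.executor.instances","3")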
conf.set("spark.executor.cores","12")

sc = pyspark.SparkContext(conf=conf)

In [27]:
sc

<pyspark.context.SparkContext at 0x2ba1e249b1d0>

In [28]:
textFile = sc.textFile("text/gutenberg-shakespeare.txt")

In [29]:
print (textFile)

text/gutenberg-shakespeare.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


## 2. What does Spark do with my data?

**Storage Level:** the five fields printed by `getStorageLevel()` answer, in order:
- Does the RDD use disk?
- Does the RDD use memory?
- Does the RDD use off-heap memory?
- Is the RDD kept deserialized (rather than serialized) while persisted?
- How many replicas (default: 1) to use (can only be less than 40)?

In [30]:
textFile.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [31]:
textFile.getNumPartitions()

2

In [32]:
textFile.cache()

text/gutenberg-shakespeare.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [33]:
textFile.getStorageLevel()

StorageLevel(False, True, False, False, 1)

- By default, each transformed RDD may be recomputed each time you run an action on it.
- It is also possible to *persist* an RDD in memory using *persist()* or *cache()*
    - *persist()* lets you specify the storage level for the RDD
    - *cache()* persists the RDD at the default storage level, which is memory only (as the output above shows)
    - To release an RDD from memory, call *unpersist()*

## 3. WordCount

Data operations in Spark fall into two groups: *transformations* and *actions*. 
- A *transformation* creates a new dataset from existing data. Examples of *transformations* include map, filter, reduceByKey, and sortByKey. 
- An *action* runs a computation on the dataset and either returns a value to the driver program (here, the Python process behind this notebook) or writes the result to storage. Examples of *actions* include count, collect, reduce, and saveAsTextFile. 

"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program." -- Spark Documentation

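Python generators behave in a loosely similar way, which can make the idea of laziness concrete. The sketch below is a plain-Python analogy, not Spark code; `lazy_map`, `traced_square`, and the `calls` list are names made up for this illustration.

```python
# Plain-Python analogy for lazy transformations (not Spark code).
calls = []  # records every time the mapped function actually runs


def traced_square(x):
    calls.append(x)
    return x * x


def lazy_map(func, items):
    """Describe a mapping without running it; the work happens on iteration."""
    return (func(item) for item in items)


squares = lazy_map(traced_square, [1, 2, 3])  # "transformation": nothing runs yet
calls_before_action = list(calls)             # still empty at this point

total = sum(squares)                          # "action": forces the computation
calls_after_action = list(calls)

print(calls_before_action)
print(total)
print(calls_after_action)
```

The function is only called once `sum` consumes the generator, much as Spark only runs the recorded transformations once an action such as `count()` asks for a result.
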
#### RDD Operations in Spark

**Transformations:**

- *map*(f: T -> U) : RDD[T] -> RDD[U]
- *filter*(f: T -> Bool) : RDD[T] -> RDD[T]
- *flatMap*(f: T -> Seq[U]) : RDD[T] -> RDD[U]
- *sample*(*fraction*: Float) : RDD[T] -> RDD[T] (deterministic sampling)
- *groupByKey*() : RDD[(K,V)] -> RDD[(K, Seq[V])]
- *reduceByKey*(f: (V,V) -> V) : RDD[(K,V)] -> RDD[(K,V)]
- *union*() : (RDD[T], RDD[T]) -> RDD[T]
- *join*() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K,(V,W))]
- *cogroup*() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K, (Seq[V],Seq[W]))]
- *crossProduct*() : (RDD[T], RDD[U]) -> RDD[(T,U)]
- *mapValues*(f: V -> W) : RDD[(K,V)] -> RDD[(K,W)] (preserves partitioning)
- *sort*(c: Comparator[K]) :  RDD[(K,V)] -> RDD[(K,V)]
- *partitionBy*(p: Partitioner[K]) : RDD[(K,V)] -> RDD[(K,V)]

**Actions:**

- *count*() : RDD[T] -> Long
- *collect*() : RDD[T] -> Seq[T]
- *reduce*(f: (T,T) -> T) : RDD[T] -> T
- *lookup*(k : K) : RDD[(K,V)] -> Seq[V] (on hash/range partitioned RDDs)
- *save*(path: String) : Outputs RDD to a storage system 

In [34]:
textFile = sc.textFile("text/gutenberg-shakespeare.txt")

In [35]:
textFile

text/gutenberg-shakespeare.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [36]:
%%time
textFile.count()

CPU times: user 11 ms, sys: 2 ms, total: 13 ms
Wall time: 1.65 s


124213

In [37]:
wordcount = textFile.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)

In [38]:
wordcount

PythonRDD[9] at RDD at PythonRDD.scala:48

In [None]:
wordcount.saveAsTextFile("output-wordcount-01")

In [39]:
!cat output-wordcount-01/part-00000 \
    2>/dev/null | head -n 20

('', 516839)
('Quince', 1)
('LIBRARY,', 218)
('Just', 10)
('enrooted', 1)
('divers', 20)
('Doubtless', 2)
('undistinguishable,', 1)
('Rheims,', 1)
('Freedom!', 1)
('incorporate.', 1)
('bawd!', 3)
('Sir-I', 1)
('withering', 2)
('Mopsa,', 1)
('[BEROWNE', 3)
('forgetfulness?', 1)
('Tranio?', 1)
('Wound', 3)
('twice,', 2)


**Step-by-step actions:**

In [None]:
!cat text/gutenberg-shakespeare.txt \
    2>/dev/null | head -n 10

In [None]:
wordcount_step_01 = textFile.flatMap(lambda line: line.split(" "))

In [None]:
wordcount_step_01

In [None]:
wordcount_step_01.take(20)

In [None]:
wordcount_step_02 = wordcount_step_01.map(lambda word: (word, 1))

In [None]:
wordcount_step_02.take(20)

In [None]:
wordcount_step_03 = wordcount_step_02.reduceByKey(lambda a, b: a + b)

In [None]:
wordcount_step_03.take(20)
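
To see what each of the three steps produces without a cluster, the same logic can be mimicked on a tiny list in plain Python. This is only a sketch of the semantics of flatMap, map, and reduceByKey; the two input lines are made up, and no Spark call is involved.

```python
from collections import defaultdict

lines = ["to be or", "not to be"]   # made-up stand-in for the text file

# Step 1 (flatMap): split every line and flatten the results into one list.
step_01 = [word for line in lines for word in line.split(" ")]

# Step 2 (map): pair each word with the count 1.
step_02 = [(word, 1) for word in step_01]

# Step 3 (reduceByKey): combine the values of equal keys with a + b.
totals = defaultdict(int)
for word, count in step_02:
    totals[word] = totals[word] + count
step_03 = sorted(totals.items())    # sorted only to make the printout stable

print(step_01)
print(step_02)
print(step_03)
```

Note that a plain map in step 1 would have produced one list per line; flatMap is what flattens them into a single collection of words.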

## 4. Movie Ratings

An independent movie company is looking to invest in a new movie project. With limited funds, the company wants to 
analyze the reaction of audiences, particularly toward various movie genres, in order to identify a promising 
movie project to focus on. The company relies on data collected from a publicly available recommendation service 
by [MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This 
[dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **24404096** ratings and **668953**
tag applications across **40110** movies. These data were created by **247753** users between January 09, 1995 and January 29, 2016. This dataset was generated on October 17, 2016. 

From this dataset, several analyses are possible, including the following:
1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for in-depth marketing analysis.

These types of analyses, which are somewhat ambiguous, demand the ability to quickly process large amounts of data in a 
relatively short amount of time for decision support purposes. In these situations, the size of the data typically 
makes analysis on a single machine impossible and analysis using a remote storage system impractical. For the 
remainder of the lesson, we will learn how HDFS provides the basis for storing massive amounts of data and enables 
the programming approach used to analyze these data.

In [None]:
!ls -h movielens

In [None]:
!cat  movielens/README.txt

In [None]:
!cat  movielens/links.csv \
    2>/dev/null | head -n 5

In [None]:
!cat  movielens/movies.csv \
    2>/dev/null | head -n 5

In [None]:
!cat  movielens/ratings.csv \
    2>/dev/null | head -n 5

In [None]:
!cat  movielens/tags.csv \
    2>/dev/null | head -n 5

In [40]:
ratings = sc.textFile("movielens/ratings.csv")

In [41]:
ratings.cache()

movielens/ratings.csv MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:0

In [42]:
%%time
ratings.count()

CPU times: user 9 ms, sys: 3 ms, total: 12 ms
Wall time: 5.67 s


24404097

In [43]:
%%time
ratings.count()

CPU times: user 9 ms, sys: 4 ms, total: 13 ms
Wall time: 2.99 s


24404097

In [55]:
%%time
ratings.count()

CPU times: user 9 ms, sys: 3 ms, total: 12 ms
Wall time: 3.05 s


24404097

### 4.1 Find movies which have the highest average ratings over the years and identify the corresponding genre

- Find the average ratings of all movies over the years
- Identify the corresponding genres for each movie

In [45]:
ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,122,2.0,945544824',
 '1,172,1.0,945544871',
 '1,1221,5.0,945544788',
 '1,1441,4.0,945544871']

In [46]:
ratingHeader = ratings.first() #extract header
print(ratingHeader)

userId,movieId,rating,timestamp


In [47]:
ratingsOnly = ratings.filter(lambda x:x != ratingHeader)

In [48]:
ratingsOnly.take(5)

['1,122,2.0,945544824',
 '1,172,1.0,945544871',
 '1,1221,5.0,945544788',
 '1,1441,4.0,945544871',
 '1,1609,3.0,945544824']

In [49]:
movieRatings = ratingsOnly.map(lambda line: (line.split(",")[1], float(line.split(",")[2])))

In [50]:
movieRatings.take(5)

[('122', 2.0), ('172', 1.0), ('1221', 5.0), ('1441', 4.0), ('1609', 3.0)]
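
The lambda above splits each line twice. A named helper that splits once is a small variation that is also easier to test locally; `parse_rating` is a name chosen here for illustration.

```python
def parse_rating(line):
    """Turn 'userId,movieId,rating,timestamp' into (movieId, rating)."""
    fields = line.split(",")              # split once, then reuse the fields
    return (fields[1], float(fields[2]))


# Local check against the first data line shown earlier.
print(parse_rating("1,122,2.0,945544824"))
```

It could then be passed to Spark as `ratingsOnly.map(parse_rating)`.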

**Possible approaches in aggregating data:** 
- groupByKey and mapValues
- reduceByKey and countByKey

**groupByKey and mapValues**

In [51]:
groupByKeyRatings = movieRatings.groupByKey()

groupByKeyRatings.take(5)

[('154214', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ad588>),
 ('27479', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ad710>),
 ('129667', <pyspark.resultiterable.ResultIterable at 0x2ba1e24adf28>),
 ('140054', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ad780>),
 ('45183', <pyspark.resultiterable.ResultIterable at 0x2ba1e24ade10>)]

In [52]:
mapValuesToListRatings = groupByKeyRatings.mapValues(list)

mapValuesToListRatings.take(5)

[('154214', [3.5, 0.5, 2.0]),
 ('27479', [4.0, 1.5, 1.0]),
 ('129667', [5.0, 5.0, 2.0, 3.0]),
 ('140054', [3.5, 2.0, 4.0]),
 ('45183',
  [3.0,
   1.5,
   3.5,
   3.0,
   3.0,
   2.0,
   3.5,
   2.0,
   1.5,
   3.5,
   4.0,
   4.0,
   3.5,
   5.0,
   3.5,
   4.0,
   0.5,
   2.0,
   4.5,
   5.0,
   2.0,
   4.0,
   4.5,
   4.0,
   5.0,
   3.5,
   2.0,
   3.5,
   4.5,
   3.5,
   0.5,
   5.0,
   4.5,
   3.0,
   3.5,
   3.5,
   4.5,
   3.5,
   4.5,
   4.5,
   4.5,
   3.5,
   5.0,
   0.5,
   4.0,
   4.0,
   2.5,
   2.5,
   2.0,
   3.0,
   4.5,
   3.5,
   2.5,
   4.0,
   3.5,
   4.0,
   3.0,
   2.5,
   4.0,
   4.5,
   4.5,
   4.5,
   4.5,
   4.5,
   4.0,
   3.5,
   3.0,
   2.5,
   3.5,
   4.5,
   2.5,
   5.0,
   3.5,
   3.5,
   3.0,
   2.5,
   3.0,
   4.5,
   2.0,
   4.0,
   3.5,
   3.5,
   3.0,
   3.5,
   3.5,
   5.0,
   4.5,
   3.0,
   3.5,
   3.0,
   3.0,
   3.5,
   5.0,
   5.0,
   4.5,
   3.5,
   4.0,
   2.0,
   4.5,
   3.0,
   5.0,
   4.0,
   3.0,
   4.5,
   2.5,
   4.0,
   2.0,
   4.0,
 

In [53]:
avgRatings01 = mapValuesToListRatings.mapValues(lambda V: sum(V) / float(len(V)))

avgRatings01.take(5)

[('154214', 2.0),
 ('27479', 2.1666666666666665),
 ('129667', 3.75),
 ('140054', 3.1666666666666665),
 ('45183', 3.5485781990521326)]

Is this correct?

In [None]:
(3.5 + 3.5 + 2.5 + 3.5 + 2.0 + 3.5 + 2.5 + 3.0) / 8

**reduceByKey and countByKey**

In [None]:
countsByKey = movieRatings.countByKey()

countsByKey

In [None]:
def sumValues(x,y):
    return (x + y)

sumRatings = movieRatings.reduceByKey(sumValues)

sumRatings.take(5)

In [None]:
import operator

sumRatings = movieRatings.reduceByKey(operator.add)
sumRatings.take(5)

In [None]:
avgRatings02 = sumRatings.map(lambda x: (x[0], x[1] / countsByKey.get(x[0])))

avgRatings02.take(5)
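
Both approaches above have a cost: groupByKey moves every individual rating across the network, and countByKey is an action that brings a dictionary of counts back to the driver. A commonly used alternative, sketched here as an assumption rather than as the notebook's method, carries a (sum, count) pair through a single reduceByKey. The three helper names are made up for this example, and the local check uses `functools.reduce` in place of Spark.

```python
from functools import reduce


def to_sum_count(rating):
    """Wrap one rating as a (sum, count) pair."""
    return (rating, 1)


def merge_sum_count(a, b):
    """Combine two (sum, count) pairs; the operation is associative and commutative."""
    return (a[0] + b[0], a[1] + b[1])


def mean_from_sum_count(pair):
    """Finish the aggregation: sum divided by count."""
    return pair[0] / pair[1]


# Local check on the ratings listed earlier for movie '129667'.
ratings_129667 = [5.0, 5.0, 2.0, 3.0]
sum_count = reduce(merge_sum_count, map(to_sum_count, ratings_129667))
print(sum_count)
print(mean_from_sum_count(sum_count))
```

On the cluster the same helpers would be chained as `movieRatings.mapValues(to_sum_count).reduceByKey(merge_sum_count).mapValues(mean_from_sum_count)`.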

How do we augment movie ratings data with title information?

In [54]:
movies = sc.textFile("movielens/movies.csv")

In [57]:
movieHeader = movies.first() #extract header
print(movieHeader)

movieId,title,genres


In [58]:
movies = movies.filter(lambda x:x != movieHeader)

movies.take(5)

['1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
 '5,Father of the Bride Part II (1995),Comedy']

In [59]:
movieInfo = movies.map(lambda line: (line.split(",")[0], (line.split(",")[1], line.split(",")[2])))

movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]

In [60]:
augmentedRatings = avgRatings01.join(movieInfo)

augmentedRatings.take(5)

[('1440', (2.776470588235294, ('Amos & Andrew (1993)', 'Comedy'))),
 ('106450', (4.25, ('Chicago Overcoat (2009)', 'Action|Drama'))),
 ('564', (2.3294797687861273, ('Chasers (1994)', 'Comedy'))),
 ('108318', (2.8088235294117645, ('"Single Shot', ' A (2013)"'))),
 ('150421', (3.0, ('Man on Horseback (1969)', '(no genres listed)')))]
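
The entry for movie '108318' above shows a side effect of splitting on every comma: a quoted title that itself contains a comma is cut in two, and the genres are lost. One way to handle this, sketched below with Python's standard `csv` module, is to parse each line with a CSV reader. `parse_movie` is a made-up helper name, and the genre on the second sample line is a placeholder rather than a value from the dataset.

```python
import csv


def parse_movie(line):
    """Parse one movies.csv line into (movieId, (title, genres)).

    csv.reader honours double quotes, so a comma inside a quoted title
    does not split the field.
    """
    movie_id, title, genres = next(csv.reader([line]))
    return (movie_id, (title, genres))


# The genre 'Drama' on the second line is a placeholder, not taken from the dataset.
print(parse_movie('1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy'))
print(parse_movie('108318,"Single Shot, A (2013)",Drama'))
```

It could replace the lambda as `movies.map(parse_movie)`.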

*Movies with the highest average rating:*

In [61]:
augmentedRatings.takeOrdered(10, key = lambda x : -x[1][0])

[('120436', (5.0, ('Garbo Talks (1984)', 'Comedy|Drama'))),
 ('136874', (5.0, ('Natarang (2010)', '(no genres listed)'))),
 ('146946', (5.0, ('The Hardy Bucks Movie (2013)', 'Comedy'))),
 ('114353', (5.0, ('Heavyweights (Schwere Jungs) (2006)', 'Comedy'))),
 ('123727', (5.0, ('Immigration Tango (2011)', 'Comedy|Romance'))),
 ('164869', (5.0, ('A Cinderella Story: If the Shoe Fits (2016)', 'Comedy'))),
 ('159423',
  (5.0,
   ('Jonas Brothers: The Concert Experience (2009)', '(no genres listed)'))),
 ('133575', (5.0, ('Do Detectives Think? (1927)', 'Comedy'))),
 ('93967',
  (5.0, ('"Keeping the Promise (Sign of the Beaver', ' The) (1997)"'))),
 ('160325', (5.0, ('I Love Hong Kong (2011)', 'Comedy')))]

*Movies with the lowest average rating:*

In [62]:
augmentedRatings.takeOrdered(10, key = lambda x : x[1][0])

[('164927', (0.5, ('Where Souls Go (2007)', '(no genres listed)'))),
 ('156840', (0.5, ('Jurassic Attack (2012)', 'Action|Sci-Fi'))),
 ('138008', (0.5, ('New Year (2011)', '(no genres listed)'))),
 ('131152', (0.5, ('The Fat Spy (1966)', 'Comedy'))),
 ('109355', (0.5, ('13 Fighting Men (1960)', 'Western'))),
 ('127327', (0.5, ('Khan Kluay (2006)', 'Adventure|Animation|Children'))),
 ('160978', (0.5, ('Hellevator (2004)', 'Horror|Sci-Fi'))),
 ('145285', (0.5, ('Octopus (2000)', 'Action|Horror|Thriller'))),
 ('133541', (0.5, ('Two Hundred Thousand Dirty (2014)', 'Comedy'))),
 ('139717', (0.5, ('10 Cent Pistol (2015)', 'Crime|Thriller')))]

### Challenge:

- Augment the mapping process of WordCount with a function that removes punctuation and normalizes capitalization, so that variants of the same word are counted together

### Challenge:

1. Make appropriate changes so that only movies with average ratings higher than 3.75 are collected.
2. Further enhance your modification so that only movies with average ratings higher than 3.75 and at least 1000 ratings are collected.

### 4.2 Find genres which have the highest average ratings over the years

- Identify the genres associated with a movie and its rating
- Each movie can have multiple genres. How do we flip the key/value pair so that the genre becomes the key?

In [63]:
movieRatings.take(5)

[('122', 2.0), ('172', 1.0), ('1221', 5.0), ('1441', 4.0), ('1609', 3.0)]

In [64]:
movieInfo.take(5)

[('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance')),
 ('5', ('Father of the Bride Part II (1995)', 'Comedy'))]

In [65]:
augmentedInfo = movieRatings.join(movieInfo)

In [66]:
augmentedInfo.take(5)

[('1440', (3.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (2.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (3.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (5.0, ('Amos & Andrew (1993)', 'Comedy'))),
 ('1440', (3.0, ('Amos & Andrew (1993)', 'Comedy')))]

In [67]:
def extractGenreRating (t):
    final_tuples = []
    genreList = t[1][1][1].split("|")
    for genre in genreList:
        final_tuples.append((genre,t[1][0]))
    return final_tuples

print(extractGenreRating((u'1', (3.0, (u'Toy Story (1995)', u'Adventure|Animation|Children|Comedy|Fantasy')))))

[('Adventure', 3.0), ('Animation', 3.0), ('Children', 3.0), ('Comedy', 3.0), ('Fantasy', 3.0)]


In [68]:
genreRatings = augmentedInfo.flatMap(extractGenreRating)

In [69]:
genreRatings.take(5)

[('Comedy', 3.0),
 ('Comedy', 2.0),
 ('Comedy', 3.0),
 ('Comedy', 5.0),
 ('Comedy', 3.0)]

### Challenge:

Complete the remaining portion of task 4.2: calculate the average rating of each genre over the years.

### 4.3 Find users who rate movies most frequently in order to contact them for in-depth marketing analysis

- How do you define "frequently"?
    - At least once per week?

In [70]:
userRatings = ratingsOnly.map(lambda line: (line.split(",")[0], float(line.split(",")[3])))

In [71]:
ratingGroupByUsers = userRatings.groupByKey().mapValues(list)
ratingGroupByUsers.take(5)

[('51313',
  [847620176.0,
   847620106.0,
   847620374.0,
   847620326.0,
   847620411.0,
   847620264.0,
   847620347.0,
   847619964.0,
   847619965.0,
   847620326.0,
   847620233.0,
   847620294.0,
   847620021.0,
   847620211.0,
   847620021.0,
   847620326.0,
   847619878.0,
   847620211.0,
   847620264.0,
   847620021.0,
   847620374.0,
   847620470.0,
   847619923.0,
   847620049.0,
   847620049.0,
   847620347.0,
   847620411.0,
   847620049.0,
   847620072.0,
   847620176.0,
   847620211.0,
   847620690.0,
   847619900.0,
   847620748.0,
   847620748.0,
   847620049.0,
   847620233.0,
   847620106.0,
   847620656.0,
   847620072.0,
   847619900.0,
   847619878.0]),
 ('23161',
  [945058557.0,
   945063385.0,
   945049906.0,
   945063316.0,
   945049950.0,
   945057901.0,
   945057196.0,
   945057745.0,
   945057745.0,
   945058411.0,
   945050030.0,
   945049753.0,
   945063619.0,
   945061581.0,
   945058080.0,
   945057505.0,
   945057506.0,
   945057314.0,
   945061786.0,


In [72]:
avgRatingFreq = ratingGroupByUsers.mapValues(lambda V: (max(V) - min(V)) / float(len(V)))
avgRatingFreq.take(5)

[('67056', 37.26),
 ('77986', 3230.665418227216),
 ('226527', 8.6),
 ('207119', 12.28),
 ('53196', 7.011363636363637)]

In [None]:
x = [1346139060.0,
   1346139098.0,
   1346139113.0,
   1346139053.0,
   1346139234.0,
   1346139006.0,
   1346139209.0,
   1346139147.0,
   1346138998.0,
   1346139206.0,
   1346139224.0,
   1346139174.0,
   1346139152.0,
   1346139230.0,
   1346139181.0,
   1346139159.0,
   1346139314.0]
(max(x) - min(x)) / float(len(x))

In [73]:
topUsers = avgRatingFreq.top(10, key=lambda x: x[1])

In [74]:
topUsers

[('40407', 51121853.75),
 ('241087', 40917744.72727273),
 ('54838', 36601016.0),
 ('248290', 33999095.666666664),
 ('39601', 33138222.0),
 ('155302', 33013341.666666668),
 ('117995', 29406107.25),
 ('183383', 29210786.666666668),
 ('121552', 26685917.875),
 ('74936', 26309319.0)]
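
A note on reading these numbers: `(max(V) - min(V)) / len(V)` is the time span of a user's ratings divided by the number of ratings, so it is roughly the average number of seconds per rating. Since `top(10, ...)` returns the largest values, the users listed above are the ones with the longest gaps, that is, the least frequent raters by this measure; `takeOrdered(10, key=...)` would return the smallest gaps instead. The sketch below contrasts the notebook's measure with a hypothetical ratings-per-week measure. The function names, the one-week floor, and the two sample users are all assumptions made for illustration.

```python
SECONDS_PER_WEEK = 7 * 24 * 60 * 60


def mean_gap_seconds(timestamps):
    """The notebook's measure: time span divided by the number of ratings."""
    return (max(timestamps) - min(timestamps)) / float(len(timestamps))


def ratings_per_week(timestamps):
    """Hypothetical alternative: ratings per week of activity.

    Users whose ratings span less than a week are measured over one week,
    an assumption chosen so that short bursts do not divide by almost zero.
    """
    span = max(timestamps) - min(timestamps)
    weeks = max(span / SECONDS_PER_WEEK, 1.0)
    return len(timestamps) / weeks


burst_user = [0.0, 60.0, 120.0, 180.0]                # 4 ratings within 3 minutes
steady_user = [0.0, 604800.0, 1209600.0, 1814400.0]   # 1 rating a week over 3 weeks

print(mean_gap_seconds(burst_user), mean_gap_seconds(steady_user))
print(ratings_per_week(burst_user), ratings_per_week(steady_user))
```

Either function could be applied with `ratingGroupByUsers.mapValues(...)`; which one better captures "frequently" depends on the definition chosen at the start of this section.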

## 5. Airlines

**Spark SQL**
- Spark module for structured data processing
- Gives Spark more information about the structure of both the data and the computation being performed, which it uses for additional optimization
- Executes SQL queries written in either basic SQL syntax or HiveQL

**DataFrame**
- distributed collection of data organized into named columns
- conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood
- can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

In [75]:
sqlContext = pyspark.SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x2ba1e24c6be0>

In [76]:
airlines = sqlContext.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .load("airlines/data/")\
    .cache()

In [77]:
%%time
airlines.count()

CPU times: user 7 ms, sys: 2 ms, total: 9 ms
Wall time: 1min 7s


123534969

In [78]:
%%time
airlines.count()

CPU times: user 1 ms, sys: 2 ms, total: 3 ms
Wall time: 222 ms


123534969

In [79]:
airlines.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Carr

You can query a DataFrame with SQL statements through SQLContext by registering the DataFrame as a table.

In [80]:
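# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
airlines.createOrReplaceTempView("airlines")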

*How many unique airlines are there?*

In [81]:
uniqueAirline = sqlContext.sql("SELECT DISTINCT UniqueCarrier \
                                FROM airlines")
uniqueAirline.show()

+-------------+
|UniqueCarrier|
+-------------+
|           EA|
|           UA|
|           PI|
|           PS|
|           AA|
|           NW|
|           EV|
|           B6|
|           HP|
|           TW|
|           DL|
|           OO|
|           F9|
|           YV|
|           TZ|
|           US|
|           AQ|
|           MQ|
|           OH|
|           HA|
+-------------+
only showing top 20 rows



*Calculate how many flights were completed by each carrier over time*

In [82]:
%%time
carrierFlightCount = sqlContext.sql("SELECT UniqueCarrier, COUNT(UniqueCarrier) AS FlightCount \
                                    FROM airlines GROUP BY UniqueCarrier")
carrierFlightCount.show()

+-------------+-----------+
|UniqueCarrier|FlightCount|
+-------------+-----------+
|           EA|     919785|
|           UA|   13299817|
|           PI|     873957|
|           PS|      83617|
|           AA|   14984647|
|           NW|   10292627|
|           EV|    1697172|
|           B6|     811341|
|           HP|    3636682|
|           TW|    3757747|
|           DL|   16547870|
|           OO|    3090853|
|           F9|     336958|
|           YV|     854056|
|           TZ|     208420|
|           US|   14075530|
|           AQ|     154381|
|           MQ|    3954895|
|           OH|    1464176|
|           HA|     274265|
+-------------+-----------+
only showing top 20 rows

CPU times: user 2 ms, sys: 0 ns, total: 2 ms
Wall time: 1.45 s


*How do you display full carrier names?*

In [83]:
carriers = sqlContext.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .load("airlines/metadata/carriers.csv")\
    .cache()
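# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
carriers.createOrReplaceTempView("carriers")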

In [84]:
carriers.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Description: string (nullable = true)



In [85]:
%%time
carrierFlightCountFullName = sqlContext.sql("SELECT c.Description, a.UniqueCarrier, COUNT(a.UniqueCarrier) AS FlightCount \
                                    FROM airlines AS a \
                                    INNER JOIN carriers AS c \
                                    ON c.Code = a.UniqueCarrier \
                                    GROUP BY a.UniqueCarrier, c.Description \
                                    ORDER BY a.UniqueCarrier")
carrierFlightCountFullName.show()

+--------------------+-------------+-----------+
|         Description|UniqueCarrier|FlightCount|
+--------------------+-------------+-----------+
|Pinnacle Airlines...|           9E|     521059|
|American Airlines...|           AA|   14984647|
| Aloha Airlines Inc.|           AQ|     154381|
|Alaska Airlines Inc.|           AS|    2878021|
|     JetBlue Airways|           B6|     811341|
|Continental Air L...|           CO|    8145788|
|    Independence Air|           DH|     693047|
|Delta Air Lines Inc.|           DL|   16547870|
|Eastern Air Lines...|           EA|     919785|
|Atlantic Southeas...|           EV|    1697172|
|Frontier Airlines...|           F9|     336958|
|AirTran Airways C...|           FL|    1265138|
|Hawaiian Airlines...|           HA|     274265|
|America West Airl...|           HP|    3636682|
|Midway Airlines I...|       ML (1)|      70622|
|American Eagle Ai...|           MQ|    3954895|
|Northwest Airline...|           NW|   10292627|
|         Comair Inc

*What is the average departure delay for each airline?*

In [86]:
%%time
avgDepartureDelay = sqlContext.sql("SELECT FIRST(c.Description), FIRST(a.UniqueCarrier), AVG(a.DepDelay) AS AvgDepDelay \
                                    FROM airlines AS a \
                                    INNER JOIN carriers AS c \
                                    ON c.Code = a.UniqueCarrier \
                                    GROUP BY a.UniqueCarrier \
                                    ORDER BY a.UniqueCarrier")
avgDepartureDelay.show()

+-------------------------+---------------------------+-------------------+
|first(Description, false)|first(UniqueCarrier, false)|        AvgDepDelay|
+-------------------------+---------------------------+-------------------+
|     Pinnacle Airlines...|                         9E| 7.9279144892173035|
|     American Airlines...|                         AA|  7.862321254420546|
|      Aloha Airlines Inc.|                         AQ| 1.5993176899118409|
|     Alaska Airlines Inc.|                         AS|  8.297235193754096|
|          JetBlue Airways|                         B6| 11.262714178314551|
|     Continental Air L...|                         CO|  7.695967155526857|
|         Independence Air|                         DH|  9.612639389688926|
|     Delta Air Lines Inc.|                         DL|  7.593716274369933|
|     Eastern Air Lines...|                         EA|  8.674050565435543|
|     Atlantic Southeas...|                         EV| 13.483736343326541|
|     Fronti
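
The `FIRST(...)` calls in the query above leave the result with column names such as `first(Description, false)`. The earlier flight-count query avoided this by grouping on both the carrier code and the description, and the same pattern works for the average. The sketch below demonstrates that query shape on tiny made-up tables, using Python's built-in `sqlite3` as a local stand-in for Spark SQL; it does not touch the airline data, and the carrier descriptions are invented.

```python
import sqlite3

# Tiny made-up tables; an in-memory SQLite database stands in for Spark SQL.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE airlines (UniqueCarrier TEXT, DepDelay REAL)")
db.execute("CREATE TABLE carriers (Code TEXT, Description TEXT)")
db.executemany("INSERT INTO airlines VALUES (?, ?)",
               [("AA", 10.0), ("AA", 20.0), ("DL", 5.0), ("DL", None)])
db.executemany("INSERT INTO carriers VALUES (?, ?)",
               [("AA", "Carrier A (made up)"), ("DL", "Carrier D (made up)")])

# Grouping by both columns lets them be selected directly, without FIRST().
rows = db.execute("""
    SELECT c.Description, a.UniqueCarrier, AVG(a.DepDelay) AS AvgDepDelay
    FROM airlines AS a
    INNER JOIN carriers AS c ON c.Code = a.UniqueCarrier
    GROUP BY a.UniqueCarrier, c.Description
    ORDER BY a.UniqueCarrier
""").fetchall()
db.close()

for row in rows:
    print(row)
```

The missing delay for the second carrier is skipped by `AVG`, which ignores NULL values.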

In [None]:
airlines.unpersist()

In [None]:
sc.stop()