# Intro to Apache Spark

https://changhsinlee.com/install-pyspark-windows-jupyter/

### Get Set Up Locally
* Get Spark 2.3.2 from: https://spark.apache.org/downloads.html
    * Package type: pre-built for Hadoop 2.7 or later
    * Extract to `C:\Users\Ryan.nsj\Spark`
    * Add environment variable `SPARK_HOME` as `C:\Users\Ryan.nsj\Spark\spark-2.3.2-bin-hadoop2.7\`
    
    
* Get Java JDK 8 from: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html


* Get pyspark with: `pip install pyspark`
* Get findspark with: `pip install findspark`
* If you get an `InvalidInputError`, try using directory names with no spaces or special characters
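The two setup pitfalls above (an unset `SPARK_HOME`, and paths with spaces or special characters) can be checked before starting Jupyter. This is a minimal sketch: `check_spark_home` is a hypothetical helper, not part of Spark or findspark, and the set of characters it treats as "safe" is an assumption.

```python
import os
import re


def check_spark_home(path):
    """Hypothetical helper: return a list of problems found in a SPARK_HOME path."""
    problems = []
    if not path:
        problems.append("SPARK_HOME is not set")
        return problems
    if " " in path:
        problems.append("path contains spaces")
    # Assumed "safe" set: letters, digits, _ . : \ / and -; anything else is flagged.
    if re.search(r"[^A-Za-z0-9_.:\\/\-]", path.replace(" ", "")):
        problems.append("path contains special characters")
    return problems


if __name__ == "__main__":
    print(check_spark_home(os.environ.get("SPARK_HOME")))
```

An empty list means neither problem was found; it does not guarantee the rest of the installation is correct.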

### SparkContext
* The __SparkContext__ object is the entry point to Spark: it manages the connection to the __cluster__ and coordinates the processes that run on it.
* More specifically, it connects to a __cluster manager__ (such as Spark's standalone manager or YARN), which allocates resources to the application.
* The cluster manager starts __executors__ on the __worker nodes__, and the executors run the computations.

In [1]:
# Find path to PySpark.
import findspark
findspark.init()

# Import PySpark and initialize SparkContext object.
import pyspark
sc = pyspark.SparkContext()

In [2]:
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors, VectorUDT

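### Resilient Distributed Dataset (RDD)
* __Immutable__: an RDD cannot be changed in place; every transformation returns a new RDD.
* __Distributed__: its elements are split into partitions that can be processed in parallel across the cluster.
* __Resilient__: Spark records the lineage of transformations that produced an RDD, so lost partitions can be recomputed.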

In [3]:
raw_data = sc.textFile("daily_show.tsv")
raw_data.take(5)

['YEAR\tGoogleKnowlege_Occupation\tShow\tGroup\tRaw_Guest_List',
 '1999\tactor\t1/11/99\tActing\tMichael J. Fox',
 '1999\tComedian\t1/12/99\tComedy\tSandra Bernhard',
 '1999\ttelevision actress\t1/13/99\tActing\tTracey Ullman',
 '1999\tfilm actress\t1/14/99\tActing\tGillian Anderson']

* Because of __lazy evaluation__, `sc.textFile()` does not read the file when it is called; the file is only read when the action `take(5)` in the next line needs the data.
* Deferring work this way lets Spark collect the pending steps into an execution plan and optimize it before anything runs.
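The idea can be modeled in plain Python, where `map()` on an iterator is also lazy. This is a toy analogy, not how Spark schedules work: `lazy_read` is a hypothetical stand-in for `sc.textFile()` that records each line it actually reads, and `islice` plays the role of the `take()` action.

```python
from itertools import islice

log = []  # records every line that is actually "read"


def lazy_read(lines):
    """Hypothetical stand-in for sc.textFile(): yields lines only on demand."""
    for line in lines:
        log.append(line)
        yield line


source = ["a\t1", "b\t2", "c\t3", "d\t4"]

# Like a transformation: builds the pipeline but reads nothing yet.
pipeline = map(lambda line: line.split("\t"), lazy_read(source))
reads_before = len(log)
print(reads_before)  # 0

# Like the take(2) action: pulls just enough data through the pipeline.
first_two = list(islice(pipeline, 2))
print(first_two)  # [['a', '1'], ['b', '2']]
print(len(log))   # 2: only the lines that were needed were read
```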

### Data Pipelining

* Every operation or calculation in Spark is a series of __methods__ that we can chain together and run in succession to form a pipeline.
* Each step in the pipeline returns either a Python value (such as an integer), a Python data structure (such as a dictionary), or an RDD object.  


* __Two method types:__
    * __Transformations__ - `map()`, `reduceByKey()`, `flatMap()`
    * __Actions__ - `take()`, `reduce()`, `saveAsTextFile()`, `collect()`

### Transformations
* Return a reference to a new RDD rather than a computed result
* Are lazy: they only run when an __action__ needs the transformation's output RDD

#### _map(f)_
* `map(f)` applies the function `f` to every element of the RDD.
* Returns a new RDD with one output element per input element.
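`map()` is easiest to understand next to `flatMap()`, which is listed above but not shown. The plain-Python lines below mimic what each computes on the first two data rows (a model of the semantics, not PySpark code); in PySpark the equivalents would be `raw_data.map(lambda line: line.split('\t'))` and `raw_data.flatMap(lambda line: line.split('\t'))`.

```python
from itertools import chain

lines = [
    "1999\tactor\t1/11/99\tActing\tMichael J. Fox",
    "1999\tComedian\t1/12/99\tComedy\tSandra Bernhard",
]

# map(f): exactly one output element per input element (here, one list per line).
mapped = [line.split("\t") for line in lines]

# flatMap(f): f returns a sequence per element, and the sequences are concatenated.
flat_mapped = list(chain.from_iterable(line.split("\t") for line in lines))

print(len(mapped))       # 2
print(len(flat_mapped))  # 10
```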

In [4]:
daily_show = raw_data.map(lambda line: line.split('\t'))
daily_show.take(5)

[['YEAR', 'GoogleKnowlege_Occupation', 'Show', 'Group', 'Raw_Guest_List'],
 ['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson']]

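#### _reduceByKey(f)_
* `reduceByKey(f)` works on an RDD of `(key, value)` pairs: it merges all values that share a key using the function `f` and returns a new RDD of `(key, merged_value)` pairs.
* Below, `map()` turns each row into a `(year, 1)` pair and `reduceByKey()` adds up the 1s for each year, which counts the rows per year.
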
In [None]:
# map()
# ('YEAR', 1)
# ('1999', 1)
# ('1999', 1)
# ('1999', 1)
# ('1999', 1)
# ...

# reduceByKey()
# ('YEAR', 1)
# ('1999', 4)
# ...

In [5]:
tally = daily_show.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
tally.take(5)

[('YEAR', 1), ('2012', 164), ('2013', 166), ('2004', 164), ('2011', 163)]
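To make the semantics of `reduceByKey()` concrete, here is a plain-Python model of what it computes. It is a sketch, not Spark's implementation, and `reduce_by_key` is a hypothetical name. Spark additionally merges values within each partition before shuffling data between nodes, so `f` should be associative and commutative, as addition is.

```python
from functools import reduce


def reduce_by_key(pairs, f):
    """Plain-Python model of RDD.reduceByKey: merge the values for each key with f."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return [(key, reduce(f, values)) for key, values in grouped.items()]


years = ["YEAR", "1999", "1999", "1999", "2000", "2000"]
pairs = [(y, 1) for y in years]  # same shape as daily_show.map(lambda x: (x[0], 1))
print(reduce_by_key(pairs, lambda x, y: x + y))
# [('YEAR', 1), ('1999', 3), ('2000', 2)]
```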

In [6]:
tally_count = tally.count()
print(tally_count)

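18

* The 18 keys are the 17 years from 1999 to 2015 plus the `'YEAR'` key from the header row, which is removed with `filter()` below.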
In [7]:
tally.collect()  # Return all elements (same result as tally.take(tally.count()))

[('YEAR', 1),
 ('2012', 164),
 ('2013', 166),
 ('2004', 164),
 ('2011', 163),
 ('2014', 163),
 ('2002', 159),
 ('2007', 141),
 ('2015', 100),
 ('2003', 166),
 ('2010', 165),
 ('2001', 157),
 ('2000', 169),
 ('2008', 164),
 ('2005', 162),
 ('2009', 163),
 ('1999', 166),
 ('2006', 161)]

#### _filter(f)_
* `filter(f)` returns a new RDD containing only the elements for which `f(element)` is `True`; elements for which it is `False` are dropped.

In [10]:
def filter_year(line):
    # Drop the header row, whose first field is the column name 'YEAR'.
    return line[0] != 'YEAR'

filtered_daily_show = daily_show.filter(filter_year)

In [12]:
filtered_daily_show.take(5)

[['1999', 'actor', '1/11/99', 'Acting', 'Michael J. Fox'],
 ['1999', 'Comedian', '1/12/99', 'Comedy', 'Sandra Bernhard'],
 ['1999', 'television actress', '1/13/99', 'Acting', 'Tracey Ullman'],
 ['1999', 'film actress', '1/14/99', 'Acting', 'Gillian Anderson'],
 ['1999', 'actor', '1/18/99', 'Acting', 'David Alan Grier']]
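An alternative that does not depend on the header's contents is to drop the element at position 0. In PySpark this could be written as `daily_show.zipWithIndex().filter(lambda pair: pair[1] > 0).map(lambda pair: pair[0])`; the plain-Python sketch below models those three steps with `enumerate` on the first rows shown above (a model only, not PySpark code).

```python
rows = [
    ["YEAR", "GoogleKnowlege_Occupation", "Show", "Group", "Raw_Guest_List"],
    ["1999", "actor", "1/11/99", "Acting", "Michael J. Fox"],
    ["1999", "Comedian", "1/12/99", "Comedy", "Sandra Bernhard"],
]

# zipWithIndex(): pair every element with its position, as (element, index).
indexed = [(row, i) for i, row in enumerate(rows)]
# filter(): keep everything except position 0 (the header line).
without_header = [pair for pair in indexed if pair[1] > 0]
# map(): drop the index again, leaving just the rows.
data_rows = [pair[0] for pair in without_header]

print([row[4] for row in data_rows])  # ['Michael J. Fox', 'Sandra Bernhard']
```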

### Pipeline of Transformations and Actions

In [13]:
(filtered_daily_show
    .filter(lambda line: line[1] != '')          # Remove guests whose occupation is blank
    .map(lambda line: (line[1].lower(), 1))      # Build lowercase (occupation, 1) key-value pairs
    .reduceByKey(lambda x, y: x + y)             # Sum the counts for each unique occupation
    .take(5))                                    # Return the first 5 pairs (unsorted)

[('radio personality', 3),
 ('former governor of new york', 1),
 ('illustrator', 1),
 ('presidnet', 3),
 ('former united states secretary of state', 6)]
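The `take(5)` result above is unsorted. To get the most common occupations instead, PySpark's `takeOrdered(5, key=lambda kv: -kv[1])` action could replace `take(5)`. The sketch below models the whole pipeline in plain Python with `collections.Counter` on rows from the filtered output above; the last row is hypothetical, added only to exercise the blank-occupation filter.

```python
from collections import Counter

# Rows copied from the filtered output above, plus one hypothetical row
# with a blank occupation to show the first filter step.
rows = [
    ["1999", "actor", "1/11/99", "Acting", "Michael J. Fox"],
    ["1999", "Comedian", "1/12/99", "Comedy", "Sandra Bernhard"],
    ["1999", "television actress", "1/13/99", "Acting", "Tracey Ullman"],
    ["1999", "film actress", "1/14/99", "Acting", "Gillian Anderson"],
    ["1999", "actor", "1/18/99", "Acting", "David Alan Grier"],
    ["1999", "", "1/19/99", "", "Hypothetical Guest"],  # hypothetical row
]

# filter + map + reduceByKey, modeled with a generator expression and Counter.
counts = Counter(row[1].lower() for row in rows if row[1] != "")

# Sort by count (descending), then by name, and keep the top 2.
top = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:2]
print(top)  # [('actor', 2), ('comedian', 1)]
```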

### Another Example

In [14]:
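# Read `recent-grads.csv` into an RDD, one element per line.
f = sc.textFile('recent-grads.csv')
# textFile() already splits the file into lines, so split('\n') leaves each line
# as a single-element list (see output). Use split(',') to split a line into columns.
data = f.map(lambda line: line.split('\n'))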
data.take(10)

[['Rank,Major_code,Major,Total,Men,Women,Major_category,ShareWomen,Sample_size,Employed,Full_time,Part_time,Full_time_year_round,Unemployed,Unemployment_rate,Median,P25th,P75th,College_jobs,Non_college_jobs,Low_wage_jobs'],
 ['1,2419,PETROLEUM ENGINEERING,2339,2057,282,Engineering,0.120564344,36,1976,1849,270,1207,37,0.018380527,110000,95000,125000,1534,364,193'],
 ['2,2416,MINING AND MINERAL ENGINEERING,756,679,77,Engineering,0.101851852,7,640,556,170,388,85,0.117241379,75000,55000,90000,350,257,50'],
 ['3,2415,METALLURGICAL ENGINEERING,856,725,131,Engineering,0.153037383,3,648,558,133,340,16,0.024096386,73000,50000,105000,456,176,0'],
 ['4,2417,NAVAL ARCHITECTURE AND MARINE ENGINEERING,1258,1123,135,Engineering,0.107313196,16,758,1069,150,692,40,0.050125313,70000,43000,80000,529,102,0'],
 ['5,2405,CHEMICAL ENGINEERING,32260,21239,11021,Engineering,0.341630502,289,25694,23170,5180,16697,1672,0.061097712,65000,50000,75000,18314,4440,972'],
 ['6,2418,NUCLEAR ENGINEERING,2573,2200,373,En