# Getting started with Spark

### (1) - Set up an AWS account and a t2.micro instance
### (2) - Installed Anaconda (Python 3.6)
### (3) - Installed Java
### (4) - Installed Scala
### (5) - Installed Spark + Hadoop


In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

#It works!

In [4]:

%%writefile example.txt
first line
second line
third line
fourth line

Overwriting example.txt


### RDD using textFile method
- `sc` is the SparkContext
- RDD actions return values to the driver
- RDD transformations return a new RDD (for example, one holding a subset of the items)

In [5]:
# use the SparkContext to make an RDD from a .txt or other text file
textFile = sc.textFile('example.txt')

In [6]:
#simplest action is count()
textFile.count()

4

In [7]:
# grab the first line as an ACTION
textFile.first()

'first line'

- The filter method is a transformation (similar to Python's built-in filter)
    - the resulting RDD contains only the lines for which the function returns True

In [8]:
secfind = textFile.filter(lambda line: 'second' in line)

That ran very quickly because transformations are lazily evaluated.
- Think of a transformation as a recipe

In [9]:
# Recipe: no output until an action is called.
secfind

PythonRDD[4] at RDD at PythonRDD.scala:48

In [10]:
# Do it...
secfind.collect()

['second line']

In [11]:
secfind.count()

1

- You can build up a complicated recipe without running any of its steps until an action is called
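The recipe idea can be seen without Spark at all. The sketch below is only a plain-Python analogy: the built-in `filter` is lazy in Python 3, loosely like an RDD transformation, and `list(...)` plays the role of an action. The names `evaluated` and `contains_second` are made up for this illustration.

```python
# Plain-Python analogy only; this is not Spark code.
lines = ['first line', 'second line', 'third line', 'fourth line']

evaluated = []  # records every line the predicate has actually looked at


def contains_second(line):
    evaluated.append(line)
    return 'second' in line


# Building the "recipe": filter() returns a lazy iterator, nothing runs yet.
recipe = filter(contains_second, lines)
work_before = len(evaluated)

# Consuming the iterator is the analogue of an action such as collect().
result = list(recipe)
work_after = len(evaluated)

print(work_before, work_after, result)
```

The predicate is not called at all until the result is requested, which is why defining `secfind` above returned immediately.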

# RDD TRANSFORMATIONS AND ACTIONS

### Important Items

- **RDD** - Resilient Distributed Dataset
- **Transformation** - Spark operation producing an RDD
    - nothing is computed until an action is called
- **Action** - Spark operation producing a local object
- **Spark Job** - Sequence of transformations on data with a final action

### Creating an RDD

- sc.parallelize(array) - create RDD of elements of array or list
- sc.textFile(path/to/file) - create RDD of lines from file

### RDD Transformation

- Create a set of instructions we want to perform on the RDD
- BEFORE we call an ACTION and actually execute them
- the 'recipe' pieces

### RDD Actions
- execute your recipe

# Transformation and Actions Practice

In [17]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line


Writing example2.txt


In [19]:
#from pyspark import SparkContext
#sc = SparkContext()

In [20]:
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0

In [21]:
text_rdd = sc.textFile('example2.txt')

In [22]:
# a transformation that splits every line into a list of words
words = text_rdd.map(lambda line: line.split())

In [24]:
# words won't do anything until we take an ACTION
words

PythonRDD[10] at RDD at PythonRDD.scala:48

In [25]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [27]:
#This is what the non-mapped, original RDD looks like:
text_rdd.collect()

['first ', 'second line', 'the third line', 'then a fourth line']

### map vs flatMap

In [28]:
# A transformation and an action can be chained on a single line
# There is no memory penalty for splitting them up instead, because evaluation is lazy

text_rdd.flatMap(lambda line: line.split()).collect()

#A list of all words in the text file

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
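To make the difference concrete, here is a small plain-Python sketch of the two shapes. It does not use Spark; the list comprehension stands in for `map` and `itertools.chain.from_iterable` stands in for the flattening that `flatMap` does. The variable names are illustrative only.

```python
from itertools import chain

# Same lines as example2.txt, as returned by text_rdd.collect()
lines = ['first ', 'second line', 'the third line', 'then a fourth line']

# map-like: one output element per input line, so we get a list of lists
mapped = [line.split() for line in lines]

# flatMap-like: the per-line lists are flattened into a single list of words
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped))       # one entry per line
print(len(flat_mapped))  # one entry per word
```

`mapped` keeps the line structure (4 entries), while `flat_mapped` loses it and holds the 10 individual words.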

### Example from video

In [29]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [30]:
services = sc.textFile('services.txt')

The raw lines may need to be cleaned or manipulated first...

In [31]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [32]:
services.map(lambda line: line.split())

PythonRDD[15] at RDD at PythonRDD.scala:48

In [33]:
#transform string into a list

services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [35]:
# remove the hashtag before EventId (startswith also copes with empty lines)
services.map(lambda line: line[1:] if line.startswith('#') else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [38]:
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [39]:
clean = clean.map(lambda line: line.split())

In [40]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [47]:
# Practice grabbing fields...
# Goal: total sales by state
# Build a list of (State, Amount) tuple pairs
# reduceByKey needs this (key, value) shape to work

clean.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [54]:
pairs = clean.map(lambda lst: (lst[3],lst[-1]))

In [50]:
# reduceByKey expects the data as (key, value) tuples
# It treats the first element of each tuple as the key

rekey = pairs.reduceByKey(lambda amt1,amt2 : amt1 + amt2)

In [51]:
rekey.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [55]:
# Note: the amounts are still strings (see the single quotes), so + concatenated them
# Convert to float before adding

rekey = pairs.reduceByKey(lambda amt1,amt2 : float(amt1) + float(amt2))


In [56]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
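A rough way to picture what happened in the two attempts above is the plain-Python sketch below. `reduce_by_key` is a hypothetical helper written for this note, not Spark's implementation: it folds the values of each key in list order, whereas Spark may combine values in any order across partitions (which is why the function passed to `reduceByKey` should be associative and commutative).

```python
def reduce_by_key(pairs, func):
    """Hypothetical stand-in: fold the values of each key with func."""
    merged = {}
    for key, value in pairs:
        # func is only called from the second value of a key onwards
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


pairs = [('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'),
         ('CA', '200.00'), ('CA', '500.00'), ('NY', '750.00'),
         ('TX', '200.00')]

# Adding strings concatenates them
as_strings = reduce_by_key(pairs, lambda a, b: a + b)

# Converting to float first gives numeric totals
as_floats = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

print(as_strings)
print(as_floats)
```

Note that the `('State', 'Amount')` header survives in both cases: a key with a single value is never passed through the function, so `float('Amount')` is never attempted.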

### A lot of Spark work is cleaning up data to get it into a form you can use

In [57]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [62]:
# Grab (State, Amount)
step1 = clean.map(lambda lst: (lst[3],lst[-1]))

# Reduce by key
step2 = step1.reduceByKey(lambda amt1,amt2 : float(amt1) + float(amt2))

# Get rid of the (State, Amount) header row
step3 = step2.filter(lambda x: x[0] != 'State')

# Sort results by Amount
# sort by 2nd item in tuple, descending
step4 = step3.sortBy(lambda stAmt : stAmt[1],ascending=False)

step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
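As a sanity check on the four steps, the same totals can be recomputed in plain Python from the rows shown by `clean.collect()`. This is only a cross-check that makes sense for a tiny dataset like this one; it is not how you would process data at Spark scale, and the names `rows`, `totals`, and `ranked` are made up here.

```python
from collections import defaultdict

# Rows as returned by clean.collect(), header included
rows = [
    ['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
    ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
    ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
    ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
    ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
    ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
    ['205', '10/19/2017', '202', 'TX', '121', '200.00'],
]

# Skip the header, then sum Amount (last field) per State (index 3)
totals = defaultdict(float)
for row in rows[1:]:
    totals[row[3]] += float(row[-1])

# Sort by total, descending, like sortBy(..., ascending=False)
ranked = sorted(totals.items(), key=lambda st_amt: st_amt[1], reverse=True)

print(ranked)
```

The result should match the output of `step4.collect()`.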

### General
### Use tuple unpacking for readability
- Replace indexing with tuple unpacking


In [63]:
x = ['ID','State','Amount']

In [64]:
def func1(lst):
    return lst[-1]

In [65]:
# When you come back to this later, lst[3] isn't readable,
# so unpack the values into named variables instead

def func2(id_st_amt):
    #unpack values
    (Id,st,amt) = id_st_amt
    return amt

In [66]:
func1(x)

'Amount'

In [67]:
func2(x)

'Amount'

#### Function 2 is much more readable later on.
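Applied to the services data, the same idea might look like the sketch below. `state_and_amount` is a hypothetical name, and the six field names are taken from the header row of services.txt; the function assumes every row has exactly six fields and a numeric amount, so the header row would need to be filtered out first.

```python
def state_and_amount(row):
    # Unpack all six fields by name instead of using row[3] and row[-1]
    (event_id, timestamp, customer, state, service_id, amount) = row
    return (state, float(amount))


example_row = ['201', '10/13/2017', '100', 'NY', '131', '100.00']
pair = state_and_amount(example_row)
print(pair)
```

A named function like this could be passed to `map` in place of the `lambda lst: (lst[3], lst[-1])` used earlier, which also moves the float conversion ahead of the reduce step.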