# Introduction to PySpark

## Motivations for using Spark

Apache Spark is a cluster computing platform designed to be fast on rather large datasets. 

Data sizes keep growing, and we need to be able to process and analyze big data in a reasonable amount of time. Spark makes this possible largely because it can keep data in memory while processing it. This advantage shows most clearly in inherently iterative machine learning tasks. It also sets Spark apart from predecessors like Hadoop MapReduce, which writes intermediate results to disk.

More practically speaking, knowing how to use Spark is definitely a plus on your resume as a data analyst, data scientist or data engineer!

## How Spark Works

Spark operates on the idea of distributed data parallelism. We split the data over several worker nodes, which independently operate on their data shards in parallel. If necessary, the per-shard results are combined afterwards. For this tutorial, we will not worry about the inner structure of Spark. In fact, you do not need to know the details of what happens under the hood to be able to use basic Spark!
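
The split, process and combine pattern described above can be sketched in plain Python. This is only an analogy that uses threads on a single machine, not how Spark is implemented; the helper names `split_into_shards` and `count_long_words` and the word list are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_shards(data, num_shards):
    """Split a list into at most num_shards contiguous shards of similar size."""
    shard_size = -(-len(data) // num_shards)  # ceiling division
    return [data[i:i + shard_size] for i in range(0, len(data), shard_size)]

def count_long_words(shard):
    """Work done independently on one shard: count words longer than 5 letters."""
    return sum(1 for word in shard if len(word) > 5)

words = ["browse", "refrigerant", "gaining", "cheerless", "cat", "dog", "spark"]
shards = split_into_shards(words, num_shards=3)

# Each "worker" processes its own shard in parallel.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_counts = list(pool.map(count_long_words, shards))

# Combine the per-shard results into the final answer.
total = sum(partial_counts)
print(partial_counts, total)
```

Each shard is processed without looking at the others, which is what lets the work spread across machines in a real cluster.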



## Overview of Content

In this tutorial, we will cover the basics of PySpark and the machine learning package in PySpark. 

* [Downloading and Installing PySpark](#Downloading-and-Installing-PySpark)
* [Starting our first SparkContext and SparkSession](#Starting-our-first-SparkContext-and-SparkSession)
* [Creating or Loading Sample Data](#Creating-or-Loading-Sample-Data)
* [RDD and DataFrame Basics](#RDD-and-DataFrame-Basics)
* [Machine Learning in PySpark](#Machine-Learning-in-PySpark)
* [Shutting off a SparkContext](#Shutting-off-a-SparkContext)
* [PySpark in Action: Will I Respond Correctly?](#PySpark-in-Action:-Will-I-Respond-Correctly?)
* [Starting a Spark Cluster](#Starting-a-Spark-Cluster)
* [Useful Resources](#Useful-Resources)

## Downloading and Installing PySpark

First, if you do not already have PySpark, install it using pip:

    $ pip install pyspark 

After installing PySpark, make sure the following command works:

In [1]:
import pyspark

## Starting our first SparkContext and SparkSession

SparkContext is the entry point for programming with RDDs and for connecting to a Spark cluster. Since Spark 2.0, SparkSession has been the entry point for programming with DataFrames. Don't worry if terms like RDD and DataFrame confuse you now. We will formally introduce them in the next section. 


In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sparkContext=sc)

## Creating or Loading Sample Data

Before getting to the details of creating and loading data, let us start with an introduction to PySpark's data structures, RDDs and DataFrames:

#### RDDs (Resilient Distributed Datasets)
RDDs are the elements in PySpark that are spread over multiple nodes and operated on in parallel. The two key features of RDDs are immutability and fault tolerance. Once you create an RDD, you cannot change it in any way. This allows for fault tolerance: if any process fails, it can restart from the original (unchanged) data. 

#### DataFrames
DataFrames are SQL-like structured datasets with query operations. They provide a relational API on top of RDDs: whereas RDDs expose a functional API, DataFrames expose a declarative, relational one. 

Rule of Thumb: If the data permits it, we should almost always use DataFrames, as they provide a higher level of abstraction that leaves room for optimization and thus brings performance benefits. Below, we introduce creating and importing data as RDDs and DataFrames.

##### Creating a simple RDD using SparkContext's parallelize on an existing iterable or collection.

In [3]:
# let's create a list of (name,list(courses))
l = [('Alice', [10405, 15213, 10301]),('Bob', [10405, 10701]),('Chad', [15513, 15445, 10405])]
l_rdd = sc.parallelize(l)
l_rdd.take(3)

[('Alice', [10405, 15213, 10301]),
 ('Bob', [10405, 10701]),
 ('Chad', [15513, 15445, 10405])]

##### Creating an RDD from an external dataset

The dataset comes from a course I took (85-432); I modified and cleaned it for this tutorial. If you want to follow along, please download the dataset here: [joined_Lexical_Data.csv](https://github.com/edenhu11/15388Tutorial/blob/main/joined_Lexical_Data.csv)

In [4]:
ld_rdd = sc.textFile('joined_Lexical_Data.csv')
ld_rdd.take(5)

['"X","Sub_ID","Trial","Type","D_RT","D_Word","Outlier","D_Zscore","Correct","Length","Log_Freq_HAL"',
 '1,157,1,1,"710","browse","true",-0.437,1,6,8.856',
 '2,67,1,1,"1,094","refrigerant","false",0.825,1,11,4.644',
 '3,120,1,1,"587","gaining","false",-0.645,1,7,8.304',
 '4,21,1,1,"984","cheerless","true",0.025,1,9,2.639']

##### Creating a dataframe with createDataFrame() using an RDD, a list or a pandas.DataFrame

In [5]:
df = spark.createDataFrame(l,['name','courses'])
df.show()

+-----+--------------------+
| name|             courses|
+-----+--------------------+
|Alice|[10405, 15213, 10...|
|  Bob|      [10405, 10701]|
| Chad|[15513, 15445, 10...|
+-----+--------------------+



##### Creating a DataFrame from an external dataset

In [6]:
#import csv, tell the reader that the csv has a header and ask it to infer the data type of each column.
ld_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("LexicalData.csv")
ld_df.show(3)

+---+------+-----+----+-----+-----------+-------+--------+-------+
|_c0|Sub_ID|Trial|Type| D_RT|     D_Word|Outlier|D_Zscore|Correct|
+---+------+-----+----+-----+-----------+-------+--------+-------+
|  1|   157|    1|   1|  710|     browse|  false|  -0.437|      1|
|  2|    67|    1|   1|1,094|refrigerant|  false|   0.825|      1|
|  3|   120|    1|   1|  587|    gaining|  false|  -0.645|      1|
+---+------+-----+----+-----+-----------+-------+--------+-------+
only showing top 3 rows



## RDD and DataFrame Basics

### RDD
RDDs support two types of operations: transformations and actions. A transformation is applied to an existing RDD and returns a new RDD. In contrast, an action returns something other than an RDD, such as an integer. 

Another important distinction is that all transformations are lazy: they are not executed when you call them, only "planned". The planned transformations actually run once an action is called; actions, as opposed to transformations, are eager. 
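
The lazy versus eager distinction can be illustrated without Spark, using Python's built-in lazy `map`. This is only an analogy under the assumption that "transformation" corresponds to building a lazy object and "action" to consuming it; the `traced_double` helper is hypothetical and exists only to record when work happens.

```python
log = []

def traced_double(x):
    """Double x and record that the work actually happened."""
    log.append(x)
    return 2 * x

numbers = [1, 2, 3]

# "Transformation": builds a plan (a lazy map object) but does no work yet.
planned = map(traced_double, numbers)
work_done_before_action = list(log)
print(work_done_before_action)

# "Action": asking for the results forces the planned work to run.
result = list(planned)
print(result, log)
```

The log stays empty until the results are requested, which mirrors how Spark defers transformations until an action needs their output.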

#### Some Transformation Operations

##### filter(): 
Use ```filter()``` to keep only the RDD elements that satisfy a condition.

In [7]:
alice_rdd = l_rdd.filter(lambda x: "Alice" in x)
alice_rdd.take(1)

[('Alice', [10405, 15213, 10301])]

##### union() 
Use ```union()``` to combine two RDDs.

In [8]:
combined_rdd = l_rdd.filter(lambda x: "Bob" in x).union(alice_rdd)
combined_rdd.take(2)

[('Bob', [10405, 10701]), ('Alice', [10405, 15213, 10301])]

##### map()
Use ```map()``` to apply an operation to every element in the RDD.

In [9]:
#get rid of everyone's first class in the list
l_rdd_new = l_rdd.map(lambda x:(x[0],x[1][1:]))
l_rdd_new.take(3)

[('Alice', [15213, 10301]), ('Bob', [10701]), ('Chad', [15445, 10405])]

##### reduceByKey()
Use ```reduceByKey()``` to combine the values of all RDD elements that share the same key.

In [10]:
l2 = [('Alice',3),('Bob',5),('Chad',6)]
l_rdd2 = sc.parallelize(l2)
l_rdd2.take(3)

[('Alice', 3), ('Bob', 5), ('Chad', 6)]

In [11]:
#reduceByKey to calculate average. 
l_rdd2_remapped = l_rdd2.map(lambda x:('people',(x[1],1))) # map all people to a common group "people"
l_rdd2_remapped.take(3)

[('people', (3, 1)), ('people', (5, 1)), ('people', (6, 1))]

In [12]:
average = l_rdd2_remapped.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) # calculate the (sum, count) pair
average.take(1)

[('people', (14, 3))]

In [13]:
average.map(lambda x:x[1][0]/x[1][1]).take(1) # calculate average

[4.666666666666667]
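
To make the semantics of ```reduceByKey()``` concrete, here is a plain-Python sketch of what it computes: group the pairs by key, then fold each group's values with the supplied function. The `reduce_by_key` helper and the course-number keys are hypothetical, and the sketch ignores partitioning entirely.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key, then fold each group's values with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

# Hypothetical (key, (value, 1)) pairs; the 1 lets us count elements per key.
pairs = [("10405", (3, 1)), ("10701", (5, 1)), ("10405", (6, 1))]

# Add up values and counts per key, then divide to get per-key averages.
sums_and_counts = reduce_by_key(pairs, lambda x, y: (x[0] + y[0], x[1] + y[1]))
averages = {key: total / count for key, (total, count) in sums_and_counts.items()}
print(sums_and_counts)
print(averages)
```

In Spark the function is applied both within and across partitions, so it should be associative and commutative, as the tuple-wise addition here is.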

#### Some Action Operations

##### collect()
```collect()``` returns all the elements of the RDD to the driver as a list.

In [14]:
l_rdd2.collect()

[('Alice', 3), ('Bob', 5), ('Chad', 6)]

##### count() and countByValue()
We use ```count()``` to count the number of elements in an RDD. ```countByValue()``` counts the occurrences of each distinct element.

In [15]:
l_rdd2.count()

3

In [16]:
l_rdd2.countByValue()

defaultdict(int, {('Alice', 3): 1, ('Bob', 5): 1, ('Chad', 6): 1})

##### reduce()
```reduce()``` aggregates the elements using the provided function and returns a single value. 

In [17]:
#calculate average using reduce
#first, get the age as a separate RDD
sum_rdd = l_rdd2.map(lambda x:x[1])
sum_rdd.collect()

[3, 5, 6]

In [18]:
# map each element to one to calculate number of elements
num_rdd = l_rdd2.map(lambda x:(1))
num_rdd.collect()

[1, 1, 1]

In [19]:
#calculate average
sumVal = sum_rdd.reduce(lambda accum,n: accum +n)
print("sum is: ",sumVal)
numVal = num_rdd.reduce(lambda accum,n: accum +n)
print("number of element is: ",numVal)
avg = sumVal/numVal
print("average is: ",avg)

sum is:  14
number of element is:  3
average is:  4.666666666666667


This introduction is by no means exhaustive, so feel free to check the Spark API for more examples. 

### DataFrame

Spark DataFrames are very similar to pandas DataFrames. Read the [PySpark in Action: Will I Respond Correctly?](#PySpark-in-Action:-Will-I-Respond-Correctly?) section for a quick example.

## Machine Learning in PySpark

PySpark provides a general machine learning library (MLlib). 
In this section, we will introduce two of the most common techniques to pre-process categorical data: StringIndexer and OneHotEncoder. Then, we go into Spark's machine learning pipeline.

### ```StringIndexer``` and ```OneHotEncoder```
In Spark, ```StringIndexer``` maps a categorical variable column to an index column, which Spark will then treat as a categorical variable. The indices start at 0 and are ordered by label frequency, so the most frequent label gets index 0. 

Three steps to implementing the ```StringIndexer```
1. Build the StringIndexer model: specify the input and output col name
2. Learn the StringIndexer model: fit the model with your data
3. Execute the indexing: call transform to execute the indexing 
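
The fit and transform steps listed above can be mimicked in plain Python to show how indices are assigned. This is a sketch, not Spark's implementation: `fit_string_index` and `transform_with_index` are hypothetical helpers, and breaking frequency ties alphabetically is an assumption of the sketch.

```python
from collections import Counter

def fit_string_index(labels):
    """Learn a label -> index mapping, most frequent label first.

    Ties are broken alphabetically; this is an assumption made for the sketch.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

def transform_with_index(labels, mapping):
    """Replace every label by its learned index."""
    return [mapping[label] for label in labels]

days = ["Monday", "Friday", "Tuesday", "Thursday", "Friday"]
mapping = fit_string_index(days)            # step 2: learn the mapping
indexed = transform_with_index(days, mapping)  # step 3: apply it
print(mapping)
print(indexed)
```

"Friday" appears twice, so it receives index 0.0; the remaining labels each appear once and are indexed in alphabetical order under the tie-breaking assumption.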

Immediately after ```StringIndexer```, we follow up with ```OneHotEncoder```. ```OneHotEncoder``` converts each category of a string-indexed column into a sparse vector. 

Let's look at a small example:

In [20]:
import pandas as pd
pdf = pd.DataFrame({
        'Day of the week':['Monday','Friday','Tuesday','Thursday','Friday'],
        'weather':['Sunny','Cloudy','Snow','Rain','Rain'],
        'temperature':['80','70','40','60','50'],
    })
df = spark.createDataFrame(pdf)
df.show()

+---------------+-------+-----------+
|Day of the week|weather|temperature|
+---------------+-------+-----------+
|         Monday|  Sunny|         80|
|         Friday| Cloudy|         70|
|        Tuesday|   Snow|         40|
|       Thursday|   Rain|         60|
|         Friday|   Rain|         50|
+---------------+-------+-----------+



In [21]:
from pyspark.ml.feature import StringIndexer

#build indexer
string_indexer = StringIndexer(inputCol='Day of the week',outputCol='indexed_day')

#learn the model
string_indexer_model = string_indexer.fit(df)

#transform the data
df_stringindexer = string_indexer_model.transform(df)

df_stringindexer.show()

+---------------+-------+-----------+-----------+
|Day of the week|weather|temperature|indexed_day|
+---------------+-------+-----------+-----------+
|         Monday|  Sunny|         80|        1.0|
|         Friday| Cloudy|         70|        0.0|
|        Tuesday|   Snow|         40|        3.0|
|       Thursday|   Rain|         60|        2.0|
|         Friday|   Rain|         50|        0.0|
+---------------+-------+-----------+-----------+



Next, we use ```OneHotEncoder```, which follows the same steps as detailed above.

In [22]:
from pyspark.ml.feature import OneHotEncoder
OneHotEncoder(inputCols=['indexed_day'],outputCols=['encoded_day']).fit(df_stringindexer).transform(df_stringindexer).show()

+---------------+-------+-----------+-----------+-------------+
|Day of the week|weather|temperature|indexed_day|  encoded_day|
+---------------+-------+-----------+-----------+-------------+
|         Monday|  Sunny|         80|        1.0|(3,[1],[1.0])|
|         Friday| Cloudy|         70|        0.0|(3,[0],[1.0])|
|        Tuesday|   Snow|         40|        3.0|    (3,[],[])|
|       Thursday|   Rain|         60|        2.0|(3,[2],[1.0])|
|         Friday|   Rain|         50|        0.0|(3,[0],[1.0])|
+---------------+-------+-----------+-----------+-------------+



The result of ```OneHotEncoder``` is read as (vector size, \[index of the active category\], \[1.0\]). It is a sparse vector with at most one non-zero entry. Note that, by default, Spark drops the last category to ensure linear independence, which is why Tuesday (index 3.0) is encoded above as the all-zero vector (3,\[\],\[\]). Before training a model, we should one-hot encode all categorical data and merge the resulting columns using ```VectorAssembler```, as demonstrated in [PySpark in Action: Will I Respond Correctly?](#PySpark-in-Action:-Will-I-Respond-Correctly?).
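
To make the sparse notation easier to read, the following plain-Python sketch expands a (size, indices, values) triple into the dense vector it stands for. The `sparse_to_dense` helper is hypothetical and not part of PySpark.

```python
def sparse_to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense

monday = sparse_to_dense(3, [1], [1.0])   # encoded as (3,[1],[1.0]) above
tuesday = sparse_to_dense(3, [], [])      # the dropped last category
print(monday)
print(tuesday)
```

The all-zero vector for the dropped category carries no ambiguity, since every other category has exactly one entry set to 1.0.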

### Spark pipeline

A Pipeline is a sequence of stages, each of which is an estimator or a transformer. It allows us to fit and transform in one go without saving the intermediate products.
In the code below, we pipeline the string-indexing and one-hot-encoding stages for the two categorical variables, "Day of the week" and "weather".

In [23]:
from pyspark.ml import Pipeline
categorical_columns = ['Day of the week', 'weather']
stringindexer_stages = [StringIndexer(inputCol = column,outputCol='stringindexed_'+column)\
                        for column in categorical_columns]
onehotencoder_stages = [OneHotEncoder(dropLast=False,inputCol = 'stringindexed_'+column,outputCol='onehotencoded_'+column)\
                        for column in categorical_columns]

all_stages = stringindexer_stages + onehotencoder_stages

#build pipeline model
pipeline = Pipeline(stages=all_stages)

#fit and transform
df_coded = pipeline.fit(df).transform(df)
#select only the columns of interest
selected_columns = ['onehotencoded_' + c for c in categorical_columns] + ['temperature']
df_coded = df_coded.select(selected_columns)
df_coded.show()

+-----------------------------+---------------------+-----------+
|onehotencoded_Day of the week|onehotencoded_weather|temperature|
+-----------------------------+---------------------+-----------+
|                (4,[1],[1.0])|        (4,[3],[1.0])|         80|
|                (4,[0],[1.0])|        (4,[1],[1.0])|         70|
|                (4,[3],[1.0])|        (4,[2],[1.0])|         40|
|                (4,[2],[1.0])|        (4,[0],[1.0])|         60|
|                (4,[0],[1.0])|        (4,[0],[1.0])|         50|
+-----------------------------+---------------------+-----------+
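
Conceptually, fitting a pipeline means walking through the stages in order: estimators are fitted on the current data to produce a transformer, and each transformer's output becomes the next stage's input. The plain-Python sketch below illustrates that idea with toy classes; `AddConstant`, `CenterEstimator`, `fit_pipeline` and `transform_pipeline` are all hypothetical names, and the sketch simplifies what Spark actually does.

```python
class AddConstant:
    """Toy transformer: it only has transform()."""
    def __init__(self, amount):
        self.amount = amount

    def transform(self, data):
        return [x + self.amount for x in data]

class CenterEstimator:
    """Toy estimator: fit() learns the mean and returns a transformer."""
    def fit(self, data):
        mean = sum(data) / len(data)
        return AddConstant(-mean)

def fit_pipeline(stages, data):
    """Fit stages in order, feeding each stage the previous stage's output."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        data = model.transform(data)
        fitted.append(model)
    return fitted

def transform_pipeline(fitted_stages, data):
    """Apply every fitted stage in order."""
    for model in fitted_stages:
        data = model.transform(data)
    return data

fitted = fit_pipeline([AddConstant(10), CenterEstimator()], [1, 2, 3])
centered = transform_pipeline(fitted, [1, 2, 3])
print(centered)
```

Because the fitted stages are kept together, the same preprocessing can later be applied to new data with a single call.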



### Other ML packages in Spark

Spark also offers a very intuitive machine learning API that is similar to Scikit-Learn. Please check the MLlib API for more information. 

## Shutting off a SparkContext

A SparkContext needs to be shut down manually before a new one can be opened.

In [24]:
sc.stop()

## PySpark in Action: Will I Respond Correctly?

In the sections below, we provide an example that uses Spark DataFrames and MLlib to predict which words are more likely to be responded to correctly during a lexical decision task, based on their length, their frequency, and whether the word is an outlier. We will use Spark to read in and preprocess the data, apply StringIndexer, OneHotEncoder and VectorAssembler, and train a logistic regression model with cross-validation. 

In [25]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
# start a sparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession(sparkContext=sc)

#read in data
data = spark.read.format("csv").option("header","true").option("inferSchema","true").load("joined_Lexical_Data.csv")

#cache for faster access
data.cache() 

DataFrame[X: int, Sub_ID: int, Trial: int, Type: int, D_RT: string, D_Word: string, Outlier: boolean, D_Zscore: double, Correct: int, Length: int, Log_Freq_HAL: double]

First, let's extract the features and labels from the DataFrame. In this dataset, the relevant features are Outlier, Length and Log_Freq_HAL. The label is Correct.

In [26]:
from pyspark.sql.functions import col, when
#select relevant data and rename the columns
selected_data = data.select(col('Log_Freq_HAL').alias('Frequency'),'Outlier','Length','Correct')

#cast the label column to float; Length (int) and Frequency (double) are already numeric
df = selected_data.withColumn('Correct',selected_data.Correct.cast('float'))
#cast boolean column to numeric column
df = df.withColumn('Outlier',when(col('Outlier')=='True',1.0).otherwise(0.0))

Then we pre-process the categorical variable Outlier.

In [27]:
#create onehotencoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

categorical_columns = ['Outlier']
#build stages
stringindexer_stages = [StringIndexer(inputCol = column,outputCol='stringindexed_'+column)\
                        for column in categorical_columns]
onehotencoder_stages = [OneHotEncoder(inputCol = 'stringindexed_'+column,outputCol='onehotencoded_'+column)\
                        for column in categorical_columns]

all_stages = stringindexer_stages + onehotencoder_stages

#build pipeline model
pipeline = Pipeline(stages=all_stages)

#fit pipeline model
pipeline_model = pipeline.fit(df)

#transform data
df_coded = pipeline_model.transform(df)

#remove uncoded columns
selected_columns = ['onehotencoded_' + c for c in categorical_columns] + ['Length', 'Frequency', 'Correct']
df_coded = df_coded.select(selected_columns)

df_coded.show()

+---------------------+------+---------+-------+
|onehotencoded_Outlier|Length|Frequency|Correct|
+---------------------+------+---------+-------+
|            (1,[],[])|     6|    8.856|    1.0|
|        (1,[0],[1.0])|    11|    4.644|    1.0|
|        (1,[0],[1.0])|     7|    8.304|    1.0|
|            (1,[],[])|     9|    2.639|    1.0|
|        (1,[0],[1.0])|     8|    1.386|    1.0|
|        (1,[0],[1.0])|     8|    5.268|    1.0|
|        (1,[0],[1.0])|    10|    9.339|    1.0|
|        (1,[0],[1.0])|     6|     4.19|    1.0|
|        (1,[0],[1.0])|    11|      0.0|    1.0|
|        (1,[0],[1.0])|     9|    5.537|    1.0|
|        (1,[0],[1.0])|     6|    5.557|    1.0|
|        (1,[0],[1.0])|     6|   10.041|    1.0|
|            (1,[],[])|    11|    6.392|    1.0|
|        (1,[0],[1.0])|    13|    5.011|    1.0|
|            (1,[],[])|     8|    5.817|    1.0|
|            (1,[],[])|    12|    7.474|    1.0|
|        (1,[0],[1.0])|     5|     3.85|    1.0|
|        (1,[0],[1.0

In [28]:
#create VectorAssembler
from pyspark.ml.feature import VectorAssembler

#feature columns
feature_columns = df_coded.columns[0:3]
#build VectorAssembler instance
vectorassembler = VectorAssembler(inputCols=feature_columns,outputCol='features')
#transform data
df_features = vectorassembler.transform(df_coded)
df_features = df_features.withColumn('label',col('Correct'))
df_features.show()

+---------------------+------+---------+-------+----------------+-----+
|onehotencoded_Outlier|Length|Frequency|Correct|        features|label|
+---------------------+------+---------+-------+----------------+-----+
|            (1,[],[])|     6|    8.856|    1.0| [0.0,6.0,8.856]|  1.0|
|        (1,[0],[1.0])|    11|    4.644|    1.0|[1.0,11.0,4.644]|  1.0|
|        (1,[0],[1.0])|     7|    8.304|    1.0| [1.0,7.0,8.304]|  1.0|
|            (1,[],[])|     9|    2.639|    1.0| [0.0,9.0,2.639]|  1.0|
|        (1,[0],[1.0])|     8|    1.386|    1.0| [1.0,8.0,1.386]|  1.0|
|        (1,[0],[1.0])|     8|    5.268|    1.0| [1.0,8.0,5.268]|  1.0|
|        (1,[0],[1.0])|    10|    9.339|    1.0|[1.0,10.0,9.339]|  1.0|
|        (1,[0],[1.0])|     6|     4.19|    1.0|  [1.0,6.0,4.19]|  1.0|
|        (1,[0],[1.0])|    11|      0.0|    1.0|  [1.0,11.0,0.0]|  1.0|
|        (1,[0],[1.0])|     9|    5.537|    1.0| [1.0,9.0,5.537]|  1.0|
|        (1,[0],[1.0])|     6|    5.557|    1.0| [1.0,6.0,5.557]

In [29]:
#split data into training and test data
training, test = df_features.randomSplit([0.8,0.2],seed=100)

#cross-validation model
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

lr = LogisticRegression(featuresCol='features',labelCol='label')
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
# here we have 3 values each for regParam and elasticNetParam. So, in total, there are 9 models to train and validate.
param_grid = ParamGridBuilder()\
                .addGrid(lr.regParam,[0.0,0.5,1.0])\
                .addGrid(lr.elasticNetParam,[0.0,0.5,1.0])\
                .build()                
# we are using 5-fold cv here. 
cv = CrossValidator(estimator=lr,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5)
# do cross validation on training dataset
cv_model = cv.fit(training)
cv_pred = cv_model.transform(training)

#use the cv_model to predict test data
predictions = cv_model.transform(test)

# BinaryClassificationEvaluator reports the area under the ROC curve (AUC) by default, not accuracy
print("training AUC: ", evaluator.evaluate(cv_pred))
print("testing AUC: ",evaluator.evaluate(predictions))

training AUC:  0.6995674611040452
testing AUC:  0.6931035542961131
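
The size of the search above can be worked out with a short plain-Python sketch: the parameter grid is the Cartesian product of the candidate values, and each combination is fitted once per fold. The variable names are chosen for illustration; the values are those used in the cell above.

```python
from itertools import product

reg_params = [0.0, 0.5, 1.0]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 5

# Every combination of the two hyperparameters is one candidate model.
grid = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_params, elastic_net_params)
]

# Each candidate is trained once per fold during cross-validation.
fits_during_cv = len(grid) * num_folds
print(len(grid), fits_during_cv)
```

With 9 candidates and 5 folds, cross-validation trains 45 models before the best setting is refit on the full training set, so the grid is worth keeping small on large datasets.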


## Starting a Spark Cluster

So far, we have only scratched the surface of Spark. To fully utilize the parallel processing ability of Spark, you need to set up and, more importantly, configure a cluster. A cluster of multiple machines becomes a necessity when the data becomes huge. 

In Spark, there is one master that functions as the cluster manager and the others are workers. The master does the distributing of work and the workers do the actual work. 

In general, there are three ways to run a cluster: standalone, Hadoop YARN, and Apache Mesos. In the following section, we will only explore the standalone option, in which several machines with Spark installed form the cluster and the master is provided by Spark itself. We will start a cluster on our local machine, so our computer will function both as the master and as the workers. This is not very useful in practice, but it will give you a flavor of Spark clusters.

To be safe, first check that you have Java installed:
    
    $ java -version
    
Before we start, we need to download a separate version of Spark.

    https://www.apache.org/dyn/closer.lua/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

Untar:

    $ tar zxvf spark-3.1.1-bin-hadoop2.7.tgz

Now, cd into ```spark-3.1.1-bin-hadoop2.7/conf```.

Write the following in spark-env.sh (e.g. vim spark-env.sh):

    export SPARK_WORKER_MEMORY=1g
    export SPARK_EXECUTOR_MEMORY=512m
    export SPARK_WORKER_INSTANCES=2
    export SPARK_WORKER_CORES=2
    export SPARK_WORKER_DIR=<some dir to save running logs>

After this, type vim workers and write:
    
    localhost 

since our only worker machine is the local one. On a real cluster, this file lists the hostname of each worker machine, one per line.

Return to the previous folder and start master with the following command:

    $ sbin/start-master.sh

Start the workers:

    $ sbin/start-workers.sh

Note: If you get a connection refused error, enable "Remote Login" (SSH) on your machine. 

Now, go to localhost:8080 using a browser to see the Spark UI.

[<img src="https://github.com/edenhu11/15388Tutorial/blob/main/sparkUI.png?raw=true">](https://github.com/edenhu11/15388Tutorial/blob/main/sparkUI.png?raw=true)

Now that we have the cluster running, let's open up the shell to run some operations. Unfortunately, it is a Scala shell.

    $ bin/spark-shell --master URL_displaying_on_the_Left_Corner_of_UI_page

[<img src="https://github.com/edenhu11/15388Tutorial/blob/main/scala.png?raw=true">](https://github.com/edenhu11/15388Tutorial/blob/main/scala.png?raw=true)

After starting the shell, run these commands and see changes in the UI.

    val file=sc.textFile("README.md")
    file.count()
    file.take(3)

Use Ctrl-c to exit the shell. Use

    $ sbin/stop-master.sh
    $ sbin/stop-workers.sh 

to stop the processes.

## Useful Resources

PySpark API: http://spark.apache.org/docs/3.1.1/api/python/reference/index.html

RDD Cheatsheet: https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf

DataFrame Cheatsheet: https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf

Helpful tutorials with examples:
1. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/ (also recommends relevant books)
2. https://github.com/MingChen0919/learning-apache-spark

Running PySpark on Amazon AWS: https://towardsdatascience.com/getting-started-with-pyspark-on-amazon-emr-c85154b6b921