##### References I used for Spark setup and installation:
1. https://changhsinlee.com/install-pyspark-windows-jupyter/
2. https://github.com/jupyter/jupyter/issues/248
3. https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c

##### Steps that resolved the error "Java gateway process exited before sending the driver its port number":
1. Reinstalled Anaconda 3 and chose the "Add Anaconda to my PATH environment variable" option during installation
2. Installed Java 8 instead of the latest Java 11 (Spark 2.x does not run on Java 11; Java 11 support only arrived in Spark 3.0)

##### Jupyter Python Notebook keyboard shortcuts:
* `Esc` will take you into command mode where you can navigate around your notebook with arrow keys.
* While in command mode:
  * `A` to insert a new cell above the current cell  
  * `B` to insert a new cell below  
  * `M` to change the current cell to Markdown  
  * `Y` to change it back to code  
  * `D + D` (press the key twice) to delete the current cell  
* `Enter` will take you from command mode back into edit mode for the given cell.
* `Ctrl + Enter` runs the current cell.

(Adapted from https://www.dataquest.io/blog/jupyter-notebook-tips-tricks-shortcuts/)


In [1]:
# Set up and check the environment variables Spark needs.
# Values used on the original machine. Set them system-wide, or assign them here
# before Spark starts, e.g. os.environ['JAVA_HOME'] = r'C:\Java8'. Use a raw
# string (r'...') so sequences such as '\t' or '\n' inside a Windows path are not
# turned into control characters.
#   SPARK_HOME                  r'C:\spark\spark-2.3.2-bin-hadoop2.7'
#   JAVA_HOME                   r'C:\Java8'
#   HADOOP_HOME                 r'C:\spark\spark-2.3.2-bin-hadoop2.7'
#   PYSPARK_DRIVER_PYTHON       'jupyter'
#   PYSPARK_DRIVER_PYTHON_OPTS  'notebook'
import os
import sys

SPARK_ENV_VARS = (
    'SPARK_HOME',
    'JAVA_HOME',
    'HADOOP_HOME',
    'PYSPARK_DRIVER_PYTHON',
    'PYSPARK_DRIVER_PYTHON_OPTS',
)
for env_var in SPARK_ENV_VARS:
    # .get() reports a missing variable instead of aborting the cell with a KeyError.
    print(os.environ.get(env_var, f'<{env_var} not set>'))

# Make Spark's Python workers use this kernel's interpreter unless configured otherwise.
# Actions that need Python workers (e.g. df1.show() on a DataFrame built from pandas)
# can fail when the workers start a different Python than the driver, while lazy
# transformations still appear to work -- TODO confirm this was the cause of the
# action errors noted in later cells.
os.environ.setdefault('PYSPARK_PYTHON', sys.executable)

# PySpark's gateway launcher reads PYSPARK_SUBMIT_ARGS and expects 'pyspark-shell'
# in it. Append it only when missing, so re-running the cell is harmless.
pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
if "pyspark-shell" not in pyspark_submit_args:
    pyspark_submit_args += " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
print(os.environ['PYSPARK_SUBMIT_ARGS'])

print(sys.path)

C:\spark\spark-2.3.2-bin-hadoop2.7
c:\JAVA8
C:\spark\spark-2.3.2-bin-hadoop2.7
jupyter
notebook
 pyspark-shell
['', 'C:\\spark\\spark-2.3.2-bin-hadoop2.7\\bin', 'C:\\Users\\Samuel\\Anaconda3\\python37.zip', 'C:\\Users\\Samuel\\Anaconda3\\DLLs', 'C:\\Users\\Samuel\\Anaconda3\\lib', 'C:\\Users\\Samuel\\Anaconda3', 'C:\\Users\\Samuel\\Anaconda3\\lib\\site-packages', 'C:\\Users\\Samuel\\Anaconda3\\lib\\site-packages\\win32', 'C:\\Users\\Samuel\\Anaconda3\\lib\\site-packages\\win32\\lib', 'C:\\Users\\Samuel\\Anaconda3\\lib\\site-packages\\Pythonwin', 'C:\\Users\\Samuel\\Anaconda3\\lib\\site-packages\\IPython\\extensions', 'C:\\Users\\Samuel\\.ipython']


In [2]:
# Initiate a SparkSession, the entry point for programming Spark with the
# Dataset/DataFrame API. A SparkSession can create DataFrames, register them as
# tables, run SQL over those tables, cache tables and read parquet files.
# It is obtained through the builder pattern below.

import findspark
findspark.init()  # has to run before `import pyspark` so Spark's Python package can be found

import pyspark
from pyspark.sql import SparkSession

session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
spark

In [3]:
# Smoke test for Spark SQL: a one-row query with a single column named "hello".
smoke_query = '''select 'spark' as hello '''
df = spark.sql(smoke_query)
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



##### Reference for the following exercises:  
https://changhsinlee.com/pyspark-dataframe-basics/

In [4]:
# pandas is needed in the next cell to build the sample data.
import pandas as pd

# The SparkSession already owns a SparkContext (the handle that tells Spark how
# and where to access the cluster); take a reference to it rather than creating one.
sc = spark.sparkContext
sc

In [5]:
# Build two small pandas DataFrames keyed by PassengerId: passenger details
# (Name, Sex, Survived) and ticket/age details (Age, Fare, Pclass).
# They are converted into Spark DataFrames in a later cell.
data1 = {
    'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
    'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William'},
    'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male'},
    'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0},
}

data2 = {
    'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
    'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35},
    'Fare': {0: 7.3, 1: 71.3, 2: 7.9, 3: 53.1, 4: 8.0},
    'Pclass': {0: 3, 1: 1, 2: 3, 3: 1, 4: 3},
}

# Columns are listed explicitly so they appear in the same order as the dict keys.
df1_pd, df2_pd = (pd.DataFrame(source, columns=list(source)) for source in (data1, data2))
df1_pd

Unnamed: 0,PassengerId,Name,Sex,Survived
0,1,Owen,male,0
1,2,Florence,female,1
2,3,Laina,female,1
3,4,Lily,female,1
4,5,William,male,0


In [6]:
# Display the second pandas frame (Age, Fare, Pclass per PassengerId).
df2_pd

Unnamed: 0,PassengerId,Age,Fare,Pclass
0,1,22,7.3,3
1,2,38,71.3,1
2,3,26,7.9,3
3,4,35,53.1,1
4,5,35,8.0,3


In [7]:
# Convert both pandas frames to Spark DataFrames; column types are inferred from
# the pandas dtypes (int64 -> bigint, float64 -> double, object -> string).
df1, df2 = spark.createDataFrame(df1_pd), spark.createDataFrame(df2_pd)
df1
# NOTE: on the original machine df1.show() failed at this point, although the
# spark.sql(...).show() smoke test earlier worked. Possible cause (unverified):
# the Python worker processes could not start correctly -- TODO confirm.

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint]

In [8]:
# Print df1's inferred schema as a tree; the pandas int64 columns became Spark
# long (bigint) columns.
df1.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: long (nullable = true)



Five data manipulation verbs for DataFrames (from Hadley Wickham's dplyr): select, filter, mutate, summarize, and arrange.

In [9]:
## 1. Select ## (columns)
cols1 = ['PassengerId', 'Name']
# select() accepts the column names either as one list or as separate arguments.
df1.select(*cols1)
# df1.select(cols1).show() failed on the original machine, while the select itself did not.
# select() is a lazy transformation: it only builds a plan, and nothing executes until
# an action such as show(), first() or collect() runs. That is why only actions raised
# errors. With 5 rows, running out of memory is an unlikely cause; a Python-worker
# setup problem is more plausible (TODO confirm).

DataFrame[PassengerId: bigint, Name: string]

In [10]:
## 2. Filter ## (rows) -- works like WHERE in a SQL query; where() is an alias of filter().
df1.where(df1['Sex'] == 'female')
# Note: df1.filter(...).first() raised an error on the original machine (first() is an action).

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint]

In [11]:
## 3. Mutate ## (columns)
# withColumn returns a new DataFrame with an extra computed column; df2 is unchanged.
df2.withColumn('AgeTimesFare', df2['Age'] * df2['Fare'])

DataFrame[PassengerId: bigint, Age: bigint, Fare: double, Pclass: bigint, AgeTimesFare: double]

In [12]:
## 4. Summarize (and groupby) ##
# Grouping turns the DataFrame into a GroupedData object; aggregate functions
# are then called on that object.
gdf2 = df2.groupBy('Pclass')
gdf2

<pyspark.sql.group.GroupedData at 0x84b1470>

In [13]:
# Average several columns per group. GroupedData.avg() takes the column names as
# separate arguments, so the list is unpacked with a leading '*' before the list
# name (argument unpacking):
# https://www.quora.com/What-are-packing-and-unpacking-arguments-in-Python
avg_cols = ['Age', 'Fare']
avg_by_class = gdf2.avg(*avg_cols)
avg_by_class

DataFrame[Pclass: bigint, avg(Age): double, avg(Fare): double]

In [14]:
# Several aggregations in one call: map each column to the aggregate function to
# apply ('*' with 'count' counts rows).
agg_spec = {'*': 'count', 'Age': 'avg', 'Fare': 'sum'}
df2.agg(agg_spec)

DataFrame[count(1): bigint, avg(Age): double, sum(Fare): double]

In [15]:
# To get readable column names, give every aggregate an explicit alias.
# Renaming positionally with toDF() after a dict-based agg() is fragile: PySpark
# hands the dict to the JVM as a Map, so the order of the aggregate columns is not
# guaranteed to follow the dict's order. toDF() could then silently attach a name
# to the wrong column (avg and sum are both double, so the schema would not reveal it).
from pyspark.sql import functions as F

(
    gdf2
    .agg(
        F.count('*').alias('counts'),
        F.avg('Age').alias('average_age'),
        F.sum('Fare').alias('total_fare'),
    )
)

DataFrame[Pclass: bigint, counts: bigint, average_age: double, total_fare: double]

In [16]:
## 5. Arrange (sort) ##
# Sort by Fare, highest first (orderBy is an alias of sort).
df2.orderBy(df2.Fare.desc())

DataFrame[PassengerId: bigint, Age: bigint, Fare: double, Pclass: bigint]

In [17]:
## 6. Joins and unions
# A join combines data into new columns (matching rows placed side by side);
# a union combines data into new rows (DataFrames stacked on top of each other).
# Joining on a column name keeps a single PassengerId column in the result.
df1.join(df2, on='PassengerId', how='inner')

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint, Age: bigint, Fare: double, Pclass: bigint]

In [18]:
# Non-equi joins: the join condition is an inequality rather than a key match.
# Spark cannot execute these as hash or sort-merge joins, so it falls back to a
# nested-loop / cartesian-style plan whose cost grows with |df1| * |df2|. That is
# fine for 5 rows, but potentially very slow for large tables.
# (Older Hive releases only accepted equality conditions in JOIN ... ON.)
# NOTE: both inputs contribute a PassengerId column, so the result has two columns
# with that name. Rename or alias one before selecting it, or the reference is ambiguous.
df1.join(df2, df1.PassengerId <= df2.PassengerId)

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint, PassengerId: bigint, Age: bigint, Fare: double, Pclass: bigint]

In [19]:
# union() stacks the rows of one DataFrame under another (here df1 under itself).
# Per the PySpark docs it behaves like SQL UNION ALL (no de-duplication; add
# .distinct() for that) and matches columns by position, not by name.
df1.union(df1)

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint]

In [20]:
# Print df1's physical plan: it is backed directly by the RDD built by
# createDataFrame(), so the plan is a single scan.
df1.explain()

== Physical Plan ==
Scan ExistingRDD[PassengerId#7L,Name#8,Sex#9,Survived#10L]


In [21]:
# Same for df2: a single scan of the RDD created from df2_pd.
df2.explain()

== Physical Plan ==
Scan ExistingRDD[PassengerId#15L,Age#16L,Fare#17,Pclass#18L]


In [22]:
# Physical plan of the key join: both sides are filtered for non-null keys,
# shuffled by PassengerId (Exchange hashpartitioning into 200 partitions), sorted,
# and then combined with a SortMergeJoin.
dfj1 = df1.join(df2, on='PassengerId', how='inner')
dfj1.explain()

== Physical Plan ==
*(5) Project [PassengerId#7L, Name#8, Sex#9, Survived#10L, Age#16L, Fare#17, Pclass#18L]
+- *(5) SortMergeJoin [PassengerId#7L], [PassengerId#15L], Inner
   :- *(2) Sort [PassengerId#7L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(PassengerId#7L, 200)
   :     +- *(1) Filter isnotnull(PassengerId#7L)
   :        +- Scan ExistingRDD[PassengerId#7L,Name#8,Sex#9,Survived#10L]
   +- *(4) Sort [PassengerId#15L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(PassengerId#15L, 200)
         +- *(3) Filter isnotnull(PassengerId#15L)
            +- Scan ExistingRDD[PassengerId#15L,Age#16L,Fare#17,Pclass#18L]


In [23]:
# Caching: cache a DataFrame that is used multiple times in the script, so later
# actions can reuse the stored data instead of recomputing it.
# cache() marks df1 for caching and returns df1 itself (hence the output below);
# the data is actually stored when an action next computes df1.
df1.cache()

DataFrame[PassengerId: bigint, Name: string, Sex: string, Survived: bigint]

In [24]:
# Storage level fields, in order: useDisk, useMemory, useOffHeap, deserialized,
# replication. After cache(), df1 may be kept in memory and on disk.
df1.storageLevel

StorageLevel(True, True, False, True, 1)

In [25]:
# unpersist() removes df1 from the cache; its storage level is back to all-False.
df1.unpersist()
df1.storageLevel

StorageLevel(False, False, False, False, 1)

In [26]:
# Partitions and repartition()
# Number of partitions backing df1. The value shown (4) presumably reflects Spark's
# default parallelism on this machine (e.g. the number of local cores) -- TODO confirm.
df1.rdd.getNumPartitions()

4

In [27]:
# repartition(n) returns a NEW DataFrame whose rows are shuffled into exactly n
# partitions; df1 itself keeps its original partitioning.
df1_repartitioned = df1.repartition(numPartitions=1)
df1_repartitioned.rdd.getNumPartitions()

1