In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# Practical PySpark

* Installation
* Comparison with Pandas
    * Common Operations
* Data Sources for I/O
    * Common Formats
    * Local File System and HDFS
    * Connecting with AWS, Cassandra & Postgres
* Deployment Modes
    * Client, Cluster, Local
    * spark-submit
* ETL & ML - <font color='red'>(Notebook Demo)</font>
    * EDA and ETL Pipelines
    * ML pipelines
    * AWS EMR Cloud <font color='red'>(AWS Demo)</font>

# Spark Installation

### Step 1 : Install Java, Scala

In [None]:
## Install Scala and Java
cd ~
sudo apt install default-jre scala


### Step 2 : Download Spark and set SPARK_HOME in .bashrc

In [None]:
## Download Spark
wget https://www-us.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
tar xvf spark-2.4.0-bin-hadoop2.7.tgz
sudo mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark

## put these lines in bashrc
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
export JAVA_HOME=/usr/lib/jvm/default-java

## refresh .bashrc file
source ~/.bashrc

In [None]:
## Test it
pyspark

## Quick Look - Compare with Pandas

## Imports

In [None]:
# Pandas :
import pandas as pd

# PySpark :
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myfirst program')\
                            .master("local[4]")\
                            .getOrCreate()

In [None]:
spark

## Load CSV

In [None]:
# Pandas :
dfp = pd.read_csv("data/hotel_energy.csv", 
                  header=0)

# PySpark:
dfs = spark.read.csv("data/hotel_energy.csv", 
                     header=True, 
                     inferSchema=True)

## Show DataFrame

In [None]:
# Pandas : 
dfp.head(15)

# PySpark :
dfs.show(15)

## Column and Data Types

In [None]:
# Pandas :
dfp.columns
dfp.dtypes

# PySpark :
dfs.columns
dfs.printSchema()

## Change Column Names 

In [None]:
# Pandas :
dfp.columns = ["a", "b", "c"]

dfp.rename(columns = {"old1":"new1",
                      "old2":"new2"})

# PySpark : 
dfs1 = dfs.toDF("a", "b", "c")

dfs.withColumnRenamed("old1", "new1")\
   .withColumnRenamed("old2", "new2")\
   .withColumnRenamed("old3", "new3")

## Drop Columns

In [None]:
# Pandas :
dfp.drop("hotel", axis=1)

# PySpark :
dfs.drop("hotel")

## Change Column Type

In [None]:
# Pandas :
dfp["sales"].astype('int')


# PySpark :
from pyspark.sql.functions import col
df = dfs.withColumn("sales", col("sales").cast("int"))

`cast` accepts either a type name string or a type object from `pyspark.sql.types`:

* BinaryType: binary
* BooleanType: boolean
* ByteType: tinyint
* DateType: date
* DecimalType: decimal(10,0)
* DoubleType: double
* FloatType: float
* IntegerType: int
* LongType: bigint
* ShortType: smallint
* StringType: string
* TimestampType: timestamp

In [None]:
from pyspark.sql.types import DoubleType

dfs = dfs.withColumn("sales", col("sales").cast(DoubleType()))

## Aggregates

In [None]:
# Pandas :
df.groupby(['age', 'gender'])\
  .agg({'height':"mean", 'income':'min'})

# PySpark :
df.groupby(['age', 'gender'])\
  .agg({'height':"mean", 'income':'min'})

## Standard Transformations

In [None]:
# Pandas :
import numpy as np
df['log_sales'] = np.log(df["sales"])

# PySpark :
import pyspark.sql.functions as F
df = df.withColumn('log_sales', F.log(df.sales))

## Differences between Pandas and Spark

## SQL Queries

In [None]:
# Pandas :
# no built-in SQL engine for DataFrames

# PySpark :
df.createOrReplaceTempView("df_VIEW")
ans_df = spark.sql("select * from df_VIEW where fruit == 'orange'")

## Lazy Evaluations

**Pandas** : When you run a cell, its contents are executed immediately.

In [None]:
# Pandas
# Cell runs and the data is loaded into memory
df = pd.read_csv("cars.csv")

# Executed immediately: you get a new DataFrame right now
df2 = df[df["mileage"] > 30]
df3 = df2[["carType"]].drop_duplicates()

df3 # you get your DataFrame

**Spark** : When you run a transformation, it is not executed. Instead, a recipe is created.

This recipe is called a **DAG** (Directed Acyclic Graph)

In [None]:
# PySpark
# Cell runs: Spark reads the header and infers column types,
# but does not keep the rows in memory
df = spark.read.csv("cars.csv", header=True, inferSchema=True)

# Not executed, but a recipe is created
df2 = df.filter("mileage > 30")
df3 = df2.select('carType').distinct()

df3 # nothing is computed - only a DAG is created

df3.show() # execute the recipe - using an action

![Dag1](./images/diff_dag.png)

* Why Lazy Evaluation?
    * Datasets of 10s/100s of GB do not fit in RAM, so Spark defers the work until an action asks for a result

### Transformations and Actions

![Spark T/A](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pagecounts/trans_and_actions.png)

* Transformations create a recipe

* Actions execute the recipe
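
The split between transformations and actions can be mimicked in a few lines of plain Python. The `LazyFrame` class below is a hypothetical toy, not a PySpark class: each transformation returns a new object that only records a step, and the action `collect()` runs the recorded steps. It is a sketch of the idea only and says nothing about how Spark schedules or optimizes the real DAG.

```python
class LazyFrame:
    """Toy stand-in for a Spark DataFrame (hypothetical, not part of PySpark)."""

    def __init__(self, rows, plan=()):
        self._rows = rows         # source data, never modified
        self._plan = tuple(plan)  # recorded steps, i.e. the "recipe"

    def _extend(self, step):
        # Every transformation returns a NEW LazyFrame with one more step
        return LazyFrame(self._rows, self._plan + (step,))

    def filter(self, predicate):
        return self._extend(("filter", predicate))

    def select(self, column):
        return self._extend(("select", column))

    def distinct(self):
        return self._extend(("distinct", None))

    def collect(self):
        # Action: only now are the recorded steps executed, in order
        rows = list(self._rows)
        for op, arg in self._plan:
            if op == "filter":
                rows = [r for r in rows if arg(r)]
            elif op == "select":
                rows = [{arg: r[arg]} for r in rows]
            elif op == "distinct":
                seen, unique = set(), []
                for r in rows:
                    key = tuple(sorted(r.items()))
                    if key not in seen:
                        seen.add(key)
                        unique.append(r)
                rows = unique
        return rows


cars = LazyFrame([
    {"carType": "sedan", "mileage": 35},
    {"carType": "suv", "mileage": 22},
    {"carType": "sedan", "mileage": 40},
])

# Three transformations: nothing is computed, only the plan grows
recipe = cars.filter(lambda r: r["mileage"] > 30).select("carType").distinct()

# The action runs the plan
result = recipe.collect()
print(result)
```

Note that `cars` itself is untouched by the chain: each step produced a new object, which is the same behaviour the Immutability section describes for Spark DataFrames.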

### Immutability

* <font color='red'> Every transformation gives a new dataframe </font>
* Each dataframe is immutable ( no in-place operations, unlike Pandas )

In [None]:
# pandas
df = pd.read_csv("/file")
df.drop('age', inplace=True, axis=1) # df is changed

# spark
df = spark.read.csv("/file", header=True)
df.drop('age') # df is not changed
# returns a new df, with 'age' column dropped

## Spark Data Sources

![Spark Data Sources](https://i2.wp.com/www.jenunderwood.com/wp-content/uploads/2016/10/SparkArchitecture-Databrickss.gif?resize=800%2C462&ssl=1)

### Common formats

* CSV
* JSON
* Parquet
* LibSVM
* Text File

In [None]:
## Generic format
spark.read.<format>("/path/to/file")
df.write.<format>("/path/to/file")  # write is a DataFrame attribute, not a session attribute

In [None]:
## CSV
spark.read.csv("/path/to/file")

## Parquet
spark.read.parquet("path/to/file")

If **HDFS** is enabled, paths without a scheme resolve to HDFS, so local files need an explicit `file:///` prefix

In [None]:
## For Local file
spark.read.<format>("file:///<full path here>") # note the file:///  & path
df.write.<format>("file:///<full path here>") # note the file:/// & path
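
The `file:///` prefix is easy to get wrong when typed by hand. A minimal sketch using only the standard library; `local_uri` is a hypothetical helper name, and the relative path is the CSV used in the earlier examples, assumed to sit under the current working directory.

```python
from pathlib import Path


def local_uri(path):
    """Return an absolute file:// URI for a local path."""
    # resolve() makes the path absolute; as_uri() adds the file:// scheme
    return Path(path).resolve().as_uri()


uri = local_uri("data/hotel_energy.csv")
print(uri)
```

The resulting string can be passed to the reader unchanged, for example `spark.read.csv(uri, header=True)`.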

### Other Formats  - Include Drivers and Jars
* AWS
* Cassandra
* Postgres

In [None]:
## Method 1 - Preferred
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages \
                                     <jar>,<jar> \
                                     pyspark-shell'
spark = SparkSession.builder\
                    .appName('Demo 1')\
                    .master("local")\
                    .getOrCreate()
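
The backslash-continued string in Method 1 is awkward to edit once there are several packages. A sketch that builds the same value from a list; `submit_args` is a hypothetical helper name, and the two coordinates are the Postgres and AWS ones listed in the Driver & Jar Files section.

```python
import os


def submit_args(packages):
    """Build a PYSPARK_SUBMIT_ARGS value from a list of Maven coordinates."""
    # --packages takes a single comma-separated list without spaces
    return "--packages {} pyspark-shell".format(",".join(packages))


packages = [
    "org.postgresql:postgresql:42.2.1",
    "org.apache.hadoop:hadoop-aws:2.7.1",
]

# Must be set before the first SparkSession is created in this process
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args(packages)
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```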

In [None]:
## Method 2
spark = SparkSession.builder\
                    .appName('postgres spark demo')\
                    .master("local")\
                    .config("spark.jars.packages", "<jar>,<jar>")\
                    .getOrCreate()                

### Driver & Jar Files

Drivers from https://mvnrepository.com/

Jar Format is **groupID:artifactID:version**

In [None]:
Postgres Jar: "org.postgresql:postgresql:42.2.1"

AWS Jar: "org.apache.hadoop:hadoop-aws:2.7.1"

Cassandra Jar: "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0"
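
A malformed coordinate usually surfaces only when Spark tries to resolve the package, so a quick check up front can save a restart. The sketch below splits a coordinate into its three parts; `Coordinate` and `parse_coordinate` are hypothetical names, and the check is deliberately loose (three non-empty, colon-separated fields).

```python
from collections import namedtuple

Coordinate = namedtuple("Coordinate", ["group_id", "artifact_id", "version"])


def parse_coordinate(text):
    """Split 'groupID:artifactID:version' into its three parts."""
    parts = text.strip().split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError("expected groupID:artifactID:version, got %r" % text)
    return Coordinate(*parts)


cassandra = parse_coordinate(
    "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0"
)
print(cassandra.group_id)
print(cassandra.artifact_id)
print(cassandra.version)
```

The `_2.11` suffix in the Cassandra artifact name is the Scala version the connector was built against; it should match the Scala version of the Spark build in use.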
                

### AWS

In [None]:
spark.read.<format>("s3a://<bucket name>/<file name>") # note the s3a://
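
Reading from a private bucket also needs credentials, which the line above does not show. One possible approach, sketched under assumptions: take the keys from the standard AWS environment variables and map them to the Hadoop S3A properties `fs.s3a.access.key` and `fs.s3a.secret.key`, prefixed with `spark.hadoop.` so that Spark forwards them to Hadoop. `s3a_conf` is a hypothetical helper name, and the demo values are placeholders.

```python
import os


def s3a_conf(env=None):
    """Map AWS credential variables to Spark/Hadoop S3A settings."""
    env = os.environ if env is None else env
    key = env.get("AWS_ACCESS_KEY_ID")
    secret = env.get("AWS_SECRET_ACCESS_KEY")
    if not key or not secret:
        raise KeyError("AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set")
    return {
        "spark.hadoop.fs.s3a.access.key": key,
        "spark.hadoop.fs.s3a.secret.key": secret,
    }


# Placeholder values for illustration; real keys should come from the environment
conf = s3a_conf({
    "AWS_ACCESS_KEY_ID": "demo-key",
    "AWS_SECRET_ACCESS_KEY": "demo-secret",
})
for name in sorted(conf):
    print(name)
```

Each pair can then be passed to `SparkSession.builder.config(name, value)` before `getOrCreate()`.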

### Cassandra

In [None]:
spark.read.format("org.apache.spark.sql.cassandra")\
          .option("spark.cassandra.connection.host","<ip>")\
          .option("spark.cassandra.connection.port","<port>")\
          .option("keyspace","<keyspace name>")\
          .option("table","<table name>")\
          .load()

In [None]:
spark.read.format("org.apache.spark.sql.cassandra")\
          .option("spark.cassandra.connection.host","localhost")\
          .option("spark.cassandra.connection.port","9042")\
          .option("keyspace","spark_demo_keyspace")\
          .option("table","fruits_prices_over_time")\
          .load()

### Postgres

In [None]:
## Postgres - MySQL - Relational Databases
spark.read\
      .format("jdbc")\
      .option("driver", "<driver name>")\
      .option("url", "jdbc:<dbtype>://<ip>:<port>/<dbname>")\
      .option("dbtable", "<table>")\
      .option("user", "<username>")\
      .option("password","<password>")\
      .load()

In [None]:
spark.read\
      .format("jdbc")\
      .option("driver", "org.postgresql.Driver")\
      .option("url", "jdbc:postgresql://localhost:5432/spark_demo_db")\
      .option("dbtable", "fruits_prices_over_time")\
      .option("user", "sahil")\
      .option("password","1234567890")\
      .load()
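
The JDBC options can also be collected in a dictionary, which keeps the password out of the notebook. A sketch under assumptions: `jdbc_options` is a hypothetical helper, and `DB_PASSWORD` is an assumed environment variable name, not something Spark or Postgres defines.

```python
import os


def jdbc_options(dbtype, host, port, dbname, table, user, driver,
                 password_env="DB_PASSWORD"):
    """Collect the JDBC reader options used above into one dictionary."""
    return {
        "driver": driver,
        "url": "jdbc:{}://{}:{}/{}".format(dbtype, host, port, dbname),
        "dbtable": table,
        "user": user,
        # Read the password from the environment instead of hard-coding it
        "password": os.environ.get(password_env, ""),
    }


opts = jdbc_options(
    dbtype="postgresql",
    host="localhost",
    port=5432,
    dbname="spark_demo_db",
    table="fruits_prices_over_time",
    user="sahil",
    driver="org.postgresql.Driver",
)
print(opts["url"])
```

The dictionary can be handed to the reader in one call: `spark.read.format("jdbc").options(**opts).load()`.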

## Let's Create a Cluster - Standalone Cluster Manager

* 1 Master and 1 Slave

In [None]:
$SPARK_HOME/sbin/start-master.sh --host localhost


$SPARK_HOME/sbin/start-slave.sh spark://localhost:7077 --memory 5G --cores 3
# visit localhost:8080 for MASTER UI

## Spark Deployment Modes

* Client Mode

* Cluster Mode

* Local Mode


### Client Mode
* Driver runs on your laptop/master
* Interactive Apps : Jupyter Notebook, Spark-Shell, PySpark

![Client Mode](./images/client_mode.png)

### Cluster Mode
* Driver runs on a worker node
* spark-submit

![Cluster Mode](./images/cluster_mode.png)

### Local Mode
* Driver and Worker run on your laptop
* For learning purposes only

![Local Mode](./images/local_mode.png)

## Spark-Submit

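Used to run scripts on a cluster after prototyping in a notebook

* Remove the `.master(...)` call and the `PYSPARK_SUBMIT_ARGS` setting from the script; pass both on the `spark-submit` command line instead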

In [None]:
# notebook
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages \
                                     <jar>,<jar> \
                                     pyspark-shell'
spark = SparkSession.builder.appName('myfirst program')\
                            .master("local[4]")\
                            .getOrCreate()

# script
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myfirst program')\
                            .getOrCreate()

#### Generic Spark Submit
spark-submit --packages <font color='green'>jar,jar</font> --master <font color='green'>ip</font>  script_name.py

#### Local Mode
spark-submit --packages jar,jar --master <font color='red'>local[6]</font> script_name.py

#### For Standalone Cluster Manager (no cluster mode for Python apps):
spark-submit --packages jar,jar --master <font color='red'>spark://ip:7077</font> script_name.py

#### For YARN Cluster Manager:
spark-submit --packages jar,jar --master <font color='red'>yarn</font> --deploy-mode <font color='red'>cluster/client</font> script_name.py

In [None]:
# local
spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.1 \
--master local[6] \
ETL_aws.py

# standalone
spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.1 \
--master spark://localhost:7077 \
ETL_aws.py
                
# yarn
spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.1 \
--master yarn \
ETL_aws.py
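
The three commands above differ only in the master URL and, for YARN, the deploy mode. A sketch that assembles them from Python, for instance to launch a job from a scheduler script; `spark_submit_command` is a hypothetical helper, and only flags already shown in this section are used.

```python
import shlex


def spark_submit_command(script, master, packages=(), deploy_mode=None):
    """Assemble a spark-submit argument list."""
    cmd = ["spark-submit"]
    if packages:
        # --packages takes one comma-separated list
        cmd += ["--packages", ",".join(packages)]
    cmd += ["--master", master]
    if deploy_mode is not None:
        cmd += ["--deploy-mode", deploy_mode]
    cmd.append(script)
    return cmd


aws = ["org.apache.hadoop:hadoop-aws:2.7.1"]

local_cmd = spark_submit_command("ETL_aws.py", "local[6]", aws)
standalone_cmd = spark_submit_command("ETL_aws.py", "spark://localhost:7077", aws)
yarn_cmd = spark_submit_command("ETL_aws.py", "yarn", aws, deploy_mode="cluster")

# shlex.join quotes arguments such as local[6] so the line is safe to paste
for cmd in (local_cmd, standalone_cmd, yarn_cmd):
    print(shlex.join(cmd))
```

Keeping the command as a list means it can be passed directly to `subprocess.run(cmd, check=True)` without going through a shell.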

## Notebook Demo