In [1]:
# Stretch the notebook container to the full browser width.
# `display` and `HTML` are imported from the public IPython.display module;
# importing `display` from IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# Ultimate Spark Tutorial

* Installation
* Comparison with Pandas
* Data Sources for I/O
    * Connecting with Relational & Non-Relational Databases
* Deployment Modes
    * spark-submit
    * Local, Standalone, Cloud (AWS Demo)
    * Running jupyter notebook
* ML & Analytics - Notebook Demo
    * Concept of Pipelines in Spark
    * Estimators & Transformers
    * Create a Machine Learning Model
* Understanding Spark Internals - Notebook Demo
    * Caching
    * Repartitioning


# Spark Installation

### Step 1 : Install Java, Scala

In [None]:
## Install Scala and Java
# Switch to the home directory first.
cd ~
# Install the distribution's default JRE package and the scala package via apt.
sudo apt install default-jre scala


### Step 2 : Download Spark and set SPARK_HOME in .bashrc

In [None]:
## Download Spark
# Use archive.apache.org: it keeps every past release, whereas the rotating
# mirrors drop old versions such as 2.4.0.
wget https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
tar xvf spark-2.4.0-bin-hadoop2.7.tgz
sudo mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark

## put these lines in ~/.bashrc
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
export JAVA_HOME=/usr/lib/jvm/default-java

## refresh .bashrc file
# Use the explicit home-directory path so this works from any working directory.
source ~/.bashrc

In [None]:
## Test it
pyspark

## Quick Look - Compare with Pandas

## Imports

In [None]:
# Pandas :
import pandas as pd

# PySpark :
from pyspark.sql import SparkSession

# Build (or reuse) a session named 'myfirst program' against a local[4] master.
spark = (
    SparkSession.builder
    .appName('myfirst program')
    .master("local[4]")
    .getOrCreate()
)

In [None]:
spark

## Load CSV

In [None]:
# Pandas :
# Row 0 of the file supplies the column names.
dfp = pd.read_csv("data/hotel_energy.csv", header=0)

# PySpark:
# Same file; the header row and schema inference are set as reader options.
dfs = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("data/hotel_energy.csv")
)

## Show DataFrame

In [None]:
# Pandas : 
dfp.head(15)

# PySpark :
dfs.show(15)

## Column and Data Types

In [None]:
# Pandas :
dfp.columns
dfp.dtypes

# PySpark :
dfs.columns
dfs.printSchema()

## Change Column Names 

In [None]:
# Pandas :
# Replace every column name at once (the list length must match the column count).
dfp.columns = ["a", "b", "c"]

# Rename selected columns; the result is a new frame, dfp itself is not changed.
dfp.rename(columns = {"old1":"new1",
                      "old2":"new2"})

# PySpark : 
# Replace every column name at once. Uses the Spark frame `dfs` loaded earlier.
dfs1 = dfs.toDF(*["a", "b", "c"])

# Rename selected columns one by one; each target name must be distinct.
dfs.withColumnRenamed("old1", "new1")\
   .withColumnRenamed("old2", "new2")\
   .withColumnRenamed("old3", "new3")

## Drop Columns

In [None]:
# Pandas :
dfp.drop("hotel", axis=1)

# PySpark :
dfs.drop("hotel")

## Change Column Type

In [None]:
# Pandas :
dfp["sales"].astype('int')


# PySpark :
from pyspark.sql.functions import col
df = dfs.withColumn("sales", col("sales").cast("int"))

BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

In [None]:
from pyspark.sql.types import DoubleType

dfs = dfs.withColumn("sales", col("sales").cast(DoubleType()))

## Aggregates

In [None]:
# Pandas :
# Group the pandas frame and aggregate each column with its own function.
dfp.groupby(['age', 'gender'])\
   .agg({'height':"mean", 'income':'min'})

# Pyspark :
# Same grouping and per-column aggregation on the Spark frame.
df.groupby(['age', 'gender'])\
  .agg({'height':"mean", 'income':'min'})

## Standard Transformations

In [None]:
# Pandas :
import numpy as np
# Column assignment is done on the pandas frame; a Spark DataFrame does not
# support item assignment.
dfp['log_sales'] = np.log(dfp["sales"])

# Pyspark:
import pyspark.sql.functions as F
# withColumn returns a new DataFrame, so the result is rebound to df.
df = df.withColumn('log_sales', F.log(df.sales))

## SQL Queries

In [None]:
# Pandas
# - (no pandas counterpart is shown in this comparison)

# Pyspark:
# Register the DataFrame under a view name so SQL text can refer to it.
df.createOrReplaceTempView("df_VIEW")
# Run the query through the session; the resulting DataFrame is bound to ans_df.
ans_df = spark.sql("select * from df_VIEW where fruit == 'orange'")

## Missing Values

In [None]:
# Import the functions module under an alias instead of `from ... import sum`,
# which would shadow Python's builtin sum() for the rest of the notebook.
from pyspark.sql import functions as F

def missing_df(df):
    """Print the number of null values in every column of a Spark DataFrame.

    Parameters
    ----------
    df : Spark DataFrame whose columns are inspected.

    Returns
    -------
    None. The one-row result (one count per column, named after the column)
    is printed with ``show()``.
    """
    # isNull() gives a boolean per row; casting to int and summing counts the nulls.
    null_counts = [
        F.sum(F.col(c).isNull().cast("int")).alias(c)
        for c in df.columns
    ]
    df.select(*null_counts).show()

## Spark Data Sources

![Spark Data Sources](https://i2.wp.com/www.jenunderwood.com/wp-content/uploads/2016/10/SparkArchitecture-Databrickss.gif?resize=800%2C462&ssl=1)

# Data Ingestion From External Sources - Spark
* Generic Format
* HDFS vs Local File
* Special Format - Need Drivers
    * Avro
    * S3
* Relational Database
    * Postgres
    * MySQL
* Non-Relational Database
    * Cassandra

# Generic Format - Don't need drivers

* csv
* json
* parquet
* libsvm
* text

In [None]:
#### Read

spark.read.<format>("<file name>")

#### Write

spark.write.<format>("<file name>")

#### read
spark.read.format("<format name>").options("path", "<path here>").load()

#### write
spark.write.format("<format name>").options("path", "<path here>").save()

## HDFS vs Local File

Paths for files stored in HDFS and on the local file system follow different URI conventions:

In [None]:
#### HDFS
spark.read.<format>.("hdfs://<full path here>") # notice the double slash

#### Local File 
spark.read.<format>("file:///<full path here>") # notice the triple slash

# Special Formats - Need Drivers

You can include the following packages using **--packages**

|Source| Driver Package|
|-----------|----------------|
|S3        |org.apache.hadoop:hadoop-aws:2.7.1|
|Avro       |org.apache.spark:spark-avro_2.11:2.4.0|

# S3

Set up your Access Key and Secret Key in .bashrc

In [None]:
export AWS_ACCESS_KEY_ID=<access ID here>
    
export AWS_SECRET_ACCESS_KEY=<access KEY here>

In [None]:
#### Read

df = spark.read.<format>("s3a://<bucket name>/<file name>") # notice the s3a

#### Write

df.write.<format>("s3a://<bucket name>/<file name>", mode="overwrite") # notice the s3a

# Relational Databases

|Source| Driver Package|Driver Name|Standard Port|
|-----------|----------------|---------|----|
|Postgres   |org.postgresql:postgresql:42.1.1|org.postgresql.Driver|5432|
|MySQL       |mysql:mysql-connector-java:8.0.13|com.mysql.jdbc.Driver|3306|
|SQL Server| Download from internet - see the SQL Server section| com.microsoft.sqlserver.jdbc.SQLServerDriver|  1433|

In [None]:
#### Generic Read
# Fill in the driver class, JDBC URL, table and credentials for your database.

df = spark.read\
      .format("jdbc")\
      .option("driver", "<driver name>")\
      .option("url", "jdbc:<source>://<ip>:<port>/<dbname>")\
      .option("dbtable", "<table>")\
      .option("user", "<username>")\
      .option("password","<password>")\
      .load()

#### Generic Write
# Same options as the read; placeholders are used so no real credentials
# live in the notebook. mode("overwrite") replaces an existing table.

df.write\
      .format("jdbc")\
      .option("driver", "<driver name>")\
      .option("url", "jdbc:<source>://<ip>:<port>/<dbname>")\
      .option("dbtable", "<table>")\
      .option("user", "<username>")\
      .option("password","<password>")\
      .mode("overwrite")\
      .save()

## Postgres

In [None]:
#### Read
# Postgres-specific driver class and URL scheme; 5432 is the standard Postgres port.

df = spark.read\
      .format("jdbc")\
      .option("driver", "org.postgresql.Driver")\
      .option("url", "jdbc:postgresql://<ip>:5432/<dbname>")\
      .option("dbtable", "<table>")\
      .option("user", "<username>")\
      .option("password","<password>")\
      .load()

#### Write
# Credentials are placeholders: do not commit a real user name or password to the notebook.

df.write\
      .format("jdbc")\
      .option("driver", "org.postgresql.Driver")\
      .option("url", "jdbc:postgresql://<ip>:5432/<dbname>")\
      .option("dbtable", "<table>")\
      .option("user", "<username>")\
      .option("password","<password>")\
      .mode("overwrite")\
      .save()

## MYSQL

In [None]:
#### Read
# Reads start from the session (spark.read), not from a DataFrame.
# A reader has no save mode, so .mode(...) belongs only on the write below.
df = spark.read\
      .format("jdbc")\
      .option("driver", "com.mysql.jdbc.Driver")\
      .option("url", "jdbc:mysql://<ip>:3306/<dbname>")\
      .option("dbtable", "<table>")\
      .option("user", "<user>")\
      .option("password","<passwd>")\
      .load()



#### Write
df.write\
      .format("jdbc")\
      .option("driver", "com.mysql.jdbc.Driver")\
      .option("url", "jdbc:mysql://<ip>:3306/<dbname>")\
      .option("dbtable", "<tablename>")\
      .option("user", "<user>")\
      .option("password","<password>")\
      .mode("overwrite")\
      .save()

## SQL Server

Install SQL Server from [here](https://docs.microsoft.com/en-us/sql/linux/quickstart-install-connect-ubuntu?view=sql-server-2017)

Download latest driver package from [here](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-2017)

In [None]:
# include the jar from the above downloaded packages as --driver-class-path option
# NOTE: this must run before the SparkSession is created.
import os

# Location of the extracted SQL Server JDBC jar; adjust to your machine.
MSSQL_JDBC_JAR = '/home/sahil/Desktop/sqljdbc_7.2/enu/mssql-jdbc-7.2.1.jre8.jar'

# When PYSPARK_SUBMIT_ARGS is set by hand it has to end with 'pyspark-shell'.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path {} pyspark-shell'.format(MSSQL_JDBC_JAR)

In [None]:
#### Read
# SQL Server needs Microsoft's JDBC driver class, not the MySQL one.
# Note the URL form: ';databaseName=' instead of '/<dbname>'.

df = spark.read.format("jdbc") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .option("url", "jdbc:sqlserver://<ip>:1433;databaseName=<dbname>") \
    .option("dbtable", "<table>") \
    .option("user", "<user>") \
    .option("password", "<password>")\
    .load()

#### Write

df.write.format("jdbc") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .option("url", "jdbc:sqlserver://<ip>:1433;databaseName=<dbname>") \
    .option("dbtable", "<table>") \
    .option("user", "<user>") \
    .option("password", "<password>")\
    .mode("overwrite")\
    .save()

# NoSQL Databases

|Source| Driver Package|Format Name|Standard Port|
|-----------|----------------|---------|----|
|Cassandra  |com.datastax.spark:spark-cassandra-connector_2.11:2.3.0|org.apache.spark.sql.cassandra|9042
|DynamoDB   |com.amazon.emr:emr-dynamodb-hadoop:4.2.0|

## Cassandra

In [None]:
#### Read
# Every chained line except the last needs a trailing backslash; otherwise
# .load() is parsed as a separate (invalid) statement.

df = spark.read.format("org.apache.spark.sql.cassandra")\
          .option("spark.cassandra.connection.host","<ip>")\
          .option("spark.cassandra.connection.port","<port>")\
          .option("keyspace","<keyspace name>")\
          .option("table","<table name>")\
          .load()

#### Write
# Writes start from a DataFrame (df.write); the session has no `write` attribute.

df.write.format("org.apache.spark.sql.cassandra")\
          .option("spark.cassandra.connection.host","<ip>")\
          .option("spark.cassandra.connection.port","<port>")\
          .option("keyspace","<keyspace name>")\
          .option("table","<table name>")\
          .save()

## Dynamo DB (TODO)

- I don't think it is possible to get data in/out of DynamoDB using Spark

In [None]:
#### Read



#### Write

### How to Include Drivers and Jars

Drivers from https://mvnrepository.com/

Jar Format is **groupID:artifactID:version**

In [None]:
## Method 1
# Pass the driver packages as session configuration, as a comma-separated list.
spark = (
    SparkSession.builder
    .appName('postgres spark demo')
    .master("local")
    .config("spark.jars.packages", "<jar>,<jar>")
    .getOrCreate()
)

In [None]:
## Method 2
import os

# Pass the packages through the submit arguments; set this before the session is built.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages \
                                     <jar>,<jar> \
                                     pyspark-shell'

spark = (
    SparkSession.builder
    .appName('Demo 1')
    .master("local")
    .getOrCreate()
)

### AWS

In [None]:
spark.read.<format>("s3a://<bucket name>/<file name>")
spark.write.<format>("s3a://<bucket name>/<file name>")

# Spark Deployment Modes
* Interactive or Cluster

* Local Mode
* Standalone Mode
* Cluster Mode

## Local Mode

* When you use master as local[2], you request Spark to use 2 cores and run the driver and workers in the same JVM.
* In local mode, all Spark job-related tasks run in the same JVM.
* It's useful for learning purposes only.

![Notebook + Micro Cluster](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/notebook_microcluster.png)

In [None]:
from pyspark.sql import SparkSession

# local[2]: run Spark inside this process using 2 cores.
spark = SparkSession.builder\
                    .appName('first app')\
                    .master("local[2]")\
                    .getOrCreate()

In [None]:
spark-submit --master local[4] my_app.py

## Standalone Cluster Manager

<img src="https://zeppelin.apache.org/docs/0.7.0/assets/themes/zeppelin/img/docs-img/spark_ui.png" alt="Drawing" style="width: 800px;"/>


* Cluster Manager inbuilt with spark. 
* Not typically used in production (YARN / Mesos / Kubernetes are used instead)
* Good for testing purposes, before deployment - does not support deploy mode

**How to create a cluster on local machine?**

In [None]:
# start spark master
$SPARK_HOME/sbin/start-master.sh -h localhost

# start slave
$SPARK_HOME/sbin/start-slave.sh spark://localhost:7077

# check master UI in a browser (plain http, not a shell command) at
# http://localhost:8080

In [None]:
from pyspark.sql import SparkSession

# Connect to the standalone master at spark://localhost:7077.
spark = SparkSession.builder\
                    .appName('first app')\
                    .master("spark://localhost:7077")\
                    .getOrCreate()

In [None]:
spark-submit --master spark://localhost:7077 my_app.py

## Cluster Mode

In [None]:
spark-submit --master <master ip> --deploy-mode cluster my_app.py

## Running Jupyter Notebook

### Method 1

In [None]:
# Make the pyspark launcher start Jupyter as its driver front end.
export PYSPARK_DRIVER_PYTHON=jupyter
# Single quotes keep the whole option string, including the '*', literal;
# the value is the same as before, without the nested double quotes.
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8889 --ip=*'

pyspark --master <master ip>

### Method 2

Start the jupyter notebook as it is! Create the spark context inside the jupyter session

## Machine Learning & Analytics

* ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
* Featurization: feature extraction, transformation, dimensionality reduction, and selection
* Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
* Persistence: saving and load algorithms, models, and Pipelines
* Utilities: linear algebra, statistics, data handling, etc.

## Caching and Repartitioning Dataframe

In [None]:
# Repartition into 8 partitions; the author's note says to choose this
# according to the number of executors.
df = df.repartition(8) ## according to number of executors
# Register the repartitioned frame as a temp view so it can be cached by name.
df.createOrReplaceTempView("fireServiceVIEW");
# Cache the view through the session catalog.
spark.catalog.cacheTable("fireServiceVIEW")
# Rebind df to the table behind the view that was just cached.
df = spark.table("fireServiceVIEW")