# PySpark 
> [Main Table of Contents](../../README.md)

## In This Notebook

- What is PySpark?
- PySpark ML module
    - Pipeline workflow for ML
    - One-hot vector (aka one-hot encoding)
- PySpark SQL Module
    - Example: Builder pattern to create SparkSession
    - `pyspark.sql...` API
    - `pyspark.sql.DataFrame` Methods
    - `pyspark.sql.Column` Methods
    - `pyspark.sql.SparkSession` Methods
    - `pyspark.sql.SparkSession.catalog` Methods
- Vocabulary

## What is PySpark?

- The Python API for Apache Spark: distributed computing on clusters for large-scale parallel data processing

## PySpark ML module (DataFrame-based) [API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)
- `pyspark.ml` only handles numeric data, so text/categorical columns must be converted to numbers first
- `pyspark.ml.Pipeline` is the class that combines all the Estimators and Transformers into one workflow.
    - Reuse the same modeling process

### Pipeline workflow for ML
1. Make sure the values of the columns of interest are in numerical format
    - See one-hot vector for converting categorical text data into numerical binary values
2. Assemble a vector
    - The last stage of the Pipeline combines all of the columns containing the features of interest into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data in this form: each value from a feature column is stored as an entry in a vector. From the model's point of view, every observation is then a vector that contains all of the information about it, plus a label that tells the model what value that observation corresponds to.
    - For this, the `pyspark.ml.feature` submodule contains a class called `VectorAssembler`. This Transformer takes all of the columns you specify and combines them into a new vector column
3. `pyspark.ml.Pipeline(stages=[<list of all estimators and transformers>])` creates a reproducible machine learning pipeline
    - A simple pipeline, which acts as an estimator. A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer. When Pipeline.fit() is called, the stages are executed in order. If a stage is an Estimator, its Estimator.fit() method will be called on the input dataset to fit a model. Then the model, which is a transformer, will be used to transform the dataset as the input to the next stage. If a stage is a Transformer, its Transformer.transform() method will be called to produce the dataset for the next stage. The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages. If stages is an empty list, the pipeline acts as an identity transformer
4. Send the dataset through the pipeline by calling `fit` and then `transform` with the dataset
5. The data is now fully cleaned. Split it into train/test sets.
    - In Spark it's important to make sure you split the data *AFTER* all the transformations. This is because operations like StringIndexer don't always produce the same index even when given the same list of strings
        - Data splitting options
            - `df.randomSplit([.6, .4])` means 60% train / 40% test (pass `seed=` to make the split reproducible)

```python
# EXAMPLE: Full pipeline build
# Setup:
#   All columns of the following DataFrames are string type
#   Two Spark DataFrames: 1. planes.columns ==> ['tailnum', 'year', 'type', 'manufacturer', 'model', 'engines', 'seats', 'speed', 'engine']
#                         2. flights.columns ==> ['year', 'month', 'day', 'dep_time', 'dep_delay', 'arr_time', 'arr_delay', 'carrier',
#                                                 'tailnum', 'flight', 'origin', 'dest', 'air_time', 'distance', 'hour', 'minute']

#################################
# CLEANING DATA - Mostly manual #
#################################
# Cast numeric-valued strings to "integer" or "double" with Column.cast()
# Rename the planes year column so the joined DataFrame won't have two columns named "year"
planes = planes.withColumnRenamed('year', 'plane_year')
# Join the DataFrames
model_data = flights.join(planes, on='tailnum', how='leftouter')
# Cast the columns to integers, replacing the existing string columns
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast('integer'))
model_data = model_data.withColumn("air_time", model_data.air_time.cast('integer'))
model_data = model_data.withColumn("month", model_data.month.cast('integer'))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast('integer'))
# Create new column "plane_age" (flight year minus the year the plane was built)
# The flight "year" column is still a string, so cast it explicitly before subtracting
model_data = model_data.withColumn("plane_age", model_data.year.cast('integer') - model_data.plane_year)
# Assume this dataset will be used by a model that answers a yes/no question: is the flight late?
# Create new boolean column "is_late" (true when the flight arrived late)
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
# Convert to an integer because Spark ML can only handle numeric values
# "label" is the default name for the response variable in Spark's ML routines
model_data = model_data.withColumn("label", model_data.is_late.cast('integer'))
# Remove rows with missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

####################################################
# CLEANING DATA - One-Hot Vector - Create Pipeline #
####################################################
# flights.dest and flights.carrier hold categorical text values, so use one-hot encoding,
# which is a two-step process in PySpark: StringIndexer, then OneHotEncoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
carr_indexer = StringIndexer(inputCol='carrier', outputCol='carrier_index')  # maps a string column of labels to a column of label indices
carr_encoder = OneHotEncoder(inputCol='carrier_index', outputCol='carrier_fact')  # maps a column of category indices to binary vectors
dest_indexer = StringIndexer(inputCol='dest', outputCol='dest_index')
dest_encoder = OneHotEncoder(inputCol='dest_index', outputCol='dest_fact')

############################################################
# MERGE MULTIPLE FEATURE COLUMNS INTO SINGLE VECTOR COLUMN #
############################################################
# VectorAssembler is a feature transformer that merges multiple columns into one vector column
# It is the last stage of the Pipeline: every Spark modeling routine expects the features in a single column,
# so this has to be done before modeling can take place
# Features for the model: month, air_time, plane_age, carrier_fact, dest_fact (the last two are created by the one-hot stages)
vec_assembler = VectorAssembler(inputCols=['month', 'air_time', 'plane_age', 'carrier_fact', 'dest_fact'], outputCol='features')

######################################
# CREATE THE ONE-HOT VECTOR PIPELINE #
######################################
one_hot_vec_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

####################################################################
# USE THE ONE-HOT VECTOR PIPELINE TO FURTHER CLEAN/MODIFY THE DATA #
####################################################################
# fit() returns a PipelineModel; transform() then applies every fitted stage in order
final_cleaned_data = one_hot_vec_pipe.fit(model_data).transform(model_data)

###############################################################
# SPLIT THE FINAL DATA AND IT IS READY TO USE ON ANY ML MODEL #
###############################################################
training, test = final_cleaned_data.randomSplit([.6, .4])  # 60% train / 40% test
```

```python
# EXAMPLE: Create and use a logistic regression classification model
# (models the probability of a 1 with a sigmoid of a linear function of the features, so the 0/1 decision boundary is linear)

from pyspark.ml.classification import LogisticRegression
###########################################################################################
# MODEL OF INTEREST, BUT WOULD LIKE TO OPTIMIZE PARAMETERS TO FIND BEST MODEL FOR DATASET #
###########################################################################################
# Estimator. This example optimizes the hyperparameters "regParam" and "elasticNetParam"
lr = LogisticRegression()


################################################################################
# FIND BEST LOGISTIC REGRESSION CLASSIFICATION MODEL IOW: BEST HYPERPARAMETERS #
################################################################################
import numpy as np
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
# Use k-fold cross validation for hyperparameter optimization (the best-model search)
# `pyspark.ml.tuning.CrossValidator` takes three parameters: estimator, estimatorParamMaps, evaluator

# For the `pyspark.ml.tuning.CrossValidator` evaluator parameter:
# When doing cross validation for model selection, we need a way to compare different models
# This is a binary classification model, so use `BinaryClassificationEvaluator` from `pyspark.ml.evaluation`
# This evaluator calculates the AUC (area under the curve), where the curve is the ROC (receiver operating characteristic) curve
# AUC combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a single number
# The details of what these measure aren't important for this course. The closer the AUC is to 1, the better the model
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

# For the `pyspark.ml.tuning.CrossValidator` estimatorParamMaps parameter:
# Create a grid of values to search over when looking for the optimal hyperparameters
grid = tune.ParamGridBuilder()
# Add each hyperparameter to optimize and the list of values to try
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))  # 0.00, 0.01, ..., 0.09
grid = grid.addGrid(lr.elasticNetParam, [0, 1])          # 0 = L2 penalty, 1 = L1 penalty
grid = grid.build()                                      # 10 x 2 = 20 param maps

cv = tune.CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)  # numFolds defaults to 3
# Fit every param map on every fold (20 param maps x 3 folds). Computationally very expensive.
models = cv.fit(training)  # training data is from the previous example above
# bestModel is already refit on the full training set with the best param map
best_lr = models.bestModel  # Best hyperparameters were regParam=0, elasticNetParam=0, the default values for lr


######################################################################
# WITH BEST MODEL HYPERPARAMETERS CREATE THE MODEL WITH THOSE VALUES #
######################################################################
# Optional: models.bestModel above is already usable. Because the best hyperparameters
# were the defaults, the same model can be rebuilt without repeating the grid search
best_lr = LogisticRegression()
best_lr = best_lr.fit(training)


##########################################
# USE THE MODEL AND EVALUATE PERFORMANCE #
##########################################
# Use the model to predict on the test set
test_results = best_lr.transform(test)  # test data is from the previous example above
# Evaluate the predictions - compute the AUC
print(evaluator.evaluate(test_results))  # ==> 0.7123313100891033
```

### One-hot vector (aka one-hot encoding)

- Convert categorical text values to numerical binary values
- `inputCol` is the name of the column to index or encode
- `outputCol` is the name of the new column to be filled with the result

1. Create `StringIndexer(inputCol, outputCol)` (Estimator)
    - An Estimator that takes a DataFrame with a column of strings and maps each unique string to a number. Its `fit` returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column

2. Encode this numeric column as a one-hot vector with `OneHotEncoder(inputCol, outputCol)`
    - This works the same way as the StringIndexer: it is an Estimator whose `fit` returns a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines (numerical binary values in this case)


## PySpark SQL Module

1. Create a connection to a cluster with `sc = pyspark.SparkContext(conf=conf)`, where the connection is configured through `conf=pyspark.SparkConf()`
    - `print(sc)` verifies the SparkContext is in my environment
    - `print(sc.version)` prints the Spark version
2. Access the PySpark DataFrame API through an instance of `pyspark.sql.SparkSession`, which is built on top of my `SparkContext`
    - The DataFrame API is an easier-to-use, high-level abstraction over RDDs
3. Use `spark.read.<method>` (a `DataFrameReader`) or `df.write.<method>` (a `DataFrameWriter`)
    - Read/write from/to external storage (e.g. csv, json, key-value store, etc) directly into and out of PySpark without converting to `pandas.DataFrame`
4. Use `pyspark.sql.SparkSession` and `pyspark.sql.DataFrame` methods for table and Spark DataFrame operations

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Example: Build SparkContext and print version to verify SparkContext is in my environment
# getOrCreate() avoids the "Cannot run multiple SparkContexts at once" error when the cell is re-run
sc = SparkContext.getOrCreate(conf=SparkConf())
print(sc.version)

# Example: Builder pattern to create SparkSession
# Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use
# SparkSession.builder.getOrCreate(), which reuses an existing SparkContext/SparkSession if one is running
spark = SparkSession.builder \
        .master("local") \
        .appName("Word Count") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
```

### `pyspark.sql...` API [All](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html)

Classes | Explanation
--- | ---
`pyspark.SparkConf()` | Configuration for a SparkContext application
`pyspark.SparkContext(conf=conf)` | Connection to a cluster<br>Tells Spark how to access a cluster using the `conf` kwarg
`pyspark.sql.SparkSession(spark_context)` |  Interface (DataFrame implementation/API) to a cluster<br>- When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in<br>- SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, and cache tables<br>- Create a `SparkSession` using the builder pattern
`SparkSession.read.<ioMethod>`<br>e.g. `spark.read.csv()` | Load a DataFrame from external storage systems (e.g. csv, json, key-value, etc)<br>`spark.read` returns a `DataFrameReader`; the matching writer is `DataFrame.write`<br>[I/O methods](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html)<br>[DataFrameReader methods](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.html#pyspark.sql.DataFrameReader)


### `pyspark.sql.DataFrame` Methods [All](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html)

- PySpark DataFrames are immutable. Modification methods always return a *new* DataFrame

Method | Explanation
--- | ---
`pyspark.sql.DataFrame.show(n=20)` | Print the first n rows to the console (default 20)
`pyspark.sql.DataFrame.toPandas()` | Convert a Spark DataFrame to a pandas DataFrame
`pyspark.sql.DataFrame.createTempView()`<br><br>`pyspark.sql.DataFrame.createOrReplaceTempView()` | Creates a local temporary view with this DataFrame<br>Throws an error if the view already exists in the catalog<br><br>Creates or replaces a local temporary view with this DataFrame<br>The lifetime of both temporary views is tied to the SparkSession that was used to create this DataFrame
`pyspark.sql.DataFrame.groupBy()` | Returns `GroupedData`, on which aggregation functions can be run<br>[List of all grouping methods including some agg funcs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/grouping.html)
`pyspark.sql.DataFrame.groupBy().agg(*exprs)` | Use any function from the Aggregate Functions section of [`pyspark.sql.functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) with this method
`pyspark.sql.DataFrame.select()` | Returns a new DataFrame with the selected columns
`pyspark.sql.DataFrame.withColumn()`<br>`pyspark.sql.DataFrame.withColumns()` | Returns a new DataFrame by adding a new column or replacing an existing column of the same name<br>`withColumns` takes a dict of column name to Column (Spark 3.3+)
`pyspark.sql.DataFrame.withColumnRenamed()` | Returns a new DataFrame with an existing column renamed (no-op if the column doesn't exist)
`pyspark.sql.DataFrame.join()` | Returns a new DataFrame by joining two DataFrames (`on`, `how` kwargs)


### `pyspark.sql.Column` Methods [All](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html)

- Methods that work on dataframe columns

Method | Explanation
--- | ---
`pyspark.sql.Column.cast("dtype")` | Spark ML only handles numeric data<br>Popular method to convert the datatype of a column<br>"dtype" e.g. "integer", or "double" for decimals

### `pyspark.sql.SparkSession` Methods [All](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/spark_session.html)

Method | Explanation
--- | ---
`pyspark.sql.SparkSession.catalog` | Interface through which the user may create, drop, alter or query underlying databases, tables, functions, etc
`pyspark.sql.SparkSession.sql(query)` | Run a SQL query on tables in the Spark cluster<br>Returns `pyspark.sql.dataframe.DataFrame`
`pyspark.sql.SparkSession.table(tableName)` | Returns the table as a `pyspark.sql.dataframe.DataFrame`
`pyspark.sql.SparkSession.createDataFrame(pd.DataFrame)` | Convert a pandas DataFrame to a *locally stored* Spark DataFrame<br>Locally stored means you can use all the Spark DataFrame methods on it, but can't access the data in other contexts, because it isn't registered in the catalog<br>e.g. a SQL query (using the `.sql()` method) that references a locally stored DataFrame will throw an error<br>To access the data this way, save it as a temporary view with `.createTempView()` or `.createOrReplaceTempView()`, which registers the DataFrame as a table in the catalog

### `pyspark.sql.SparkSession.catalog` Methods [All](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/catalog.html)

Method | Explanation
--- | ---
`pyspark.sql.SparkSession.catalog.currentDatabase()`<br>`pyspark.sql.SparkSession.catalog.setCurrentDatabase(dbName)` | Returns the current default database in this session<br>Sets the current default database in this session
`pyspark.sql.SparkSession.catalog.listDatabases()` | Returns a list of databases available across all sessions
`pyspark.sql.SparkSession.catalog.listTables(dbName=None)` | Returns a list of tables/views in the specified database, including all temporary views. API uses the current database if no database is provided
`pyspark.sql.SparkSession.catalog.listFunctions(dbName=None)` | Returns a list of functions registered in the specified database. API uses the current database if no database is provided
`pyspark.sql.SparkSession.catalog.listColumns(tableName, dbName=None)` | Returns a list of columns for the given table/view in the specified database. API uses the current database if no database is provided
`pyspark.sql.SparkSession.catalog.createTable(tableName, path=None, source=None, schema=None, **options)` | Creates a table based on the dataset in a data source and returns the DataFrame associated with the table
`pyspark.sql.SparkSession.catalog.dropGlobalTempView(viewName)` | Drops the global temporary view with the given view name in the catalog. If the view has been cached before, then it will also be uncached. Returns true if this view is dropped successfully, false otherwise
`pyspark.sql.SparkSession.catalog.dropTempView(viewName)` | Drops the local temporary view with the given view name in the catalog. If the view has been cached before, then it will also be uncached. Returns true if this view is dropped successfully, false otherwise
`pyspark.sql.SparkSession.catalog.isCached(tableName)` | Returns true if the table is currently cached in-memory
`pyspark.sql.SparkSession.catalog.recoverPartitions(tableName)` | Recovers all the partitions of the given table and updates the catalog. Only works with a partitioned table, and not a view
`pyspark.sql.SparkSession.catalog.refreshByPath(path)` | Invalidates and refreshes all the cached data for any DataFrame that contains the given data source path
`pyspark.sql.SparkSession.catalog.refreshTable(tableName)` | Invalidates and refreshes all the cached data and metadata of the given table
`pyspark.sql.SparkSession.catalog.cacheTable(tableName)` | Caches the specified table in-memory
`pyspark.sql.SparkSession.catalog.uncacheTable(tableName)` | Removes the specified table from the in-memory cache
`pyspark.sql.SparkSession.catalog.clearCache()` | Removes all cached tables from the in-memory cache

## Vocabulary

Term | Explanation
--- | ---
Cluster | Collection of processing units<br>e.g. Group of separate computers
Node | One unit in a cluster
Master | Main unit in a cluster responsible for splitting data and distributing to workers
Worker | Other units in a cluster that work on segments of data delegated by the master
RDD<br>Resilient Distributed Datasets | The core data structure in Spark<br>A fault-tolerant collection of elements that can be operated on in parallel<br>RDD is a low-level API and difficult to use. Instead use the higher-level abstraction `pyspark.sql.DataFrame`, created through a `SparkSession`
Estimator | Estimators have a `fit` method<br>An Estimator takes a DataFrame and outputs a model (a Transformer)
Transformer | Transformers have a `transform` method<br>A Transformer takes a DataFrame and returns a new DataFrame
k-fold cross validation | Use Case: Use this for hyperparameter optimization aka: best model selection<br>A way of splitting training/test data and measuring model performance<br>It works by splitting the training data into a few different partitions. The exact number is up to you, but in this course you'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on the held out data