In [1]:
# Importing Relevant Libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

There are many options you can configure when setting up a SparkSession. Here are a few of the more common ones:
* **master**: The URL of the cluster that the SparkContext connects to, e.g. `local[*]` to run locally using all available cores
* **appName**: The name that will be displayed in the Spark cluster UI
* **config**: Configuration for the SparkSession. Any key-value pairs passed to `config` are applied to the session's SparkConf. For example, you can set `spark.sql.shuffle.partitions` to change the number of partitions used when shuffling data for joins and aggregations, or `spark.executor.memory` to change the amount of memory used per executor process.

In [2]:
# Creating a Spark Session
# Wrapping the builder chain in parentheses lets us split it across lines without using a "\" in Python
spark = (SparkSession.builder
        .master("local[*]")
        .appName("Catch-Up Session")
        .getOrCreate())

In [None]:
# Viewing the Spark Session


The SparkSession is the entry point to Spark SQL. It manages the SparkContext that was used to create it and provides a way to create DataFrames and Datasets (the typed Dataset API is only available in Scala and Java). Spark SQL is the Spark module for working with structured data. It allows you to use SQL or the DataFrame/Dataset API to express Spark operations on structured data.

In this exercise, we will go over how to read, access and manipulate structured data in Spark using Spark SQL. The data we will use is stored in the `data` directory of this repository.

## Read Files

There are a lot of similarities between Pandas and Spark SQL when it comes to reading files. The main difference is that Pandas loads a file into the memory of a single machine, whereas Spark SQL can read from distributed file systems such as HDFS and spread the data across a cluster. Spark SQL can also read from the local file system, but on a cluster the file must then exist at the same path on every worker node, which is why shared or distributed storage is preferred in production.

The DataFrames produced by the two libraries also offer similar functionality. Let's explore some of those similarities, noting the differences along the way.

In [None]:
# Column names to use in the wine dataset
colNames = [
    'target', 'alcohol', 'malic_acid', 'ash', 'alcalinity_of_ash', 'magnesium', 'total_phenols',
    'flavanoids', 'nonflavanoid_phenols', 'proanthocyanins', 'color_intensity', 'hue',
    'od280_od315_of_diluted_wines', 'proline'
]

In [None]:
# Read the data into a variable called wine


There are several equivalent ways to set reader options when loading a CSV file. For example:
1. `spark.read.option("header", "true").option("inferSchema", "true").csv("data/airports.csv")`

    In this instance, we chain one `option` call per setting on the `DataFrameReader` before calling `csv`. This is a very common pattern in Spark SQL.
2. `spark.read.options(header="true", inferSchema="true").csv("data/airports.csv")`

    In this instance, we pass all the options as keyword arguments to the `options` method. This is equally common.

Pick what you prefer and stick with it. The important thing is to be **consistent**.

In [None]:
# View the first 5 rows of the wine dataset


A schema is a description of the structure of your data: a list of fields (columns) with their names, data types and whether they may contain nulls. It does not contain any data itself. Every DataFrame has a schema, either inferred (as with `inferSchema`) or supplied explicitly. Knowing the types up front lets Spark optimize queries and store and serialize the data in a compact binary format when sending it over the network.

In [None]:
# Printing the schema of the wine dataset


## Basic Data Cleaning

In [None]:
# Checking for duplicates
assert wine.count() == wine.dropDuplicates().count()

In [None]:
# Checking for nulls


> Derivation for the above code can be found [here](https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe)

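**Explanation**: The code passes `select` a list with one aggregate expression per column. For each column `c`, `F.when(F.isnull(c), ...)` produces a value only for the rows where `c` is null, `F.count` tallies those non-null results, and `.alias(c)` names the result after the column. The final result is a single-row dataframe holding the number of null entries in each column. Note that the value returned by `F.when` must be a non-null literal such as `1`: returning the column `c` itself yields null for exactly the rows being counted, so `F.count` skips them and always reports 0.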

**Conclusion**: None of the columns have null values.

In [None]:
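# It's important to break down what happened in the above line of code
# Here is the output of the list comprehension used: one aggregate expression per column.
# F.when returns the literal 1 for null entries (and null otherwise), so F.count tallies the nulls.
[F.count(F.when(F.isnull(c), 1)).alias(c) for c in wine.columns]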

Great! Since we have no null or duplicate values, the work we need to do is minimal. Let's move on to the next step.
Next, let's rename a column in the dataframe. The `od280_od315_of_diluted_wines` column name is hard to read, so let's rename it to `od280`.

In [None]:
# Great example of withColumnRenamed in action


I encourage you to look up how to engineer new features with the `withColumn` method. It is a very useful method that you will use a lot.

## Query Data Using SQL

It's possible to create a temporary view of a DataFrame by calling the createOrReplaceTempView method on that DataFrame. This will register the DataFrame as a table in the catalog, which will allow you to run SQL queries on its data. This is a temporary view, so it will only exist for the duration of your SparkSession.

In [None]:
# Create a temporary view of the wine dataset

# Query for the unique values in the target column and their distribution


> There is clearly a class imbalance in our target variable, especially in class 2.

In [None]:
# Average alcohol content by target rounded off to 2 decimal places


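SQL queries will feel familiar to data scientists who have worked with SQL databases. Both SQL and the DataFrame API are compiled by the same query optimizer, so the choice does not affect performance. It is still recommended to use the DataFrame API for most data manipulation tasks: queries are built from Python method calls rather than strings, which makes them **more flexible and less error-prone** to compose and refactor.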

In [None]:
# It's possible to do the same using the DataFrame API
# Remember, the parentheses just allow us to chain commands without using a "\"


# the same as 
# wine.select(['target', 'alcohol']).groupBy('target').agg(F.round(F.avg('alcohol'), 2).alias('avgAlcohol')).orderBy('target').show()

## Build a Machine Learning Pipeline

Since the data needs very little preprocessing, all we need is a pipeline that takes in the data and outputs a model. We will use a `VectorAssembler` to combine all the feature columns into a single vector column, and then train a `RandomForestClassifier` on that vector.

It would also be a good idea to cross-validate the model to make sure it is not overfitting the data.

In [None]:
# Split the data into training and test sets; 70% for training and 30% for testing


In [None]:
# Create the VectorAssembler object and handle invalid data by raising an error


# Instantiate the RandomForestClassifier with the following parameters
# maxDepth = 5, numTrees = 50, seed = 23


# Instantiate the Evaluation object with F1 as the metric

# Create a pipeline with the assembler and model as stages


In [None]:
# Fit the pipeline to the training data

# Transform the training and test sets


# Use the evaluator to get the F1 score for the training and test sets


With a dataset this small, such good metrics are not surprising, but a single train/test split can also paint an overly optimistic picture. Cross-validation gives a more reliable check that we are not overfitting the data.

### Parameter Tuning

In [None]:
# Leave as is
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

params = (ParamGridBuilder()
          .addGrid(rfModel.numTrees, [50, 100, 150])
          .addGrid(rfModel.maxDepth, [5, 10, 15])
          .build())

# Instantiate the CrossValidator with 5 folds
cv = CrossValidator(estimator=pipe, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

In [None]:
# Fitting the cross validator to the training data


In [None]:
# Get the average F1 metric from the cross validator models.


This is a pretty decent performance. `cvModel` holds the best model found during cross-validation (available as `cvModel.bestModel`), and calling `cvModel.transform` on the test set uses that best model to make predictions.

In [None]:
# Let's examine our best model as of Cross Validation


In [None]:
# For the parameters we focused on, what are the best values?


## Close Spark Session

In [None]:
# spark.stop()