# Machine learning: Predictive analysis on food inspection data using SparkML

This notebook demonstrates how to use **MLlib**, Spark's built-in machine learning library, to perform a simple predictive analysis on an open dataset.

--------

## What is SparkML?

SparkML is a core Spark library that provides utilities for machine learning tasks, including:

* Classification

* Regression

* Clustering

* Topic modeling

* Singular value decomposition (SVD) and principal component analysis (PCA)

* Hypothesis testing and calculating sample statistics

In this notebook, we describe a simple approach to *classification* through logistic regression.

---------

## What are classification and logistic regression?

*Classification*, a very common machine learning task, is the process of sorting input data into categories. It is the job of a classification algorithm to figure out how to assign "labels" to input data that you provide. For example, you could think of a machine learning algorithm that accepts stock information as input and sorts each stock into one of two categories: stocks that you should sell and stocks that you should retain.

Logistic regression is the algorithm that you use for classification here. Spark's logistic regression API is useful for *binary classification*, or classifying input data into one of two groups. For more information about logistic regression, see [Wikipedia](https://en.wikipedia.org/wiki/Logistic_regression).

In summary, the process of logistic regression produces a *logistic function* that can be used to predict the probability that an input vector belongs in one group or the other.  
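
As a rough illustration of that idea, the sketch below evaluates a logistic function on a weighted sum of input features. The weights, bias, and feature values are made-up placeholders, not values learned from the inspection data, and the 0.5 cutoff is an assumption chosen for the example.

```python
import math

def logistic(z):
    """Map a real number to a value between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_probability(weights, bias, features):
    """Probability that the input vector belongs to the group labeled 1.0."""
    z = bias + sum(w * x for w, x in zip(weights, features))
    return logistic(z)

# Hypothetical weights and feature vector, for illustration only.
weights = [0.8, -1.5, 0.3]
bias = 0.1
features = [1.0, 2.0, 0.0]

p = predict_probability(weights, bias, features)
# Assumed decision rule: pick group 1.0 when the probability is at least 0.5.
label = 1.0 if p >= 0.5 else 0.0
print(p, label)
```

Training a logistic regression model amounts to finding the weights and bias that make these probabilities agree with the known labels as closely as possible.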

----------

## What are we trying to accomplish in this notebook?

You will use Spark to perform some predictive analysis on food inspection data (**Food_Inspections1.csv**) that was acquired through the [City of Chicago data portal](https://data.cityofchicago.org/). This dataset contains information about food inspections that were conducted in Chicago, including information about each food establishment that was inspected, the violations that were found (if any), and the results of the inspection. This is not the raw data provided by the city: the raw data has been edited to make parsing easier, and we are only using about 1/10th of the complete dataset\*. You can download the raw data at the data portal website.

> \* *This site provides applications using data that has been modified for use from its original source, www.cityofchicago.org, the official website of the City of Chicago.  The City of Chicago makes no claims as to the content, accuracy, timeliness, or completeness of any of the data provided at this site.  The data provided at this site is subject to change at any time.  It is understood that the data provided at this site is being used at one’s own risk.*

In the steps below, you develop a model to see what it takes to pass or fail a food inspection. To do so, place the cursor in the cell and press **SHIFT + ENTER**. You can also click the **Run Cell** button from the menu above.

----------
## Notebook setup

When using PySpark kernel notebooks on HDInsight, you do not need to create a SparkContext or a SparkSession yourself. A SparkSession, which wraps the SparkContext, is created for you automatically when you run the first code cell, and you'll be able to see the progress printed. The session is created with the following variable name:
- SparkSession (`spark`)

To run the cells below, place the cursor in the cell and then press **SHIFT + ENTER**.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction, col
from pyspark.sql.types import *

--------

## Construct an input dataframe

We can use SparkSession to perform transformations on structured data. The first task is to load the sample data (**Food_Inspections1.csv**) into a Spark SQL *dataframe*. The sample data is by default available on the cluster. 

In [None]:
inspections = spark.read.csv('wasb:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv', inferSchema=True)

Let's take a look at its schema:

In [None]:
inspections.printSchema()

We now have the CSV file as a DataFrame. It has some columns we will not use. Dropping them can save memory when caching the DataFrame. Also, we should give these columns meaningful names.

In [None]:
# Helper function to drop unused columns and rename interesting columns.
def selectInterestingColumns(rawDf):
    # Mapping column index to name.
    columnNames = {0: "id", 1: "name", 12: "results", 13: "violations"}
    
    # Rename column from '_c{id}' to something meaningful.
    cols = [col(rawDf.columns[i]).alias(columnNames[i]) for i in columnNames.keys()]
    
    # Drop columns we are not using.
    df = rawDf.select(cols)
    
    # Replace null in column 'violations' with empty string. 
    # We are going to run LogisticRegression on this column and it doesn't like null.
    return df.fillna('', ["violations"])

In [None]:
df = selectInterestingColumns(inspections).cache()

# Do a count to cache the whole dataframe in memory
df.count()

Let's take a look at the schema after transformation:

In [None]:
df.printSchema()

Let us retrieve one row from the DataFrame to get a glimpse of the data.

In [None]:
df.take(1)

The output of the above cell gives us an idea of the schema of the input file; the file includes the name of every establishment, the type of establishment, the address, the date of the inspections, and the location, among other things.

In [None]:
# createOrReplaceTempView replaces the deprecated registerTempTable.
df.createOrReplaceTempView('CountResults')

We can perform our analysis on `df`. We've included 4 columns of interest in the dataframe: **id**, **name**, **results**, and **violations**. Let's get a small sample of the data:

In [None]:
df.show(5)

--------

## Understand the data

Let's start to get a sense of what our dataset contains. For example, what are the different values in the `results` column?

In [None]:
df.select('results').distinct().show()

You can run SQL queries using the `%%sql` directive. The `-o count_results_df` argument persists the output of the SQL query as a Pandas dataframe named `count_results_df` on the Jupyter server.

In [None]:
%%sql -o count_results_df
SELECT results, COUNT(results) AS cnt FROM CountResults GROUP BY results ORDER BY cnt DESC

A quick visualization can help us reason about the distribution of these outcomes.

If you want to construct your own custom visualization of the data, use the `count_results_df` dataframe, which holds the results of the SQL query on the Jupyter server rather than on the cluster, and generate a plot with Matplotlib. Because the data is available locally on the Jupyter server, you can use the `%%local` magic and write regular Python code in that cell.

>**Matplotlib** is a library used to construct a visualization of the data.

In [None]:
%%local
%matplotlib inline
import matplotlib.pyplot as plt


labels = count_results_df['results']
sizes = count_results_df['cnt']
colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
plt.axis('equal')

You can see that there are 5 distinct results that an inspection can have:

* Business Not Located
* Fail
* Pass
* Pass w/ Conditions
* Out of Business

Let us develop a model that can guess the outcome of a food inspection, given the violations. Since logistic regression is a binary classification method, it makes sense to group our data into two categories: **Fail** and **Pass**. A "Pass w/ Conditions" is still a Pass, so when we train the model, we will consider the two results equivalent. Data with the other results ("Business Not Located", "Out of Business") are not useful so we will remove them from our training set. This should be okay since these two categories make up a very small percentage of the results anyway.

Let us go ahead and convert our existing dataframe (`df`) into a new dataframe where each inspection is represented as a label-violations pair. In our case, a label of `0.0` represents a failure, a label of `1.0` represents a success, and a label of `-1.0` represents any other result. We will filter those other results out when computing the new dataframe.

In [None]:
# Note: We can do the same thing with pyspark.sql.functions.when(). It doesn't use UDF and is faster.
# However, we would like to demonstrate UserDefinedFunction here as an example.
def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')

Let's retrieve one row from the labeled data to see what it looks like.

In [None]:
labeledData.take(1)

---------

## Create a logistic regression model from the input dataframe

Our final task is to convert the labeled data into a format that can be analyzed by logistic regression. The input to a logistic regression algorithm should be a set of *label-feature vector pairs*, where the "feature vector" is a vector of numbers that represents the input point in some way. So, we need a way to convert the "violations" column, which is semi-structured and contains a lot of comments in free-text, to an array of real numbers that a machine could easily understand. 

One standard machine learning approach for processing natural language is to assign each distinct word an "index", and then pass a vector to the machine learning algorithm such that each index's value contains the relative frequency of that word in the text string. 
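
The sketch below illustrates that idea in plain Python, outside of Spark. The helper names `build_vocabulary` and `relative_frequencies` and the sample strings are hypothetical, chosen only for this example; they are not part of any Spark API or of the dataset.

```python
from collections import Counter

def build_vocabulary(documents):
    """Assign each distinct lowercase word an index, in order of first appearance."""
    vocabulary = {}
    for text in documents:
        for word in text.lower().split():
            if word not in vocabulary:
                vocabulary[word] = len(vocabulary)
    return vocabulary

def relative_frequencies(text, vocabulary):
    """Return a vector whose i-th entry is the share of known words in `text` that have index i."""
    words = [w for w in text.lower().split() if w in vocabulary]
    vector = [0.0] * len(vocabulary)
    if not words:
        return vector
    for word, count in Counter(words).items():
        vector[vocabulary[word]] = count / len(words)
    return vector

# Made-up violation snippets, not rows from the dataset.
documents = ["floors not clean", "clean floors clean walls"]
vocabulary = build_vocabulary(documents)
vector = relative_frequencies(documents[1], vocabulary)
print(vocabulary)
print(vector)
```

Every input string maps to a vector of the same length, which is what allows a numeric algorithm such as logistic regression to consume free text.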

MLlib provides an easy way to perform this operation. First, we'll "tokenize" each violations string to get the individual words in each string, and then we'll use a `HashingTF` to convert each set of tokens into a feature vector, which can then be passed to the logistic regression algorithm to construct a model. We'll conduct all of these steps in sequence using a "pipeline".

In [None]:
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)
model

We now have a trained logistic regression model to classify input data. 
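
To make the tokenizing and hashing stages of the pipeline above less opaque, here is a minimal plain-Python sketch of the same general technique (often called the hashing trick). It is an approximation for illustration, not Spark's implementation: `zlib.crc32` stands in for the hash function Spark uses, and the vector length of 16 is an arbitrary small value chosen for readability.

```python
import zlib

def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

def hashing_tf(tokens, num_features=16):
    """Count tokens into a fixed-length vector, using a hash of each token as its index."""
    vector = [0.0] * num_features
    for token in tokens:
        # crc32 is a stand-in chosen for determinism; Spark uses a different hash function.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] += 1.0
    return vector

# Made-up input string, not a row from the dataset.
tokens = tokenize("Floors NOT clean floors")
features = hashing_tf(tokens)
print(tokens)
print(features)
```

Unlike an explicit word-to-index dictionary, hashing needs no pass over the data to build a vocabulary. The trade-off is that two different words can land in the same slot, which becomes less likely as the vector grows.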

-----------

## Evaluate the model on a separate test dataset

We can use the model we created earlier to *predict* what the results of new inspections will be, based on the violations that were observed. We trained this model on the dataset **Food_Inspections1.csv**. Let us use a second dataset, **Food_Inspections2.csv**, to *evaluate* the strength of this model on new data. This second data set should already be in the default storage container associated with the cluster.

The snippet below creates a new dataframe, **predictionsDf** that contains the prediction generated by the model.

In [None]:
testData = selectInterestingColumns(spark.read.csv('wasb:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections2.csv', inferSchema=True))
testDf = testData.where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")

predictionsDf = model.transform(testDf)
# createOrReplaceTempView replaces the deprecated registerTempTable.
predictionsDf.createOrReplaceTempView('Predictions')
predictionsDf.columns

Let's look at one of the predictions.

In [None]:
predictionsDf.take(1)

The `model.transform()` method will apply the same transformation to any new data with the same schema, and arrive at a prediction of how to classify the data. We can do some simple statistics to get a sense of how accurate our predictions were:

In [None]:
numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR 
                                      (prediction = 1 AND (results = 'Pass' OR 
                                                           results = 'Pass w/ Conditions'))""").count()
numInspections = predictionsDf.count()

print("There were %d inspections and there were %d successful predictions" % (numInspections, numSuccesses))
print("This is a %d%% success rate" % (float(numSuccesses) / float(numInspections) * 100))

Logistic regression with Spark gives us a model of the relationship between violation descriptions in English and whether a given business would pass or fail a food inspection; the success rate printed above indicates how well that model holds up on the second dataset. We can construct a final visualization to help us reason about the results of this test. Let's first use a set of SQL queries to count the successful and failed predictions (we're using the `-q` option to turn off visualizations).

In [None]:
%%sql -q -o true_positive
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND results = 'Fail'

In [None]:
%%sql -q -o false_positive
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND (results = 'Pass' OR results = 'Pass w/ Conditions')

In [None]:
%%sql -q -o false_negative
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND results = 'Fail'

In [None]:
%%sql -q -o true_negative
SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND (results = 'Pass' OR results = 'Pass w/ Conditions')

Finally, we use these quantities to construct a visualization with **Matplotlib**:

In [None]:
%%local
%matplotlib inline
import matplotlib.pyplot as plt

# The order of sizes must match the order of labels.
labels = ['True positive', 'False positive', 'True negative', 'False negative']
sizes = [true_positive['cnt'], false_positive['cnt'], true_negative['cnt'], false_negative['cnt']]
colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
plt.axis('equal')

In this chart, a "positive" result refers to the failed food inspection, while a negative result refers to a passed inspection. 
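
With that convention in mind, the following sketch shows one way to sort (prediction, result) pairs into the four categories and to derive a few summary rates from them. The pairs are hypothetical placeholders rather than output from the model, and the function name `confusion_counts` is made up for this example.

```python
def confusion_counts(pairs):
    """Tally (prediction, result) pairs, treating a failed inspection as the positive class."""
    counts = {"true_positive": 0, "false_positive": 0,
              "true_negative": 0, "false_negative": 0}
    for prediction, result in pairs:
        predicted_fail = prediction == 0.0
        actually_failed = result == "Fail"
        if predicted_fail and actually_failed:
            counts["true_positive"] += 1
        elif predicted_fail:
            counts["false_positive"] += 1
        elif actually_failed:
            counts["false_negative"] += 1
        else:
            counts["true_negative"] += 1
    return counts

# Hypothetical (prediction, results) pairs, not output from the model.
pairs = [(0.0, "Fail"), (0.0, "Pass"), (1.0, "Pass w/ Conditions"),
         (1.0, "Fail"), (1.0, "Pass")]
counts = confusion_counts(pairs)

total = sum(counts.values())
predicted_fail = counts["true_positive"] + counts["false_positive"]
actual_fail = counts["true_positive"] + counts["false_negative"]

# Share of all predictions that matched the actual result.
accuracy = (counts["true_positive"] + counts["true_negative"]) / total
# Share of predicted failures that really were failures.
precision = counts["true_positive"] / predicted_fail if predicted_fail else 0.0
# Share of actual failures that the model caught.
recall = counts["true_positive"] / actual_fail if actual_fail else 0.0
print(counts, accuracy, precision, recall)
```

Looking at precision and recall alongside the overall success rate can be useful when one outcome is much more common than the other, because a model that always predicts the common outcome can still score a high success rate.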