## PySpark - Microsoft - Tutorial

- Predict whether a restaurant or other food establishment will pass or fail a food inspection

- Original tutorial available from [Microsoft](https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-machine-learning-mllib-ipython)

- Adapted for Python 3.x; the original tutorial targets Python 2.x

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split

## Load Food-Inspections data and create train-test split
food_inspections_df = pd.read_csv('/home/patel.mehu/sample_data/Food_Inspections.csv')
X = food_inspections_df[['Inspection ID', 'DBA Name', 'AKA Name', 'License #', 'Facility Type', 'Risk', 'Address', 
                         'City', 'State', 'Zip', 'Inspection Date', 'Inspection Type', 'Violations', 'Latitude', 
                         'Longitude', 'Location']]
y = food_inspections_df[['Results']]

del food_inspections_df

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state = 11)
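
The call above does not pass `test_size`, so scikit-learn falls back to its default of holding out 25% of the rows, and `random_state = 11` makes the shuffle reproducible. The sketch below shows how the two sizes come out, assuming the test count is rounded up and the training set gets the remainder; `split_sizes` is a hypothetical helper written for illustration, not a scikit-learn function.

```python
import math

def split_sizes(n_samples, test_size=0.25):
    """Return (n_train, n_test) for a fractional test_size.

    Assumption for this sketch: the test count is rounded up and
    the training set receives whatever is left.
    """
    n_test = math.ceil(test_size * n_samples)
    return n_samples - n_test, n_test

print(split_sizes(1000))  # (750, 250)
print(split_sizes(10))    # (7, 3)
```

The split is random rather than stratified, so the share of each `Results` value can differ slightly between the two files.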

In [3]:
food_train = pd.concat([X_train, y_train], axis = 1)
food_test = pd.concat([X_test, y_test], axis = 1)

In [4]:
food_train.to_csv('/home/patel.mehu/sample_data/Food_Inspections_train.csv', index = False)
food_test.to_csv('/home/patel.mehu/sample_data/Food_Inspections_test.csv', index = False)

In [5]:
import pyspark
from pyspark.sql import SQLContext

In [6]:
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

### Build a Logistic-Regression model 

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [8]:
import csv    
from io import StringIO

In [30]:
def csvParse(s):
    """
    Parse a single CSV-formatted line into a list of field values.
    """
    # csv.reader handles quoted fields that contain commas
    sio = StringIO(s)
    value = next(csv.reader(sio))
    sio.close()
    return value

inspections = sc.textFile('/home/patel.mehu/sample_data/Food_Inspections_train.csv').map(csvParse)
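
The reason for going through `csv.reader` instead of a plain `split(',')` is that several fields (establishment names, violation comments) contain commas inside quotes. A minimal standalone sketch of the difference, using a made-up three-field line (the ID `42` is hypothetical):

```python
import csv
from io import StringIO

def csv_parse(line):
    """Parse one CSV-formatted line into a list of fields."""
    return next(csv.reader(StringIO(line)))

# Hypothetical sample line with a quoted field that contains a comma.
line = '42,"CAFE UTJEHA, INC.",Pass'

print(line.split(','))  # naive split cuts the quoted name in two
print(csv_parse(line))  # csv.reader keeps the quoted name as one field
```

One caveat: `sc.textFile` yields one record per line, so this per-line approach assumes that no quoted field contains a line break.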

In [15]:
inspections.take(2)

[['Inspection ID',
  'DBA Name',
  'AKA Name',
  'License #',
  'Facility Type',
  'Risk',
  'Address',
  'City',
  'State',
  'Zip',
  'Inspection Date',
  'Inspection Type',
  'Violations',
  'Latitude',
  'Longitude',
  'Location',
  'Results'],
 ['1188382',
  'PUNJABI DHABHA',
  'PUNJABI DHABHA',
  '74498.0',
  'Restaurant',
  'Risk 1 (High)',
  '2525 W DEVON ',
  'CHICAGO',
  'IL',
  '60659.0',
  '10/10/2012',
  'Canvass',
  '3. POTENTIALLY HAZARDOUS FOOD MEETS TEMPERATURE REQUIREMENT DURING STORAGE, PREPARATION DISPLAY AND SERVICE - Comments: FOUND POTENTIALLY HAZARDOUS FOOD ON THE TOP OF PREP TABLE NOT MEETING PROPER TEMPERATURE. HALF AND HALF AT 61.8F, WHIPPED CREAM AT 52.3F, CREAM SAUCE AT 91.8F. ALSO FOUND POTENTIALLY HAZARDOUS FOODS NOT MEETING PROPER TEMPERATURE ON THE BUFFET TABLE . SUCH AS YOGURT AT 51.2F, RICE PUDDING AT 45.7F. INSTRUCTED MANAGER THAT ALL POTENTIALLY HAZARDOUS FOODS MUST MAINTAIN A TEMPERATURE OF 40F OR BELOW ON THE BUFFET TABLE AND INSIDE THE COOLER. MA

In [31]:
## Remove the header row so that it does not break the type conversions below.
firstRow = inspections.first()
inspections = inspections.filter(lambda row: row != firstRow)
firstRow

['Inspection ID',
 'DBA Name',
 'AKA Name',
 'License #',
 'Facility Type',
 'Risk',
 'Address',
 'City',
 'State',
 'Zip',
 'Inspection Date',
 'Inspection Type',
 'Violations',
 'Latitude',
 'Longitude',
 'Location',
 'Results']

In [32]:
## Create a dataframe (df) and a temporary table (CountResults) holding only the columns
## that are useful for the predictive analysis. sqlContext performs transformations on structured data.

schema = StructType([
                    StructField("id", IntegerType(), False),
                    StructField("name", StringType(), False),
                    StructField("results", StringType(), False),
                    StructField("violations", StringType(), True)])

df = sqlContext.createDataFrame(inspections.map(lambda l: (int(l[0]), l[1], l[16], l[12])) , schema)
# df = sqlContext.createDataFrame(inspections.map(lambda l: (float(l[0]), l[1], l[12], l[13])) , schema)
df.registerTempTable('CountResults')

df.show(10)

+-------+--------------------+------------------+--------------------+
|     id|                name|           results|          violations|
+-------+--------------------+------------------+--------------------+
|1188382|      PUNJABI DHABHA|              Fail|3. POTENTIALLY HA...|
|1324669|PEKING CHINESE FO...|              Pass|30. FOOD IN ORIGI...|
|1473980|Jitlada Thai Hous...|              Pass|33. FOOD AND NON-...|
| 419498|         ST MATTHIAS|Pass w/ Conditions|4. SOURCE OF CROS...|
|1360722|     HACHI'S KITCHEN|              Pass|32. FOOD AND NON-...|
|1546260|   CAFE UTJEHA, INC.|              Pass|38. VENTILATION: ...|
|1515239|     CHINESE YUM YUM|              Pass|33. FOOD AND NON-...|
|1566559|NORWEGIAN-AMERICI...|              Pass|                    |
| 381259|  POPS FOR CHAMPAGNE|              Pass|33. FOOD AND NON-...|
|1329266|BURGUNDY RESTAURA...|              Pass|30. FOOD IN ORIGI...|
+-------+--------------------+------------------+--------------------+
only showing top 10 rows
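
The indices `0`, `1`, `16` and `12` in the `map` call are positions in the header row printed earlier. As a hedged alternative to hard-coding them, they can be looked up by column name; `wanted` and `project` below are hypothetical names introduced for this sketch.

```python
# Header as printed by `firstRow` in the notebook.
header = ['Inspection ID', 'DBA Name', 'AKA Name', 'License #', 'Facility Type',
          'Risk', 'Address', 'City', 'State', 'Zip', 'Inspection Date',
          'Inspection Type', 'Violations', 'Latitude', 'Longitude', 'Location',
          'Results']

# Columns in the order expected by the schema: id, name, results, violations.
wanted = ['Inspection ID', 'DBA Name', 'Results', 'Violations']
idx = [header.index(name) for name in wanted]
print(idx)  # [0, 1, 16, 12]

def project(row):
    """Hypothetical helper: build the (id, name, results, violations) tuple."""
    i_id, i_name, i_results, i_violations = idx
    return (int(row[i_id]), row[i_name], row[i_results], row[i_violations])
```

In the notebook, `project` could be passed to `inspections.map` in place of the lambda, which keeps the code correct if the column order of the CSV ever changes.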

In [33]:
## Unique values in target-column
df.select('results').distinct().show()

+--------------------+
|             results|
+--------------------+
|           Not Ready|
|                Fail|
|            No Entry|
|Business Not Located|
|  Pass w/ Conditions|
|     Out of Business|
|                Pass|
+--------------------+



In [34]:
## Distribution of target-variable
df.groupBy('results').count().show()

+--------------------+-----+
|             results|count|
+--------------------+-----+
|           Not Ready| 1054|
|                Fail|26008|
|            No Entry| 4062|
|Business Not Located|   49|
|  Pass w/ Conditions|14653|
|     Out of Business|11838|
|                Pass|76253|
+--------------------+-----+
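
Only `Fail`, `Pass` and `Pass w/ Conditions` map onto a pass/fail label, and the classes are far from balanced. The short calculation below uses the counts from the table above to show how many rows survive that filter and what share of them are passes, which is a useful baseline to keep in mind when reading the accuracy figure at the end.

```python
# Counts copied from the groupBy output above (training split).
counts = {
    'Not Ready': 1054, 'Fail': 26008, 'No Entry': 4062,
    'Business Not Located': 49, 'Pass w/ Conditions': 14653,
    'Out of Business': 11838, 'Pass': 76253,
}

passed = counts['Pass'] + counts['Pass w/ Conditions']
failed = counts['Fail']
kept = passed + failed

print(kept)                     # 116914 rows remain after dropping other outcomes
print(round(passed / kept, 2))  # share of rows that count as a pass
```

A model that always predicted "pass" would therefore already be right on roughly three quarters of these training rows.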



In [35]:
def labelForResults(s):
    """
    Map an inspection result to a numeric label:
    0.0 for a fail, 1.0 for a pass, -1.0 for any other outcome.
    """
    if s == 'Fail':
        return 0.0
    elif s in ('Pass', 'Pass w/ Conditions'):
        return 1.0
    else:
        return -1.0

label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
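
To see which outcomes survive the `label >= 0` filter, the mapping can be tried in plain Python on the seven distinct values found earlier. This is a sketch only: `LABELS` and `label_for_results` are hypothetical names for a dict-based equivalent of `labelForResults`.

```python
# Lookup table mirroring labelForResults; any other outcome maps to -1.0.
LABELS = {'Fail': 0.0, 'Pass': 1.0, 'Pass w/ Conditions': 1.0}

def label_for_results(s):
    """Hypothetical dict-based equivalent of labelForResults."""
    return LABELS.get(s, -1.0)

outcomes = ['Not Ready', 'Fail', 'No Entry', 'Business Not Located',
            'Pass w/ Conditions', 'Out of Business', 'Pass']
kept = [o for o in outcomes if label_for_results(o) >= 0]
print(kept)  # ['Fail', 'Pass w/ Conditions', 'Pass']
```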

In [36]:
labeledData.take(2)

[Row(label=0.0, violations='3. POTENTIALLY HAZARDOUS FOOD MEETS TEMPERATURE REQUIREMENT DURING STORAGE, PREPARATION DISPLAY AND SERVICE - Comments: FOUND POTENTIALLY HAZARDOUS FOOD ON THE TOP OF PREP TABLE NOT MEETING PROPER TEMPERATURE. HALF AND HALF AT 61.8F, WHIPPED CREAM AT 52.3F, CREAM SAUCE AT 91.8F. ALSO FOUND POTENTIALLY HAZARDOUS FOODS NOT MEETING PROPER TEMPERATURE ON THE BUFFET TABLE . SUCH AS YOGURT AT 51.2F, RICE PUDDING AT 45.7F. INSTRUCTED MANAGER THAT ALL POTENTIALLY HAZARDOUS FOODS MUST MAINTAIN A TEMPERATURE OF 40F OR BELOW ON THE BUFFET TABLE AND INSIDE THE COOLER. MANAGER DISCARDED THE SAID PROPER WORTH $30.00, TOTAL 8 LBS. CITATION ISSUED CRITICAL 7-38-005(A). | 16. FOOD PROTECTED DURING STORAGE, PREPARATION, DISPLAY, SERVICE AND TRANSPORTATION - Comments: ICE ARE NOT PROTECTED DURING STORAGE. FOUND INTERIOR PANEL OF THE ICE MACHINE NOT MAINTAINED. WITH SLIMY BLACK SUBSTANCE ON THE INTERIOR SURFACES. INSTRUCTED TO CLEAN AND SANITIZE IN DETAIL AND MAINTAIN. ALSO MUS

In [37]:
## Define a model-pipeline
tokenizer = Tokenizer(inputCol = 'violations', outputCol = 'words')
hashingTF = HashingTF(inputCol = tokenizer.getOutputCol(), outputCol = 'features')
lr = LogisticRegression(maxIter = 100, regParam = 0.01)
pipeline = Pipeline(stages = [tokenizer, hashingTF, lr])

## Fit the model on training-data
model = pipeline.fit(labeledData)
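
The first two pipeline stages turn free text into a fixed-length count vector: `Tokenizer` lowercases and splits the violation text, and `HashingTF` hashes each token to a column index and counts occurrences. The following is a rough pure-Python sketch of that idea, not Spark's implementation. It assumes `zlib.crc32` as a stand-in hash (Spark uses MurmurHash3, so the indices will differ) and plain `str.split()` for tokenizing.

```python
import zlib

NUM_FEATURES = 1 << 18  # 262144, the default numFeatures of Spark's HashingTF

def tokenize(text):
    """Lowercase and split on whitespace (approximates Spark's Tokenizer)."""
    return text.lower().split()

def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Map tokens to a sparse {index: count} vector via the hashing trick.

    zlib.crc32 is a stand-in hash chosen for this sketch.
    """
    vec = {}
    for tok in tokens:
        i = zlib.crc32(tok.encode('utf-8')) % num_features
        vec[i] = vec.get(i, 0) + 1
    return vec

tokens = tokenize('FLOOR REAR PREP AREA NOT CLEAN, INSTRUCTED TO CLEAN')
features = hashing_tf(tokens)
print(tokens)
print(sum(features.values()))  # total count equals the number of tokens
```

Note that punctuation stays attached to tokens (`'clean,'` and `'clean'` hash separately), which is also how a whitespace tokenizer behaves on the raw violation text.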

#### Test-data

In [38]:
## Import test-data and make necessary transformations
testData = sc.textFile('/home/patel.mehu/sample_data/Food_Inspections_test.csv').map(csvParse)

### Filter first (header) row
firstRow = testData.first()
testData = testData.filter(lambda row:row != firstRow)

### Convert to dataframe with pre-defined schema
testData = testData.map(lambda l: (int(l[0]), l[1], l[16], l[12]))
testDF = sqlContext.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")

In [39]:
## Make predictions on test-data
predictionsDF = model.transform(testDF)
predictionsDF.registerTempTable('Predictions')
predictionsDF.columns

['id',
 'name',
 'results',
 'violations',
 'words',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [40]:
predictionsDF.take(1)

[Row(id=1948738, name="MR. D'S SNACK SHOP", results='Pass', violations='33. FOOD AND NON-FOOD CONTACT EQUIPMENT UTENSILS CLEAN, FREE OF ABRASIVE DETERGENTS - Comments: COOKING HOOD FRONT COOKING AREA, NOT CLEAN, INSTRUCTED TO CLEAN AND REMOVE ALL GREASE DRIPPING, | 34. FLOORS: CONSTRUCTED PER CODE, CLEANED, GOOD REPAIR, COVING INSTALLED, DUST-LESS CLEANING METHODS USED - Comments: FLOOR REAR PREP AREA NOT CLEAN, INSTRUCTED TO CLEAN ALONG WALLS AND IN CORNERS ESPECIALLY UNDER AND AROUND EQUIPMENT, | 35. WALLS, CEILINGS, ATTACHED EQUIPMENT CONSTRUCTED PER CODE: GOOD REPAIR, SURFACES CLEAN AND DUST-LESS CLEANING METHODS - Comments: WALL REAR PREP AREA HAS FOOD DEBRIS, INSTRUCTED TO CLEAN AND BETTER MARITAIN, | 38. VENTILATION: ROOMS AND EQUIPMENT VENTED AS REQUIRED: PLUMBING: INSTALLED AND MAINTAINED - Comments: LOW WATER PRESSURE AT EXPOSED AND SINK REAR PREP AND AT HAND SINK IN BATHROOM, AND SLOW DRAINING THREE COMPARTMENT SINK, INSTRUCTED TO REPAIR AND BETTER MAINTAIN,  | 40. REFRIGERA

In [41]:
## Check accuracy
numSuccesses = predictionsDF.where("""(prediction = 0 AND results = 'Fail') OR
                                        (prediction = 1 AND (results = 'Pass' OR
                                                            results = 'Pass w/ Conditions'))""").count()
numInspections = predictionsDF.count()

print("There were", numInspections, "inspections and there were", numSuccesses, "successful predictions")
print("This is a", str(numSuccesses / numInspections * 100) + "%", "success rate")

There were 39118 inspections and there were 36438 successful predictions
This is a 93.1489339945805% success rate
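
The success rate above is plain accuracy. Because passes dominate the data (90,906 of the 116,914 labeled training rows, about 78%), it can help to break the result down per class. The sketch below does this in plain Python on a handful of made-up `(prediction, results)` pairs standing in for rows of `predictionsDF`; the data and the `actual_label` helper are hypothetical.

```python
from collections import Counter

# Hypothetical (prediction, results) pairs standing in for rows of predictionsDF.
rows = [
    (1.0, 'Pass'), (1.0, 'Pass w/ Conditions'), (0.0, 'Fail'),
    (1.0, 'Fail'), (0.0, 'Pass'), (1.0, 'Pass'),
]

def actual_label(results):
    """Same convention as the notebook: 0.0 for a fail, 1.0 for a pass."""
    return 0.0 if results == 'Fail' else 1.0

# Keys are (actual, predicted) pairs.
confusion = Counter((actual_label(r), p) for p, r in rows)
tp = confusion[(1.0, 1.0)]  # passes predicted as pass
tn = confusion[(0.0, 0.0)]  # fails predicted as fail
fp = confusion[(0.0, 1.0)]  # fails predicted as pass
fn = confusion[(1.0, 0.0)]  # passes predicted as fail

accuracy = (tp + tn) / len(rows)
fail_recall = tn / (tn + fp)  # share of actual fails that were caught
print(tp, tn, fp, fn)
print(accuracy, fail_recall)
```

On the real data the same four counts could be obtained by grouping `predictionsDF` on `results` and `prediction`.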
