In [46]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [47]:
data=spark.read.csv('C:/Users/dicks/Documents/Food_Inspections.csv', header=True, inferSchema=True)

In [3]:
data.show(5)

+-------------+--------------------+--------------------+---------+-------------+-------------+--------------------+-------+-----+-----+---------------+---------------+---------------+--------------------+-----------+------------+--------------------+
|Inspection ID|            DBA Name|            AKA Name|License #|Facility Type|         Risk|             Address|   City|State|  Zip|Inspection Date|Inspection Type|        Results|          Violations|   Latitude|   Longitude|            Location|
+-------------+--------------------+--------------------+---------+-------------+-------------+--------------------+-------+-----+-----+---------------+---------------+---------------+--------------------+-----------+------------+--------------------+
|      2345239|SAORY RESTAURANT,...|SAORY RESTAURANT,...|  2523066|   Restaurant|Risk 1 (High)|2700 S TRUMBULL AVE |CHICAGO|   IL|60623|      11/7/2019|        Canvass|Out of Business|                null|41.84251235|-87.71122481|(-87.711224813

In [5]:
data.printSchema()

root
 |-- Inspection ID: integer (nullable = true)
 |-- DBA Name: string (nullable = true)
 |-- AKA Name: string (nullable = true)
 |-- License #: integer (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Risk: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- Inspection Date: string (nullable = true)
 |-- Inspection Type: string (nullable = true)
 |-- Results: string (nullable = true)
 |-- Violations: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [6]:
#Converting Latitude and Longitude to string type
#Converting inspection date into datetime object
from pyspark.sql.types import StringType
data= data.withColumn("Longitude", data["Longitude"].cast(StringType()))
data= data.withColumn("Latitude", data["Latitude"].cast(StringType()))

In [7]:
#Changing Date column to datetime format and printing the dataframe schema
import pyspark.sql.functions as f
data=(data.withColumn('Inspection Date', f.to_date('Inspection Date','MM/d/yyyy')
                     )
     )
data.printSchema()

root
 |-- Inspection ID: integer (nullable = true)
 |-- DBA Name: string (nullable = true)
 |-- AKA Name: string (nullable = true)
 |-- License #: integer (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Risk: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- Inspection Date: date (nullable = true)
 |-- Inspection Type: string (nullable = true)
 |-- Results: string (nullable = true)
 |-- Violations: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



#### Exploratory Analysis: SQL Transformations

In [8]:
data.createOrReplaceTempView('data_view')


In [9]:
#Count of number of inspected facilities per result category
(spark.sql('''
SELECT COUNT(Results) AS Count, Results FROM data_view GROUP BY Results ORDER BY Count DESC
''').show()
)

+------+--------------------+
| Count|             Results|
+------+--------------------+
|105796|                Pass|
| 37868|                Fail|
| 27106|  Pass w/ Conditions|
| 16847|     Out of Business|
|  6234|            No Entry|
|  1877|           Not Ready|
|    68|Business Not Located|
+------+--------------------+



In [10]:
#Count of number of restaurants per result category
(spark.sql('''
SELECT COUNT(*) AS Number_of_Facilities, Risk FROM data_view GROUP BY Risk ORDER BY Number_of_Facilities DESC
''').show()
)

+--------------------+---------------+
|Number_of_Facilities|           Risk|
+--------------------+---------------+
|              140384|  Risk 1 (High)|
|               38227|Risk 2 (Medium)|
|               17082|   Risk 3 (Low)|
|                  73|           null|
|                  30|            All|
+--------------------+---------------+



In [11]:
# Top 10 Violation offences in Chicago
(spark.sql('''
SELECT COUNT(*) AS Number_of_Facilities, Violations FROM data_view GROUP BY Violations ORDER BY Number_of_Facilities DESC LIMIT 15
''').show()
)

+--------------------+--------------------+
|Number_of_Facilities|          Violations|
+--------------------+--------------------+
|               51934|                null|
|                  11|32. FOOD AND NON-...|
|                  10|45. FOOD HANDLER ...|
|                  10|30. FOOD IN ORIGI...|
|                   7|45. FOOD HANDLER ...|
|                   7|2. FACILITIES TO ...|
|                   7|3. MANAGEMENT, FO...|
|                   7|32. FOOD AND NON-...|
|                   6|3. MANAGEMENT, FO...|
|                   6|40. REFRIGERATION...|
|                   6|38. VENTILATION: ...|
|                   5|41. PREMISES MAIN...|
|                   5|2. FACILITIES TO ...|
|                   5|45. FOOD HANDLER ...|
|                   5|2. FACILITIES TO ...|
+--------------------+--------------------+



In [12]:
(spark.sql('''
SELECT Risk, Results, COUNT(*) AS Number_of_Facilities FROM data_view
WHERE Results IN ('Pass', 'Pass w/ Conditions', 'Fail') AND Risk NOT IN ('null','All')
GROUP BY Risk, Results ORDER BY Risk, Number_of_Facilities DESC 
''').show()
)

+---------------+------------------+--------------------+
|           Risk|           Results|Number_of_Facilities|
+---------------+------------------+--------------------+
|  Risk 1 (High)|              Pass|               77869|
|  Risk 1 (High)|              Fail|               26753|
|  Risk 1 (High)|Pass w/ Conditions|               21112|
|Risk 2 (Medium)|              Pass|               20536|
|Risk 2 (Medium)|              Fail|                7352|
|Risk 2 (Medium)|Pass w/ Conditions|                5088|
|   Risk 3 (Low)|              Pass|                7379|
|   Risk 3 (Low)|              Fail|                3736|
|   Risk 3 (Low)|Pass w/ Conditions|                 906|
+---------------+------------------+--------------------+



In [13]:
#Count of Facilities Inspected excluding the null values
(spark.sql('''
SELECT `Facility Type`, COUNT(*) AS Number_of_Facilities FROM data_view
WHERE `Facility Type` NOT IN ("null")
GROUP BY `Facility Type` ORDER BY Number_of_Facilities DESC 
''').show()
)

+--------------------+--------------------+
|       Facility Type|Number_of_Facilities|
+--------------------+--------------------+
|          Restaurant|              130520|
|       Grocery Store|               24936|
|              School|               12110|
|Children's Servic...|                3077|
|              Bakery|                2868|
|Daycare (2 - 6 Ye...|                2690|
|Daycare Above and...|                2369|
|      Long Term Care|                1349|
|            Catering|                1192|
|Mobile Food Dispe...|                 869|
|              Liquor|                 856|
|  Daycare Combo 1586|                 751|
|Mobile Food Preparer|                 631|
|        Golden Diner|                 568|
|            Hospital|                 548|
|           Wholesale|                 535|
|              TAVERN|                 282|
|Daycare (Under 2 ...|                 250|
|       Special Event|                 218|
|Shared Kitchen Us...|          

### Feature Engineering & Model Fitting

In [35]:
#Filtering data: filtering out those food joints whose results were neither pass, pass with conditions or fail
dat=data.filter('Results IN ("Fail","Pass","Pass w/ Conditions")').select('DBA Name','Risk','Violations','Results')
dat.show(5)
 

+--------------------+---------------+--------------------+-------+
|            DBA Name|           Risk|          Violations|Results|
+--------------------+---------------+--------------------+-------+
|    L & M FINE FOODS|  Risk 1 (High)|51. PLUMBING INST...|   Pass|
|CHICAGO'S FINEST ...|  Risk 1 (High)|33. PROPER COOLIN...|   Fail|
|CHICAGO'S FINEST ...|   Risk 3 (Low)|                null|   Fail|
|       NOMAD-CHICAGO|Risk 2 (Medium)|                null|   Pass|
|           STARBUCKS|Risk 2 (Medium)|                null|   Pass|
+--------------------+---------------+--------------------+-------+
only showing top 5 rows



In [48]:
from pyspark.sql.functions import UserDefinedFunction
"""Converting the existing dataframe(df) into a new dataframe where each inspection is represented as a label-violations pair. 
In this case, a label of 0.0 represents a failure, a label of 1.0 represents a success, 
and a label of -1.0 represents some results besides those two"""
def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = data.select(label(data.Results).alias('label'), data.Violations).where('label >= 0')

In [49]:
#Let's view one row of the labeled data
labeledData.take(1)

[Row(label=1.0, Violations='51. PLUMBING INSTALLED; PROPER BACKFLOW DEVICES - Comments: 3-COMPARTMENT SINK STOPPERS NOT WORKING ALSO OBSERVED 1ST FLOOR PREP AREA HANDWASH BOWL FAUCET LEAKING. MUST PROVIDE WORKING STOPPERS (X3) FOR 3-COMPARTMENT SINK AND MAINTAIN 1ST FLOOR HANDWASH BOWL.')]

In [50]:
#Splitting into Train and Test Data 
training_df,test_df=labeledData.randomSplit([0.75,0.25])
print(training_df.count()) 
print(test_df.count())

127546
43224


##### The final task is to convert the labeled data into a format that can be analyzed by logistic regression. The input to a logistic regression algorithm needs be a set of label-feature vector pairs, where the "feature vector" is a vector of numbers representing the input point. 

##### So, we need to convert the "violations" column, which is semi-structured and contains many comments in text format, to an array of real numbers that a machine could understand.

##### One standard machine learning approach for processing natural language is to assign each distinct word an "index", and then pass a vector to the machine learning algorithm such that each index's value contains the relative frequency of that word in the text string

##### MLlib provides an easy way to perform this operation. First, we tokenize each violations string to get the individual words in each string. Then, use a HashingTF to convert each set of tokens into a feature vector that can then be passed to the Machine Leraning algorithm to construct a model. We conduct all of these steps in sequence using a "pipeline".

In [53]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="Violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeledData)

AttributeError: 'Tokenizer' object has no attribute 'getoutputCol'

In [40]:

from pyspark.ml.feature import Bucketizer, StringIndexer, VectorAssembler, IndexToString
risk_idx = StringIndexer(inputCol = "Risk", outputCol="Risk_idx",handleInvalid="keep")
label_idx = StringIndexer(inputCol="Results", outputCol="Label",handleInvalid="keep")
#Create labels list to decode predictions 
resultLabels= label_idx.fit(dat).labels
riskLabels=risk_idx.fit(dat).labels
riskLabels

['Risk 1 (High)', 'Risk 2 (Medium)', 'Risk 3 (Low)', 'All']

In [41]:
#create a  a single vector combining all input features
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=['Label'],outputCol="features") 

In [42]:
train_df,test_df=dat.randomSplit([0.7,0.3])
print(train_df.count()) 
print(test_df.count())


119401
51369


In [44]:
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dt= DecisionTreeClassifier(labelCol="Risk_idx", featuresCol="features")
lc = IndexToString(inputCol="prediction",outputCol="predictedLabel",labels=resultLabels)
dt_pipeline = Pipeline(stages=[label_idx,risk_idx,va,dt,lc])
dtModel =dt_pipeline.fit(train_df) 
resultDF = dtModel.transform(test_df)
#Look for observations where prediction did not match
resultDF.filter("Label != prediction").select( "DBA Name","Label","prediction","Results","predictedLabel").show() 
# Calculation of the accuracy of our classifier
predictionAndLabels = resultDF.select("prediction", "label") 
evaluator = MulticlassClassificationEvaluator().setMetricName("accuracy")
print ("Accuracy:",(evaluator.evaluate(predictionAndLabels))*100,"%")


+--------------------+-----+----------+------------------+--------------+
|            DBA Name|Label|prediction|           Results|predictedLabel|
+--------------------+-----+----------+------------------+--------------+
|       #1 WOK N ROLL|  1.0|       0.0|              Fail|          Pass|
|1,200 SQ.FT. - YE...|  1.0|       0.0|              Fail|          Pass|
|10 PIN  BOWLING L...|  1.0|       0.0|              Fail|          Pass|
|    11 DEGREES NORTH|  2.0|       0.0|Pass w/ Conditions|          Pass|
|           14 PARISH|  2.0|       0.0|Pass w/ Conditions|          Pass|
|1492 CUBAN FUSION...|  2.0|       0.0|Pass w/ Conditions|          Pass|
|        1800 LIQUORS|  1.0|       0.0|              Fail|          Pass|
|24/7 EXPRESS FOOD...|  1.0|       0.0|              Fail|          Pass|
|          25 DEGREES|  2.0|       0.0|Pass w/ Conditions|          Pass|
|            3 ABEJAS|  1.0|       0.0|              Fail|          Pass|
|3 JJJ'S BETTER TA...|  1.0|       0.0

In [19]:
#Random Forest Classifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
rf = RandomForestClassifier(labelCol="Risk_idx", featuresCol="features",numTrees=50) # Train 50 decision trees on data
rf_pipeline = Pipeline(stages=[risk_idx, label_idx,va,dt,lc]) 
rfResultDF = rf_pipeline.fit(train_df).transform(test_df) 
rfResultDF.filter("Label != prediction").select("DBA Name","Label","Results","prediction","predictedLabel").show()
predictionAndLabels = rfResultDF.select("prediction", "label") 
evaluator = MulticlassClassificationEvaluator().setMetricName("accuracy") 
print ("Accuracy:",(evaluator.evaluate(predictionAndLabels))*100,"%")


+--------------------+-----+------------------+----------+--------------+
|            DBA Name|Label|           Results|prediction|predictedLabel|
+--------------------+-----+------------------+----------+--------------+
|1,200 SQ.FT. - YE...|  1.0|              Fail|       0.0|          Pass|
|10 PIN  BOWLING L...|  1.0|              Fail|       0.0|          Pass|
|1000 LIQUORS / BI...|  2.0|Pass w/ Conditions|       0.0|          Pass|
|      11 DINING, LLC|  2.0|Pass w/ Conditions|       0.0|          Pass|
|        1800 LIQUORS|  1.0|              Fail|       0.0|          Pass|
|24 HRS GYROS & SU...|  1.0|              Fail|       0.0|          Pass|
|          25 DEGREES|  2.0|Pass w/ Conditions|       0.0|          Pass|
|3 JJJ'S BETTER TA...|  1.0|              Fail|       0.0|          Pass|
|          4 BANDERAS|  1.0|              Fail|       0.0|          Pass|
|          4 BANDERAS|  2.0|Pass w/ Conditions|       0.0|          Pass|
|5 STARS MINI MART...|  1.0|          