In [None]:
__author__ = 'Yasaman Emami'
__email__ = [
    'emami.yasamann@gmail.com',
    'yasaman.emami@sjsu.edu',
]
__sid__ = '015325557'

In [None]:
import pyspark.sql.types as typ
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession

## Creating Schema
# (column name, Spark type) pairs, in the column order of the TSV files.
labels = [
    ('ID', typ.IntegerType()),
    ('Name', typ.StringType()),
    ('Street_Address', typ.StringType()),
    ('City', typ.StringType()),
    ('State', typ.StringType()),
    ('Zip_Code', typ.LongType()),
    ('Latitude', typ.FloatType()),
    ('Longitude', typ.FloatType()),
    ('Junk', typ.StringType()),
    ('Phone_Number', typ.LongType()),
    ('ID+Unknown_Date', typ.StringType()),
    ('Unknown_Date', typ.StringType()),
    ('Score?', typ.IntegerType()),
    ('Inspection_type', typ.StringType()),
    ('Unique_inspection_id', typ.StringType()),
    ('Complaint', typ.StringType()),
    ('Risk_Level', typ.StringType()),
]
# nullable=True: the data does contain nulls (they are counted and filled further
# down), and Spark's file-based readers read a user schema as nullable regardless,
# so declaring the fields non-nullable was misleading.
schema = typ.StructType([
    typ.StructField(name, data_type, True) for name, data_type in labels
])

spark = SparkSession.builder.getOrCreate()

## Reading Data
def _read_year(year):
    """Read `dataset<year>.tsv` (tab-separated, no header row) with the shared schema."""
    frame = spark.read.csv(
        f'dataset{year}.tsv', header=False, schema=schema, sep="\t"
    )
    frame.name = f'r{year}'
    return frame


r2016 = _read_year(2016)
r2017 = _read_year(2017)
r2018 = _read_year(2018)
# union() matches columns by position, which is safe because every file uses `schema`.
dfs = r2016.union(r2017).union(r2018)

## Preprocessing the data: dropping duplicates, filling nulls, ...

In [None]:
# Remove rows that are exact duplicates across all columns
# (distinct() is the no-argument form of dropDuplicates()).
data_no_duplicate = dfs.distinct()

In [None]:
from pyspark.sql.functions import substring
from pyspark.sql.functions import when

# Binary label as a string: "1" when Risk_Level is one of Low/Moderate/High Risk,
# "0" when Risk_Level is null. NOTE(review): any other non-null value matches
# neither branch and ends up null — confirm the data only has these three levels.
risk = data_no_duplicate.Risk_Level
risk_flag = when(risk.isin("Low Risk", "Moderate Risk", "High Risk"), "1").when(risk.isNull(), "0")
data_coded_level = data_no_duplicate.withColumn('risk_level_coded', risk_flag)

# year = characters 1-4 and month = characters 6-7 of Unknown_Date
# (assumes a "YYYY-MM..." layout — TODO confirm against the raw files).
df = (
    data_coded_level
    .withColumn('year', substring('Unknown_Date', 1, 4))
    .withColumn('month', substring('Unknown_Date', 6, 2))
)

In [None]:
#fill null values for column score
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

# Mean of the non-null Score? values.
# NOTE(review): computed over every year, so it also uses rows that later form the test set.
df_stats = df.select(
    _mean(col('Score?')).alias('mean_score')
).collect()

mean_number = df_stats[0]['mean_score']
# The mean is None when Score? has no non-null value (e.g. empty input). na.fill
# rejects None, so only fill when a mean exists. Score? is IntegerType, so the
# filled value is presumably truncated to an int — confirm if precision matters.
if mean_number is not None:
    df = df.na.fill(mean_number, ["Score?"])

# Missing categorical values become their own "unknown" category.
df = df.na.fill("unknown", ["Inspection_type", "Complaint"])

In [None]:
#check for null values
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import DoubleType, FloatType

# NaN only exists in float/double columns. Applying isnan() to other columns forces an
# implicit cast (e.g. string -> double), which can fail under ANSI mode, so those
# columns are checked for null only. The resulting counts are the same.
_float_cols = {
    f.name for f in df.schema.fields if isinstance(f.dataType, (FloatType, DoubleType))
}

# One output column per input column: how many rows are missing a value there.
df.select([
    count(when((isnan(c) | col(c).isNull()) if c in _float_cols else col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()

In [None]:
# Print df's schema (column names, data types, nullability) to check the column
# types before the string -> integer casts in the next cell.
df.printSchema()

In [None]:
#changing data type from string to integer for the label and the derived date parts
from pyspark.sql.types import IntegerType

from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType

for int_col in ("risk_level_coded", "year", "month"):
    df = df.withColumn(int_col, col(int_col).cast(IntegerType()))
df.printSchema()

## Understanding the data: counting distinct values in a few columns to help select features

In [None]:
# number of distinct Score? values
df.select("Score?").dropDuplicates().count()

In [None]:
# number of distinct Inspection_type values
df.select("Inspection_type").dropDuplicates().count()

In [None]:
# number of distinct Complaint values
df.select("Complaint").dropDuplicates().count()

## Feature selection

In [None]:
# Keep only the candidate features and the label; obviously unrelated columns
# (address, zip code, coordinates, ...) are dropped.
df = df.select("Name", "Score?", "Inspection_type", "Complaint", "risk_level_coded", "year", "month")

In [None]:
# check the frequencies of categorical column values
n_cols = ["Name", "Score?", "risk_level_coded", "year", "month"]
categorical_cols = [e for e in df.columns if e not in n_cols]

# The loop variable is col_name, not `col`: naming it `col` would shadow the
# pyspark.sql.functions.col imported in earlier cells.
for col_name in categorical_cols:
    # countByValue returns {value: count} without building the full list of rows
    # for every value just to take its length.
    counts = df.select(col_name).rdd.map(lambda row: row[0]).countByValue()
    print(col_name, sorted(counts.items(), key=lambda el: el[1], reverse=True))


In [None]:
# For each categorical feature, run a chi-square independence test against the
# risk label to see whether the feature's distribution differs between labels.
import pyspark.mllib.linalg as ln
import pyspark.mllib.stat as st

# Every categorical column is tested. risk_level_coded is already excluded via
# n_cols, so no leading label column needs to be skipped.
for cat in categorical_cols:
    # One row per label value; after the label, one count per category value
    # (None when that label/category pair never occurs).
    pivoted = df.groupby('risk_level_coded').pivot(cat).count().collect()
    n_labels = len(pivoted)
    n_categories = len(pivoted[0]) - 1 if pivoted else 0
    if n_labels < 2 or n_categories < 2:
        # A 1-row or 1-column table has zero degrees of freedom; there is nothing to test.
        print(cat, 'skipped: need at least 2 labels and 2 categories')
        continue
    # Matrices.dense is column-major, so each label's row of counts becomes one
    # column of an n_categories x n_labels contingency table.
    observed = [0 if e is None else e for row in pivoted for e in row[1:]]
    contingency = ln.Matrices.dense(n_categories, n_labels, observed)
    result = st.Statistics.chiSqTest(contingency)
    print(cat, round(result.pValue, 4))

In [None]:
# NOTE(review): features_to_keep is not referenced anywhere else in this notebook;
# it repeats the column list already applied by the df.select(...) feature-selection cell.
features_to_keep = ["Name", "Score?", "Inspection_type", "Complaint", "risk_level_coded", "year", "month"]

In [None]:
#check for nulls in the final df for feeding ML model
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import DoubleType, FloatType

# NaN only exists in float/double columns. Applying isnan() to other columns forces an
# implicit cast (e.g. string -> double), which can fail under ANSI mode, so those
# columns are checked for null only. The resulting counts are the same.
_float_cols = {
    f.name for f in df.schema.fields if isinstance(f.dataType, (FloatType, DoubleType))
}

# One output column per input column: how many rows are missing a value there.
df.select([
    count(when((isnan(c) | col(c).isNull()) if c in _float_cols else col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()

## Using a label encoder (StringIndexer) for the two selected string features (Complaint, Inspection_type)

In [None]:
#to be able to feed in onehotencoder
from pyspark.ml.feature import StringIndexer

# Replace each string category by a numeric index; with StringIndexer's default
# ordering, the most frequent value gets index 0.0. NOTE(review): the indexers
# are fitted on the full df, i.e. before the train/test split further down.
for source_col, encoded_col in (
    ("Complaint", "Complaint_encode"),
    ("Inspection_type", "Inspection_type_encode"),
):
    df = StringIndexer(inputCol=source_col, outputCol=encoded_col).fit(df).transform(df)

In [None]:
# Complaint and Inspection_type are no longer needed: their StringIndexer outputs
# (*_encode) carry the same categories as numbers.
df = df.drop("Complaint").drop("Inspection_type")

In [None]:
#plotting values just out of curiosity, to see how they are scattered
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Only the two plotted columns are pulled to the driver, not the whole DataFrame.
dfp = df.select("Score?", "risk_level_coded").toPandas()

fig, ax = plt.subplots()
ax.scatter(dfp["Score?"], dfp["risk_level_coded"])
ax.set(xlabel="Score?", ylabel="risk_level_coded", title="Inspection score vs. risk label");

In [None]:
# convert to numeric values
import pyspark.ml.feature as ft

# Pipeline stages that one-hot encode the two StringIndexer outputs.
encoder = ft.OneHotEncoder(inputCol='Inspection_type_encode', outputCol='Inspection_type_vec')
encoder2 = ft.OneHotEncoder(inputCol='Complaint_encode', outputCol='Complaint_type_vec')

# NOTE(review): featurs_col is not used by the pipeline below.
featurs_col = ['Score?', 'Inspection_type_encode', 'Complaint_encode']

# Three nested candidate feature sets to compare:
# score only, score + inspection type, and all three features.
inputCols3 = ['Score?']
inputCols2 = inputCols3 + ['Inspection_type_vec']
inputCols = inputCols2 + ['Complaint_type_vec']

## To compare PR and ROC values, change `inputCols` in the VectorAssembler cell to `inputCols2` or `inputCols3`

In [None]:
# Collate the selected feature columns into one vector column named 'features'
# (replace inputCols with inputCols2 or inputCols3 to try the smaller feature sets).
featuresCreator = ft.VectorAssembler(outputCol='features', inputCols=inputCols)


## Creating ML Model

In [None]:
# Creating an estimator and the pipeline around it
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline

# Logistic regression (regParam=0.01, up to 100 iterations) predicting risk_level_coded.
logistic = cl.LogisticRegression(labelCol='risk_level_coded', regParam=0.01, maxIter=100)

# Stage order matters: one-hot encode, then assemble the vector, then classify.
pipeline = Pipeline(stages=[encoder, encoder2, featuresCreator, logistic])




In [None]:
# Training data: every row from 2016 and 2017 plus January-April 2018 (month < 5).
# The test cell uses the complementary condition (2018, month >= 5).
train = df.filter(df.year.isin(2016, 2017) | ((df.year == 2018) & (df.month < 5)))

In [None]:
# Test data: 2018 rows from May onward (month >= 5), complementing the training split.
test = df.filter((df.year == 2018) & (df.month >= 5))

In [None]:
# Preview the first 20 rows of the test split.
test.show(n=20)

In [None]:
# Rows to score for the September 2018 prediction.
# NOTE(review): these rows also fall inside the test split (2018, month >= 5).
predict = df.filter((df["year"] == 2018) & (df["month"] == 9))

In [None]:
# Preview the first 20 September 2018 rows.
predict.show(n=20)

In [None]:
# Fit every pipeline stage on the training rows, then apply the fitted
# PipelineModel to the test rows to get predictions.
model = pipeline.fit(dataset=train)
test_model = model.transform(dataset=test)

In [None]:
# Preview the first 20 rows of the training split.
train.show(n=20)

In [None]:
# Preview the test split again (show() displays 20 rows by default).
test.show()

In [None]:
# Display the transformed test set, including the model's output columns,
# without truncating cell contents.
test_model.show(20, False)

In [None]:
#model evaluating
import pyspark.ml.evaluation as ev

# The probability column is used as the score; labels come from risk_level_coded.
evaluator = ev.BinaryClassificationEvaluator(
    labelCol='risk_level_coded',
    rawPredictionCol='probability',
)
# Print the area under the ROC curve first, then the area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(test_model, {evaluator.metricName: metric_name}))

## =================== ##
## if the selected features are inputCols2 ['Score?','Inspection_type_vec']
## areaUnderROC ==> 0.9687605681033745
## areaUnderPR ==> 0.9901315409221931


## if the selected features are inputCols3 ['Score?']
## areaUnderROC ==> 0.493075464722042
## areaUnderPR ==> 0.8225093580554805

# so the best classification model is the one with all three features ['Score?','Inspection_type_vec', 'Complaint_type_vec'] as
# inputCols, which reaches 1 for both areaUnderROC and areaUnderPR. A perfect score is unusual, though: it may mean that Complaint or Inspection_type leaks the label (risk level), so check for leakage before relying on this model.

## ===================== ##

In [None]:
# Persist the (unfitted) Pipeline definition for later reuse; overwrite()
# replaces any earlier save at the same path.
pipelinePath = './restaurant_oneHotEncoder_Logistic_Pipeline'
pipeline_writer = pipeline.write().overwrite()
pipeline_writer.save(pipelinePath)

## Predicting the restaurants which are likely to have another food safety issue in September 2018

In [None]:
# Persist the fitted PipelineModel; overwrite() replaces any earlier save at the same path.
from pyspark.ml import PipelineModel
modelPath = './restaurant_oneHotEncoder_Logistic_PipelineModel'
model_writer = model.write().overwrite()
model_writer.save(modelPath)

In [None]:
# Score the September 2018 rows with the fitted pipeline.
df_predict = model.transform(dataset=predict)

In [None]:
# Keep only the rows predicted as risky (prediction == 1) and the columns of interest.
df_predict = (
    df_predict
    .filter(df_predict["prediction"] == 1)
    .select("Name", "risk_level_coded", "prediction")
)

df_predict.show()

In [None]:
# Number of restaurants predicted to have another food safety issue in September 2018.
# df_predict presumably has one row per inspection, so a plain count() would count a
# restaurant once per inspection; count distinct names instead.
# NOTE(review): restaurants that share a Name (e.g. chains) are merged, because the
# ID column was dropped during feature selection.
df_predict.select("Name").distinct().count()