In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType

In [2]:
# Create the project's Spark session, or reuse one that is already running.
spark1 = (
  SparkSession.builder
  .appName('ds504_projet')
  .getOrCreate()
)

In [3]:
# Load the 2018 CSV (header row, inferred column types) and drop the columns
# that the models below do not use.
df_raw = (
  spark1.read
  .csv("/FileStore/tables/2018.csv", header=True, inferSchema=True)
  .drop('latitude', 'longitude', 'open_y')
)

In [4]:
# Number of rows in the raw dataset, displayed as the cell output.
n_raw_rows = df_raw.count()
n_raw_rows

In [5]:
def get_dummy(df, categoricalCols, continuousCols, labelCol):
  """
  Encode categorical columns and combine them with numeric columns into a
  single Spark ML feature vector.

  Each categorical column is string-indexed and then one-hot encoded (keeping
  OneHotEncoder's default dropLast=True). The encoded vectors plus the
  continuous columns are assembled into a column named "features", and
  ``labelCol`` is copied into a column named "label".

  Note: the encoding pipeline is fit on every row of ``df``. Callers that
  split the result afterwards therefore share one category encoding between
  their train and test sets.

  Args:
    df: input Spark DataFrame.
    categoricalCols: iterable of categorical column names to index and encode.
    continuousCols: iterable of numeric column names passed to the assembler
      unchanged.
    labelCol: name of the column to expose as "label".

  Returns:
    A DataFrame with exactly two columns: "features" and "label".
  """
  from pyspark.ml import Pipeline
  from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
  from pyspark.sql.functions import col

  # Accept any iterable of names (list, tuple, ...). The assembler's input list
  # is built by concatenation below, which only works between real lists.
  categoricalCols = list(categoricalCols)
  continuousCols = list(continuousCols)

  indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
              for c in categoricalCols]

  # default setting: dropLast=True
  encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                            outputCol="{0}_encoded".format(indexer.getOutputCol()))
              for indexer in indexers]

  assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders] + continuousCols,
    outputCol="features")

  pipeline = Pipeline(stages=indexers + encoders + [assembler])

  model = pipeline.fit(df)
  data = model.transform(df)

  return data.withColumn('label', col(labelCol)).select('features', 'label')

In [6]:
# Data processing for the classifiers: encode the categorical columns, append
# the numeric columns, and use `class` as the label.
catcols = ['reason', 'department', 'neighborhood', 'source']
num_cols = ['open_m', 'open_day_of_week', 'Avg Temp (F)', 'Precip (in)']
labelCol = 'class'

# Cache the encoded data. It is split below and then scanned by several
# classifier fits, which would otherwise re-read the CSV and re-run the
# encoding pipeline for every fit.
data = get_dummy(df_raw, catcols, num_cols, labelCol).cache()
data.show(5)

# Split the data into training and test sets (10% held out for testing).
# The fixed seed keeps the split reproducible for the same input partitioning.
(trainingData, testData) = data.randomSplit([0.9, 0.1], seed=0)

In [7]:
# classifications
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier

# The perceptron's input layer must equal the assembled feature-vector length.
# That length depends on how many categories the one-hot encoders produced, so
# it is read from the data instead of being hard-coded. The output layer needs
# one unit per class; labels are expected to lie in [0, n_classes).
n_features = len(trainingData.first()['features'])
# Dict-form agg avoids the builtin `max`, which the star import shadows.
n_classes = int(trainingData.agg({'label': 'max'}).first()[0]) + 1

# Each factory builds a fresh classifier. Models are fit and reported in this order.
classifiers = {
  'Logistic regression': lambda: LogisticRegression(featuresCol='features', labelCol='label'),
  'Decision tree': lambda: DecisionTreeClassifier(labelCol="label", featuresCol="features", seed=0),
  'Random forest': lambda: RandomForestClassifier(labelCol="label", featuresCol="features", seed=0, featureSubsetStrategy='sqrt'),
  'GBT': lambda: GBTClassifier(labelCol="label", featuresCol="features", seed=0),
  # One hidden layer with 3 units between the input and output layers.
  'MPC': lambda: MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", seed=0,
                                                layers=[n_features, 3, n_classes]),
}

# The same accuracy evaluator is used for every model on the held-out split.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

for c, make_clf in classifiers.items():
  clf = make_clf()
  model = clf.fit(trainingData)
  predictions = model.transform(testData)
  accuracy = evaluator.evaluate(predictions)

  print(c)
  print('Accuracy: %.2f' % accuracy)
  print('=====================')

In [8]:
# Data processing for the regressors: only requests whose `reason` is
# REGRESSION_REASON are kept, and `duration` is the label. `reason` is left out
# of catcols because it is constant after this filter.
REGRESSION_REASON = 'Sanitation'
catcols = ['department', 'neighborhood', 'source']
num_cols = ['open_m', 'open_day_of_week', 'Avg Temp (F)', 'Precip (in)']
labelCol = 'duration'

# Cache the encoded data. It is split below and then scanned by several
# regressor fits, which would otherwise re-read the CSV and re-run the
# filter and encoding pipeline for every fit.
data = get_dummy(df_raw.where(df_raw['reason'] == REGRESSION_REASON),
                 catcols, num_cols, labelCol).cache()
data.show(5)

# Hold out 10% for testing. The fixed seed keeps the split reproducible.
(trainingData, testData) = data.randomSplit([0.9, 0.1], seed=0)

In [9]:
# regressions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import (
  LinearRegression,
  DecisionTreeRegressor,
  RandomForestRegressor,
  GBTRegressor,
)

# Each factory builds a fresh regressor. Models are fit and reported in this order.
regressors = {
  'Linear regression': lambda: LinearRegression(featuresCol='features', labelCol='label'),
  'Decision tree': lambda: DecisionTreeRegressor(featuresCol='features', labelCol='label', seed=0),
  'Random forest': lambda: RandomForestRegressor(featuresCol='features', labelCol='label', seed=0, featureSubsetStrategy='sqrt'),
  'GBT': lambda: GBTRegressor(featuresCol='features', labelCol='label', seed=0),
}

# Every model is scored by R^2 on the held-out split.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

for r, make_reg in regressors.items():
  reg = make_reg()
  model = reg.fit(trainingData)
  predictions = model.transform(testData)
  r2 = evaluator.evaluate(predictions)

  print(r)
  print('R2: %.2f' % r2)
  print('====================')