In [3]:
# Install pandas into this notebook session (later cells use it, e.g. toPandas and
# the feature-importance table).
sc.install_pypi_package("pandas")

# Load the 311 service-requests CSV from S3, using the header row for column names
# and letting Spark infer column types.
# Small sample for quick iteration:
#   s3://an674-hw3-bucket/311_Service_Requests_from_2015_to_Present_head_1000.csv
file_path = "s3://pgarias-bucket-cloud/311_Service_Requests_from_2015_to_Present.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas
  Using cached https://files.pythonhosted.org/packages/52/3f/f6a428599e0d4497e1595030965b5ba455fd8ade6e977e3c819973c4b41d/pandas-0.25.3-cp36-cp36m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.3 python-dateutil-2.8.1

In [4]:
# Report the raw row count, then keep only the requests whose Status is 'Closed'
# and report how many remain.
print('Data count: ', df.count())
df = df.filter(df['Status'] == 'Closed')
print('Data count for \'Closed status\': ', df.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data count:  10420594
Data count for 'Closed status':  9871711

In [5]:
# Keep only the columns used for modelling; every other column is dropped
# (original column order is preserved).
keep_columns = ['Created Date', 'Closed Date', 'Agency', 'Complaint Type',
                'Location Type', 'Facility Type', 'Borough']
all_columns = df.columns
drop_columns = list(filter(lambda name: name not in keep_columns, all_columns))
df = df.drop(*drop_columns)
print('Relevant columns: ', df.columns)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Relevant columns:  ['Created Date', 'Closed Date', 'Agency', 'Complaint Type', 'Location Type', 'Facility Type', 'Borough']

In [4]:
# Remove every row that has a null in any of the remaining columns,
# reporting the row count before and after.
print('Count before dropna: ', df.count())
df = df.na.drop()
print('Count after dropna: ', df.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count before dropna:  9871711
Count after dropna:  7775683

In [5]:
# Compute the request duration in MINUTES and extract the month of the created date.
from pyspark.sql.functions import month, to_timestamp

# 'a' is the AM/PM marker. A single 'a' parses the same as 'aa' under Spark 2's
# SimpleDateFormat, and it is the only letter count Spark 3's datetime patterns accept.
date_format = 'MM/dd/yyyy hh:mm:ss a'
duration_column = 'Duration'
created_month_column = 'Created Date_month'

# Parse each timestamp expression once and reuse it below.
created_ts = to_timestamp('Created Date', date_format)
closed_ts = to_timestamp('Closed Date', date_format)

df = df.withColumn(created_month_column, month(created_ts))
# Casting a timestamp to long gives epoch seconds; dividing the difference by 60
# expresses the duration in minutes.
df = df.withColumn(duration_column,
                   (closed_ts.cast('long') - created_ts.cast('long')) / 60)
print('Computed the duration')

# Keep only strictly positive durations. This removes rows where Created == Closed
# (duration 0), rows closed before they were created (negative duration), and rows
# whose dates failed to parse (null duration, since null > 0 is not true).
print('Count before timestamp filtering: ', df.count())
df = df.filter(df[duration_column] > 0)
print('Count after timestamp filtering: ', df.count())

df = df.drop('Created Date', 'Closed Date')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Computed the duration
Count before timestamp filtering:  7775683
Count after timestamp filtering:  7711139

In [6]:
# Normalize the complaint types: collapse variants that share a prefix into one label.
from pyspark.sql.functions import when, col

# (prefix, normalized label) pairs, checked in order. The first matching prefix wins;
# complaint types that match no prefix are left unchanged.
complaint_type_labels = [
    ('Noise', 'Noise'),
    ('Advocate', 'Advocate'),
    ('Bus Stop', 'Bus Stop Shelter Complaint/Placement'),
    ('Dead', 'Dead/Dying'),
    # NOTE(review): 'Damaged...' types are folded into 'Dead/Dying' (kept from the
    # original mapping) -- confirm this grouping is intended.
    ('Damaged', 'Dead/Dying'),
    ('Derelict', 'Derelict Vehicle'),
    ('DOF Parking', 'DOF Parking'),
    ('DOF Property', 'DOF Property'),
    ('Ferry', 'Ferry complaint/inquiry/permit'),
    ('Highway Sign', 'Highway Sign-Damaged/dangling/missing'),
    ('For Hire', 'For Hire Vehicle Complaint/Report'),
    ('Street Sign', 'Street Sign-Damaged/Dangling/Missing'),
    ('Sweeping', 'Sweeping-Missed/Inadequate'),
]


def normalize_by_prefix(column, prefix_labels):
    """Build a Column mapping `column` to the label of its first matching prefix.

    Args:
        column: a pyspark Column of strings.
        prefix_labels: ordered iterable of (prefix, label) pairs.

    Returns:
        A Column equal to the label of the first prefix that `column` starts with,
        or `column` itself when no prefix matches (or `prefix_labels` is empty).
    """
    expr = None
    for prefix, label in prefix_labels:
        matches = column.startswith(prefix)
        expr = when(matches, label) if expr is None else expr.when(matches, label)
    return column if expr is None else expr.otherwise(column)


df = df.withColumn('Complaint Type',
                   normalize_by_prefix(col('Complaint Type'), complaint_type_labels))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Build a one-hot feature vector from every remaining column except the label:
# StringIndexer -> OneHotEncoder per column, then VectorAssembler into "features".
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col

label_column = duration_column
categorical_columns = list(df.columns)
categorical_columns.remove(duration_column)
categoricalCols = categorical_columns

# Each categorical column becomes "<name>_indexed".
indexers = [StringIndexer(inputCol=name, outputCol=name + "_indexed")
            for name in categoricalCols]

# Each index becomes "<name>_indexed_encoded" (OneHotEncoder default: dropLast=True).
encoders = [OneHotEncoder(inputCol=ix.getOutputCol(),
                          outputCol=ix.getOutputCol() + "_encoded")
            for ix in indexers]

assembler = VectorAssembler(inputCols=[enc.getOutputCol() for enc in encoders],
                            outputCol="features")

pipeline = Pipeline(stages=[*indexers, *encoders, assembler])
model = pipeline.fit(df)
data = model.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Columns to screen for outliers with a z-score: every StringIndexer output
# ("*_indexed") plus the Duration label.
# NOTE(review): StringIndexer indices are arbitrary category codes, so a z-score over
# them is not a meaningful outlier measure -- confirm this is intended.
outlier_columns = [c for c in data.columns if c.endswith("_indexed")]
outlier_columns.append(duration_column)
print('Outlier columns: ', outlier_columns)

In [None]:
# Report z-score outlier counts for each candidate column.
# NOTE: remove_outlier is defined in the following cell; run that cell first.
# The loop variable is `column`, not `col`, so the pyspark `col` function imported
# in earlier cells is not overwritten with a column-name string.
for column in outlier_columns:
    remove_outlier(column, data)
    

In [None]:
# Install scipy into this notebook session; remove_outlier below imports scipy.stats.
sc.install_pypi_package("scipy")
def remove_outlier(column, data, threshold=3):
    """Filter z-score outliers out of a single column of a Spark DataFrame.

    The column is collected to the driver with ``toPandas()``, so it must fit in
    driver memory.

    Args:
        column: name of the column to screen.
        data: Spark DataFrame containing ``column``.
        threshold: rows with ``|z| >= threshold`` are treated as outliers (default 3).

    Returns:
        A pandas DataFrame holding only ``column``, restricted to non-outlier rows.
    """
    from scipy import stats
    import numpy as np

    df_outlier_pandas = data.select(column).toPandas()
    print('Count before removing outliers for column ', column, ': ', len(df_outlier_pandas))
    z = np.abs(np.asarray(stats.zscore(df_outlier_pandas), dtype=float))
    # A zero-variance column yields NaN z-scores; treat them as 0 (no outliers)
    # instead of letting NaN < threshold drop every row.
    z = np.nan_to_num(z)
    df_outlier_pandas = df_outlier_pandas[(z < threshold).all(axis=1)]
    # Report the filtered size, not the unfiltered Spark count.
    print('Count after removing outliers for column ', column, ': ', len(df_outlier_pandas))
    return df_outlier_pandas

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Largest number of distinct values in any categorical column (0 if there are none).
# The generator keeps its loop variable local, so the pyspark `col` function imported
# in earlier cells is not rebound to a column-name string.
max_categories = max(
    (df.select(name).distinct().count() for name in categorical_columns),
    default=0,
)
print('Max categories: ', max_categories)

# Features with at most `max_categories` distinct values are marked categorical.
# NOTE(review): "features" is built from one-hot encodings, so every slot is 0/1 and
# is likely treated as categorical whatever this value is -- confirm intent.
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories=max_categories).fit(data)

data = featureIndexer.transform(data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Max categories:  134

In [9]:
# Split the data into training and test sets (40% held out for testing).
# A fixed seed makes the split -- and therefore the reported RMSE -- repeatable for
# the same input data and partitioning; 42 matches the random-forest seed.
(trainingData, testData) = data.randomSplit([0.6, 0.4], seed=42)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Import the RandomForestRegressor class
from pyspark.ml.regression import RandomForestRegressor

# Define the random-forest regression algorithm: it reads the VectorIndexer output
# ("indexedFeatures"), learns the Duration label, and writes predictions to
# prediction_column.
prediction_column = "Prediction_Duration"
rf = RandomForestRegressor(featuresCol="indexedFeatures", labelCol=duration_column,
                           predictionCol=prediction_column, seed=42)
# numTrees / maxDepth are left at the library defaults; a previous variant of this
# cell tried numTrees=2, maxDepth=2.

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Chain the feature indexer and the random-forest regressor into one pipeline.
# featureIndexer was already fitted (VectorIndexer(...).fit(data)), so in this
# pipeline it only transforms; it recomputes "indexedFeatures" before the forest.
from pyspark.ml import Pipeline
pipeline = Pipeline().setStages([featureIndexer, rf])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Fit the indexer + random-forest pipeline on the training split only.
model = pipeline.fit(dataset=trainingData)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Apply the fitted pipeline to the held-out split to produce predictions.
predictions = model.transform(dataset=testData)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Show a few (features, true duration, predicted duration) rows.
predictions.select(["features", duration_column, prediction_column]).show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+-------------------+
|            features|          Duration|Prediction_Duration|
+--------------------+------------------+-------------------+
|(297,[15,111,213,...| 7.666666666666667| 14976.256324558815|
|(297,[15,111,213,...|              2.25| 14976.256324558815|
|(297,[15,111,213,...|               3.3| 14976.256324558815|
|(297,[15,111,213,...|3.6666666666666665| 14976.256324558815|
|(297,[15,111,213,...|3.7666666666666666| 14976.256324558815|
+--------------------+------------------+-------------------+
only showing top 5 rows

In [15]:
# Score the held-out predictions: root-mean-squared error of the predicted
# duration against the true duration.
evaluator = RegressionEvaluator(metricName="rmse",
                                predictionCol=prediction_column,
                                labelCol=duration_column)
rmse = evaluator.evaluate(dataset=predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [1]:
import pandas as pd

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
25,application_1574631695946_0026,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

No module named 'pandas'
Traceback (most recent call last):
ModuleNotFoundError: No module named 'pandas'



In [22]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Attach feature-importance scores to the named slots of a vector column.

    Args:
        featureImp: indexable importance scores, one per vector slot
            (e.g. a fitted tree model's ``featureImportances``).
        dataset: DataFrame whose ``schema[featuresCol].metadata["ml_attr"]["attrs"]``
            maps attribute kinds to lists of ``{"idx": ..., "name": ...}`` dicts.
        featuresCol: name of the vector column whose metadata names the slots.

    Returns:
        A pandas DataFrame with the attribute fields plus a ``score`` column,
        sorted by score (highest first). Empty if the metadata lists no attributes.
    """
    # Local import: in a fresh session the notebook-level `import pandas` may not
    # have run (or may have failed before pandas was installed).
    import pandas as pd

    attrs_by_kind = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    list_extract = []
    for attrs in attrs_by_kind.values():
        list_extract.extend(attrs)
    if not list_extract:
        return pd.DataFrame(columns=['idx', 'name', 'score'])
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return varlist.sort_values('score', ascending=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Top 10 features by random-forest importance. Slot names come from the "features"
# column metadata; the forest itself was trained on "indexedFeatures".
ExtractFeatureImp(featureImp=model.stages[-1].featureImportances,
                  dataset=predictions,
                  featuresCol="features").head(n=10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

     idx                                               name     score
55    55    Complaint Type_indexed_encoded_New Tree Request  0.361427
46    46  Complaint Type_indexed_encoded_Overgrown Tree/...  0.144154
4      4                         Agency_indexed_encoded_DPR  0.135084
153  153               Location Type_indexed_encoded_Street  0.068237
44    44            Complaint Type_indexed_encoded_Graffiti  0.047485
279  279             Facility Type_indexed_encoded_Precinct  0.035215
278  278                  Facility Type_indexed_encoded_N/A  0.033090
285  285              Borough_indexed_encoded_STATEN ISLAND  0.023243
36    36          Complaint Type_indexed_encoded_Dead/Dying  0.016478
167  167            Location Type_indexed_encoded_Mixed Use  0.015327