In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, replace, count, when, isnan, isnull, to_timestamp, unix_timestamp, countDistinct
from pyspark.sql.types import StructType, StructField, TimestampType
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.appName("NYC311Calls").getOrCreate()

In [0]:
PATH = "dbfs:/public/"

# Data Descriptions

#### Read Data

In [0]:
import os
uri = os.path.join(PATH, "311_Service_Requests_from_2020_to_Present.csv")

data1 = spark.read.csv(
    uri,
    header=True,
    inferSchema=True
)
data=data1

#### Data Samples

In [0]:
data.display(5)

bbl,latitude,longitude,resolution_time,agency_label,complaint_type_label,location_type_label,incident_zip_label,address_type_label,city_label,community_board_label,borough_label,open_data_channel_type_label
1008960023,40.73251803248665,-73.98483468502799,2.4166666666666665,0.0,10.0,6.0,59.0,0.0,2.0,53.0,3.0,0.0
3050260344,40.65698230332589,-73.96222514965272,4.45,0.0,1.0,2.0,17.0,0.0,0.0,28.0,0.0,1.0
4005700053,40.76625579583187,-73.9262035584613,3.75,6.0,23.0,5.0,101.0,0.0,5.0,11.0,2.0,2.0
3050260344,40.65698230332589,-73.96222514965272,5.95,0.0,1.0,2.0,17.0,0.0,0.0,28.0,0.0,1.0
1013000001,40.75450745921105,-73.97595054706579,23.0,0.0,20.0,0.0,175.0,0.0,2.0,47.0,3.0,2.0
3003940001,40.682868001833405,-73.9848135266054,25.516666666666666,0.0,0.0,0.0,64.0,0.0,0.0,32.0,0.0,2.0
3016360024,40.688334599490894,-73.93014442097454,1.7666666666666666,0.0,1.0,2.0,11.0,0.0,0.0,9.0,0.0,0.0
1007000027,40.75121898488021,-74.00278632161532,10.483333333333333,0.0,1.0,2.0,110.0,0.0,2.0,38.0,3.0,0.0
1009330025,40.740295354643706,-73.97695165980414,15.733333333333333,0.0,1.0,2.0,91.0,0.0,2.0,53.0,3.0,0.0
4051830046,40.7578403661312,-73.82106920352469,22.7,0.0,1.0,2.0,74.0,0.0,6.0,15.0,2.0,1.0


#### Data Schema

In [0]:
data.printSchema()

root
 |-- Unique Key: integer (nullable = true)
 |-- Created Date: string (nullable = true)
 |-- Closed Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Agency Name: string (nullable = true)
 |-- Complaint Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location Type: string (nullable = true)
 |-- Incident Zip: string (nullable = true)
 |-- Incident Address: string (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Cross Street 1: string (nullable = true)
 |-- Cross Street 2: string (nullable = true)
 |-- Intersection Street 1: string (nullable = true)
 |-- Intersection Street 2: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Landmark: string (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Due Date: string (nullable = true)
 |-- Resolution Description: string (nullable = true)
 |-- Resolution Actio

#### Data Size

In [0]:
data.count()

9374581

#### Nulls Count

In [0]:
null_counts = (data
    .select([count(when(col(c).isNull(),c)).alias(c) for c in data.columns])
    .collect()[0])

null_counts_dict = null_counts.asDict()

print("Null Counts in Each Column:")
for column, count in null_counts_dict.items():
    print(f"{column}: {count}")

Null Counts in Each Column:
Unique Key: 0
Created Date: 0
Closed Date: 367452
Agency: 0
Agency Name: 0
Complaint Type: 0
Descriptor: 105525
Location Type: 1207787
Incident Zip: 157722
Incident Address: 429011
Street Name: 429328
Cross Street 1: 2586873
Cross Street 2: 2587377
Intersection Street 1: 3395188
Intersection Street 2: 3390668
Address Type: 1183637
City: 508123
Landmark: 4113063
Facility Type: 8687500
Status: 0
Due Date: 9347266
Resolution Description: 427235
Resolution Action Updated Date: 141129
Community Board: 7608
BBL: 1125785
Borough: 7608
X Coordinate (State Plane): 168585
Y Coordinate (State Plane): 167463
Open Data Channel Type: 0
Park Facility Name: 0
Park Borough: 7608
Vehicle Type: 9365786
Taxi Company Borough: 9369615
Taxi Pick Up Location: 9292550
Bridge Highway Name: 9296372
Bridge Highway Direction: 9335351
Road Ramp: 9352663
Bridge Highway Segment: 9296370
Latitude: 168699
Longitude: 168699
Location: 168699
Zip Codes: 201102
Community Districts: 171525
Boroug

# Data Preprocessing

###All these steps have been combined to a single block at the end

#### Fix Column Names

#### Keep Useful Columns

#### Keep Only Closed 311 Calls

#### Drop Null Rows

#### Fix Data Types (DateTime)

#### Remove Created Date > Closed Date

### Feature Engineering

#### Add Resolution Time (Minutes)

#### Label Categorical Data 

#### Agency Distribution

### Main Data Preprocessing

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, replace, count, when, isnan, isnull, to_timestamp, unix_timestamp, countDistinct
from pyspark.sql.types import StructType, StructField, TimestampType
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.appName("NYC311Calls").getOrCreate()

spark.conf.set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
spark.conf.set("parquet.enable.summary-metadata", "false")
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

#####I am using all those features where null values were less than 50% of the data. Since there were some features related to Taxi Department, I had to drop all those features becasue majority of those features had null values.

In [0]:
keeps = ['created_date', 'closed_date', 'agency', 'complaint_type', 'location_type', 'incident_zip', 'address_type', 'city', 'status', 'bbl', 'borough', 'open_data_channel_type', 'latitude', 'longitude','facility_type', 'park_facility_name','park_borough']

dateFormat = "MM/dd/yyyy hh:mm:ss a"

data = (data1.toDF(*[col.lower().replace(' ', '_') for col in data1.columns])
        .select(*keeps)
        .where(lower(col("status")) == "closed")
        .drop("status")
        .dropna()
        .withColumn("created_date", to_timestamp(col("created_date"), dateFormat))
        .withColumn("closed_date", to_timestamp(col("closed_date"), dateFormat))
        .where(col("created_date") < col("closed_date"))
        .withColumn("resolution_time", (unix_timestamp("closed_date") - unix_timestamp("created_date")) / 60)
        .drop("created_date", "closed_date"))

categories = ['agency', 'complaint_type', 'location_type', 'incident_zip', 'address_type', 'city', 'borough', 'open_data_channel_type','facility_type', 'park_facility_name','park_borough']

for column in categories:
    data = StringIndexer(inputCol=column, outputCol=column+"_label").fit(data).transform(data)

data = data.drop(*categories)
data.show()

+----------+------------------+------------------+---------------+------------+--------------------+-------------------+------------------+------------------+----------+-------------+----------------------------+-------------------+------------------------+------------------+
|       bbl|          latitude|         longitude|resolution_time|agency_label|complaint_type_label|location_type_label|incident_zip_label|address_type_label|city_label|borough_label|open_data_channel_type_label|facility_type_label|park_facility_name_label|park_borough_label|
+----------+------------------+------------------+---------------+------------+--------------------+-------------------+------------------+------------------+----------+-------------+----------------------------+-------------------+------------------------+------------------+
|3001800021| 40.68546089593264|-73.97899339156241|         1440.0|         0.0|                 2.0|                1.0|              46.0|               0.0|       0.0|

In [0]:
#Checking the distribution of target variable resolution_time
data.select("resolution_time").summary('min','max','stddev','25%','50%','75%').show()

+-------+------------------+
|summary|   resolution_time|
+-------+------------------+
|    min|               1.0|
|    max|        5658329.15|
| stddev|165611.11293470918|
|    25%|            1440.0|
|    50%|            2880.0|
|    75%|            7200.0|
+-------+------------------+



In [0]:
#Scaling target Variable
#Applying log transformation to scale the dependent variable
from pyspark.sql.functions import log

# Adding a small constant to avoid log(0)
data = data.withColumn('resolution_time', log(col('resolution_time') + 1))

In [0]:
#Checking the distribution of target variable resolution_time
data.select("resolution_time").summary('min','max','stddev','25%','50%','75%').show()

+-------+--------------------+
|summary|     resolution_time|
+-------+--------------------+
|    min|0.016529301951210506|
|    max|   15.54863938014872|
| stddev|  2.8202852501408673|
|    25%|    4.05814151355814|
|    50%|   7.112612652609029|
|    75%|   8.842611702954414|
+-------+--------------------+



######Now am I going to use Random Forest for feature selection

###Random Forest for Feature Selection

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
import time
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt

train, test = data.randomSplit([0.8, 0.2], seed=42) #splitting the dataset
PredVar = ["resolution_time"] #Prediction variable
featureColumns = [column for column in data.columns if column not in PredVar] # saving all features to a single variable to be passed to vector assembler
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
# Define the model
rf = RandomForestRegressor(featuresCol='features', labelCol="resolution_time",maxBins=256)
train = assembler.transform(train)
test = assembler.transform(test)

# Fit the model
rf_model = rf.fit(train)
importances = rf_model.featureImportances

# 'importances' will be a list of importance scores for each feature
#featureColumns is the list of feature names
feature_importance_list = [(feature, round(importance, 4)) for feature, importance in zip(featureColumns, importances)]
sorted_features = sorted(feature_importance_list, key=lambda x: x[1], reverse=True)

# Now 'sorted_features' contains features sorted by importance
#I could have set a threshold or choose top N featutres
# Here I am selecting top 10 features
N = 10
selected_features = [feature for feature, importance in sorted_features[:N]]


In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
import time
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt

In [0]:
sorted_features

[('location_type_label', 0.4457),
 ('open_data_channel_type_label', 0.229),
 ('complaint_type_label', 0.2173),
 ('incident_zip_label', 0.0424),
 ('facility_type_label', 0.0278),
 ('bbl', 0.0131),
 ('city_label', 0.0113),
 ('latitude', 0.0047),
 ('borough_label', 0.0039),
 ('park_borough_label', 0.0028),
 ('longitude', 0.0021),
 ('agency_label', 0.0),
 ('address_type_label', 0.0),
 ('park_facility_name_label', 0.0)]

In [0]:
selected_features

['location_type_label',
 'open_data_channel_type_label',
 'complaint_type_label',
 'incident_zip_label',
 'facility_type_label',
 'bbl',
 'city_label',
 'latitude',
 'borough_label',
 'park_borough_label']

In [0]:
#Plotinmg resolution time now to check if the distribution got less skewed or not
GroupedTime = data.withColumn("range", 
                       when((data["resolution_time"] >= 0) & (data["resolution_time"] < 3), "0-3")
                       .when((data["resolution_time"] >= 3) & (data["resolution_time"] < 6), "3-6")
                       .when((data["resolution_time"] >= 6) & (data["resolution_time"] <9), "6-9")
                       .when((data["resolution_time"] >= 9) & (data["resolution_time"] < 12), "9-12")
                       .when((data["resolution_time"] >= 12) & (data["resolution_time"] < 16), "12-15")
).groupBy("range").count().sort(col("count").desc()).show()


+-----+-------+
|range|  count|
+-----+-------+
|  3-6|2221331|
|  6-9|1972982|
| 9-12|1260975|
|  0-3| 516925|
|12-15| 124097|
+-----+-------+



In [0]:
selected_features=selected_features+['resolution_time']
data=data.select(*selected_features)
train, test = data.randomSplit([0.8, 0.2], seed=42)
PredVar = ["resolution_time"]
featureColumns = [column for column in data.columns if column not in PredVar]
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")

rf = RandomForestRegressor(featuresCol='features', labelCol='resolution_time',maxBins=256)
pipeline = Pipeline(stages=[assembler, rf])



####Grid Search

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

starttime = time.time()

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 20, 50])  # Number of trees
             .addGrid(rf.maxDepth, [5, 10])  # Maximum depth
             .addGrid(rf.maxBins, [256, 270])
             .build())

# Defining evaluator. It will give the best parameters which gives the most optimal rmse values. 
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="resolution_time", metricName="rmse")

# Setting up 3-fold cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Fit the model
cvModel = crossval.fit(train)

# Fetch best model
bestModel = cvModel.bestModel

# Print out the best parameters
endtime = time.time()
elapsed_time=endtime-starttime
print(f"Time taken for Grid Search: {elapsed_time} seconds")

print("Best number of trees: ", bestModel.stages[-1]._java_obj.getNumTrees())
print("Best max depth: ", bestModel.stages[-1]._java_obj.getMaxDepth())
print("Best max bins: ", bestModel.stages[-1]._java_obj.getMaxBins())


Time taken for Grid Search: 1576.7722833156586 seconds
Best number of trees:  50
Best max depth:  10
Best max bins:  256


In [0]:

#Passing the values for the best parameters
rf = RandomForestRegressor(featuresCol='features', labelCol='resolution_time',maxBins=256,maxDepth=10,numTrees=50)
pipeline = Pipeline(stages=[assembler, rf])

In [0]:
from pyspark.sql.functions import exp

scales = [0.2, 0.4,0.6, 0.8, 1.0]
times = []
for scale in scales:
    train_sample = train.sample(False, scale, seed=42)
    start_time = time.time()
    # Fit the pipeline
    model = pipeline.fit(train_sample)

    end_time = time.time()
    elapsed_time = end_time - start_time
    times.append(elapsed_time)

    print(f"Time taken for {int(scale*100)}% of data: {elapsed_time} seconds")

    # Evaluate the model on the test set
    predictions = model.transform(test)
   
    #This commented code is for applying inverse log transformation. If I work further on this problem, I would apply inverse transformation so that the target variable can be presented to stakeholders (Unit of target variable is minutes)

    # Apply exponential function to both actual and predicted values to scale them back
    #predictions = predictions.withColumn("prediction_Time", exp(predictions["prediction"]) - 1)
    #predictions = predictions.withColumn("actual_Time", exp(predictions["resolution_time"]) - 1)

    # Use the scaled back values for evaluation
    #evaluator_rmse = RegressionEvaluator(labelCol="actual_Time", predictionCol="prediction_exp", metricName="rmse")
    #evaluator_mse = RegressionEvaluator(labelCol="actual_Time", predictionCol="prediction_exp", metricName="mse")

    evaluator_rmse = RegressionEvaluator(labelCol="resolution_time", predictionCol="prediction", metricName="rmse")
    evaluator_mse = RegressionEvaluator(labelCol="resolution_time", predictionCol="prediction", metricName="mse")

    rmse = evaluator_rmse.evaluate(predictions)
    mse = evaluator_mse.evaluate(predictions)

    print(f"Scale {int(scale*100)}% - RMSE: {rmse}, MSE: {mse}")

Time taken for 20% of data: 201.96930980682373 seconds
Scale 20% - RMSE: 1.0982423125133602, MSE: 1.206136176994693
Time taken for 40% of data: 199.914892911911 seconds
Scale 40% - RMSE: 1.0907217214506686, MSE: 1.1896738736443098
Time taken for 60% of data: 200.30164408683777 seconds
Scale 60% - RMSE: 1.089783474036064, MSE: 1.1876280202821126
Time taken for 80% of data: 196.01264595985413 seconds
Scale 80% - RMSE: 1.0882956017962881, MSE: 1.184387316889145
Time taken for 100% of data: 200.03263330459595 seconds
Scale 100% - RMSE: 1.0884935779497502, MSE: 1.1848182692378493
