# Capstone Project 2
# How Soon Will a Complaint be Resolved?
## A Case Study on New York City 311 Calls
## Notebook in PySpark

Data Source: NYC Open Data - 311 Service Requests from 2010 to Present
URL: https://nycopendata.socrata.com/Social-Services/311-Service-Requests-from-2010-to-Present/erm2-nwe9
Analyst: Eugene Wen

### Subsample datasets for model development
First, the `subsample` package was installed through pip. Then, from the NYC311 data folder, the following command was run to randomly draw 500,000 rows:

`subsample -n 500000 311_Service_Requests_from_2010_to_Present.csv -r > nyc311_sample.csv`  

Note that this sample is used for development only. The final report will use the full 10 GB dataset.
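
Drawing a fixed number of rows from a file too large to load at once can be pictured with reservoir sampling. The sketch below is only an illustration of that general technique; it is not confirmed to be what the `subsample` tool does internally, and `reservoir_sample` and the toy rows are hypothetical names introduced here.

```python
import random
from typing import Iterable, List, Optional


def reservoir_sample(rows: Iterable[str], k: int, seed: Optional[int] = None) -> List[str]:
    """Return k rows drawn uniformly at random from an iterable of unknown length."""
    rng = random.Random(seed)
    reservoir: List[str] = []
    for i, row in enumerate(rows):
        if i < k:
            # Fill the reservoir with the first k rows.
            reservoir.append(row)
        else:
            # Row i replaces a random slot with probability k / (i + 1).
            j = rng.randint(0, i)
            if j < k:
                reservoir[j] = row
    return reservoir


# Hypothetical stand-in for the data lines of a large CSV (header excluded).
rows = (f"row-{n}" for n in range(10_000))
sample = reservoir_sample(rows, k=5, seed=42)
print(len(sample))
```

Because the rows are consumed one at a time, memory use depends on the sample size rather than on the size of the file.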

In [1]:
# Load packages
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# Start a spark session
spark = SparkSession.builder.appName('nyc311').getOrCreate()

In [16]:
# Load sample dataset
df = spark.read.csv('../NYC311/nyc311_sample.csv', inferSchema=False, header=True)
df.printSchema()

root
 |-- Unique Key: string (nullable = true)
 |-- Created Date: string (nullable = true)
 |-- Closed Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Agency Name: string (nullable = true)
 |-- Complaint Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location Type: string (nullable = true)
 |-- Incident Zip: string (nullable = true)
 |-- Incident Address: string (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Cross Street 1: string (nullable = true)
 |-- Cross Street 2: string (nullable = true)
 |-- Intersection Street 1: string (nullable = true)
 |-- Intersection Street 2: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Landmark: string (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Due Date: string (nullable = true)
 |-- Resolution Description: string (nullable = true)
 |-- Resolution Action

In [10]:
# Check the first three columns we are interested in.
df['Unique Key', 'Created Date', 'Closed Date'].show()

+----------+--------------------+--------------------+
|Unique Key|        Created Date|         Closed Date|
+----------+--------------------+--------------------+
|  32199603|12/14/2015 12:00:...|01/04/2016 12:00:...|
|  20074547|03/21/2011 04:22:...|03/23/2011 02:49:...|
|  28951515|09/25/2014 06:18:...|09/25/2014 06:19:...|
|  17575598|07/03/2010 10:11:...|07/07/2010 12:00:...|
|  28270434|06/16/2014 12:00:...|06/18/2014 12:00:...|
|  34115581|08/18/2016 02:24:...|08/31/2016 12:47:...|
|  28261221|06/14/2014 09:12:...|06/14/2014 10:44:...|
|  22829180|03/06/2012 12:05:...|03/07/2012 01:36:...|
|  29709630|01/13/2015 05:51:...|01/13/2015 05:51:...|
|  20809019|07/11/2011 12:00:...|07/18/2011 12:00:...|
|  29450972|12/07/2014 12:00:...|12/09/2014 12:00:...|
|  34728843|11/07/2016 10:24:...|11/08/2016 10:58:...|
|  34260889|09/07/2016 06:03:...|09/27/2016 03:49:...|
|  36413130|06/11/2017 01:20:...|06/11/2017 04:35:...|
|  25301783|04/04/2013 11:30:...|04/05/2013 12:49:...|
|  1599464

In [19]:
# Calculate the response time as the difference between created date and closed date, in seconds.
from pyspark.sql.functions import unix_timestamp

fmt = "MM/dd/yyyy hh:mm:ss a"
# fmt2 = "MM/dd/yyyy HH:mm:ss"  # 24-hour pattern; it cannot parse the AM/PM marker.
starttime = unix_timestamp(df['Created Date'], format=fmt)
endtime = unix_timestamp(df['Closed Date'], format=fmt)

df = df.withColumn("Resp_time", endtime - starttime)
df['Unique Key', 'Resp_time'].show()

+----------+---------+
|Unique Key|Resp_time|
+----------+---------+
|  32199603|  1814400|
|  20074547|   167208|
|  28951515|       57|
|  17575598|   352140|
|  28270434|   172800|
|  34115581|  1117391|
|  28261221|     5472|
|  22829180|    91874|
|  29709630|       48|
|  20809019|   604800|
|  29450972|   172800|
|  34728843|   131670|
|  34260889|  1719949|
|  36413130|    11671|
|  25301783|     4782|
|  15994647|     5040|
|  20885742|  2160000|
|  19128116|     4566|
|  32064460|  1209167|
|  30699222|    16071|
+----------+---------+
only showing top 20 rows
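
`unix_timestamp` returns seconds, so `Resp_time` above is in seconds. As a sanity check on the arithmetic, the same calculation can be sketched in plain Python. The format string `PY_FMT` is the assumed `strptime` equivalent of the Spark pattern, `response_seconds` is a hypothetical helper, and the timestamps are made up rather than taken from the dataset.

```python
from datetime import datetime
from typing import Optional

# Assumed Python equivalent of the Spark pattern "MM/dd/yyyy hh:mm:ss a".
PY_FMT = "%m/%d/%Y %I:%M:%S %p"


def response_seconds(created: str, closed: str) -> Optional[float]:
    """Return closed - created in seconds, or None if either string is malformed."""
    try:
        start = datetime.strptime(created, PY_FMT)
        end = datetime.strptime(closed, PY_FMT)
    except ValueError:
        return None
    return (end - start).total_seconds()


# Hypothetical timestamps, not taken from the dataset.
secs = response_seconds("03/21/2011 04:22:10 PM", "03/21/2011 05:52:10 PM")
print(secs)       # 5400.0
print(secs / 60)  # 90.0 minutes
print(response_seconds("12/14/2015 12:00:45 AM", "#$**#"))  # None
```

Dividing by 60 gives minutes if that unit is preferred for modeling. This sketch uses naive datetimes, whereas Spark interprets the strings in its session time zone, so results can differ around daylight-saving changes.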



### Data Wrangling


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
# Check missing patterns
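
One possible way to look at missing patterns is to count empty fields per column. The sketch below does this in plain Python on a tiny in-memory extract; the three rows and the four-column layout are hypothetical, and the same idea would need to be expressed with Spark aggregations for the full dataset.

```python
import csv
import io
from collections import Counter

# Hypothetical three-row extract; the real file has many more columns.
raw = io.StringIO(
    "Unique Key,Created Date,Closed Date,Incident Zip\n"
    "1,01/01/2015 10:00:00 AM,01/02/2015 10:00:00 AM,10001\n"
    "2,01/03/2015 09:30:00 AM,,\n"
    "3,01/04/2015 08:15:00 PM,01/04/2015 09:15:00 PM,\n"
)

missing = Counter()
total = 0
for record in csv.DictReader(raw):
    total += 1
    for column, value in record.items():
        # Treat empty or whitespace-only fields as missing.
        if value is None or not value.strip():
            missing[column] += 1

for column, count in missing.most_common():
    print(f"{column}: {count}/{total} missing")
```

A missing `Closed Date` matters most here, since the response time cannot be computed for those rows.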

In [None]:
data = spark.read.csv('Ecommerce_Customers.csv',
                      inferSchema=True, header=True)



In [None]:
assembler = VectorAssembler(inputCols=['A', 'B', 'C'],
                            outputCol='features')
output = assembler.transform(data)  # Dense vector
final_data = output.select(['features', 'target'])

### Exploratory Data Analysis

In [None]:
# Plot response time distribution

### Machine Learning Models

In [14]:
# First run
from pyspark.ml.regression import LinearRegression
training = spark.read.format('libsvm').load('file path')

# training set requires two columns: label and features.

lr = LinearRegression(featuresCol='features',
                      labelCol='label',
                      predictionCol='prediction')
lrModel = lr.fit(training)
lrModel.coefficients
lrModel.intercept
training_summary = lrModel.summary
training_summary.r2
training_summary.rootMeanSquaredError
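
To make the two summary metrics above concrete, the sketch below computes RMSE and R² from their textbook definitions in plain Python. The helper `rmse_and_r2` and the four label/prediction pairs are hypothetical and only illustrate the formulas; they are not output from the model.

```python
import math
from typing import Sequence, Tuple


def rmse_and_r2(labels: Sequence[float], preds: Sequence[float]) -> Tuple[float, float]:
    """Return (RMSE, R^2) for paired labels and predictions."""
    n = len(labels)
    mean_label = sum(labels) / n
    # Residual sum of squares: squared gaps between labels and predictions.
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    # Total sum of squares: squared gaps between labels and their mean.
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return math.sqrt(ss_res / n), 1.0 - ss_res / ss_tot


# Hypothetical response times in seconds and model predictions.
labels = [100.0, 200.0, 300.0, 400.0]
preds = [110.0, 190.0, 310.0, 390.0]
rmse, r2 = rmse_and_r2(labels, preds)
print(rmse, r2)
```

RMSE is in the same unit as the label, so with `Resp_time` as the label it reads as a typical error in seconds.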



In [None]:
# Split dataset into training and testing sets
# all_data is assumed to hold the assembled 'features' and 'label' columns.
train_data, test_data = all_data.randomSplit([0.7, 0.3])
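
A weighted random split of this kind assigns each row independently, so the resulting sizes are only approximately 70% and 30%. The plain-Python sketch below illustrates the idea under that assumption; `random_split` is a hypothetical helper and is not Spark's implementation.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Assign each row to one split using an independent uniform draw."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.7, 1.0].
    bounds: List[float] = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift

    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for idx, upper in enumerate(bounds):
            if u < upper:
                splits[idx].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 700 and 300
```

Fixing the seed makes the split reproducible between runs; `randomSplit` accepts a `seed` argument for the same purpose, for example `all_data.randomSplit([0.7, 0.3], seed=42)`.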

In [None]:
correct_model = lr.fit(train_data)
test_results = correct_model.evaluate(test_data)
test_results.rootMeanSquaredError

In [None]:
# Prediction

predictions = correct_model.transform(unlabeled_data)

In [15]:
# Tuning with optimal subset

In [16]:
# Final model choice with optimal hyperparameter(s)

In [None]:
from pyspark.sql.functions import corr
df.select(corr('crew', 'passengers')).show()

In [None]:
# Decision Tree


In [None]:
# Random Forest
from pyspark.ml.regression import (RandomForestRegressor,
                                   GBTRegressor,
                                   GeneralizedLinearRegression)

### Results

In [17]:
# Final model training

In [18]:
# Final model evaluation

In [9]:
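# Test timestamp parsing on one valid and one malformed string.
# The original call used Scala's Seq(...).toDF syntax, which raises a NameError in Python.
df = spark.createDataFrame([(1, "12/14/2015 12:00:45 AM"), (2, "#$**#")], ["id", "dts"])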