### Import all dependencies

In [1]:
import csv
import os
import sys
# Spark imports
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as f

### Initialize Spark

In [2]:
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("NYC 311 Data Analysis") \
        .config('spark.sql.codegen.wholeStage', 'false') \
        .getOrCreate()
    return spark

### Test Data

The complete dataset contains data from 2010 till now. To ease the development of algorithm we stripped down the entire dataset into a small dataset. The development dataset contains data from August 2019.

In [3]:
filename = "data/311_Service_Requests_from_2018_to_Present.csv"

### Create dataframe

In [4]:
spark = init_spark()

In [5]:
nyc_311_df = spark.read.format("csv").option("header", "true").load(filename)

In [6]:
nyc_311_df.printSchema()

root
 |-- Unique Key: string (nullable = true)
 |-- Created Date: string (nullable = true)
 |-- Closed Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Agency Name: string (nullable = true)
 |-- Complaint Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location Type: string (nullable = true)
 |-- Incident Zip: string (nullable = true)
 |-- Incident Address: string (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Cross Street 1: string (nullable = true)
 |-- Cross Street 2: string (nullable = true)
 |-- Intersection Street 1: string (nullable = true)
 |-- Intersection Street 2: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Landmark: string (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Due Date: string (nullable = true)
 |-- Resolution Description: string (nullable = true)
 |-- Resolution Action

### Utility Functions

In [7]:
def get_df_row_as_dict(df_row):
    return df_row.asDict()

In [8]:
def print_df_row_as_dict(df_row):
    for col in get_df_row_as_dict(df_row):
        print(col + ":" + str(df_row[col]))

#### Drop following columns

In [9]:
DROP_THRESHOLD = 0.5

In [10]:
DROP_COLS = ['Agency Name', 'Incident Address', 'Street Name', 'Cross Street 1', 'Cross Street 2',
             'Intersection Street 1', 'Intersection Street 2', 'Landmark', 'Facility Type', 'Location_Type',
             'Resolution Description', 'Resolution Action Updated Date', 'Community Board', 'BBL',
             'X Coordinate (State Plane)', 'Y Coordinate (State Plane)', 'Park Facility Name', 'Park Borough',
             'Vehicle Type', 'Taxi Company Borough', 'Taxi Pick Up Location', 'Bridge Highway Name',
             'Bridge Highway Direction', 'Road Ramp', 'Bridge Highway Segment', 'Latitude', 'Longitude', 'Location']

In [11]:
def get_missing_value_count(df_311):
    return df_311.select([f.count(f.when((f.isnull(c) | (f.col(c) == '')), c)).alias(c) for c in df_311.columns])

In [12]:
def drop_unwanted_cols(df_311):
    df_311 = df_311.drop(*DROP_COLS)
    df_size = df_311.count()
    missing_value_count_df = get_missing_value_count(df_311)
    missing_counts_dict = get_df_row_as_dict(missing_value_count_df.collect()[0])
    drop_list = [k for k, v in missing_counts_dict.items() if (v / df_size) >= DROP_THRESHOLD]
    if len(drop_list) > 0:
        return df_311.drop(*drop_list)
    return df_311

In [13]:
nyc_311_df = drop_unwanted_cols(nyc_311_df)

In [14]:
nyc_311_df.printSchema()

root
 |-- Unique Key: string (nullable = true)
 |-- Created Date: string (nullable = true)
 |-- Closed Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Complaint Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location Type: string (nullable = true)
 |-- Incident Zip: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Open Data Channel Type: string (nullable = true)



### Remove Space from Column Names

In [15]:
def remove_space_from_col_names(df_311):
    return (df_311.withColumnRenamed('Unique Key', 'Unique_Key').withColumnRenamed('Created Date', 'Created_Date')
            .withColumnRenamed('Closed Date', 'Closed_Date')
            .withColumnRenamed('Due Date', 'Due_Date').withColumnRenamed('Address Type', 'Address_Type')
            .withColumnRenamed('Location Type', 'Location_Type').withColumnRenamed('Incident Zip', 'Incident_Zip')
            .withColumnRenamed('Complaint Type', 'Complaint_Type')
            .withColumnRenamed('Open Data Channel Type', 'Open_Data_Channel_Type'))

In [16]:
nyc_311_df = remove_space_from_col_names(nyc_311_df)

In [17]:
nyc_311_df.printSchema()

root
 |-- Unique_Key: string (nullable = true)
 |-- Created_Date: string (nullable = true)
 |-- Closed_Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Complaint_Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location_Type: string (nullable = true)
 |-- Incident_Zip: string (nullable = true)
 |-- Address_Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Open_Data_Channel_Type: string (nullable = true)



### Capitalize Columns

In [18]:
def capitalize_cols(df_311, cols):
    for c in cols:
        df_311 = df_311.withColumn(c, f.upper(f.col(c)))
    return df_311

In [19]:
CAPITALIZE_COLS = ['City', 'Borough']

In [20]:
nyc_311_df = capitalize_cols(nyc_311_df,CAPITALIZE_COLS)

In [21]:
nyc_311_df.printSchema()

root
 |-- Unique_Key: string (nullable = true)
 |-- Created_Date: string (nullable = true)
 |-- Closed_Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Complaint_Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location_Type: string (nullable = true)
 |-- Incident_Zip: string (nullable = true)
 |-- Address_Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Open_Data_Channel_Type: string (nullable = true)



### Cast zipcode from String to Float

In [22]:
def format_zip_code(df_311):
    df_311 = df_311.withColumn('Incident_Zip', f.substring('Incident_Zip', 0, 5).cast('float'))
    return df_311

In [23]:
nyc_311_df = format_zip_code(nyc_311_df)

In [24]:
nyc_311_df.printSchema()

root
 |-- Unique_Key: string (nullable = true)
 |-- Created_Date: string (nullable = true)
 |-- Closed_Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Complaint_Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location_Type: string (nullable = true)
 |-- Incident_Zip: float (nullable = true)
 |-- Address_Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Open_Data_Channel_Type: string (nullable = true)



### Update the burough city from zip code

In [25]:
def update_burough_city_from_zip_code(df_311):
    areas = ['City', 'Borough']
    for area in areas:
        df_311 = df_311.withColumn(area, f.when(
            (f.col("Incident_Zip") > 10450.0) & (f.col("Incident_Zip") < 10475.0) & (f.col(area).isNull()),
            "BRONX").otherwise(f.col(area)))
        df_311 = df_311.withColumn(area, f.when(
            (f.col("Incident_Zip") > 11200.0) & (f.col("Incident_Zip") < 11240.0) & (f.col(area).isNull()),
            "BROOKLYN").otherwise(f.col(area)))
        df_311 = df_311.withColumn(area, f.when(
            (f.col("Incident_Zip") > 10000.0) & (f.col("Incident_Zip") < 10280.0) & (f.col(area).isNull()),
            "MANHATTAN").otherwise(f.col(area)))
        df_311 = df_311.withColumn(area, f.when(
            (f.col("Incident_Zip") > 10300.0) & (f.col("Incident_Zip") < 10315.0) & (f.col(area).isNull()),
            "STATEN ISLAND").otherwise(f.col(area)))
        df_311 = df_311.withColumn(area, f.when(
            (f.col("Incident_Zip") > 11350.0) & (f.col("Incident_Zip") < 11700.0) & (f.col(area).isNull()),
            "QUEENS").otherwise(f.col(area)))
    return df_311

In [26]:
nyc_311_df = update_burough_city_from_zip_code(nyc_311_df)

### Remove records with no city values and the issues that are not closed

In [27]:
def drop_empty_null_values(df_311):
    df_311 = df_311.filter((f.col('City').isNotNull()) & (f.col('City') != "") & (f.col('Closed_Date').isNotNull()) & (
            f.col('Closed_Date') != ""))
    return df_311

In [28]:
nyc_311_df = drop_empty_null_values(nyc_311_df)

In [29]:
def calculate_time_to_resolve_in_seconds(df_311):
    time_fmt = "dd/MM/yyyy HH:mm:ss"
    time_fmt2 = "MM/dd/yyyy HH:mm:ss"
    time_diff = f.when(f.to_timestamp(df_311.Closed_Date, time_fmt).isNull(), f.unix_timestamp('Closed_Date',
                                                                                               format=time_fmt2) - f.unix_timestamp(
        'Created_Date', format=time_fmt)).otherwise(
        f.unix_timestamp('Closed_Date', format=time_fmt) - f.unix_timestamp('Created_Date', format=time_fmt))
    return df_311.withColumn("Time_to_Resolve", time_diff)

In [30]:
nyc_311_df = calculate_time_to_resolve_in_seconds(nyc_311_df)

In [31]:
nyc_311_df.printSchema()

root
 |-- Unique_Key: string (nullable = true)
 |-- Created_Date: string (nullable = true)
 |-- Closed_Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Complaint_Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location_Type: string (nullable = true)
 |-- Incident_Zip: float (nullable = true)
 |-- Address_Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Open_Data_Channel_Type: string (nullable = true)
 |-- Time_to_Resolve: long (nullable = true)



In [32]:
nyc_311_df.show()

+----------+--------------------+--------------------+------+-------------------+----------------+--------------------+------------+------------+-------------+------+-------------+----------------------+---------------+
|Unique_Key|        Created_Date|         Closed_Date|Agency|     Complaint_Type|      Descriptor|       Location_Type|Incident_Zip|Address_Type|         City|Status|      Borough|Open_Data_Channel_Type|Time_to_Resolve|
+----------+--------------------+--------------------+------+-------------------+----------------+--------------------+------------+------------+-------------+------+-------------+----------------------+---------------+
|  38070822|01/01/2018 12:00:...|01/06/2018 09:31:...|   HPD|     HEAT/HOT WATER|  APARTMENT ONLY|RESIDENTIAL BUILDING|     10468.0|     ADDRESS|        BRONX|Closed|        BRONX|                MOBILE|       13033886|
|  38071530|01/01/2018 12:01:...|01/01/2018 02:13:...|  NYPD|   Blocked Driveway|       No Access|     Street/Sidewalk| 