# NYPD Motor Vehicle Collisions:

a. To get the required columns, use this module: 


1.   get_area_of_interest(df_spark, interested_columns)


b. Preprocessing pipeline: pass your data through these functions (if your columns fall into those categories).

1.   valid_date_check(date)
2.   valid_time_check(time)
3.   reverse_geo_code_boros(df_spark, Latitude, Longitude, Boro, lat_index, long_index)
4.   refine_age_group_race(df_spark, victim_age_group=None, suspect_age_group=None, suspect_race=None, victim_race=None)
5.   refine_sex_gender_impute(df_spark, suspect_age=None, suspect_gender=None, victim_age=None, victim_gender=None)
6.   refine_precinct_jur(df_spark, precinct=None, Jur_code=None)



In [None]:
!pip install pyspark
!pip install openclean



In [None]:
#importing packages required
from pyspark import SparkContext, SparkConf
import os
import requests
from six.moves import urllib
import sys 
import pandas as pd
import matplotlib 
import matplotlib as plt
import numpy as np
import scipy as sp
import IPython
from IPython import display
import sklearn
import random
import time
import warnings
import re
import matplotlib.pyplot as plt
%matplotlib inline
from openclean.pipeline import stream
from openclean.profiling.column import DefaultColumnProfiler
from openclean.data.source.socrata import Socrata
from openclean.pipeline import stream
from openclean.function.eval.datatype import IsDatetime
import datetime
import pandas as pd
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

In [None]:
from geopy.geocoders import ArcGIS
# Module-level ArcGIS geocoder; reverse_geo_code_boros below reads this global.
geocoder=ArcGIS()
#example: reverse-geocode a "latitude, longitude" string. The Location shown
# below has the form "street, city, state ZIP, country".
geocoder.reverse('40.61157006600007, -73.74736517199995')

Location(11-64 Redfern Ave, Far Rockaway, New York 11691, USA, (40.61161616586613, -73.74738361194636, 0.0))

In [None]:
#Creating Spark Session
# Use the documented builder entry point: it returns the active session (and
# reuses the running SparkContext) instead of wrapping the context in a new
# SparkSession object every time this cell is re-run. `sc` is kept for any
# code that wants the SparkContext directly.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
#Downloading file from NYC Open Data
fn_src = 'https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD'
fn_dst = '/content/Motor_Vehicle_Collission.csv'

from six.moves import urllib

if os.path.isfile(fn_dst):
    print('File %s has already been downloaded' % fn_dst)
else:
    print('Fetching file %s. This may take a while...' % fn_dst)
    # Download under a temporary name and rename only after success, so an
    # interrupted or failed download never leaves a truncated file at fn_dst
    # that the isfile() check above would later accept as complete.
    tmp_dst = fn_dst + '.part'
    try:
        urllib.request.urlretrieve(fn_src, tmp_dst)
    except BaseException:  # includes KeyboardInterrupt; always re-raised
        if os.path.exists(tmp_dst):
            os.remove(tmp_dst)
        raise
    os.replace(tmp_dst, fn_dst)
    print('File %s has been downloaded' % fn_dst)

Fetching file /content/Motor_Vehicle_Collission.csv. This may take a while...
File /content/Motor_Vehicle_Collission.csv has been downloaded


In [None]:
# ZIP code -> borough master table, used by reverse_geo_code_boros via `dst`.
src = 'https://data.beta.nyc/dataset/0ff93d2d-90ba-457c-9f7e-39e47bf2ac5f/resource/7caac650-d082-4aea-9f9b-3681d568e8a5/download/nyc_zip_borough_neighborhoods_pop.csv'
dst = '/content/nyc_zip_borough_neighborhoods_pop.csv'

# NOTE(review): this URL points at the collisions dataset id h9gi-nx95 (same
# id as fn_src), not at the ZIP master file downloaded in this cell.
#https://data.cityofnewyork.us/resource/h9gi-nx95.csv

from six.moves import urllib

if os.path.isfile(dst):
    print('File %s has already been downloaded' % dst)
else:
    # Download under a temporary name and rename only after success, so an
    # interrupted or failed download never leaves a truncated file at dst
    # that the isfile() check above would later accept as complete.
    tmp_dst = dst + '.part'
    try:
        urllib.request.urlretrieve(src, tmp_dst)
    except BaseException:  # includes KeyboardInterrupt; always re-raised
        if os.path.exists(tmp_dst):
            os.remove(tmp_dst)
        raise
    os.replace(tmp_dst, dst)
    print('File %s has been downloaded' % dst)

File /content/nyc_zip_borough_neighborhoods_pop.csv has been downloaded


In [None]:
# Column projection helper for the cleaning pipeline (returns a DataFrame).
def get_area_of_interest(df_spark, interested_columns):
  """Project a Spark DataFrame onto the requested columns.

  Args:
    df_spark: Source Spark DataFrame.
    interested_columns: Column name(s) to keep, handed unchanged to
      DataFrame.select (typically a list of column names).

  Returns:
    A new Spark DataFrame (not an RDD) holding only those columns.
  """
  return df_spark.select(interested_columns)

## Data Profiling

In [None]:
#Using openclean for finding anomalies
from openclean.pipeline import stream
ds = stream(fn_dst)

In [None]:
#Creating profile of our dataset
from openclean.profiling.column import DefaultColumnProfiler
profiles = ds.profile(default_profiler=DefaultColumnProfiler)

In [None]:
# Per-column summary (total, empty, distinct, uniqueness, entropy).
# In the output below, CRASH DATE and CRASH TIME have no empty values, while
# BOROUGH, ZIP CODE and the coordinate columns have many.
profiles.stats()

Unnamed: 0,total,empty,distinct,uniqueness,entropy
CRASH DATE,1847668,0,3445,0.001865,11.686566
CRASH TIME,1847668,0,1440,0.000779,8.930995
BOROUGH,1847668,570984,5,4e-06,2.118367
ZIP CODE,1847668,571206,232,0.000182,7.221091
LATITUDE,1847668,215664,122513,0.075069,15.633984
LONGITUDE,1847668,215664,96061,0.058861,15.34288
LOCATION,1847668,215664,244712,0.149946,16.186128
ON STREET NAME,1847668,378084,16135,0.010979,10.59973
CROSS STREET NAME,1847668,665517,19279,0.016308,11.809301
OFF STREET NAME,1847668,1561953,180506,0.631769,16.921074


In [None]:
#Finding outliers in crash dates
from openclean.profiling.anomalies.sklearn import DBSCANOutliers

# Distinct CRASH DATE values, scanned for outliers with DBSCANOutliers'
# default settings (the output below shows nothing is flagged).
incident_dates = ds.distinct('CRASH DATE')
DBSCANOutliers().find(incident_dates)

[]

In [None]:
#Finding outliers again with an explicit eps=0.05; with this setting a few
# dates are flagged (listed below).
DBSCANOutliers(eps=0.05).find(incident_dates)

['02/03/2014',
 '03/06/2015',
 '02/22/2020',
 '02/20/2020',
 '12/15/2017',
 '11/15/2018',
 '01/21/2014',
 '02/02/2020',
 '05/19/2017',
 '01/18/2015',
 '12/21/2012']

# 2. Module for date related columns

The dataset covers 2012 to 2021, yet some records carry malformed or out-of-range dates such as "1010-05-14". Any date outside 2012–2021, or not a real calendar date, is invalid and must be cleaned.

In [None]:
def valid_date_check(date, min_year=2012, max_year=2021):
  """Return True if *date* is a real calendar date within the dataset's years.

  The expected format is 'MM/DD/YYYY' (the CRASH DATE format). A value is
  rejected when it is None or blank, does not consist of exactly three
  '/'-separated integer parts, names a non-existent day (e.g. 02/30), or
  falls outside [min_year, max_year], which removes garbage years such as 1010.

  Args:
    date: Date string to validate.
    min_year: Earliest accepted year, inclusive (default 2012).
    max_year: Latest accepted year, inclusive (default 2021).

  Returns:
    bool: True for a valid in-range date, False otherwise.
  """
  if date is None or not date.strip():
    return False

  parts = date.split("/")
  # Exactly month/day/year; extra fields such as '05/14/2015/7' are invalid.
  if len(parts) != 3:
    return False

  try:
    month, day, year = (int(part) for part in parts)
  except ValueError:
    return False

  if not min_year <= year <= max_year:
    return False

  # datetime rejects impossible months/days (and overflows on absurd ints).
  try:
    datetime.datetime(year, month, day)
  except (ValueError, OverflowError):
    return False
  return True

# 3. Module for time related columns

Similarly, let's check the time as well: every value must be a valid time within the standard 24-hour day.

In [None]:
#Deleting invalid time
def valid_time_check(time):
  """Return True if *time* is a valid 'H:MM' (optionally 'H:MM:SS') clock time.

  The value '24:00' (or '24:00:00') is accepted and treated as midnight.
  None, blank strings, non-numeric fields, out-of-range components and
  strings with more than three ':'-separated fields are rejected.

  Args:
    time: Time string to validate (e.g. a CRASH TIME value).

  Returns:
    bool: True for a valid time, False otherwise.
  """
  if time is None or not time.strip():
    return False

  parts = time.split(":")
  # Previously anything after the minutes was ignored ('12:30:xx' passed).
  if len(parts) not in (2, 3):
    return False

  try:
    hour = int(parts[0])
    mins = int(parts[1])
    secs = int(parts[2]) if len(parts) == 3 else 0
  except ValueError:
    return False

  # A time of exactly 24:00 is the end of the day; treat it as 0 hours.
  if (hour, mins, secs) == (24, 0, 0):
    hour = 0

  try:
    datetime.time(hour, mins, secs)
  except (ValueError, OverflowError):
    return False
  return True

# 6.b Module for Reverse Geocoding the boroughs using latitudes and longitudes.

1. First, remove the rows where the latitude or longitude is null (around 450 tuples removed).
2. Then, where the borough is empty, take the latitude and longitude values and reverse-geocode them using the "reverseGeoCoder" helper.
3. Impute the retrieved borough name into the empty field.


### USING MASTER DATASET
Reverse geocoding gives us the ZIP code for each latitude/longitude pair. In turn, we can use a master dataset of ZIP codes to retrieve the borough names.



NOTE: The dataset can be downloaded from: https://data.beta.nyc/en/dataset/pediacities-nyc-neighborhoods/resource/7caac650-d082-4aea-9f9b-3681d568e8a5

In [None]:
def parse_zip_code(address):
  """Return the 5-digit ZIP code found in a geocoded *address*, or None.

  ArcGIS reverse-geocoded addresses look like
  '11-64 Redfern Ave, Far Rockaway, New York 11691, USA'. The ZIP is taken as
  the last standalone 5-digit number. Parsing therefore does not depend on
  how many comma-separated parts the address has. A fixed split index
  raises, or grabs the wrong text, when a part such as the street is missing.

  Args:
    address: Address string (or any object whose str() is the address).

  Returns:
    int or None: The ZIP code, or None when no 5-digit number is present.
  """
  if not address:
    return None
  matches = re.findall(r'(?<!\d)\d{5}(?!\d)', str(address))
  return int(matches[-1]) if matches else None


def reverse_geo_code_boros(df_spark, Latitude, Longitude, Boro, lat_index=None, long_index=None):
  """Impute missing boroughs by reverse geocoding each row's coordinates.

  Processing steps:
    * Rows with a null latitude or longitude are dropped.
    * Rows with coordinates whose borough is null or the generic 'NEW YORK'
      are reverse geocoded with the global ArcGIS `geocoder`.
    * The ZIP code is parsed from the returned address and mapped to a
      borough using the ZIP master CSV at the global path `dst`.
    * A lookup that cannot be resolved (geocoder error, no result, no ZIP in
      the address, ZIP not in the table) yields 'UNKNOWN' instead of failing
      the whole Spark job.

  Args:
    df_spark: Input Spark DataFrame.
    Latitude: Name of the latitude column.
    Longitude: Name of the longitude column.
    Boro: Name of the borough column to impute.
    lat_index: Column (name or positional index) fed to the geocoding UDF as
      latitude; defaults to ``Latitude``.
    long_index: Same for longitude; defaults to ``Longitude``.

  Returns:
    DataFrame with the input's columns: rows that already had a borough
    other than 'NEW YORK', unioned with the imputed rows (imputed borough
    names are upper-cased).
  """
  # Only rows with both coordinates can be geocoded; the rest are dropped.
  has_coords = df_spark.filter(df_spark[Latitude].isNotNull() & df_spark[Longitude].isNotNull())
  # 'NEW YORK' is treated as a missing borough, just like null.
  boro_cleaner = has_coords.filter(has_coords[Boro].isNull() | (has_coords[Boro] == 'NEW YORK'))

  print("___intializing Zip Code Look up ____")
  print("____ imputing the points ____")

  # ZIP -> borough table from the master dataset (use your own path via `dst`).
  df_zips = pd.read_csv(dst)
  zip_master = dict(zip(df_zips['zip'], df_zips['borough']))
  # Manually added ZIP codes (presumably absent from the master dataset).
  zip_master[10020] = 'Manhattan'
  zip_master[11249] = 'Brooklyn'

  def reverseGeoCoder(latitude, longitude):
    # Base class of geopy's exceptions (timeouts, service/quota errors, ...).
    from geopy.exc import GeopyError
    try:
      loc = geocoder.reverse(str(latitude) + ', ' + str(longitude), timeout=1000)
    except GeopyError:
      return "UNKNOWN"
    if loc is None:
      return "UNKNOWN"
    boro = zip_master.get(parse_zip_code(str(loc)))
    # Covers unknown/unparseable ZIPs and non-string (e.g. NaN) table cells.
    if not isinstance(boro, str):
      return "UNKNOWN"
    return boro.upper()

  #creating UD function
  ud_func = udf(reverseGeoCoder, StringType())
  lat_col = Latitude if lat_index is None else lat_index
  long_col = Longitude if long_index is None else long_index
  # Overwrites the existing Boro column (presumably in place). union() below
  # matches columns by position, so the column order must not change.
  boro_cleaned_dataframe = boro_cleaner.withColumn(Boro, ud_func(boro_cleaner[lat_col], boro_cleaner[long_col]))

  # Rows that already carried a usable borough are kept unchanged.
  joiner_dataset = df_spark.filter(
      df_spark[Latitude].isNotNull()
      & (df_spark[Boro] != 'NEW YORK')
      & df_spark[Longitude].isNotNull()
      & df_spark[Boro].isNotNull())
  return joiner_dataset.union(boro_cleaned_dataframe)

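The full dataset has ~1.85M tuples (1,847,668 in the profile above). Estimating a proportion at 95% confidence with a ±1% margin would need about $n = 1.96^2 \cdot 0.25 / 0.01^2 \approx 9{,}600$ rows. The 0.1% sample taken below (~1,850 rows) instead gives a margin of roughly ±2.3%. A sample of that size is small enough to load into our DataFrame now.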

In [None]:
# Read the full CSV (header row, inferred column types) and keep a 0.1%
# random sample for the cleaning experiments below. A fixed seed makes the
# sample, and therefore the counts and precision/recall reported later,
# reproducible across runs on the same input.
SAMPLE_FRACTION = 0.001
SAMPLE_SEED = 42
df_spark=spark.read.option("header",True).csv(fn_dst,inferSchema=True)
df_spark=df_spark.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
df_spark.count()

1891

# PROFILING TO CHECK FOR NULL VALUES IN ALL THE COLUMNS

In [None]:
# Convert the Spark sample to pandas so openclean can profile it.
pandasDF = df_spark.toPandas()
ds=stream(pandasDF)
#Creating profile of our dataset
profiles = ds.profile(default_profiler=DefaultColumnProfiler)
profiles.stats()

Unnamed: 0,total,empty,distinct,uniqueness,entropy
CRASH DATE,1891,0,1440,0.761502,10.374932
CRASH TIME,1891,0,601,0.317821,8.449732
BOROUGH,1891,600,5,0.003873,2.105473
ZIP CODE,1891,601,171,0.132558,7.114951
LATITUDE,1891,221,1608,0.962874,10.630929
LONGITUDE,1891,221,1602,0.959281,10.622839
LOCATION,1891,221,1617,0.968263,10.642159
ON STREET NAME,1891,382,857,0.567926,9.307246
CROSS STREET NAME,1891,673,903,0.741379,9.573535
OFF STREET NAME,1891,1601,290,1.0,8.179909


## a. Select the columns that are common with the original dataset:
1. CRASH_DATE
2. CRASH_TIME
3. BOROUGH
4. LATITUDE
5. LONGITUDE


We also keep the primary key:
1. COLLISION_ID


In [None]:
# Keep only the primary key plus the date/time/location columns cleaned below.
# NOTE: later cells index rows positionally (x[1] = CRASH DATE,
# x[2] = CRASH TIME, -2/-1 = LATITUDE/LONGITUDE), so keep this order.
interested_columns_1=['COLLISION_ID', 'CRASH DATE','CRASH TIME','BOROUGH', 'LATITUDE', 'LONGITUDE']
df_spark=get_area_of_interest(df_spark, interested_columns_1)

## b. Let's pass the dataset through the preprocessing pipeline

In [None]:
# Switch to the underlying RDD of Rows for the row-wise validation steps.
df_temp=df_spark.rdd

1. Date validation

In [None]:
# Keep only rows whose CRASH DATE (index 1 of interested_columns_1) is a valid
# in-range MM/DD/YYYY date. A single filter replaces the original
# map-to-(row, flag) / filter(flag == True) / map-back round trip.
df_temp=df_temp.filter(lambda row: valid_date_check(str(row[1])))

In [None]:
# Rows left after date validation.
df_temp.count()

1891

2. Time Validation

In [None]:
# Keep only rows whose CRASH TIME (index 2 of interested_columns_1) is a valid
# clock time. A single filter replaces the original map-to-(row, flag) /
# filter(flag == True) / map-back round trip.
df_temp=df_temp.filter(lambda row: valid_time_check(str(row[2])))

In [None]:
# Rows left after time validation.
df_temp.count()

1891

3. Geocoding

In [None]:
#geospatial attributes imputation
# Rebuild a DataFrame (same schema as df_spark) from the validated RDD, then
# impute null / 'NEW YORK' boroughs from coordinates. -2 and -1 index the
# LATITUDE and LONGITUDE columns (the last two of interested_columns_1).
df_temp=df_temp.toDF(schema=df_spark.schema)
df_spk=reverse_geo_code_boros(df_temp, 'LATITUDE', 'LONGITUDE', 'BOROUGH', -2, -1)

___intializing Zip Code Look up ____
____ imputing the points ____


4. Jurisdiction Code, Precinct check

In [None]:
# NOTE(review): despite the heading, no jurisdiction/precinct check happens
# here (interested_columns_1 has no such columns); this cell only round-trips
# df_spk through an RDD back into df_spark with df_spark's schema.
df_temp=df_spk.rdd

df_spark=df_temp.toDF(schema=df_spark.schema)

Let's profile the data now.

In [None]:
# Profile the cleaned result (df_spk) to compare against the raw sample's profile.
pandasDF = df_spk.toPandas()
ds=stream(pandasDF)

#Creating profile of our dataset
profiles = ds.profile(default_profiler=DefaultColumnProfiler)
profiles.stats()

Unnamed: 0,total,empty,distinct,uniqueness,entropy
COLLISION_ID,1670,0,1670,1.0,10.705632
CRASH DATE,1670,0,1310,0.784431,10.249069
CRASH TIME,1670,0,563,0.337126,8.398961
BOROUGH,1670,0,6,0.003593,2.128604
LATITUDE,1670,0,1608,0.962874,10.630929
LONGITUDE,1670,0,1602,0.959281,10.622839


### PRECISION AND RECALL

1. True Positive = 1670
2. selected elements = 1891
3. Relevant elements = 1670

4. precision = 1670/1891 ≈ 0.883
5. recall = 1670/1670 = 1.0