# NYPD_CALL_FOR_SERVICE:

a. To get the required columns, use this module:


1.   get_area_of_interest(df_spark, interested_columns)


b. Preprocessing pipeline: Pass your data through these functions. (if your columns fall in those categories)

1.   valid_date_check(date)
2.   valid_time_check(time)
3.   reverse_geo_code_boros(df_spark, Latitude, Longitude, Boro, lat_index, long_index)
4.   refine_age_group_race(df_spark, victim_age_group=None, suspect_age_group=None, suspect_race=None, victim_race=None)
5.   refine_sex_gender_impute(df_spark, suspect_age=None, suspect_gender=None, victim_age=None, victim_gender=None)
6.   refine_precinct_jur(df_spark, precinct=None, Jur_code=None)



In [1]:
!pip install pyspark
!pip install openclean

Collecting openclean
  Downloading openclean-0.2.1-py3-none-any.whl.metadata (9.3 kB)
Collecting openclean-core==0.4.1 (from openclean)
  Downloading openclean_core-0.4.1-py3-none-any.whl.metadata (7.6 kB)
Collecting appdirs>=1.4.4 (from openclean-core==0.4.1->openclean)
  Downloading appdirs-1.4.4-py2.py3-none-any.whl.metadata (9.0 kB)
Collecting dill (from openclean-core==0.4.1->openclean)
  Downloading dill-0.4.0-py3-none-any.whl.metadata (10 kB)
Collecting histore>=0.4.0 (from openclean-core==0.4.1->openclean)
  Downloading histore-0.4.1-py3-none-any.whl.metadata (6.1 kB)
Collecting flowserv-core>=0.8.0 (from openclean-core==0.4.1->openclean)
  Downloading flowserv_core-0.9.4-py3-none-any.whl.metadata (8.3 kB)
Collecting jellyfish (from openclean-core==0.4.1->openclean)
  Downloading jellyfish-1.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.6 kB)
Collecting refdata>=0.2.0 (from openclean-core==0.4.1->openclean)
  Downloading refdata-0.2.0-py3-none-any.w

In [3]:
# importing packages required
from pyspark import SparkContext, SparkConf
import os
import requests
from six.moves import urllib
import sys
import pandas as pd
import matplotlib
import matplotlib as plt
import numpy as np
import scipy as sp
import IPython
from IPython import display
import sklearn
import random
import time
import warnings
import re
import matplotlib.pyplot as plt
%matplotlib inline
from openclean.pipeline import stream
from openclean.profiling.column import DefaultColumnProfiler
from openclean.data.source.socrata import Socrata
from openclean.pipeline import stream
from openclean.function.eval.datatype import IsDatetime
import datetime
import pandas as pd
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

In [4]:
from geopy.geocoders import ArcGIS
geocoder=ArcGIS()
#example:
geocoder.reverse('40.61157006600007, -73.74736517199995')

Location(808 Redfern Ave, Far Rockaway, New York 11691, USA, (40.611614718328, -73.747382377557, 0.0))

In [5]:
#Creating Spark Session
sc = SparkContext.getOrCreate();
spark = SparkSession(sc)

In [6]:
# Correct API endpoint with $limit for NYPD Shooting Incident Data (Historic)
fn_src = 'https://data.cityofnewyork.us/resource/d6zx-ckhd.csv?$limit=10000'
fn_dst = '/content/NYPD_Shooting_Incident_Historic_10K.csv'

# Download only if not already present
if os.path.isfile(fn_dst):
    print('File has already been downloaded', fn_dst)
else:
    print('Fetching file. This may take a while...', fn_dst)
    urllib.request.urlretrieve(fn_src, fn_dst)
    print('File %s has been downloaded' % fn_dst)

# Load into pandas DataFrame
df = pd.read_csv(fn_dst)
print("Number of rows downloaded:", len(df))


Fetching file. This may take a while... /content/NYPD_Shooting_Incident_Historic_10K.csv
File /content/NYPD_Shooting_Incident_Historic_10K.csv has been downloaded
Number of rows downloaded: 10000


In [7]:
#similarly, lets get them into pyspark rdd
def get_area_of_interest(df_spark, interested_columns):
  df_spark=df_spark.select(interested_columns)
  return df_spark

# 2. Module for date related columns

As the dataset is for the data from 2006 to 2025, we can see that there is data from unknown format of "1010-05-14" to the year 2025. We need to clean this. Over here, we remove the null values where the complaint date is <2006.

In [8]:
import datetime
def valid_date_check(date, format):
  if isinstance(date, datetime.datetime):
    date=str(date.date())
  if date==None or date==" " or date=="":
      return False
  else:
    date_cpy=date
    split_date=date.split("-")
    format_date=format.split("-")
    if len(split_date)!=3 and len(format_date)!=3:
      date=date.split("/")
      format=format.split("/")
    else:
      date=split_date
      format=format_date
    if len(date)!=3:
      return False
    try:
      year=int(date[format.index('yyyy')])
      month=int(date[format.index('mm')])
      day=int(date[format.index('dd')])
      if year>=2006 and year<=2025:
        try:
          refined_date=datetime.datetime(year, month, day)
          return True
        except:
          return False
      else:
        return False
    except:
      return False

# 3. Module for time related columns

Similarly, lets check for the time as well. Here we must have time between
the standard 24 hours.

In [9]:
#Deleting invalid time
def valid_time_check(time):
  if time==None or time==" " or time=="":
    return False
  else :
    cpy_time=time
    time=time.split(":")
    try:
      hour=int(time[0])
      mins=int(time[1])
      secs= int(time[2])
      # if hours is 24 then change it to 0 hours
      if hour == 24 and mins== 0 and secs == 0:
        hour=0
      try:
        newTime= datetime.time(hour,mins,secs)
        return True
      except :
        return False
    except:
      return False

#4. Module for Age Group and Race columns
The module works for only those columns whose column names are passed

In [10]:
def refine_age_group_race(df_spark, victim_age_group=None, suspect_age_group=None, suspect_race=None, victim_race=None):
  #params: dataframe, col names for the respective age, gender cols
  if victim_age_group:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[victim_age_group])
  if suspect_age_group:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[suspect_age_group])
  if suspect_race:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[suspect_race])
  if victim_race:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[victim_race])
  return df_spark

# 5. Module for Gender, Race Columns for suspects and victims

The module works for only those columns whose column names are passed

In [11]:
def refine_sex_gender_impute(df_spark, suspect_age=None, suspect_gender=None, victim_age=None, victim_gender=None):
  #params: dataframe, col names for the respective age, gender cols
  if suspect_age:
    df_spark=df_spark.na.fill("U",subset=[suspect_age])
  if victim_age:
    df_spark=df_spark.na.fill("U",subset=[victim_age])
  if suspect_gender:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[suspect_gender])
  if victim_gender:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[victim_gender])
  return df_spark

# 6.a: Module for Precinct, Jurisdiction Code:
  dropping the null values

  The module works for only those columns whose column names are passed along with the df

In [12]:
def valid_precinct_check(precinct):
  if precinct==None or precinct==" " or precinct=="":
    return False
  else :
    return True

def valid_jur_check(jur):
  if jur==None or jur==" " or jur=="":
    return False
  else :
    return True

# 6.b Module for Reverse Geocoding the boroughs using latitudes and longitudes.

1. First we will remove the rows where latitude, longitude and boroughs are null. (around 450 tuples removed)
2. Then, where the boroughs are empty, take the latitude and longitude value and reverse geocode it using the module "reverseGeocoder".
3. Impute the borough name retrived in the empty space.


### USING MASTER DATASET
In the case of geocoding, geocoder gives us the zipcodes based on the latitude and longitude values. Inturn, we can use the master dataset of zipcodes inorder to retrive the borough names



NOTE: The dataset can be downloaded from : https://data.beta.nyc/en/dataset/pediacities-nyc-neighborhoods/resource/7caac650-d082-4aea-9f9b-3681d568e8a5

In [13]:
def reverse_geo_code_boros(df_spark, Latitude, Longitude, Boro, lat_index, long_index):
  #select data where we have to impute
  df_temp_boro_clean=df_spark.filter((df_spark[Latitude].isNotNull()) & (df_spark[Longitude].isNotNull()))
  boro_cleaner=df_temp_boro_clean.filter((df_temp_boro_clean[Boro].isNull())|(df_temp_boro_clean[Boro]=='NEW YORK'))

  # print("We have "+ str(boro_cleaner.count())+ " points to impute")
  print("___intializing Zip Code Look up ____")
  print("____ imputing the points ____")


  #use your path for master dataset here.
  df_zips=pd.read_csv(dst)
  zip_master={}
  zips=df_zips['zip']
  boro=df_zips['borough']
  for i, j in zip(zips, boro):
    zip_master[i]=j
  zip_master[10020]='Manhattan'
  zip_master[11249]='Brooklyn'

  def reverseGeoCoder(latitude, longitude):
    loc=geocoder.reverse(str(latitude)+', '+str(longitude), timeout=1000)
    zipCode=str(loc).split(",")[2][-5:]
    if not int(zipCode) in zip_master:
      boro="UNKNOWN"
    else:
      boro=zip_master[int(zipCode)]
    boro=boro.upper()
    return boro

  #creating UD function
  ud_func= udf(reverseGeoCoder, StringType())
  boro_cleaned_dataframe = boro_cleaner.withColumn(Boro, ud_func(boro_cleaner[lat_index], boro_cleaner[long_index]))

  #joining the imputed dataset to the maindataset and returning

  joiner_dataset=df_spark.filter((df_spark[Latitude].isNotNull()) & (df_spark[Boro]!='NEW YORK') & (df_spark[Longitude].isNotNull()) & (df_spark[Boro].isNotNull()))
  fin_df=joiner_dataset.union(boro_cleaned_dataframe)
  return fin_df

NYPD Call for service is of 20M Rows of data.

The size of dataset ~ 20M tuples.
So, we need around 500 data points for 95% confidence level
with 10% interval.

The size of data is almost 1% of the data. So we can get it into our df now

In [14]:
df_spark=spark.read.option("header",True).csv(fn_dst,inferSchema=True)
df_spark=df_spark.sample(0.5)
df_spark.count()

5036

# PROFILING TO CHECK FOR NULL VALUES IN ALL THE COLUMNS

In [15]:
pandasDF = df_spark.toPandas()
ds=stream(pandasDF)
#Creating profile of our dataset
profiles = ds.profile(default_profiler=DefaultColumnProfiler)
profiles.stats()

Unnamed: 0,total,empty,distinct,uniqueness,entropy
objectid,5036,5036,0,,
cad_evnt_id,5036,0,4982,0.989277,12.276617
create_date,5036,0,24,0.004766,3.344741
incident_date,5036,0,24,0.004766,3.344855
incident_time,5036,0,4846,0.962272,12.222006
nypd_pct_cd,5036,0,45,0.008936,5.028001
boro_nm,5036,0,4,0.000794,0.920257
patrl_boro_nm,5036,0,7,0.00139,1.78064
geo_cd_x,5036,0,2751,0.546267,10.961646
geo_cd_y,5036,0,2837,0.563344,11.012739


## a. Select the columns that are common with the original dataset:
'cad_evnt_id',

'incident_date',

'incident_time',

'boro_nm',

'latitude',

'longitude'

'patrol_boro_nm'

**We can consider the primary key along with this**
**"cad_evnt_id"**


In [16]:
df_spark.printSchema()

root
 |-- objectid: string (nullable = true)
 |-- cad_evnt_id: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- incident_date: timestamp (nullable = true)
 |-- incident_time: timestamp (nullable = true)
 |-- nypd_pct_cd: integer (nullable = true)
 |-- boro_nm: string (nullable = true)
 |-- patrl_boro_nm: string (nullable = true)
 |-- geo_cd_x: integer (nullable = true)
 |-- geo_cd_y: integer (nullable = true)
 |-- radio_code: string (nullable = true)
 |-- typ_desc: string (nullable = true)
 |-- cip_jobs: string (nullable = true)
 |-- add_ts: timestamp (nullable = true)
 |-- disp_ts: timestamp (nullable = true)
 |-- arrivd_ts: timestamp (nullable = true)
 |-- closng_ts: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- location: string (nullable = true)



In [20]:
df_spark = spark.read.option("header", True).csv("/content/NYPD_Shooting_Incident_Historic_10K.csv")

interested_columns_1 = ['CAD_EVNT_ID', 'INCIDENT_DATE', 'BORO_NM', 'Latitude', 'Longitude', 'PATRL_BORO_NM']
df_spark_get_area_of_interest = df_spark.select(*interested_columns_1)
df_spark_get_area_of_interest.show(5)

+-----------+--------------------+---------+------------------+-------------------+--------------------+
|CAD_EVNT_ID|       INCIDENT_DATE|  BORO_NM|          Latitude|          Longitude|       PATRL_BORO_NM|
+-----------+--------------------+---------+------------------+-------------------+--------------------+
|   53138028|2018-11-25T00:00:...| BROOKLYN|40.695367447000081|-73.983243424999955|PATROL BORO BKLYN...|
|   52752600|2018-11-12T00:00:...|MANHATTAN|40.808142838000038|-73.945300202999988|PATROL BORO MAN N...|
|   53130520|2018-11-25T00:00:...| BROOKLYN|40.694953110000029|-73.931624663999969|PATROL BORO BKLYN...|
|   53143326|2018-11-26T00:00:...| BROOKLYN|40.688594490000071|-73.949534163999942|PATROL BORO BKLYN...|
|   52976279|2018-11-19T00:00:...| BROOKLYN|40.628052893000074|-73.996449312999971|PATROL BORO BKLYN...|
+-----------+--------------------+---------+------------------+-------------------+--------------------+
only showing top 5 rows



## b. Lets pass the dataset through the preprocessing pipeline

In [21]:
df_temp=df_spark.rdd

1. Date and Time

In [22]:
pandasDF.head()

Unnamed: 0,objectid,cad_evnt_id,create_date,incident_date,incident_time,nypd_pct_cd,boro_nm,patrl_boro_nm,geo_cd_x,geo_cd_y,radio_code,typ_desc,cip_jobs,add_ts,disp_ts,arrivd_ts,closng_ts,latitude,longitude,location
0,,52752600,2018-11-12,2018-11-12,2025-05-09 00:21:22,28,MANHATTAN,PATROL BORO MAN NORTH,999393,233707,75D,VISIBILITY PATROL: DIRECTED,Non CIP,2018-11-12 00:21:22,2018-11-12 00:21:22,2018-11-12 00:21:23,2018-11-12 00:43:01,40.808143,-73.9453,POINT (-73.94530020299999 40.80814283800004)
1,,53908689,2018-12-23,2018-12-23,2025-05-09 01:36:05,13,MANHATTAN,PATROL BORO MAN SOUTH,986996,207138,75S,STATION INSPECTION BY TRANSIT BUREAU PERSONNEL,Non CIP,2018-12-23 01:36:05,2018-12-23 01:36:06,2018-12-23 01:36:07,2018-12-23 01:42:59,40.73523,-73.990093,POINT (-73.99009289199995 40.73523041000004)
2,,53919706,2018-12-23,2018-12-23,2025-05-09 13:25:25,13,MANHATTAN,PATROL BORO MAN SOUTH,986996,207138,68Q9,SEE COMPLAINANT: OTHER/TRANSIT,Non CIP,2018-12-23 13:25:25,2018-12-23 13:27:40,2018-12-23 13:48:30,2018-12-23 13:52:18,40.73523,-73.990093,POINT (-73.99009289199995 40.73523041000004)
3,,53877333,2018-12-21,2018-12-21,2025-05-09 22:42:52,14,MANHATTAN,PATROL BORO MAN SOUTH,990748,213516,10H9,INVESTIGATE/POSSIBLE CRIME: CALLS FOR HELP/TRA...,Non CIP,2018-12-21 22:42:52,2018-12-21 22:47:11,2018-12-21 22:48:20,2018-12-21 22:53:02,40.752734,-73.976548,POINT (-73.97654816699998 40.75273446700004)
4,,53922339,2018-12-23,2018-12-23,2025-05-09 15:32:02,18,MANHATTAN,PATROL BORO MAN SOUTH,986383,218546,54S1,AMBULANCE CASE: SERIOUS/INSIDE,Non CIP,2018-12-23 15:32:02,2018-12-23 15:47:41,NaT,2018-12-23 17:32:18,40.766543,-73.992301,POINT (-73.99230119999999 40.766542661000074)


### From data profiling we found that the date and time has no NULL Values

In [None]:
# checking date and time for null values

df_temp_=df_temp.map(lambda x:(x, valid_date_check(x[1], "yyyy-mm-dd"))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

In [None]:
df_temp.take(1)

[Row(cad_evnt_id=66231134, incident_date=datetime.datetime(2020, 2, 17, 0, 0), boro_nm='BROOKLYN', latitude=40.67780157, longitude=-73.87134797, patrl_boro_nm='PATROL BORO BKLYN NORTH')]

3. Geocoding

In [None]:
#geospacial attributes imputation

df_temp=df_temp.toDF(schema=df_spark.schema)
df_spk=reverse_geo_code_boros(df_temp, 'latitude', 'longitude', 'boro_nm', -2, -1)

___intializing Zip Code Look up ____
____ imputing the points ____


Lets profile the data now.

In [None]:
pandasDF = df_spk.toPandas()
ds=stream(pandasDF)

#Creating profile of our dataset
profiles = ds.profile(default_profiler=DefaultColumnProfiler)
profiles.stats()

Unnamed: 0,total,empty,distinct,uniqueness,entropy
cad_evnt_id,467,0,467,1.0,8.867279
incident_date,467,0,7,0.014989,2.315489
boro_nm,467,0,2,0.004283,0.399457
latitude,467,0,236,0.505353,7.393382
longitude,467,0,236,0.505353,7.393382
patrl_boro_nm,467,0,2,0.004283,0.399457
