In [120]:
!pip install pyspark
!pip install openclean



In [1]:

#importing packages required
from pyspark import SparkContext, SparkConf
import os
import requests
from six.moves import urllib
import sys 
import pandas as pd
import matplotlib 
import matplotlib as plt
import numpy as np
import scipy as sp
import IPython
from IPython import display
import sklearn
import random
import time
import warnings
import re
import matplotlib.pyplot as plt
%matplotlib inline
from openclean.pipeline import stream
from openclean.profiling.column import DefaultColumnProfiler
from openclean.data.source.socrata import Socrata
from openclean.pipeline import stream
from openclean.function.eval.datatype import IsDatetime
import datetime
import pandas as pd
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

In [2]:
from geopy.geocoders import ArcGIS
geocoder=ArcGIS()
#example:
geocoder.reverse('40.61157006600007, -73.74736517199995')

Location(11-64 Redfern Ave, Far Rockaway, New York 11691, USA, (40.61161616586613, -73.74738361194636, 0.0))

In [3]:
#Creating Spark Session
sc = SparkContext.getOrCreate();
spark = SparkSession(sc)

In [4]:
#Downloading file from NYC Open Data
fn_src = 'https://data.cityofnewyork.us/api/views/qgea-i56i/rows.csv?accessType=DOWNLOAD'
fn_dst = '/content/drive/MyDrive/NYPD_Complaint_Data_Historic.csv'

#https://data.cityofnewyork.us/resource/h9gi-nx95.csv

from six.moves import urllib

if os.path.isfile(fn_dst):
    print('File %s has already been downloaded' % fn_dst)
else:
    print('Fetching file %s[2.4GB]. This may take a while...' % fn_dst)
    urllib.request.urlretrieve(fn_src, fn_dst)
    print('File %s has been downloaded' % fn_dst)

File /content/drive/MyDrive/NYPD_Complaint_Data_Historic.csv has already been downloaded


In [5]:
#Using openclean for finding anomalies
ds = stream(fn_dst)

# Lets Profile the data first and check for number of null values present in each columns. 
CMPLNT_NUM, RPT_DT, KY_CD, LAW_CAT_CD have no null values

Also, based on the qualitative analysis, lets include our area of interest (columns to consider).

In [132]:
#Creating profile of our dataset
profiles = ds.profile(default_profiler=DefaultColumnProfiler)
profiles.stats()

In [8]:
# Lets check the all the data types and their unique values' count for each column.

print('Schema\n------')
for col in ds.columns:
    p = profiles.column(col)
    print("  '{}' ({})".format(col, p['datatypes']['distinct'].most_common(2)))

In [6]:
#lets load the data to spark
df_spark=spark.read.option("header",True).csv(fn_dst,inferSchema=True) # data set 1.

# 1. We can consider the following columns as our area of interest:

1. CMPLNT_NUM	
Randomly generated persistent ID for each complaint
2. CMPLNT_FR_DT	
Exact date of occurrence for the reported event 
3. CMPLNT_FR_TM	
Exact time of occurrence for the reported event
4. ADDR_PCT_CD	
The precinct in which the incident occurred
5. KY_CD	
Three digit offense classification code
6. LAW_CAT_CD	
Level of offense: felony, misdemeanor, violation
7. BORO_NM	
The name of the borough in which the incident occurred
8. PREM_TYP_DESC	
Specific description of premises; grocery store, residence, street, etc.
9. VIC_AGE_GROUP	
Victim’s Age Group
10. VIC_RACE	
Victim’s Race Description
11. VIC_SEX	
Victim’s Sex Description
12. Latitude
13. Longitude
14. SUSP_AGE_GROUP	
Suspect’s Age Group
15. SUSP_RACE	
Suspect’s Race Description
16. SUSP_SEX	
Suspect’s Sex Description
17. JURISDICTION_CODE
18. PATROL_BORO
19. PD_CD
20. HOUSING_PSA	
Development Level Code

In [7]:
#similarly, lets get them into pyspark rdd
def get_area_of_interest(df_spark, interested_columns):
  df_spark=df_spark.select(interested_columns)
  return df_spark

In [8]:
interested_columns=['CMPLNT_NUM','CMPLNT_FR_DT','CMPLNT_FR_TM', 'ADDR_PCT_CD', 'KY_CD', 'LAW_CAT_CD', 'LAW_CAT_CD', 'BORO_NM', 'PREM_TYP_DESC', 'VIC_AGE_GROUP', 
           'VIC_RACE', 'VIC_SEX', 'Latitude', 'Longitude', 'SUSP_AGE_GROUP', 'SUSP_RACE', 'SUSP_SEX', 'JURISDICTION_CODE', 'PATROL_BORO', 'PD_CD','HOUSING_PSA']


In [24]:
df_spark=get_area_of_interest(df_spark, interested_columns)

In [25]:
df_temp=df_spark.rdd
# temp=df_temp.toDF(schema=df_spark.columns)

#2. Lets work with the dates first

As the dataset is for the data from 2006 to 2020, we can see that there is data from unknown format of "1010-05-14" to the year 2020. We need to clean this. Over here, we remove the null values where the complaint date is <2006. 

In [26]:
# fileName='1010-05-14 00:00:00'
# # matches=re.search("([0-9]{4}\-[0-9]{2}\-[0-9]{2})", fileName)
# re.search(r'([0-9]{4}\-[0-9]{2}\-[0-9]{2})', fileName).group(0)

def valid_date_check(date):
  if date==None or date==" " or date=="":
      return False
  else:
    date_cpy=date
    date=date.split("/")
    try:
      month=int(date[0])
      day= int(date[1])
      year=int(date[2])
      if year>=2006 and year<=2020:
        try:
          refined_date=datetime.datetime(year, month, day)
          return True
        except:
          return False
      else:
        return False
    except:
      return False

In [27]:
#filter the dates with proper format for Column-2 (CMPLNT_FR_DT)
df_temp_=df_temp.map(lambda x:(x, valid_date_check(x[1]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

# 3. Lets work on time related columns

Similarly, lets check for the time as well. Here we must have time between 
the standard 24 hours.


In [28]:
#Deleting invalid time
def valid_time_check(time):
  if time==None or time==" " or time=="":
    return False
  else :
    cpy_time=time
    time=time.split(":")
    try:
      hour=int(time[0])
      mins=int(time[1])
      secs= int(time[2])
      # if hours is 24 then change it to 0 hours
      if hour == 24 and mins== 0 and secs == 0:
        hour=0
      try:
        newTime= datetime.time(hour,mins,secs)
        return True
      except :
        return False
    except:
      return False

In [29]:
df_temp_=df_temp.map(lambda x:(x, valid_time_check(x[2]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

In [30]:
df_temp.take(2)

[Row(CMPLNT_NUM=394506329, CMPLNT_FR_DT='12/31/2019', CMPLNT_FR_TM='17:30:00', ADDR_PCT_CD=32, KY_CD=118, LAW_CAT_CD='FELONY', LAW_CAT_CD='FELONY', BORO_NM='MANHATTAN', PREM_TYP_DESC='STREET', VIC_AGE_GROUP='UNKNOWN', VIC_RACE='UNKNOWN', VIC_SEX='E', Latitude=40.82092679700002, Longitude=-73.94332421899996, SUSP_AGE_GROUP=None, SUSP_RACE=None, SUSP_SEX=None, JURISDICTION_CODE=0, PATROL_BORO='PATROL BORO MAN NORTH', PD_CD=793, HOUSING_PSA=None),
 Row(CMPLNT_NUM=968873685, CMPLNT_FR_DT='12/29/2019', CMPLNT_FR_TM='16:31:00', ADDR_PCT_CD=47, KY_CD=113, LAW_CAT_CD='FELONY', LAW_CAT_CD='FELONY', BORO_NM='BRONX', PREM_TYP_DESC='STREET', VIC_AGE_GROUP='UNKNOWN', VIC_RACE='UNKNOWN', VIC_SEX='E', Latitude=40.885701406000074, Longitude=-73.86164032499995, SUSP_AGE_GROUP=None, SUSP_RACE=None, SUSP_SEX=None, JURISDICTION_CODE=0, PATROL_BORO='PATROL BORO BRONX', PD_CD=729, HOUSING_PSA=None)]

#4. Lets refine the Age Group and Race columns
The module works for only those columns whose column names are passed

In [31]:
def refine_age_group_race(df_spark, victim_age_group=None, suspect_age_group=None, suspect_race=None, victim_race=None):
  #params: dataframe, col names for the respective age, gender cols
  if victim_age_group:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[victim_age_group])
  if suspect_age_group:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[suspect_age_group])
  if suspect_race:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[suspect_race])
  if victim_race:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[victim_race])
  return df_spark

In [32]:
df_temp=refine_age_group_race(df_spark, "VIC_AGE_GROUP", "SUSP_AGE_GROUP", 'SUSP_RACE', 'VIC_RACE')

# 5. Lets refine the Gender, Race Columns for suspects and victims

The module works for only those columns whose column names are passed

In [33]:
def refine_sex_gender_impute(df_spark, suspect_age=None, suspect_gender=None, victim_age=None, victim_gender=None):
  #params: dataframe, col names for the respective age, gender cols
  if suspect_age:
    df_spark=df_spark.na.fill("U",subset=[suspect_age])
  if victim_age:
    df_spark=df_spark.na.fill("U",subset=[victim_age])
  if suspect_gender:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[suspect_gender])
  if victim_gender:
    df_spark = df_spark.na.fill("UNKNOWN",subset=[victim_gender])
  return df_spark

In [34]:
df_temp=refine_sex_gender_impute(df_temp, None, 'SUSP_SEX', None, 'VIC_SEX')

# 6. Geospacial Attributes Imputation: 

## 6.a: Precinct, Jurisdiction Code:
  dropping the null values

  The module works for only those columns whose column names are passed along with the df

In [35]:
def valid_precinct_check(precinct):
  if precinct==None or precinct==" " or precinct=="":
    return False
  else :
    return True

def valid_jur_check(jur):
  if jur==None or jur==" " or jur=="":
    return False
  else :
    return True

In [36]:
df_temp=df_temp.rdd

df_temp_=df_temp.map(lambda x:(x, valid_precinct_check(x[3]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

df_temp_=df_temp.map(lambda x:(x, valid_jur_check(x[-4]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

In [37]:
df_temp=df_temp.toDF(schema=df_spark.schema)

## 6.b Reverse Geocoding the boroughs using latitudes and longitudes.

1. First we will remove the rows where latitude, longitude and boroughs are null. (around 450 tuples removed)
2. Then, where the boroughs are empty, take the latitude and longitude value and reverse geocode it using the module "reverseGeocoder".
3. Impute the borough name retrived in the empty space.

So, lets remove the rows where latitudes and longitudes are NULL. After that, we can take only those rows where BORO_NM is Null. So, we have 5000 points of BORO_NM to impute using latitudes and longitudes. These are the rows where BORO_NM is null

In [38]:
df_temp_boro_clean=df_temp.filter((df_temp.Latitude.isNotNull()) & (df_temp.Longitude.isNotNull()))

In [39]:
boro_cleaner=df_temp_boro_clean.filter(df_temp_boro_clean.BORO_NM.isNull())

In [40]:
boro_cleaner.count()

5049

### USING MASTER DATASET
In the case of geocoding, geocoder gives us the zipcodes based on the latitude and longitude values. Inturn, we can use the master dataset of zipcodes inorder to retrive the borough names



NOTE: The dataset can be downloaded from : https://data.beta.nyc/en/dataset/pediacities-nyc-neighborhoods/resource/7caac650-d082-4aea-9f9b-3681d568e8a5

In [41]:
#use your path for master dataset here. 
df_zips=pd.read_csv("/content/drive/MyDrive/nyc_zip_borough_neighborhoods_pop.csv")
zip_master={}
zips=df_zips['zip']
boro=df_zips['borough']
for i, j in zip(zips, boro):
  zip_master[i]=j

In [42]:
zip_master[10020]='Manhattan'
zip_master[11249]='Brooklyn'

We stored the zip codes and their corresponding borough names in the dictionary "zip_master" in the form of a look-up because, it takes O(1) time to retrive borough names hence reduces parse time.

In [43]:
def reverseGeoCoder(latitude, longitude):
  loc=geocoder.reverse(str(latitude)+', '+str(longitude))
  zipCode=str(loc).split(",")[2][-5:]
  if not int(zipCode) in zip_master:
    boro="UNKNOWN"
  else:
    boro=zip_master[int(zipCode)]
  boro=boro.upper()
  return boro

In [44]:
ud_func= udf(reverseGeoCoder, StringType())
boro_cleaned_dataframe = boro_cleaner.withColumn("BORO_NM", ud_func(boro_cleaner[12], boro_cleaner[13]))

In [45]:
boro_cleaned_dataframe.take(10)

[Row(CMPLNT_NUM=375517764, CMPLNT_FR_DT='12/22/2019', CMPLNT_FR_TM='14:32:00', ADDR_PCT_CD=25, KY_CD=343, LAW_CAT_CD='MISDEMEANOR', LAW_CAT_CD='MISDEMEANOR', BORO_NM='MANHATTAN', PREM_TYP_DESC='TRANSIT - NYC SUBWAY', VIC_AGE_GROUP='UNKNOWN', VIC_RACE='UNKNOWN', VIC_SEX='E', Latitude=40.80093037300003, Longitude=-73.94109824099996, SUSP_AGE_GROUP='25-44', SUSP_RACE='BLACK', SUSP_SEX='M', JURISDICTION_CODE=1, PATROL_BORO='PATROL BORO MAN NORTH', PD_CD=475, HOUSING_PSA=None),
 Row(CMPLNT_NUM=598430372, CMPLNT_FR_DT='12/20/2019', CMPLNT_FR_TM='19:50:00', ADDR_PCT_CD=6, KY_CD=106, LAW_CAT_CD='FELONY', LAW_CAT_CD='FELONY', BORO_NM='MANHATTAN', PREM_TYP_DESC='TRANSIT - NYC SUBWAY', VIC_AGE_GROUP='UNKNOWN', VIC_RACE='UNKNOWN', VIC_SEX='E', Latitude=40.73389675800007, Longitude=-74.005395837, SUSP_AGE_GROUP='UNKNOWN', SUSP_RACE='UNKNOWN', SUSP_SEX='UNKNOWN', JURISDICTION_CODE=1, PATROL_BORO='PATROL BORO MAN SOUTH', PD_CD=106, HOUSING_PSA=None),
 Row(CMPLNT_NUM=775366730, CMPLNT_FR_DT='12/02/201

So, we had the dataframe (boro_cleaner) where there were null values in BORO_NM column derived from the main dataframe (df_temp_boro_clean). Now we merge back the imputed dataframe back to the main dataframe.

In [46]:
joiner_dataset=df_temp.filter((df_temp.Latitude.isNotNull()) & (df_temp.Longitude.isNotNull()) & (df_temp.BORO_NM.isNotNull()))
fin_df=joiner_dataset.union(boro_cleaned_dataframe)

### NOTE: Below modules for reverse geocoding is used for future purpose (Reverse Geocoding)

In [47]:
def reverseGeoCoder(latitude, longitude):
  loc=geocoder.reverse(str(latitude)+', '+str(longitude))
  zipCode=str(loc).split(",")[2][-5:]
  if not int(zipCode) in zip_master:
    boro="UNKNOWN"
  else:
    boro=zip_master[int(zipCode)]
  boro=boro.upper()
  return boro

def reverse_geo_code_boros(df_spark, Latitude, Longitude, Boro, lat_index, long_index):
  #select data where we have to impute
  df_temp_boro_clean=df_spark.filter((df_spark[Latitude].isNotNull()) & (df_spark[Longitude].isNotNull()))
  boro_cleaner=df_temp_boro_clean.filter(df_temp_boro_clean[Boro].isNull())
  print("We have "+ str(boro_cleaner.count())+ " points to impute")
  print("___intializing Zip Code Look up ____")
  
  #use your path for master dataset here. 
  df_zips=pd.read_csv("/content/drive/MyDrive/nyc_zip_borough_neighborhoods_pop.csv")
  zip_master={}
  zips=df_zips['zip']
  boro=df_zips['borough']
  for i, j in zip(zips, boro):
    zip_master[i]=j
  zip_master[10020]='Manhattan'
  zip_master[11249]='Brooklyn'

  print("____ imputing the points ____")
  #creating UD function
  ud_func= udf(reverseGeoCoder, StringType())
  boro_cleaned_dataframe = boro_cleaner.withColumn(Boro, ud_func(boro_cleaner[lat_index], boro_cleaner[long_index]))

  #joining the imputed dataset to the maindataset and returning
  joiner_dataset=df_spark.filter((df_spark[Latitude].isNotNull()) & (df_spark[Longitude].isNotNull()) & (df_spark[Boro].isNotNull()))
  fin_df=joiner_dataset.union(boro_cleaned_dataframe)
  return fin_df

# ASSIGNMENT-3: SCALED PREPROCESSING

Lets apply all the above functions to the different datasets

In [48]:
## Shooting incidents dataset
#columns to concentrate: 
#1. BORO_NM
#2. PRECINCT
#3. OCCUR_DATE
#4. OCCUR_TIME
#5. JURISDICTION_CODE
#6. Latitude
#7. Longitude
#8. VIC_RACE
#9. VIC_SEX
#10. VIC_AGE_GROUP
#11. PERP_AGE
#12. PERP_SEX
#13. PERP_RACE

In [49]:
## Service Call dataset
#columns to concentrate: 
#1. BORO_NM
#2. PATROL_BORO_NM
#3. NYPD_PCT_CD
#4. Latitude
#5. Longitude
#6. ARRIVD_TS
#7. INCIDENT_DATE
#8. INCIDENT_TIME

In [50]:
##NYPD Criminal Court Summons dataset
#columns to concentrate: 
#1. SEX
#2. RACE
#3. PRECINCT_OF_OCCUR
#4. Latitude
#5. Longitude
#6. SUMMONS_DATE
#7. BORO
#8. AGE_GROUP

In [51]:
##NYPD B Summons (Historic) dataset
#columns to concentrate: 
#1. CITY_NM
#2. VIOLATION_DATE
#3. VIOLATION_TIME
#4. Latitude
#5. Longitude

# I. Import dataset. Download it and store it in your drive


Links for dataset to replace at "fn_dst" variable:

1.   NYPD Shooting Incidents dataset: https://data.cityofnewyork.us/api/views/833y-fsy8/rows.csv?accessType=DOWNLOAD


2.   List item



# 1. NYPD SHOOTING INCIDENTS DATASET:

a. To get the required columns, use this module: 


1.   get_area_of_interest(df_spark, interested_columns)


b. Preprocessing pipeline: Pass your data through these functions. (if your columns fall in those categories)

1.   valid_date_check(date)
2.   valid_time_check(time)
3.   reverse_geo_code_boros(df_spark, Latitude, Longitude, Boro, lat_index, long_index)
4.   refine_age_group_race(df_spark, victim_age_group=None, suspect_age_group=None, suspect_race=None, victim_race=None)
5.   refine_sex_gender_impute(df_spark, suspect_age=None, suspect_gender=None, victim_age=None, victim_gender=None)
6.   refine_precinct_jur(df_spark, precinct=None, Jur_code=None)



In [52]:
#Downloading file from NYC Open Data

fn_src = 'https://data.cityofnewyork.us/api/views/833y-fsy8/rows.csv?accessType=DOWNLOAD'
fn_dst = '/content/drive/MyDrive/NYPD_Shooting_Incident_Data__Historic_.csv'

from six.moves import urllib

if os.path.isfile(fn_dst):
    print('File %s has already been downloaded' % fn_dst)
else:
    print('Fetching file %s[2.4GB]. This may take a while...' % fn_dst)
    urllib.request.urlretrieve(fn_src, fn_dst)
    print('File %s has been downloaded' % fn_dst)

File /content/drive/MyDrive/NYPD_Shooting_Incident_Data__Historic_.csv has already been downloaded


In [53]:
df_spark=spark.read.option("header",True).csv(fn_dst,inferSchema=True)

The size of dataset ~ 24k tuples. So, we need around 2000 data points for 95% confidence level with 2% interval. The size of data is almost 10% of the data. So we can get it into our df now

In [54]:
df_spark=df_spark.sample(0.1)

In [55]:
df_spark.count()

2375

## a. Select the columns that are common with the original dataset:
1. BORO
2. PRECINCT
3. JURISDICTION_CODE
4. PREP_AGE_GROUP
5. PERP_RACE
6. VIC_AGE_GROUP
7. VIC_SEX
8. Latitude
9. Longitude
10. VIC_RACE
11. PERP_SEX
12. OCCUR_DATE
13. OCCUR_TIME

We can consider the primary key along with this
14. INCIDENT_KEY

In [56]:
interested_columns_1=['INCIDENT_KEY', 'OCCUR_TIME', 'OCCUR_DATE', 'BORO', 'PRECINCT', 'JURISDICTION_CODE', 'PERP_AGE_GROUP', 'PERP_RACE', 'PERP_SEX', 'VIC_AGE_GROUP', 'VIC_SEX', 'VIC_RACE', 'Latitude', 'Longitude']

In [57]:
df_spark=get_area_of_interest(df_spark, interested_columns_1)

## b. Lets pass the dataset through the preprocessing pipeline

In [58]:
df_temp=df_spark.rdd

1. Date and Time

In [59]:
df_temp_=df_temp.map(lambda x:(x, valid_date_check(x[2]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

In [60]:
df_temp_=df_temp.map(lambda x:(x, valid_time_check(x[1]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

2. Age group, Race, Gender imputation

In [61]:
# #as this code requires the pyspark dataframe(Not the rdd)
df_temp=df_temp.toDF(schema=df_spark.schema)

In [62]:
df_temp=refine_age_group_race(df_temp, 'VIC_AGE_GROUP', 'PERP_AGE_GROUP', 'PERP_RACE', 'VIC_RACE')

In [63]:
df_temp=refine_sex_gender_impute(df_temp, None, "PERP_SEX", None, "VIC_SEX")

3. Geocoding

In [64]:
df_spk=reverse_geo_code_boros(df_temp, 'Latitude', 'Longitude', 'BORO', -2, -1)

We have 0 points to impute
___intializing Zip Code Look up ____
____ imputing the points ____


4. Precinct and Jurisdiction Code imputation


In [65]:
df_temp=df_spk.rdd
df_temp_=df_temp.map(lambda x:(x, valid_precinct_check(x[4]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

df_temp_=df_temp.map(lambda x:(x, valid_jur_check(x[5]))).filter(lambda x: x[1]==True)
df_temp=df_temp_.map(lambda x: x[0])

df_spark=df_temp.toDF(schema=df_spark.schema)

In [66]:
df_spark.take(10)

[Row(INCIDENT_KEY=137564752, OCCUR_TIME='00:25:00', OCCUR_DATE='07/04/2014', BORO='QUEENS', PRECINCT=101, JURISDICTION_CODE=0, PERP_AGE_GROUP='UNKNOWN', PERP_RACE='UNKNOWN', PERP_SEX='UNKNOWN', VIC_AGE_GROUP='18-24', VIC_SEX='M', VIC_RACE='BLACK', Latitude=40.59403780700006, Longitude=-73.75777869199999),
 Row(INCIDENT_KEY=73911288, OCCUR_TIME='01:34:00', OCCUR_DATE='07/29/2010', BORO='BROOKLYN', PRECINCT=77, JURISDICTION_CODE=2, PERP_AGE_GROUP='UNKNOWN', PERP_RACE='UNKNOWN', PERP_SEX='UNKNOWN', VIC_AGE_GROUP='25-44', VIC_SEX='M', VIC_RACE='BLACK', Latitude=40.67125355700006, Longitude=-73.92671475399999),
 Row(INCIDENT_KEY=149455199, OCCUR_TIME='00:08:00', OCCUR_DATE='01/17/2016', BORO='BROOKLYN', PRECINCT=72, JURISDICTION_CODE=0, PERP_AGE_GROUP='18-24', PERP_RACE='BLACK HISPANIC', PERP_SEX='M', VIC_AGE_GROUP='25-44', VIC_SEX='M', VIC_RACE='BLACK HISPANIC', Latitude=40.64819320200007, Longitude=-74.00711572799997),
 Row(INCIDENT_KEY=197700945, OCCUR_TIME='04:15:00', OCCUR_DATE='05/28/

Lets profile the data now.

In [67]:
pandasDF = df_spark.toPandas()
ds=stream(pandasDF)

#Creating profile of our dataset
profiles = ds.profile(default_profiler=DefaultColumnProfiler)
profiles.stats()

Unnamed: 0,total,empty,distinct,uniqueness,entropy
INCIDENT_KEY,2375,0,2301,0.968842,11.150125
OCCUR_TIME,2375,0,925,0.389474,9.453297
OCCUR_DATE,2375,0,1814,0.763789,10.699901
BORO,2375,0,5,0.002105,1.964757
PRECINCT,2375,0,72,0.030316,5.540832
JURISDICTION_CODE,2375,0,3,0.001263,0.68771
PERP_AGE_GROUP,2375,0,7,0.002947,1.82171
PERP_RACE,2375,0,6,0.002526,1.651526
PERP_SEX,2375,0,4,0.001684,1.331222
VIC_AGE_GROUP,2375,0,6,0.002526,1.73851


# 2. NYPD B Summons (Historic) dataset

a. To get the required columns, use this module: 


1.   get_area_of_interest(df_spark, interested_columns)


b. Preprocessing pipeline: Pass your data through these functions. (if your columns fall in those categories)

1.   valid_date_check(date)
2.   valid_time_check(time)
3.   reverse_geo_code_boros(df_spark, Latitude, Longitude, Boro, lat_index, long_index)
4.   refine_age_group_race(df_spark, victim_age_group=None, suspect_age_group=None, suspect_race=None, victim_race=None)
5.   refine_sex_gender_impute(df_spark, suspect_age=None, suspect_gender=None, victim_age=None, victim_gender=None)
6.   refine_precinct_jur(df_spark, precinct=None, Jur_code=None)

