## Socrata API 

Data Fields
* https://dev.socrata.com/foundry/data.cityofnewyork.us/qgea-i56i

After some experimentation with the Socrata API, I found it too slow for my needs. The API returns at most 1,000 rows per request (the default page size), so anything beyond that has to be paginated. In 2017 alone there were around 460k rows, which works out to roughly 460 API calls for a single year. Instead, I downloaded the entire dataset beforehand and saved the cleaned result as a Parquet file.
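
To make the arithmetic concrete, here is a minimal sketch of how the page offsets for one year would be computed. The helper name `page_offsets` is hypothetical, and the 460,000 figure is the approximate 2017 row count quoted above, not an exact value.

```python
from typing import List


def page_offsets(total_rows: int, page_size: int = 1000) -> List[int]:
    """Return the starting offset of every page needed to cover total_rows."""
    return list(range(0, total_rows, page_size))


# Approximate 2017 row count from the text (an assumption, not an exact figure).
offsets = page_offsets(460_000)
print(len(offsets))  # number of API calls needed for that year
print(offsets[:3])   # the first few offsets
```

Every offset corresponds to one HTTP round trip, which is why fetching several years this way adds up quickly.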

# Data Cleaning for NYC Crime Data

### DSCI 521

### Author: Ao Wang

<img src="https://loving-newyork.com/wp-content/uploads/2019/09/fun-things-to-do-in-nyc-at-night-160914155540002-1920x960.jpg">

## Import Libraries

In [1]:
import warnings
from os.path import join
from typing import Tuple

import pandas as pd
import plotly.io as pio
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame as PySparkDataFrame
from pyspark.sql.functions import (
    count,
    when,
    lit,
    isnan,
    col,
    concat_ws,
    from_unixtime,
    unix_timestamp,
    year
)
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    DoubleType,
    StringType
)

pio.renderers.default = "iframe"
warnings.filterwarnings("ignore")

# from sodapy import Socrata

%load_ext dotenv
%dotenv

In [2]:
# APP_TOKEN = os.getenv("APP_TOKEN")
# RESULTS_PER_PAGE = 1000
# DATASET_ID = "qgea-i56i"
# API_URL = f"https://data.cityofnewyork.us/resource/{DATASET_ID}.csv"

In [3]:
# client = Socrata("data.cityofnewyork.us", APP_TOKEN)

In [4]:
# def get_crime_by_year(year: int = 2021) -> pd.DataFrame:
#     """Page through every complaint filed in `year`, RESULTS_PER_PAGE rows at a time."""
#     where = f"date_extract_y(cmplnt_fr_dt)={year}"
#     size = client.get(DATASET_ID, where=where, select="count(*)")
#     count = int(size[0]["count"])
#     dfs = []
#     # One request per page; the offset always advances by a full page.
#     for offset in range(0, count, RESULTS_PER_PAGE):
#         records = client.get(DATASET_ID, where=where, order=":id",
#                              limit=RESULTS_PER_PAGE, offset=offset)
#         dfs.append(pd.DataFrame.from_records(records))
#     return pd.concat(dfs)

In [5]:
# df = get_crime_by_year()

## Preprocessing
I've decided to use PySpark for processing since the data is around 3 GB. I will then create smaller datasets, for example one restricted to a specific year, for visualization and further analysis.

In [6]:
def initializeSpark() -> Tuple[SparkSession, SparkContext]:
    """Create a Spark Session for Streamlit app"""
    conf = SparkConf().setAppName("crime-processor").setMaster("local")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark, spark.sparkContext

In [7]:
spark, _ = initializeSpark()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/07 19:55:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
def getShape(sdf: PySparkDataFrame) -> Tuple[int, int]:
    return sdf.count(), len(sdf.columns)

In [9]:
def readCrimeSDF(fname: str = join("..", "data", "NYPD_Complaint_Data_Historic.csv")) -> PySparkDataFrame:
    crimeSchema = StructType([
        StructField("CMPLNT_NUM", IntegerType()),
        StructField("CMPLNT_FR_DT", StringType()),
        StructField("CMPLNT_FR_TM", StringType()),
        StructField("CMPLNT_TO_DT", StringType()),
        StructField("CMPLNT_TO_TM", StringType()),
        StructField("ADDR_PCT_CD", IntegerType()),
        StructField("RPT_DT", StringType()),
        StructField("KY_CD", IntegerType()),
        StructField("OFNS_DESC", StringType()),
        StructField("PD_CD", IntegerType()),
        StructField("PD_DESC", StringType()),
        StructField("CRM_ATPT_CPTD_CD", StringType()),
        StructField("LAW_CAT_CD", StringType()),
        StructField("BORO_NM", StringType()),
        StructField("LOC_OF_OCCUR_DESC", StringType()),
        StructField("PREM_TYP_DESC", StringType()),
        StructField("JURIS_DESC", StringType()),
        StructField("JURISDICTION_CODE", IntegerType()),
        StructField("PARKS_NM", StringType()),
        StructField("HADEVELOPT", StringType()),
        StructField("HOUSING_PSA", StringType()),
        StructField("X_COORD_CD", IntegerType()),
        StructField("Y_COORD_CD", IntegerType()),
        StructField("SUSP_AGE_GROUP", StringType()),
        StructField("SUSP_RACE", StringType()),
        StructField("SUSP_SEX", StringType()),
        StructField("TRANSIT_DISTRICT", IntegerType()),
        StructField("LATITUDE", DoubleType()),
        StructField("LONGITUDE", DoubleType()),
        StructField("LAT_LON", StringType()),
        StructField("PATROL_BORO", StringType()),
        StructField("STATION_NAME", StringType()),
        StructField("VIC_AGE_GROUP", StringType()),
        StructField("VIC_RACE", StringType()),
        StructField("VIC_SEX", StringType())
    ])
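    sdf = spark.read.csv(
        path=fname,
        header=True,
        schema=crimeSchema,
        # No effect with this schema: it declares no DateType columns,
        # so the date fields are read as strings and parsed later.
        dateFormat="yyyy-MM-dd"
    )
    return sdf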

In [10]:
sdf = readCrimeSDF()

In [11]:
sdf.limit(5).toPandas()

23/03/07 19:55:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


| | CMPLNT_NUM | CMPLNT_FR_DT | CMPLNT_FR_TM | CMPLNT_TO_DT | CMPLNT_TO_TM | ADDR_PCT_CD | RPT_DT | KY_CD | OFNS_DESC | PD_CD | ... | SUSP_SEX | TRANSIT_DISTRICT | LATITUDE | LONGITUDE | LAT_LON | PATROL_BORO | STATION_NAME | VIC_AGE_GROUP | VIC_RACE | VIC_SEX |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 506547392 | 03/29/2018 | 20:30:00 | | | 32 | 03/30/2018 | 351 | CRIMINAL MISCHIEF & RELATED OF | 254 | ... | | | 40.810877 | -73.941064 | (40.810877241, -73.941064151) | PATROL BORO MAN NORTH | | 25-44 | WHITE | F |
| 1 | 629632833 | 02/06/2018 | 23:15:00 | | | 52 | 02/07/2018 | 341 | PETIT LARCENY | 333 | ... | F | | 40.873671 | -73.908014 | (40.873671035, -73.908013649) | PATROL BORO BRONX | | UNKNOWN | UNKNOWN | D |
| 2 | 787203902 | 11/21/2018 | 00:15:00 | 11/21/2018 | 00:20:00 | 75 | 11/21/2018 | 341 | PETIT LARCENY | 321 | ... | F | | 40.651782 | -73.885457 | (40.651782232, -73.885456761) | PATROL BORO BKLYN NORTH | | UNKNOWN | UNKNOWN | D |
| 3 | 280364018 | 06/09/2018 | 21:42:00 | 06/09/2018 | 21:43:00 | 10 | 06/10/2018 | 361 | OFF. AGNST PUB ORD SENSBLTY & | 639 | ... | M | | 40.75931 | -73.994706 | (40.759310399, -73.994706072) | PATROL BORO MAN SOUTH | | 18-24 | WHITE HISPANIC | F |
| 4 | 985800320 | 11/10/2018 | 19:40:00 | 11/10/2018 | 19:45:00 | 19 | 11/10/2018 | 341 | PETIT LARCENY | 333 | ... | F | | 40.764536 | -73.970728 | (40.764535539, -73.970728388) | PATROL BORO MAN NORTH | | UNKNOWN | UNKNOWN | D |


In [12]:
sdf.printSchema()

root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- CMPLNT_FR_TM: string (nullable = true)
 |-- CMPLNT_TO_DT: string (nullable = true)
 |-- CMPLNT_TO_TM: string (nullable = true)
 |-- ADDR_PCT_CD: integer (nullable = true)
 |-- RPT_DT: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- JURISDICTION_CODE: integer (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- HOUSING_PSA: string (nullable = true)
 |-- X_COORD_CD: integer (nullable = true)
 |-- Y_COORD_CD: integer (nullable = true)
 |-- SUSP_AG

In [13]:
getShape(sdf)

                                                                                

(7825499, 35)

## Missing Values
The output below shows many missing values for `TRANSIT_DISTRICT`, `STATION_NAME`, `HADEVELOPT`, and `PARKS_NM`, so we'll drop those columns since they would contribute little to the analysis. `SUSP_AGE_GROUP`, `SUSP_SEX`, and `SUSP_RACE` are also missing in a large share of rows, but we keep them: it would be interesting to explore the demographics of suspects, or to build a predictive model, such as a decision tree that receives a `LATITUDE`, `LONGITUDE`, and time and outputs the suspect's race, sex, and age group.

In [14]:
def getPercentageOfMissingValues(sdf: PySparkDataFrame) -> pd.DataFrame:
    amountMissing = sdf.select([(count(when(isnan(c) | col(c).isNull(), c)) / count(lit(1))).alias(c)
                                for c in sdf.columns])
    amountMissingDf = amountMissing.toPandas()
    amountMissingDf = amountMissingDf.T.rename(columns={0: "% Missing"}).sort_values(by="% Missing",
                                                                                     ascending=False) * 100

    return amountMissingDf

In [15]:
getPercentageOfMissingValues(sdf)

                                                                                

| Column | % Missing |
|---|---|
| TRANSIT_DISTRICT | 97.795987 |
| STATION_NAME | 97.795987 |
| HADEVELOPT | 95.548028 |
| SUSP_AGE_GROUP | 62.403292 |
| PARKS_NM | 46.970283 |
| SUSP_SEX | 46.61865 |
| SUSP_RACE | 44.915065 |
| CMPLNT_TO_DT | 22.289876 |
| CMPLNT_TO_TM | 22.228346 |
| HOUSING_PSA | 21.191684 |
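
As a sanity check on what these percentages mean, here is a minimal plain-Python sketch of the same metric: for each column, the share of rows whose value is null or NaN. The helper name `percent_missing` and the toy rows are hypothetical and only illustrate the calculation; the notebook itself computes this in Spark.

```python
import math
from typing import Any, Dict, List


def percent_missing(rows: List[Dict[str, Any]]) -> Dict[str, float]:
    """Percentage of rows in which each column is None or NaN."""
    result: Dict[str, float] = {}
    for column in rows[0].keys():
        missing = sum(
            1
            for row in rows
            if row[column] is None
            or (isinstance(row[column], float) and math.isnan(row[column]))
        )
        result[column] = 100.0 * missing / len(rows)
    return result


# Hypothetical toy rows; the values are made up for illustration.
toy_rows = [
    {"BORO_NM": "BRONX", "STATION_NAME": None, "LATITUDE": 40.87},
    {"BORO_NM": "QUEENS", "STATION_NAME": None, "LATITUDE": float("nan")},
    {"BORO_NM": None, "STATION_NAME": "125 STREET", "LATITUDE": 40.81},
    {"BORO_NM": "BROOKLYN", "STATION_NAME": None, "LATITUDE": 40.65},
]
print(percent_missing(toy_rows))
```

Here `STATION_NAME` is missing in three of four rows, so it would sort to the top of the table just as it does in the real data.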


In [16]:
def dropNulls(sdf: PySparkDataFrame) -> PySparkDataFrame:
    sdf = sdf.drop(
        "TRANSIT_DISTRICT",
        "STATION_NAME",
        "HADEVELOPT",
        "PARKS_NM",
        "HOUSING_PSA",
        "Y_COORD_CD",
        "X_COORD_CD",
        "LAT_LON"
    ).na.drop(subset=[
        "CMPLNT_FR_TM",
        "CRM_ATPT_CPTD_CD",
        "VIC_SEX",
        "VIC_RACE",
        "CMPLNT_FR_DT",
        "ADDR_PCT_CD",
        "PD_CD",
        "JURISDICTION_CODE",
        "PD_DESC",
        "PATROL_BORO",
        "BORO_NM",
        "LATITUDE",
        "LONGITUDE",
        "OFNS_DESC",
        "PREM_TYP_DESC"
    ])
    return sdf
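
The two steps in `dropNulls` behave differently: `drop` removes whole columns, while `na.drop(subset=...)` removes any row that has a null in at least one of the listed columns. The following plain-Python sketch shows that behaviour on toy data; the helper name `drop_columns_and_null_rows` and the rows are hypothetical, not part of the notebook.

```python
from typing import Any, Dict, List, Sequence


def drop_columns_and_null_rows(
    rows: List[Dict[str, Any]],
    drop_cols: Sequence[str],
    required: Sequence[str],
) -> List[Dict[str, Any]]:
    """Remove drop_cols, then discard rows where any required column is None."""
    kept = []
    for row in rows:
        if any(row.get(c) is None for c in required):
            continue  # a null in a required column disqualifies the row
        kept.append({k: v for k, v in row.items() if k not in drop_cols})
    return kept


# Hypothetical toy rows; the values are made up for illustration.
toy_rows = [
    {"BORO_NM": "BRONX", "LATITUDE": 40.87, "PARKS_NM": None},
    {"BORO_NM": None, "LATITUDE": 40.73, "PARKS_NM": None},
    {"BORO_NM": "QUEENS", "LATITUDE": None, "PARKS_NM": "FLUSHING MEADOWS"},
]
cleaned = drop_columns_and_null_rows(
    toy_rows, drop_cols=["PARKS_NM"], required=["BORO_NM", "LATITUDE"]
)
print(cleaned)
```

Only the first row survives: the second lacks a borough and the third lacks a latitude.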

In [17]:
droppedNullsSDF = dropNulls(sdf)

In [18]:
getShape(droppedNullsSDF)

                                                                                

(7741826, 27)

## Date Types Conversion
I want to combine the date and time at which each crime started and ended. Column pairs such as `CMPLNT_TO_DT` and `CMPLNT_TO_TM` are currently separate strings, so I join each pair and convert the result into a standard timestamp format. We can then use the PySpark functions to extract the year, month, day of month, or any other granularity, for example to find which month, day, or time of day has the highest rate of crime.
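
The idea can be sketched in plain Python before doing it in Spark. The helper name `combine_date_time` is hypothetical; the `strptime` pattern `%m/%d/%Y %H:%M:%S` is the Python counterpart of the `MM/dd/yyyy HH:mm:ss` pattern used in the notebook, and the sample values come from one of the rows shown earlier.

```python
from datetime import datetime
from typing import Optional


def combine_date_time(
    date_str: Optional[str], time_str: Optional[str]
) -> Optional[datetime]:
    """Join 'MM/dd/yyyy' and 'HH:mm:ss' strings and parse them.

    Returns None when either part is missing, which is common for the
    complaint end date and time.
    """
    if not date_str or not time_str:
        return None
    return datetime.strptime(f"{date_str} {time_str}", "%m/%d/%Y %H:%M:%S")


started = combine_date_time("06/09/2018", "21:42:00")
print(started)
print(started.year, started.month, started.hour)
print(combine_date_time(None, None))
```

Once the value is a real datetime, any granularity (year, month, hour) is a simple attribute lookup, which is what the Spark date functions provide at scale.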

In [19]:
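def convertToDateTime(sdf: PySparkDataFrame) -> PySparkDataFrame:
    dateTimeFormat = "MM/dd/yyyy HH:mm:ss"

    # Join each date/time pair into one string, e.g. "06/09/2018 21:42:00".
    sdf = sdf.withColumn("CMPLNT_TO", concat_ws(" ", "CMPLNT_TO_DT", "CMPLNT_TO_TM"))
    sdf = sdf.withColumn("CMPLNT_FR", concat_ws(" ", "CMPLNT_FR_DT", "CMPLNT_FR_TM"))

    # unix_timestamp parses the string into epoch seconds (null where the value
    # is missing, as with an absent end time); from_unixtime renders it back as
    # a "yyyy-MM-dd HH:mm:ss" string, so these columns remain string-typed.
    sdf = sdf.withColumn("CMPLNT_FR", from_unixtime(unix_timestamp("CMPLNT_FR", dateTimeFormat)))
    sdf = sdf.withColumn("CMPLNT_TO", from_unixtime(unix_timestamp("CMPLNT_TO", dateTimeFormat)))
    sdf = sdf.withColumn("RPT_DT", from_unixtime(unix_timestamp("RPT_DT", "MM/dd/yyyy")))

    # The original date and time columns are no longer needed.
    return sdf.drop("CMPLNT_FR_DT", "CMPLNT_FR_TM", "CMPLNT_TO_DT", "CMPLNT_TO_TM")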

In [20]:
convertedDateTimeSDF = convertToDateTime(droppedNullsSDF)

In [21]:
convertedDateTimeSDF.select(["CMPLNT_FR", "CMPLNT_TO", "RPT_DT"]).show()

+-------------------+-------------------+-------------------+
|          CMPLNT_FR|          CMPLNT_TO|             RPT_DT|
+-------------------+-------------------+-------------------+
|2018-03-29 20:30:00|               null|2018-03-30 00:00:00|
|2018-02-06 23:15:00|               null|2018-02-07 00:00:00|
|2018-11-21 00:15:00|2018-11-21 00:20:00|2018-11-21 00:00:00|
|2018-06-09 21:42:00|2018-06-09 21:43:00|2018-06-10 00:00:00|
|2018-11-10 19:40:00|2018-11-10 19:45:00|2018-11-10 00:00:00|
|2018-03-12 11:48:00|               null|2018-03-12 00:00:00|
|2013-04-07 16:00:00|               null|2013-04-07 00:00:00|
|2018-09-12 18:30:00|2018-09-12 18:35:00|2018-09-12 00:00:00|
|2018-01-16 14:30:00|2018-01-16 15:00:00|2018-01-16 00:00:00|
|2018-08-04 22:15:00|               null|2018-08-04 00:00:00|
|2018-09-26 18:20:00|2018-09-26 18:24:00|2018-09-26 00:00:00|
|2018-02-11 17:30:00|2018-02-12 06:00:00|2018-02-12 00:00:00|
|2018-11-04 11:15:00|               null|2018-11-04 00:00:00|
|2017-12

## Change the Offense Descriptions
The offense descriptions aren't always intuitive: `ENDAN WELFARE INCOMP`, for example, stands for `ENDANGERING WELFARE OF INCOMPETENT`. So I created a mapping that expands the truncated or abbreviated values. I also removed the degree numbers from offenses such as `HARRASSMENT 2` (harassment in the second degree), since they aren't intuitive either.

In [22]:
convertedDateTimeSDF.select(["OFNS_DESC", "PD_DESC"]).limit(20).show()

+--------------------+--------------------+
|           OFNS_DESC|             PD_DESC|
+--------------------+--------------------+
|CRIMINAL MISCHIEF...|MISCHIEF, CRIMINA...|
|       PETIT LARCENY|LARCENY,PETIT FRO...|
|       PETIT LARCENY|LARCENY,PETIT FRO...|
|OFF. AGNST PUB OR...|AGGRAVATED HARASS...|
|       PETIT LARCENY|LARCENY,PETIT FRO...|
|             FORGERY|FORGERY,ETC.,UNCL...|
|   DANGEROUS WEAPONS|WEAPONS POSSESSION 3|
|CRIMINAL MISCHIEF...|MISCHIEF,CRIMINAL...|
|ASSAULT 3 & RELAT...|           ASSAULT 3|
|ASSAULT 3 & RELAT...|           ASSAULT 3|
|      FELONY ASSAULT|ASSAULT 2,1,UNCLA...|
|CRIMINAL MISCHIEF...|MISCHIEF, CRIMINA...|
|      FELONY ASSAULT|ASSAULT 2,1,UNCLA...|
|       GRAND LARCENY|LARCENY,GRAND FRO...|
|OFFENSES AGAINST ...|PUBLIC ADMINISTAT...|
|CRIMINAL MISCHIEF...|CRIMINAL MISCHIEF...|
|CRIMINAL MISCHIEF...|MISCHIEF, CRIMINA...|
|       GRAND LARCENY|LARCENY,GRAND FRO...|
|CRIMINAL MISCHIEF...|MISCHIEF, CRIMINA...|
|       PETIT LARCENY|LARCENY,PE

In [23]:
def fixOffenseDescription(sdf: PySparkDataFrame) -> PySparkDataFrame:
    offenseDescriptionMapping = {
        "OTHER OFFENSES RELATED TO THEF": "OTHER OFFENSES RELATED TO THEFT",
        "OFF. AGNST PUB ORD SENSBLTY &": "OFFENSES AGAINST PUBLIC ORDER/ADMINISTRATION",
        "CRIMINAL MISCHIEF & RELATED OF": "CRIMINAL MISCHIEF",
        "LOITERING/GAMBLING (CARDS, DIC": "GAMBLING",
        "HARRASSMENT 2": "HARRASSMENT",
        "ASSAULT 3 & RELATED OFFENSES": "ASSAULT & RELATED OFFENSES",
        "ESCAPE 3": "ESCAPE",
        "OTHER STATE LAWS (NON PENAL LA": "OTHER STATE LAWS (NON PENAL LAW)",
        "UNLAWFUL POSS. WEAP. ON SCHOOL": "UNLAWFUL POSSESSION OF WEAPON ON SCHOOL",
        "OFFENSES AGAINST PUBLIC ADMINI": "OFFENSES AGAINST PUBLIC ADMINISTRATION",
        "AGRICULTURE & MRKTS LAW-UNCLASSIFIED": "AGRICULTURE & MARKETS LAW",
        "DISRUPTION OF A RELIGIOUS SERV": "DISRUPTION OF A RELIGIOUS SERVICE",
        "OFFENSES AGAINST MARRIAGE UNCL": "OFFENSES AGAINST MARRIAGE",
        "HOMICIDE-NEGLIGENT,UNCLASSIFIE": "HOMICIDE-NEGLIGENT",
        "ENDAN WELFARE INCOMP": "ENDANGERING WELFARE OF INCOMPETENT"
    }
    return sdf.na.replace(offenseDescriptionMapping, "OFNS_DESC")
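
The replacement in `fixOffenseDescription` is an exact-match lookup: values that are absent from the mapping pass through unchanged. A plain-Python sketch of that behaviour, using a subset of the mapping above (the helper name `fix_descriptions` is hypothetical):

```python
from typing import Dict, List

# Subset of the mapping used in fixOffenseDescription.
OFFENSE_MAPPING: Dict[str, str] = {
    "CRIMINAL MISCHIEF & RELATED OF": "CRIMINAL MISCHIEF",
    "ASSAULT 3 & RELATED OFFENSES": "ASSAULT & RELATED OFFENSES",
    "ENDAN WELFARE INCOMP": "ENDANGERING WELFARE OF INCOMPETENT",
}


def fix_descriptions(values: List[str], mapping: Dict[str, str]) -> List[str]:
    """Replace exact matches and leave every other value untouched."""
    return [mapping.get(value, value) for value in values]


fixed = fix_descriptions(
    ["CRIMINAL MISCHIEF & RELATED OF", "PETIT LARCENY", "ENDAN WELFARE INCOMP"],
    OFFENSE_MAPPING,
)
print(fixed)
```

Because matching is exact, a description that is truncated differently in the source data would need its own entry in the mapping.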

In [24]:
finalSDF = fixOffenseDescription(convertedDateTimeSDF)

In [25]:
finalSDF.select(["OFNS_DESC", "PD_DESC"]).limit(15).show()

+--------------------+--------------------+
|           OFNS_DESC|             PD_DESC|
+--------------------+--------------------+
|   CRIMINAL MISCHIEF|MISCHIEF, CRIMINA...|
|       PETIT LARCENY|LARCENY,PETIT FRO...|
|       PETIT LARCENY|LARCENY,PETIT FRO...|
|OFFENSES AGAINST ...|AGGRAVATED HARASS...|
|       PETIT LARCENY|LARCENY,PETIT FRO...|
|             FORGERY|FORGERY,ETC.,UNCL...|
|   DANGEROUS WEAPONS|WEAPONS POSSESSION 3|
|   CRIMINAL MISCHIEF|MISCHIEF,CRIMINAL...|
|ASSAULT & RELATED...|           ASSAULT 3|
|ASSAULT & RELATED...|           ASSAULT 3|
|      FELONY ASSAULT|ASSAULT 2,1,UNCLA...|
|   CRIMINAL MISCHIEF|MISCHIEF, CRIMINA...|
|      FELONY ASSAULT|ASSAULT 2,1,UNCLA...|
|       GRAND LARCENY|LARCENY,GRAND FRO...|
|OFFENSES AGAINST ...|PUBLIC ADMINISTAT...|
+--------------------+--------------------+



## Save Spark DataFrame as Parquet 
* Parquet is a columnar format, so a query can skip the columns it doesn't need. Aggregation queries therefore tend to run faster than they would against row-oriented storage such as CSV.
* PySpark SQL reads and writes Parquet files natively, with no extra dependencies, and the files preserve the schema of the DataFrame. Parquet also compresses the data; a reduction in storage of around 75% on average is commonly cited.
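
To illustrate why a columnar layout helps aggregations, here is a toy sketch in plain Python. It is not how Parquet is implemented; the helper name `to_columns` and the records are hypothetical, and the point is only that a per-borough count touches a single column once the data is stored column by column.

```python
from typing import Any, Dict, List

# Hypothetical toy records; the values are made up for illustration.
rows: List[Dict[str, Any]] = [
    {"BORO_NM": "BRONX", "LAW_CAT_CD": "FELONY", "LATITUDE": 40.87},
    {"BORO_NM": "QUEENS", "LAW_CAT_CD": "MISDEMEANOR", "LATITUDE": 40.73},
    {"BORO_NM": "BRONX", "LAW_CAT_CD": "VIOLATION", "LATITUDE": 40.85},
]


def to_columns(records: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Pivot row-oriented records into one list per column."""
    return {name: [record[name] for record in records] for name in records[0]}


columns = to_columns(rows)

# Counting complaints per borough reads only the BORO_NM column;
# LAW_CAT_CD and LATITUDE are never touched.
counts: Dict[str, int] = {}
for boro in columns["BORO_NM"]:
    counts[boro] = counts.get(boro, 0) + 1
print(counts)
```

With row-oriented storage the same count would have to read every field of every record, which is the cost a columnar file format avoids.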



In [26]:
fname = join("..", "data", "NYPD_Complaint_Data_Historic.parquet")
finalSDF.filter(year("CMPLNT_FR") >= 2006).write.mode("overwrite").parquet(fname)

                                                                                