### Bronze to Silver Medallion Data Layer for Stop, Question and Frisk Data (2017-2022)

We apply the following transformations and cleaning steps to move the data from the Bronze layer to the Silver medallion layer:
- Borough mapping for consistency (e.g., PBMN and PBMS both map to MANHATTAN; STATEN IS and STATEN ISLAND both map to STATEN ISLAND)
- Null value mapping (null values appear in several formats, such as `(null)`, `NaN`, and `NULL`)
- Normalizing column values to a consistent format (e.g., 2017.00 becomes 2017)
- Dropping columns that are irrelevant to the analysis
- Date preprocessing (e.g., Stop Frisk Date appears in two formats: 2/5/2020 and 2021-07-29)

In [1]:
import sys
sys.path.append("..")

In [2]:
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import col, expr, when
from datetime import date
from helpers import SnowflakeHelper
import json
import os
import matplotlib.pyplot as plt
import pandas as pd

In [3]:
borough_mapping = {
    'PBBX': 'BRONX', 
    'PBSI': 'STATEN ISLAND', 
    'PBMN': 'MANHATTAN', 
    'PBMS': 'MANHATTAN',
    'PBBN': 'BROOKLYN', 
    'PBBS': 'BROOKLYN', 
    'PBQS': 'QUEENS', 
    'PBQN': 'QUEENS',
    'STATEN IS': 'STATEN ISLAND'
}

null_value_mapping = {
    '(null)' : None,
    'NaN' : None,
    '(' : None,
    'NULL': None
}

In [4]:
snowflake_helper = SnowflakeHelper()
snowflake_config = './../helpers/snowflake_config.json'
session = snowflake_helper.create_snowpark_session(snowflake_config)

[INFO] No schema passed, using default schema SAFEGUARDING_NYC_SCHEMA_BRONZE for the session
[SUCCESS] Config file loaded successfully!
[SUCCESS] Snowspark Session created successfully on schema SAFEGUARDING_NYC_SCHEMA_BRONZE!


In [5]:
sqf_data = session.table('SQF')

In [6]:
sqf_data.show()

In [7]:
sqf_data.count()

102421

In [8]:
# Airbyte metadata columns plus fields that are not used in the analysis.
columns_to_drop = [
    '_AIRBYTE_RAW_ID', '_AIRBYTE_EXTRACTED_AT', '_AIRBYTE_META', 'STOP_LOCATION_ZIP_CODE',
    'ID_CARD_IDENTIFIES_OFFICER_FLAG', 'OFFICER_NOT_EXPLAINED_STOP_DESCRIPTION', 'SUSPECTS_ACTIONS_CASING_FLAG',
    'SUMMONS_ISSUED_FLAG', 'VERBAL_IDENTIFIES_OFFICER_FLAG', 'SEARCH_BASIS_ADMISSION_FLAG', 'SEARCH_BASIS_OTHER_FLAG',
    'SEARCH_BASIS_CONSENT_FLAG', 'BACKROUND_CIRCUMSTANCES_SUSPECT_KNOWN_TO_CARRY_WEAPON_FLAG', 'RECORD_STATUS_CODE',
    'PHYSICAL_FORCE_RESTRAINT_USED_FLAG', 'PHYSICAL_FORCE_HANDCUFF_SUSPECT_FLAG', 'OTHER_PERSON_STOPPED_FLAG',
    'SUSPECTS_ACTIONS_PROXIMITY_TO_SCENE_FLAG', 'DEMEANOR_CODE', 'SUPERVISING_OFFICER_COMMAND_CODE', 'STOP_ID_ANONY',
    'ISSUING_OFFICER_COMMAND_CODE', 'LOCATION_IN_OUT_CODE', 'JURISDICTION_DESCRIPTION',
    'PHYSICAL_FORCE_VERBAL_INSTRUCTION_FLAG',
]
sqf_data = sqf_data.drop(*columns_to_drop)
sqf_data.show()

In [9]:
sqf_data.group_by('STOP_LOCATION_PATROL_BORO_NAME')\
        .count()\
        .show()

----------------------------------------------
|"STOP_LOCATION_PATROL_BORO_NAME"  |"COUNT"  |
----------------------------------------------
|PBBX                              |23550    |
|PBBN                              |16363    |
|PBSI                              |3640     |
|0238                              |1        |
|0237                              |1        |
|0183                              |1        |
|1011                              |1        |
|0154                              |1        |
|1022                              |1        |
|PBMS                              |9951     |
----------------------------------------------



In [10]:
sqf_data.group_by('STOP_LOCATION_BORO_NAME')\
        .count()\
        .show()

---------------------------------------
|"STOP_LOCATION_BORO_NAME"  |"COUNT"  |
---------------------------------------
|BRONX                      |23550    |
|BROOKLYN                   |31686    |
|0210334                    |1        |
|0190241                    |1        |
|PBMS                       |1        |
|PBBS                       |1        |
|QUEENS                     |17440    |
|STATEN ISLAND              |3094     |
|0237177                    |1        |
|0208169                    |1        |
---------------------------------------



In [11]:
def map_boroughs(df, column_name):
    """Replace patrol-borough codes in column_name with borough names; other values pass through."""
    (first_key, first_value), *rest = borough_mapping.items()
    mapping_expr = when(col(column_name) == first_key, first_value)
    for key, value in rest:
        mapping_expr = mapping_expr.when(col(column_name) == key, value)
    return df.withColumn(column_name, mapping_expr.otherwise(col(column_name)))

# Apply the same mapping to both borough columns.
preprocessed_sqf_df = map_boroughs(sqf_data, 'STOP_LOCATION_PATROL_BORO_NAME')
preprocessed_sqf_df = map_boroughs(preprocessed_sqf_df, 'STOP_LOCATION_BORO_NAME')
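
The borough normalization above can be sanity-checked locally without a Snowflake session. The following is a minimal plain-Python sketch that mirrors the CASE logic on individual strings. `normalize_borough` is a hypothetical helper introduced only for illustration; it is not part of the notebook's helpers or of Snowpark.

```python
# Same mapping as the notebook's borough_mapping.
borough_mapping = {
    'PBBX': 'BRONX',
    'PBSI': 'STATEN ISLAND',
    'PBMN': 'MANHATTAN',
    'PBMS': 'MANHATTAN',
    'PBBN': 'BROOKLYN',
    'PBBS': 'BROOKLYN',
    'PBQS': 'QUEENS',
    'PBQN': 'QUEENS',
    'STATEN IS': 'STATEN ISLAND',
}


def normalize_borough(value):
    """Return the mapped borough name, or the input unchanged when no mapping exists."""
    return borough_mapping.get(value, value)


# Patrol-borough codes are mapped; already-clean and malformed values pass through.
samples = ['PBMN', 'PBMS', 'STATEN IS', 'BRONX', '0238']
normalized = [normalize_borough(v) for v in samples]
print(normalized)
```

Malformed values such as `0238` pass through unchanged at this stage, which is why a separate filter on the known borough names is still needed afterwards.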

In [12]:
preprocessed_sqf_df.show()

In [13]:
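# Keep only rows whose borough columns hold a known borough name; malformed values such as '0238' and NULLs are dropped.
valid_boroughs = sorted(set(borough_mapping.values()))
preprocessed_sqf_df = preprocessed_sqf_df.filter(col('STOP_LOCATION_PATROL_BORO_NAME').isin(valid_boroughs))
preprocessed_sqf_df = preprocessed_sqf_df.filter(col('STOP_LOCATION_BORO_NAME').isin(valid_boroughs))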

In [14]:
preprocessed_sqf_df.count()

101989

In [15]:
preprocessed_sqf_df.group_by('STOP_LOCATION_BORO_NAME')\
        .count()\
        .show()

---------------------------------------
|"STOP_LOCATION_BORO_NAME"  |"COUNT"  |
---------------------------------------
|QUEENS                     |17440    |
|STATEN ISLAND              |3640     |
|BRONX                      |23550    |
|BROOKLYN                   |31686    |
|MANHATTAN                  |25673    |
---------------------------------------
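
As a quick consistency check on the figures printed above, the per-borough counts should sum to the post-filter row count, and the difference from the Bronze row count gives the number of rows removed by the borough filter. The sketch below simply hard-codes the numbers shown in this notebook; the variable names are illustrative.

```python
# Row counts reported by the notebook.
bronze_rows = 102421    # sqf_data.count() before cleaning
filtered_rows = 101989  # preprocessed_sqf_df.count() after the borough filter

# Per-borough counts from the STOP_LOCATION_BORO_NAME group-by after filtering.
borough_counts = {
    'QUEENS': 17440,
    'STATEN ISLAND': 3640,
    'BRONX': 23550,
    'BROOKLYN': 31686,
    'MANHATTAN': 25673,
}

total = sum(borough_counts.values())
dropped = bronze_rows - filtered_rows

print(total)    # 101989, matches the filtered row count
print(dropped)  # 432 rows had an unrecognized or missing borough value
```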



In [16]:
preprocessed_sqf_df.group_by('STOP_LOCATION_PATROL_BORO_NAME')\
        .count()\
        .show()

----------------------------------------------
|"STOP_LOCATION_PATROL_BORO_NAME"  |"COUNT"  |
----------------------------------------------
|BRONX                             |23550    |
|BROOKLYN                          |31686    |
|MANHATTAN                         |25673    |
|QUEENS                            |17440    |
|STATEN ISLAND                     |3640     |
----------------------------------------------



In [17]:
preprocessed_sqf_df.group_by('YEAR2')\
        .count()\
        .sort('YEAR2', ascending=True)\
        .show()

---------------------
|"YEAR2"  |"COUNT"  |
---------------------
|2017.00  |11197    |
|2018     |11008    |
|2019     |26918    |
|2020     |13715    |
|2021     |8947     |
|2022     |30204    |
---------------------



In [18]:
mapping_expr_year = when(col('YEAR2') == '2017.00', '2017')
preprocessed_sqf_df = preprocessed_sqf_df.withColumn("YEAR2", mapping_expr_year.otherwise(col("YEAR2")))
preprocessed_sqf_df.group_by('YEAR2')\
        .count()\
        .sort('YEAR2', ascending=True)\
        .show()

---------------------
|"YEAR2"  |"COUNT"  |
---------------------
|2017     |11197    |
|2018     |11008    |
|2019     |26918    |
|2020     |13715    |
|2021     |8947     |
|2022     |30204    |
---------------------
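
The cell above handles only the literal value `2017.00`. If other years were ever to arrive with a decimal suffix, a more general normalization might be worth considering. The plain-Python sketch below illustrates the idea on strings; `normalize_year` is a hypothetical helper, and an equivalent Snowpark expression (for example a cast) is an assumption that is not shown or tested here.

```python
def normalize_year(value):
    """Convert year strings such as '2017.00' to '2017'; leave non-numeric values unchanged."""
    if value is None:
        return None
    try:
        # float() accepts '2017.00'; int() drops the fractional part.
        return str(int(float(value)))
    except (ValueError, OverflowError):
        # Not a finite number (e.g. free text or 'NaN'): return it untouched.
        return value


years = ['2017.00', '2018', '2022', None, 'unknown']
cleaned_years = [normalize_year(y) for y in years]
print(cleaned_years)
```

Note that this truncates any fractional part, so it is only appropriate when the decimal suffix is known to be formatting noise, as it is for `2017.00` here.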



In [19]:
for column in preprocessed_sqf_df.columns:
    for key, value in null_value_mapping.items():
        preprocessed_sqf_df = preprocessed_sqf_df.withColumn(column, when(col(column) == key, value).otherwise(col(column)))
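
To make the effect of the null mapping concrete, here is a small plain-Python sketch that applies the same exact-match replacement to a single record. The sample values are invented for illustration (only the column names come from this notebook), and `normalize_null` is a hypothetical helper rather than part of the pipeline.

```python
# Same placeholder tokens as the notebook's null_value_mapping.
null_value_mapping = {
    '(null)': None,
    'NaN': None,
    '(': None,
    'NULL': None,
}


def normalize_null(value):
    """Map known null placeholders to None; the match is exact and case-sensitive."""
    return null_value_mapping.get(value, value)


# Illustrative record; the values are made up.
record = {
    'SUSPECT_WEIGHT': '(null)',
    'STOP_LOCATION_SECTOR_CODE': 'NaN',
    'PHYSICAL_FORCE_OC_SPRAY_USED_FLAG': 'Y',
    'YEAR2': '2019',
}

cleaned = {name: normalize_null(value) for name, value in record.items()}
print(cleaned)
```

Because the comparison is exact, variants such as lowercase `null` or a padded `' NULL'` would not be caught; they would need to be added to the mapping if they appear in the data.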

In [20]:
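# Use column_name/_dtype as loop variables so the imported col() function and the built-in type() are not shadowed.
for column_name, _dtype in preprocessed_sqf_df.dtypes:
    distinct_values = preprocessed_sqf_df.select(column_name).distinct()
    print(f"Distinct values in {column_name}:")
    print('=' * 35)
    distinct_values.show()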

Distinct values in PHYSICAL_FORCE_OC_SPRAY_USED_FLAG:
---------------------------------------
|"PHYSICAL_FORCE_OC_SPRAY_USED_FLAG"  |
---------------------------------------
|NULL                                 |
|Y                                    |
---------------------------------------

Distinct values in SEARCH_BASIS_HARD_OBJECT_FLAG:
-----------------------------------
|"SEARCH_BASIS_HARD_OBJECT_FLAG"  |
-----------------------------------
|NULL                             |
|Y                                |
-----------------------------------

Distinct values in SUSPECT_WEIGHT:
--------------------
|"SUSPECT_WEIGHT"  |
--------------------
|225               |
|235               |
|210               |
|190               |
|170               |
|165               |
|280               |
|135               |
|110               |
|182               |
--------------------

Distinct values in STOP_LOCATION_SECTOR_CODE:
-------------------------------
|"STOP_LOCATION_SECTOR_CODE"  

In [21]:
preprocessed_sqf_df.show()

In [22]:
preprocessed_sqf_df.select('STOP_FRISK_DATE').distinct().show()

---------------------
|"STOP_FRISK_DATE"  |
---------------------
|1/2/2019           |
|1/23/2019          |
|1/24/2019          |
|2/4/2019           |
|3/10/2019          |
|4/2/2019           |
|6/20/2019          |
|9/5/2019           |
|12/27/2019         |
|3/19/2019          |
---------------------



In [23]:
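# STOP_FRISK_DATE arrives as a string in two formats: YYYY-MM-DD and M/D/YYYY.
# Backslashes are escaped twice (once for Python, once for the Snowflake string literal),
# so the regex Snowflake evaluates is \d{4}-\d{2}-\d{2}. Values matching neither format become NULL.
sql_expr_for_date_conversion = """
    CASE
        WHEN REGEXP_LIKE(STOP_FRISK_DATE, '\\\\d{4}-\\\\d{2}-\\\\d{2}') THEN TO_DATE(STOP_FRISK_DATE, 'YYYY-MM-DD')
        WHEN REGEXP_LIKE(STOP_FRISK_DATE, '\\\\d{1,2}/\\\\d{1,2}/\\\\d{4}') THEN TO_DATE(STOP_FRISK_DATE, 'MM/DD/YYYY')
        ELSE NULL
    END
"""

preprocessed_sqf = preprocessed_sqf_df.withColumn("STOP_FRISK_DATE", expr(sql_expr_for_date_conversion))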
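
The two-format date logic can also be checked locally before running it against Snowflake. The sketch below is a plain-Python approximation of the CASE expression, using only the standard library; `parse_stop_frisk_date` is a hypothetical helper, and `fullmatch` is used on the assumption that the whole value must match the pattern, as with Snowflake's `REGEXP_LIKE`.

```python
import re
from datetime import datetime

# Patterns corresponding to the two formats seen in STOP_FRISK_DATE.
ISO_PATTERN = re.compile(r'\d{4}-\d{2}-\d{2}')      # e.g. 2021-07-29
US_PATTERN = re.compile(r'\d{1,2}/\d{1,2}/\d{4}')   # e.g. 2/5/2020


def parse_stop_frisk_date(value):
    """Parse either supported format into a date; return None for anything else."""
    if value is None:
        return None
    if ISO_PATTERN.fullmatch(value):
        return datetime.strptime(value, '%Y-%m-%d').date()
    if US_PATTERN.fullmatch(value):
        # strptime accepts months and days without a leading zero.
        return datetime.strptime(value, '%m/%d/%Y').date()
    return None


samples = ['2/5/2020', '2021-07-29', '12/27/2019', '(null)', None]
parsed = [parse_stop_frisk_date(s) for s in samples]
print(parsed)
```

A value that matches a pattern but is not a real calendar date (for example `13/45/2020`) raises `ValueError` in this sketch rather than returning `None`, so such values would surface as errors instead of being silently nulled.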

In [26]:
preprocessed_sqf.select('STOP_FRISK_DATE').distinct().limit(5).show()

---------------------
|"STOP_FRISK_DATE"  |
---------------------
|2021-07-30         |
|2021-08-02         |
|2020-06-19         |
|2020-06-16         |
|2020-06-08         |
---------------------



In [27]:
preprocessed_sqf.show()

In [28]:
table_name = 'SQF'
schema_name = 'SAFEGUARDING_NYC_SCHEMA_SILVER'
snowflake_helper.save_data_in_snowflake(session, schema_name, table_name, preprocessed_sqf, mode="overwrite")

[SUCCESS] Data saved successfully in SAFEGUARDING_NYC_SCHEMA_SILVER.SQF table in Snowflake!
