# NYC Crime Report ETL Process

In [1]:
# import libraries
import pandas as pd
import numpy as np
from sodapy import Socrata
from google.cloud import bigquery
from google.oauth2 import service_account

In [2]:
# setting up the host name for the API endpoint
data_url = 'data.cityofnewyork.us'

In [3]:
# setting up the data set at the API endpoint
data_set = '5uac-w243'

In [4]:
# Setting up the App Token
# Got it from the: https://data.cityofnewyork.us/profile/edit/developer_settings
app_token = 'qYDKNaklkMtFp8UDLPLdr75OP'

In [5]:
# create the client that points to the API endpoint
nyc_open_data_client = Socrata(data_url, app_token, timeout = 200)
print(f"nyc open data client name is: {nyc_open_data_client}")
print(f"nyc open data client data type is: {type(nyc_open_data_client)}")

nyc open data client name is: <sodapy.socrata.Socrata object at 0x7fb53a36bca0>
nyc open data client data type is: <class 'sodapy.socrata.Socrata'>


In [6]:
# Key TO FILE PATH
key_path = r'/Users/wangdu/Desktop/Group Project/pro-bruin-361120-d4ff3b13a86f.json'

In [7]:
# setting up big query credentials

credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
bigquery_client = bigquery.Client(credentials = credentials,
                                 project = credentials.project_id)

print(f"bigquery client name is: {bigquery_client}")
print(f"bigquery client data type is: {type(bigquery_client)}")

bigquery client name is: <google.cloud.bigquery.client.Client object at 0x7fd35961ae80>
bigquery client data type is: <class 'google.cloud.bigquery.client.Client'>


In [7]:
#defining the location in BigQuery to load our dimensions later
dataset_id = 'pro-bruin-361120.NYPD_Complaint_Data'   
dataset_id = dataset_id.replace(':', '.')
print(f"your dataset_id is: {dataset_id}")

your dataset_id is: pro-bruin-361120.NYPD_Complaint_Data


In [8]:
# Get the total number of records in our the entire data set
total_record_count = nyc_open_data_client.get(data_set, select = "COUNT(*)")
print(f"total records in {data_set}: {total_record_count[0]['COUNT']}")

total records in 5uac-w243: 531768


In [9]:
# Now, loop through target data set to pull all rows in chunks (we cannot pull all rows at once)
# AGAIN, UPDATE WHERE FILTER INSIDE BELOW FUNCTION

def extract_socrata_data(chunk_size = 2500,
                         data_set = data_set,
                         where = None):
    
    # measure time this function takes
    import time
    start_time = time.time()
    
    # get total number or records
    if where == None:
        total_records = int(nyc_open_data_client.get(data_set,
                                                     select= "COUNT(*)")[0]["COUNT"])
    else:
        total_records = int(nyc_open_data_client.get(data_set,
                                                     where = where,
                                                     select= "COUNT(*)")[0]["COUNT"])
    
    # start at 0, empty list for results
    start = 0                   
    results = []                

    while True:

        if where == None:
            # fetch the set of records starting at 'start'
            results.extend(nyc_open_data_client.get(data_set,
                                                    offset = start,
                                                    limit = chunk_size))
            
        elif where != None:
            results.extend(nyc_open_data_client.get(data_set,
                                                    where = where,
                                                    offset = start,
                                                    limit = chunk_size))
        # update the starting record number
        start = start + chunk_size

        # if we have fetched all of the records (we have reached total_records), exit loop
        if (start > total_records):
            break

    # convert the list into a pandas data frame
    data = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data.shape}")
    return data

In [None]:
#Creating our dataframe
df = extract_socrata_data(chunk_size = 2500,
                         data_set = data_set)

In [12]:
#checking the names of our columns
df.columns

Index(['cmplnt_num', 'addr_pct_cd', 'boro_nm', 'cmplnt_fr_dt', 'cmplnt_fr_tm',
       'cmplnt_to_tm', 'crm_atpt_cptd_cd', 'hadevelopt', 'jurisdiction_code',
       'juris_desc', 'ky_cd', 'law_cat_cd', 'loc_of_occur_desc', 'ofns_desc',
       'parks_nm', 'patrol_boro', 'pd_cd', 'pd_desc', 'prem_typ_desc',
       'rpt_dt', 'station_name', 'susp_age_group', 'susp_race', 'susp_sex',
       'vic_age_group', 'vic_race', 'vic_sex', 'x_coord_cd', 'y_coord_cd',
       'latitude', 'longitude', 'lat_lon', 'geocoded_column',
       ':@computed_region_efsh_h5xi', ':@computed_region_f5dn_yrer',
       ':@computed_region_yeji_bk3q', ':@computed_region_92fq_4b7q',
       ':@computed_region_sbqj_enih', 'cmplnt_to_dt', 'housing_psa',
       'transit_district'],
      dtype='object')

In [13]:
# deleting the columns that are not needed

drop_columns = ["susp_race", "addr_pct_cd", "hadevelopt", "jurisdiction_code", "juris_desc", "ky_cd", 
                "loc_of_occur_desc", "ofns_desc", "parks_nm", "patrol_boro", "prem_typ_desc",
                "rpt_dt", "station_name", "x_coord_cd", "y_coord_cd", "lat_lon", "geocoded_column",
                ":@computed_region_efsh_h5xi", ":@computed_region_f5dn_yrer", ":@computed_region_yeji_bk3q",
                ":@computed_region_92fq_4b7q", ":@computed_region_sbqj_enih", "housing_psa", "transit_district"]

for column in drop_columns:
    try:
        df.drop(column, axis = 1, inplace = True)
    except:
        print(f"unable to drop {column}")

print(f"columns left in dataframe: {df.columns}")

columns left in dataframe: Index(['cmplnt_num', 'boro_nm', 'cmplnt_fr_dt', 'cmplnt_fr_tm', 'cmplnt_to_tm',
       'crm_atpt_cptd_cd', 'law_cat_cd', 'pd_cd', 'pd_desc', 'susp_age_group',
       'susp_sex', 'vic_age_group', 'vic_race', 'vic_sex', 'latitude',
       'longitude', 'cmplnt_to_dt'],
      dtype='object')


In [14]:
# creating and running a function to ceate data profiling dataframe

def create_data_profiling_df(data):
    
    # create an empty dataframe to gather information about each column
    data_profiling_df = pd.DataFrame(columns = ["column_name",
                                                "column_type",
                                                "unique_values",
                                                "duplicate_values",
                                                "null_values",
                                                "non_null_values"])

    # loop through each column to add rows to the data_profiling_df dataframe
    for column in data.columns:

        info_dict = {}

        try:
            info_dict["column_name"] = column
            info_dict["column_type"] = data[column].dtypes
            info_dict["unique_values"] = len(data[column].unique())
            info_dict["duplicate_values"] = data[column].count() - len(data[column].dropna().unique())
            info_dict["null_values"] = data[column].isna().sum()
            info_dict["non_null_values"] = data[column].count()

        except:
            print(f"unable to read column: {column}, you may want to drop this column")

        data_profiling_df = data_profiling_df.append(info_dict, ignore_index=True)

    data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                  ascending = [False, False],
                                  inplace=True)
    
    return data_profiling_df

In [15]:

data_profiling_df = create_data_profiling_df(data = df)

In [16]:
data_profiling_df

Unnamed: 0,column_name,column_type,unique_values,duplicate_values,null_values,non_null_values
0,cmplnt_num,object,396865,113,0,396978
15,longitude,object,57762,339208,9,396969
14,latitude,object,57387,339583,9,396969
2,cmplnt_fr_dt,object,1785,395193,0,396978
4,cmplnt_to_tm,object,1441,395537,0,396978
3,cmplnt_fr_tm,object,1440,395538,0,396978
16,cmplnt_to_dt,object,1226,366360,29393,367585
7,pd_cd,object,361,396178,440,396538
8,pd_desc,object,351,396627,0,396978
11,vic_age_group,object,24,396954,0,396978


In [17]:
# find number of duplicate rows

print(f"number of duplicate rows: {len(df[df.duplicated()])}")

number of duplicate rows: 11


In [18]:
# drop duplicate rows based on entire row
df = df.drop_duplicates(keep = 'first')

# Or, based on a subset of rows, uncomment below and adjust accordingly
## data = data.drop_duplicates(subset = ["subset column"], keep = 'first')
## data = data.drop_duplicates(subset = ["subset column 1", "subset column 2"], keep = 'first')

print(f"number of rows after duplicates dropped: {len(df)}")

number of rows after duplicates dropped: 396967


In [19]:
#drop rows with null values
#2 = if 2 null values in a row then its dropped (threshold)
#subset to drop rows where null in route and direction
#.dropna(thresh = 2, subset = ["route", "direction"], inplace = True)
df.dropna(thresh = 2, inplace = True)
print(len(df))

396967


In [20]:
#Creating the suspect dimension, first we copy the df
SuspectDim = df.copy()

In [21]:
#Now, subset for only the wanted columns in the dimension
SuspectDim = SuspectDim[["susp_sex", "susp_age_group"]]

In [22]:
# Now, add SUSP_ID as a surrogate key
SuspectDim.insert(0, 'susp_id', range(1, 1 + len(SuspectDim)))
SuspectDim.head()

Unnamed: 0,susp_id,susp_sex,susp_age_group
0,1,M,25-44
1,2,U,25-44
2,3,(null),(null)
3,4,(null),(null)
4,5,(null),(null)


In [23]:
# Creating the Victim Dimension, first we copy the df
VictimDim = df.copy()

In [24]:
#Now, subset for only the wanted columns in the dimension
VictimDim = VictimDim[["vic_sex", "vic_age_group", "vic_race"]]

In [25]:
# Now, add VIC_ID as a surrogate key
VictimDim.insert(0, 'vic_id', range(1, 1 + len(VictimDim)))
VictimDim.head()

Unnamed: 0,vic_id,vic_sex,vic_age_group,vic_race
0,1,D,UNKNOWN,UNKNOWN
1,2,F,<18,BLACK
2,3,M,18-24,ASIAN / PACIFIC ISLANDER
3,4,M,45-64,WHITE
4,5,M,65+,BLACK


In [26]:
#Now, subset for only the wanted columns in the dimension
df["cmplnt_fr_dt"] = pd.to_datetime(df["cmplnt_fr_dt"], errors = 'coerce')
df["cmplnt_fr_dt"] = df["cmplnt_fr_dt"].dt.floor('D')
df["cmplnt_fr_dt"].head()

0   2022-07-11
1   2021-01-01
2   2021-12-24
3   2015-06-15
4   2018-01-01
Name: cmplnt_fr_dt, dtype: datetime64[ns]

In [28]:
#Now, subset for only the wanted columns in the dimension
df["cmplnt_to_dt"] = pd.to_datetime(df["cmplnt_to_dt"], errors = 'coerce')
df["cmplnt_to_dt"] = df["cmplnt_to_dt"].dt.floor('D')
df["cmplnt_to_dt"].head()

0          NaT
1   2022-09-07
2   2022-01-03
3   2022-07-13
4   2021-04-14
Name: cmplnt_to_dt, dtype: datetime64[ns]

In [29]:
# Creating the date dimension, first, copy the entire table
DateDim = df.copy()

In [30]:
## ACTION REQUIRED: update the start and end date at the bottom of the sql_query variable to fit your needs

sql_query = """
            SELECT
              CONCAT (FORMAT_DATE("%Y",d),FORMAT_DATE("%m",d),FORMAT_DATE("%d",d)) as date_id,
              d AS full_date,
              FORMAT_DATE('%w', d) AS week_day,
              FORMAT_DATE('%A', d) AS day_name,
              FORMAT_DATE('%B', d) as month_name,
              FORMAT_DATE('%Q', d) as fiscal_qtr,
              FORMAT_DATE('%Y', d) AS year,
            FROM (
              SELECT
                *
              FROM
                UNNEST(GENERATE_DATE_ARRAY('2020-01-01', '2024-01-01', INTERVAL 1 DAY)) AS d )
            """

# store extracted data in new dataframe
date_dim = bigquery_client.query(sql_query).to_dataframe()

# validate that > 0 rows have been extracted and return dataframe
if len(date_dim) > 0:
    print(f"date dimension created successfully, shape of dimension: {date_dim.shape}")
else:
    print("date dimension FAILED")

date dimension created successfully, shape of dimension: (1462, 7)


In [31]:
date_dim.tail()

Unnamed: 0,date_id,full_date,week_day,day_name,month_name,fiscal_qtr,year
1457,20231228,2023-12-28,4,Thursday,December,4,2023
1458,20231229,2023-12-29,5,Friday,December,4,2023
1459,20231230,2023-12-30,6,Saturday,December,4,2023
1460,20231231,2023-12-31,0,Sunday,December,4,2023
1461,20240101,2024-01-01,1,Monday,January,1,2024


In [68]:
# Creating the offense dimension, first, copy the entire table
OffenseDim = df.copy()

In [69]:
#Now, subset for only the wanted columns in the dimension
OffenseDim = OffenseDim[['law_cat_cd', 'pd_cd', 'pd_desc']]

In [70]:
# Now, add offense_id as a surrogate key
OffenseDim.insert(0, 'Offense_id', range(100, 100 + len(OffenseDim)))
OffenseDim.head()

Unnamed: 0,Offense_id,law_cat_cd,pd_cd,pd_desc
0,100,FELONY,361,"ROBBERY,BANK"
1,101,FELONY,155,RAPE 2
2,102,FELONY,424,"LARCENY,GRAND BY CREDIT CARD ACCT COMPROMISE-E..."
3,103,FELONY,739,"FRAUD,UNCLASSIFIED-FELONY"
4,104,FELONY,739,"FRAUD,UNCLASSIFIED-FELONY"


In [71]:
#Creating the location dimension, first, copy the entire table
LocationDim = df.copy()

In [72]:
#Now, subset for only the wanted columns in the dimension
LocationDim = LocationDim[['latitude', 'longitude', 'boro_nm']]

In [73]:
# Now, add location_id as a surrogate key
LocationDim.insert(0, 'Location_id', range(111000, 111000 + len(LocationDim)))
LocationDim.head()

Unnamed: 0,Location_id,latitude,longitude,boro_nm
0,111000,40.833567,-73.861118,BRONX
1,111001,40.6488507469884,-73.951016510623,BROOKLYN
2,111002,40.619768,-74.08407,STATEN ISLAND
3,111003,40.749791,-73.893792,QUEENS
4,111004,40.671636,-73.863359,BROOKLYN


In [74]:
#Merging the original df with all the dimensions created to make one large dataset
#We tried to use the df.merge but got a memory issue
#Now using pandas concatenate to merge the file
df1 = pd.concat([df, date_dim.date_id, time_dim.time_id, VictimDim.vic_id, SuspectDim.susp_id,
                           OffenseDim.Offense_id, LocationDim.Location_id],axis = 1)

In [75]:
df1.head()

Unnamed: 0,cmplnt_num,boro_nm,cmplnt_fr_dt,cmplnt_fr_tm,cmplnt_to_tm,crm_atpt_cptd_cd,law_cat_cd,pd_cd,pd_desc,susp_age_group,...,vic_sex,latitude,longitude,cmplnt_to_dt,date_id,time_id,vic_id,susp_id,Offense_id,Location_id
0,247853895,BRONX,2022-07-11,09:09:00,(null),COMPLETED,FELONY,361,"ROBBERY,BANK",25-44,...,D,40.833567,-73.861118,NaT,20200101,0,1,1,100,111000
1,250722124,BROOKLYN,2021-01-01,12:00:00,12:00:00,COMPLETED,FELONY,155,RAPE 2,25-44,...,F,40.6488507469884,-73.951016510623,2022-09-07,20200102,1,2,2,101,111001
2,239511545,STATEN ISLAND,2021-12-24,08:00:00,08:00:00,COMPLETED,FELONY,424,"LARCENY,GRAND BY CREDIT CARD ACCT COMPROMISE-E...",(null),...,M,40.619768,-74.08407,2022-01-03,20200103,2,3,3,102,111002
3,247992275,QUEENS,2015-06-15,00:01:00,17:00:00,COMPLETED,FELONY,739,"FRAUD,UNCLASSIFIED-FELONY",(null),...,M,40.749791,-73.893792,2022-07-13,20200104,3,4,4,103,111003
4,239602232,BROOKLYN,2018-01-01,09:00:00,23:59:00,COMPLETED,FELONY,739,"FRAUD,UNCLASSIFIED-FELONY",(null),...,M,40.671636,-73.863359,2021-04-14,20200105,4,5,5,104,111004


In [76]:
#Now, we create the fact table (CrimeComplaints)
CrimeComplaints_fact = df1[["crm_atpt_cptd_cd", "date_id", "time_id", "vic_id", "susp_id",
                           "Offense_id", "Location_id"]]
CrimeComplaints_fact.head()

Unnamed: 0,crm_atpt_cptd_cd,date_id,time_id,vic_id,susp_id,Offense_id,Location_id
0,COMPLETED,20200101,0,1,1,100,111000
1,COMPLETED,20200102,1,2,2,101,111001
2,COMPLETED,20200103,2,3,3,102,111002
3,COMPLETED,20200104,3,4,4,103,111003
4,COMPLETED,20200105,4,5,5,104,111004


In [77]:
# we create a function to load dataframes to BigQuery

def load_table_to_bigquery(df,
                          table_name,
                          dataset_id):

    dataset_id = dataset_id 

    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"

    upload_table_name = f"{dataset_id}.{table_name}"
    
    load_job = bigquery_client.load_table_from_dataframe(df,
                                                upload_table_name,
                                                job_config = job_config)
        
    print(f"completed job {load_job}")

In [78]:
#1st loading CrimeComplaints_fact table to BigQuery

load_table_to_bigquery(df = CrimeComplaints_fact,
                          table_name = "CrimeComplaints_fact",
                          dataset_id = dataset_id)

completed job LoadJob<project=pro-bruin-361120, location=US, id=6aeddc7b-e941-4bf8-bc5c-b7091f86368e>


In [79]:
#loading SuspectDim table to BigQuery

load_table_to_bigquery(df = SuspectDim,
                          table_name = "SuspectDim",
                          dataset_id = dataset_id)

completed job LoadJob<project=pro-bruin-361120, location=US, id=e3ee10ae-9ec1-4afe-8e0c-9539b2f18665>


In [80]:
#loading VictimDim table to BigQuery

load_table_to_bigquery(df = VictimDim,
                          table_name = "VictimDim",
                          dataset_id = dataset_id)

completed job LoadJob<project=pro-bruin-361120, location=US, id=636099aa-8102-4495-ab94-f71f6a2b3eff>


In [81]:
#loading DateDim table to BigQuery

load_table_to_bigquery(df = date_dim,
                          table_name = "DateDim",
                          dataset_id = dataset_id)

completed job LoadJob<project=pro-bruin-361120, location=US, id=0134bab9-adab-407c-b1a3-a6b977f02a39>


In [82]:
#loading TimeDim table to BigQuery

load_table_to_bigquery(df = time_dim,
                          table_name = "TimeDim",
                          dataset_id = dataset_id)

completed job LoadJob<project=pro-bruin-361120, location=US, id=8c43c051-d4c1-464f-8411-1b7b1d06b32c>


In [83]:
#loading OffenseDim table to BigQuery

load_table_to_bigquery(df = OffenseDim,
                          table_name = "OffenseDim",
                          dataset_id = dataset_id)

completed job LoadJob<project=pro-bruin-361120, location=US, id=cbe90c92-a2a2-49e9-872d-5fb9b674bfaf>


In [84]:
#loading LocationDim table to BigQuery

load_table_to_bigquery(df = LocationDim,
                          table_name = "LocationDim",
                          dataset_id = dataset_id)

completed job LoadJob<project=pro-bruin-361120, location=US, id=e0724fc1-c989-4ff8-b472-218d46a80426>


# Done: Loaded all Dimensions and Tables to BigQuery