# Building an ETL Pipeline
---

### Step 0: Install the required Python packages

In [1]:
pip install --upgrade sodapy

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install --upgrade db-dtypes

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install --upgrade pyarrow

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install --upgrade google-cloud-bigquery

Note: you may need to restart the kernel to use updated packages.


#### Now, at the top of your notebook, select "Kernel" -> "Restart and Clear Output"
Then continue from the next cell.

### Step 1: Set up your NYC Open Data variables (ACTION REQUIRED HERE)

In [5]:
import pandas as pd
import numpy as np
from sodapy import Socrata
from google.cloud import bigquery
from google.oauth2 import service_account

In [6]:
# set up the host name for the API endpoint (the https:// part will be added automatically)
# only need to change this if you are not using NYC Open Data
data_url = 'data.cityofnewyork.us'

In [7]:
# set up the dataset identifier at the API endpoint (NYPD arrest data in this case)
# the identifier is the last part of the dataset URL, for example:
# https://data.cityofnewyork.us/Social-Services/311-Service-Requests-from-2010-to-Present/erm2-nwe9.json
# would give us 'erm2-nwe9'
# this notebook uses 'https://data.cityofnewyork.us/resource/uip8-fykc.json'
data_set = 'uip8-fykc'
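The comment above describes taking the dataset identifier from the end of a dataset URL. Below is a small standard-library sketch of that step. `dataset_id_from_url` is a hypothetical helper. It assumes the identifier is the last path segment, optionally followed by an extension such as `.json`.

```python
from urllib.parse import urlparse
import posixpath

def dataset_id_from_url(url):
    """Return the dataset identifier (e.g. 'erm2-nwe9') from a Socrata dataset URL.

    Hypothetical helper: assumes the identifier is the last path segment,
    optionally followed by a file extension such as '.json'.
    """
    path = urlparse(url).path                              # drop scheme, host and query string
    last_segment = posixpath.basename(path.rstrip("/"))    # e.g. 'erm2-nwe9.json'
    identifier, _extension = posixpath.splitext(last_segment)
    return identifier

print(dataset_id_from_url(
    "https://data.cityofnewyork.us/Social-Services/311-Service-Requests-from-2010-to-Present/erm2-nwe9.json"))
print(dataset_id_from_url("https://data.cityofnewyork.us/resource/uip8-fykc.json"))
```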

In [8]:
# second dataset at the same endpoint: NYPD summons data
data_set_2 = 'mv4k-y93f'

In [9]:
# Set up your App Token, which you created in Week 6
# You can find your app token by logging into: https://data.cityofnewyork.us/profile/edit/developer_settings
# Paste your own token below, and avoid sharing a notebook that contains a real token
app_token = 'YOUR_APP_TOKEN'
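Hard-coding the app token means anyone who sees the notebook also sees the token. Below is a minimal sketch that reads the token from an environment variable instead. The variable name `NYC_OPEN_DATA_APP_TOKEN` and the helper `get_app_token` are hypothetical choices, not part of sodapy.

```python
import os

def get_app_token(var_name="NYC_OPEN_DATA_APP_TOKEN", env=None):
    """Return the app token stored in an environment variable.

    Hypothetical helper. `env` defaults to os.environ; passing a dict makes it easy to test.
    Raises RuntimeError if the variable is missing or empty.
    """
    env = os.environ if env is None else env
    token = env.get(var_name)
    if not token:
        raise RuntimeError(f"set the {var_name} environment variable to your NYC Open Data app token")
    return token

# usage, after exporting the variable before starting Jupyter:
# app_token = get_app_token()
```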

In [10]:
# run this cell to set up your Socrata client that connects python to NYC Open Data

# create the client that points to the API endpoint
nyc_open_data_client = Socrata(data_url, app_token, timeout = 200)
print(f"nyc open data client name is: {nyc_open_data_client}")
print(f"nyc open data client data type is: {type(nyc_open_data_client)}")

nyc open data client name is: <sodapy.socrata.Socrata object at 0x0000028781E08040>
nyc open data client data type is: <class 'sodapy.socrata.Socrata'>


### Step 2: Set up your Google BigQuery variables (ACTION REQUIRED HERE)

If you did not create a service account key in class on 3/30/22 (which saved a JSON key file to your computer), create one now before continuing:
1. Open BigQuery
2. On the top-left, click on the Navigation Menu
3. In the Navigation Menu, go to "IAM & Admin" -> "Service Accounts"
4. On the top of the page, click on "Create Service Account"
5. Account name: cis9440-spring2022
6. Click "Create and Continue"
7. Set Role to Owner
8. Click Continue
9. Click Done
10. In the new row for your Service Account, click on the 3 dots in the "Action" column. Select "Manage Keys"
11. Click "Add Key", then "Create New Key". Select the "JSON" radio button and click "Create"
12. In the next cell, set key_path to the exact file path of your new JSON file. For example, it will look like r'C:\Users\Downloads\cis9440-324315-70048a5e1138.json'

In [11]:
# CHANGE THIS TO YOUR FILE PATH
key_path = r'C:\Users\kwokw\Downloads\cis-9440-342619-57f55eab45f2.json'

In [12]:
# run this cell without changing anything to setup your credentials
credentials = service_account.Credentials.from_service_account_file(key_path,
                                                                    scopes=["https://www.googleapis.com/auth/cloud-platform"],)
bigquery_client = bigquery.Client(credentials = credentials,
                                 project = credentials.project_id)

print(f"bigquery client name is: {bigquery_client}")
print(f"bigquery client data type is: {type(bigquery_client)}")

bigquery client name is: <google.cloud.bigquery.client.Client object at 0x0000028781E08310>
bigquery client data type is: <class 'google.cloud.bigquery.client.Client'>


Next, create your dataset ID:
1. Go to BigQuery
2. In the "Explorer" window, click the three-dot "View Actions" menu to the right of your cis9440 project
3. Select "Create dataset"
4. Leave the Project ID as it is and name the Dataset ID etl_dataset
5. Expand your cis9440 project with the triangle on its left-hand side so you can see your new etl_dataset dataset
6. On the right of your etl_dataset, click the 3 dots for "View Actions" -> "Open"
7. You should now see the "Dataset info". Copy the entire "Dataset ID" and paste it in the variable below

In [13]:
dataset_id = 'etl_dataset'   # PASTE THIS DATASET ID FROM ABOVE STEPS

# if the copied ID uses the 'project:dataset' form, convert it to 'project.dataset'
dataset_id = dataset_id.replace(':', '.')
print(f"your dataset_id is: {dataset_id}")

your dataset_id is: etl_dataset


### Step 3: Extract data

1. Connect to NYC Open Data with your app token
2. Pull a specific dataset as a pandas DataFrame
3. Look at the shape of the extracted data

#### sodapy client.get parameters
1. select
2. where
3. order
4. limit
5. group
6. offset
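Each of these keywords is sent as a SoQL query parameter with a `$` prefix (`$select`, `$where`, and so on). Below is a small sketch that builds the equivalent request URL, which you can paste into a browser to preview a query before running it through sodapy. `build_soda_url` is a hypothetical helper. The `/resource/<id>.json` path follows the example URL in Step 1.

```python
from urllib.parse import urlencode, urlparse, parse_qs

def build_soda_url(domain, dataset, **clauses):
    """Return a SODA query URL for `dataset` on `domain` (hypothetical helper).

    Each keyword (select, where, order, group, limit, offset) becomes the matching
    '$'-prefixed SoQL parameter; keywords set to None are left out.
    urlencode percent-encodes '$', spaces and quotes; the server decodes them.
    """
    params = {f"${name}": value for name, value in clauses.items() if value is not None}
    return f"https://{domain}/resource/{dataset}.json?{urlencode(params)}"

url = build_soda_url("data.cityofnewyork.us", "uip8-fykc",
                     select="COUNT(*)",
                     where="Arrest_Date > '2020-12-31'")
print(url)

# decoding the query string shows the SoQL parameters that were sent
print(parse_qs(urlparse(url).query))
```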

In [14]:
# Get the total number of records in the entire dataset
total_record_count = nyc_open_data_client.get(data_set, select = "COUNT(*)")
print(f"total records in {data_set}: {total_record_count}")

total records in uip8-fykc: [{'COUNT': '155507'}]


In [15]:
# Get the total number of records in our target data set
# UPDATE YOUR WHERE FILTER HERE IF NEEDED, below is only an example
target_record_count = nyc_open_data_client.get(data_set,
                                               where = "Arrest_Date > '2020-12-31'",
                                               select= "COUNT(*)")
print(f"target records in {data_set}: {target_record_count}")

target records in uip8-fykc: [{'COUNT': '155507'}]


In [16]:
# Now, loop through target data set to pull all rows in chunks (we cannot pull all rows at once)
# AGAIN, UPDATE WHERE FILTER INSIDE BELOW FUNCTION

def pull_data_in_chunks(target_record_count):
    
    # measure time this function takes
    import time
    start_time = time.time()

    start = 0             # start at 0
    chunk_size = 2000     # fetch 2000 rows at a time
    results = []          # empty out our result list
    record_count = target_record_count

    while True:

        # fetch the set of records starting at 'start'
        results.extend(nyc_open_data_client.get(data_set,
                                                where = "Arrest_Date > '2020-12-31'",
                                                offset = start,
                                                limit = chunk_size))

        # update the starting record number
        start = start + chunk_size

        # if we have fetched all of the records (start has reached record_count), exit loop
        if start >= int(record_count[0]['COUNT']):
            break

    # convert the list into a pandas data frame
    data = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data.shape}")
    return data

data = pull_data_in_chunks(target_record_count)

function took 32.2 seconds
the shape of your dataframe is: (155507, 19)


In [17]:
target_record_count_2 = nyc_open_data_client.get(data_set_2,
                                               where = "Summons_date > '2020-12-31'",
                                               select= "COUNT(*)")

In [18]:
# Now, loop through target data set to pull all rows in chunks (we cannot pull all rows at once)
# AGAIN, UPDATE WHERE FILTER INSIDE BELOW FUNCTION

def pull_data_in_chunks(target_record_count_2):
    
    # measure time this function takes
    import time
    start_time = time.time()

    start = 0             # start at 0
    chunk_size = 2000     # fetch 2000 rows at a time
    results = []          # empty out our result list
    record_count = target_record_count_2

    while True:

        # fetch the set of records starting at 'start'
        results.extend(nyc_open_data_client.get(data_set_2,
                                                where = "Summons_date > '2020-12-31'",
                                                offset = start,
                                                limit = chunk_size))

        # update the starting record number
        start = start + chunk_size

        # if we have fetched all of the records (start has reached record_count), exit loop
        if start >= int(record_count[0]['COUNT']):
            break

    # convert the list into a pandas data frame
    data_2 = pd.DataFrame.from_records(results)

    end_time = time.time()
    print(f"function took {round(end_time - start_time, 1)} seconds")

    print(f"the shape of your dataframe is: {data_2.shape}")
    return data_2

data_2 = pull_data_in_chunks(target_record_count_2)

function took 8.8 seconds
the shape of your dataframe is: (45717, 17)
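The two `pull_data_in_chunks` cells above differ only in the dataset and the `where` filter. Below is a minimal sketch of one parameterized version. It makes two changes, both assumptions rather than the notebook's method:

- It stops when a page comes back shorter than `chunk_size`, instead of comparing against a separate `COUNT(*)` result.
- It passes `order=":id"` so that offset paging runs over a fixed sort order. To our understanding, Socrata recommends an explicit `$order` when paging, and `:id` is its built-in row identifier.

`FakeSocrataClient` is a hypothetical offline stand-in used only to try the function.

```python
import time
import pandas as pd

def pull_data_in_chunks_generic(client, dataset, where=None, chunk_size=2000, order=":id"):
    """Page through a Socrata dataset with limit/offset and return a DataFrame.

    Hypothetical helper. Assumes `client` behaves like sodapy's Socrata client:
    client.get(dataset, where=..., order=..., offset=..., limit=...) returns a list of dicts.
    """
    start_time = time.time()
    results = []
    offset = 0

    while True:
        # a fixed sort order keeps consecutive pages from overlapping or skipping rows
        chunk = client.get(dataset, where=where, order=order,
                           offset=offset, limit=chunk_size)
        results.extend(chunk)
        offset += chunk_size

        # a page shorter than chunk_size (possibly empty) means there is nothing left
        if len(chunk) < chunk_size:
            break

    data = pd.DataFrame.from_records(results)
    print(f"function took {round(time.time() - start_time, 1)} seconds")
    print(f"the shape of your dataframe is: {data.shape}")
    return data


class FakeSocrataClient:
    """Offline stand-in for the Socrata client, used only to try the function."""

    def __init__(self, rows):
        self.rows = rows
        self.calls = 0

    def get(self, dataset, where=None, order=None, offset=0, limit=1000):
        self.calls += 1
        return self.rows[offset:offset + limit]


fake_client = FakeSocrataClient([{"arrest_key": str(i)} for i in range(4500)])
demo = pull_data_in_chunks_generic(fake_client, "uip8-fykc")
```

With the real client, the two cells could then be replaced by two calls, for example:

- `pull_data_in_chunks_generic(nyc_open_data_client, data_set, where="Arrest_Date > '2020-12-31'")`
- the same call with `data_set_2` and `where="Summons_date > '2020-12-31'"`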


### Step 4: Data Profiling

1. Distinct values per column
2. Null values per column
3. Summary statistics per numeric column
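Every column arrives as `object` (text) dtype, as the `data.info()` output in Step 5 shows. On those columns `describe()` reports counts and most frequent values, not means and quartiles. Below is a sketch that converts chosen columns to numbers first. `numeric_summary` is a hypothetical helper, and the sample values are made up.

```python
import pandas as pd

def numeric_summary(df, columns):
    """Summary statistics for columns that hold numbers stored as text (hypothetical helper).

    pd.to_numeric(errors="coerce") turns unparseable values into NaN,
    and describe() leaves NaN out of every statistic.
    """
    numeric = df[columns].apply(pd.to_numeric, errors="coerce")
    return numeric.describe().T   # one row per column: count, mean, std, min, quartiles, max

sample = pd.DataFrame({"latitude": ["40.726", "40.636", None],
                       "longitude": ["-73.735", "-73.891", "not a number"]})
print(numeric_summary(sample, ["latitude", "longitude"]))
```

On the real data, this would be called as `numeric_summary(data, ["latitude", "longitude"])`.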

In [19]:
# what are the columns in our dataframe?
data.columns

Index(['arrest_key', 'arrest_date', 'pd_cd', 'pd_desc', 'ky_cd', 'ofns_desc',
       'law_code', 'law_cat_cd', 'arrest_boro', 'arrest_precinct',
       'jurisdiction_code', 'age_group', 'perp_sex', 'perp_race', 'x_coord_cd',
       'y_coord_cd', 'latitude', 'longitude', 'geocoded_column'],
      dtype='object')

In [20]:
data_2.columns

Index(['summons_key', 'summons_date', 'offense_description',
       'law_section_number', 'law_description', 'summons_category_type',
       'age_group', 'jurisdiction_code', 'boro', 'precinct_of_occur', 'sex',
       'race', 'x_coordinate_cd', 'y_coordinate_cd', 'latitude', 'longitude',
       'geocoded_column'],
      dtype='object')

In [21]:
# create and run a function to create a data profiling dataframe

def create_data_profiling_df(data):

    # collect one dict of profiling info per column, then build the dataframe once
    # (DataFrame.append was removed in pandas 2.0)
    rows = []

    # loop through each column to gather information about it
    for column in data.columns:

        info_dict = {"column_name": column,
                     "column_type": data[column].dtypes}

        try:
            null_values = data[column].isna().sum()
            non_null_values = data[column].shape[0] - null_values
            unique_values = len(data[column].unique())   # NaN counts as one value

            info_dict["unique_values"] = unique_values
            info_dict["duplicate_values"] = non_null_values - unique_values
            info_dict["null_values"] = null_values
            info_dict["non_null_values"] = non_null_values
            info_dict["percent_null"] = round(null_values / data[column].shape[0], 3)

        except TypeError:
            # unique() cannot hash nested values such as the JSON objects in geocoded_column
            print(f"unable to read column: {column}, you may want to drop this column")

        rows.append(info_dict)

    data_profiling_df = pd.DataFrame(rows, columns = ["column_name",
                                                      "column_type",
                                                      "unique_values",
                                                      "duplicate_values",
                                                      "null_values",
                                                      "non_null_values",
                                                      "percent_null"])

    data_profiling_df.sort_values(by = ['unique_values', "non_null_values"],
                                  ascending = [False, False],
                                  inplace=True)

    return data_profiling_df

data_profiling_df = create_data_profiling_df(data)

unable to read column: geocoded_column, you may want to drop this column


In [22]:
# ACTION REQUIRED
# If any of the above columns were unable to be read by your function, you may want to drop them
# To drop a column, update the column name in the line below and run this cell
data.drop(["geocoded_column"], axis = 1, inplace = True)

In [23]:
# view your data profiling dataframe
data_profiling_df

| | column_name | column_type | unique_values | duplicate_values | null_values | non_null_values | percent_null |
|---|---|---|---|---|---|---|---|
| 0 | arrest_key | object | 155507.0 | 0.0 | 0.0 | 155507.0 | 0.0 |
| 16 | latitude | object | 34210.0 | 121297.0 | 0.0 | 155507.0 | 0.0 |
| 17 | longitude | object | 34210.0 | 121297.0 | 0.0 | 155507.0 | 0.0 |
| 15 | y_coord_cd | object | 29300.0 | 126207.0 | 0.0 | 155507.0 | 0.0 |
| 14 | x_coord_cd | object | 27466.0 | 128041.0 | 0.0 | 155507.0 | 0.0 |
| 6 | law_code | object | 1005.0 | 154502.0 | 0.0 | 155507.0 | 0.0 |
| 1 | arrest_date | object | 365.0 | 155142.0 | 0.0 | 155507.0 | 0.0 |
| 2 | pd_cd | object | 261.0 | 155217.0 | 29.0 | 155478.0 | 0.0 |
| 3 | pd_desc | object | 242.0 | 155162.0 | 103.0 | 155404.0 | 0.001 |
| 9 | arrest_precinct | object | 77.0 | 155430.0 | 0.0 | 155507.0 | 0.0 |


### Step 5: Data Cleansing

1. drop unneeded columns
2. drop duplicate rows
3. check for outliers
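No cell below checks for outliers. One common convention is Tukey's fences: flag values more than 1.5 x IQR below the first quartile or above the third. Below is a sketch with a hypothetical helper, `flag_iqr_outliers`, and made-up sample values. On geographic columns such as latitude, the flags mainly point at mis-keyed values, so treat them as candidates to inspect rather than errors.

```python
import pandas as pd

def flag_iqr_outliers(values, k=1.5):
    """Return a boolean Series marking values outside [Q1 - k*IQR, Q3 + k*IQR].

    Hypothetical helper. Text is converted with pd.to_numeric(errors="coerce");
    values that cannot be parsed become NaN and are never flagged.
    """
    numeric = pd.to_numeric(values, errors="coerce")
    q1, q3 = numeric.quantile(0.25), numeric.quantile(0.75)
    iqr = q3 - q1
    return (numeric < q1 - k * iqr) | (numeric > q3 + k * iqr)

sample_values = pd.Series(["1", "2", "3", "4", "100"])
print(flag_iqr_outliers(sample_values).tolist())   # [False, False, False, False, True]
```

On the real data, `data[flag_iqr_outliers(data["latitude"])]` would show the flagged rows.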

In [24]:
# Run this to look at a list of your columns
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 155507 entries, 0 to 155506
Data columns (total 18 columns):
 #   Column             Non-Null Count   Dtype 
---  ------             --------------   ----- 
 0   arrest_key         155507 non-null  object
 1   arrest_date        155507 non-null  object
 2   pd_cd              155478 non-null  object
 3   pd_desc            155404 non-null  object
 4   ky_cd              155404 non-null  object
 5   ofns_desc          155404 non-null  object
 6   law_code           155507 non-null  object
 7   law_cat_cd         154114 non-null  object
 8   arrest_boro        155507 non-null  object
 9   arrest_precinct    155507 non-null  object
 10  jurisdiction_code  155507 non-null  object
 11  age_group          155507 non-null  object
 12  perp_sex           155507 non-null  object
 13  perp_race          155507 non-null  object
 14  x_coord_cd         155507 non-null  object
 15  y_coord_cd         155507 non-null  object
 16  latitude           155507 non-null  object
 17  longitude          155507 non-null  object
dtypes: object(18)

In [25]:
# ACTION REQUIRED
# edit the drop_columns list below to include all the columns you would like to drop
# then, run this cell to drop columns

drop_columns = ["ky_cd",
                "pd_cd",
                "law_code",
                "arrest_precinct",
                "x_coord_cd",
                "y_coord_cd"]

for column in drop_columns:
    try:
        data.drop(column, axis = 1, inplace = True)
    except KeyError:
        # the column is not in the dataframe (e.g. it was already dropped)
        print(f"unable to drop {column}")

print(f"columns left in dataframe: {data.columns}")

columns left in dataframe: Index(['arrest_key', 'arrest_date', 'pd_desc', 'ofns_desc', 'law_cat_cd',
       'arrest_boro', 'jurisdiction_code', 'age_group', 'perp_sex',
       'perp_race', 'latitude', 'longitude'],
      dtype='object')


In [26]:
# find number of duplicate rows

print(f"number of duplicate rows: {len(data[data.duplicated()])}")

number of duplicate rows: 0


In [27]:
# drop duplicate rows based on entire row
data = data.drop_duplicates(keep = 'first')

# Or, based on a subset of columns, uncomment below and adjust accordingly
## data = data.drop_duplicates(subset = ["subset column"], keep = 'first')
## data = data.drop_duplicates(subset = ["subset column 1", "subset column 2"], keep = 'first')

print(f"number of rows after duplicates dropped: {len(data)}")

number of rows after duplicates dropped: 155507
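A full-row check only catches exact copies. If the same source record appeared twice with one field changed, the check above would still report 0. Checking the natural key (here `arrest_key`) on its own catches that case. Below is a sketch with a hypothetical helper, `rows_sharing_key`, and made-up sample data.

```python
import pandas as pd

def rows_sharing_key(df, key_columns):
    """Return every row whose value(s) in key_columns occur more than once (hypothetical helper).

    keep=False marks all copies, not only the second and later ones, so the
    rows can be compared before choosing which copy to keep.
    """
    return df[df.duplicated(subset=key_columns, keep=False)]

# made-up sample: the same arrest_key appears twice with different boroughs
sample = pd.DataFrame({"arrest_key": ["1", "2", "2"],
                       "arrest_boro": ["Q", "K", "B"]})

print(sample.duplicated().sum())                 # full-row duplicates: 0
print(rows_sharing_key(sample, ["arrest_key"]))  # both rows with arrest_key "2"
print(sample["arrest_key"].is_unique)            # False
```

On the real data, the profiling table in Step 4 shows 155507 unique `arrest_key` values across 155507 rows, so `data["arrest_key"].is_unique` should be `True` there.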


In [28]:
# Run this to look at a list of your columns
data_2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 45717 entries, 0 to 45716
Data columns (total 17 columns):
 #   Column                 Non-Null Count  Dtype 
---  ------                 --------------  ----- 
 0   summons_key            45717 non-null  object
 1   summons_date           45717 non-null  object
 2   offense_description    45717 non-null  object
 3   law_section_number     45717 non-null  object
 4   law_description        33789 non-null  object
 5   summons_category_type  33789 non-null  object
 6   age_group              45717 non-null  object
 7   jurisdiction_code      45717 non-null  object
 8   boro                   45717 non-null  object
 9   precinct_of_occur      45717 non-null  object
 10  sex                    22579 non-null  object
 11  race                   22578 non-null  object
 12  x_coordinate_cd        45536 non-null  object
 13  y_coordinate_cd        45536 non-null  object
 14  latitude               45536 non-null  object
 15  longitude          

In [29]:
# ACTION REQUIRED
# edit the drop_columns list below to include all the columns you would like to drop
# then, run this cell to drop columns

drop_columns = ["law_section_number",
                "precinct_of_occur",
                "x_coordinate_cd",
                "y_coordinate_cd",
                "geocoded_column"]

for column in drop_columns:
    try:
        data_2.drop(column, axis = 1, inplace = True)
    except KeyError:
        # the column is not in the dataframe (e.g. it was already dropped)
        print(f"unable to drop {column}")

print(f"columns left in dataframe: {data_2.columns}")

columns left in dataframe: Index(['summons_key', 'summons_date', 'offense_description', 'law_description',
       'summons_category_type', 'age_group', 'jurisdiction_code', 'boro',
       'sex', 'race', 'latitude', 'longitude'],
      dtype='object')


In [30]:
# find number of duplicate rows

print(f"number of duplicate rows: {len(data_2[data_2.duplicated()])}")

number of duplicate rows: 0


In [31]:
# drop duplicate rows based on entire row
data_2 = data_2.drop_duplicates(keep = 'first')

# Or, based on a subset of columns, uncomment below and adjust accordingly
## data_2 = data_2.drop_duplicates(subset = ["subset column"], keep = 'first')
## data_2 = data_2.drop_duplicates(subset = ["subset column 1", "subset column 2"], keep = 'first')

print(f"number of rows after duplicates dropped: {len(data_2)}")

number of rows after duplicates dropped: 45717


In [32]:
data_2.head()

| | summons_key | summons_date | offense_description | law_description | summons_category_type | age_group | jurisdiction_code | boro | sex | race | latitude | longitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 238410432 | 2021-12-29T00:00:00.000 | FEDERAL MOTOR VEH. SAFETY REG | NYS Transportation | NYS TRANS | UNKNOWN | 0 | BROOKLYN | | | | |
| 1 | 238484459 | 2021-12-28T00:00:00.000 | OBSTRUCTS VEHICULAR OR PEDESTRIAN TRAFFIC | | | 45-64 | 2 | BRONX | M | WHITE HISPANIC | | |
| 2 | 238325341 | 2021-12-27T00:00:00.000 | FEDERAL MOTOR VEH. SAFETY REG | NYS Transportation | NYS TRANS | UNKNOWN | 0 | QUEENS | | | | |
| 3 | 238309466 | 2021-12-26T00:00:00.000 | RECKLESS DRIVING | VTL | VTL | 18-24 | 0 | BROOKLYN | F | WHITE | 40.81489234700007 | -73.92188087799997 |
| 4 | 238338883 | 2021-12-26T00:00:00.000 | MOTOR VEHICLE; ENGINE ON/KEY IN IGNITION (THER... | | | 25-44 | 0 | BROOKLYN | M | ASIAN / PACIFIC ISLANDER | | |


### Step 6: Create Location Dimension

In [33]:
# first, copy the entire table
location_dim = data.copy()

In [34]:
location_dim.columns

Index(['arrest_key', 'arrest_date', 'pd_desc', 'ofns_desc', 'law_cat_cd',
       'arrest_boro', 'jurisdiction_code', 'age_group', 'perp_sex',
       'perp_race', 'latitude', 'longitude'],
      dtype='object')

In [35]:
# second, subset for only the wanted columns in the dimension
location_dim = location_dim[["arrest_boro",
                             "longitude",
                             "latitude"]]

In [36]:
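# third, drop duplicate rows in dimension
# note: deduplicating on arrest_boro alone keeps the latitude/longitude of the first arrest
# seen in each borough, so these coordinates are one sample point, not a borough centroid
location_dim = location_dim.drop_duplicates(subset = ["arrest_boro"], keep = 'first')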
location_dim = location_dim.reset_index(drop = True)
location_dim.head()

| | arrest_boro | longitude | latitude |
|---|---|---|---|
| 0 | Q | -73.73523390399998 | 40.72618713000002 |
| 1 | K | -73.89158591899997 | 40.63686750900007 |
| 2 | B | -73.91036538099996 | 40.82433953100008 |
| 3 | M | -73.94110928599997 | 40.800694331000045 |
| 4 | S | -74.07550839399995 | 40.625563328000055 |


In [37]:
# fourth, add location_id as a surrogate key
location_dim.insert(0, 'location_id', range(1000, 1000 + len(location_dim)))
location_dim.head()

| | location_id | arrest_boro | longitude | latitude |
|---|---|---|---|---|
| 0 | 1000 | Q | -73.73523390399998 | 40.72618713000002 |
| 1 | 1001 | K | -73.89158591899997 | 40.63686750900007 |
| 2 | 1002 | B | -73.91036538099996 | 40.82433953100008 |
| 3 | 1003 | M | -73.94110928599997 | 40.800694331000045 |
| 4 | 1004 | S | -74.07550839399995 | 40.625563328000055 |


In [38]:
# fifth, add the location_id to the data table
data = data.merge(location_dim[['arrest_boro', 'location_id']],
                  left_on = 'arrest_boro',
                  right_on = 'arrest_boro',
                  how = 'left')

data.head(2)

| | arrest_key | arrest_date | pd_desc | ofns_desc | law_cat_cd | arrest_boro | jurisdiction_code | age_group | perp_sex | perp_race | latitude | longitude | location_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 238013474 | 2021-12-18T00:00:00.000 | RAPE 1 | RAPE | F | Q | 97 | 18-24 | M | BLACK | 40.72618713000002 | -73.73523390399998 | 1000 |
| 1 | 236943583 | 2021-11-25T00:00:00.000 | ARSON 2,3,4 | ARSON | F | K | 71 | 25-44 | M | BLACK | 40.63686750900007 | -73.89158591899997 | 1001 |
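The arrest data codes boroughs as single letters (B, K, M, Q, S), while the summons data spells them out (for example BROOKLYN, BRONX, QUEENS). If both datasets are meant to share a location dimension, a mapping between the two is needed. Two assumptions to check against each dataset's documentation:

- The letter codes follow the NYPD arrest data dictionary (B = Bronx, K = Brooklyn, M = Manhattan, Q = Queens, S = Staten Island).
- The summons data spells Staten Island as "STATEN ISLAND".

`add_boro_name` is a hypothetical helper.

```python
import pandas as pd

# ASSUMED mapping of arrest_boro letter codes to the spelled-out names used in the summons data
BORO_CODE_TO_NAME = {
    "B": "BRONX",
    "K": "BROOKLYN",
    "M": "MANHATTAN",
    "Q": "QUEENS",
    "S": "STATEN ISLAND",
}

def add_boro_name(df, code_column="arrest_boro", name_column="boro"):
    """Return a copy of df with a full borough name next to the letter code (hypothetical helper).

    Unrecognized codes become NaN, so they are easy to spot with isna().
    """
    out = df.copy()
    out[name_column] = out[code_column].map(BORO_CODE_TO_NAME)
    return out

sample_boros = pd.DataFrame({"arrest_boro": ["Q", "K", "X"]})
print(add_boro_name(sample_boros))
```

On the real data, `add_boro_name(location_dim)` would add a `boro` column that can be compared with `data_2["boro"]`.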


### Step 7: Create Age Dimension

In [39]:
# first, copy the entire table
Age_dim = data.copy()

In [40]:
Age_dim.columns

Index(['arrest_key', 'arrest_date', 'pd_desc', 'ofns_desc', 'law_cat_cd',
       'arrest_boro', 'jurisdiction_code', 'age_group', 'perp_sex',
       'perp_race', 'latitude', 'longitude', 'location_id'],
      dtype='object')

In [41]:
# second, subset for only the wanted columns in the dimension
Age_dim = Age_dim[["age_group"]]

In [42]:
# third, drop duplicate rows in dimension
Age_dim = Age_dim.drop_duplicates(subset = ["age_group"], keep = 'first')
Age_dim = Age_dim.reset_index(drop = True)
Age_dim.head()

| | age_group |
|---|---|
| 0 | 18-24 |
| 1 | 25-44 |
| 2 | 45-64 |
| 3 | 65+ |
| 4 | <18 |


In [43]:
# fourth, add age_id as a surrogate key
Age_dim.insert(0, 'age_id', range(10, 10 + len(Age_dim)))
Age_dim.head()

| | age_id | age_group |
|---|---|---|
| 0 | 10 | 18-24 |
| 1 | 11 | 25-44 |
| 2 | 12 | 45-64 |
| 3 | 13 | 65+ |
| 4 | 14 | <18 |


In [44]:
# fifth, add the age_id to the Fact table
data = data.merge(Age_dim[['age_group', 'age_id']],
                  left_on = 'age_group',
                  right_on = 'age_group',
                  how = 'left')

data.head(2)

| | arrest_key | arrest_date | pd_desc | ofns_desc | law_cat_cd | arrest_boro | jurisdiction_code | age_group | perp_sex | perp_race | latitude | longitude | location_id | age_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 238013474 | 2021-12-18T00:00:00.000 | RAPE 1 | RAPE | F | Q | 97 | 18-24 | M | BLACK | 40.72618713000002 | -73.73523390399998 | 1000 | 10 |
| 1 | 236943583 | 2021-11-25T00:00:00.000 | ARSON 2,3,4 | ARSON | F | K | 71 | 25-44 | M | BLACK | 40.63686750900007 | -73.89158591899997 | 1001 | 11 |
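The location and age dimensions above (and the type dimension below) repeat the same steps: copy, subset, drop duplicates, reset the index, add a surrogate key, and merge the key back. Below is a sketch of those steps as one hypothetical helper, `build_dimension`. It returns the dimension and the source dataframe with the new key column. The sample data are made up.

```python
import pandas as pd

def build_dimension(df, columns, key_name, first_key, dedup_on=None):
    """Build a dimension table and return (dimension, df_with_key). Hypothetical helper.

    Subsets `columns`, drops duplicates on `dedup_on` (default: all of `columns`),
    resets the index, inserts a surrogate key starting at `first_key`, and
    left-merges that key back onto the source dataframe.
    """
    dedup_on = columns if dedup_on is None else dedup_on
    dim = (df[columns]
           .drop_duplicates(subset=dedup_on, keep="first")
           .reset_index(drop=True))
    dim.insert(0, key_name, range(first_key, first_key + len(dim)))
    df_with_key = df.merge(dim[dedup_on + [key_name]], on=dedup_on, how="left")
    return dim, df_with_key

sample = pd.DataFrame({"arrest_key": ["1", "2", "3"],
                       "age_group": ["18-24", "25-44", "18-24"]})
age_dim_sketch, sample_with_key = build_dimension(sample, ["age_group"], "age_id", 10)
print(age_dim_sketch)
print(sample_with_key)
```

Use this as an alternative to the cells above, not in addition to them. Running it on a dataframe that already has `age_id` would produce suffixed `age_id_x`/`age_id_y` columns. The location dimension would correspond to `build_dimension(data, ["arrest_boro", "longitude", "latitude"], "location_id", 1000, dedup_on=["arrest_boro"])`.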


### Step 8: Create Date Dimension

In [45]:
## ACTION REQUIRED: update the start and end date at the bottom of the sql_query variable to fit your needs

sql_query = """
            SELECT
              FORMAT_DATE('%Y%m%d', d) AS date_id,       -- e.g. '20211218', same format as date_id in the data table
              d AS full_date,
              FORMAT_DATE('%w', d) AS week_day,          -- 0 = Sunday ... 6 = Saturday
              FORMAT_DATE('%A', d) AS day_name,
              FORMAT_DATE('%B', d) AS month_name,
              FORMAT_DATE('%Q', d) AS fiscal_qtr,        -- %Q is the calendar quarter (1-4)
              FORMAT_DATE('%Y', d) AS year
            FROM (
              SELECT
                *
              FROM
                UNNEST(GENERATE_DATE_ARRAY('2020-12-31', '2022-01-01', INTERVAL 1 DAY)) AS d )
            """

# store extracted data in new dataframe
date_dim = bigquery_client.query(sql_query).to_dataframe()

# check that at least one row was returned
if len(date_dim) > 0:
    print(f"date dimension created successfully, shape of dimension: {date_dim.shape}")
else:
    print("date dimension FAILED")

date dimension created successfully, shape of dimension: (367, 7)
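The same date dimension can also be built locally with pandas, which helps when checking the BigQuery output or working offline. Below is a sketch that keeps the same column names. `build_date_dim` is a hypothetical helper. Column types differ from the BigQuery result: `full_date` here is a pandas datetime rather than a DATE.

```python
import pandas as pd

def build_date_dim(start, end):
    """Build a date dimension in pandas with the same columns as the BigQuery query.

    Hypothetical helper; both ends of the range are included.
    fiscal_qtr is the calendar quarter (1-4), as with FORMAT_DATE('%Q', d).
    """
    dates = pd.Series(pd.date_range(start, end, freq="D"))
    return pd.DataFrame({
        "date_id": dates.dt.strftime("%Y%m%d"),
        "full_date": dates,
        "week_day": dates.dt.strftime("%w"),          # 0 = Sunday ... 6 = Saturday
        "day_name": dates.dt.day_name(),
        "month_name": dates.dt.month_name(),
        "fiscal_qtr": dates.dt.quarter.astype(str),
        "year": dates.dt.strftime("%Y"),
    })

date_dim_local = build_date_dim("2020-12-31", "2022-01-01")
print(date_dim_local.shape)   # (367, 7), the same shape as the BigQuery output above
print(date_dim_local.head(2))
```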


In [46]:
# create date_id column in the Fact Table
data['date_id'] = pd.to_datetime(data['arrest_date']).dt.strftime("%Y%m%d")

### Step 9: Create Type Dimension

In [47]:
Type_dim = data.copy()

In [48]:
Type_dim.columns

Index(['arrest_key', 'arrest_date', 'pd_desc', 'ofns_desc', 'law_cat_cd',
       'arrest_boro', 'jurisdiction_code', 'age_group', 'perp_sex',
       'perp_race', 'latitude', 'longitude', 'location_id', 'age_id',
       'date_id'],
      dtype='object')

In [49]:
Type_dim = Type_dim[["pd_desc",
                    "ofns_desc"]]

In [50]:
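# note: deduplicating on ofns_desc alone keeps only the first pd_desc seen for each offense,
# so pd_desc in this dimension is one example description, not the full list
Type_dim = Type_dim.drop_duplicates(subset = ["ofns_desc"], keep = 'first')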
Type_dim = Type_dim.reset_index(drop = True)
Type_dim.head()

| | pd_desc | ofns_desc |
|---|---|---|
| 0 | RAPE 1 | RAPE |
| 1 | ARSON 2,3,4 | ARSON |
| 2 | OBSCENITY 1 | SEX CRIMES |
| 3 | | |
| 4 | ASSAULT 3 | ASSAULT 3 & RELATED OFFENSES |


In [51]:
Type_dim.insert(0, 'type_id', range(1, 1 + len(Type_dim)))
Type_dim.head()

| | type_id | pd_desc | ofns_desc |
|---|---|---|---|
| 0 | 1 | RAPE 1 | RAPE |
| 1 | 2 | ARSON 2,3,4 | ARSON |
| 2 | 3 | OBSCENITY 1 | SEX CRIMES |
| 3 | 4 | | |
| 4 | 5 | ASSAULT 3 | ASSAULT 3 & RELATED OFFENSES |


In [52]:
data = data.merge(Type_dim[['ofns_desc', 'type_id']],
                  left_on = 'ofns_desc',
                  right_on = 'ofns_desc',
                  how = 'left')

data.head(2)

| | arrest_key | arrest_date | pd_desc | ofns_desc | law_cat_cd | arrest_boro | jurisdiction_code | age_group | perp_sex | perp_race | latitude | longitude | location_id | age_id | date_id | type_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 238013474 | 2021-12-18T00:00:00.000 | RAPE 1 | RAPE | F | Q | 97 | 18-24 | M | BLACK | 40.72618713000002 | -73.73523390399998 | 1000 | 10 | 20211218 | 1 |
| 1 | 236943583 | 2021-11-25T00:00:00.000 | ARSON 2,3,4 | ARSON | F | K | 71 | 25-44 | M | BLACK | 40.63686750900007 | -73.89158591899997 | 1001 | 11 | 20211125 | 2 |


### Step 10: Creating Fact(s)

In [53]:
# Creating Fact Table

# creating a copy of the data table
fact_table = data.copy()


In [54]:
data_2.head()

| | summons_key | summons_date | offense_description | law_description | summons_category_type | age_group | jurisdiction_code | boro | sex | race | latitude | longitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 238410432 | 2021-12-29T00:00:00.000 | FEDERAL MOTOR VEH. SAFETY REG | NYS Transportation | NYS TRANS | UNKNOWN | 0 | BROOKLYN | | | | |
| 1 | 238484459 | 2021-12-28T00:00:00.000 | OBSTRUCTS VEHICULAR OR PEDESTRIAN TRAFFIC | | | 45-64 | 2 | BRONX | M | WHITE HISPANIC | | |
| 2 | 238325341 | 2021-12-27T00:00:00.000 | FEDERAL MOTOR VEH. SAFETY REG | NYS Transportation | NYS TRANS | UNKNOWN | 0 | QUEENS | | | | |
| 3 | 238309466 | 2021-12-26T00:00:00.000 | RECKLESS DRIVING | VTL | VTL | 18-24 | 0 | BROOKLYN | F | WHITE | 40.81489234700007 | -73.92188087799997 |
| 4 | 238338883 | 2021-12-26T00:00:00.000 | MOTOR VEHICLE; ENGINE ON/KEY IN IGNITION (THER... | | | 25-44 | 0 | BROOKLYN | M | ASIAN / PACIFIC ISLANDER | | |


In [55]:
fact_table = fact_table.merge(data_2[['summons_date','summons_key']],
                  left_on = 'arrest_date',
                  right_on = 'summons_date',
                  how ="right")
fact_table.head()

| | arrest_key | arrest_date | pd_desc | ofns_desc | law_cat_cd | arrest_boro | jurisdiction_code | age_group | perp_sex | perp_race | latitude | longitude | location_id | age_id | date_id | type_id | summons_date | summons_key |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 238418805 | 2021-12-29T00:00:00.000 | ASSAULT 3 | ASSAULT 3 & RELATED OFFENSES | M | Q | 0 | 25-44 | M | WHITE HISPANIC | 40.73828726100004 | -73.91872662599997 | 1000 | 11 | 20211229 | 5 | 2021-12-29T00:00:00.000 | 238410432 |
| 1 | 238395969 | 2021-12-29T00:00:00.000 | UNAUTHORIZED USE VEHICLE 3 | UNAUTHORIZED USE OF A VEHICLE | M | M | 0 | 25-44 | M | WHITE | 40.752856835000046 | -73.97146620999997 | 1003 | 11 | 20211229 | 24 | 2021-12-29T00:00:00.000 | 238410432 |
| 2 | 238405159 | 2021-12-29T00:00:00.000 | ASSAULT 3 | ASSAULT 3 & RELATED OFFENSES | M | B | 0 | 25-44 | F | BLACK | 40.82449489700008 | -73.91252947799995 | 1002 | 11 | 20211229 | 5 | 2021-12-29T00:00:00.000 | 238410432 |
| 3 | 238413491 | 2021-12-29T00:00:00.000 | ASSAULT 3 | ASSAULT 3 & RELATED OFFENSES | M | B | 0 | 25-44 | M | WHITE HISPANIC | 40.86099660100007 | -73.90533786599997 | 1002 | 11 | 20211229 | 5 | 2021-12-29T00:00:00.000 | 238410432 |
| 4 | 238405174 | 2021-12-29T00:00:00.000 | MENACING,UNCLASSIFIED | ASSAULT 3 & RELATED OFFENSES | M | S | 0 | 45-64 | M | WHITE | 40.52938190900005 | -74.16135242599995 | 1004 | 12 | 20211229 | 5 | 2021-12-29T00:00:00.000 | 238410432 |
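The merge above matches arrests and summons on date alone. In the output, the same `summons_key` 238410432 repeats across five different arrests. The small made-up example below shows three things:

- what a date-only join does to row counts;
- how the `validate` argument to pandas' `merge` rejects an unexpected many-to-many join;
- one alternative grain, with one row per date and the daily counts as measures.

That alternative is an assumption about what the fact table should record, not the notebook's design.

```python
import pandas as pd

arrests = pd.DataFrame({"arrest_key": ["a1", "a2", "a3"],
                        "date_id": ["20211229", "20211229", "20211230"]})
summons = pd.DataFrame({"summons_key": ["s1", "s2"],
                        "date_id": ["20211229", "20211229"]})

# Joining on date alone pairs every arrest with every summons from the same day.
paired = arrests.merge(summons, on="date_id", how="inner")
print(len(paired))   # 4 rows: 2 arrests x 2 summons on 20211229

# validate= makes pandas raise MergeError when the keys are not unique as expected.
try:
    arrests.merge(summons, on="date_id", how="inner", validate="one_to_one")
except pd.errors.MergeError as err:
    print(f"merge rejected: {err}")

# ASSUMED alternative grain: one fact row per date, with daily counts as measures.
daily = (arrests.groupby("date_id").size().rename("arrest_count").to_frame()
         .join(summons.groupby("date_id").size().rename("summons_count"), how="outer")
         .fillna(0)
         .astype(int)
         .reset_index())
print(daily)
```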


In [57]:
# take a subset of fact_table for only the needed columns: the dimension keys plus the arrest and summons keys
fact_table = fact_table[["date_id",
                         "location_id",
                         "type_id",
                         "age_id",
                         "arrest_key",
                         "summons_key"]]

fact_table.head()

| | date_id | location_id | type_id | age_id | arrest_key | summons_key |
|---|---|---|---|---|---|---|
| 0 | 20211229 | 1000 | 5 | 11 | 238418805 | 238410432 |
| 1 | 20211229 | 1003 | 24 | 11 | 238395969 | 238410432 |
| 2 | 20211229 | 1002 | 5 | 11 | 238405159 | 238410432 |
| 3 | 20211229 | 1002 | 5 | 11 | 238413491 | 238410432 |
| 4 | 20211229 | 1004 | 5 | 12 | 238405174 | 238410432 |
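Before loading, it can be worth checking that every foreign key in the fact table has a matching row in its dimension. A right or left merge that finds no match leaves NaN keys behind. Below is a sketch with a hypothetical helper, `missing_foreign_keys`, and made-up sample values. It assumes the key columns have comparable dtypes on both sides.

```python
import pandas as pd

def missing_foreign_keys(fact, dim, key):
    """Return the distinct values of `key` in `fact` that have no row in `dim` (hypothetical helper).

    NaN keys are reported too, since they would load into BigQuery as NULL foreign keys.
    """
    fact_keys = pd.Series(fact[key].unique())
    return fact_keys[~fact_keys.isin(dim[key])].tolist()

fact_sample = pd.DataFrame({"location_id": [1000, 1003, 1009, None]})
dim_sample = pd.DataFrame({"location_id": [1000, 1001, 1002, 1003, 1004]})
print(missing_foreign_keys(fact_sample, dim_sample, "location_id"))   # [1009.0, nan]
```

On the real tables, this would be run once per dimension, for example `missing_foreign_keys(fact_table, location_dim, "location_id")`. An empty list means every key has a match.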


### Step 11: Deliver Facts and Dimensions to Data Warehouse (BigQuery)

In [58]:
# create a function to load dataframes to BigQuery

def load_table_to_bigquery(df,
                           table_name,
                           dataset_id):

    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"   # replace the table if it already exists

    # 'dataset.table'; the client's default project is used
    upload_table_name = f"{dataset_id}.{table_name}"

    load_job = bigquery_client.load_table_from_dataframe(df,
                                                         upload_table_name,
                                                         job_config = job_config)

    print(f"Starting job {load_job}")
    load_job.result()   # wait for the load to finish; raises an exception if the job failed

In [59]:
# load your first dimension to BigQuery

load_table_to_bigquery(df = location_dim,
                      table_name = "location_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=natural-treat-340818, location=US, id=73113c59-79d6-4406-abc9-d91fde8bb5bf>


In [60]:
# load your second dimension to BigQuery

load_table_to_bigquery(df = Age_dim,
                      table_name = "Age_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=natural-treat-340818, location=US, id=588a45a7-48ca-4911-8287-53a8385fa1e2>


In [61]:
# load your third dimension to BigQuery

load_table_to_bigquery(df = date_dim,
                      table_name = "date_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=natural-treat-340818, location=US, id=484cb116-a17e-4888-8826-b2b6d5665143>


In [62]:
# load your fourth dimension to BigQuery

load_table_to_bigquery(df = Type_dim,
                      table_name = "Type_dim",
                      dataset_id = dataset_id)

Starting job LoadJob<project=natural-treat-340818, location=US, id=d53b32df-45dc-43b6-8a06-b79171eb0039>


In [63]:
# load your fact table to BigQuery
load_table_to_bigquery(df = fact_table,
                      table_name = "NYC_Crime",
                      dataset_id = dataset_id)

Starting job LoadJob<project=natural-treat-340818, location=US, id=bbfd3ed0-a152-4e8d-8799-24ce3fa49282>
