In [140]:
# ETL Complaint Facts
# If using the native Google BigQuery API module:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
# import credentials
import pandas as pd
import os
import pyarrow
from datetime import datetime
from google.oauth2 import service_account

In [141]:

# Set the GCP Project, dataset and table name
gcp_project = 'cis-4400-404715'
bq_dataset = '311_illegal_parking'

path_to_service_account_key_file = 'keys.json'

In [142]:
def transform_data( df):
    """
    transform_data
    Accepts a data frame
    Performs any specific cleaning and transformation steps on the dataframe
    Returns the modified dataframe
    """
    # Convert the date_of_birth to a datetime64 data type. 2012-08-21 04:12:16.827
    df['date_of_birth'] = pd.to_datetime(df['date_of_birth'], format='%m/%d/%Y')
    # Convert the postal code into a string
    df['incident_zip'] =  df['incident_zip'].astype(str)
    return df

In [143]:
def upload_bigquery_table(bqclient, table_path, write_disposition, df):
    """
    upload_bigquery_table
    Accepts a path to a BigQuery table, the write disposition and a dataframe
    Loads the data into the BigQuery table from the dataframe.
    for credentials.
    The write disposition is either
    write_disposition="WRITE_TRUNCATE"  Erase the target data and load all new data.
    write_disposition="WRITE_APPEND"    Append to the existing table
    """
    try:
        
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        
        # Submit the job
        job = bqclient.load_table_from_dataframe(df, table_path, job_config=job_config)
        
        # Show the job results
        job.result()
    except Exception as err:
        print("Failed to load BigQuery Table.", err)
        # os._exit(-1)

In [144]:
def bigquery_table_exists(table_path, bqclient):
    """
    bigquery_table_exists
    Accepts a path to a BigQuery table
    Checks if the BigQuery table exists.
    Returns True or False
    """
    try:
        bqclient.get_table(table_path)  # Make an API request.
        return True
    except NotFound:
        # print("Table {} is not found.".format(table_id))
        return False

In [145]:
def build_new_table(bqclient, table_path, df):
    """
    build_new_table
    Accepts a path to a dimensional table, the dimension name and a data frame
    Add the surrogate key and a record timestamp to the data frame
    Inserts the contents of the dataframe to the dimensional table.
    """
    upload_bigquery_table(bqclient, table_path, "WRITE_TRUNCATE", df)

In [146]:
def insert_existing_table( bqclient, table_path, df):
    """
    insert_existing_table
    Accepts a path to a dimensional table, the dimension name and a data frame
    Compares the new data to the existing data in the table.
    Inserts the new/modified records to the existing table
    """
    upload_bigquery_table( bqclient, table_path, "WRITE_APPEND", df)

In [147]:
def query_bigquery_table(table_path, bqclient, surrogate_key):
    """
    query_bigquery_table
    Accepts a path to a BigQuery table and the name of the surrogate key
    Queries the BigQuery table but leaves out the update_timestamp and surrogate key columns
    Returns the dataframe
    """    
    bq_df = pd.DataFrame
    # sql_query = 'SELECT * EXCEPT ( update_timestamp, '+surrogate_key+') FROM `' + table_path + '`'
    sql_query = 'SELECT * FROM `' + table_path + '`'
    bq_df = bqclient.query(sql_query).to_dataframe()
    return bq_df

In [148]:
def dimension_lookup(dimension_name, lookup_columns, df):
    """
    dimension_lookup
    Lookup the lookup_columns in the dimension_name and return the associated surrogate keys
    Returns dataframe augmented with the surrogate keys
    """
    bq_df = pd.DataFrame
    surrogate_key = dimension_name+"_dim_id"
    dimension_table_path = ".".join([gcp_project,bq_dataset,dimension_name+"_dimension"])
    # Fetch the existing table
    bq_df = query_bigquery_table(dimension_table_path, bqclient, surrogate_key)
    if dimension_name == 'date':
        bq_df['full_date'] = bq_df['full_date'].apply(lambda x: x.strftime('%Y-%m-%d'))

    print(bq_df)
    # Melt the dimension dataframe into an index with the lookup columns
    m = bq_df.melt(id_vars=lookup_columns, value_vars=surrogate_key)
    print(m)
    # Rename the "value" column to the surrogate key column name
    m=m.rename(columns={"value":surrogate_key})
    # Merge with the fact table record
    df = df.merge(m, on=lookup_columns, how='left')
    # Drop the "variable" column and the lookup columns
    df = df.drop(columns=lookup_columns)
    df = df.drop(columns="variable")
    #print(df)
    return df

In [149]:
def rename_column(df, bq_dataset, dimension_name):

    # Renaming for 311
    if bq_dataset == '311_illegal_parking':
        if dimension_name == 'complaint':
            df = df.rename(columns={'descriptor': 'complaint_description'})
        elif dimension_name == 'complaint_source':
            df = df.rename(columns={'open_data_channel_type': 'complaint_source_channel'})
        elif dimension_name == 'location':
            df = df.rename(columns={'city': 'incident_city', 'incident_zip': 'incident_zipcode'})
        elif dimension_name == 'date':
            df = df.rename(columns={'created_date': 'full_date'})

    # Renaming for Open Parking
    elif bq_dataset == 'open_parking':
        if dimension_name == 'agency':
            df = df.rename(columns={'issuing_agency': 'agency_name'})
        elif dimension_name == 'location':
            df = df.rename(columns={'precinct': 'precinct_num', 'county': 'borough'})
        elif dimension_name == 'violation':
            df = df.rename(columns={'violation': 'violation_description'})
        elif dimension_name == 'violator':
            df = df.rename(columns={'plate': 'violator_plate', 'state': 'violator_state'})
    return df

In [150]:
def handle_null_values(df, bq_dataset, dimension_name):
    # Renaming for 311
    if bq_dataset == '311_illegal_parking':
      if dimension_name == 'location':
        default_values = {
          'city': 'Unspecified',
          'incident_zip': 0,
          'borough': 'Unspecified'
        }
        df.fillna(default_values, inplace=True)
        
    # Renaming for Open Parking
    elif bq_dataset == 'open_parking':
      if dimension_name == 'agency':
        default_values = {
          'issuing_agency': 'N/A'
        }
        df.fillna(default_values, inplace=True)

      elif dimension_name == 'violation':
        default_values = {
          'violation_status': 'N/A'
        }
        df.fillna(default_values, inplace=True)

      elif dimension_name == 'violator':
        default_values = {
          'plate': 'N/A'
        }
        df.fillna(default_values, inplace=True)
        
    return df

In [151]:
if __name__ == "__main__":
    df = pd.DataFrame
    # Create the BigQuery Client
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_service_account_key_file

    # Construct a BigQuery client object
    bqclient = bigquery.Client()
    
    fact_name = '311_illegal_parking'
    table_name = fact_name + '_fact'
    # Construct the full BigQuery path to the table
    fact_table_path = ".".join([gcp_project,bq_dataset,table_name])
    file_source_path = 'data/311_master.csv'

    # Load in the data file
    with open(file_source_path, 'r') as data:
            df = pd.read_csv(data)
    # Set all of the column names to lower case letters
    df = df.rename(columns=str.lower)

    df = rename_column(df, bq_dataset, 'complaint')
    df = dimension_lookup(dimension_name='complaint', lookup_columns=['complaint_type', 'complaint_description'], df=df)

    df = rename_column(df, bq_dataset, 'complaint_source') 
    df = dimension_lookup(dimension_name='complaint_source', lookup_columns=['complaint_source_channel'], df=df)

    df = rename_column(df, bq_dataset, 'date') 
    # Convert column to datetime
    df['full_date'] = pd.to_datetime(df['full_date'])
    df['year'] = df['full_date'].dt.year
    df['month'] = df['full_date'].dt.month
    df['month_name'] = df['full_date'].dt.strftime('%B')
    df['day'] = df['full_date'].dt.day
    df['weekday_name'] = df['full_date'].dt.strftime('%A')
    df['full_date'] = df['full_date'].apply(lambda x: x.strftime('%Y-%m-%d'))
    df = dimension_lookup(dimension_name='date', lookup_columns=['full_date', 'year', 'month', 'month_name', 'day', 'weekday_name'], df=df)

    df = handle_null_values(df, bq_dataset, 'location')
    df = rename_column(df, bq_dataset, 'location') 
    df = dimension_lookup(dimension_name='location', lookup_columns=['borough', 'incident_city', 'incident_zipcode'], df=df)

    df = rename_column(df, bq_dataset, 'status') 
    df = dimension_lookup(dimension_name='status', lookup_columns=['status'], df=df)

    # A list of all of the surrogate keys
    # For transaction grain, also include the 'unique_key' column
    surrogate_keys=['unique_key', 'complaint_dim_id','complaint_source_dim_id','date_dim_id','location_dim_id','status_dim_id']
    
    # Remove all of the other non-surrogate key columns
    df = df[surrogate_keys]

    # See if the target table exists
    target_table_exists = bigquery_table_exists(fact_table_path, bqclient )
    # If the target table does not exist, load all of the data into a new table
    if not target_table_exists:
        build_new_table( bqclient, fact_table_path, df)
    # If the target table exists, then perform an incremental load
    if target_table_exists:
        insert_existing_table( bqclient, fact_table_path, df)

    complaint_dim_id   complaint_type           complaint_description
0                  1  Illegal Parking                 Blocked Hydrant
1                  2  Illegal Parking               Blocked Crosswalk
2                  3  Illegal Parking   Posted Parking Sign Violation
3                  4  Illegal Parking  Double Parked Blocking Traffic
4                  5  Illegal Parking    Commercial Overnight Parking
5                  6  Illegal Parking                Blocked Sidewalk
6                  7  Illegal Parking     Parking Permit Improper Use
7                  8  Illegal Parking  Double Parked Blocking Vehicle
8                  9  Illegal Parking               Blocked Bike Lane
9                 10  Illegal Parking        Unauthorized Bus Layover
10                11  Illegal Parking    Overnight Commercial Storage
11                12  Illegal Parking                Detached Trailer
12                13  Illegal Parking            Paper License Plates
13                14

OPEN PARKING

In [None]:
if __name__ == "__main__":
    df = pd.DataFrame
    # Create the BigQuery Client
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_service_account_key_file

    # Construct a BigQuery client object
    bqclient = bigquery.Client()

    fact_name = 'open_parking'
    table_name = fact_name + '_fact'
    # Construct the full BigQuery path to the table
    fact_table_path = ".".join([gcp_project,bq_dataset,table_name])
    file_source_path = 'data/open_parking_master.csv'

    # Load in the data file
    with open(file_source_path, 'r') as data:
            df = pd.read_csv(data)
    # Set all of the column names to lower case letters
    df = df.rename(columns=str.lower)

    df = rename_column(df, bq_dataset, 'agency')
    df = dimension_lookup(dimension_name='agency', lookup_columns=['agency_name'], df=df)

    df = rename_column(df, bq_dataset, 'date') 
    # Convert column to datetime
    df['full_date'] = pd.to_datetime(df['full_date'])
    df['year'] = df['full_date'].dt.year
    df['month'] = df['full_date'].dt.month
    df['month_name'] = df['full_date'].dt.strftime('%B')
    df['day'] = df['full_date'].dt.day
    df['weekday_name'] = df['full_date'].dt.strftime('%A')
    df['full_date'] = df['full_date'].apply(lambda x: x.strftime('%Y-%m-%d'))
    df = dimension_lookup(dimension_name='date', lookup_columns=['full_date', 'year', 'month', 'month_name', 'day', 'weekday_name'], df=df)

    df = rename_column(df, bq_dataset, 'location')
    df = dimension_lookup(dimension_name='location', lookup_columns=['precinct_num', 'borough', 'incident_zipcode'], df=df)

    df = rename_column(df, bq_dataset, 'violation') 
    df = dimension_lookup(dimension_name='violation', lookup_columns=['violation_description', 'violation_status'], df=df)

    df = rename_column(df, bq_dataset, 'violator') 
    df = dimension_lookup(dimension_name='violator', lookup_columns=['violator_plate', 'violator_state', 'license_type'], df=df)

    # A list of all of the surrogate keys
    # For transaction grain, also include the 'unique_key' column
    surrogate_keys=['unique_key', 'agency_dim_id', 'location_dim_id', 'date_dim_id', 'violation_dim_id', 'violator_dim_id']
    
    # Remove all of the other non-surrogate key columns
    df = df[surrogate_keys]

    # See if the target table exists
    target_table_exists = bigquery_table_exists(fact_table_path, bqclient )
    # If the target table does not exist, load all of the data into a new table
    if not target_table_exists:
        build_new_table( bqclient, fact_table_path, df)
    # If the target table exists, then perform an incremental load
    if target_table_exists:
        insert_existing_table( bqclient, fact_table_path, df)