# Import Libraries

In [78]:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import pandas as pd
import os
import logging
from datetime import datetime

# Create Functions

In [79]:
def load_csv_data_file(logging, file_name, df):
    '''
    load_csv_data_file
    Accepts a file source path and a file name
    Loads the file into a data frame
    Exits the program on error
    Returns the dataframe
    '''
    logging.info(f'Reading source data file: {file_name}')
    try:
        df = pd.read_csv(file_name, low_memory=False)
        df = df.rename(columns=str.lower)
        logging.info(f'Read {len(df)} records from source data file: {file_name}')
        return df
    except:
        logging.error(f'Failed to read file: {file_name}')
    return df

In [80]:
def create_bigquery_client(logging):
    '''
    create_bigquery_client
    Creates a BigQuery client using the path to the service account key file
    for credentials.
    Returns the BigQuery client object
    '''
    try:
        bqclient = bigquery.Client.from_service_account_json('keys/new-cis4400-381214-f4f2229d6853.json') # replace with your own SA keys
        logging.info('Created BigQuery Client: %s',bqclient)
        return bqclient
    except Exception as err:
        logging.error('Failed to create BigQuery Client.', err)
        # os._exit(-1)
    return bqclient

In [81]:
def upload_bigquery_table(logging, bqclient, table_path, write_disposition, df):
    '''
    upload_bigquery_table
    Accepts a path to a BigQuery table, the write disposition and a dataframe
    Loads the data into the BigQuery table from the dataframe.
    for credentials.
    The write disposition is either
    write_disposition='WRITE_TRUNCATE'  Erase the target data and load all new data.   
    write_disposition='WRITE_APPEND'    Append to the existing table
    '''
    try:
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        # Submit the job
        job = bqclient.load_table_from_dataframe(df, table_path, job_config=job_config)  
        # Show the job results
        job.result()
    except Exception as err:
        logging.error('Failed to load BigQuery Table.', err)
        # os._exit(-1)

In [82]:
def bigquery_table_exists(table_path, bqclient):
    '''
    bigquery_table_exists
    Accepts a path to a BigQuery table
    Checks if the BigQuery table exists.
    Returns True or False
    '''    
    try:
        bqclient.get_table(table_path)  # Make an API request.
        return True
    except NotFound:
        # print('Table {} is not found.'.format(table_id))
        return False

In [83]:
def query_bigquery_table(logging, table_path, bqclient, surrogate_key):
    '''
    query_bigquery_table
    Accepts a path to a BigQuery table and the name of the surrogate key
    Queries the BigQuery table but leaves out the update_timestamp and surrogate key columns
    Returns the dataframe
    '''    
    bq_df = pd.DataFrame
    # sql_query = 'SELECT * EXCEPT ( update_timestamp, ' + surrogate_key + ') FROM `' + table_path + '`'
    sql_query = 'SELECT * EXCEPT (update_timestamp) FROM `' + table_path + '`'
    logging.info('Running query: %s', sql_query)
    bq_df = bqclient.query(sql_query).to_dataframe()
    return bq_df

In [84]:
def add_surrogate_key(df, dimension_name, offset=1):
    '''
    add_surrogate_key  
    Accepts a data frame and inserts an integer identifier as the first column
    Returns the modified dataframe
    '''
    # Reset the index
    df.reset_index(drop=True, inplace=True)
    # Add the new surrogate key starting from offset
    df.insert(0, dimension_name+'_dim_id', df.index+offset)
    return df

In [85]:
def add_update_date(df, current_date):
    '''
    add_update_date
    Accepts a data frame and inserts the current date as a new field
    Returns the modified dataframe
    '''
    df['update_date'] = pd.to_datetime(current_date)
    return df

In [86]:
def add_update_timestamp(df):
    '''
    add_update_timestamp
    Accepts a data frame and inserts the current datetime as a new field
    Returns the modified dataframe
    '''
    df['update_timestamp'] = pd.Timestamp('now', tz='utc').replace(microsecond=0)
    return df

In [87]:
def build_new_table(logging, bqclient, table_path, df):
    '''
    build_new_table
    Accepts a path to a dimensional table, the dimension name and a data frame 
    Add the surrogate key and a record timestamp to the data frame
    Inserts the contents of the dataframe to the dimensional table.
    '''
    logging.info('Target table %s does not exit', table_path)
    upload_bigquery_table(logging, bqclient, table_path, 'WRITE_TRUNCATE', df)

In [88]:
def insert_existing_table(logging, bqclient, table_path, df):
    '''
    insert_existing_table
    Accepts a path to a dimensional table, the dimension name and a data frame 
    Compares the new data to the existing data in the table.
    Inserts the new/modified records to the existing table
    '''
    logging.info('Target table %s exits. Appending records.', table_path)
    upload_bigquery_table(logging, bqclient, table_path, 'WRITE_APPEND', df)

In [89]:
def dimension_lookup(logging, dimension_name, lookup_columns, df):
    '''
    dimension_lookup
    Lookup the lookup_columns in the dimension_name and return the associated surrogate keys
    Returns dataframe augmented with the surrogate keys
    '''
    bq_df = pd.DataFrame
    logging.info('Lookup dimension %s.', dimension_name)
    surrogate_key = dimension_name + '_dim_id'
    dimension_table_path = '.'.join([gcp_project, bq_dataset, dimension_name + '_dimension'])
    bq_df = query_bigquery_table(logging, dimension_table_path, bqclient, surrogate_key)
    m = bq_df.melt(id_vars=lookup_columns, value_vars=surrogate_key)
    m = m.rename(columns={'value':surrogate_key})
    df = df.merge(m, on=lookup_columns, how='left')
    df = df.drop(columns=lookup_columns)
    df = df.drop(columns='variable')
    return df

In [90]:
def date_dimension_lookup(logging, dimension_name, lookup_column, df):
    '''
    dimension_lookup
    Lookup the lookup_columns in the dimension_name and return the associated surrogate keys
    Returns dataframe augmented with the surrogate keys
    '''
    bq_df = pd.DataFrame
    logging.info('Lookup date dimension on column %s.', lookup_column)
    surrogate_key = dimension_name + '_dim_id'
    dimension_table_path = '.'.join([gcp_project, bq_dataset, dimension_name + '_dimension'])
    bq_df = query_bigquery_table(logging, dimension_table_path, bqclient, surrogate_key)
    bq_df['full_date'] = pd.to_datetime(bq_df['full_date'])
    bq_df['full_date'] = bq_df.full_date.dt.date
    
    df[lookup_column] = pd.to_datetime(df[lookup_column])
    df[lookup_column] = df[lookup_column].dt.date
    
    m = bq_df.melt(id_vars='full_date', value_vars=surrogate_key)
    m = m.rename(columns={'value':lookup_column+'_dim_id'})
    df = df.merge(m, left_on=lookup_column, right_on='full_date', how='left')

    df = df.drop(columns=lookup_column)
    df = df.drop(columns='variable')
    df = df.drop(columns='full_date')
    return df

In [117]:
def time_dimension_lookup(logging, dimension_name, lookup_column, df):
    '''
    dimension_lookup
    Lookup the lookup_columns in the dimension_name and return the associated surrogate keys
    Returns dataframe augmented with the surrogate keys
    '''
    bq_df = pd.DataFrame
    logging.info('Lookup time dimension on column %s.', lookup_column)
    surrogate_key = dimension_name + '_dim_id'
    dimension_table_path = '.'.join([gcp_project, bq_dataset, dimension_name + '_dimension'])
    
    bq_df = query_bigquery_table(logging, dimension_table_path, bqclient, surrogate_key)
    
    df[lookup_column] = pd.to_datetime(df[lookup_column]).dt.time
    
    m = bq_df.melt(id_vars='full_time', value_vars=surrogate_key)
    m = m.rename(columns={'value':lookup_column+'_dim_id'})
    
    df = df.merge(m, left_on=lookup_column, right_on='full_time', how='left')

    df = df.drop(columns=lookup_column)
    df = df.drop(columns='variable')
    df = df.drop(columns='full_time')
    return df

# Create Fact Table

## For 311 Complaints Data

In [2]:
df = pd.DataFrame

fact_name = 'complaints'

gcp_project = 'cis4400-381214' # replace to your own project id
bq_dataset = '311_complaints_dataset' # replace to your own dataset name
table_name = fact_name + '_fact'

fact_table_path = '.'.join([gcp_project,bq_dataset,table_name])

In [98]:
# Set up logging
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
current_date = datetime.today().strftime('%Y%m%d')
log_filename = '_'.join(['etl_complaint_fact_',current_date])+'.log'
logging.basicConfig(filename=log_filename, encoding='utf-8', format='%(asctime)s %(message)s', level=logging.DEBUG)
logging.info('=========================================================================')
logging.info('Starting ETL Run for complaint fact on date '+current_date)

In [20]:
for year in range(2019, 2024):
    logging.info('=========================================================================')
    if __name__ == '__main__':
        df = pd.DataFrame

        bqclient = create_bigquery_client(logging)

        df = load_csv_data_file(logging, f'data/311_traffic_signal_complaints_{year}.csv', df)

        df = dimension_lookup(logging, dimension_name='agency', lookup_columns=['agency', 'agency_name'], df=df)
        df = dimension_lookup(logging, dimension_name='location', lookup_columns=['incident_zip', 'intersection_street_1', 'intersection_street_2', 'borough'], df=df)
        df = dimension_lookup(logging, dimension_name='complaints_status', lookup_columns=['status'], df=df)
        df = dimension_lookup(logging, dimension_name='complaint_type', lookup_columns=['complaint_type', 'descriptor'], df=df)

        df = date_dimension_lookup(logging, dimension_name='date', lookup_column='created_date', df=df)
        df = date_dimension_lookup(logging, dimension_name='date', lookup_column='closed_date', df=df)

        surrogate_keys=['agency_dim_id','location_dim_id','complaints_status_dim_id','complaint_type_dim_id','created_date_dim_id','closed_date_dim_id']
        df = df[surrogate_keys]

        # Add complaint count (for daily snapshot grain)
        df['complaint_count'] = 1
        df = df.groupby(surrogate_keys)['complaint_count'].agg('count').reset_index()

        # See if the target table exists
        target_table_exists = bigquery_table_exists(fact_table_path, bqclient)
        # If the target table does not exist, load all of the data into a new table
        if not target_table_exists:
            build_new_table(logging, bqclient, fact_table_path, df)
        # If the target table exists, then perform an incremental load    
        if target_table_exists:
            insert_existing_table(logging, bqclient, fact_table_path, df)
        logging.shutdown()

## For Motor Vehicle Collision Data 

In [118]:
df = pd.DataFrame

fact_name = 'collision'

gcp_project = 'cis4400-381214' # replace to your own project id
bq_dataset = 'motor_vehicle_collision_dataset' # replace to your own dataset name
table_name = fact_name + '_fact'

fact_table_path = '.'.join([gcp_project,bq_dataset,table_name])

In [119]:
# Set up logging
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
current_date = datetime.today().strftime('%Y%m%d')
log_filename = '_'.join(['etl_collision_fact_',current_date])+'.log'
logging.basicConfig(filename=log_filename, encoding='utf-8', format='%(asctime)s %(message)s', level=logging.DEBUG)
logging.info('=========================================================================')
logging.info('Starting ETL Run for collision fact on date '+current_date)

In [125]:
for year in range(2019, 2024):
    logging.info('=========================================================================')
    if __name__ == '__main__':
        df = pd.DataFrame
        bqclient = create_bigquery_client(logging)
        df = load_csv_data_file(logging, f'data/motor_vehicle_collision_{year}.csv', df)
        
        df = dimension_lookup(logging, dimension_name='location', lookup_columns=['zip_code', 'borough'], df=df)
        df = dimension_lookup(logging, dimension_name='contributing_factor', lookup_columns=['contributing_factor_vehicle_1','vehicle_type_code1'], df=df)

        df = date_dimension_lookup(logging, dimension_name='date', lookup_column='crash_date', df=df)
        df = time_dimension_lookup(logging, dimension_name='time', lookup_column='crash_time', df=df)

        surrogate_keys=['crash_date_dim_id','crash_time_dim_id','location_dim_id','contributing_factor_dim_id','collision_id','number_of_persons_injured','number_of_pedestrians_injured','number_of_pedestrians_killed','number_of_cyclist_injured','number_of_cyclist_killed','number_of_motorist_injured','number_of_motorist_killed']
        df = df[surrogate_keys]

        # Add collision count (for daily snapshot grain)
        df['collision_count'] = 1
        df = df.groupby(surrogate_keys)['collision_count'].agg('count').reset_index()

        # See if the target table exists
        target_table_exists = bigquery_table_exists(fact_table_path, bqclient)
        # If the target table does not exist, load all of the data into a new table
        if not target_table_exists:
            build_new_table(logging, bqclient, fact_table_path, df)
        # If the target table exists, then perform an incremental load    
        if target_table_exists:
            insert_existing_table(logging, bqclient, fact_table_path, df)
        logging.shutdown()

  df[lookup_column] = pd.to_datetime(df[lookup_column]).dt.time
  df[lookup_column] = pd.to_datetime(df[lookup_column]).dt.time
  df[lookup_column] = pd.to_datetime(df[lookup_column]).dt.time
  df[lookup_column] = pd.to_datetime(df[lookup_column]).dt.time
  df[lookup_column] = pd.to_datetime(df[lookup_column]).dt.time
