<a href="https://colab.research.google.com/github/ChingWingYeung/Assignment2WebCrawler/blob/main/etl_dimension.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# If using Google CoLab, run the following lines to set up Authentication with GCP
from google.colab import auth
auth.authenticate_user()
print('Authenticated')


Authenticated


In [2]:
# If using Google CoLab, import these  modules for BigQuery
%load_ext google.cloud.bigquery
%load_ext google.colab.data_table

In [11]:
# If using the native Google BigQuery API module:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import pandas as pd
import os
import pyarrow
import logging
from datetime import datetime

In [8]:
# If using a service account key file, save the path to that file in credentials.py and import credentials
path_to_service_account_key_file = "D:\Desktop\genuine-period-442417-u9-ccc724799f16.json"
import credentials


In [9]:
# Set the name of the dimension
dimension_name = 'agency'

# Set the name of the surrogate key
surrogate_key = f"{dimension_name}_dim_id"

# Set the name of the business key
business_key = f'{dimension_name}_id'

# Set the GCP Project, dataset and table name
gcp_project = 'My First Project'
bq_dataset = '311_warehouse'
table_name = f"{dimension_name}_dimension"
# Construct the full BigQuery path to the table
dimension_table_path = f"{gcp_project}.{bq_dataset}.{table_name}"

# Set the path to the source data files. Use double-slash for Windows paths C:\\myfolder
# For Linux use forward slashes    /home/username/python_etl
# For Mac use forward slashes      /users/username/python_etl
# file_source_path = 'c:\\Python_ETL'
# file_source_path = 'C:\\Users\\rholo\\OneDrive\\Documents\\classes\\4400\\311'
file_source_path = 'D:\\Desktop'

In [12]:
# Set up logging
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
current_date = datetime.today().strftime('%Y%m%d')
log_filename = "_".join(["etl",dimension_name,current_date])+".log"
logging.basicConfig(filename=log_filename, encoding='utf-8', format='%(asctime)s %(message)s', level=logging.DEBUG)
logging.info("=========================================================================")
logging.info(f"Starting ETL Run for dimension {dimension_name} on date {current_date}")


In [13]:
def load_csv_data_file(logging: logging.Logger,
                      file_source_path: str,
                      file_name: str,
                      df: pd.DataFrame):
    """
    load_csv_data_file
    Accepts a file source path and a file name
    Loads the file into a data frame
    Exits the program on error
    Returns the dataframe
    """
    file_source = os.path.join(file_source_path, file_name)
    logging.info(f"Reading source data file: {file_source}")
    # Read in the source data file for the customers data
    try:
        df = pd.read_csv(file_source)
        # Set all of the column names to lower case letters
        df = df.rename(columns=str.lower)
        logging.info(f"Read {len(df)} records from source data file: {file_source}")
        return df
    except:
        logging.error(f"Failed to read file: {file_source}")
        # os._exit(-1)
    return df

In [26]:
def transform_data(logging: logging.Logger,
                   df: pd.DataFrame):
    """
    transform_data
    Accepts a data frame
    Performs any specific cleaning and transformation steps on the dataframe
    Returns the modified dataframe
    """
    # Convert the date_of_birth to a datetime64 data type. 2012-08-21 04:12:16.827
    logging.info("Transforming dataframe.")
    # Select the columns for this dimension
    column_list = ['agency','agency_name']
    df = df[column_list]
    # Remove duplicates
    df = df.drop_duplicates()
    return df

In [15]:
def create_bigquery_client(logging):
    """
    create_bigquery_client
    Creates a BigQuery client using the path to the service account key file
    for credentials.
    Returns the BigQuery client object
    """
    try:
        # If authenticating using a service account key file, use the following code:
        # bqclient = bigquery.Client.from_service_account_json(credentials.path_to_service_account_key_file)
        # Google Colab authentication already completed
        bqclient = bigquery.Client(gcp_project)
        logging.info("Created BigQuery Client: %s",bqclient)
        return bqclient
    except Exception as err:
        logging.error("Failed to create BigQuery Client.", err)
        # os._exit(-1)
    return bqclient


In [17]:
def upload_bigquery_table(logging, bqclient, table_path, write_disposition, df):
    """
    upload_bigquery_table
    Accepts a path to a BigQuery table, the write disposition and a dataframe
    Loads the data into the BigQuery table from the dataframe.
    for credentials.
    The write disposition is either
    write_disposition="WRITE_TRUNCATE"  Erase the target data and load all new data.
    write_disposition="WRITE_APPEND"    Append to the existing table
    """
    try:
        logging.info("Creating BigQuery Job configuration with write_disposition=%s", write_disposition)
        # Set up a BigQuery job configuration with the write_disposition.
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        # Submit the job
        logging.info("Submitting the BigQuery job")
        job = bqclient.load_table_from_dataframe(df, table_path, job_config=job_config)
        # Show the job results
        logging.info("Job  results: %s",job.result())
    except Exception as err:
        logging.error("Failed to load BigQuery Table. %s", err)
        #os._exit(-1)


In [18]:
def bigquery_table_exists(bqclient, table_path):
    """
    bigquery_table_exists
    Accepts a path to a BigQuery table
    Checks if the BigQuery table exists.
    Returns True or False
    """
    try:
        bqclient.get_table(table_path)  # Make an API request.
        return True
    except NotFound:
        return False

In [19]:
def query_bigquery_table(logging, table_path, bqclient, surrogate_key):
    """
    query_bigquery_table
    Accepts a path to a BigQuery table and the name of the surrogate key
    Queries the BigQuery table but leaves out the update_timestamp and surrogate key columns
    Returns the dataframe
    """
    bq_df = pd.DataFrame
    sql_query = 'SELECT * EXCEPT ( update_timestamp, '+surrogate_key+') FROM `' + table_path + '`'
    logging.info("Running query: %s", sql_query)
    try:
        bq_df = bqclient.query(sql_query).to_dataframe()
    except Exception as err:
        logging.info("Error querying the table. %s", err)
    return bq_df

In [20]:
def add_surrogate_key(df, dimension_name='customers', offset=1):
    """
    add_surrogate_key
    Accepts a data frame and inserts an integer identifier as the first column
    Returns the modified dataframe
    """
    # Reset the index to count from 0
    df.reset_index(drop=True, inplace=True)
    # Add the new surrogate key starting from offset
    df.insert(0, dimension_name+'_dim_id', df.index+offset)
    return df

In [21]:
def add_update_date(df, current_date):
    """
    add_update_date
    Accepts a data frame and inserts the current date as a new field
    Returns the modified dataframe
    """
    df['update_date'] = pd.to_datetime(current_date)
    return df

In [22]:
def add_update_timestamp(df):
    """
    add_update_timestamp
    Accepts a data frame and inserts the current datetime as a new field
    Returns the modified dataframe
    """
    df['update_timestamp'] = pd.to_datetime('now', utc=True).replace(microsecond=0)
    return df

In [23]:
def build_new_table(logging, bqclient, dimension_table_path, dimension_name, df):
    """
    build_new_table
    Accepts a path to a dimensional table, the dimension name and a data frame
    Add the surrogate key and a record timestamp to the data frame
    Inserts the contents of the dataframe to the dimensional table.
    """
    logging.info("Target dimension table %s does not exit", dimension_table_path)
    # Add a surrogate key
    df = add_surrogate_key(df, dimension_name, 1)
    # Add the update timestamp
    df = add_update_timestamp(df)
    # Upload the dataframe to the BigQuery table
    upload_bigquery_table(logging, bqclient, dimension_table_path, "WRITE_TRUNCATE", df)

In [24]:
def insert_existing_table(logging, bqclient, dimension_table_path, dimension_name, surrogate_key, df):
    """
    insert_existing_table
    Accepts a path to a dimensional table, the dimension name and a data frame
    Compares the new data to the existing data in the table.
    Inserts the new/modified records to the existing table
    """
    bq_df = pd.DataFrame
    logging.info("Target dimension table %s exits. Checking for differences.", dimension_table_path)
    # Fetch the existing table
    bq_df = query_bigquery_table(logging, dimension_table_path, bqclient, surrogate_key)
    # Compare with the new data set
    new_records_df = pd.concat([df,bq_df]).drop_duplicates(keep=False)
    logging.info("Found %d new records.", new_records_df.shape[0])
    if new_records_df.shape[0] > 0:
        # Set the surrogate key for the new records. bq_df.shape[0] is number of records already in the database
        new_surrogate_key_value = bq_df.shape[0]+1
        new_records_df = add_surrogate_key(new_records_df, dimension_name, new_surrogate_key_value)
        # Add the current date for the new records
        new_records_df = add_update_timestamp(new_records_df)
        # Upload the new records into the dimension table
        upload_bigquery_table(logging, bqclient, dimension_table_path, "WRITE_APPEND", new_records_df)

In [28]:
# Program main
# Load the CSV File into a dataframe
# Transform the Dataframe
# Create a BigQuery client
# See if the target dimension table exists
#    If not exists, load the data into a new table
#    If exists, insert new records into the table
if __name__ == "__main__":
    df = pd.DataFrame
    # Load in the data file
    df = load_csv_data_file(logging, file_source_path, "311_rodent_complaint.csv", df)
    # Transform the data
    df = transform_data(logging, df)
    # Create the BigQuery Client
    bqclient = create_bigquery_client(logging)
    # See if the target dimensional table exists
    target_table_exists = bigquery_table_exists(bqclient, dimension_table_path  )
    # If the target dimension table does not exist, load all of the data into a new table
    if not target_table_exists:
        build_new_table(logging, bqclient, dimension_table_path, dimension_name, df)
    # If the target table exists, then perform an incremental load
    if target_table_exists:
        insert_existing_table(logging, bqclient, dimension_table_path, dimension_name, surrogate_key, df)
    # Flush the log file
    logging.shutdown()

TypeError: 'type' object is not subscriptable

In [32]:
df = pd.read_csv("mnist_test.csv")
display(df)

FileNotFoundError: [Errno 2] No such file or directory: 'mnist_test.csv'

In [None]:
# Check the log. Use cat, head or tail
!tail -35 etl_agency_20230329.log


2023-03-29 15:56:28,977 Started reading table 'handy-bonbon-142723._5ef0660eeeb79c6dc70d5e60af1e07f5112fb556.anon7a21409bf6c5b560a28d311262b7ed445e8d2ce7cba18071e5306baf5ac457ce' with BQ Storage API session 'projects/handy-bonbon-142723/locations/us/sessions/CAISDEFYaEtjNGhOSEQzNRoCaXcaAmpk'.
2023-03-29 15:56:29,719 Found 0 new records.
2023-03-29 15:58:01,091 Making request: GET http://169.254.169.254
2023-03-29 15:58:01,099 Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
2023-03-29 15:58:01,102 Making request: GET http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true
2023-03-29 15:58:01,104 Starting new HTTP connection (1): metadata.google.internal:80
2023-03-29 15:58:01,265 http://metadata.google.internal:80 "GET /computeMetadata/v1/instance/service-accounts/default/?recursive=true HTTP/1.1" 200 198
2023-03-29 15:58:01,266 Making request: GET http://metadata.google.internal/computeMetadata/v1/in