### Sourcing and Extracting the data

In [35]:
import pandas as pd
import psycopg2

In [5]:
def extract_data(filepath: str) -> pd.DataFrame:
    """
    Read a CSV file into a pandas DataFrame.

    input; filepath; str (or path-like), file path to CSV data
    output; pandas dataframe, extracted from CSV data

    On failure an error message is printed and the original exception is
    re-raised. (Previously the function fell through to ``return df`` with
    ``df`` unbound, which surfaced as an unrelated UnboundLocalError.)

    Raises:
        FileNotFoundError: ``filepath`` does not exist.
        pd.errors.EmptyDataError: the file contains no data to parse.
        Exception: any other error raised by ``pd.read_csv``.
    """
    try:
        return pd.read_csv(filepath)
    except FileNotFoundError:
        print(f"Error: File not found: {filepath}")
        raise
    except pd.errors.EmptyDataError:
        print("Error: The file is empty.")
        raise
    except Exception as e:
        print(f"Error: {e}")
        raise

In [7]:
# Load the raw crash CSV and report its (rows, columns) shape.
df_crashes = extract_data("datasets/traffic_crashes.csv")
print(f"DF Crashes:  {df_crashes.shape}")

DF Crashes:  (1000, 49)


### Transformation and data cleansing

In [65]:
def transform_data(df: pd.DataFrame, date_format: str = "%m%d%Y") -> pd.DataFrame:
    """
    Clean the extracted crash data.

    input: df; pandas dataframe, extracted data (not modified)
    input: date_format; str, strptime-style format used to parse CRASH_DATE
    output: df; pandas dataframe, transformed data

    Steps:
      1. drop exact duplicate rows;
      2. fill missing values in every column with that column's mode;
      3. best-effort type conversions: CRASH_DATE -> datetime and
         POSTED_SPEED_LIMIT -> int32. A conversion that fails (missing
         column or values that do not fit) is skipped with a printed
         warning instead of aborting the pipeline.
    """
    # Drop duplicate rows. Copy so the column assignments below operate on an
    # independent frame rather than a possible view of the caller's data.
    df = df.drop_duplicates().copy()

    # Replace missing values with each column's mode (most frequent value).
    # This is applied to every column, numeric ones included.
    # df.mode() has no rows when df is empty or every column is all-NaN, in
    # which case .iloc[0] would raise IndexError -- nothing to fill then.
    modes = df.mode()
    if not modes.empty:
        df = df.fillna(modes.iloc[0])

    # Some columns need to be converted to appropriate data types.
    # NOTE(review): the default "%m%d%Y" only matches unseparated dates such
    # as "01312023"; confirm against the CRASH_DATE format in the source CSV.
    try:
        df['CRASH_DATE'] = pd.to_datetime(df['CRASH_DATE'], format=date_format)
    except (KeyError, ValueError, TypeError) as err:
        print(f"Warning: CRASH_DATE not converted to datetime: {err}")
    try:
        df['POSTED_SPEED_LIMIT'] = df['POSTED_SPEED_LIMIT'].astype('int32')
    except (KeyError, ValueError, TypeError) as err:
        print(f"Warning: POSTED_SPEED_LIMIT not converted to int32: {err}")

    return df

### Load data into PostgreSQL DB

In [45]:
import configparser

# Read the database credentials (the [database] section is used when opening
# the PostgreSQL connection).
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail early with a clear message instead of a
# later, opaque KeyError on config['database'].
if not config.read('config.ini', encoding='utf-8'):
    raise FileNotFoundError("config.ini not found or could not be read")

['config.ini']

In [48]:
# Open a PostgreSQL connection with the credentials stored in the
# [database] section of config.ini.
conn = psycopg2.connect(database=config['database']['DB_NAME'],
                        user=config['database']['DB_user'],
                        password=config['database']['DB_password'],
                        host=config['database']['DB_host'],
                        port=config['database']['DB_port'])

# Cursor used to run SQL statements (e.g. the INSERTs issued by load_data).
cur = conn.cursor()
print('successful creation of cursor object.')

successful creation of cursor object.


In [50]:
# NOTE: the per-table column lists are hard-coded below; moving them into the
# pipeline config would make this function fully dynamic.
def load_data(df: pd.DataFrame, postgre_table: str, postgre_schema: str) -> None:
    """
    Load transformed data into the respective PostgreSQL table and commit.

    Uses the module-level ``cur`` and ``conn`` opened in the connection cell.

    :param df: transformed dataframe; must contain the columns listed for
        ``postgre_table`` in ``table_columns`` below
    :param postgre_table: target table, one of 'chicago_dmv.Crash',
        'chicago_dmv.Vehicle', 'chicago_dmv.Person'
    :param postgre_schema: the rest of the INSERT statement after the table
        name; it must contain one ``%s`` placeholder per column, in the
        column order below (presumably a column list plus a VALUES clause --
        confirm against config.yaml)
    :return: None
    :raises ValueError: ``postgre_table`` is not handled by this pipeline
    :raises KeyError: ``df`` lacks one of the required columns
    """
    # Columns sent for each table, in the order the query placeholders expect.
    table_columns = {
        'chicago_dmv.Crash': ['CRASH_UNIT_ID', 'CRASH_ID', 'PERSON_ID',
                              'VEHICLE_ID', 'NUM_UNITS', 'TOTAL_INJURIES'],
        'chicago_dmv.Vehicle': ['CRASH_UNIT_ID', 'CRASH_ID', 'CRASH_DATE',
                                'VEHICLE_ID', 'VEHICLE_MAKE', 'VEHICLE_MODEL',
                                'VEHICLE_YEAR', 'VEHICLE_TYPE'],
        'chicago_dmv.Person': ['PERSON_ID', 'CRASH_ID', 'CRASH_DATE',
                               'PERSON_TYPE', 'VEHICLE_ID', 'PERSON_SEX',
                               'PERSON_AGE'],
    }
    # Validate up front so an unknown table is rejected even if df is empty.
    if postgre_table not in table_columns:
        raise ValueError(f'Postgre Data Table {postgre_table} does not exist in this pipeline.')
    columns = table_columns[postgre_table]

    # Table name and statement fragment come from the pipeline config, not
    # from user input; identifiers cannot be passed as query parameters.
    insert_query = f"INSERT INTO {postgre_table} {postgre_schema};"

    # Plain value tuples in column order, without building a Series per row
    # as DataFrame.iterrows() did.
    rows = list(df[columns].itertuples(index=False, name=None))

    # Imported only once validation has passed. execute_batch sends many
    # statements per server round trip instead of one execute() per row.
    from psycopg2.extras import execute_batch

    try:
        execute_batch(cur, insert_query, rows)
        # Commit all changes to the database
        conn.commit()
    except Exception:
        # A failed statement leaves the transaction aborted; roll back so the
        # shared connection stays usable, then surface the original error.
        conn.rollback()
        raise

In [51]:
def close_conn(cur, connection=None):
    """
    Close the PostgreSQL cursor and its database connection.

    :param cur: psycopg2 cursor object
    :param connection: connection to close; defaults to the module-level
        ``conn`` opened in the connection cell
    :return: None
    """
    if connection is None:
        connection = conn

    # Close the cursor and the database connection; the finally guarantees
    # the connection is released even if closing the cursor raises.
    try:
        cur.close()
    finally:
        connection.close()
    print('successful closing of cursor object.')

### Running the pipeline

In [54]:
import yaml

# Import the pipeline configuration; the run cell reads the target table name
# and INSERT fragment from it. Decode explicitly as UTF-8, as config.ini is,
# rather than relying on the platform-default encoding.
# safe_load only builds plain Python objects, never arbitrary ones.
with open('config.yaml', 'r', encoding='utf-8') as file:
    config_data = yaml.safe_load(file)

In [None]:
try:
    # Step 1: Extract data
    crashes_df = extract_data("datasets/traffic_crashes.csv")

    # Step 2: Transform data
    crashes_transformed_df = transform_data(crashes_df)

    # Step 3: Load data
    load_data(df=crashes_transformed_df,
              postgre_table=config_data['crash_table_PSQL'],
              postgre_schema=config_data['crash_insert_PSQL'])
finally:
    # Step 4: release the cursor and connection whether or not loading succeeded
    close_conn(cur)