In [None]:
import config
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sqlalchemy import create_engine
import logging
import warnings
warnings.filterwarnings('ignore')
import time
import folium
from folium.plugins import MarkerCluster

In [164]:

def setup_logging():
    """Send root-logger output to ``config.LOG_FILE``, truncating it each run.

    ``force=True`` replaces any handlers already attached to the root logger,
    so calling this again (e.g. when a cell is re-run) reconfigures logging
    rather than being ignored.
    """
    log_settings = {
        'filename': config.LOG_FILE,
        'filemode': 'w',  # overwrite the previous run's log
        'level': logging.INFO,
        'format': '%(asctime)s - %(levelname)s - %(message)s',
        'force': True,
    }
    logging.basicConfig(**log_settings)

def create_db_engine():
    """Create a SQLAlchemy engine for the PostgreSQL database named in ``config``.

    The connection URL is assembled from ``config.DB_USER``, ``DB_PASSWORD``,
    ``DB_HOST``, ``DB_PORT`` and ``DB_NAME``. One connection is opened to
    verify the settings and is released again immediately.

    Returns:
        The engine on success, or ``None`` if creating the engine or the
        test connection raised. The failure is logged, not re-raised.
    """
    try:
        # NOTE(review): credentials are interpolated as-is; this assumes they
        # contain no URL-reserved characters such as '@' or '/' -- confirm.
        connection_url = (
            f'postgresql+psycopg2://{config.DB_USER}:{config.DB_PASSWORD}@'
            f'{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}'
        )
        engine = create_engine(connection_url)
        # Probe the connection inside a context manager so it is returned to
        # the pool instead of being left open for the life of the kernel.
        with engine.connect():
            pass
        logging.info("Successfully connected to PostgreSQL database.")
        return engine
    except Exception as e:
        logging.error(f"PostgreSQL connection failed: {e}")
        return None

def load_to_postgres(df, table_name, engine):
    """Write ``df`` to the table ``table_name`` through ``engine``.

    An existing table of the same name is replaced and the DataFrame index is
    not written. Any exception from the write is logged and then re-raised to
    the caller.
    """
    try:
        df.to_sql(name=table_name, con=engine, if_exists='replace', index=False)
        logging.info(f"Successfully loaded data into table '{table_name}'.")
    except Exception as exc:
        logging.error(f"Failed to save data to table '{table_name}': {exc}")
        raise

In [165]:
def main():
    """Run the ETL: read each GTFS text file and load it into PostgreSQL.

    Steps:
      1. Configure logging and open a database engine; abort (return) if the
         engine could not be created.
      2. For every file listed in ``files_to_process``, read it from
         ``config.GTFS_DATA_PATH`` with pandas and hand it to
         ``load_to_postgres`` under the mapped table name.
      3. Log and print the total elapsed time.

    Per-file error handling: a missing file or any other exception is logged
    and the file is skipped; a ``MemoryError`` is logged and stops the loop.
    """
    start_time = time.time()
    setup_logging()
    logging.info("--- Starting SEQ Bus Data ETL Process ---")

    engine = create_db_engine()
    if not engine:
        logging.critical("Could not establish database connection. Aborting.")
        return

    base_dir = config.GTFS_DATA_PATH

    # Source file name -> destination table name.
    files_to_process = {
        'calendar_dates.txt': 'calendar_dates',
        'calendar.txt': 'calendar',
        'routes.txt': 'routes',
        'shapes.txt': 'shapes',
        'stop_times.txt': 'stop_times',
        'stops.txt': 'stops',
        'trips.txt': 'trips'
    }

    try:
        for file_name, table_name in files_to_process.items():
            file_path = f"{base_dir}/{file_name}"
            try:
                logging.info(f"Reading {file_name} from disk...")
                df = pd.read_csv(file_path, low_memory=False)
                load_to_postgres(df, table_name, engine)

            except FileNotFoundError:
                logging.error(f"File not found: {file_path}. Skipping.")
            except MemoryError:
                logging.error(f"OUT OF MEMORY while loading {file_name}. Aborting process.")
                break
            except Exception as e:
                logging.error(f"An ETL error occurred for {file_name}. Skipping. Details: {e}")
    finally:
        # This engine is local to the run; release its pooled connections so
        # repeated runs in the same kernel do not accumulate open connections.
        engine.dispose()

    end_time = time.time()
    total_seconds = end_time - start_time
    minutes, seconds = divmod(total_seconds, 60)

    final_message = (
        f"--- ETL Process Finished in {int(minutes)} minutes and {seconds:.2f} seconds. ---"
        )
    logging.info(final_message)
    print(final_message)

if __name__ == "__main__":
    main()

--- ETL Process Finished in 3 minutes and 1.58 seconds. ---


In [166]:
# Read every table written by the ETL step back into its own DataFrame.
engine = create_db_engine()

# create_db_engine() returns None when the connection attempt fails; stop here
# with a clear message instead of an opaque error raised from inside pandas.
if engine is None:
    raise RuntimeError("Database connection failed. Could not read tables.")

calendar_dates = pd.read_sql_table('calendar_dates', engine)
calendar = pd.read_sql_table('calendar', engine)
routes = pd.read_sql_table('routes', engine)
shapes = pd.read_sql_table('shapes', engine)
stop_times = pd.read_sql_table('stop_times', engine)
stops = pd.read_sql_table('stops', engine)
trips = pd.read_sql_table('trips', engine)


In [167]:
# Registry of the loaded tables keyed by table name, so the cells that check
# duplicates and push data back can iterate over all of them uniformly.
dfs = dict(
    calendar_dates=calendar_dates,
    calendar=calendar,
    routes=routes,
    shapes=shapes,
    stop_times=stop_times,
    stops=stops,
    trips=trips,
)


In [169]:
# Count fully duplicated rows in each loaded table and print one line per table.
for name, df in dfs.items():
    duplicate_flags = df.duplicated()
    name_duplicated = duplicate_flags.sum()
    print(f'{name}: {name_duplicated}')

calendar_dates: 0
calendar: 0
routes: 0
shapes: 0
stop_times: 0
stops: 0
trips: 0


<center><h2><strong>DATA CLEANING</strong></h2></center>

In [None]:
# --- Fill missing values with explicit placeholders ---
# The filled column is assigned back to the frame. Calling
# fillna(inplace=True) on a column selection is chained assignment, which does
# not update the parent frame under pandas Copy-on-Write.
logging.info("Cleaning 'routes' table")
routes['route_desc'] = routes['route_desc'].fillna('No Description')

logging.info("Cleaning 'stops' table")
stops_placeholders = {
    'stop_code': '0',
    'stop_desc': 'No Description',
    'zone_id': 0,
    'stop_url': 'Not Available',
    'platform_code': 'N/A',
}
for column, placeholder in stops_placeholders.items():
    stops[column] = stops[column].fillna(placeholder)

# --- Convert YYYYMMDD date columns to datetime ---
logging.info("Convert  'calendar' date column")
calendar['start_date'] = pd.to_datetime(calendar['start_date'], format='%Y%m%d')
calendar['end_date'] = pd.to_datetime(calendar['end_date'], format='%Y%m%d')

logging.info("Convert  'calendar_dates' date column")
calendar_dates['date'] = pd.to_datetime(calendar_dates['date'], format='%Y%m%d')

# --- Drop fully duplicated rows ---
# drop_duplicates(inplace=True) is applied to the frame object itself, so the
# named variables and the entries in `dfs` stay the same objects.
logging.info('Checking all tables for duplicate row')
for name, df in dfs.items():
    num_duplicate = df.duplicated().sum()
    if num_duplicate > 0:
        logging.warning(f'Found {num_duplicate} duplicate rows in {name}. Removing Them')
        df.drop_duplicates(inplace=True)
    else:
        logging.info(f'No Duplicate rows found in {name}')

# --- Trim surrounding whitespace from text columns ---
logging.info("Trimming Whitespaces from 'route_df' columns")
routes['route_short_name'] = routes['route_short_name'].str.strip()
routes['route_long_name'] = routes['route_long_name'].str.strip()

logging.info("Trimming Whitespaces from 'stops_df' columns")
stops['stop_name'] = stops['stop_name'].str.strip()

# --- Write the cleaned tables back to PostgreSQL ---
logging.info("Pushing all modified data back to PostgreSQL...")

engine = create_db_engine()

if engine:
    try:
        # Each table is replaced wholesale; the first failure stops the push
        # and is reported below.
        for table_name, df in dfs.items():
            df.to_sql(table_name, engine, if_exists='replace', index=False)
        logging.info('All cleaned tables have been successfully pushed to the database')
    except Exception as e:
        error_message = (f'Error Occured during the database push: {e}')
        print(error_message)
        logging.error(error_message)
else:
    connection_error_message = "Database connection failed. Could not push data."
    print(connection_error_message)
    logging.critical(connection_error_message)