In [56]:
import os

import pandas as pd
import sqlalchemy

In [57]:
# Database connection settings.
# The URI may be supplied through the METROBUS_DB_URI environment variable so
# that credentials do not have to be stored in the notebook. When the variable
# is not set, the original local connection string is used as the fallback.
connection_uri = os.environ.get(
    "METROBUS_DB_URI",
    "postgresql+psycopg2://postgres:postgres@db:5432/metrobus_db",
)
db_engine = sqlalchemy.create_engine(connection_uri)

# Schema that receives every table written by the pipeline.
db_schema = "estatico"

# Dataset names processed by data_pipeline(): each one is read from
# "<name>.txt" and written to the table "<name>_bronze".
datafiles_list = [
    "alcaldias",
    "feed_info",
    "shapes",
    "agency",
    "calendar_dates",
    "calendar",
    "routes",
    "trips",
    "stops",
    "stop_times",
]

In [58]:
def extract(file_name):
    """Read a comma-separated text file into a DataFrame.

    Parameters
    ----------
    file_name
        Path (or any other source accepted by ``pd.read_csv``) to read.

    Returns
    -------
    pandas.DataFrame
        The parsed contents, using ``pd.read_csv`` default settings.
    """
    return pd.read_csv(file_name)

In [59]:
def transform_to_date(raw_data,column):
    """Parse a ``YYYYMMDD`` column into datetimes.

    The column is overwritten on ``raw_data`` itself (no copy is made), and
    that same object is returned so calls can be assigned or chained.

    Parameters
    ----------
    raw_data
        DataFrame holding the column to convert; modified in place.
    column
        Name of the column whose values follow the ``%Y%m%d`` format.

    Returns
    -------
    The same ``raw_data`` object, with ``column`` converted.
    """
    parsed_dates = pd.to_datetime(raw_data[column], format="%Y%m%d")
    raw_data[column] = parsed_dates
    return raw_data

In [60]:
def transform_to_time(raw_data,column):
    """Parse an ``HH:MM:SS`` column with ``pd.to_datetime``.

    The column is overwritten on ``raw_data`` itself and that same object is
    returned. Because the format carries no date part, the parsed values are
    full datetimes rather than bare times.
    NOTE(review): values outside 00:00:00-23:59:59 do not match ``%H`` and
    are expected to raise - confirm against the input data.

    Parameters
    ----------
    raw_data
        DataFrame holding the column to convert; modified in place.
    column
        Name of the column whose values follow the ``%H:%M:%S`` format.

    Returns
    -------
    The same ``raw_data`` object, with ``column`` converted.
    """
    parsed_times = pd.to_datetime(raw_data[column], format="%H:%M:%S")
    raw_data[column] = parsed_times
    return raw_data

In [61]:
def transform(file_name,raw_data):
    """Apply the dataset-specific conversions to a freshly extracted frame.

    Only three datasets need work: their date columns are converted with
    ``transform_to_date``. Every other dataset is returned untouched.

    Parameters
    ----------
    file_name
        Dataset name without extension (e.g. ``"calendar"``).
    raw_data
        DataFrame produced by ``extract`` for that dataset.

    Returns
    -------
    The value returned by the last ``transform_to_date`` call, or
    ``raw_data`` itself when the dataset has no date columns to convert.
    """
    # Date columns to convert, keyed by dataset name.
    date_columns_by_file = {
        "calendar_dates": ("date",),
        "calendar": ("start_date", "end_date"),
        "feed_info": ("feed_start_date", "feed_end_date"),
    }

    result = raw_data
    for date_column in date_columns_by_file.get(file_name, ()):
        # Each call receives raw_data, as the conversion works in place.
        result = transform_to_date(raw_data, date_column)
    return result

In [62]:
def load(transformed_data, con_engine,schema_name, table_name):
    """Write a frame to a database table through ``to_sql``.

    Rows are appended (``if_exists="append"``), so anything already in the
    table is kept. Any exception raised by the write is caught and reported
    with ``print``; it is not re-raised.

    Parameters
    ----------
    transformed_data
        Object exposing ``to_sql`` (a DataFrame in this pipeline).
    con_engine
        Connection/engine passed to ``to_sql`` as ``con``.
    schema_name
        Target database schema.
    table_name
        Target table name.

    Returns
    -------
    None
    """
    # TODO: truncate the target table before loading (not implemented yet).

    write_options = {
        "name": table_name,
        "con": con_engine,
        "schema": schema_name,
        "if_exists": "append",  # add to existing rows; nothing is replaced
        "index": False,  # do not write the DataFrame index as a column
    }

    try:
        transformed_data.to_sql(**write_options)
    except Exception as e:
        print(f"Error inserting record: {e}")


In [63]:
def data_pipeline():
    """Run extract, transform and load once for each name in ``datafiles_list``.

    For a dataset ``<name>`` the input file is ``<name>.txt`` and the output
    table is ``<name>_bronze`` in schema ``db_schema``, written through
    ``db_engine``.
    """
    for dataset in datafiles_list:
        # Extract: read the dataset's text file.
        source_frame = extract(dataset + ".txt")

        # Transform: apply the dataset-specific conversions.
        prepared_frame = transform(dataset, source_frame)

        # Load: write to the matching table.
        target_table = dataset + "_bronze"
        load(prepared_frame, db_engine, db_schema, target_table)

In [64]:
# Run the extract/transform/load steps for every dataset in datafiles_list.
data_pipeline()