In [None]:
# Install necessary packages
!pip install -r requirements.txt

import os
import pandas as pd
from dotenv import dotenv_values
from sqlalchemy import create_engine, inspect
from functools import wraps

# Load environment variables from the local .env file; when that file is
# missing or defines nothing, fall back to the process environment instead.
CONFIG = dotenv_values(".env") or os.environ

# Log the name of the function that is executed
def logger(fn):
    """Decorate ``fn`` so each call prints a start line and a completion line.

    The start line carries the UTC time the call began; the completion line
    carries the UTC time the call returned. If ``fn`` raises, the exception
    propagates unchanged and no completion line is printed.

    Args:
        fn: The callable to wrap.

    Returns:
        A wrapper that keeps ``fn``'s metadata (via ``functools.wraps``) and
        returns whatever ``fn`` returns.
    """
    from datetime import datetime, timezone

    @wraps(fn)
    def inner(*args, **kwargs):
        started_at = datetime.now(timezone.utc)
        print(f">>> Running {fn.__name__!r} function. Logged at {started_at}")
        result = fn(*args, **kwargs)
        # Take a fresh timestamp: reusing the start time here would make the
        # completion log report when the call began, not when it finished.
        finished_at = datetime.now(timezone.utc)
        print(f">>> Function: {fn.__name__!r} executed. Logged at {finished_at}")
        return result

    return inner
    
# Connect to Postgres database
@logger
def connect_db():
    """Create a SQLAlchemy engine for the Postgres server described in CONFIG.

    Reads ``POSTGRES_USER``, ``POSTGRES_PASSWORD``, ``POSTGRES_HOST`` and
    ``POSTGRES_PORT`` from ``CONFIG``; a missing key raises ``KeyError``.
    No database name is included in the URI, so the server/driver default
    database is used (presumably the one named after the user — confirm for
    your deployment).

    Returns:
        The engine, after one probe connection has been opened and closed to
        verify the server is reachable.
    """
    from urllib.parse import quote_plus

    print("\nConnecting to DB\n")
    connection_uri = "postgresql+psycopg2://{}:{}@{}:{}".format(
        CONFIG["POSTGRES_USER"],
        # URL-encode the password so characters such as '@', ':' or '/'
        # cannot corrupt the URI.
        quote_plus(CONFIG["POSTGRES_PASSWORD"]),
        CONFIG["POSTGRES_HOST"],
        CONFIG["POSTGRES_PORT"],
    )

    engine = create_engine(connection_uri, pool_pre_ping=True)
    # Probe connectivity, then return the connection to the pool instead of
    # leaving it checked out.
    with engine.connect():
        pass
    return engine

# Transform the data
@logger
def transform(file_name, sheet):
    """Read one worksheet and reshape it from one-column-per-month to one row per month.

    Args:
        file_name: Path to the Excel workbook.
        sheet: Worksheet name passed to ``pd.read_excel`` as ``sheet_name``.

    Returns:
        DataFrame with columns ``year_month`` (first day of the month), ``uf``,
        ``product``, ``unit``, ``volume`` and ``created_at``. Missing values in
        any column are replaced with 0.
    """
    # Read raw file
    df = pd.read_excel(file_name, sheet_name=sheet)

    # Rename the raw columns by position. Assumes the sheet has exactly these
    # 17 columns in this order (4 id columns, months 1-12, a yearly total) —
    # TODO confirm against the workbook; any other width fails here.
    df.columns = [
        'Combustível', 'Ano', 'Região', 'uf',
        '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12',
        'Total',
    ]
    # Unpivot the month/total columns into rows: 'variable' holds the old
    # column name and 'value' the cell value.
    df = df.melt(id_vars=['Combustível', 'Ano', 'Região', 'uf'])
    # Keep only the monthly rows.
    df = df.loc[df['variable'] != 'Total']
    df = df.rename(columns={'value': 'volume'})
    # Build "<year>-<month>" (e.g. "2000-1") and parse it with an explicit
    # format instead of relying on pandas' format inference.
    df['year_month'] = pd.to_datetime(
        df['Ano'].astype(str) + '-' + df['variable'], format='%Y-%m'
    )
    # Split the label at its last " (" into product and unit; presumably the
    # labels look like "NAME (UNIT)" — verify against the workbook.
    df[['product', 'unit']] = df['Combustível'].str.rsplit(" (", n=1, expand=True)
    # Vectorised strip: rows without a unit (None/NaN) stay missing instead of
    # raising AttributeError as a per-element x.rstrip(...) call would.
    df['unit'] = df['unit'].str.rstrip(')')
    df = df.drop(labels=['Combustível', 'variable', 'Região', 'Ano'], axis=1)
    # Load timestamp, truncated to whole seconds.
    df.insert(0, 'created_at', pd.to_datetime('now').replace(microsecond=0))
    df = df[['year_month', 'uf', 'product', 'unit', 'volume', 'created_at']]
    df = df.fillna(0)

    return df
    
# Load the data to Postgres DB
@logger
def load(df, engine, table_name):
    """Write ``df`` to ``table_name``, replacing the table if it already exists.

    Args:
        df: DataFrame to write. ``to_sql`` is called with its default
            ``index=True``, so the index is written as a column too.
        engine: SQLAlchemy engine for the target database.
        table_name: Name of the destination table.
    """
    # Check BEFORE writing: after to_sql the table always exists, so a check
    # afterwards would report "Replacing..." even for a brand-new table.
    check_table_exists(table_name, engine)
    df.to_sql(table_name, engine, if_exists="replace")

# Report whether a table already exists in the DB
@logger
def check_table_exists(table_name, engine):
    """Print whether ``table_name`` is already present in the database.

    The message says the table will be replaced when it exists, or created
    when it does not. Nothing is returned.
    """
    existing_tables = inspect(engine).get_table_names()
    if table_name not in existing_tables:
        print(f"\n{table_name} does not exist in the DB! Creating...\n")
        return
    print(f"\n{table_name!r} exists in the DB! Replacing...\n")

# Run ETL pipeline
@logger
def etl(file_name=os.path.join('raw', 'raw_data.xlsx')):
    """Extract both worksheets from the workbook, transform them, and load each into Postgres.

    Args:
        file_name: Path to the source workbook. Defaults to
            ``raw/raw_data.xlsx`` relative to the working directory.
    """
    # Worksheet name -> destination table, processed in this order.
    sheets_to_tables = {
        "DPCache_m3": "sales_oil_derivative",
        "DPCache_m3_2": "sales_diesel",
    }

    engine = connect_db()
    try:
        for sheet, table_name in sheets_to_tables.items():
            df = transform(file_name, sheet)
            load(df, engine, table_name)
    finally:
        # Release pooled connections; this engine is not used after the run.
        engine.dispose()

    print('\nEnd of process, run pd.read_sql(f"SELECT * FROM table_name  LIMIT 10", engine) to see the results.\n')

# Run the ETL process: extract both worksheets, transform them, load them to Postgres
etl()

In [None]:
# Preview the first 10 rows of the oil-derivative sales table.
engine = connect_db()
pd.read_sql("SELECT * FROM sales_oil_derivative LIMIT 10", engine)

In [None]:
# Preview the first 10 rows of the diesel sales table.
engine = connect_db()
pd.read_sql("SELECT * FROM sales_diesel LIMIT 10", engine)