# ETL Pipeline

## Importing Libraries

In [25]:
import pandas as pd # Data Transformation
from datetime import datetime 
import os
from subprocess import call
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

## Setting Up PostgreSQL Connection

### Loading Environmental Variables

In [26]:
# Load environment variables from .env file
load_dotenv()

# Retrieve individual components from environment variables
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
host = os.getenv('POSTGRES_HOST')
port = os.getenv('POSTGRES_PORT')
db_name = os.getenv('POSTGRES_DB')

# Ensure the connection URI is retrieved successfully
if not all([user, password, host, db_name]):
    raise ValueError("One or more environment variables for the database connection are not set")

# Construct the connection URI
connection_uri = f"postgresql://{user}:{password}@{host}:{port}/{db_name}"

# Ensure the connection URI is retrieved successfully
if connection_uri is None:
    raise ValueError("DATABASE_URL environment variable is not set")

## Creating Schemas, Tables, and Views in PostgreSQL

### Creating a PostgreSQL Connection Engine with SQLAlchemy

In [27]:
# Define function to create an SQLAlchemy engine
def create_db_engine(connection_uri: str):
    """
    Create and return a SQLAlchemy engine based on the provided connection URI.

    Args:
        connection_uri (str): The connection URI for the database.

    Returns:
        Engine: A SQLAlchemy engine connected to the specified database.
    """
    try:
        db_engine = create_engine(connection_uri)
        print("Database engine created successfully.")
    except SQLAlchemyError as e:
        print(f"Error occurred while creating the database engine: {str(e)}")
        return None
    # Log or handle the error as needed
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        return None
    
    return db_engine

### Creating Schemas in PostgreSQL

In [28]:
# Function to run SQL script using shell command
# I had to pass the env parameters explicitly  to the subprocess.call() -> (PGPASSWORD, PGUSER, PGHOST, PGPORT, PGDATABASE)
# This avoided Jupyter Notebook asking for password. 
def run_sql_script(script_name):
    script_path = f"/workspace/sql_scripts/{script_name}"
    print(f"{user}:{password}@{host}/{db_name}")
    command = f"psql -U {user} -d {db_name} -h {host} -p {port} -f {script_path}"
    return call(command, shell=True, env={
                                        'PGPASSWORD': password,
                                        'PGUSER': user,
                                        'PGHOST': host,
                                        'PGPORT': port,
                                        'PGDATABASE': db_name
    })

### Checking if Schemas Exist in PostgreSQL

In [29]:
# Function to check schema existence
def check_schema_existence(connection_uri, schema_names):
    try:
        db_engine = create_db_engine(connection_uri)
        if db_engine is None:
            print("Failed to create the database engine.")
            return
        
        with db_engine.connect() as connection:
            for schema_name in schema_names:
                result = connection.execute(
                    text("SELECT schema_name FROM information_schema.schemata WHERE schema_name = :schema"),
                    {"schema": schema_name}
                )
                schema_exists = result.fetchone() is not None
                if schema_exists:
                    print(f"Schema '{schema_name}' exists in the database.")
                else:
                    print(f"Schema '{schema_name}' does not exist in the database.")
    
    except SQLAlchemyError as e:
        print(f"Error occurred while connecting to the database or executing query: {str(e)}")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")

### Creating Tables in the Bronze Layer

In [30]:
# # Function to execute /workspace/sql_scripts/bronze/create_bronze_tables.sql
# def create_bronze_tables(connection_uri, bronze_schema, script_path):
#     try:
#         with open(script_path, 'r') as file:
#             print(f"File '{script_path}' opened successfully.")
#             sql_script = file.read()
        
#         engine = create_db_engine(connection_uri)
#         with engine.connect() as connection:
#             # Set the search path to the bronze schema
#             connection.execute(text(f"SET search_path TO {bronze_schema};"))
#             # Execute the script to create tables
#             connection.execute(text(sql_script))      
#         print("Bronze tables created successfully.")
        
#     except FileNotFoundError:
#         print("create_bronze_tables.sql not found.")
#     except Exception as e:
#         print(f"Error executing create_bronze_tables.sql: {str(e)}")

### Creating Views in the Silver Layer

In [31]:
# # Function to execute create_silver_views.sql
# def create_silver_views(connection_uri, silver_schema, script_path):
#     try:
#         with open(script_path, 'r') as file:
#             sql_script = file.read()
        
#         engine = create_db_engine(connection_uri)
#         with engine.connect() as connection:
#             # Set the search path to the silver schema
#             connection.execute(text(f"SET search_path TO {silver_schema};"))
#             # Execute the script to create views
#             connection.execute(text(sql_script))
            
#         print("Silver views created successfully.")
#     except FileNotFoundError:
#         print("create_silver_views.sql not found.")
#     except Exception as e:
#         print(f"Error executing create_silver_views.sql: {str(e)}")

## Data Ingestion into the Bronze Layer

### Extract

In [32]:
# Define function to extract data from CSV files
def extract(csv_folder_path):
    """
    Extract data from all CSV files in a folder, one by one.
    
    Args:
    - csv_folder_path (str): Path to the folder containing CSV files.
    
    Returns:
    - dict: A dictionary where keys are table names and values are DataFrames containing data from each CSV file.
    """
    # Test if a folder path exists
    if not os.path.exists(csv_folder_path):
        print(f"Folder '{csv_folder_path}' does not exist.")
        return None
    
    # Create a list of CSV files in the designated folder
    csv_files = [f for f in os.listdir(csv_folder_path) if f.endswith('.csv')]
    if not csv_files:
        print(f"No CSV files found in folder '{csv_folder_path}'.")
        return None
    
    # Create a dictionary where keys are table names and values are DataFrames containing data from each CSV file
    # This allows us to iterate over all the tables and perform specific transformations in the transform_raw() function  
    data_frames = {}

    # Iterating over each CSV file in the folder
    for csv_file in csv_files:
        # Separate the file name from the extension and store it
        table_name = os.path.splitext(csv_file)[0]  # Assuming table name is CSV filename without extension
        # Join CSV folder path with the CSV file name, inserting '/' as needed
        file_path = os.path.join(csv_folder_path, csv_file)
        try:
            df = pd.read_csv(file_path)
            print(f"-> CSV file '{csv_file}' loaded successfully.")
            
            # Add 'extracted_at' column with current timestamp
            df['extracted_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            
            # Store the CSV in DataFrame format as a value of the dictionary's key
            data_frames[table_name] = df
        except Exception as e:
            print(f"Error reading CSV file '{csv_file}': {str(e)}")
            data_frames[table_name] = None
    
    # Return the dictionary
    return data_frames

### Transform

**[Based on the CSV Files]**

**Customers**
* SignupDate: will be converted to datetime (to be used in PostgreSQL).

**Dates**
* DateID : will be converted from boolean to integer (to be used in PostgreSQL).

**Product Usage**
* Date column not available.

**Subscriptions**
* StartDate,EndDate: will be converted to datetime (to be used in PostgreSQL).
* Status: will be converted from boolean to integer (to be used in PostgreSQL).

**Support Interactions**
* Date column not available.

In [33]:
# Define function to transform raw data
def transform_raw(data_frames, date_columns_map):
    """
    Transform multiple raw DataFrames extracted from CSV files.
    Perform cleaning procedures: Convert specified date columns to pandas datetime and 
    boolean columns to integer.

    Args:
        data_frames (dict): A dictionary where keys are table names and values are DataFrames 
                            containing raw data extracted from CSV files.
        date_columns_map (dict): A dictionary where keys are table names and values are 
                                 column names to convert to pandas datetime.
        boolean_columns_map (dict): (I ommited this argument as there is no boolean column at this time) A dictionary where keys are table names and values are 
                                    column names to convert from boolean to integer.

    Returns:
        dict: A dictionary where keys are table names and values are cleaned DataFrames 
              with the specified transformations applied.
    """
    # Create a dictionary where keys are table names and values are the clean DataFrames after performing specific transformations.
    cleaned_data_frames = {}

    # Iterating over all tables. If table is not in the dict returned by the extract() function, then it is skipped for transformation.
    # df contains the actual data (in the DataFrame format) for each table
    for table_name, df in data_frames.items():
    
        try:
            if table_name in date_columns_map:
            # we specify the date columns in each table to perform transformations.
                # If a value of a key of date_columns_map is a single date column:
                #   Ex: date_columns == 'SignupDate', a string.
                # If a value of a key of date_columns_map is a list of date columns:
                #   Ex: date_columns == ['StartDate', 'EndDate'], a list.
                date_columns = date_columns_map[table_name] # accessing the value of the key 'table_name'
            
            # Note: the transform_raw() can receive a list of date columns, so we need to ensure the date_columns variable
            # is always treated as a list, even if a single date column name is provided.
                # If date_columns is a list, the condition is True
                # If date_columns is not a list, the condition is False, then it transforms it into a list.
                if not isinstance(date_columns, list):
                    date_columns = [date_columns]

                # Iterate over a potential list of columns (either single or multiple), one by one making the transformation.
                for date_column in date_columns:
                    # Check if the date column exists in the DataFrames that correspond to the data of each table. 
                    if date_column not in df.columns:
                        raise ValueError(f"Column '{date_column}' does not exist in the DataFrame for table '{table_name}'.")
                    
                    # Format the date column to 'YYYY-MM-DD' format
                    df[date_column] = pd.to_datetime(df[date_column]).dt.strftime('%Y-%m-%d')
                    print(f"Successfully converted column '{date_column}' to 'YYYY-MM-DD' format for table '{table_name}'.")
                    print(f"Data type after conversion: {df[date_column].dtype}")
                
                # Builds a DataFrame where date columns have been cleaned for each table, which is a key of this dict.
                # Each cleaned DataFrame is stored as a value of each table.
            
            cleaned_data_frames[table_name] = df

        except ValueError as ve:
            print(ve)
            # Indicates that an error occurred during the processing of the DataFrame for table_name and it
            # sets to None to signify that the data transformation or cleaning for that table was unsuccessful.
            cleaned_data_frames[table_name] = None
        except Exception as e:
            print(f"An error occurred when converting the date for table '{table_name}': {e}")
            cleaned_data_frames[table_name] = None
            
    # Returns a clean DataFrame when dates have been treated.
    return cleaned_data_frames

### Load

In [34]:
# Define function to ingest CSV data into the Bronze layer using DataFrame.to_sql
# Note: we use if_exists='replace'. This performs a full refresh of the table content (drop the table and ingest last updated data).
# Note: use if_exists='append' is you want to append data for the specific table.
def ingest_csv_to_bronze(csv_folder_path, connection_uri, schema_name, table_name, date_columns_map):
    """
    Extract, transform, and ingest CSV data into the Bronze layer of a PostgreSQL database.

    Args:
        csv_folder_path (str): Path to the folder containing CSV files.
        connection_uri (str): Connection URI for the PostgreSQL database.
        schema_name (str): Name of the schema in which tables exist or will be created.
        table_name (str): Name of the table to ingest data into.
        date_columns (list or str): Name(s) of the column(s) to convert to pandas datetime.

    Returns:
        dict: A dictionary where keys are table names and values are DataFrames with the transformed data.
    """

    print("Ingest Into Bronze Function.")

    # Calling the Extract Function for all CSV files
    # Returns: raw_data_dfs, a dict where keys are table names and values are DataFrames with raw data extracted from CSV files.
    print("Extract Function.")
    raw_data_dfs = extract(csv_folder_path)
    if raw_data_dfs is None:
        # Handle case where extraction fails
        print("Extraction failed.")
        return

    # Calling the Transformation Function
    # For each table stored in raw_data_dfs, the transform_raw() perform specific date transformation for each date column.
    # The date_columns_map is a dict where the key is the table name and the values are single date columns (in string format)
    # or multiple columns (in list format). Recall that the transform_raw() also ensures that the date columns are always treated
    # as a list for consistency.
    # It returns transformed_data_dfs, where keys are table names and values are the cleaned DataFrames.
    print("Transformation Function into Bronze.")
    transformed_data_dfs = transform_raw(raw_data_dfs, date_columns_map)
    if transformed_data_dfs is None:
        # Handle case where transformation fails
        print("Error occurred during transformation. Processing aborted.")
        return

    try:
        # Create the database engine
        db_engine = create_db_engine(connection_uri)
        if db_engine is None:
            print("Failed to create the database engine.")
            return

        # Verify connection and schema existence
        with db_engine.connect() as connection:
            # Check if the schema exists
            result = connection.execute(
                text(f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = :schema"),
                {"schema": schema_name}
            )
            schema_exists = result.fetchone() is not None
            if not schema_exists:
                raise ValueError(f"Schema '{schema_name}' does not exist in the database.")
            print(f"Schema '{schema_name}' verified to exist.")

            # Set the search path to the specified schema
            connection.execute(text(f"SET search_path TO {schema_name};"))
            print(f"Search path set to schema '{schema_name}'.")
            
            # Iterate over transformed DataFrames and ingest data into the database
            for table_name, cleaned_data_df in transformed_data_dfs.items():
                if cleaned_data_df is None:
                    print(f"Skipping ingestion for table '{table_name}' due to previous errors.")
                    continue
                
                print(f"Ingesting data into {schema_name}.{table_name}...")

                # Add 'inserted_at' timestamp column
                cleaned_data_df['inserted_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # Ingest data into the specified schema and table
                # Note: if_exists='replace' performs a full refresh of the table content (drop the table and ingest last updated data).
                # Note: use if_exists='append' is you want to append data for the specific table.
                cleaned_data_df.to_sql(table_name, db_engine, schema=schema_name, if_exists='replace', index=False)

                print(f"CSV data ingested successfully into {schema_name}.{table_name}.")
                
        return transformed_data_dfs

    except FileNotFoundError:
        print("Ingest Function: Error - CSV file not found.")
    except SQLAlchemyError as e:
        print(f"Error occurred while connecting to the database or ingesting data: {str(e)}")
    except ValueError as ve:
        print(f"ValueError: {str(ve)}")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")


## Data Integration and Transformation into the Silver Layer

In [35]:
# Function for ingesting transformed data from Bronze to Silver
def ingest_bronze_to_silver(connection_uri, bronze_schema, silver_schema, transformed_data_dfs):
    
    print("Ingest from Bronze Into Silver Function.")

    try:
        # Create database engine
        engine = create_db_engine(connection_uri)
        if engine is None:
            print("Failed to create the database engine.")
            return

        # Verify connection and schema existence
        with engine.connect() as connection:
            # Check if the bronze and silver schemas exist
            for schema_name in [bronze_schema, silver_schema]:
                result = connection.execute(
                    text(f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = :schema"),
                    {"schema": schema_name}
                )
                schema_exists = result.fetchone() is not None
                if not schema_exists:
                    raise ValueError(f"Schema '{schema_name}' does not exist in the database.")
                print(f"Schema '{schema_name}' verified to exist.")

            # Set the search path to the silver schema
            connection.execute(text(f"SET search_path TO {silver_schema};"))
            print(f"Search path set to schema '{silver_schema}'.")

            # Iterate over transformed DataFrames and ingest data into the database
            for table_name, cleaned_data_df in transformed_data_dfs.items():
                if cleaned_data_df is None:
                    print(f"Skipping ingestion for table '{table_name}' due to previous errors.")
                    continue
                
                print(f"Ingesting data into {silver_schema}.{table_name}...")

                # Add 'inserted_at' timestamp column
                cleaned_data_df['inserted_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # Ingest data into the specified schema and table
                # Note: if_exists='replace' performs a full refresh of the table content (drop the table and ingest last updated data).
                # Note: use if_exists='append' is you want to append data for the specific table.
                cleaned_data_df.to_sql(table_name, engine, schema=silver_schema, if_exists='replace', index=False)

                print(f"Transformed data ingested successfully into {silver_schema}.{table_name}.")

    except FileNotFoundError:
        print("Ingest Function: Error - CSV file not found.")
    except Exception as e:
        print(f"Error occurred while connecting to the database or ingesting data: {str(e)}")
    except ValueError as ve:
        print(f"ValueError: {str(ve)}")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")

## Executing

In [36]:
# Ingestion Parameters - Bronze Layer
csv_folder_path = '/workspace/data/raw'
schema_names = ['bronze'] # CHANGE HERE!
bronze_schema = 'bronze'
silver_schema = 'silver'

# Paths to SQL scripts
create_schemas_script_path = 'schemas/create_schemas.sql'
create_bronze_tables_script_path = 'bronze/create_bronze_tables.sql'
create_silver_views_script_path = 'silver/create_silver_views.sql'

# Tables in Bronze
tables_in_bronze = ['customers', 'dates', 'product_usage', 'products', 'subscriptions', 'support_interactions']
tables_in_silver = ['dim_customers', 'dim_dates', 'dim_products', 'fact_product_usage', 'fact_support_interactions','fact_subscriptions']

# Define Date Columns for each Table in Bronze
date_columns_map = {
    'customers': 'SignupDate',
    'dates': 'Date',
    'subscriptions': ['StartDate', 'EndDate'],
}

# Execute functions

# Run create_schemas.sql
result = run_sql_script(create_schemas_script_path)
if result == 0:
    print("SQL script executed successfully. Schemas were created.")
    # Check if schemas exist in the database
    check_schema_existence(connection_uri, schema_names)
else:
    print("Error executing SQL script.")

# Run create_bronze_tables.sql
result = run_sql_script(create_bronze_tables_script_path)
if result == 0:
    print("SQL script executed successfully. Tables were created into the Bronze Layer.")
    # Check if schemas exist in the database
    check_schema_existence(connection_uri, schema_names)
else:
    print("Error executing SQL script.")

# Ingest into Bronze
transformed_data_dfs = ingest_csv_to_bronze(csv_folder_path, connection_uri, bronze_schema, tables_in_bronze, date_columns_map)

# Run create_silver_views.sql
result = run_sql_script(create_silver_views_script_path)
if result == 0:
    print("SQL script executed successfully. Views were created into the Silver Layer.")
    # Check if schemas exist in the database
    check_schema_existence(connection_uri, schema_names)
else:
    print("Error executing SQL script.")

ingest_bronze_to_silver(connection_uri, bronze_schema, silver_schema, transformed_data_dfs)

myuser:mypassword@postgres/mydatabase
CREATE SCHEMA
CREATE SCHEMA
CREATE SCHEMA
SQL script executed successfully. Schemas were created.
Database engine created successfully.
Schema 'bronze' exists in the database.
myuser:mypassword@postgres/mydatabase
CREATE TABLE
CREATE TABLE
CREATE TABLE
CREATE TABLE
CREATE TABLE
CREATE TABLE
SQL script executed successfully. Tables were created into the Bronze Layer.
Database engine created successfully.
Schema 'bronze' exists in the database.
Ingest Into Bronze Function.
Extract Function.
-> CSV file 'customers.csv' loaded successfully.
-> CSV file 'dates.csv' loaded successfully.
-> CSV file 'products.csv' loaded successfully.
-> CSV file 'product_usage.csv' loaded successfully.
-> CSV file 'subscriptions.csv' loaded successfully.
-> CSV file 'support_interactions.csv' loaded successfully.
Transformation Function into Bronze.
Successfully converted column 'SignupDate' to 'YYYY-MM-DD' format for table 'customers'.
Data type after conversion: object