# ETL Pipeline

## Importing Libraries

In [None]:
import pandas as pd
from datetime import datetime 
import os
from subprocess import call
from dotenv import load_dotenv
from sqlalchemy import create_engine, text, INT, VARCHAR, DATE, TIMESTAMP, DECIMAL
from sqlalchemy.exc import SQLAlchemyError

## Setting Up PostgreSQL Connection

### Loading Environment Variables

In [None]:
# Load environment variables from the .env file (if one is present)
load_dotenv()

# Imported here so this cell stays self-contained; used to escape credentials below
from urllib.parse import quote

# Retrieve individual connection components from environment variables
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
host = os.getenv('POSTGRES_HOST')
port = os.getenv('POSTGRES_PORT')
db_name = os.getenv('POSTGRES_DB')

# Fail fast, naming every missing/empty variable. The port is required too:
# it is interpolated into the URI and passed to psql further down.
_required_settings = {
    'POSTGRES_USER': user,
    'POSTGRES_PASSWORD': password,
    'POSTGRES_HOST': host,
    'POSTGRES_PORT': port,
    'POSTGRES_DB': db_name,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise ValueError(
        "One or more environment variables for the database connection are not set: "
        + ", ".join(_missing_settings)
    )

# Construct the connection URI. User and password are percent-encoded so that
# characters such as '@', ':' or '/' cannot be mistaken for URI delimiters.
connection_uri = (
    f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/{db_name}"
)

## Creating Schemas, Tables, and Views in PostgreSQL

### Creating a PostgreSQL Connection Engine with SQLAlchemy

In [None]:
# Build a SQLAlchemy engine; failures are printed and reported as None
def create_db_engine(connection_uri: str):
    """
    Build a SQLAlchemy engine for the given connection URI.

    Args:
        connection_uri (str): The connection URI for the database.

    Returns:
        Engine: The engine returned by create_engine(), or None when engine
        creation raised (the error is printed rather than propagated).
    """
    try:
        engine = create_engine(connection_uri)
        print("Database engine created successfully.")
        return engine
    except SQLAlchemyError as e:
        # Errors raised by SQLAlchemy itself
        failure_message = f"Error occurred while creating the database engine: {str(e)}"
    except Exception as e:
        # Anything else raised while building the engine
        failure_message = f"An unexpected error occurred: {str(e)}"

    print(failure_message)
    return None

### Executing SQL Scripts Against PostgreSQL (Schemas, Tables, Views)

In [None]:
# Function to run a SQL script through the psql command-line client.
# The PG* variables are passed explicitly so psql does not prompt for a
# password when launched from a Jupyter Notebook.
def run_sql_script(script_name):
    """
    Execute a SQL script located under /workspace/external_postgres with psql.

    Connection details come from the module-level `user`, `password`, `host`,
    `port` and `db_name` values.

    Args:
        script_name (str): File name of the script, relative to
            /workspace/external_postgres.

    Returns:
        int: The psql exit code (0 on success), or 127 when the psql
        executable could not be started.
    """
    script_path = f"/workspace/external_postgres/{script_name}"

    # Argument list + no shell: values containing spaces or shell
    # metacharacters are passed to psql verbatim instead of being re-parsed.
    command = ["psql"]
    for flag, value in (("-U", user), ("-d", db_name), ("-h", host), ("-p", port)):
        if value is not None:
            command += [flag, str(value)]
    command += ["-f", script_path]

    # Extend the current environment instead of replacing it, so PATH and
    # friends remain available to the child process. Unset values are skipped
    # because environment entries must be strings.
    pg_settings = {
        'PGPASSWORD': password,
        'PGUSER': user,
        'PGHOST': host,
        'PGPORT': port,
        'PGDATABASE': db_name,
    }
    child_env = dict(os.environ)
    child_env.update({key: str(value) for key, value in pg_settings.items() if value is not None})

    try:
        return call(command, env=child_env)
    except OSError as e:
        # Without a shell a missing executable raises instead of returning a
        # code; 127 is what a shell reports for "command not found", so
        # callers that test `result == 0` keep working.
        print(f"Could not run psql: {str(e)}")
        return 127

### Checking if Schemas Exist in PostgreSQL

In [None]:
# Report, for each given schema name, whether it is present in the database
def check_schema_existence(connection_uri, schema_names):
    """
    Print whether each schema in `schema_names` exists.

    Args:
        connection_uri (str): The database connection URI.
        schema_names (iterable of str): Schema names to look up in
            information_schema.schemata.

    Returns:
        None. Results and errors are printed, not returned or raised.
    """
    try:
        db_engine = create_db_engine(connection_uri)
        if db_engine is None:
            print("Failed to create the database engine.")
            return

        with db_engine.connect() as connection:
            print("--- Checking if Schemas exist in the database ---")
            for schema_name in schema_names:
                lookup = text("SELECT schema_name FROM information_schema.schemata WHERE schema_name = :schema")
                first_row = connection.execute(lookup, {"schema": schema_name}).fetchone()
                # No row back means the schema is absent
                if first_row is None:
                    print(f"Schema '{schema_name}' does not exist in the database.")
                else:
                    print(f"Schema '{schema_name}' exists in the database.")
            print("----- End of Schema Checking -----")

    except SQLAlchemyError as e:
        print(f"Error occurred while connecting to the database or executing query: {str(e)}")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")

### Checking if Tables Exist in PostgreSQL

In [None]:
# Report, for each given table name, whether it is present in the schema
def check_table_existence(connection_uri, schema_name, table_names):
    """
    Print whether each table in `table_names` exists in `schema_name`.

    Args:
        connection_uri (str): The database connection URI.
        schema_name (str): Schema to search in.
        table_names (iterable of str): Table names to look up in
            information_schema.tables.

    Returns:
        None. Results and errors are printed, not returned or raised.
    """
    try:
        db_engine = create_db_engine(connection_uri)
        if db_engine is None:
            print("Failed to create the database engine.")
            return

        with db_engine.connect() as connection:
            print("--- Checking if Tables exist ---")
            for table_name in table_names:
                lookup = text("SELECT table_name FROM information_schema.tables WHERE table_schema = :schema AND table_name = :table")
                bind_values = {"schema": schema_name, "table": table_name}
                first_row = connection.execute(lookup, bind_values).fetchone()
                # No row back means the table is absent from the schema
                if first_row is None:
                    print(f"Table '{table_name}' does not exist in schema '{schema_name}'.")
                else:
                    print(f"Table '{table_name}' exists in schema '{schema_name}'.")
            print("----- End of Checking Tables -----")

    except SQLAlchemyError as e:
        print(f"Error occurred while connecting to the database or executing query: {str(e)}")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")

### Getting Table Column Names in the Raw Schema

In [None]:
def get_schema_table_columns(connection_uri, schema_name, tables_in_schema):
    """
    Fetches column names for a set of tables in a specified schema from a database.

    Args:
        connection_uri (str): The database connection URI.
        schema_name (str): The schema name where the tables are located.
        tables_in_schema (list of str): A list of table names for which the column names are to be fetched.

    Returns:
        dict: A dictionary where the keys are table names and the values are lists of column names
        (in table column order) for each table. Tables that are not found map to an empty list.
        If the engine cannot be created or a query fails, the error is printed and whatever was
        collected so far (possibly an empty dict) is returned.
    """
    columns_dict = {}
    try:
        engine = create_db_engine(connection_uri)
        if engine is None:
            print("Failed to create the database engine.")
            # Nothing can be queried without an engine
            return columns_dict

        # Bound parameters instead of string interpolation: names are passed
        # as values and cannot alter the statement. ordinal_position makes
        # the returned column order deterministic.
        query = text(
            "SELECT column_name "
            "FROM information_schema.columns "
            "WHERE table_schema = :schema AND table_name = :table "
            "ORDER BY ordinal_position"
        )

        with engine.connect() as connection:
            for table_name in tables_in_schema:
                result = connection.execute(query, {"schema": schema_name, "table": table_name})
                # Each row holds a single value: the column name
                columns_dict[table_name] = [row[0] for row in result]

    except Exception as e:
        print(f"Error occurred while fetching table columns: {str(e)}")

    return columns_dict

### Mapping Raw Table Data Types

In [None]:
def get_raw_table_data_types():
    """
    Build the column-to-SQL-type mapping for every raw table.

    Returns:
        dict: Keys are raw table names; each value is a dict mapping a column
        name to the SQLAlchemy type used for that column.
    """
    customers_types = {
        'CustomerID': INT,
        'Name': VARCHAR(100),
        'Age': INT,
        'Gender': VARCHAR(10),
        'SignupDate': DATE,
    }
    dates_types = {
        'DateID': INT,
        'Date': DATE,
        'Week': INT,
        'Month': INT,
        'Quarter': INT,
        'Year': INT,
    }
    product_usage_types = {
        'UsageID': INT,
        'CustomerID': INT,
        'DateID': INT,
        'ProductID': INT,
        'NumLogins': INT,
        'Amount': DECIMAL(10, 2),
    }
    products_types = {
        'ProductID': INT,
        'ProductName': VARCHAR(100),
        'Category': VARCHAR(50),
        'Price': DECIMAL(10, 2),
    }
    subscriptions_types = {
        'SubscriptionID': INT,
        'CustomerID': INT,
        'StartDate': DATE,
        'EndDate': DATE,
        'Type': VARCHAR(50),
        'Status': VARCHAR(50),
    }
    support_interactions_types = {
        'InteractionID': INT,
        'CustomerID': INT,
        'DateID': INT,
        'IssueType': VARCHAR(100),
        'ResolutionTime': INT,
    }

    # Assemble the per-table mappings under their table names
    return {
        'customers': customers_types,
        'dates': dates_types,
        'product_usage': product_usage_types,
        'products': products_types,
        'subscriptions': subscriptions_types,
        'support_interactions': support_interactions_types,
    }

## Extract

In [None]:
# Define function to extract data from CSV files
def extract(csv_folder_path):
    """
    Extract data from all CSV files in a folder, one by one.

    Args:
    - csv_folder_path (str): Path to the folder containing CSV files.

    Returns:
    - dict: A dictionary where keys are table names (CSV file name without
      extension, in alphabetical order of the file names) and values are
      DataFrames containing data from each CSV file. A file that could not be
      read maps to None.
    - None: If the path does not exist, is not a folder, or contains no
      '.csv' files.
    """
    # Test if the folder path exists
    if not os.path.exists(csv_folder_path):
        print(f"Folder '{csv_folder_path}' does not exist.")
        return None

    # An existing path that is a regular file would make os.listdir() raise
    if not os.path.isdir(csv_folder_path):
        print(f"Path '{csv_folder_path}' is not a folder.")
        return None

    # Collect the CSV files; sorted because os.listdir() order is arbitrary
    csv_files = sorted(f for f in os.listdir(csv_folder_path) if f.endswith('.csv'))
    if not csv_files:
        print(f"No CSV files found in folder '{csv_folder_path}'.")
        return None

    # Keys are table names, values are the DataFrames read from each CSV file,
    # so later steps can iterate over all tables.
    data_frames = {}

    for csv_file in csv_files:
        # Table name is assumed to be the CSV file name without its extension
        table_name = os.path.splitext(csv_file)[0]
        file_path = os.path.join(csv_folder_path, csv_file)
        try:
            df = pd.read_csv(file_path)
            print(f"-> CSV file '{csv_file}' loaded successfully.")

            # Add 'extracted_at' column with current timestamp
            # df['extracted_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            data_frames[table_name] = df
        except Exception as e:
            # Keep going with the other files; None marks this table as failed
            print(f"Error reading CSV file '{csv_file}': {str(e)}")
            data_frames[table_name] = None

    return data_frames

## Data Ingestion into RAW

### Load

In [None]:
def ingest_csv_to_raw(dfs, connection_uri, raw_schema_name):
    """
    Ingest already-extracted DataFrames into tables of the RAW PostgreSQL schema.

    Each DataFrame is written with DataFrame.to_sql() using if_exists='replace',
    with column types taken from get_raw_table_data_types().

    Args:
        dfs (dict): Keys are table names, values are DataFrames to ingest.
            Entries whose value is None are skipped. If `dfs` itself is None
            nothing is ingested.
        connection_uri (str): Connection URI for the PostgreSQL database.
        raw_schema_name (str): Name of the schema the tables are written to.

    Returns:
        None. Progress and errors are printed; errors are not propagated.
        A table with no entry in get_raw_table_data_types() stops the
        ingestion of the remaining tables.
    """

    print("----- Ingesting Data Into Raw. -----")

    # Extraction reports failure as None; bail out before touching the database
    if dfs is None:
        print("No extracted data to ingest into Raw. Skipping ingestion.")
        return

    try:
        # The type mapping is the same for every table, so build it once
        raw_data_types = get_raw_table_data_types()

        # Create the database engine
        engine = create_engine(connection_uri)

        # Verify that a connection can be opened
        with engine.connect() as connection:
            # Set the search path to the specified schema for this connection.
            # to_sql() below is given the engine and an explicit schema, so it
            # does not depend on this setting.
            set_search_path_query = text(f"SET search_path TO {raw_schema_name};")
            connection.execute(set_search_path_query)
            print(f"Search path set to schema '{raw_schema_name}'.")

            # Iterate over the DataFrames and ingest data into the database
            print("-- to_sql() Ingestion Procedure in Raw. --")
            for table_name, cleaned_data_df in dfs.items():
                if cleaned_data_df is None:
                    print(f"Skipping ingestion for table '{table_name}' due to previous errors.")
                    continue

                print(f"Ingesting data into {raw_schema_name}.{table_name}...")

                # Add 'inserted_at' timestamp columns
                # cleaned_data_df['inserted_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # Get data types for the table from the dictionary
                data_type_dict = raw_data_types.get(table_name)
                if data_type_dict is None:
                    raise ValueError(f"Data types not found for table '{table_name}' in Raw.")

                # Ingest data into the specified schema and table with specified data types
                cleaned_data_df.to_sql(
                    table_name,
                    engine,
                    schema=raw_schema_name,
                    if_exists='replace',
                    index=False,
                    dtype=data_type_dict,
                )

                print(f"-> CSV data ingested successfully into {raw_schema_name}.{table_name}.")

        print("----- END OF DATA INGESTION INTO RAW -----")

    except FileNotFoundError:
        print("Ingest Function: Error - CSV file not found.")
    except SQLAlchemyError as e:
        print(f"Error occurred while connecting to the database or ingesting data: {str(e)}")
    except ValueError as ve:
        print(f"ValueError: {str(ve)}")
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")

## Executing

In [None]:
# Ingestion Parameters for RAW
csv_folder_path = '/workspace/external_postgres/data'
schema_names = ['raw']
raw_schema = 'raw'

create_schemas_script_path = 'create_schemas.sql'

# Tables expected in the raw schema. In this cell the list is only used for
# the column-name lookup at the end.
# NOTE(review): an earlier note asked not to reorder this list - confirm
# whether any other cell depends on its order.
tables_in_raw = ['customers', 'dates', 'product_usage', 'products', 'subscriptions', 'support_interactions']

# Date columns per raw table, named as in the CSV file headers.
# NOTE(review): not used anywhere in this cell - presumably consumed by a
# later transformation step; confirm.
date_columns_map = {
    'customers': 'SignupDate',
    'dates': 'Date',
    'subscriptions': ['StartDate', 'EndDate'],
}


# Execute functions (please, respect the order)
    # 1) Run create_schemas.sql
    # 2) (Raw) Calling the Extract Function for all CSV files
    # 3) (Raw) Data Ingestion with Minor Transformation

# 1) Run create_schemas.sql
print("----- Creating SCHEMAS in PostgreSQL -----")
result = run_sql_script(create_schemas_script_path)
if result == 0:
    print("SQL script executed successfully. Schemas were created.")
    # Check if schemas exist in the database
    check_schema_existence(connection_uri, schema_names)
else:
    print("Error executing SQL script.")

# 2) (Raw) Calling the Extract Function for all CSV files
print(" -- Extract Function. --")
raw_data_dfs = extract(csv_folder_path)

# 3) (Raw) Data Ingestion with Minor Transformation
# Only ingest when extraction produced data; extract() returns None on failure
if raw_data_dfs is None:
    print("Extraction failed.")
else:
    ingest_csv_to_raw(raw_data_dfs, connection_uri, raw_schema)

# Print the column names found in RAW after the ingestion step
# (the variable name is kept as-is in case other cells refer to it)
print("--- PRINTING COLUMN NAMES AFTER INGESTING IN RAW---")
raw_column_names_before_ingestion = get_schema_table_columns(connection_uri, raw_schema, tables_in_raw)
print(raw_column_names_before_ingestion.items())