In [1]:
import pandas as pd
import logging
import yaml
import os
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
import sqlite3

In [2]:
# Default location of the YAML file that maps each ETL section to its log file path
LOGGING_CONFIG_PATH = '/Users/akram/DataScienceProjects/customer-churn-prediction/configuration/etl/logging_paths.yaml'


def setup_logging(log_section, config_path=LOGGING_CONFIG_PATH):
    """
    Point the root logger at the log file configured for one ETL section.

    The log file path is read from the ``etl`` mapping of a YAML file, keyed by
    section name (e.g. 'extract', 'transform', 'load', 'etl').

    ``logging.basicConfig`` is a no-op once the root logger already has handlers,
    so without ``force=True`` only the first call in a kernel would ever take
    effect. With ``force=True`` each call closes and removes the existing root
    handlers and installs a fresh file handler. Messages logged after a later
    call therefore go to that later section's file. The file is opened with
    mode 'w', so it is truncated on every call.

    @param log_section: str, the key under ``etl`` in the logging configuration to use
    @param config_path: str, path to the YAML logging configuration (defaults to the project file)
    @raises FileNotFoundError: If the logging configuration file does not exist
    @raises KeyError: If ``etl`` or log_section is not present in the configuration
    """
    with open(config_path, 'r') as file:
        log_config = yaml.safe_load(file)

    path = log_config['etl'][log_section]

    # dirname is '' for a bare filename, and os.makedirs('') would raise
    log_dir = os.path.dirname(path)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        filename=path,
        filemode='w',
        force=True,
    )

### EXTRACT

In [3]:
def open_data_file(file_path):
    """
    Read a CSV file into a pandas DataFrame.

    @param file_path (str | os.PathLike): The path to the CSV file to open
    @return pd.DataFrame: The data read from the file
    @raises ValueError: If the path does not end with a .csv extension (case-insensitive)
    @raises FileNotFoundError: If the file does not exist
    """
    logging.info(f"Opening file: {file_path}")

    # os.fspath accepts both plain strings and pathlib.Path objects
    path_str = os.fspath(file_path)

    # Compare the extension case-insensitively so e.g. "data.CSV" is accepted as well
    if not path_str.lower().endswith('.csv'):
        logging.error(f"Invalid file type for file: {file_path}. Expected a csv file.")
        raise ValueError(f"Invalid file type for file: {file_path}. Expected a csv file.")

    try:
        data = pd.read_csv(path_str)
    except FileNotFoundError as err:
        logging.error(f"File not found: {file_path}")
        # Chain the original error so its traceback stays available for debugging
        raise FileNotFoundError(f"File not found: {file_path}") from err

    logging.info(f"Data succesfully extracted from: {file_path}")
    return data

def open_config_file(file_path):
    """
    Open a YAML configuration file and return its contents as a dictionary.

    NOTE(review): the TRANSFORM cell defines another function with this same
    name. Python resolves the name when it is called, so once that cell has run,
    its version is used by every caller, including extract_data. Consider keeping
    a single shared definition.

    @param file_path (str): The path to the configuration file to open
    @return dict: The configuration data extracted from the file
    @raises FileNotFoundError: If the configuration file does not exist
    @raises ValueError: If the file is empty or its top level is not a mapping
    """
    logging.info(f"Opening configuration file: {file_path}")

    try:
        # The path is supplied by the caller; it is not fixed inside this function
        with open(file_path, 'r') as file:
            config_col = yaml.safe_load(file)
    except FileNotFoundError as err:
        logging.error(f"Configuration file not found: {file_path}")
        raise FileNotFoundError(f"Configuration file not found: {file_path}") from err

    # safe_load returns None for an empty file. Fail here with a clear message rather
    # than with a TypeError when the caller indexes into the result.
    if not isinstance(config_col, dict):
        logging.error(f"Configuration file is empty or not a mapping: {file_path}")
        raise ValueError(f"Configuration file is empty or not a mapping: {file_path}")

    logging.info('Required columns succesfully extracted from configuration file.')
    return config_col

def schema_validation(data, required_columns):
    """
    Ensure that every required column is present in the DataFrame.

    @param data (pd.DataFrame): The extracted data to check
    @param required_columns (list): Column names that must exist in data
    @raises ValueError: If one or more required columns are absent
    """
    absent = []
    for column in required_columns:
        if column not in data.columns:
            absent.append(column)

    if absent:
        logging.error(f"The file is missing required columns: {absent}")
        raise ValueError(f"Schema validation failed. Missing columns: {absent}")

    logging.info("Schema validation passed. All required columns are present.")

def extract_data(data_file_path, config_file_path):
    """
    Extract the raw data from a CSV file and check it against the expected schema.

    The data file must have a .csv extension and must contain every column listed
    under ``columns.required_columns_to_load`` in the configuration file. Otherwise
    an exception is raised.

    @param data_file_path (str): The path to the CSV file to extract
    @param config_file_path (str): The path to the YAML configuration file that lists the required columns
    @return pd.DataFrame: The data extracted from the file

    @raises ValueError: If the data file is not a .csv file
    @raises FileNotFoundError: If the data file or the configuration file does not exist
    @raises ValueError: If the data is missing required columns
    """
    # Route log output to the 'extract' section's log file
    setup_logging('extract')

    logging.info(f"Starting data extract process for: {data_file_path}")

    data = open_data_file(data_file_path)

    # The configuration file lists the columns the data must contain
    config_col = open_config_file(config_file_path)
    required_columns = config_col['columns']['required_columns_to_load']

    schema_validation(data, required_columns)

    return data

### TRANSFORM

In [4]:
def normalize(data, columns_to_normalize):
    """
    Min-max normalize the given columns with scikit-learn's MinMaxScaler.

    Normalization is chosen over standardization because the differences between
    values are meaningful and the data is not normally distributed.

    The input DataFrame is not modified; a new DataFrame is returned. An empty or
    None column list is logged and skipped instead of failing inside the scaler.

    @param data (pd.DataFrame): The data to normalize
    @param columns_to_normalize (list): The columns to normalize
    @return pd.DataFrame: A copy of data with the normalized columns
    """
    logging.info("Starting normalization process")

    # Copy so the caller's DataFrame is not changed by the column assignment below
    normalized = data.copy()
    columns = [] if columns_to_normalize is None else list(columns_to_normalize)

    if columns:
        scaler = MinMaxScaler()
        normalized[columns] = scaler.fit_transform(normalized[columns])
    else:
        # MinMaxScaler raises on zero features, so skip explicitly
        logging.warning("No columns configured for normalization; skipping")

    logging.info("Normalization process completed")
    return normalized

def label_encode(data, columns_to_label_encode):
    """
    Label encode the given columns with scikit-learn's LabelEncoder.

    Each column is fitted independently. Its distinct values are sorted and each
    value is replaced by its position in that sorted order (0, 1, 2, ...). The
    order is the values' natural sort order (alphabetical for strings), not a
    domain-specific ranking. Confirm it matches the intended order for ordinal columns.

    The input DataFrame is not modified; a new DataFrame is returned. An empty or
    None column list is logged and skipped.

    @param data (pd.DataFrame): The data to label encode
    @param columns_to_label_encode (list): The columns to label encode
    @return pd.DataFrame: A copy of data with the label encoded columns
    """
    logging.info("Starting label encoding process")

    # Copy so the caller's DataFrame is not changed by the column assignment below
    encoded = data.copy()
    columns = [] if columns_to_label_encode is None else list(columns_to_label_encode)

    if columns:
        # One encoder is reused, but fit_transform refits it for every column
        label_encoder = LabelEncoder()
        encoded[columns] = encoded[columns].apply(label_encoder.fit_transform)
    else:
        logging.warning("No columns configured for label encoding; skipping")

    logging.info("Label encoding process completed")
    return encoded

def one_hot_encode(data, columns_to_one_hot_encode):
    """
    Expand nominal columns into 0/1 indicator columns using pandas.get_dummies.

    Use this for nominal columns, whose values have no meaningful order. Each listed
    column is replaced by one integer column per distinct value.

    @param data (pd.DataFrame): The data to one hot encode
    @param columns_to_one_hot_encode (list): The nominal columns to expand
    @return pd.DataFrame: A new DataFrame containing the indicator columns
    """
    logging.info("Starting one hot encoding process")

    encoded = data.pipe(pd.get_dummies, columns=columns_to_one_hot_encode, dtype=int)

    logging.info("One hot encoding process completed")
    return encoded

def open_config_file(file_path):
    """
    Open a YAML configuration file and return its contents as a dictionary.

    NOTE(review): this redefines open_config_file from the EXTRACT cell. Python
    resolves the name when it is called, so once this cell has run, this version
    is also used by extract_data. Consider keeping a single shared definition.

    @param file_path (str): The path to the configuration file to open
    @return dict: The configuration data extracted from the file
    @raises FileNotFoundError: If the configuration file does not exist
    @raises ValueError: If the file is empty or its top level is not a mapping
    """
    logging.info(f"Opening configuration file: {file_path}")

    try:
        with open(file_path, 'r') as file:
            config_col = yaml.safe_load(file)
    except FileNotFoundError as err:
        logging.error(f"Configuration file not found: {file_path}")
        raise FileNotFoundError(f"Configuration file not found: {file_path}") from err

    # safe_load returns None for an empty file. Fail here with a clear message rather
    # than with a TypeError when the caller indexes into the result.
    if not isinstance(config_col, dict):
        logging.error(f"Configuration file is empty or not a mapping: {file_path}")
        raise ValueError(f"Configuration file is empty or not a mapping: {file_path}")

    logging.info("Configuration file loaded successfully")
    return config_col

def transform(data, config_file):
    """
    Transform the extracted data into numeric features.

    Steps, in order:
    1. convert the configured object columns to numbers (unparseable -> NaN -> 0)
    2. min-max normalize the configured columns
    3. label encode the independent columns
    4. one hot encode the nominal columns
    5. label encode the columns that depend on another column

    The column lists are read from the ``columns`` section of the configuration file.
    The caller's DataFrame is not modified; a transformed DataFrame is returned.

    @param data (pd.DataFrame): The data to transform
    @param config_file (str): The path to the configuration file
    @return pd.DataFrame: The transformed data
    """
    # Route log output to the 'transform' section's log file
    setup_logging('transform')

    logging.info("Starting data transform process")

    config_columns = open_config_file(config_file)

    columns_to_change_from_object_to_numeric = config_columns['columns']['columns_to_change_from_object_to_numeric']
    columns_one_hot_encode = config_columns['columns']['columns_one_hot_encode']
    columns_label_encode = config_columns['columns']['columns_label_encode']
    columns_to_normalize = config_columns['columns']['columns_to_normalize']
    # Columns whose values depend on another column; they are label encoded last
    columns_that_depend_on_another_column = config_columns['columns']['columns_that_depend_on_another_column']

    # Work on a copy. The column assignments below would otherwise also rewrite
    # the caller's DataFrame.
    data = data.copy()

    # Values that cannot be parsed as numbers (e.g. empty strings) become NaN
    data[columns_to_change_from_object_to_numeric] = data[columns_to_change_from_object_to_numeric].apply(pd.to_numeric, errors='coerce')

    # Treat missing values as zero. An empty value means there is no information
    # about the customer, which means no charges have been made yet.
    # NOTE(review): this also silently turns any other malformed value into 0.
    data[columns_to_change_from_object_to_numeric] = data[columns_to_change_from_object_to_numeric].fillna(0)

    data = normalize(data, columns_to_normalize)

    data = label_encode(data, columns_label_encode)

    data = one_hot_encode(data, columns_one_hot_encode)

    data = label_encode(data, columns_that_depend_on_another_column)

    logging.info("Data transform process completed")

    return data

### LOAD

In [5]:
def load_data(data, db_path):
    """
    Write the transformed data to the ``customers`` table of a SQLite database.

    Any existing ``customers`` table is dropped and recreated (``if_exists='replace'``).
    The DataFrame index is not written.

    @param data (pd.DataFrame): The transformed data to load
    @param db_path (str): The path to the SQLite database file (sqlite3 creates it if missing)
    @raises Exception: Any error raised while connecting or writing is logged and re-raised
    """
    # Route log output to the 'load' section's log file
    setup_logging('load')

    logging.info("Starting data load process")

    # Initialised before the try so the finally clause can tell whether connect() succeeded
    conn = None
    try:
        conn = sqlite3.connect(db_path)

        # 'replace' drops an existing 'customers' table and recreates it from the DataFrame
        data.to_sql('customers', conn, if_exists='replace', index=False)

        conn.commit()
        logging.info(f"Data successfully loaded into database at {db_path}")
    except Exception as e:
        logging.error(f"Error during data load: {e}")
        # Re-raise so callers such as etl() do not report success after a failed load
        raise
    finally:
        if conn is not None:
            conn.close()

### ETL

In [6]:
def etl(data_file_path, config_file_path, db_path):
    """
    Run the full pipeline: extract from a CSV file, transform, and load into SQLite.

    The same configuration file drives both the extract step (required columns)
    and the transform step (which columns to convert, normalize and encode).

    @param data_file_path (str): The path to the CSV file to extract
    @param config_file_path (str): The path to the YAML configuration file
    @param db_path (str): The path to the SQLite database
    """
    setup_logging('etl')
    logging.info("Starting ETL process")

    raw_data = extract_data(data_file_path, config_file_path)
    load_data(transform(raw_data, config_file_path), db_path)

    logging.info("ETL process completed")

In [7]:
# Project root. Set the CHURN_PROJECT_ROOT environment variable to run outside the original machine.
PROJECT_ROOT = os.environ.get('CHURN_PROJECT_ROOT', '/Users/akram/DataScienceProjects/customer-churn-prediction')

etl(
    os.path.join(PROJECT_ROOT, 'data', 'WA_Fn-UseC_-Telco-Customer-Churn.csv'),
    os.path.join(PROJECT_ROOT, 'configuration', 'etl', 'data_columns.yaml'),
    os.path.join(PROJECT_ROOT, 'data_warehouse', 'customers.db'),
)

In [8]:
# Read back the 'customers' table written by the ETL run to spot-check the load
conn = sqlite3.connect('/Users/akram/DataScienceProjects/customer-churn-prediction/data_warehouse/customers.db')

query = "SELECT * FROM customers"

try:
    df = pd.read_sql(query, conn)
finally:
    # Close explicitly so the database file is not left open in the kernel
    conn.close()

df.head()

Unnamed: 0,customerID,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,OnlineSecurity,OnlineBackup,DeviceProtection,...,Churn,gender_Female,gender_Male,InternetService_DSL,InternetService_Fiber optic,InternetService_No,PaymentMethod_Bank transfer (automatic),PaymentMethod_Credit card (automatic),PaymentMethod_Electronic check,PaymentMethod_Mailed check
0,7590-VHVEG,0,1,0,0.013889,0,1,0,2,0,...,0,1,0,1,0,0,0,0,1,0
1,5575-GNVDE,0,0,0,0.472222,1,0,2,0,2,...,0,0,1,1,0,0,0,0,0,1
2,3668-QPYBK,0,0,0,0.027778,1,0,2,2,0,...,1,0,1,1,0,0,0,0,0,1
3,7795-CFOCW,0,0,0,0.625,0,1,2,0,2,...,0,0,1,1,0,0,1,0,0,0
4,9237-HQITU,0,0,0,0.027778,1,0,0,0,0,...,1,1,0,0,1,0,0,0,1,0
