# Cleaning Data and Creating DataLoader Function

## Imports

In [None]:
import os
import io
import sys
from pathlib import Path
from importlib.metadata import version
from logging import Logger
from typing import List, Optional
import logging
import joblib
import pandas as pd
from pandas.errors import ParserError
import torch
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from imblearn.over_sampling import RandomOverSampler

In [None]:
# Configure root logging at INFO level and create this notebook's module logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# Report the installed version of each listed package; a package whose
# version cannot be resolved is reported as a warning instead of stopping the cell.
packages = ["pandas", "importlib-metadata", "pyarrow"]
for package in packages:
    try:
        installed_version = version(package)
    except Exception as e:
        logger.warning(f"Could not get version for package {package}: {e}")
    else:
        logger.info(f"{package} version: {installed_version}")

## Load Dataframe from csv file in local directory

In [None]:
# Directory layout: both the raw and the cleaned CSV live under DATA_ROOT / RAW_DATA_DIR_NAME
DATA_ROOT = Path("../Data")
RAW_DATA_DIR_NAME = "Downloaded-Data"
# Example: DATA_RAW_FILE_NAME = "credit-card-fraud.csv"

# Placeholders: replace both values with real file names before running the notebook
DATA_RAW_FILE_NAME = "ENTER A DATA FILE NAME (e.g., data-RAW.csv)"
DATA_CLEAN_FILE_NAME = "ENTER A DATA FILE NAME (e.g., data-CLEAN.csv)"

# Full paths to the raw input file and the cleaned output file
RAW_DATA_PATH = DATA_ROOT.joinpath(RAW_DATA_DIR_NAME, DATA_RAW_FILE_NAME)
DATA_PATH = DATA_ROOT.joinpath(RAW_DATA_DIR_NAME, DATA_CLEAN_FILE_NAME)

In [None]:
# Load the raw CSV at RAW_DATA_PATH into a DataFrame (pandas infers the column dtypes)
df = pd.read_csv(RAW_DATA_PATH)

## Data Cleaning

### Dropping Columns

In [None]:
# Placeholder entries: replace them with the names of the columns to remove from `df`
drop_these = [
    "ADD",
    "COLUMNS",
    "TO",
    "DROP",
    "HERE",
]

In [None]:
# Remove the listed columns from `df` in place (raises KeyError if a name is not a column)
df.drop(columns=drop_these, inplace=True)

In [None]:
# Show the column labels that remain after the drop
df.columns

In [None]:
# Show the (rows, columns) dimensions of the DataFrame
df.shape

In [None]:
# Print the per-column dtype and non-null count summary
df.info()

### Create Dictionaries for Encoding/Mapping Data

### Examples

In [None]:
# Collect the distinct values of the "category" column as a plain Python list
categories = df["category"].unique().tolist()

In [None]:
# Sort in place so the index assigned to each category below is deterministic
categories.sort()

In [None]:
# Display the sorted category values
categories

In [None]:
# Map each category to its position in the sorted list, stored as a float
categories_dict = {category: float(idx) for idx, category in enumerate(categories)}

In [None]:
# Display the category -> code mapping
categories_dict

In [None]:
# Distinct values of the "gender" column, sorted so the codes are deterministic
genders = sorted(df["gender"].unique().tolist())

In [None]:
# Map each gender value to its position in the sorted list, stored as a float
gender_dict = {gender: float(idx) for idx, gender in enumerate(genders)}

In [None]:
# Display the gender -> code mapping
gender_dict

In [None]:
# Distinct values of the "state" column, sorted so the codes are deterministic
states = sorted(df["state"].unique().tolist())

In [None]:
# NOTE(review): redundant - `states` was already built with sorted() in the previous cell,
# so this in-place sort leaves the list unchanged.
states.sort()

In [None]:
# Map each state to its position in the sorted list, stored as a float
states_dict = {state: float(idx) for idx, state in enumerate(states)}

In [None]:
# Display the state -> code mapping
states_dict

### Apply Encoding/Mapping to the columns

In [None]:
# Replace the values of each categorical column with the numeric codes from its
# mapping dictionary; a value missing from the dictionary becomes NaN.
for column, mapping in (
    ("category", categories_dict),
    ("gender", gender_dict),
    ("state", states_dict),
):
    df[column] = df[column].map(mapping)

### Convert all columns to the float32 data type

In [None]:
# Cast every column to float32; this raises if any column still holds non-numeric values
df = df.astype("float32")

### Print details of the Final Data Frame

In [None]:
# Print the dtype summary again to confirm the conversion to float32
df.info()

### End of Data Cleaning

# SAVING CLEANED DATA TO FILE

In [None]:
# Write the cleaned frame to DATA_PATH without the row index column
df.to_csv(DATA_PATH, index=False)

# Read the cleaned file back and check the data types

In [None]:
# Read the cleaned file back to verify what was written
dq = pd.read_csv(
    DATA_PATH, dtype="float32"
)  # read_csv does not produce float32 by default, so the dtype is passed explicitly

In [None]:
# Preview the first rows of the re-loaded data
dq.head()

In [None]:
# DataFrame.info() prints its summary itself and returns None, so it is called
# directly; wrapping it in print() would emit a stray "None" after the summary.
dq.info()

# Creating DataLoaders

### Clean Data Function

In [None]:
def clean_data(
    df: pd.DataFrame, logger: Logger, extra_dropped_columns: Optional[List[str]] = None
) -> pd.DataFrame:
    """Clean a raw DataFrame and return the result as an all-``float32`` frame.

    The steps are: drop the requested columns, encode the example categorical
    columns ("category", "gender", "state") as integer codes, drop every row that
    contains a missing value, and cast all columns to ``float32``. The input
    DataFrame is left untouched; all work happens on a new frame.

    Args:
        df (pd.DataFrame): The input DataFrame to be cleaned.
        logger (Logger): Logger object for logging information.
        extra_dropped_columns (List[str], optional): Columns to drop from the original
            dataset. ``None`` or an empty list drops nothing.
    Returns:
        pd.DataFrame: The cleaned DataFrame.
    Raises:
        RuntimeError: If the requested columns cannot be dropped.
        KeyError: If one of the example categorical columns is not present.
    """
    show_dataframe_info = True  # Set to False to skip logging the DataFrame info

    # Log the initial state of the DataFrame
    logger.info(f"Initial DataFrame shape: {df.shape}")

    if show_dataframe_info:
        buffer = io.StringIO()  # df.info() writes to a stream, so capture it for the logger
        df.info(buf=buffer)
        logger.info("Initial DataFrame info:\n " + buffer.getvalue())

    # Drop any unused columns. Both branches produce a new frame so the caller's
    # DataFrame is never modified by the steps below.
    if extra_dropped_columns:
        try:
            df = df.drop(columns=extra_dropped_columns)
        except Exception as e:
            raise RuntimeError(f"Problem dropping columns:\n{e}") from e
    else:
        # df.drop(columns=None) raises, so "nothing to drop" is handled explicitly
        df = df.copy()

    # ================================
    # EXAMPLE PROCESS
    # ================================

    example_categorical_columns = ("category", "gender", "state")

    logger.info("Encoding categorical variables...")
    for column in example_categorical_columns:
        # Build the mapping from the non-missing values only: NaN cannot be ordered
        # against strings, so including it would make sorted() raise. Missing entries
        # stay NaN after map() and are removed by the missing-value step below.
        levels = sorted(df[column].dropna().unique().tolist())
        codes = {level: idx for idx, level in enumerate(levels)}
        df[column] = df[column].map(codes)

    # ================================
    # END OF MAPPING/ENCODING EXAMPLE
    # ================================

    # Handle missing values (if any)
    if df.isnull().any().any():
        logger.info("Handling missing values...")
        df = df.dropna()  # Example: Drop rows with missing values
        logger.info(f"DataFrame shape after dropping missing values: {df.shape}")

    # Convert to 'float32' to reduce memory usage
    logger.info("Converting Entire Data Frame to 'float32'...")
    df = df.astype("float32")

    if show_dataframe_info:
        # Use a fresh buffer so the final summary is not appended to the initial one
        buffer = io.StringIO()
        df.info(buf=buffer)
        logger.info("Final DataFrame info:\n " + buffer.getvalue())

    return df

### Custom Dataset Class

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset backed by a CSV file of numeric features and one label column.

    The CSV is read once at construction time and converted to two tensors
    (``float32`` features, ``long`` labels), so fetching a sample is a plain tensor
    index instead of a per-sample pandas row lookup.
    """

    def __init__(self, csv_file: str = "../Data/DataSplits/test.csv", label_column: str = "Label"):
        """Initializer for the Dataset class.
        Args:
            csv_file (str): Path to the CSV file containing the dataset.
            label_column (str): The name of the column indicating the label.
        Raises:
            FileNotFoundError: If ``csv_file`` does not exist.
            KeyError: If ``label_column`` is not a column of the CSV file.
        """
        try:
            self.data = pd.read_csv(csv_file)  # Keep the DataFrame available for inspection
        except FileNotFoundError as e:
            raise FileNotFoundError(f"File not found: {csv_file}") from e

        # Define feature and label columns
        self.label_column = label_column
        # Omit the label column to create the list of feature columns
        self.feature_columns = self.data.columns.drop([self.label_column])

        # Convert once up front. Indexing the DataFrame with iloc inside __getitem__
        # builds a pandas Series for every sample, which dominates data loading time.
        self._features = torch.tensor(
            self.data[self.feature_columns].to_numpy(), dtype=torch.float32
        )
        self._labels = torch.tensor(self.data[self.label_column].to_numpy(), dtype=torch.long)

    def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor]:
        """Returns a tuple (features, label) for the given index.
        Args:
            index (int): Index of the data sample to retrieve.
        Returns:
            tuple: (features, label) where features is a float32 tensor of input features
            and label is the corresponding label as a long tensor.
        """
        return (self._features[index], self._labels[index])

    def __len__(self) -> int:
        """Returns the amount of samples in the dataset."""
        return len(self.data)

### Data Pipeline Function

In [None]:
def data_pipeline(
    logger: Logger,
    dataset_url: str,
    root_data_dir: str = "../Data",
    data_file_path: str = "Dataset.csv",
    data_splits_dir: str = "DataSplits",
    scaler_dir: str = "Scalers",
    target_column: str = "Target",
    extra_dropped_columns: Optional[List[str]] = None,
    batch_size: int = 64,
    num_workers: int = 0,
    pin_memory: bool = False,
    drop_last: bool = True,
) -> tuple[
    Dataset,
    Dataset,
    Dataset,
    DataLoader,
    DataLoader,
    DataLoader,
    MinMaxScaler,
    Optional[MinMaxScaler],
]:
    """Build (or reload) the train/validation/test splits and wrap them in DataLoaders.

    The cleaned dataset is read from ``root_data_dir / data_file_path`` when that file
    exists; otherwise it is downloaded from ``dataset_url``, cleaned with ``clean_data``
    and saved there. When all three split CSVs already exist they are reused together
    with the stored feature scaler. Otherwise the data is split 90/5/5
    (train/validation/test), a ``MinMaxScaler`` is fitted on the training features
    only, and the scaled splits and the scaler(s) are written to disk.

    Args:
        logger (Logger): The logger instance to log messages.
        dataset_url (str): The URL to download the dataset from, if not found locally.
        root_data_dir (str): The root of the Data Directory
        data_file_path (str): The name of the original dataset (with .csv file extension).
        data_splits_dir (str): Path to the train, test, and validation datasets.
        scaler_dir (str): Path to the feature and label scalers.
        target_column (str): The name of the target column to predict.
        extra_dropped_columns (List[str], optional): Columns to drop from the features in original dataset.
        batch_size (int): The dataloader's batch_size.
        num_workers (int): The dataloader's number of workers.
        pin_memory (bool): The dataloader's pin memory option.
        drop_last (bool): Whether the training dataloader drops its last incomplete
            batch. The validation and test dataloaders always keep every sample.

    Returns:
        train_dataset (Dataset): Dataset Class for the training dataset.
        test_dataset (Dataset): Dataset Class for the test dataset.
        validation_dataset (Dataset): Dataset Class for the validation dataset.
        train_dataloader (DataLoader): The train dataloader.
        test_dataloader (DataLoader): The test dataloader.
        validation_dataloader (DataLoader): The validation dataloader.
        feature_scaler (MinMaxScaler): The scaler used to scale the features of the model input.
        label_scaler (MinMaxScaler or None): The scaler used to scale the labels, or
            ``None`` when label scaling is disabled.

    Raises:
        ValueError: If a required file or directory argument is an empty string.
        KeyError: If ``target_column`` is not a column of the dataset.
        RuntimeError: If downloading, cleaning, saving, or loading scalers fails.
    """
    if (
        not root_data_dir or not data_file_path or not data_splits_dir
    ):  # Check for empty strings at the beginning
        raise ValueError("File and directory paths cannot be empty strings.")
    DATA_ROOT = Path(root_data_dir)

    DATA_CLEAN_PATH = DATA_ROOT / data_file_path  # Set the path to the complete dataset

    if DATA_CLEAN_PATH.exists():
        logger.info(f"CSV file detected, reading from '{DATA_ROOT}'")
        # The stored file is already cleaned, so it can be read directly as float32
        df = pd.read_csv(DATA_CLEAN_PATH, dtype="float32")
    else:
        logger.info(f"Downloading CSV file from '{dataset_url}'\nand saving into '{DATA_ROOT}'")

        # Step 1: download. Each step has its own try block so that a failure is
        # reported under the step that actually failed.
        try:
            # Also covers a data_file_path that contains sub-directories
            os.makedirs(DATA_CLEAN_PATH.parent, exist_ok=True)
            df = pd.read_csv(dataset_url)  # Keep data as is, may not be able to expect float32 data
        except OSError as e:
            raise RuntimeError(f"OS error occurred: {e}") from e
        except ParserError as e:
            raise RuntimeError(f"Failed to parse CSV from '{dataset_url}'") from e
        except Exception as e:
            raise RuntimeError(
                f"An unexpected error occurred when downloading or saving the "
                f"dataset from '{dataset_url}' to '{DATA_CLEAN_PATH}':\n{e}"
            ) from e

        # Step 2: clean the data before saving
        try:
            df = clean_data(df, logger, extra_dropped_columns=extra_dropped_columns)
        except ValueError as e:
            raise RuntimeError(f"Data cleaning error:\n{e}") from e
        except Exception as e:
            raise RuntimeError(f"An unexpected error occurred cleaning the dataset:\n{e}") from e

        # Step 3: save the cleaned file, omitting the row index
        try:
            df.to_csv(DATA_CLEAN_PATH, index=False)
        except OSError as e:
            raise RuntimeError(f"OS error occurred: {e}") from e
        except Exception as e:
            raise RuntimeError(
                f"An unexpected error occurred when downloading or saving the "
                f"dataset from '{dataset_url}' to '{DATA_CLEAN_PATH}':\n{e}"
            ) from e

    # Define the paths for the data splits and scalers
    DATA_SPLITS_DIR = DATA_ROOT / data_splits_dir
    SCALER_DIR = DATA_ROOT / scaler_dir

    TRAIN_DATA_PATH = DATA_SPLITS_DIR / "train.csv"
    TEST_DATA_PATH = DATA_SPLITS_DIR / "test.csv"
    VALIDATION_DATA_PATH = DATA_SPLITS_DIR / "val.csv"

    FEATURE_SCALER_PATH = SCALER_DIR / "feature-scaler.joblib"
    LABEL_SCALER_PATH = SCALER_DIR / "label-scaler.joblib"

    # Dictate whether to use label scaler
    USE_LABEL_SCALER = False  # TOGGLE IF NEEDED

    # Dictate whether to balance the training split with random oversampling.
    # Off by default: the splits are built from the original, un-resampled data.
    USE_OVERSAMPLING = False  # TOGGLE IF NEEDED

    # Define the Data Splits
    TRAIN_SPLIT_PERCENTAGE = 0.9  # Share of all rows used for training
    VALIDATION_SPLIT_PERCENTAGE = 0.5  # Share of the remaining rows used for validation

    splits_exist = all(
        split_path.exists()
        for split_path in (TRAIN_DATA_PATH, TEST_DATA_PATH, VALIDATION_DATA_PATH)
    )

    if splits_exist:
        logger.info(
            f"Train, Test, and Validation CSV datasets detected in '{DATA_SPLITS_DIR}.' Skipping generation and loading scaler(s)"
        )
        try:
            # NOTE: joblib.load unpickles the file; only load scaler files written by
            # this pipeline, never files from an untrusted source.
            feature_scaler = joblib.load(FEATURE_SCALER_PATH)
            logger.info(f"Feature scaler loaded from: ({FEATURE_SCALER_PATH})")
            if USE_LABEL_SCALER:
                label_scaler = joblib.load(LABEL_SCALER_PATH)
                logger.info(f"Label scaler loaded from: ({LABEL_SCALER_PATH})")
            else:
                label_scaler = None  # Omit the label scaler loading

        except FileNotFoundError as e:
            raise RuntimeError(f"Scaler file not found: {e}") from e
        except EOFError as e:
            raise RuntimeError(f"Scaler file appears to be empty or corrupted: {e}") from e
        except Exception as e:
            raise RuntimeError(f"An unexpected error occurred when loading scalers: {e}") from e
    else:
        logger.info(
            f"Datasets not found in '{DATA_SPLITS_DIR}' or incomplete. Generating datasets..."
        )
        os.makedirs(DATA_SPLITS_DIR, exist_ok=True)  # Create the Data Splits Parent Directory
        os.makedirs(SCALER_DIR, exist_ok=True)  # Create the Scaler Parent Directory

        # Create the scaler objects
        feature_scaler = MinMaxScaler()
        label_scaler = MinMaxScaler() if USE_LABEL_SCALER else None

        try:
            df_features = df.drop(columns=[target_column])
            # "[[...]]" returns a one-column DataFrame (shape (-1, 1)) instead of a Series
            df_labels = df[[target_column]]
        except KeyError as e:
            raise KeyError(
                f"One or more specified columns to drop do not exist in the DataFrame: {e}"
            ) from e

        # Split into smaller DataFrames for the Train, Test, and Validation splits
        X_train, X_inter, Y_train, Y_inter = train_test_split(
            df_features,
            df_labels,
            test_size=1 - TRAIN_SPLIT_PERCENTAGE,
            random_state=42,
        )
        X_validation, X_test, Y_validation, Y_test = train_test_split(
            X_inter, Y_inter, test_size=1 - VALIDATION_SPLIT_PERCENTAGE, random_state=42
        )

        # ================================
        # ADD OVERSAMPLING AND OTHER DATA BALANCING TECHNIQUES HERE
        # ================================

        # Balancing is applied to the training split only, after splitting, so that
        # duplicated rows can never appear in the validation or test splits.
        if USE_OVERSAMPLING:
            ros = RandomOverSampler(random_state=42)
            X_train, resampled_labels = ros.fit_resample(X_train, Y_train[target_column])
            # Rebuild the one-column label frame expected by the code below
            Y_train = pd.DataFrame({target_column: resampled_labels})

        # ================================
        # END OF OVERSAMPLING AND OTHER DATA BALANCING TECHNIQUES
        # ================================

        # Fit the scalers on the training data only
        feature_scaler.fit(X_train)
        if USE_LABEL_SCALER:
            label_scaler.fit(Y_train)  # Not used for this Classification task

        # Save the fitted scaler object(s)
        try:
            joblib.dump(feature_scaler, FEATURE_SCALER_PATH)
            logger.info(f"Feature scaler stored in: ({FEATURE_SCALER_PATH})")
            if USE_LABEL_SCALER:
                joblib.dump(label_scaler, LABEL_SCALER_PATH)
                logger.info(f"Label scaler stored in: ({LABEL_SCALER_PATH})")
        except FileNotFoundError as e:
            raise RuntimeError(f"Save path not found: {e}") from e
        except Exception as e:
            raise RuntimeError(f"An unexpected error occurred when saving  Scaler(s): {e}") from e

        # Scale all Feature Inputs
        X_train_scaled = feature_scaler.transform(X_train)
        X_validation_scaled = feature_scaler.transform(X_validation)
        X_test_scaled = feature_scaler.transform(X_test)

        if USE_LABEL_SCALER:  # HANDLE EACH ON A CASE BY CASE BASIS
            Y_train = label_scaler.transform(Y_train)
            Y_validation = label_scaler.transform(Y_validation)
            Y_test = label_scaler.transform(Y_test)

        logger.info(f"Train Features (Scaled) Shape: {X_train_scaled.shape}")
        logger.info(f"Validation Features (Scaled) Shape: {X_validation_scaled.shape}")
        logger.info(f"Test Features (Scaled) Shape: {X_test_scaled.shape}")

        label_kind = "Labels (Scaled)" if USE_LABEL_SCALER else "Labels"
        logger.info(f"Train {label_kind} Shape: {Y_train.shape}")
        logger.info(f"Validation {label_kind} Shape: {Y_validation.shape}")
        logger.info(f"Test {label_kind} Shape: {Y_test.shape}")

        # Define the column names of the features and label
        features_names = df_features.columns
        label_name = df_labels.columns

        # Rebuild one DataFrame per split: scaled features followed by the label column.
        # The label index is reset so it lines up with the fresh index of the features.
        split_frames = []
        for split_path, scaled_features, split_labels in (
            (TRAIN_DATA_PATH, X_train_scaled, Y_train),
            (TEST_DATA_PATH, X_test_scaled, Y_test),
            (VALIDATION_DATA_PATH, X_validation_scaled, Y_validation),
        ):
            features_df = pd.DataFrame(scaled_features, columns=features_names)
            labels_df = pd.DataFrame(split_labels, columns=label_name).reset_index(drop=True)
            split_frames.append((split_path, pd.concat([features_df, labels_df], axis=1)))

        # Saving the split data to csv files
        try:
            for split_path, split_frame in split_frames:
                split_frame.to_csv(split_path, index=False)
        except FileNotFoundError as e:
            raise RuntimeError(f"Save path not found: {e}") from e
        except Exception as e:
            raise RuntimeError(
                f"An unexpected error occurred when saving datasets to CSV files:\n{e}"
            ) from e

    # Creating Datasets from the stored datasets
    logger.info("INITIALIZING DATASETS")
    train_dataset = CustomDataset(csv_file=TRAIN_DATA_PATH, label_column=target_column)
    test_dataset = CustomDataset(csv_file=TEST_DATA_PATH, label_column=target_column)
    val_dataset = CustomDataset(csv_file=VALIDATION_DATA_PATH, label_column=target_column)

    logger.info(
        f"Creating DataLoaders with 'batch_size'=({batch_size}), 'num_workers'=({num_workers}), 'pin_memory'=({pin_memory}). Training dataset 'drop_last'=({drop_last})"
    )
    train_dataloader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=drop_last,
        shuffle=True,
    )
    # Evaluation loaders keep every sample: dropping the last incomplete batch would
    # silently exclude rows from the metrics and can leave a small split with no batch.
    validation_dataloader = DataLoader(
        dataset=val_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=False,
        shuffle=False,
    )
    test_dataloader = DataLoader(
        dataset=test_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        drop_last=False,
        shuffle=False,
    )

    logger.info(
        f"Training DataLoader has ({len(train_dataloader)}) batches, Test DataLoader has ({len(test_dataloader)}) batches, Validation DataLoader has ({len(validation_dataloader)}) batches"
    )

    logger.info("==================================================================")
    for name, dataloader in [
        ("Train", train_dataloader),
        ("Validation", validation_dataloader),
        ("Test", test_dataloader),
    ]:
        # An empty loader (e.g. fewer training rows than batch_size with drop_last=True)
        # has no batch to inspect; next(iter(...)) would raise StopIteration.
        if len(dataloader) == 0:
            logger.warning(f"{name} Dataloader has no batches; skipping batch information")
            logger.info("==================================================================")
            continue

        features, labels = next(iter(dataloader))  # Get one batch

        logger.info(f"{name} Dataloader Batch Information")
        logger.info(f"Features Shape: '{features.shape}' |  DataTypes: '{features.dtype}'")
        logger.info(f"Labels Shape: '{labels.shape}'   |  DataTypes: '{labels.dtype}' ")
        logger.info("==================================================================")

    return (
        train_dataset,
        test_dataset,
        val_dataset,
        train_dataloader,
        test_dataloader,
        validation_dataloader,
        feature_scaler,
        label_scaler,
    )

# Testing the Data Pipeline

## Testing with a given URL

- Before running, edit the entries of the `data` dictionary inside `test_data_pipeline` (dataset URL, target column, and columns to drop)

In [None]:
# USED WHEN TESTING THE RAW DATASET
def test_data_pipeline():
    """Run ``data_pipeline`` with the settings below and sanity-check what it returns.

    Edit the ``data`` dictionary (URL, target column, columns to drop) before running.

    Returns:
        tuple: The eight objects returned by ``data_pipeline``, in the same order:
        train/test/validation datasets, train/test/validation dataloaders, the
        feature scaler and the label scaler.

    Raises:
        Exception: Any exception raised by ``data_pipeline`` is logged and re-raised.
        AssertionError: If a returned object does not have the expected type.
    """
    # Function input setup
    data = {
        "dataset_url": "REPLACE WITH THE URL OF THE DATASET",
        "root_data_dir": "../Data",
        "data_file_path": DATA_CLEAN_FILE_NAME,
        "data_splits_dir": "DataSplits",
        "scaler_dir": "Scalers",
        "target_column": "REPLACE WITH TARGET/LABEL COLUMN NAME (ex. 'Price')",
        "extra_dropped_columns": [
            # REPLACE WITH ANY COLUMNS TO BE EXCLUDED FROM THE DATASET - COMMA SEPARATED
            "COLUMN_1",
            "COLUMN_2",
            "ETC.",
        ],
    }
    batch_size = 64
    num_workers = 0
    pin_memory = False
    drop_last = True

    logger = logging.getLogger(__name__)

    # Call the data pipeline function
    try:
        (
            train_dataset,
            test_dataset,
            val_dataset,
            train_dataloader,
            test_dataloader,
            validation_dataloader,
            feature_scaler,
            label_scaler,
        ) = data_pipeline(
            logger,
            **data,
            batch_size=batch_size,
            num_workers=num_workers,
            pin_memory=pin_memory,
            drop_last=drop_last,
        )
    except Exception as e:
        # Re-raise after logging the traceback: continuing would reach the assertions
        # with none of the result variables bound and hide the real failure behind an
        # UnboundLocalError.
        logger.exception(f"Caught Exception: {e}")
        raise

    # Basic assertions to verify the outputs
    assert isinstance(train_dataset, Dataset), "train_dataset is not an instance of Dataset"
    assert isinstance(test_dataset, Dataset), "test_dataset is not an instance of Dataset"
    assert isinstance(val_dataset, Dataset), "val_dataset is not an instance of Dataset"
    assert isinstance(
        train_dataloader, DataLoader
    ), "train_dataloader is not an instance of DataLoader"
    assert isinstance(
        test_dataloader, DataLoader
    ), "test_dataloader is not an instance of DataLoader"
    assert isinstance(
        validation_dataloader, DataLoader
    ), "validation_dataloader is not an instance of DataLoader"
    assert isinstance(
        feature_scaler, MinMaxScaler
    ), "feature_scaler is not an instance of MinMaxScaler"
    # The label scaler is not type-checked here; enable the check below if label
    # scaling is part of the run being tested.
    # assert isinstance(label_scaler, MinMaxScaler), "label_scaler is not an instance of MinMaxScaler"

    logger.info("All assertions passed. Data pipeline test successful.")

    return (
        train_dataset,
        test_dataset,
        val_dataset,
        train_dataloader,
        test_dataloader,
        validation_dataloader,
        feature_scaler,
        label_scaler,
    )

### Call the 'test_data_pipeline' function and capture the return variables

In [None]:
# Run the pipeline test and keep its eight return values as notebook-level variables
# so the cells below can inspect the datasets, dataloaders and scalers.
(
    train_dataset,
    test_dataset,
    val_dataset,
    train_dataloader,
    test_dataloader,
    validation_dataloader,
    feature_scaler,
    label_scaler,
) = test_data_pipeline()

### Verify the length of the dataloader(s)

In [None]:
# Number of batches the validation dataloader yields
len(validation_dataloader)

### See details about one batch from each dataloader

In [None]:
# Pull one batch from each dataloader and log its tensor shapes, dtypes and labels
separator = "=================================================================="
loader_names = ("Train", "Validation", "Test")
loaders = (train_dataloader, validation_dataloader, test_dataloader)

logger.info(separator)
for name, dataloader in zip(loader_names, loaders):
    batch_iterator = iter(dataloader)
    features, labels = next(batch_iterator)  # First batch only

    logger.info(f"{name} Dataloader Batch Information")
    logger.info(f"Features Shape: '{features.shape}' |  DataTypes: '{features.dtype}'")
    logger.info(f"Labels Shape: '{labels.shape}'   |  DataTypes: '{labels.dtype}' ")
    logger.info(f"The labels: {labels}")  # Optional
    logger.info(separator)