In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import cross_val_score
from scipy.stats import shapiro, kstest, norm, probplot, chi2_contingency
import argparse
import configparser
import subprocess
from sklearn.model_selection import train_test_split
pd.set_option('display.max_rows', None)  # Display all rows

In [None]:
# Function to open config file for review
def open_config_file(config_file):
    """
    Open the configuration file in the system's default application and wait
    for the user to confirm they have finished reviewing it.

    Parameters:
        config_file (str): Path to the configuration file to review.

    Returns:
        bool: True once the file was handed to the system opener and the user
        pressed Enter; False if the file does not exist or the opener could
        not be launched.
    """
    import sys  # local import: only needed to tell macOS apart from other POSIX systems

    if not os.path.exists(config_file):
        print(f"Configuration file '{config_file}' not found. Please make sure it exists.")
        return False

    try:
        print(f"Opening configuration file '{config_file}' for review...")
        if os.name == 'nt':
            # 'start' is a cmd.exe builtin rather than an executable;
            # os.startfile is the direct, shell-free equivalent on Windows.
            os.startfile(config_file)
        else:
            # Pass an argument list WITHOUT shell=True: with shell=True on POSIX
            # only the first list item is executed as the command, so the file
            # name would never reach the opener.
            opener = 'open' if sys.platform == 'darwin' else 'xdg-open'
            subprocess.Popen([opener, config_file])
        input("Press Enter when you're ready to proceed with data cleaning...")
        return True
    except Exception as e:
        # Covers a missing opener executable (e.g. headless machine) as well as
        # any failure while waiting for user input.
        print(f"Failed to open the configuration file: {e}")
        return False

In [None]:
# Function to load paths and suffix from config file
def load_paths_and_suffix(config_file):
    """
    Read the input/output folders and the cleaned-file suffix from a config file.

    Values are looked up in the ``[Paths]`` section. Any option that is absent
    falls back to its built-in default, and so does every option when the
    section (or the file itself) is missing, instead of raising ``KeyError``.

    Parameters:
        config_file (str): Path to an INI-style configuration file.

    Returns:
        dict: Keys ``"input_folder"``, ``"output_folder"`` and
        ``"cleaned_file_suffix"`` mapped to their configured or default values.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read silently skips files it cannot open, so a missing file
    # simply results in every value below taking its fallback.
    config.read(config_file)

    section = "Paths"
    paths = {
        "input_folder": config.get(section, "input_folder_clean", fallback="../OTH_DATA/training_data"),
        "output_folder": config.get(section, "output_folder_clean", fallback="../OTH_DATA/cleaned_data"),
        "cleaned_file_suffix": config.get(section, "cleaned_file_suffix", fallback="_v1"),
    }
    return paths

In [None]:
# Clean data function
def clean_data(data, drop_columns=None, add_target=False, target_column_name="target"):
    """
    Return a cleaned copy of ``data``; the caller's DataFrame is left untouched.

    Steps, in order:
        1. Drop the spurious 'Unnamed: 18' column and any user-specified columns.
        2. Fill missing values in float64/int64 columns with the column median.
        3. Add a rounded 'BMI' column when 'Height' and 'Weight' are present.
        4. Replace every object-dtype column with integer category codes.
        5. Optionally add an empty target column.

    Parameters:
        data (pd.DataFrame): Raw input data.
        drop_columns (list[str] | None): Columns to remove; names that do not
            exist are ignored.
        add_target (bool): When True, add an empty column named
            ``target_column_name`` if it is not already present.
        target_column_name (str): Name of the optional empty target column.

    Returns:
        pd.DataFrame: The cleaned data.
    """
    # Work on a copy so the original frame keeps its raw values; the encoding
    # summary is later generated from that original frame.
    data = data.copy()

    # Drop unnecessary columns
    # If there is extra trailing delimiter, pandas will create an extra column 'Unnamed: 18'
    if 'Unnamed: 18' in data.columns:
        data = data.drop(columns=['Unnamed: 18'])

    # Drop user-specified columns
    if drop_columns:
        data = data.drop(columns=drop_columns, errors='ignore')
        print(f"Dropped columns: {drop_columns}")

    # Handle missing values, fill numerical columns with median
    for column in data.select_dtypes(include=['float64', 'int64']).columns:
        if data[column].isnull().sum() > 0:
            # Assign the result back instead of `data[column].fillna(..., inplace=True)`:
            # the chained in-place form acts on a temporary object and does not
            # update the frame under pandas copy-on-write.
            data[column] = data[column].fillna(data[column].median())

    # Calculate BMI if Height and Weight columns are present
    # NOTE(review): the formula presumes Height in metres and Weight in kilograms - confirm units.
    if 'Height' in data.columns and 'Weight' in data.columns:
        data['BMI'] = data['Weight'] / ((data['Height']) ** 2)
        data['BMI'] = data['BMI'].round(2)

    # Encoding categorical variables with numbers
    # (category codes use -1 for missing values)
    categorical_columns = data.select_dtypes(include=['object']).columns
    for column in categorical_columns:
        data[column] = data[column].astype('category').cat.codes

    # Add an empty target column if requested for dataset with no target column
    if add_target and target_column_name not in data.columns:
        data[target_column_name] = None
        print(f"Added an empty target column: '{target_column_name}'")

    print("Data cleaning complete.")
    return data

In [None]:
# Function to map the encoded numbers back to their original category values
def generate_encoding_summary(original_data, encoded_data):
    """
    Build a code-to-category lookup for every encoded categorical column.

    For each object-dtype column of ``original_data`` that is still present in
    ``encoded_data``, the column is converted to a pandas categorical and its
    categories are numbered in order, starting at 0.

    Parameters:
        original_data (pd.DataFrame): The DataFrame as it was before encoding.
        encoded_data (pd.DataFrame): The DataFrame after encoding; only its
            column names are consulted.

    Returns:
        dict: ``{column name: {integer code: original category}}``.
    """
    object_columns = original_data.select_dtypes(include=['object']).columns
    surviving_columns = [name for name in object_columns if name in encoded_data.columns]

    # Enumerating the categories yields the code -> category pairs directly.
    return {
        name: dict(enumerate(original_data[name].astype('category').cat.categories))
        for name in surviving_columns
    }

In [None]:
def print_encoding_summary(encoding_summary):
    """
    Write the encoding summary to standard output, one section per column.

    Each section starts with a blank line and a ``Column: <name>`` header,
    followed by one indented ``<code> -> <category>`` line per mapping entry.

    Parameters:
        encoding_summary (dict): ``{column name: {code: category}}`` mappings.
    """
    for column_name in encoding_summary:
        code_map = encoding_summary[column_name]
        # Assemble the whole section first, then emit it in one go.
        section_lines = [f"\nColumn: {column_name}"]
        section_lines.extend(f"  {code} -> {label}" for code, label in code_map.items())
        print("\n".join(section_lines))

In [None]:
# Interactive driver: review the config, pick a CSV, clean it, save it and
# optionally split it into train/validation/test files.
config_file = "config.txt"

# Step 1: Open the configuration file for review
if not open_config_file(config_file):
    print("Exiting due to missing or inaccessible config file.")
else:
    # Step 2: Load paths and suffix
    paths = load_paths_and_suffix(config_file)
    input_folder = paths["input_folder"]
    output_folder = paths["output_folder"]
    cleaned_file_suffix = paths["cleaned_file_suffix"]

    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # Step 3: List available files in the input folder
    if os.path.isdir(input_folder):
        # Sorted so the menu numbering is stable between runs
        # (os.listdir returns entries in arbitrary order).
        files = sorted(f for f in os.listdir(input_folder) if f.endswith(".csv"))
    else:
        print(f"Input folder '{input_folder}' does not exist.")
        files = []

    if not files:
        print("No files found in the input folder.")
    else:
        print("\nAvailable files:")
        for i, file in enumerate(files, 1):
            print(f"{i}. {file}")

        # Step 4: Prompt user to select a file to clean
        # Reset first so an invalid answer can never fall through to a stale
        # selection left in the kernel by a previous run of this cell.
        selected_file = None
        try:
            choice = int(input("Enter the number of the file you want to clean: "))
            if 1 <= choice <= len(files):
                selected_file = files[choice - 1]
            else:
                print("Invalid selection.")
        except ValueError:
            print("Please enter a valid number.")

        if selected_file is None:
            print("No file selected; nothing was cleaned.")
        else:
            input_path = os.path.join(input_folder, selected_file)
            data = pd.read_csv(input_path)

            # splitext keeps dots that are part of the name ("scan.v2.csv" -> "scan.v2").
            base_name = os.path.splitext(selected_file)[0]

            # Step 5: Ask for columns to drop
            user_input = input("Enter the columns you want to drop, separated by commas, or press Enter to skip: ")
            drop_columns = ["Patient ID"]
            if user_input:
                # Skip empty entries produced by stray or trailing commas.
                drop_columns += [col.strip() for col in user_input.split(",") if col.strip()]

            # Step 6: Ask if an empty target column should be added
            add_target = input("Do you want to add an empty target column to this dataset? (yes/no): ").strip().lower() == "yes"
            target_column_name = "target"
            if add_target:
                # Fall back to the default name when the user just presses Enter.
                target_column_name = input("Enter the name of the target column: ").strip() or "target"

            # Step 7: Clean the data
            cleaned_data = clean_data(data, drop_columns=drop_columns, add_target=add_target, target_column_name=target_column_name)

            # Step 8: Save the cleaned data
            output_filename = f"cleaned_{base_name}{cleaned_file_suffix}.csv"
            output_path = os.path.join(output_folder, output_filename)
            cleaned_data.to_csv(output_path, index=False)
            print(f"Saved cleaned data to {output_path}")

            # Step 9: Split data into train/test/validation sets (optional)
            split_data = input("Do you want to split the data into training and testing sets? (yes/no): ").strip().lower() == "yes"
            if split_data:
                try:
                    train_percentage = float(input("Enter the percentage for training data (e.g., 80 for 80%): ")) / 100
                    validation_percentage = float(input("Enter the percentage for validation data (e.g., 10 for 10%): ")) / 100
                    test_percentage = float(input("Enter the percentage for test data (e.g., 10 for 10%): ")) / 100

                    # Ensure percentages add up to 1 (100%)
                    if not abs(train_percentage + validation_percentage + test_percentage - 1) < 1e-5:
                        raise ValueError("Percentages must add up to 100%!")
                    # A zero-sized train or test share (or a negative share) would
                    # make train_test_split fail further down.
                    if train_percentage <= 0 or test_percentage <= 0 or validation_percentage < 0:
                        raise ValueError("Training and test percentages must be greater than 0 and validation cannot be negative!")
                except ValueError as e:
                    print(f"Invalid input: {e}")
                    print("Using default split: 70% train, 15% validation, 15% test.")
                    train_percentage, validation_percentage, test_percentage = 0.7, 0.15, 0.15

                if validation_percentage == 0:
                    # Split into training and testing only
                    train_data, test_data = train_test_split(cleaned_data, test_size=test_percentage, random_state=42)
                    validation_data = None
                else:
                    # Split into training, validation, and testing:
                    # first carve off validation+test, then divide that remainder.
                    train_data, temp_data = train_test_split(cleaned_data, test_size=(validation_percentage + test_percentage), random_state=42)
                    validation_data, test_data = train_test_split(temp_data, test_size=(test_percentage / (validation_percentage + test_percentage)), random_state=42)

                # Save the split datasets
                train_output_path = os.path.join(output_folder, f"train_{base_name}{cleaned_file_suffix}.csv")
                test_output_path = os.path.join(output_folder, f"test_{base_name}{cleaned_file_suffix}.csv")
                train_data.to_csv(train_output_path, index=False)
                test_data.to_csv(test_output_path, index=False)
                print(f"Saved training data to {train_output_path}")
                print(f"Saved testing data to {test_output_path}")

                if validation_data is not None:
                    validation_output_path = os.path.join(output_folder, f"validation_{base_name}{cleaned_file_suffix}.csv")
                    validation_data.to_csv(validation_output_path, index=False)
                    print(f"Saved validation data to {validation_output_path}")

            # Step 10: Generate and print encoding summary
            summary = generate_encoding_summary(data, cleaned_data)
            print_encoding_summary(summary)


In [None]:
# Command-line entry point: parse the optional arguments and hand them to main().
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Clean specific data file in the training_data folder.")
    parser.add_argument('--config_file', type=str, default="config.txt", help="Path to the configuration file.")
    parser.add_argument('--file', type=str, default=None, help="Specific file to clean.")
    # parse_known_args tolerates arguments this parser does not declare. Inside a
    # Jupyter kernel sys.argv carries the kernel's own "-f <connection file>"
    # pair, which makes plain parse_args() abort with SystemExit.
    args, _unrecognized = parser.parse_known_args()
    # NOTE(review): `main` is not defined in the cells visible here - confirm it
    # is defined elsewhere and accepts (config_file, file).
    main(args.config_file, args.file)