In [1]:

import pandas as pd
import numpy as np
import math
import os
import csv
import json
import re
from sklearn import preprocessing
from Pivot_Based import Relation
from prince import MCA



### **Cell 2: Encoding and Data Transformation Functions**

This cell centralizes all the helper functions used for data transformation, such as encoding categorical variables and performing correspondence analysis.



In [None]:
def calculate_correspondence_analysis(data, n_components=3):
    """
    Run Multiple Correspondence Analysis (MCA) over categorical data.

    Args:
        data (pd.DataFrame): A DataFrame containing categorical data.
        n_components (int): The number of dimensions requested from MCA.

    Returns:
        pd.DataFrame: Whatever ``MCA.fit_transform`` produces for ``data``
        (the data projected onto the reduced dimensions).
    """
    # All settings except n_components are fixed; the constant seed keeps
    # the randomised solver configuration identical from run to run.
    mca_settings = {
        'n_components': n_components,
        'n_iter': 3,
        'copy': True,
        'check_input': True,
        'engine': 'sklearn',
        'random_state': 42,
    }
    reducer = MCA(**mca_settings)
    return reducer.fit_transform(data)
    

def apply_idf_encoding(df, attribute):
    """
    Calculates and applies the Inverse Document Frequency (IDF) to a specific attribute.

    Every value ``v`` in ``df[attribute]`` is scored as ``ln(N / count(v))``,
    where ``N`` is the number of rows in ``df``. The scores are written to a
    new column named ``<attribute>_IDF``.

    The DataFrame is modified in place and the same object is returned.
    Missing values are not counted by ``value_counts`` and therefore map to NaN.
    An empty DataFrame still receives an (empty, float) ``_IDF`` column, so
    the output schema does not depend on the number of rows.

    Args:
        df (pd.DataFrame): The input DataFrame (mutated in place).
        attribute (str): The name of the column to apply IDF encoding to.

    Returns:
        pd.DataFrame: ``df`` itself, with the additional IDF column.
    """
    n_rows = len(df)

    # Frequency of each distinct value; empty when the frame has no rows,
    # in which case the comprehension below never divides.
    value_counts = df[attribute].value_counts()

    # ln(N / frequency) for each unique value
    idf_values = {
        value: math.log(n_rows / freq) for value, freq in value_counts.items()
    }

    # astype(float) pins the dtype even when nothing was mapped (empty input).
    df[attribute + '_IDF'] = df[attribute].map(idf_values).astype(float)

    return df

def apply_one_hot_encoding(df, attribute):
    """
    One-hot encode a single column of a DataFrame.

    Args:
        df (pd.DataFrame): The input DataFrame (left untouched).
        attribute (str): The name of the column to expand into indicator columns.

    Returns:
        pd.DataFrame: A new DataFrame in which ``attribute`` is replaced by
        integer (0/1) indicator columns, one per distinct value.
    """
    columns_to_expand = [attribute]
    return pd.get_dummies(df, columns=columns_to_expand, dtype=int)

def apply_nocat(df: pd.DataFrame) -> pd.DataFrame:
    """
    Preprocesses a DataFrame by:
    1. Removing categorical (non-numeric) columns that are not binary. This
       includes columns with more than two distinct values as well as
       constant or all-missing columns, which carry no usable signal and
       would otherwise remain as non-numeric data in the output.
    2. Encoding binary categorical columns to 0 and 1.
    3. Normalizing numerical columns (with more than 2 unique values) to a [0, 1] scale.
    4. Columns named 'outlier' or 'class' (case-insensitive) are ignored and left unchanged.

    The input DataFrame is not modified; a processed copy is returned.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: The processed DataFrame.
    """
    # Work on a copy to avoid modifying the caller's DataFrame
    processed_df = df.copy()

    protected_names = ('outlier', 'class')
    cols_to_drop = []
    numeric_cols_to_normalize = []
    binary_cat_cols_to_encode = []

    for col in processed_df.columns:
        # str() keeps non-string column labels (e.g. integers) from crashing .lower()
        if str(col).lower() in protected_names:
            continue

        n_unique = processed_df[col].nunique()

        if pd.api.types.is_numeric_dtype(processed_df[col]):
            # Numeric columns with at most 2 distinct values are kept as they are
            if n_unique > 2:
                numeric_cols_to_normalize.append(col)
        elif n_unique == 2:
            binary_cat_cols_to_encode.append(col)
        else:
            # Non-numeric and not binary: more than 2 values, constant, or all missing
            cols_to_drop.append(col)

    # Drop the identified non-binary categorical columns
    processed_df.drop(columns=cols_to_drop, inplace=True)
    print(f"Dropped columns: {cols_to_drop}")

    # Encode binary categorical columns to 0/1
    if binary_cat_cols_to_encode:
        for col in binary_cat_cols_to_encode:
            # NOTE: category codes follow the sorted category order; pandas
            # assigns the code -1 to missing values.
            processed_df[col] = processed_df[col].astype('category').cat.codes
        print(f"Encoded binary columns: {binary_cat_cols_to_encode}")

    # Normalize the identified numeric columns
    if numeric_cols_to_normalize:
        scaler = preprocessing.MinMaxScaler()
        processed_df[numeric_cols_to_normalize] = scaler.fit_transform(processed_df[numeric_cols_to_normalize])
        print(f"Normalized columns: {numeric_cols_to_normalize}")

    return processed_df



### **Cell 3: Data Loading and Preprocessing Functions**

This cell contains functions for loading data, detecting delimiters, applying data types, normalizing features, and standardizing labels.



In [None]:
def get_delimiter(file_path: str) -> str:
    """
    Guess the field delimiter of a CSV file.

    Only the first 1024 characters of the file are read and handed to
    ``csv.Sniffer``; the delimiter of the detected dialect is returned.
    """
    with open(file_path, 'r') as handle:
        sample = handle.read(1024)
    dialect = csv.Sniffer().sniff(sample)
    return dialect.delimiter

def convert_int_to_str(df: pd.DataFrame) -> pd.DataFrame:
    """
    Cast integer columns to strings so they are later handled as categorical.

    The columns are those matched by ``select_dtypes(include='int')``.
    The DataFrame is changed in place and the same object is returned.
    """
    int_column_names = list(df.select_dtypes(include='int').columns)
    for name in int_column_names:
        df[name] = df[name].astype(str)
    return df

def split_numerical_categorical(df: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame):
    """
    Partition a DataFrame by column dtype.

    Returns a pair ``(numerical, categorical)``: the first holds the columns
    with a NumPy numeric dtype, the second the columns with ``object`` or
    ``category`` dtype. Columns of any other dtype appear in neither.
    """
    categorical_kinds = ['object', 'category']
    return (
        df.select_dtypes(include=np.number),
        df.select_dtypes(include=categorical_kinds),
    )

def apply_custom_dtypes(df: pd.DataFrame, dataset_name: str, config: dict) -> pd.DataFrame:
    """
    Cast the columns of ``df`` to the dtypes configured for its dataset.

    The configuration key is derived from ``dataset_name`` by keeping the
    text before the first '.' and removing any ``_v<digits>`` version tag.
    If the configured entry contains '...', its first element is applied to
    every column; otherwise the entries are paired with the columns in order.
    When no entry exists, a notice is printed and ``df`` is returned as is.
    """
    # Strip the extension and versioning (e.g., _v01) to match the config key
    base_name = dataset_name.split('.')[0]
    dataset_key = re.sub(r'_v[0-9]+', '', base_name)

    if dataset_key not in config:
        print(f'Notice: Dtype configuration not found for {dataset_key}. Using inferred types.')
        return df

    dtypes = config[dataset_key]
    column_names = df.columns

    if '...' in dtypes:
        # One dtype shared by all columns
        shared_dtype = dtypes[0]
        dtype_by_column = {name: shared_dtype for name in column_names}
    else:
        # Positional pairing of columns and configured dtypes
        dtype_by_column = dict(zip(column_names, dtypes))

    return df.astype(dtype_by_column)

def normalize_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Min-max scale every column except the one named 'outlier'.

    The 'outlier' column is set aside, the remaining columns are passed
    through ``MinMaxScaler``, and the untouched 'outlier' column is appended
    as the last column of the returned DataFrame. The original index is kept.
    """
    label_column = df['outlier']
    feature_frame = df.drop(columns=['outlier'])

    scaled_values = preprocessing.MinMaxScaler().fit_transform(feature_frame)

    result = pd.DataFrame(
        scaled_values,
        columns=feature_frame.columns,
        index=df.index,
    )
    result[label_column.name] = label_column
    return result
    
def standardize_outlier_label(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standardizes the last column to 'outlier' and its labels to 'yes' (minority) and 'no' (majority).

    The last column is renamed to 'outlier' on a new DataFrame (the caller's
    frame is not modified). If the column holds exactly two distinct
    non-missing values, the more frequent one becomes 'no' and the less
    frequent one becomes 'yes'. When both values are equally frequent, the
    one listed first by ``value_counts`` is treated as the majority, so both
    classes are still relabelled. Columns with any other number of distinct
    values are left unchanged.

    Args:
        df (pd.DataFrame): Input DataFrame whose last column is the label.

    Returns:
        pd.DataFrame: A DataFrame with the label column renamed and relabelled.
    """
    outlier_col_name = df.columns[-1]
    df = df.rename(columns={outlier_col_name: 'outlier'})

    value_counts = df['outlier'].value_counts()

    if len(value_counts) == 2:
        # value_counts is sorted by descending frequency, so positions 0 and 1
        # are majority and minority. Taking them by position (instead of
        # idxmax/idxmin) keeps the two labels distinct even on a tie.
        majority_val, minority_val = value_counts.index[0], value_counts.index[1]

        df['outlier'] = df['outlier'].replace({
            majority_val: 'no',
            minority_val: 'yes'
        })
    return df



### **Cell 4: Main Processing Pipeline**

This is the main execution block. It iterates through the specified datasets, applies the preprocessing and encoding steps, and saves the transformed files into structured directories.



In [None]:

# --- Configuration ---
# Root folder that holds one sub-folder per dataset context. Each context is
# expected to contain a 'processed' folder with the input CSV files.
# Paths are built with os.path.join so they also resolve on non-Windows hosts.
DATASET_PATHS = os.path.join('..', '..', 'datasets', 'experiments')
CONFIG_FILE = os.path.join('..', 'config_dataset.json')

# One output sub-folder (below '<processed>/number') per encoding strategy.
ENCODING_SUBDIRS = ('one_hot', 'idf', 'ca', 'pivot', 'nocat')

# --- Load Configuration ---
# A missing configuration file is not fatal: every dataset then falls back to
# the dtypes inferred by pandas.
try:
    with open(CONFIG_FILE, 'r') as f:
        dataset_dtypes_config = json.load(f)
except FileNotFoundError:
    print(f"Error: Configuration file not found at {CONFIG_FILE}")
    dataset_dtypes_config = {}

# --- Main Loop ---
for context_path in os.listdir(DATASET_PATHS):
    dataset_dir = os.path.join(DATASET_PATHS, context_path, 'processed')
    if not os.path.isdir(dataset_dir):
        continue

    print(f"\n--- Processing directory: {context_path} ---")

    # Create output directories
    output_base = os.path.join(dataset_dir, 'number')
    for subdir in ENCODING_SUBDIRS:
        os.makedirs(os.path.join(output_base, subdir), exist_ok=True)

    # Process each file in the directory
    for filename in os.listdir(dataset_dir):
        file_path = os.path.join(dataset_dir, filename)

        # Sub-folders (including the 'number' output folder) are not inputs
        if os.path.isdir(file_path):
            continue

        print(f"Processing file: {filename}")

        # Load data
        df = pd.read_csv(file_path, engine='python')

        # --- Preprocessing Steps ---
        df = apply_custom_dtypes(df, filename, dataset_dtypes_config)
        df = convert_int_to_str(df)
        df = standardize_outlier_label(df)

        # --- Encoding and Saving ---

        # 1. Pivot-Based
        # NOTE(review): presumably the second argument is the index of the
        # label column and the third a header flag — confirm against
        # Pivot_Based.Relation.
        relation = Relation.read_csv(file_path, len(df.columns)-1, True)
        relation.normalize()
        relation.save(os.path.join(output_base, 'pivot'))

        # 2. One-Hot Encoding
        df_one_hot = df.copy()
        # The column list is captured once, so indicator columns added
        # during the loop are not revisited.
        for col in df_one_hot.columns:
            if df_one_hot[col].dtype == 'object' and col != 'outlier':
                df_one_hot = apply_one_hot_encoding(df_one_hot, col)
        df_one_hot = normalize_features(df_one_hot)
        df_one_hot.to_csv(os.path.join(output_base, 'one_hot', filename), index=False)

        # 3. IDF Encoding
        df_idf = df.copy()
        for col in df_idf.columns:
            if df_idf[col].dtype == 'object' and col != 'outlier':
                df_idf = apply_idf_encoding(df_idf, col)
                # Keep only the numeric '<col>_IDF' replacement
                df_idf.drop(columns=[col], inplace=True)
        df_idf = normalize_features(df_idf)
        df_idf.to_csv(os.path.join(output_base, 'idf', filename), index=False)

        # 4. Correspondence Analysis (CA)
        features_df = df.drop(columns=['outlier'])
        num_features, cat_features = split_numerical_categorical(features_df)
        if not cat_features.empty:
            # One MCA dimension is requested per categorical column
            ca_features = calculate_correspondence_analysis(cat_features, n_components=len(cat_features.columns))
            ca_features.columns = [f'Cat_{i+1}' for i in range(ca_features.shape[1])]
            df_ca = pd.concat([num_features.reset_index(drop=True), ca_features.reset_index(drop=True)], axis=1)
        else:
            df_ca = num_features

        df_ca['outlier'] = df['outlier']
        df_ca = normalize_features(df_ca)
        df_ca.to_csv(os.path.join(output_base, 'ca', filename), index=False)

        # 5. Non Categorical Feature (NOCAT)
        df_nocat = df.copy()
        df_nocat = apply_nocat(df_nocat)
        df_nocat.to_csv(os.path.join(output_base, 'nocat', filename), index=False)

print("\n--- All processing complete. ---")