In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
df= pd.read_csv('dataset/large_data.csv')
X = df.drop('TYPE', axis=1) 
y = df['TYPE']

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

X_train = X_train.reset_index(drop=True)
y_train = y_train.reset_index(drop=True)
X_val = X_val.reset_index(drop=True)
y_val = y_val.reset_index(drop=True)

train_df = pd.concat([X_train, y_train], axis=1)
validation_df = pd.concat([X_val, y_val], axis=1)

In [2]:
train_df.TYPE.value_counts()

TYPE
FLU        20000
ALLERGY    13105
COVID       1638
COLD         819
Name: count, dtype: int64

In [3]:
train_df.TYPE.value_counts(normalize=True)

TYPE
FLU        0.562398
ALLERGY    0.368511
COVID      0.046060
COLD       0.023030
Name: proportion, dtype: float64

In [4]:
len(train_df)

35562

In [5]:
nan_counts_per_column_train = train_df.isnull().sum()
columns_with_nan = nan_counts_per_column_train[nan_counts_per_column_train > 0].index.tolist()

print("Columns with NaN values:", columns_with_nan)
print("-" * 30)


Columns with NaN values: []
------------------------------


In [6]:
nan_counts_per_column_validation = validation_df.isnull().sum()
columns_with_nan_validation = nan_counts_per_column_validation[nan_counts_per_column_validation > 0].index.tolist()

print("Columns with NaN values:", columns_with_nan_validation)
print("-" * 30)


Columns with NaN values: []
------------------------------


In [7]:
validation_df = validation_df.dropna(subset=columns_with_nan_validation)

In [8]:
RANDOM_STATE = 981408

# Parametros Active Learning 
INITIAL_SAMPLE_SIZE = 500 # Total samples for the initial random set
STEP_SIZE = 100 # Target total samples to add per step for most strategies
TOTAL_AL_SIMULATION_SAMPLES = 36000 # Target total samples for the simulation

SVD_N_COMPONENTS = 30 # Number of components for TruncatedSVD for diversity sampling
KMEANS_SUBSAMPLE_SIZE = 100 # Tamanho do MiniBatchKMeans

In [9]:
import numpy as np
from collections import deque
from sklearn.preprocessing import FunctionTransformer
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import TruncatedSVD
from tqdm import trange
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import matthews_corrcoef, accuracy_score, f1_score, classification_report
import plotly.express as px 
from copy import deepcopy
from sklearn.metrics.pairwise import euclidean_distances
import math 

In [10]:
# --- Define Initial Labeled Sample (Shared by all strategies) ---

print(f"Generating initial random labeled sample of size {INITIAL_SAMPLE_SIZE}...")

# Get all indices from the training data pool (train_df)
all_train_indices = np.arange(X_train.shape[0])

# Ensure initial sample size doesn't exceed total training data or simulation target
initial_sample_size_actual = min(INITIAL_SAMPLE_SIZE, X_train.shape[0], TOTAL_AL_SIMULATION_SAMPLES)

rng = np.random.RandomState(RANDOM_STATE) # Ensure reproducibility for sampling

# Take an initial random sample from the full training set for ALL strategies
if initial_sample_size_actual > 0:
    initial_sample_indices = rng.choice(all_train_indices, size=initial_sample_size_actual, replace=False).tolist()
else:
    initial_sample_indices = []

INITIAL_SAMPLE_SIZE_ACTUAL = len(initial_sample_indices)

print(f"\nInitial random labeled sample generated with {INITIAL_SAMPLE_SIZE_ACTUAL} samples (indices relative to train_df).")
if INITIAL_SAMPLE_SIZE_ACTUAL > 0:
    print("Initial sample class distribution (absolute):")
    initial_sample_series = pd.Series(y_train.iloc[initial_sample_indices])
    print(initial_sample_series.value_counts().sort_index())
    print("Initial sample class distribution (percentage):")
    print(initial_sample_series.value_counts(normalize=True).sort_index()*100)
else:
     print("Initial sample size is 0.")

# --- Calculate Total Target Samples for the Simulation --- 
# TOTAL_AL_SIMULATION_SAMPLES is already set to 10000
print(f"\nActive Learning Simulation will run until {TOTAL_AL_SIMULATION_SAMPLES} samples are labeled by each strategy, or the train pool ({X_train.shape[0]} samples) is exhausted.")

largest_step_size = STEP_SIZE
# Start counting steps from 1 after the initial (step 0)
if TOTAL_AL_SIMULATION_SAMPLES > INITIAL_SAMPLE_SIZE_ACTUAL and largest_step_size > 0:
    max_steps = (TOTAL_AL_SIMULATION_SAMPLES - INITIAL_SAMPLE_SIZE_ACTUAL + largest_step_size - 1) // largest_step_size
else:
    max_steps = 0

print(f"Starting Active Learning Simulation for up to {max_steps} steps.")


Generating initial random labeled sample of size 500...

Initial random labeled sample generated with 500 samples (indices relative to train_df).
Initial sample class distribution (absolute):
TYPE
ALLERGY    183
COLD        11
COVID       22
FLU        284
Name: count, dtype: int64
Initial sample class distribution (percentage):
TYPE
ALLERGY    36.6
COLD        2.2
COVID       4.4
FLU        56.8
Name: proportion, dtype: float64

Active Learning Simulation will run until 36000 samples are labeled by each strategy, or the train pool (35562 samples) is exhausted.
Starting Active Learning Simulation for up to 355 steps.


In [11]:
def least_confidence_sampling(X_unlabeled, model, n_instances: int, random_state=None) -> list[int]:
    """
    Seleciona as instâncias menos confiantes a partir das características não rotuladas.
    Assume que X_unlabeled é uma fatia da matriz completa de características, correspondente aos dados não rotulados.
    Retorna índices relativos ao array de entrada X_unlabeled.
    """
    if X_unlabeled.shape[0] == 0 or n_instances <= 0:
        return []

    probs = model.predict_proba(X_unlabeled) ## Analisa a confiança do modelo nas instâncias que ainda não estão sendo utilizadas para o treino do modelo

    # Calcula a incerteza como 1 - max probabilidade prevista
    uncertainty_scores = 1 - np.max(probs, axis=1)

    # Pegar as instâncias mais incertas, como 1 - max do predict proba quanto mais incerto o modelo maior esse valor
    sorted_indices_in_unlabeled = np.argsort(uncertainty_scores)[::-1] # Ordenar de forma descendente

    return sorted_indices_in_unlabeled[:n_instances].tolist() ## Retorna a quantidade de instâncias mais incertas


def margin_of_confidence_sampling(X_unlabeled, model, n_instances: int, random_state=None) -> list[int]:
    """
    Seleciona instâncias com a menor margem entre as duas maiores probabilidades previstas.
    Assume que X_unlabeled é uma fatia da matriz completa de características, correspondente aos dados não rotulados.
    Retorna índices relativos ao array de entrada X_unlabeled.
    """
    if X_unlabeled.shape[0] == 0 or n_instances <= 0:
        return []

    probs = model.predict_proba(X_unlabeled)## Analisa a confiança do modelo nas instâncias que ainda não estão sendo utilizadas para o treino do modelo

    # Sort probabilities for each instance
    sorted_probs = np.sort(probs, axis=1)[:, ::-1] # Sort descending along axis 1

    # Calculate margin: difference between the top two probabilities
    # Handle cases with fewer than 2 classes
    if sorted_probs.shape[1] < 2:
         margins = np.full(X_unlabeled.shape[0], np.inf)
    else:
        margins = sorted_probs[:, 0] - sorted_probs[:, 1]

    # Get indices of the top n_instances with the *smallest* margin
    sorted_indices_in_unlabeled = np.argsort(margins) # Sort ascending for smallest margin

    # Return indices relative to the *input* X_unlabeled array
    return sorted_indices_in_unlabeled[:n_instances].tolist()

def ratio_of_confidence_sampling(X_unlabeled, model, n_instances: int, random_state=None) -> list[int]:
    """
    Selects instances with the highest ratio between the second highest and highest probabilities.
    Assumes X_unlabeled is a slice of the full feature matrix corresponding to unlabeled data.
    Returns indices relative to the *input* X_unlabeled array.
    """
    if X_unlabeled.shape[0] == 0 or n_instances <= 0:
        return []

    probs = model.predict_proba(X_unlabeled)

    # Sort probabilities for each instance
    sorted_probs = np.sort(probs, axis=1)[:, ::-1] # Sort descending along axis 1

    # Calculate ratio: second highest / highest
    # Handle cases where the highest probability is 0 or where there's only one class
    ratios = np.zeros(X_unlabeled.shape[0]) # Default ratio is 0 if calculation fails/invalid

    # Find instances where the highest probability is > 0 and there's at least a second class
    valid_ratio_mask = (sorted_probs[:, 0] > 0) & (sorted_probs.shape[1] > 1)

    if np.any(valid_ratio_mask):
        # Avoid division by zero if sorted_probs[:, 0] is exactly 0 (already handled by mask)
        ratios[valid_ratio_mask] = sorted_probs[valid_ratio_mask, 1] / sorted_probs[valid_ratio_mask, 0]
    # For instances where valid_ratio_mask is False, the ratio remains 0,
    # which is a low uncertainty score for this metric (perfect certainty if ratio is 0).

    # Get indices of the top n_instances with the *highest* ratio
    sorted_indices_in_unlabeled = np.argsort(ratios)[::-1] # Sort descending

    # Return indices relative to the *input* X_unlabeled array
    return sorted_indices_in_unlabeled[:n_instances].tolist()


def entropy_sampling(X_unlabeled, model, n_instances: int, random_state=None) -> list[int]:
    """
    Selects instances with the highest entropy of predicted probabilities.
    Assumes X_unlabeled is a slice of the full feature matrix corresponding to unlabeled data.
    Returns indices relative to the *input* X_unlabeled array.
    """
    if X_unlabeled.shape[0] == 0 or n_instances <= 0:
        return []

    probs = model.predict_proba(X_unlabeled)

    # Ensure log is calculated only for positive probabilities to avoid log(0)
    # Add a small epsilon for numerical stability if probabilities are very close to 0
    probs = probs + 1e-12

    # Calculate entropy for each instance: -sum(p * log(p))
    entropy_scores = -np.sum(probs * np.log(probs), axis=1)

    # Get indices of the top n_instances with the *highest* entropy
    sorted_indices_in_unlabeled = np.argsort(entropy_scores)[::-1] # Sort descending

    # Return indices relative to the *input* X_unlabeled array
    return sorted_indices_in_unlabeled[:n_instances].tolist()

def optimized_cluster_sampling(X_reduced_unlabeled, n_instances, n_clusters, subsample_size=10000, random_state=None):
    if random_state is None:
        rng = np.random
    else:
        rng = np.random.RandomState(random_state)
    
    # Convert input to numpy array for consistent indexing
    if isinstance(X_reduced_unlabeled, pd.DataFrame):
        X_values = X_reduced_unlabeled.values
    else:
        X_values = np.asarray(X_reduced_unlabeled, dtype=np.float64)
        
    num_unlabeled = X_values.shape[0]

    if num_unlabeled <= n_instances:
        return list(range(num_unlabeled))

    if n_clusters > num_unlabeled:
        n_clusters = num_unlabeled

    if subsample_size is None or subsample_size >= num_unlabeled:
        X_subsample = X_values
        subsample_indices = np.arange(num_unlabeled)
    else:
        subsample_size = min(subsample_size, num_unlabeled)
        subsample_indices = rng.choice(num_unlabeled, size=subsample_size, replace=False)
        X_subsample = X_values[subsample_indices]

    kmeans = MiniBatchKMeans(n_clusters=n_clusters, random_state=random_state)
    kmeans.fit(X_subsample)
    
    cluster_centers = kmeans.cluster_centers_

    # Compute distances to each cluster center from the full unlabeled pool
    from scipy.spatial.distance import cdist
    distances = cdist(X_values, cluster_centers)

    # For each cluster, find the closest point in the full unlabeled set
    closest_indices = np.argmin(distances, axis=0)
    
    # Ensure uniqueness and limit to desired number of instances
    selected_indices = list(dict.fromkeys(closest_indices))[:n_instances]

    # If we still need more points, sample the remainder randomly
    if len(selected_indices) < n_instances:
        remaining_pool = np.setdiff1d(np.arange(num_unlabeled), selected_indices)
        additional = rng.choice(remaining_pool, size=n_instances - len(selected_indices), replace=False)
        selected_indices.extend(additional.tolist())

    return selected_indices

def core_set_selection(X_reduced_unlabeled, X_reduced_labeled, n_instances: int, random_state=None) -> list[int]:
    """
    Selects instances from unlabeled data that are farthest from the current labeled set (using reduced features).
    Assumes X_reduced_unlabeled and X_reduced_labeled are slices of the *reduced* feature matrix.
    Returns indices relative to the *input* X_reduced_unlabeled array.
    
    NOTE: This implementation can be memory-intensive for very large X_reduced_unlabeled.
    """
    rng = np.random.RandomState(random_state if random_state is not None else RANDOM_STATE)
    num_unlabeled = X_reduced_unlabeled.shape[0]

    if num_unlabeled == 0 or n_instances <= 0:
        return []

    # If there are no labeled samples yet, just pick randomly from unlabeled
    if X_reduced_labeled.shape[0] == 0:
         # print("Warning: No labeled samples for Core-Set. Falling back to random sampling.")
         return rng.choice(num_unlabeled, size=min(n_instances, num_unlabeled), replace=False).tolist()

    # Compute distances between unlabeled instances and labeled instances (using reduced features)
    # THIS IS THE MEMORY-INTENSIVE STEP
    try:
        distances = euclidean_distances(X_reduced_unlabeled, X_reduced_labeled)
    except MemoryError:
        print("MemoryError during Core-Set distance calculation. Returning empty list.")
        # Fallback: if distance calculation fails, return nothing or fall back to random on a subsample
        # For now, returning empty list to skip adding samples this step for Core-Set
        return []

    # Find the minimum distance to any labeled instance for each unlabeled instance
    min_distances = distances.min(axis=1)

    # Get indices of the top n_instances with the *highest* minimum distance
    sorted_indices_in_unlabeled = np.argsort(min_distances)[::-1] # Sort descending

    # Return indices relative to the *input* X_reduced_unlabeled array
    return sorted_indices_in_unlabeled[:n_instances].tolist()


def train_and_evaluate_model(
    X_train_full, y_train_full, X_val, y_val,
    current_labeled_indices: list[int],
) -> tuple[float, float, float]:
    """
    Trains a Logistic Regression model on the current labeled data subset 
    from the full training pool and evaluates it on the validation data.
    Handles sparse X_train.

    Args:
        X_train_full (sparse matrix): The full training features pool.
        y_train_full (pd.Series): The full training labels pool.
        X_val (sparse matrix): Validation features.
        y_val (pd.Series): Validation labels.
        current_labeled_indices (list[int]): The indices of instances currently selected for training 
                                            (relative to the full training pool).

    Returns:
        tuple[float, float, float]: The accuracy, weighted F1 score, and Matthews Correlation Coefficient on the validation data.
    """
    if not current_labeled_indices:
         # print("Warning: No labeled samples to train on.")
         return 0.0, 0.0, 0.0 # Return zero metrics if no data

    # Select the subset of data using the current labeled indices
    X_train_subset = X_train_full.loc[current_labeled_indices]
    y_train_subset = y_train_full.loc[current_labeled_indices]

    # Use balanced class weight for skewed datasets
    clf = LogisticRegression(random_state=RANDOM_STATE, n_jobs=-1, max_iter=1000, class_weight='balanced')

    # Check if the subset has more than one unique class before fitting
    if y_train_subset.nunique() < 2:
        # print(f"Warning: Only {y_train_subset.nunique()} unique class(es) in the training subset. Cannot train classifier. Returning zero metrics.")
        return 0.0, 0.0, 0.0

    clf.fit(X_train_subset, y_train_subset)

    y_pred = clf.predict(X_val)

    # Calculate metrics
    accuracy = accuracy_score(y_val, y_pred)
    # Handle potential errors if MCC cannot be calculated (e.g., only one class in prediction/true)
    try:
        mcc = matthews_corrcoef(y_val, y_pred)
        # Check if mcc is NaN, which can happen in edge cases even if no exception
        if math.isnan(mcc):
             mcc = 0.0
    except Exception as e:
        # print(f"Warning: Could not compute MCC: {e}. Returning 0.")
        mcc = 0.0
        
    # Handle potential errors if F1 cannot be calculated
    try:
        f1_w = f1_score(y_val, y_pred, average='weighted')
        if math.isnan(f1_w):
             f1_w = 0.0
    except Exception as e:
        # print(f"Warning: Could not compute F1: {e}. Returning 0.")
        f1_w = 0.0

    return accuracy, f1_w, mcc

In [12]:
# --- Active Learning Simulation Setup and Loop ---

# Initialize separate lists for selected indices for each strategy
# All strategies start with the same initial_sample_indices (100 random samples)
strategy_indices = {
    'Random': deepcopy(initial_sample_indices),
    'Least Confidence': deepcopy(initial_sample_indices),
    'Margin of Confidence': deepcopy(initial_sample_indices),
    'Ratio of Confidence': deepcopy(initial_sample_indices),
    'Entropy': deepcopy(initial_sample_indices),
    'Cluster-Based': deepcopy(initial_sample_indices),
}

# Initialize results list
results_list = []

# Evaluate initial models (provides the first data point at step 0)
print(f"Evaluating initial models with {INITIAL_SAMPLE_SIZE_ACTUAL} samples...")
if INITIAL_SAMPLE_SIZE_ACTUAL > 0:
    initial_accuracy, initial_f1, initial_mcc = train_and_evaluate_model(
        X_train, y_train, X_val, y_val, initial_sample_indices
    )

    # Add initial results for all strategies (they all start the same)
    for strategy_name in strategy_indices.keys():
         results_list.append({
             'Strategy': strategy_name,
             'Step': 0, # Use step 0 for the initial state
             'Number of Labeled Samples': len(strategy_indices[strategy_name]),
             'MCC': initial_mcc,
             'Accuracy': initial_accuracy,
             'F1 Score (weighted)': initial_f1
         })
    print(f"Initial MCC: {initial_mcc:.4f}")
else:
     print("Warning: Initial sample size is 0. Starting AL from 0 samples.")

# Get all indices from the training data pool (train_df)
all_train_indices = np.arange(X_train.shape[0])

# Start counting steps from 1 after the initial (step 0)
if TOTAL_AL_SIMULATION_SAMPLES > INITIAL_SAMPLE_SIZE_ACTUAL and largest_step_size > 0:
    # Calculate steps needed *after* the initial sample
    samples_needed_after_init = TOTAL_AL_SIMULATION_SAMPLES - INITIAL_SAMPLE_SIZE_ACTUAL
    max_steps = (samples_needed_after_init + largest_step_size - 1) // largest_step_size
else:
    max_steps = 0

print(f"Starting Active Learning Simulation for up to {max_steps} steps.")
print(f"Target total samples per strategy: up to {TOTAL_AL_SIMULATION_SAMPLES}")

# --- Active Learning Simulation Loop ---
for step in trange(1, max_steps + 1, desc="AL Steps"):
    # print(f"\n--- Step {step}/{max_steps} ---")

    # Keep track if any strategy added samples in this step
    samples_added_in_step = False

    # Iterate over all strategies
    for strategy_name, current_indices in strategy_indices.items():

        current_n_samples = len(current_indices)

        # Check if this strategy has already reached its target or exhausted the pool
        if current_n_samples >= TOTAL_AL_SIMULATION_SAMPLES or current_n_samples >= X_train.shape[0]:
            # print(f"{strategy_name}: Target sample count ({TOTAL_AL_SIMULATION_SAMPLES}) or pool size ({X_train.shape[0]}) already reached. Skipping step.")
            # Append the last result if no new samples were added in this step yet
            last_result = next((r for r in reversed(results_list) if r['Strategy'] == strategy_name), None)
            if last_result and last_result.get('Step', 0) < step: # Use .get with default 0 for safety
                 results_list.append({
                     'Strategy': strategy_name,
                     'Step': step,
                     'Number of Labeled Samples': last_result['Number of Labeled Samples'],
                     'MCC': last_result['MCC'],
                     'Accuracy': last_result['Accuracy'],
                     'F1 Score (weighted)': last_result['F1 Score (weighted)']
                 })
            continue # Move to the next strategy

        # Find indices NOT yet selected by THIS strategy (the unlabeled pool for this strategy)
        unlabeled_mask = ~np.isin(all_train_indices, current_indices)
        unlabeled_indices = all_train_indices[unlabeled_mask]

        # --- Select New Indices based on Strategy ---
        newly_selected_indices_original = [] # Indices relative to the original train_df pool

        # Determine how many samples to add in this step for this strategy (usually STEP_SIZE)
        # Ensure we don't add more than needed to reach the target or exhaust the pool
        num_to_add = min(STEP_SIZE, TOTAL_AL_SIMULATION_SAMPLES - current_n_samples, X_train.shape[0] - current_n_samples)
        
        if len(unlabeled_indices) == 0 or num_to_add <= 0:
            # No unlabeled data or 0 samples requested, skip selection for this strategy this step
            # The plateauing logic outside this block will handle appending the last result if necessary.
            # print(f"{strategy_name}: No unlabeled data or samples requested (num_to_add={num_to_add}, unlabeled={len(unlabeled_indices)}). Skipping selection.")
            newly_selected_indices_relative = [] # Empty list means no new samples added
        else:
                # Get subsets for sampling functions (indices relative to the unlabeled_indices list)
                # The sampling functions will return indices relative to X_unlabeled_subset / X_reduced_unlabeled_subset
                X_unlabeled_subset = X_train.loc[unlabeled_indices] 
                X_reduced_unlabeled_subset = X_train.loc[unlabeled_indices]
                
                if strategy_name == 'Random':
                    newly_selected_indices_relative = rng.choice(X_unlabeled_subset.shape[0], size=min(num_to_add, X_unlabeled_subset.shape[0]), replace=False).tolist()

                elif strategy_name in ['Least Confidence', 'Margin of Confidence', 'Ratio of Confidence', 'Entropy']:
                    # Train model on *current* labeled data to query uncertainty on unlabeled data
                    if len(current_indices) == 0:
                        # print(f"Warning: {strategy_name} needs a trained model but no labeled data available. Falling back to random sampling for this step.")
                        newly_selected_indices_relative = rng.choice(X_unlabeled_subset.shape[0], size=min(num_to_add, X_unlabeled_subset.shape[0]), replace=False).tolist()
                    else:
                        model_for_query = LogisticRegression(random_state=RANDOM_STATE, n_jobs=-1, max_iter=1000, class_weight='balanced')
                        try:
                            model_for_query.fit(X_train.iloc[current_indices], y_train.iloc[current_indices])
                        except ValueError as e:
                            # print(f"Warning: Could not train model for {strategy_name} at step {step} due to: {e}. Falling back to random sampling for this batch.")
                            newly_selected_indices_relative = rng.choice(X_unlabeled_subset.shape[0], size=min(num_to_add, X_unlabeled_subset.shape[0]), replace=False).tolist()
                        else:
                            if strategy_name == 'Least Confidence':
                                newly_selected_indices_relative = least_confidence_sampling(X_unlabeled_subset, model_for_query, num_to_add)
                            elif strategy_name == 'Margin of Confidence':
                                newly_selected_indices_relative = margin_of_confidence_sampling(X_unlabeled_subset, model_for_query, num_to_add)
                            elif strategy_name == 'Ratio of Confidence':
                                newly_selected_indices_relative = ratio_of_confidence_sampling(X_unlabeled_subset, model_for_query, num_to_add)
                            elif strategy_name == 'Entropy':
                                newly_selected_indices_relative = entropy_sampling(X_unlabeled_subset, model_for_query, num_to_add)

                elif strategy_name == 'Cluster-Based':
                    # Number of clusters for sampling should ideally scale with the batch size
                    n_clusters_for_sampling = min(num_to_add, X_reduced_unlabeled_subset.shape[0])
                    if n_clusters_for_sampling > 0:
                        newly_selected_indices_relative = optimized_cluster_sampling(
                            X_reduced_unlabeled_subset, num_to_add, n_clusters_for_sampling,
                            subsample_size=KMEANS_SUBSAMPLE_SIZE, random_state=RANDOM_STATE
                        )
                    elif num_to_add > 0: # If we needed samples but couldn't cluster meaningfully
                        # print(f"Warning: {strategy_name} cannot cluster {X_reduced_unlabeled_subset.shape[0]} samples into {n_clusters_for_sampling} clusters. Falling back to random sampling for this batch.")
                        newly_selected_indices_relative = rng.choice(X_unlabeled_subset.shape[0], size=min(num_to_add, X_unlabeled_subset.shape[0]), replace=False).tolist()
                    # else: num_to_add was 0, no selection needed
            

        # Map relative indices back to original training set indices (from the all_train_indices pool)
        # Only do this mapping if indices were actually selected
        if newly_selected_indices_relative:
                newly_selected_indices_original = unlabeled_indices[newly_selected_indices_relative].tolist()


        # --- Update Labeled Indices and Evaluate ---

        # Check if any indices were actually selected by the strategy logic
        if newly_selected_indices_original:
            # Add new indices to the strategy's selected pool
            strategy_indices[strategy_name].extend(newly_selected_indices_original)

            # Remove duplicates (shouldn't be any if sampled from unlabeled) and sort (optional)
            # Using set() removes duplicates efficiently
            strategy_indices[strategy_name] = sorted(list(set(strategy_indices[strategy_name])))

            # Set flag indicating samples were added by at least one strategy in this step
            samples_added_in_step = True

            # Train and evaluate the model for this strategy with the updated labeled set
            # print(f"{strategy_name}: Training model with {len(strategy_indices[strategy_name])} samples...")
            accuracy, f1, mcc = train_and_evaluate_model(
                X_train, y_train, X_val, y_val, strategy_indices[strategy_name]
            )

            # Record results
            results_list.append({
                'Strategy': strategy_name,
                'Step': step,
                'Number of Labeled Samples': len(strategy_indices[strategy_name]),
                'MCC': mcc,
                'Accuracy': accuracy,
                'F1 Score (weighted)': f1
            })
            # print(f"{strategy_name} MCC: {mcc:.4f}")
        else:# If no samples were added in this step for this specific strategy,
             # append the last result if it's from a previous step to show plateau/stagnation
             last_result = next((r for r in reversed(results_list) if r['Strategy'] == strategy_name), None)
             if last_result and last_result.get('Step', 0) < step: 
                 results_list.append({
                     'Strategy': strategy_name,
                     'Step': step,
                     'Number of Labeled Samples': last_result['Number of Labeled Samples'],
                     'MCC': last_result['MCC'],
                     'Accuracy': last_result['Accuracy'],
                     'F1 Score (weighted)': last_result['F1 Score (weighted)']
                 })

    # Stop loop early if no samples were added by *any* strategy in this step
    # This happens if all strategies have reached their target samples or the pool is exhausted
    if not samples_added_in_step:
         print("No samples added by any strategy in this step. Stopping simulation.")
         break

print("Active Learning simulation loop finished.")

# Add final data points for plotting consistency (ensure lines extend to the end)
if results_list:
    max_step_recorded = max(r.get('Step', 0) for r in results_list)
    # If the loop broke early, the final step should still be max_steps
    final_plot_step = max(max_step_recorded, max_steps)
else:
    final_plot_step = max_steps # If no results, use max_steps


print("Appending final data points for plot consistency...")



Evaluating initial models with 500 samples...
Initial MCC: 0.8354
Starting Active Learning Simulation for up to 355 steps.
Target total samples per strategy: up to 36000


AL Steps:  99%|█████████▉| 351/355 [18:53<00:12,  3.23s/it]

No samples added by any strategy in this step. Stopping simulation.
Active Learning simulation loop finished.
Appending final data points for plot consistency...





In [13]:
# Re-process dataframe after adding final points
df_results = pd.DataFrame(results_list)
for strategy_name in strategy_indices.keys():
    last_result = next((r for r in reversed(results_list) if r['Strategy'] == strategy_name), None);

    # Append a point at the final_plot_step only if the last point for this strategy is before it
    if last_result is not None and last_result.get('Step', -1) < final_plot_step:
         # Find the actual last result for this strategy (important if intermediate plateau points were added)
         # Filter df_results for the current strategy, sort by steps, and get the last one
         strategy_df = df_results[(df_results['Strategy'] == strategy_name)].sort_values(by='Step')
         if not strategy_df.empty:
             last_valid_point = strategy_df.iloc[-1].to_dict()
             # Check if we are not duplicating the very last point if it's already at final_plot_step
             if last_valid_point.get('Step', -1) < final_plot_step:
                 results_list.append({
                      'Strategy': strategy_name,
                      'Step': final_plot_step, # Assign the final simulation step
                      'Number of Labeled Samples': last_valid_point['Number of Labeled Samples'], # Samples remain the same
                      'MCC': last_valid_point['MCC'], # Metrics remain the same
                      'Accuracy': last_valid_point['Accuracy'],
                      'F1 Score (weighted)': last_valid_point['F1 Score (weighted)']
                  })
    # Handle case where a strategy had no samples initially but should appear in plot legend at step 0
    elif last_result is None and INITIAL_SAMPLE_SIZE_ACTUAL == 0 and 0 not in [r.get('Step',-1) for r in results_list if r.get('Strategy') == strategy_name]:
         results_list.append({
              'Strategy': strategy_name,
              'Step': 0,
              'Number of Labeled Samples': 0,
              'MCC': 0.0,
              'Accuracy': 0.0,
              'F1 Score (weighted)': 0.0
         })

# Re-process dataframe after adding final points
df_results = pd.DataFrame(results_list)
# Remove potential duplicate entries at the same step/sample count caused by plateauing logic or final append
df_results = df_results.drop_duplicates(subset=['Strategy', 'Step', 'Number of Labeled Samples']).reset_index(drop=True)
# Sort again to ensure correct plotting
df_results = df_results.sort_values(by=['Strategy', 'Number of Labeled Samples']).reset_index(drop=True)

print("Final data points appended. Results DataFrame updated.")

# Recalculate theoretical curve based on potentially new max samples after adding final points
min_samples_achieved = df_results["Number of Labeled Samples"].min() if not df_results.empty else 0
max_samples_achieved = df_results["Number of Labeled Samples"].max() if not df_results.empty else TOTAL_AL_SIMULATION_SAMPLES

# Ensure log calculation is possible (start from 1 if min_samples_achieved is 0 or 1)
log_start = max(1, min_samples_achieved)

# Create x values for the theoretical curve plot
if max_samples_achieved > log_start:
    x_vals_theoretical = np.linspace(log_start, max_samples_achieved, 100) # Generate 100 points
else:
    # If range is 0 or 1 point, create a simple x range for plotting
    # Ensure there's at least one point if min_samples_achieved exists
    if min_samples_achieved > 0:
        x_vals_theoretical = np.array([min_samples_achieved, min_samples_achieved + 1])
    else:
        x_vals_theoretical = np.array([1.0, 2.0]) # Default if no data at all, use floats for consistency

# Calculate theoretical curve values based on the overall range of MCC observed
max_mcc_observed = df_results["MCC"].max() if not df_results.empty else 0.0
min_mcc_observed = df_results["MCC"].min() if not df_results.empty else 0.0

# Avoid division by zero if max_samples_achieved is the same as log_start
if max_samples_achieved > log_start:
    # Scale log curve between min and max observed MCC
    # Ensure the log argument is positive
    log_arg = np.maximum(x_vals_theoretical, log_start)
    theoretical_curve_values = (
        np.log(log_arg / log_start) / np.log(max_samples_achieved / log_start) * 
        (max_mcc_observed - min_mcc_observed) + 
        min_mcc_observed
    )
else: 
    # If only one effective sample point range, theoretical curve is just a flat line at the max observed MCC
    theoretical_curve_values = np.full_like(x_vals_theoretical, max_mcc_observed)

df_theoretical = pd.DataFrame({
    "Number of Labeled Samples": x_vals_theoretical,
    "MCC": theoretical_curve_values,
    "Strategy": "Theoretical",
    "Step": None, # No meaningful step for theoretical curve
    "Accuracy": None,
    "F1 Score (weighted)": None
})

# Concatenate the results and the theoretical curve for plotting
plot_df = pd.concat([df_results, df_theoretical], ignore_index=True)

print("Combined DataFrame for plotting created.")

Final data points appended. Results DataFrame updated.
Combined DataFrame for plotting created.


  plot_df = pd.concat([df_results, df_theoretical], ignore_index=True)


In [15]:
plot_df.to_csv('plot_tabular.csv', sep = ';', index=False)

In [2]:
strategy_colors = {
    'Random': 'orange',
    'Least Confidence': 'green',
    'Margin of Confidence': 'purple',
    'Ratio of Confidence': 'cyan',
    'Entropy': 'red',
    'Cluster-Based': 'blueviolet',
    'Theoretical': 'gray'
}


strategy_dashes = {
    'Random': 'solid',
    'Least Confidence': 'solid',
    'Margin of Confidence': 'solid',
    'Ratio of Confidence': 'solid',
    'Entropy': 'dot',
    'Cluster-Based': 'solid',
    'Theoretical': 'dash'
}

sorted_strategies = sorted(plot_df['Strategy'].unique())

fig_mcc = px.line(
    plot_df,
    x="Number of Labeled Samples",
    y="MCC",
    color="Strategy",
    title="Active Learning Performance: MCC vs. Number of Labeled Samples",
    line_dash="Strategy",
    markers=True,
    color_discrete_map=strategy_colors,
    line_dash_map=strategy_dashes,
    category_orders={"Strategy": sorted_strategies},
    hover_data={
        "Strategy": True,
        "Number of Labeled Samples": ':.0f',
        "MCC": ':.4f',
        "Accuracy": ':.4f',
        "F1 Score (weighted)": ':.4f',
        # "Step": True
        }
)

fig_mcc.update_layout(
    xaxis_title="Number of Labeled Samples",
    yaxis_title="MCC Score",
    legend_title="Strategy",
    title_x=0.5,
    hovermode="x unified",
)

fig_mcc.show()

plot_df_strategies = plot_df[plot_df['Strategy'] != 'Theoretical']

fig_f1 = px.line(
    plot_df_strategies,
    x="Number of Labeled Samples",
    y="F1 Score (weighted)",
    color="Strategy",
    title="Active Learning Performance: F1 Score (Weighted) vs. Number of Labeled Samples",
    line_dash="Strategy",
    markers=True,
    color_discrete_map=strategy_colors,
    line_dash_map=strategy_dashes,
    category_orders={"Strategy": sorted_strategies},
    hover_data={
        "Strategy": True,
        "Number of Labeled Samples": ':.0f',
        "MCC": ':.4f',
        "Accuracy": ':.4f',
        "F1 Score (weighted)": ':.4f',
        "Step": True 
        }
)

fig_f1.update_layout(
    xaxis_title="Number of Labeled Samples",
    yaxis_title="F1 Score (Weighted)",
    legend_title="Strategy",
    title_x=0.5,
    hovermode="x unified", 
)

fig_f1.show()