In [11]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
import category_encoders as ce
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, accuracy_score

# Load Data

In [12]:
file_name = 'car.data'
df = pd.read_csv(os.path.join(os.getcwd(), 'car+evaluation', file_name), header=None)

In [13]:
df.head()

Unnamed: 0,0,1,2,3,4,5,6
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc


In [14]:
columns_names = ['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety', 'class']
df.columns = columns_names

In [15]:
df.head()

Unnamed: 0,buying,maint,doors,persons,lug_boot,safety,class
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc


In [16]:
df.isna().sum()

buying      0
maint       0
doors       0
persons     0
lug_boot    0
safety      0
class       0
dtype: int64

In [17]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1728 entries, 0 to 1727
Data columns (total 7 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   buying    1728 non-null   object
 1   maint     1728 non-null   object
 2   doors     1728 non-null   object
 3   persons   1728 non-null   object
 4   lug_boot  1728 non-null   object
 5   safety    1728 non-null   object
 6   class     1728 non-null   object
dtypes: object(7)
memory usage: 94.6+ KB


In [18]:
df['class'].value_counts()

class
unacc    1210
acc       384
good       69
vgood      65
Name: count, dtype: int64

In [19]:
X = df.drop(columns='class', axis=1)
y = df['class']

In [20]:
def select_lowest_probabieties(probalility, df, n_samples, labels):
    """
    Split df for 2 df, where df_top contains the rows with the lowest probabilities, and df_rest contains the rest of the rows.
    """
    df_with_proba = df.copy()
    df_with_proba['probability'] = None
    df_with_proba['labels'] = None

    df_with_proba['probability'] = probalility
    df_with_proba['labels'] = labels
    df_with_proba = df_with_proba.sort_values(by='probability', ascending=True)
    df_top = df_with_proba[:n_samples]
    X_top = df_top.drop(columns=['probability', 'labels'])
    y_top = df_top['labels']
    df_rest = df_with_proba[n_samples:]
    X_rest = df_rest.drop(columns=['probability', 'labels'])
    y_rest = df_rest['labels']

    return X_top, y_top, X_rest, y_rest

def save_metrics_to_file(filename, cycle, precision, recall, f1, cm, accuracy, X_labelled, y_test):
    # Create a list to hold the results for this cycle
    results = []
    
    # Append results for each class in the metrics
    for i, class_label in enumerate(np.unique(y_test)):
        results.append({
            'Cycle': cycle,
            'Class': class_label,
            'Precision': precision[i],
            'Recall': recall[i],
            'F1-score': f1[i],
            'Accuracy': accuracy,
            'Confusion Matrix': cm.tolist(),
            'Labelled Samples': X_labelled.shape[0]
        })
    
    # Convert to DataFrame and save to CSV
    results_df = pd.DataFrame(results)
    
    # Append to the CSV file (if it exists) or create a new one
    results_df.to_csv(filename, mode='a', header=not pd.io.common.file_exists(filename), index=False)
    print(f"Metrics saved for cycle {cycle} to {filename}")
    
# Function to calculate metrics
def calculate_metrics(y_test, y_pred):
    precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average=None)
    cm = confusion_matrix(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    
    return precision, recall, f1, cm, accuracy

def select_samples(probalility, df, n_samples, labels, metric="least_confidence"):
    """
    Select samples for active learning based on different uncertainty metrics.
    
    Parameters:
    - probalility: Array of probabilities for each sample (shape: [n_samples, n_classes]).
    - df: The DataFrame containing the feature data.
    - n_samples: The number of samples to select.
    - labels: The true labels for the unlabelled samples.
    - metric: The uncertainty metric to use. Options are 'least_confidence', 'entropy', or 'margin_sampling'.
    
    Returns:
    - X_top: The feature data for the selected samples.
    - y_top: The labels for the selected samples.
    - X_rest: The feature data for the remaining samples.
    - y_rest: The labels for the remaining samples.
    """
    
    # Create a DataFrame to store the probabilities and labels
    df_with_proba = df.copy()
    df_with_proba['labels'] = labels

    # Compute the uncertainty scores based on the chosen metric
    if metric == "least_confidence":
        # Least Confidence: Select the samples with the lowest top predicted probabilities
        prob_top = np.max(probalility, axis=1)  # Get the top probability for each sample
        df_with_proba['uncertainty'] = prob_top

    elif metric == "entropy":
        # Entropy: Calculate the entropy for each sample based on its class probabilities
        entropy_values = -np.sum(probalility * np.log(probalility + 1e-10), axis=1)  # Adding small epsilon to avoid log(0)
        df_with_proba['uncertainty'] = entropy_values

    elif metric == "margin_sampling":
        # Margin Sampling: Select the samples where the top two predicted probabilities are closest
        top_2_probs = np.partition(probalility, -2, axis=1)[:, -2:]  # Get the top 2 predicted probabilities
        margin = top_2_probs[:, 1] - top_2_probs[:, 0]  # Calculate the difference between the top two
        df_with_proba['uncertainty'] = margin
        
    else:
        raise ValueError(f"Unknown metric: {metric}. Available options are 'least_confidence', 'entropy', 'margin_sampling'.")
    
    # Sort the samples by the uncertainty (ascending order to get the most uncertain samples)
    df_with_proba = df_with_proba.sort_values(by='uncertainty', ascending=True)
    
    # Select the top `n_samples` samples with the highest uncertainty
    df_top = df_with_proba[:n_samples]
    X_top = df_top.drop(columns=['uncertainty', 'labels'])
    y_top = df_top['labels']
    
    # Select the remaining samples
    df_rest = df_with_proba[n_samples:]
    X_rest = df_rest.drop(columns=['uncertainty', 'labels'])
    y_rest = df_rest['labels']
    
    return X_top, y_top, X_rest, y_rest


In [None]:
# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)
# Split data to DL and DU
X_labelled, X_unlabelled, y_labeled, y_unlabelled = train_test_split(X_train, y_train, test_size=0.9, stratify=y_train)

# Prepare data for model
encoder = ce.OrdinalEncoder(cols=['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety'])
X_labelled = encoder.fit_transform(X_labelled)
X_test = encoder.transform(X_test)
X_unlabelled = encoder.transform(X_unlabelled)


# Whole budget
B = 1000
# Budget per cycle
b = 50
# Number of cycle
c = 0

while B>0:
    # Define the model
    dt_clf = DecisionTreeClassifier(criterion='gini', max_depth=100, random_state=0)
    dt_clf.fit(X_labelled, y_labeled)
    probalilities = dt_clf.predict_proba(X_unlabelled)
    #X_lowest_prob, y_lowest_proba, X_rest, y_rest = select_lowest_probabieties(probalility=probalilities, df=X_unlabelled, n_samples=b, labels=y_unlabelled)
    X_lowest_prob, y_lowest_proba, X_rest, y_rest = select_samples(probalility=probalilities, df=X_unlabelled, n_samples=b, labels=y_unlabelled, metric='margin_sampling')

    X_labelled = pd.concat([X_labelled, X_lowest_prob])
    y_labeled = pd.concat([y_labeled, y_lowest_proba])
    X_unlabelled = X_rest
    y_unlabelled = y_rest
    # Calculate accuracy for this cycle
    y_pred = dt_clf.predict(X_test)

    metrics_filename = 'classic_AL.csv'

    # Calculate metrics
    precision, recall, f1, cm, accuracy = calculate_metrics(y_test, y_pred)
    
    # Save the metrics to the file
    save_metrics_to_file(metrics_filename, c, precision, recall, f1, cm, accuracy, X_labelled, y_test)





    # Compute precision, recall, and f1-score for each class
    precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average=None)
    
    # Print Precision, Recall, and F1 for each class
    print("Class-wise Precision, Recall, F1-score:")
    for i, class_label in enumerate(np.unique(y_test)):
        print(f"Class {class_label}: Precision={precision[i]:.4f}, Recall={recall[i]:.4f}, F1-score={f1[i]:.4f}")
    
    # Compute confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    print("\nConfusion Matrix:")
    print(cm)

    # Optionally, you can also print overall accuracy
    accuracy = accuracy_score(y_test, y_pred)
    print(f"\nOverall Accuracy: {accuracy:.4f}")
    print(X_labelled.shape[0])
    c +=1
    B -=b

Metrics saved for cycle 0 to classic_AL.csv
Class-wise Precision, Recall, F1-score:
Class acc: Precision=0.6522, Recall=0.5844, F1-score=0.6164
Class good: Precision=0.1250, Recall=0.0714, F1-score=0.0909
Class unacc: Precision=0.8984, Recall=0.9504, F1-score=0.9237
Class vgood: Precision=0.2308, Recall=0.2308, F1-score=0.2308

Confusion Matrix:
[[ 45   2  22   8]
 [  8   1   3   2]
 [ 12   0 230   0]
 [  4   5   1   3]]

Overall Accuracy: 0.8064
188
Metrics saved for cycle 1 to classic_AL.csv
Class-wise Precision, Recall, F1-score:
Class acc: Precision=0.6557, Recall=0.5195, F1-score=0.5797
Class good: Precision=0.1818, Recall=0.1429, F1-score=0.1600
Class unacc: Precision=0.9291, Recall=0.9752, F1-score=0.9516
Class vgood: Precision=0.3000, Recall=0.4615, F1-score=0.3636

Confusion Matrix:
[[ 40   7  17  13]
 [ 10   2   1   1]
 [  6   0 236   0]
 [  5   2   0   6]]

Overall Accuracy: 0.8208
238
Metrics saved for cycle 2 to classic_AL.csv
Class-wise Precision, Recall, F1-score:
Class 

# New flow

In [97]:
def calculate_dynamic_class_weights_based_on_model(model, X_labelled, y_labeled):
    """
    Calculate dynamic class weights based on the performance of the model on the current labelled data.
    """
    y_pred = model.predict(X_labelled)
    
    # Find misclassifications (or measure uncertainty)
    misclassifications = (y_pred != y_labeled)
    
    # Calculate the frequency of misclassifications per class
    class_misclassifications = {class_label: np.sum(misclassifications[y_labeled == class_label]) 
                                for class_label in np.unique(y_labeled)}
    
    # Compute class weights as the inverse of misclassifications (more misclassified = higher weight)
    total_misclassifications = sum(class_misclassifications.values())
    class_weights = {class_label: (total_misclassifications / (class_misclassifications[class_label] + 1)) 
                     for class_label in class_misclassifications}
    
    return class_weights

def select_samples(probalility, df, n_samples, labels, metric, class_weights):
    """
    Select samples for active learning based on different uncertainty metrics and dynamically calculated class weights.

    Parameters:
    - probalility: Array of probabilities for each sample (shape: [n_samples, n_classes]).
    - df: The DataFrame containing the feature data.
    - n_samples: The number of samples to select.
    - labels: The true labels for the unlabelled samples.
    - metric: The uncertainty metric to use. Options are 'least_confidence', 'entropy', or 'margin_sampling'.
    - calculate_class_weights_fn: A function to dynamically calculate class weights.

    Returns:
    - X_top: The feature data for the selected samples.
    - y_top: The labels for the selected samples.
    - X_rest: The feature data for the remaining samples.
    - y_rest: The labels for the remaining samples.
    """
    
    # Create a DataFrame to store the probabilities and labels
    df_with_proba = df.copy()
    df_with_proba['labels'] = labels
    
    # Compute the uncertainty scores based on the chosen metric
    if metric == "least_confidence":
        # Least Confidence: Select the samples with the lowest top predicted probabilities
        prob_top = np.max(probalility, axis=1)  # Get the top probability for each sample
        df_with_proba['uncertainty'] = prob_top

    elif metric == "entropy":
        # Entropy: Calculate the entropy for each sample based on its class probabilities
        entropy_values = -np.sum(probalility * np.log(probalility + 1e-10), axis=1)  # Adding small epsilon to avoid log(0)
        df_with_proba['uncertainty'] = entropy_values

    elif metric == "margin_sampling":
        # Margin Sampling: Select the samples where the top two predicted probabilities are closest
        top_2_probs = np.partition(probalility, -2, axis=1)[:, -2:]  # Get the top 2 predicted probabilities
        margin = top_2_probs[:, 1] - top_2_probs[:, 0]  # Calculate the difference between the top two
        df_with_proba['uncertainty'] = margin
    else:
        raise ValueError(f"Unknown metric: {metric}. Available options are 'least_confidence', 'entropy', 'margin_sampling'.")

    # Apply class weighting to the uncertainty values if class weights are provided
    if class_weights:
        df_with_proba['uncertainty'] *= df_with_proba['labels'].map(class_weights)
    
    # Sort the samples by the uncertainty (ascending order to get the most uncertain samples)
    df_with_proba = df_with_proba.sort_values(by='uncertainty', ascending=True)
    
    # Select the top `n_samples` samples with the highest uncertainty
    df_top = df_with_proba[:n_samples]
    X_top = df_top.drop(columns=['uncertainty', 'labels'])
    y_top = df_top['labels']
    
    # Select the remaining samples
    df_rest = df_with_proba[n_samples:]
    X_rest = df_rest.drop(columns=['uncertainty', 'labels'])
    y_rest = df_rest['labels']
    
    return X_top, y_top, X_rest, y_rest


In [100]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)

X_labelled, X_unlabelled, y_labeled, y_unlabelled = train_test_split(X_train, y_train, test_size=0.9, stratify=y_train)

# encode variables with ordinal encoding

encoder = ce.OrdinalEncoder(cols=['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety'])
X_labelled = encoder.fit_transform(X_labelled)
X_test = encoder.transform(X_test)
X_unlabelled = encoder.transform(X_unlabelled)


# Whole budget
B =1000
# Budget per cycle
b = 50
# Number of cycle
c = 0

while B>0:
    dt_clf = DecisionTreeClassifier(criterion='gini', max_depth=100, random_state=0)
    dt_clf.fit(X_labelled, y_labeled)
    probalilities = dt_clf.predict_proba(X_unlabelled)
    class_weights = calculate_dynamic_class_weights_based_on_model(dt_clf, X_labelled, y_labeled)
    #X_lowest_prob, y_lowest_proba, X_rest, y_rest = select_lowest_probabieties(probalility=probalilities, df=X_unlabelled, n_samples=b, labels=y_unlabelled)
    X_lowest_prob, y_lowest_proba, X_rest, y_rest = select_samples(probalility=probalilities, df=X_unlabelled, n_samples=b, labels=y_unlabelled, metric='margin_sampling', class_weights=class_weights)

    X_labelled = pd.concat([X_labelled, X_lowest_prob])
    y_labeled = pd.concat([y_labeled, y_lowest_proba])
    X_unlabelled = X_rest
    y_unlabelled = y_rest
    # Calculate accuracy for this cycle
    y_pred = dt_clf.predict(X_test)
    metrics_filename = 'custom_AL.csv'

    # Calculate metrics
    precision, recall, f1, cm, accuracy = calculate_metrics(y_test, y_pred)
    
    # Save the metrics to the file
    save_metrics_to_file(metrics_filename, c, precision, recall, f1, cm, accuracy, X_labelled)
    # Compute precision, recall, and f1-score for each class
    precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred, average=None)
    
    # Print Precision, Recall, and F1 for each class
    print("Class-wise Precision, Recall, F1-score:")
    for i, class_label in enumerate(np.unique(y_test)):
        print(f"Class {class_label}: Precision={precision[i]:.4f}, Recall={recall[i]:.4f}, F1-score={f1[i]:.4f}")
    
    # Compute confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    print("\nConfusion Matrix:")
    print(cm)

    # Optionally, you can also print overall accuracy
    accuracy = accuracy_score(y_test, y_pred)
    print(f"\nOverall Accuracy: {accuracy:.4f}")
    print(accuracy)
    print(X_labelled.shape[0])
    c +=1
    B -=b

Metrics saved for cycle 0 to custom_AL.csv
Class-wise Precision, Recall, F1-score:
Class acc: Precision=0.5571, Recall=0.5065, F1-score=0.5306
Class good: Precision=0.2857, Recall=0.1429, F1-score=0.1905
Class unacc: Precision=0.8769, Recall=0.9421, F1-score=0.9084
Class vgood: Precision=0.4444, Recall=0.3077, F1-score=0.3636

Confusion Matrix:
[[ 39   4  30   4]
 [ 11   2   1   0]
 [ 13   0 228   1]
 [  7   1   1   4]]

Overall Accuracy: 0.7890
0.7890173410404624
188
Metrics saved for cycle 1 to custom_AL.csv
Class-wise Precision, Recall, F1-score:
Class acc: Precision=0.6154, Recall=0.7273, F1-score=0.6667
Class good: Precision=0.2000, Recall=0.1429, F1-score=0.1667
Class unacc: Precision=0.9409, Recall=0.9215, F1-score=0.9311
Class vgood: Precision=0.3750, Recall=0.2308, F1-score=0.2857

Confusion Matrix:
[[ 56   4  14   3]
 [ 10   2   0   2]
 [ 18   1 223   0]
 [  7   3   0   3]]

Overall Accuracy: 0.8208
0.8208092485549133
238
Metrics saved for cycle 2 to custom_AL.csv
Class-wise 