In [None]:
### A. Conformal prediction testing with optimized DL models

In [60]:
import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from scikeras.wrappers import KerasClassifier  
from mapie.classification import MapieClassifier
from sklearn.utils.validation import check_is_fitted


In [None]:
# Value handed to MAPIE as `alpha` when prediction sets are generated.
# In MAPIE this is the tolerated error rate, so 0.05 targets 95 % coverage.
alpha = 0.05

# An accumulated tag score has to exceed this before a guess is accepted early.
score_thresh = 0.9

# Model identifiers; they are also substituted into the data and model file names.
model_names = [
    "OneLow",
    "TwoTypes",
    "ThreeTypes",
    "FourTypes",
    "OneHigh",
    "TwoHigh",
    "TwoLow",
]

# Directory searched for the x_/y_ calibration and test ``.npy`` splits.
calib_data_path = "./Data_Splits"

# Directory searched for the ``<name>_best.h5`` model files.
mdl_path = "./MDL"

In [62]:
# Calibration features / labels, keyed by model name.
# A model only gets an entry when BOTH of its files are present on disk.
calib_data = {}
calib_labels = {}

for model_name in model_names:
    features_file = os.path.join(calib_data_path, f"x_calib_{model_name}.npy")
    labels_file = os.path.join(calib_data_path, f"y_calib_{model_name}.npy")

    # Missing split -> leave this model out of both dictionaries.
    if not os.path.exists(features_file) or not os.path.exists(labels_file):
        continue

    # NOTE: allow_pickle=True unpickles object arrays; only load trusted files.
    calib_data[model_name] = np.load(features_file, allow_pickle=True)
    calib_labels[model_name] = np.load(labels_file, allow_pickle=True)

In [63]:
# Test features / labels, keyed by model name.
# A model only gets an entry when BOTH of its files are present on disk.
test_data = {}
test_labels = {}

for model_name in model_names:
    features_file = os.path.join(calib_data_path, f"x_test_{model_name}.npy")
    labels_file = os.path.join(calib_data_path, f"y_test_{model_name}.npy")

    # Missing split -> leave this model out of both dictionaries.
    if not os.path.exists(features_file) or not os.path.exists(labels_file):
        continue

    # NOTE: allow_pickle=True unpickles object arrays; only load trusted files.
    test_data[model_name] = np.load(features_file, allow_pickle=True)
    test_labels[model_name] = np.load(labels_file, allow_pickle=True)

In [None]:
import os
import numpy as np
import tensorflow as tf
from mapie.classification import MapieClassifier
from scikeras.wrappers import KerasClassifier  #  SciKeras Wrapper for TensorFlow Model

# TensorFlow runtime switches kept from the original notebook.
# run_functions_eagerly(True) makes tf.function bodies execute eagerly.
tf.compat.v1.experimental.output_all_intermediates(True)
tf.config.run_functions_eagerly(True)


def _make_model_loader(path):
    """Return a zero-argument builder that loads and compiles the model at ``path``.

    ``path`` is bound when the builder is created, so each SciKeras wrapper keeps
    pointing at its own file even if the builder is called after the loop below
    has moved on to another model.
    """

    def create_model():
        model = tf.keras.models.load_model(path)  # Load the saved network
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])
        return model

    return create_model


# MAPIE-wrapped classifiers, keyed by model name.
models = {}

for model_name in model_names:
    model_path = os.path.join(mdl_path, f"{model_name}_best.h5")

    if not os.path.exists(model_path):
        print(f"Skipping {model_name}: model file not found at {model_path}")
        continue

    # The calibration loader skips models whose files are missing, so guard the
    # lookup instead of failing with a KeyError.
    if model_name not in calib_data or model_name not in calib_labels:
        print(f"Skipping {model_name}: calibration data was not loaded")
        continue

    # Ensure calibration data is in NumPy format
    X_calib = np.asarray(calib_data[model_name])
    y_calib = np.asarray(calib_labels[model_name])

    # Wrap the pre-trained network so MAPIE can treat it as a scikit-learn estimator.
    keras_clf = KerasClassifier(
        model=_make_model_loader(model_path), epochs=1, batch_size=32, verbose=1
    )

    # Build the wrapper WITHOUT training. Calling fit() here would update the
    # pre-trained weights on the calibration rows, and MAPIE would then compute
    # its conformity scores on data the model has already been trained on,
    # which undermines the coverage guarantee of split conformal prediction.
    keras_clf.initialize(X_calib, y_calib)

    # cv="prefit": MAPIE only calibrates on (X_calib, y_calib); it does not refit.
    mapie_clf = MapieClassifier(estimator=keras_clf, method="score", cv="prefit")
    mapie_clf.fit(X_calib, y_calib)

    # Store calibrated model
    models[model_name] = mapie_clf

print(f" Models successfully fitted: {len(models)} of {len(model_names)}.")


In [None]:
# Prediction sets returned by MAPIE, keyed by model name.
psets = {}

for model_name in model_names:
    # Only models that were calibrated AND have a test split are processed.
    if model_name not in models or model_name not in test_data:
        continue

    print(f"Processing model: {model_name}")
    mapie_clf = models[model_name]
    try:
        check_is_fitted(mapie_clf)
        # predict() returns (point predictions, prediction sets); only the sets are kept.
        _, y_psets = mapie_clf.predict(test_data[model_name], alpha=[alpha])
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining models.
        print(f"Skipping {model_name} due to prediction error: {e}")
    else:
        psets[model_name] = y_psets
        print(f"Shape of prediction set for {model_name}: {y_psets.shape}")

In [None]:
# Running totals consumed by the evaluation cell.
correct_tags = total_steps = expected_set_size = 0

# Number of distinct label values found across every loaded test split.
# NOTE(review): the evaluation indexes prediction-set columns with
# range(num_cards), which presumes labels are 0..num_cards-1 - confirm.
all_test_labels = np.concatenate(list(test_labels.values()))
num_cards = len(set(all_test_labels))

# Rotate the model list so that it starts at "OneLow".
start_index = model_names.index("OneLow")
ordered_model_names = [
    model_names[(start_index + offset) % len(model_names)]
    for offset in range(len(model_names))
]

In [None]:
# Adaptive identification: for every test sample, walk through the models in
# `ordered_model_names`, accumulate a score per tag from each model's
# prediction set, and stop as soon as one tag clearly wins.

# Reset the accumulators so that re-running this cell does not add to the
# totals left behind by a previous run.
correct_tags = 0
total_steps = 0
expected_set_size = 0
total_tests = 0

# Prediction sets in cascade order. Built once, from `ordered_model_names`,
# instead of rebinding `y_psets` over `psets.values()` inside the sample loop.
cascade = [psets[name] for name in ordered_model_names if name in psets]

for model_name in ordered_model_names:
    if model_name not in test_labels or model_name not in psets:
        continue

    y_test = test_labels[model_name]
    # Count only samples that are actually evaluated; models skipped above
    # must not dilute the averages.
    total_tests += len(y_test)

    # NOTE(review): row `test_num` is looked up in every model's prediction
    # sets, which assumes all test splits are row-aligned - confirm.
    for test_num in range(len(y_test)):
        tag_scores = np.zeros(num_cards)
        correct_tag = int(y_test[test_num])

        tag_guess = None
        steps = 0

        for step_psets in cascade:
            # Tags contained in this model's prediction set for the sample.
            # np.any collapses the trailing per-alpha axis (one alpha is requested).
            tags = [j for j in range(num_cards) if np.any(step_psets[test_num][j])]
            expected_set_size += len(tags)
            steps += 1

            # Every tag in the set receives an equal share of one vote.
            if tags:
                tag_scores[tags] += 1 / len(tags)

            # Always keep the current best tag as the fallback guess, so a tie
            # above the threshold can no longer leave a stale (or None) guess.
            tag_guess = int(np.argmax(tag_scores))
            top_score = tag_scores[tag_guess]

            # Stop early only when a single tag exceeds the threshold.
            if top_score > score_thresh and np.count_nonzero(tag_scores == top_score) == 1:
                break

        total_steps += steps
        if tag_guess == correct_tag:
            correct_tags += 1

accuracy = correct_tags / total_tests if total_tests > 0 else 0
avg_steps = total_steps / total_tests if total_tests > 0 else 0
# NOTE(review): set sizes are summed over every step but divided by the number
# of samples, so this is the total set size per sample - confirm that is the
# intended definition of E[|S|].
avg_set_size = expected_set_size / total_tests if total_tests > 0 else 0

print(f"Adaptive Identification Accuracy (A): {accuracy:.2%}")
print(f"Average Steps Taken (E[step]): {avg_steps:.2f}")
print(f"Average Prediction Set Size (E[|S|]): {avg_set_size:.2f}")



In [None]:
### B. Conformal prediction with ML models

In [None]:
import os
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from mapie.classification import MapieClassifier
from joblib import dump, load
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
# Read the four response recordings from the working directory.
# NOTE: allow_pickle=True unpickles object arrays; only load trusted files.
data_one_high, data_one_low, data_two_high, data_two_low = [
    np.load(npy_file, allow_pickle=True)
    for npy_file in ("OneHigh_.npy", "OneLow_.npy", "TwoHigh_.npy", "TwoLow_.npy")
]

In [None]:
# Normalize datasets
# Value that the smallest magnitude of every card is shifted to.
normalization_point = 0.5


def normalize_data(data):
    """Shift each card's magnitudes so that their minimum equals ``normalization_point``.

    For every index ``i`` along axis 2, the slice ``data[:, :, i]`` is replaced
    by its absolute value minus a constant, chosen so that the smallest
    magnitude in that slice becomes ``normalization_point``.

    The number of cards is taken from ``data.shape[2]`` rather than being fixed
    at 52, so arrays with any card count are handled.

    Args:
        data: array indexed as ``data[:, :, card]``.

    Returns:
        The same array object; it is modified in place and also returned.
    """
    for card in range(data.shape[2]):
        magnitudes = np.abs(data[:, :, card])
        # Offset between this card's smallest magnitude and the target minimum.
        shift = np.min(magnitudes) - normalization_point
        data[:, :, card] = magnitudes - shift
    return data

In [None]:
# Run normalize_data over each recording separately, in the original order;
# every name is rebound to the array that normalize_data returns.
data_one_high, data_one_low, data_two_high, data_two_low = map(
    normalize_data,
    (data_one_high, data_one_low, data_two_high, data_two_low),
)

In [None]:
# Concatenate the four recordings along axis 1, in the order
# one_high | one_low | two_high | two_low.
dataset = np.concatenate((data_one_high, data_one_low, data_two_high, data_two_low), axis=1)

rows_per_card = dataset.shape[0]
num_features = dataset.shape[1]
card_count = dataset.shape[2]

# Stack the per-card slices on top of each other: row card * rows_per_card + r
# holds dataset[r, :, card]. np.complex128 replaces the np.complex_ alias,
# which was removed in NumPy 2.0.
X_combined = (
    np.transpose(dataset, (2, 0, 1))
    .reshape(card_count * rows_per_card, num_features)
    .astype(np.complex128)
)

# One label per row: the card index, repeated for each of that card's rows.
# Stored as floats, matching the array the original np.empty call produced.
labels = np.repeat(np.arange(card_count, dtype=float), rows_per_card)

# Convert to absolute values for training.
# From here on `dataset` is the 2-D (samples x features) magnitude matrix.
dataset = np.abs(X_combined)


# First split: 30 % of the rows are held out, the rest is used for training.
x_train, x_temp, y_train, y_temp = train_test_split(
    dataset, labels, test_size=0.3, random_state=42
)

# Second split of the held-out rows: 20 % of them form the calibration set,
# the remainder is the test set.
x_test, x_calib, y_test, y_calib = train_test_split(
    x_temp, y_temp, test_size=0.2, random_state=42
)

# Persist every split under ./Testing (np.save appends the .npy suffix).
os.makedirs('./Testing', exist_ok=True)
for target_path, split_array in (
    ('./Testing/x_train', x_train),
    ('./Testing/y_train', y_train),
    ('./Testing/x_test', x_test),
    ('./Testing/y_test', y_test),
    ('./Testing/x_calib', x_calib),
    ('./Testing/y_calib', y_calib),
):
    np.save(target_path, split_array)

# Column counts of the individual response types; the evaluation cell uses them
# as offsets when slicing the feature matrix.
one_high_len, one_low_len = 910, 3624
two_high_len, two_low_len = 900, 3599

# Model training function with resume functionality
def train_and_save_model(features, labels, model_name, random_state=42):
    """Train a MAPIE-wrapped random forest, or load it if it was saved before.

    The model is stored at ``./Models/<model_name>.joblib``. When that file
    already exists it is loaded instead of training again.

    Args:
        features: training feature matrix passed to ``MapieClassifier.fit``.
        labels: training labels passed to ``MapieClassifier.fit``.
        model_name: name used for the saved file and in progress messages.
        random_state: seed for the random forest, so that repeated training
            runs produce the same model.

    Returns:
        The fitted (or loaded) ``MapieClassifier``. The original version
        returned nothing, so callers could not use the model directly.
    """
    model_path = f'./Models/{model_name}.joblib'
    os.makedirs('./Models', exist_ok=True)

    # Check if model already exists
    if os.path.exists(model_path):
        print(f"{model_name} model already exists. Loading saved model.")
        # NOTE: joblib.load unpickles the file; only load models you created.
        return load(model_path)

    print(f"Training {model_name}...")
    base_clf = RandomForestClassifier(random_state=random_state)
    mapie_clf = MapieClassifier(estimator=base_clf, method="score")
    mapie_clf.fit(features, labels)
    dump(mapie_clf, model_path)
    print(f"{model_name} model saved at {model_path}")
    return mapie_clf

In [None]:
# Conformal prediction evaluation of the saved models on the test split.
num_cards = 52       # number of tag classes scored per sample
score_thresh = 0.8   # accumulated score a tag must exceed for an early decision
alpha = 0.15         # value passed to MAPIE's predict() as `alpha`

# Models in the order in which the adaptive procedure consults them.
model_names = ["OneLow", "TwoTypes", "ThreeTypes", "FourTypes", "OneHigh", "TwoHigh", "TwoLow"]

# Column boundaries inside x_test. The recordings were concatenated in the
# order one_high | one_low | two_high | two_low.
one_low_end = one_high_len + one_low_len
two_high_end = one_low_end + two_high_len

# Feature slice for each model, keyed by NAME. The original positional list was
# shifted relative to `model_names` (e.g. "OneLow" received the one_high+one_low
# columns and "OneHigh" received the one_low columns).
# NOTE(review): assumes each saved model was trained on the slice its name
# describes - confirm against the training calls.
test_features = {
    "OneLow": x_test[:, one_high_len:one_low_end],
    "TwoTypes": x_test[:, :one_low_end],
    "ThreeTypes": x_test[:, :two_high_end],
    "FourTypes": x_test,
    "OneHigh": x_test[:, :one_high_len],
    "TwoHigh": x_test[:, one_low_end:two_high_end],
    "TwoLow": x_test[:, two_high_end:],
}
# Positional view kept for any code that still indexes by position.
test_combinations = [test_features[name] for name in model_names]

# Run predictions for each model that has been saved to disk.
psets = []
for model_name in model_names:
    model_path = f'./Models/{model_name}.joblib'
    if not os.path.exists(model_path):
        print(f"Skipping {model_name}: no saved model at {model_path}")
        continue
    # NOTE: joblib.load unpickles the file; only load models you created.
    mapie_clf = load(model_path)
    _, y_psets = mapie_clf.predict(test_features[model_name], alpha=alpha)
    psets.append(y_psets)

# Evaluate Conformal Prediction accuracy
correct_tags = 0
total_steps = 0
expected_set_size = 0

for test_num in range(len(y_test)):
    tag_scores = np.zeros(num_cards)
    correct_tag = int(y_test[test_num])

    tag_guess = None
    steps = 0

    for step_psets in psets:
        # Tags contained in this model's prediction set for the sample.
        # np.any collapses the trailing per-alpha axis (one alpha is requested).
        tags = [j for j in range(num_cards) if np.any(step_psets[test_num][j])]
        expected_set_size += len(tags)
        steps += 1

        # Every tag in the set receives an equal share of one vote.
        if tags:
            tag_scores[tags] += 1 / len(tags)

        # Always keep the current best tag as the fallback guess. In the
        # original the fallback was attached to the wrong `if`, so samples that
        # never crossed the threshold ended with no guess at all.
        tag_guess = int(np.argmax(tag_scores))
        top_score = tag_scores[tag_guess]

        # Stop early only when a single tag exceeds the threshold.
        if top_score > score_thresh and np.count_nonzero(tag_scores == top_score) == 1:
            break

    # Per-sample bookkeeping belongs OUTSIDE the step loop: inside it, `break`
    # skipped the update for every confident decision and the other samples
    # were counted once per step.
    total_steps += steps
    if tag_guess == correct_tag:
        correct_tags += 1

# Calculate metrics (accuracy is a fraction; the ':.2%' format adds the x100).
num_tests = len(y_test)
accuracy = correct_tags / num_tests if num_tests > 0 else 0
# NOTE(review): set sizes are summed over every step but divided by the number
# of samples - confirm that is the intended definition of E[|S|].
expected_set_size = expected_set_size / num_tests if num_tests > 0 else 0
expected_steps = total_steps / num_tests if num_tests > 0 else 0

# Print results using the values computed in THIS cell.
print(f"Adaptive Identification Accuracy (A): {accuracy:.2%}")
print(f"Average Steps Taken (E[step]): {expected_steps:.2f}")
print(f"Average Prediction Set Size (E[|S|]): {expected_set_size:.2f}")