In [None]:
import numpy as np
from scipy.optimize import linear_sum_assignment
import random
import tensorflow as tf
from tensorflow.keras import layers, regularizers
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, roc_curve, auc
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import xgboost as xgb
from imblearn.over_sampling import SMOTE

# --- TSP Component ---
def solve_tsp(distance_matrix):
    """Approximate a tour by solving the assignment relaxation of the TSP.

    Every node is given exactly one successor such that the summed edge cost
    is minimal. Self-assignments are penalised: with a zero diagonal the
    unpenalised optimum is "every node follows itself" (identity permutation,
    cost 0), which says nothing about a route.

    NOTE: the assignment problem is only a relaxation of the TSP, so the
    successor map may split into several disjoint cycles instead of forming
    one tour.

    Args:
        distance_matrix: 2-D array-like; entry [i, j] is the cost of i -> j.
            It is not modified.

    Returns:
        Tuple ``(successors, cost)`` where ``successors[i]`` is the node
        assigned to follow node ``i`` and ``cost`` is the sum of the original
        distances along the chosen edges.
    """
    distances = np.asarray(distance_matrix)
    costs = distances.astype(float)  # astype copies, the caller's matrix stays intact

    # A penalty larger than any possible self-loop-free assignment keeps the
    # solver off the diagonal while staying finite (always feasible).
    finite_costs = costs[np.isfinite(costs)]
    penalty = np.abs(finite_costs).sum() + 1.0
    np.fill_diagonal(costs, penalty)

    row_ind, col_ind = linear_sum_assignment(costs)
    # Report the cost from the original matrix, not the penalised copy.
    return col_ind, distances[row_ind, col_ind].sum()

# --- ACO Component ---
class AntColony:
    """Ant-colony search for a short closed tour over a distance matrix.

    In every iteration each ant builds a tour that starts and ends at node 0,
    the ``n_best`` shortest tours of that iteration deposit pheromone on their
    edges, and the whole pheromone table is then multiplied by ``decay``.
    """

    def __init__(self, distance_matrix, n_ants, n_best, n_iterations, decay, alpha=1, beta=2):
        """Store the search parameters and initialise the pheromone table.

        Args:
            distance_matrix: 2-D NumPy array indexed as [from_node, to_node].
            n_ants: number of tours generated per iteration.
            n_best: number of shortest tours per iteration that deposit pheromone.
            n_iterations: number of iterations performed by ``run``.
            decay: factor the pheromone table is multiplied by after each iteration.
            alpha: exponent applied to an edge's pheromone level.
            beta: exponent applied to an edge's inverse distance.
        """
        self.distance_matrix = distance_matrix
        # Every edge starts with the same pheromone level, 1 / number of nodes.
        self.pheromone = np.ones(distance_matrix.shape) / len(distance_matrix)
        self.all_inds = range(len(distance_matrix))
        self.n_ants = n_ants
        self.n_best = n_best
        self.n_iterations = n_iterations
        self.decay = decay
        self.alpha = alpha
        self.beta = beta

    def run(self):
        """Run the colony and return the best ``(path, distance)`` found.

        Returns ``("placeholder", np.inf)`` when ``n_iterations`` is 0.

        Raises:
            ValueError: if ``n_ants`` is smaller than 1 (no tour can be built).
        """
        if self.n_ants < 1:
            raise ValueError("n_ants must be at least 1")
        all_time_shortest_path = ("placeholder", np.inf)
        for _ in range(self.n_iterations):
            all_paths = self.gen_all_paths()
            self.spread_pheromone(all_paths, self.n_best)
            shortest_path = min(all_paths, key=lambda x: x[1])
            if shortest_path[1] < all_time_shortest_path[1]:
                all_time_shortest_path = shortest_path
            self.pheromone *= self.decay
        return all_time_shortest_path

    def spread_pheromone(self, all_paths, n_best):
        """Deposit ``1 / edge_length`` on every edge of the ``n_best`` shortest paths."""
        sorted_paths = sorted(all_paths, key=lambda x: x[1])
        for path, _ in sorted_paths[:n_best]:
            for move in zip(path, path[1:]):
                edge_length = self.distance_matrix[move]
                # A zero-length edge would deposit inf and poison the whole
                # pheromone row, so it is skipped.
                if edge_length > 0:
                    self.pheromone[move] += 1.0 / edge_length

    def gen_path_dist(self, path):
        """Return the length of ``path`` as a closed tour.

        Paths produced by ``gen_path`` already end at their start node; the
        closing edge is only added for a path that is still open.
        """
        total_dist = 0
        for move in zip(path, path[1:]):
            total_dist += self.distance_matrix[move]
        if path[0] != path[-1]:
            total_dist += self.distance_matrix[path[-1], path[0]]
        return total_dist

    def gen_all_paths(self):
        """Let every ant build one tour from node 0; return ``(path, distance)`` pairs."""
        all_paths = []
        for _ in range(self.n_ants):
            path = self.gen_path(0)
            all_paths.append((path, self.gen_path_dist(path)))
        return all_paths

    def gen_path(self, start):
        """Build one closed tour that visits every node once and returns to ``start``."""
        path = [start]
        visited = set(path)
        prev = start
        for _ in range(len(self.distance_matrix) - 1):
            move = self.pick_move(self.pheromone[prev], self.distance_matrix[prev], visited)
            path.append(move)
            visited.add(move)
            prev = move
        path.append(start)
        return path

    def _transition_weights(self, pheromone, dist, visited):
        """Return unnormalised weights ``pheromone**alpha * (1/dist)**beta``.

        Visited nodes get weight 0. They are masked after exponentiation
        because ``0 ** 0`` evaluates to 1 and would re-enable them.
        """
        dist = np.asarray(dist, dtype=float)
        with np.errstate(divide='ignore', invalid='ignore'):
            heuristic = np.where(dist > 0, 1.0 / dist, 0.0)
            weights = np.power(pheromone, self.alpha) * np.power(heuristic, self.beta)
        weights[list(visited)] = 0.0
        return weights

    def pick_move(self, pheromone, dist, visited):
        """Sample the next node from the pheromone/distance weighted distribution.

        Args:
            pheromone: pheromone row of the current node.
            dist: distance row of the current node.
            visited: set of node indices that must not be chosen again.

        Returns:
            Index of the chosen node.
        """
        weights = self._transition_weights(pheromone, dist, visited)
        total = weights.sum()
        if total <= 0 or not np.isfinite(total):
            # No usable preference: choose uniformly among unvisited nodes.
            weights = np.ones(len(weights))
            weights[list(visited)] = 0.0
            if not weights.any():
                # Nothing left to visit; keep the historical uniform fallback.
                weights[:] = 1.0
        # random.choices normalises the weights itself.
        return random.choices(self.all_inds, weights=weights, k=1)[0]

# --- Neural Network for Error Prediction ---
def build_model(input_shape):
    """Create and compile the dense binary classifier used for error prediction.

    The network stacks ReLU layers of 512, 512, 256 and 128 units (all with
    an L2 kernel penalty of 0.001), batch normalisation and dropout after the
    first three, and a single sigmoid output unit.

    Args:
        input_shape: number of input features per sample.

    Returns:
        A compiled ``tf.keras.Sequential`` model (Adam, learning rate 0.001,
        binary cross-entropy loss, accuracy metric).
    """
    def relu_block(units, **extra):
        # Every hidden layer shares the activation and the L2 penalty.
        return layers.Dense(units, activation='relu',
                            kernel_regularizer=regularizers.l2(0.001), **extra)

    stack = [
        relu_block(512, input_shape=(input_shape,)),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        relu_block(512),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        relu_block(256),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        relu_block(128),
        layers.Dense(1, activation='sigmoid'),
    ]
    network = tf.keras.Sequential(stack)

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    network.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return network

# --- XGBoost Model for Error Prediction ---
def train_xgboost(x_train, y_train):
    """Fit an XGBoost binary classifier with an internal 80/20 hold-out.

    Args:
        x_train: feature matrix.
        y_train: binary labels aligned with ``x_train``.

    Returns:
        The booster returned by ``xgb.train`` (at most 500 rounds, early
        stopping patience of 20 rounds, AUC as evaluation metric).
    """
    booster_params = dict(
        max_depth=4,
        eta=0.05,
        objective='binary:logistic',
        eval_metric='auc',
        subsample=0.8,
        colsample_bytree=0.8,
        verbosity=1,
    )

    # Fixed seed so the hold-out split is the same on every call.
    x_fit, x_holdout, y_fit, y_holdout = train_test_split(
        x_train, y_train, test_size=0.2, random_state=42)

    fit_matrix = xgb.DMatrix(x_fit, label=y_fit)
    holdout_matrix = xgb.DMatrix(x_holdout, label=y_holdout)
    watchlist = [(fit_matrix, 'train'), (holdout_matrix, 'validation')]

    return xgb.train(booster_params, fit_matrix, num_boost_round=500,
                     evals=watchlist, early_stopping_rounds=20)

# --- Noise Reduction ---
def apply_noise_reduction(signal, noise_level):
    """Return ``signal`` divided by ``1 + noise_level``."""
    divisor = 1 + noise_level
    return signal / divisor

# --- Function to Plot ROC Curve for Final XGBoost Model ---
def plot_roc_curve_final(y_true, y_pred):
    """Draw and show the ROC curve of ``y_pred`` scores against ``y_true`` labels.

    The area under the curve appears in the legend; the dashed diagonal is
    the chance line.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_pred)
    area = auc(false_pos_rate, true_pos_rate)

    _, ax = plt.subplots(figsize=(10, 6))
    ax.plot(false_pos_rate, true_pos_rate, label=f'ROC curve (area = {area:.2f})')
    ax.plot([0, 1], [0, 1], 'k--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('Receiver Operating Characteristic (ROC) Curve - Final XGBoost Model')
    ax.legend(loc='lower right')
    ax.grid(True)
    plt.show()

# --- Data Augmentation ---
def augment_data(x_train, y_train, *, noise_factor=0.3, clip_range=(0.0, 1.0)):
    """Double a data set by appending a Gaussian-noised copy of every sample.

    Args:
        x_train: feature array.
        y_train: labels aligned with ``x_train``.
        noise_factor: multiplier for the standard-normal noise added to the copy.
        clip_range: ``(low, high)`` bounds applied to the noisy copy, or
            ``None`` to skip clipping. The default ``(0, 1)`` only suits
            features that live in that range; pass ``None`` for data with a
            different range (e.g. standardised features).

    Returns:
        Tuple ``(x, y)``: the original samples followed by their noisy copies,
        and the labels repeated twice in the same order.
    """
    noise = np.random.normal(loc=0.0, scale=1.0, size=x_train.shape)
    noisy_data = x_train + noise_factor * noise
    if clip_range is not None:
        low, high = clip_range
        noisy_data = np.clip(noisy_data, low, high)
    return np.concatenate((x_train, noisy_data)), np.concatenate((y_train, y_train))

# --- Data Preprocessing and SMOTE ---
def preprocess_data(x_train, y_train):
    """Resample with SMOTE, then standardise the resampled features.

    Args:
        x_train: feature matrix.
        y_train: class labels aligned with ``x_train``.

    Returns:
        Tuple ``(features, labels)``: the standardised SMOTE-resampled
        features and the matching resampled labels.
    """
    resampled_x, resampled_y = SMOTE().fit_resample(x_train, y_train)
    # The scaler is fitted on the resampled data, not on the raw input.
    standardised_x = StandardScaler().fit_transform(resampled_x)
    return standardised_x, resampled_y

# --- MARTAHO Simulation ---
def martaho_simulation(distance_matrix, x_train, y_train, noise_level, interference_level, traffic_load):
    """Run one MARTAHO scenario: route search, error-model training, signal clean-up.

    Args:
        distance_matrix: 2-D NumPy array of node-to-node distances for routing.
        x_train: feature matrix for the error-prediction models.
        y_train: binary labels aligned with ``x_train``.
        noise_level: noise level passed to ``apply_noise_reduction``.
        interference_level: the simulated signal is scaled by ``1 - interference_level``.
        traffic_load: the colony uses ``int(5 * traffic_load)`` ants (at least 1).

    Returns:
        Tuple ``(refined_route, cleaned_signal, loss, accuracy, y_pred_xgb,
        y_train_processed)``: the best ACO path, the cleaned signal, the
        per-epoch training loss and accuracy of the neural network, and the
        XGBoost scores with the labels they were computed for.
    """
    # Route search. The assignment-based solve_tsp() result was computed here
    # before but never handed to the colony, so that dead call is gone.
    # A colony needs at least one ant, otherwise run() has no path to return.
    n_ants = max(1, int(5 * traffic_load))
    ant_colony = AntColony(distance_matrix, n_ants=n_ants, n_best=2, n_iterations=100, decay=0.95)
    refined_route, _ = ant_colony.run()

    # Hold out validation rows BEFORE augmenting. validation_split would take
    # the tail of the augmented array, i.e. only noisy copies of rows that are
    # also in the training part.
    x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=42)

    # augment_data clips its noisy copies to [0, 1], so it must see the raw
    # features; on standardised data the clip would erase every negative value.
    # NOTE(review): assumes raw features lie in [0, 1] like the demo data - confirm.
    x_fit_augmented, y_fit_augmented = augment_data(x_fit, y_fit)
    scaler = StandardScaler().fit(x_fit)
    x_fit_scaled = scaler.transform(x_fit_augmented)
    x_val_scaled = scaler.transform(x_val)

    # Train Neural Network for Error Prediction
    model = build_model(x_train.shape[1])

    # Learning rate scheduler: constant for 10 epochs, then exponential decay.
    def scheduler(epoch, lr):
        if epoch < 10:
            return lr
        return float(lr * tf.math.exp(-0.1).numpy())

    lr_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    history = model.fit(x_fit_scaled, y_fit_augmented, epochs=100, batch_size=64,
                        verbose=1, validation_data=(x_val_scaled, y_val),
                        callbacks=[lr_callback, early_stopping])

    # Train XGBoost Model
    x_train_processed, y_train_processed = preprocess_data(x_train, y_train)
    xgb_model = train_xgboost(x_train_processed, y_train_processed)

    # Extract training loss and accuracy
    loss = history.history['loss']
    accuracy = history.history['accuracy']

    # Scores for the ROC curve. NOTE: these are computed on the same resampled
    # data the booster was trained on, so the resulting curve is optimistic.
    dtrain = xgb.DMatrix(x_train_processed)
    y_pred_xgb = xgb_model.predict(dtrain)

    # Simulate signal transmission with noise reduction
    signal = np.random.rand(100)
    signal_with_interference = signal * (1 - interference_level)
    cleaned_signal = apply_noise_reduction(signal_with_interference, noise_level)

    # Output final route, processed signal, and training details
    return refined_route, cleaned_signal, loss, accuracy, y_pred_xgb, y_train_processed

# --- Simulation Setup ---
# Parameter grid: every combination of the three lists below is simulated.
noise_levels = [0.1, 0.3, 0.5, 0.7, 0.9]
interference_levels = [0.1, 0.3, 0.5, 0.7, 0.9]
traffic_loads = [0.5, 1.0, 1.5, 2.0]

# Random demonstration data: 1000 samples, 10 features, binary labels.
x_train = np.random.rand(1000, 10)
y_train = np.random.randint(0, 2, 1000)

# Example 4-node distance matrix, indexed as [from, to].
distance_matrix = np.array([
    [0, 2, 9, 10],
    [1, 0, 6, 4],
    [15, 7, 0, 8],
    [6, 3, 12, 0],
])

# One entry per simulated parameter combination.
results = []

for noise_level in noise_levels:
    for interference_level in interference_levels:
        for traffic_load in traffic_loads:
            print(f"Testing with Noise Level: {noise_level}, Interference Level: {interference_level}, Traffic Load: {traffic_load}")
            (refined_route, cleaned_signal, loss, accuracy,
             y_pred_xgb, y_train_processed) = martaho_simulation(
                distance_matrix, x_train, y_train,
                noise_level, interference_level, traffic_load)
            results.append(dict(
                noise_level=noise_level,
                interference_level=interference_level,
                traffic_load=traffic_load,
                refined_route=refined_route,
                cleaned_signal_mean=np.mean(cleaned_signal),
                loss=loss,
                accuracy=accuracy,
                y_pred_xgb=y_pred_xgb,
                y_train_processed=y_train_processed,
            ))

# ROC curve of the XGBoost model from the last parameter combination.
print("Final XGBoost Model ROC Curve:")
plot_roc_curve_final(
    results[-1]['y_train_processed'],
    results[-1]['y_pred_xgb'],
)


In [None]:
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import layers, regularizers
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, roc_curve, auc
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import xgboost as xgb
from imblearn.over_sampling import SMOTE
from scipy.optimize import linear_sum_assignment

# --- TSP Component ---
def solve_tsp(distance_matrix):
    """Approximate a tour by solving the assignment relaxation of the TSP.

    Every node is given exactly one successor such that the summed edge cost
    is minimal. Self-assignments are penalised: with a zero diagonal the
    unpenalised optimum is "every node follows itself" (identity permutation,
    cost 0), which says nothing about a route.

    NOTE: the assignment problem is only a relaxation of the TSP, so the
    successor map may split into several disjoint cycles instead of forming
    one tour.

    Args:
        distance_matrix: 2-D array-like; entry [i, j] is the cost of i -> j.
            It is not modified.

    Returns:
        Tuple ``(successors, cost)`` where ``successors[i]`` is the node
        assigned to follow node ``i`` and ``cost`` is the sum of the original
        distances along the chosen edges.
    """
    distances = np.asarray(distance_matrix)
    costs = distances.astype(float)  # astype copies, the caller's matrix stays intact

    # A penalty larger than any possible self-loop-free assignment keeps the
    # solver off the diagonal while staying finite (always feasible).
    finite_costs = costs[np.isfinite(costs)]
    penalty = np.abs(finite_costs).sum() + 1.0
    np.fill_diagonal(costs, penalty)

    row_ind, col_ind = linear_sum_assignment(costs)
    # Report the cost from the original matrix, not the penalised copy.
    return col_ind, distances[row_ind, col_ind].sum()

# --- ACO Component ---
class AntColony:
    """Ant-colony search for a short closed tour over a distance matrix.

    In every iteration each ant builds a tour that starts and ends at node 0,
    the ``n_best`` shortest tours of that iteration deposit pheromone on their
    edges, and the whole pheromone table is then multiplied by ``decay``.
    """

    def __init__(self, distance_matrix, n_ants, n_best, n_iterations, decay, alpha=1, beta=2):
        """Store the search parameters and initialise the pheromone table.

        Args:
            distance_matrix: 2-D NumPy array indexed as [from_node, to_node].
            n_ants: number of tours generated per iteration.
            n_best: number of shortest tours per iteration that deposit pheromone.
            n_iterations: number of iterations performed by ``run``.
            decay: factor the pheromone table is multiplied by after each iteration.
            alpha: exponent applied to an edge's pheromone level.
            beta: exponent applied to an edge's inverse distance.
        """
        self.distance_matrix = distance_matrix
        # Every edge starts with the same pheromone level, 1 / number of nodes.
        self.pheromone = np.ones(distance_matrix.shape) / len(distance_matrix)
        self.all_inds = range(len(distance_matrix))
        self.n_ants = n_ants
        self.n_best = n_best
        self.n_iterations = n_iterations
        self.decay = decay
        self.alpha = alpha
        self.beta = beta

    def run(self):
        """Run the colony and return the best ``(path, distance)`` found.

        Returns ``("placeholder", np.inf)`` when ``n_iterations`` is 0.

        Raises:
            ValueError: if ``n_ants`` is smaller than 1 (no tour can be built).
        """
        if self.n_ants < 1:
            raise ValueError("n_ants must be at least 1")
        all_time_shortest_path = ("placeholder", np.inf)
        for _ in range(self.n_iterations):
            all_paths = self.gen_all_paths()
            self.spread_pheromone(all_paths, self.n_best)
            shortest_path = min(all_paths, key=lambda x: x[1])
            if shortest_path[1] < all_time_shortest_path[1]:
                all_time_shortest_path = shortest_path
            self.pheromone *= self.decay
        return all_time_shortest_path

    def spread_pheromone(self, all_paths, n_best):
        """Deposit ``1 / edge_length`` on every edge of the ``n_best`` shortest paths."""
        sorted_paths = sorted(all_paths, key=lambda x: x[1])
        for path, _ in sorted_paths[:n_best]:
            for move in zip(path, path[1:]):
                edge_length = self.distance_matrix[move]
                # A zero-length edge would deposit inf and poison the whole
                # pheromone row, so it is skipped.
                if edge_length > 0:
                    self.pheromone[move] += 1.0 / edge_length

    def gen_path_dist(self, path):
        """Return the length of ``path`` as a closed tour.

        Paths produced by ``gen_path`` already end at their start node; the
        closing edge is only added for a path that is still open.
        """
        total_dist = 0
        for move in zip(path, path[1:]):
            total_dist += self.distance_matrix[move]
        if path[0] != path[-1]:
            total_dist += self.distance_matrix[path[-1], path[0]]
        return total_dist

    def gen_all_paths(self):
        """Let every ant build one tour from node 0; return ``(path, distance)`` pairs."""
        all_paths = []
        for _ in range(self.n_ants):
            path = self.gen_path(0)
            all_paths.append((path, self.gen_path_dist(path)))
        return all_paths

    def gen_path(self, start):
        """Build one closed tour that visits every node once and returns to ``start``."""
        path = [start]
        visited = set(path)
        prev = start
        for _ in range(len(self.distance_matrix) - 1):
            move = self.pick_move(self.pheromone[prev], self.distance_matrix[prev], visited)
            path.append(move)
            visited.add(move)
            prev = move
        path.append(start)
        return path

    def _transition_weights(self, pheromone, dist, visited):
        """Return unnormalised weights ``pheromone**alpha * (1/dist)**beta``.

        Visited nodes get weight 0. They are masked after exponentiation
        because ``0 ** 0`` evaluates to 1 and would re-enable them.
        """
        dist = np.asarray(dist, dtype=float)
        with np.errstate(divide='ignore', invalid='ignore'):
            heuristic = np.where(dist > 0, 1.0 / dist, 0.0)
            weights = np.power(pheromone, self.alpha) * np.power(heuristic, self.beta)
        weights[list(visited)] = 0.0
        return weights

    def pick_move(self, pheromone, dist, visited):
        """Sample the next node from the pheromone/distance weighted distribution.

        Args:
            pheromone: pheromone row of the current node.
            dist: distance row of the current node.
            visited: set of node indices that must not be chosen again.

        Returns:
            Index of the chosen node.
        """
        weights = self._transition_weights(pheromone, dist, visited)
        total = weights.sum()
        if total <= 0 or not np.isfinite(total):
            # No usable preference: choose uniformly among unvisited nodes.
            weights = np.ones(len(weights))
            weights[list(visited)] = 0.0
            if not weights.any():
                # Nothing left to visit; keep the historical uniform fallback.
                weights[:] = 1.0
        # random.choices normalises the weights itself.
        return random.choices(self.all_inds, weights=weights, k=1)[0]

# --- Neural Network for Error Prediction ---
def build_model(input_shape):
    """Create and compile the dense binary classifier used for error prediction.

    The network stacks ReLU layers of 512, 512, 256 and 128 units (all with
    an L2 kernel penalty of 0.001), batch normalisation and dropout after the
    first three, and a single sigmoid output unit.

    Args:
        input_shape: number of input features per sample.

    Returns:
        A compiled ``tf.keras.Sequential`` model (Adam, learning rate 0.001,
        binary cross-entropy loss, accuracy metric).
    """
    def relu_block(units, **extra):
        # Every hidden layer shares the activation and the L2 penalty.
        return layers.Dense(units, activation='relu',
                            kernel_regularizer=regularizers.l2(0.001), **extra)

    stack = [
        relu_block(512, input_shape=(input_shape,)),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        relu_block(512),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        relu_block(256),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        relu_block(128),
        layers.Dense(1, activation='sigmoid'),
    ]
    network = tf.keras.Sequential(stack)

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    network.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return network

# --- XGBoost Model for Error Prediction ---
def train_xgboost(x_train, y_train):
    """Fit an XGBoost binary classifier with an internal 80/20 hold-out.

    Args:
        x_train: feature matrix.
        y_train: binary labels aligned with ``x_train``.

    Returns:
        The booster returned by ``xgb.train`` (at most 500 rounds, early
        stopping patience of 20 rounds, AUC as evaluation metric).
    """
    booster_params = dict(
        max_depth=4,
        eta=0.05,
        objective='binary:logistic',
        eval_metric='auc',
        subsample=0.8,
        colsample_bytree=0.8,
        verbosity=1,
    )

    # Fixed seed so the hold-out split is the same on every call.
    x_fit, x_holdout, y_fit, y_holdout = train_test_split(
        x_train, y_train, test_size=0.2, random_state=42)

    fit_matrix = xgb.DMatrix(x_fit, label=y_fit)
    holdout_matrix = xgb.DMatrix(x_holdout, label=y_holdout)
    watchlist = [(fit_matrix, 'train'), (holdout_matrix, 'validation')]

    return xgb.train(booster_params, fit_matrix, num_boost_round=500,
                     evals=watchlist, early_stopping_rounds=20)

# --- Plot ROC and Precision-Recall Curves ---
def plot_precision_recall_roc(y_true, y_pred):
    """Show a precision-recall figure and a ROC figure; return the ROC AUC.

    Args:
        y_true: ground-truth binary labels.
        y_pred: predicted scores for the positive class.

    Returns:
        Area under the ROC curve.
    """
    # Figure 1: precision against recall.
    precision, recall, _ = precision_recall_curve(y_true, y_pred)
    _, pr_ax = plt.subplots(figsize=(10, 6))
    pr_ax.plot(recall, precision, label='Precision-Recall curve')
    pr_ax.set_xlabel('Recall')
    pr_ax.set_ylabel('Precision')
    pr_ax.set_title('Precision-Recall Curve')
    pr_ax.grid(True)
    plt.show()

    # Figure 2: ROC curve with the chance diagonal.
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_pred)
    roc_auc = auc(false_pos_rate, true_pos_rate)
    _, roc_ax = plt.subplots(figsize=(10, 6))
    roc_ax.plot(false_pos_rate, true_pos_rate, label=f'ROC curve (area = {roc_auc:.2f})')
    roc_ax.plot([0, 1], [0, 1], 'k--')
    roc_ax.set_xlim([0.0, 1.0])
    roc_ax.set_ylim([0.0, 1.05])
    roc_ax.set_xlabel('False Positive Rate')
    roc_ax.set_ylabel('True Positive Rate')
    roc_ax.set_title('Receiver Operating Characteristic (ROC) Curve')
    roc_ax.legend(loc='lower right')
    roc_ax.grid(True)
    plt.show()
    return roc_auc

# --- Noise Reduction ---
def apply_noise_reduction(signal, noise_level):
    """Return ``signal`` divided by ``1 + noise_level``."""
    divisor = 1 + noise_level
    return signal / divisor

# --- Data Augmentation ---
def augment_data(x_train, y_train, *, noise_factor=0.3, clip_range=(0.0, 1.0)):
    """Double a data set by appending a Gaussian-noised copy of every sample.

    Args:
        x_train: feature array.
        y_train: labels aligned with ``x_train``.
        noise_factor: multiplier for the standard-normal noise added to the copy.
        clip_range: ``(low, high)`` bounds applied to the noisy copy, or
            ``None`` to skip clipping. The default ``(0, 1)`` only suits
            features that live in that range; pass ``None`` for data with a
            different range (e.g. standardised features).

    Returns:
        Tuple ``(x, y)``: the original samples followed by their noisy copies,
        and the labels repeated twice in the same order.
    """
    noise = np.random.normal(loc=0.0, scale=1.0, size=x_train.shape)
    noisy_data = x_train + noise_factor * noise
    if clip_range is not None:
        low, high = clip_range
        noisy_data = np.clip(noisy_data, low, high)
    return np.concatenate((x_train, noisy_data)), np.concatenate((y_train, y_train))

# --- Data Preprocessing and SMOTE ---
def preprocess_data(x_train, y_train):
    """Resample with SMOTE, then standardise the resampled features.

    Args:
        x_train: feature matrix.
        y_train: class labels aligned with ``x_train``.

    Returns:
        Tuple ``(features, labels)``: the standardised SMOTE-resampled
        features and the matching resampled labels.
    """
    resampled_x, resampled_y = SMOTE().fit_resample(x_train, y_train)
    # The scaler is fitted on the resampled data, not on the raw input.
    standardised_x = StandardScaler().fit_transform(resampled_x)
    return standardised_x, resampled_y

# --- MARTAHO Simulation ---
def martaho_simulation(distance_matrix, x_train, y_train, noise_level, interference_level, traffic_load):
    """Run one MARTAHO scenario: route search, error-model training, signal clean-up.

    Args:
        distance_matrix: 2-D NumPy array of node-to-node distances for routing.
        x_train: feature matrix for the error-prediction models.
        y_train: binary labels aligned with ``x_train``.
        noise_level: noise level passed to ``apply_noise_reduction``.
        interference_level: the simulated signal is scaled by ``1 - interference_level``.
        traffic_load: the colony uses ``int(5 * traffic_load)`` ants (at least 1).

    Returns:
        Tuple ``(refined_route, cleaned_signal, loss, accuracy, y_pred_xgb,
        y_train_processed, history)``: the best ACO path, the cleaned signal,
        the per-epoch training loss and accuracy of the neural network, the
        XGBoost scores with the labels they were computed for, and the Keras
        ``History`` object returned by ``model.fit``.
    """
    # Route search. The assignment-based solve_tsp() result was computed here
    # before but never handed to the colony, so that dead call is gone.
    # A colony needs at least one ant, otherwise run() has no path to return.
    n_ants = max(1, int(5 * traffic_load))
    ant_colony = AntColony(distance_matrix, n_ants=n_ants, n_best=2, n_iterations=100, decay=0.95)
    refined_route, _ = ant_colony.run()

    # Hold out validation rows BEFORE augmenting. validation_split would take
    # the tail of the augmented array, i.e. only noisy copies of rows that are
    # also in the training part.
    x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=42)

    # augment_data clips its noisy copies to [0, 1], so it must see the raw
    # features; on standardised data the clip would erase every negative value.
    # NOTE(review): assumes raw features lie in [0, 1] like the demo data - confirm.
    x_fit_augmented, y_fit_augmented = augment_data(x_fit, y_fit)
    scaler = StandardScaler().fit(x_fit)
    x_fit_scaled = scaler.transform(x_fit_augmented)
    x_val_scaled = scaler.transform(x_val)

    # Train Neural Network for Error Prediction
    model = build_model(x_train.shape[1])

    # Learning rate scheduler: constant for 10 epochs, then exponential decay.
    lr_callback = tf.keras.callbacks.LearningRateScheduler(lambda epoch, lr: lr if epoch < 10 else float(lr * tf.math.exp(-0.1).numpy()))
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    history = model.fit(x_fit_scaled, y_fit_augmented, epochs=100, batch_size=64,
                        verbose=1, validation_data=(x_val_scaled, y_val),
                        callbacks=[lr_callback, early_stopping])

    # Train XGBoost Model
    x_train_processed, y_train_processed = preprocess_data(x_train, y_train)
    xgb_model = train_xgboost(x_train_processed, y_train_processed)

    # Extract training loss and accuracy
    loss = history.history['loss']
    accuracy = history.history['accuracy']

    # Scores for the ROC curve. NOTE: these are computed on the same resampled
    # data the booster was trained on, so the resulting curve is optimistic.
    dtrain = xgb.DMatrix(x_train_processed)
    y_pred_xgb = xgb_model.predict(dtrain)

    # Simulate signal transmission with noise reduction
    signal = np.random.rand(100)
    signal_with_interference = signal * (1 - interference_level)
    cleaned_signal = apply_noise_reduction(signal_with_interference, noise_level)

    # Output final route, processed signal, and training details
    return refined_route, cleaned_signal, loss, accuracy, y_pred_xgb, y_train_processed, history

# --- Simulation Setup ---
# Parameter grid: every combination of the three lists below is simulated.
noise_levels = [0.1, 0.3, 0.5, 0.7, 0.9]
interference_levels = [0.1, 0.3, 0.5, 0.7, 0.9]
traffic_loads = [0.5, 1.0, 1.5, 2.0]

# Example data for neural network training (randomly generated for demonstration purposes)
x_train = np.random.rand(1000, 10)
y_train = np.random.randint(0, 2, 1000)

# Distance matrix for TSP (example data)
distance_matrix = np.array([
    [0, 2, 9, 10],
    [1, 0, 6, 4],
    [15, 7, 0, 8],
    [6, 3, 12, 0]
])

# Run simulations and collect results
results = []

for noise_level in noise_levels:
    for interference_level in interference_levels:
        for traffic_load in traffic_loads:
            print(f"Testing with Noise Level: {noise_level}, Interference Level: {interference_level}, Traffic Load: {traffic_load}")
            refined_route, cleaned_signal, loss, accuracy, y_pred_xgb, y_train_processed, history = martaho_simulation(distance_matrix, x_train, y_train, noise_level, interference_level, traffic_load)
            results.append({
                'noise_level': noise_level,
                'interference_level': interference_level,
                'traffic_load': traffic_load,
                'refined_route': refined_route,
                'cleaned_signal_mean': np.mean(cleaned_signal),
                'loss': loss,
                'accuracy': accuracy,
                'y_pred_xgb': y_pred_xgb,
                'y_train_processed': y_train_processed,
                'history': history
            })

# Visualize results for MARTAHO simulation.
# The noise level is the outermost loop, so the first len(noise_levels)
# results all belong to noise level 0.1. Plot one point per noise level
# instead: the mean over all interference/traffic combinations at that level.
mean_signal_per_noise = [
    np.mean([result['cleaned_signal_mean'] for result in results
             if result['noise_level'] == level])
    for level in noise_levels
]
plt.figure(figsize=(10, 6))
plt.plot(noise_levels, mean_signal_per_noise, 'o-', color='blue')
plt.xlabel('Noise Level')
plt.ylabel('Cleaned Signal Mean')
plt.title('Cleaned Signal Mean vs Noise Level')
plt.grid(True)
plt.show()

# Visualize loss and accuracy for last simulation
history = results[-1]['history']
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label="Training Loss")
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss over Epochs')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(history.history['accuracy'], label="Training Accuracy")
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training Accuracy over Epochs')
plt.legend()
plt.grid(True)
plt.show()

# Plot ROC Curve for the final XGBoost Model from the last run
print("Final XGBoost Model ROC Curve:")
roc_auc = plot_precision_recall_roc(results[-1]['y_train_processed'], results[-1]['y_pred_xgb'])
print(f'Final ROC AUC: {roc_auc}')
