In [None]:
# Mount Google Drive so that the data, model and results paths under
# /content/drive used later in this notebook resolve.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import os
import datetime
import tensorflow as tf
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score, roc_curve, auc
from sklearn.model_selection import KFold

In [None]:
# Constants
# Number of points per raw signal; load_data() reshapes every batch to this length.
SEQUENCE_LENGTH = 25000
# Class labels; index i is used as column i of the one-hot label arrays.
CLASS_NAMES = ["Bike Throttle", "Jackhammer", "Jumping", "Walking"]
# Timestamp of this run; embedded in RESULTS_DIR and stored in the final report.
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# Directory on the mounted Drive where plots and JSON reports are written.
RESULTS_DIR = f"/content/drive/MyDrive/das_testing_results_{timestamp}"

In [None]:
# Create results directory if it doesn't exist
# (exist_ok=True makes re-running this cell harmless).
os.makedirs(RESULTS_DIR, exist_ok=True)

In [None]:
def load_data_and_model():
    """Load the downsampled datasets and the trained Keras model.

    The train/test arrays are read through ``load_data`` with a downsample
    factor of 5. The saved model is then loaded and called once on two test
    samples so it is built before the tests use it. If loading or that first
    forward pass raises, a small placeholder Conv1D network is built instead
    and the saved weights are loaded into it by layer name.

    Returns:
        tuple: ``(X_train, y_train, X_test, y_test, model)``; the X arrays are
        the downsampled arrays returned by ``load_data``.
    """
    # Paths to your data files - update these paths
    x_train_path = '/content/drive/MyDrive/X_train.npy'
    y_train_path = '/content/drive/MyDrive/y_train.npy'
    x_test_path = '/content/drive/MyDrive/X_test.npy'
    y_test_path = '/content/drive/MyDrive/y_test.npy'
    model_path = '/content/drive/MyDrive/das_model_output_20250304_032635/model_3/model_3.keras'

    # Announce the work before it starts rather than after the data is loaded.
    print("Loading data and model...")

    # Load and downsample data
    X_train, y_train, X_test, y_test, new_seq_length = load_data(
        x_train_path, y_train_path, x_test_path, y_test_path, downsample_factor=5
    )

    # Report the pre-downsampling shapes from the actual row counts instead of
    # hard-coded numbers. load_data() only returns the downsampled arrays, so
    # the per-signal length is taken from SEQUENCE_LENGTH.
    # NOTE(review): assumes the .npy files are laid out as (n_samples, SEQUENCE_LENGTH) - confirm.
    print(f"Original X_train shape: {(X_train.shape[0], SEQUENCE_LENGTH)}")
    print(f"Original y_train shape: {y_train.shape}")
    print(f"Original X_test shape: {(X_test.shape[0], SEQUENCE_LENGTH)}")
    print(f"Original y_test shape: {y_test.shape}")

    # Load model with custom object scope to handle any custom layers
    custom_objects = {}  # Add any custom layers/metrics here if needed

    with tf.keras.utils.custom_object_scope(custom_objects):
        try:
            model = tf.keras.models.load_model(model_path)

            # Initialize the model with a small batch to fix shape issues
            _ = model(X_test[:2], training=False)
        except Exception as e:
            print(f"Standard model loading failed: {str(e)}")
            print("Trying alternative approach...")

            # Create model architecture with explicit input shape
            # Replace this with your actual model architecture
            # NOTE(review): this is a placeholder network; results obtained
            # through this fallback are only meaningful if it matches the
            # architecture the weights were saved from - confirm before use.
            inputs = tf.keras.Input(shape=(new_seq_length, 1))
            x = tf.keras.layers.Conv1D(32, 3, activation='relu')(inputs)
            x = tf.keras.layers.MaxPooling1D(2)(x)
            x = tf.keras.layers.Flatten()(x)
            outputs = tf.keras.layers.Dense(len(CLASS_NAMES), activation='softmax')(x)
            model = tf.keras.Model(inputs=inputs, outputs=outputs)

            # Load weights
            model.load_weights(model_path, by_name=True)

            # Initialize the model with a small batch
            _ = model(X_test[:2], training=False)

    return X_train, y_train, X_test, y_test, model

In [None]:
def _downsample_in_batches(source, new_seq_length, downsample_factor, batch_size=100):
    """Copy every ``downsample_factor``-th point of each row of ``source`` into a new float32 array.

    Rows are read ``batch_size`` at a time so that a memory-mapped ``source``
    is never materialised in full. The result has shape
    ``(source.shape[0], new_seq_length, 1)``.
    """
    n_rows = source.shape[0]
    result = np.zeros((n_rows, new_seq_length, 1), dtype=np.float32)

    for start in range(0, n_rows, batch_size):
        stop = min(start + batch_size, n_rows)
        batch = source[start:stop].reshape(-1, SEQUENCE_LENGTH, 1)
        # One vectorised assignment per batch: take the strided points, then
        # truncate to new_seq_length (the stride can yield one extra point
        # when SEQUENCE_LENGTH is not a multiple of the factor).
        result[start:stop, :, 0] = batch[:stop - start, ::downsample_factor, 0][:, :new_seq_length]

    return result


def load_data(x_train_path, y_train_path, x_test_path, y_test_path, downsample_factor=5):
    """Load the train/test arrays from ``.npy`` files and downsample the signals.

    The X files are opened memory-mapped (read-only) and downsampled in small
    batches by keeping every ``downsample_factor``-th point; the y files are
    loaded as-is.

    Args:
        x_train_path: Path to the training signals ``.npy`` file.
        y_train_path: Path to the training labels ``.npy`` file.
        x_test_path: Path to the test signals ``.npy`` file.
        y_test_path: Path to the test labels ``.npy`` file.
        downsample_factor: Keep one point out of this many (must be >= 1).

    Returns:
        tuple: ``(X_train, y_train, X_test, y_test, new_seq_length)`` where the
        X arrays are float32 with shape ``(n, new_seq_length, 1)`` and
        ``new_seq_length == SEQUENCE_LENGTH // downsample_factor``.

    Raises:
        ValueError: If ``downsample_factor`` is smaller than 1.
    """
    if downsample_factor < 1:
        raise ValueError(f"downsample_factor must be >= 1, got {downsample_factor}")

    print("Loading training data...")
    X_train = np.load(x_train_path, mmap_mode='r')
    y_train = np.load(y_train_path)

    print("Loading test data...")
    X_test = np.load(x_test_path, mmap_mode='r')
    y_test = np.load(y_test_path)

    # Calculate new sequence length
    new_seq_length = SEQUENCE_LENGTH // downsample_factor

    # Process in batches for memory efficiency
    print("Downsampling data...")
    X_train_downsampled = _downsample_in_batches(X_train, new_seq_length, downsample_factor)
    X_test_downsampled = _downsample_in_batches(X_test, new_seq_length, downsample_factor)

    print(f"Downsampled X_train shape: {X_train_downsampled.shape}")
    print(f"y_train shape: {y_train.shape}")
    print(f"Downsampled X_test shape: {X_test_downsampled.shape}")
    print(f"y_test shape: {y_test.shape}")

    return X_train_downsampled, y_train, X_test_downsampled, y_test, new_seq_length

In [None]:
def check_dataset_integrity(X_train, y_train, X_test, y_test):
    """Report NaN/Inf counts, sampled duplicate rows and value ranges of the datasets.

    NaNs are counted in all four arrays and infinities in the two X arrays.
    Duplicate detection runs on a random subset of at most 10000 training
    rows (flattened), and min/max are taken over each X array.

    Returns:
        dict: Sections ``nan_values``, ``infinity_values``,
        ``duplicates_check`` and ``data_range`` holding plain Python numbers.
    """
    print("\n========== DATASET INTEGRITY CHECK ==========")

    # Count NaN / infinite entries per array
    nan_counts = {
        "X_train": np.isnan(X_train).sum(),
        "y_train": np.isnan(y_train).sum(),
        "X_test": np.isnan(X_test).sum(),
        "y_test": np.isnan(y_test).sum(),
    }
    inf_counts = {
        "X_train": np.isinf(X_train).sum(),
        "X_test": np.isinf(X_test).sum(),
    }

    for array_name, count in nan_counts.items():
        print(f"NaN values in {array_name}: {count}")
    for array_name, count in inf_counts.items():
        print(f"Infinity values in {array_name}: {count}")

    # Duplicate check on a random subset (the full set would be too expensive)
    n_rows = X_train.shape[0]
    n_sampled = min(10000, n_rows)
    picked = np.random.choice(n_rows, n_sampled, replace=False)
    flat_rows = X_train[picked].reshape(n_sampled, -1)
    n_unique = len(np.unique(flat_rows, axis=0))
    duplicates_found = n_unique < n_sampled

    if duplicates_found:
        duplicates = n_sampled - n_unique
        print(f"Found {duplicates} duplicate rows in the sampled subset of training data")
    else:
        print("No duplicate rows found in the sampled subset of training data")

    # Value range of each feature array
    train_lo, train_hi = X_train.min(), X_train.max()
    test_lo, test_hi = X_test.min(), X_test.max()

    print(f"X_train min: {train_lo}, max: {train_hi}")
    print(f"X_test min: {test_lo}, max: {test_hi}")

    # Convert NumPy scalars to built-in types so the report is JSON-serialisable
    return {
        "nan_values": {array_name: int(count) for array_name, count in nan_counts.items()},
        "infinity_values": {array_name: int(count) for array_name, count in inf_counts.items()},
        "duplicates_check": {
            "sample_size": n_sampled,
            "unique_rows": n_unique,
            "has_duplicates": duplicates_found,
        },
        "data_range": {
            "X_train_min": float(train_lo),
            "X_train_max": float(train_hi),
            "X_test_min": float(test_lo),
            "X_test_max": float(test_hi),
        },
    }

In [None]:
def analyze_class_distribution(y_train, y_test):
    """Analyze class distribution and imbalance of the train and test labels.

    Labels are converted from one-hot rows to class indices with ``argmax``.
    Per-class counts and percentages are printed, a two-panel bar chart is
    saved to ``RESULTS_DIR/class_distribution.png`` and a summary is returned.

    Args:
        y_train: One-hot training labels, one row per sample.
        y_test: One-hot test labels, one row per sample.

    Returns:
        dict: ``training_set`` / ``test_set`` sections with ``counts``,
        ``percentages`` and ``imbalance`` (max minus min percentage), plus
        ``class_names``.
    """
    print("\n========== CLASS DISTRIBUTION ANALYSIS ==========")

    n_classes = len(CLASS_NAMES)

    # Convert one-hot encoded labels to class indices
    y_train_classes = np.argmax(y_train, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    # Count occurrences of each class. minlength guarantees one entry per
    # known class even when the highest-index classes have no samples;
    # without it indexing by class and the bar plots below would fail.
    train_counts = np.bincount(y_train_classes, minlength=n_classes)
    test_counts = np.bincount(y_test_classes, minlength=n_classes)

    # Calculate total samples
    train_total = len(y_train_classes)
    test_total = len(y_test_classes)

    # Calculate percentages
    train_percentages = (train_counts / train_total) * 100
    test_percentages = (test_counts / test_total) * 100

    # Print class distribution
    print("Training set class distribution:")
    for i, class_name in enumerate(CLASS_NAMES):
        print(f"{class_name}: {train_counts[i]} samples ({train_percentages[i]:.2f}%)")

    print("\nTest set class distribution:")
    for i, class_name in enumerate(CLASS_NAMES):
        print(f"{class_name}: {test_counts[i]} samples ({test_percentages[i]:.2f}%)")

    # Calculate imbalance (difference between most and least represented classes)
    train_imbalance = np.max(train_percentages) - np.min(train_percentages)
    test_imbalance = np.max(test_percentages) - np.min(test_percentages)

    print(f"\nTraining set imbalance: {train_imbalance:.2f}%")
    print(f"Test set imbalance: {test_imbalance:.2f}%")

    # Plot class distribution
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    ax1.bar(CLASS_NAMES, train_counts[:n_classes])
    ax1.set_title('Training Set Class Distribution')
    ax1.set_ylabel('Count')
    ax1.tick_params(axis='x', rotation=45)

    ax2.bar(CLASS_NAMES, test_counts[:n_classes])
    ax2.set_title('Test Set Class Distribution')
    ax2.set_ylabel('Count')
    ax2.tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.savefig(f"{RESULTS_DIR}/class_distribution.png")
    plt.close()

    # Save results
    distribution_report = {
        "training_set": {
            "counts": train_counts.tolist(),
            "percentages": train_percentages.tolist(),
            "imbalance": float(train_imbalance)
        },
        "test_set": {
            "counts": test_counts.tolist(),
            "percentages": test_percentages.tolist(),
            "imbalance": float(test_imbalance)
        },
        "class_names": CLASS_NAMES
    }

    return distribution_report

In [None]:
def simulate_unknown_class(X_test, model):
    """Probe how confidently the model classifies synthetic out-of-class signals.

    200 Gaussian-noise signals are drawn with the global mean and standard
    deviation of ``X_test`` (NumPy's global RNG is seeded with 42 first).
    A signal counts as "unknown" when the model's highest class probability
    is below 0.7. A histogram of the confidences is saved to
    ``RESULTS_DIR/unknown_class_confidence.png``.

    Returns:
        dict: Sample count, threshold, number and rate of detections, and
        summary statistics of the confidence scores.
    """
    print("\n========== UNKNOWN CLASS SIMULATION ==========")

    # Fixed seed so the synthetic signals are the same on every run
    np.random.seed(42)
    n_samples = 200
    threshold = 0.7  # example confidence cut-off

    # Noise shaped like the test signals and matching their overall statistics
    noise_shape = (n_samples, X_test.shape[1], X_test.shape[2])
    data_mean = np.mean(X_test)
    data_std = np.std(X_test)
    unknown_signals = np.random.normal(data_mean, data_std, noise_shape)

    # Confidence = highest predicted class probability per signal
    confidence_scores = np.max(model.predict(unknown_signals), axis=1)

    num_unknown_detected = np.sum(confidence_scores < threshold)
    detection_rate = num_unknown_detected / n_samples

    print(f"Generated {n_samples} unknown signals")
    print(f"Signals classified as 'unknown' (confidence < {threshold}): {num_unknown_detected} ({detection_rate*100:.2f}%)")

    # Histogram of confidences with the threshold marked
    plt.figure(figsize=(10, 6))
    plt.hist(confidence_scores, bins=20)
    plt.axvline(x=threshold, color='r', linestyle='--', label=f'Threshold ({threshold})')
    plt.xlabel('Confidence Score')
    plt.ylabel('Count')
    plt.title('Confidence Distribution for Unknown Signals')
    plt.legend()
    plt.savefig(f"{RESULTS_DIR}/unknown_class_confidence.png")
    plt.close()

    confidence_summary = {
        "min": float(np.min(confidence_scores)),
        "max": float(np.max(confidence_scores)),
        "mean": float(np.mean(confidence_scores)),
        "std": float(np.std(confidence_scores)),
        "quantiles": [float(np.quantile(confidence_scores, q)) for q in [0.25, 0.5, 0.75]],
    }

    return {
        "n_samples": n_samples,
        "threshold": threshold,
        "unknown_detected": int(num_unknown_detected),
        "detection_rate": float(detection_rate),
        "confidence_scores": confidence_summary,
    }

In [None]:
def analyze_confusion_matrix(X_test, y_test, model):
    """Compute and plot the confusion matrix and per-class metrics on the test set.

    Predictions come from ``model.predict``; if that raises, the model is
    called directly on batches of 64 samples instead. The confusion matrix
    heatmap is saved to ``RESULTS_DIR/confusion_matrix.png`` and the numeric
    results to ``RESULTS_DIR/confusion_matrix_results.json``.

    Args:
        X_test: Test inputs accepted by ``model``.
        y_test: One-hot test labels, one row per sample.
        model: Keras model producing per-class scores.

    Returns:
        dict: ``confusion_matrix``, ``accuracy``, per-class ``precision``,
        ``recall`` and ``f1_score`` lists, and ``class_names``.
    """
    print("\n========== CONFUSION MATRIX ANALYSIS ==========")

    y_true = np.argmax(y_test, axis=1)

    # Get predictions using model.predict()
    try:
        # Predict on test data
        y_pred_prob = model.predict(X_test)
    except Exception as e:
        # Catch only real errors (not KeyboardInterrupt/SystemExit) and show
        # the reason instead of silently swallowing it.
        print("Error using model.predict(). Trying alternative approach...")
        print(f"  Reason: {e}")
        # Try using model.__call__ with TensorFlow tensors
        batch_predictions = []
        batch_size = 64  # Smaller batch size to avoid memory issues

        for i in range(0, len(X_test), batch_size):
            batch_tensor = tf.convert_to_tensor(X_test[i:i+batch_size])
            batch_predictions.append(model(batch_tensor, training=False).numpy())

        y_pred_prob = np.vstack(batch_predictions)

    y_pred = np.argmax(y_pred_prob, axis=1)

    # Pin the label set so the matrix and the metric arrays always have one
    # row/entry per entry of CLASS_NAMES, even if a class never occurs in
    # y_true or y_pred.
    class_indices = list(range(len(CLASS_NAMES)))

    # Calculate confusion matrix
    cm = confusion_matrix(y_true, y_pred, labels=class_indices)

    # Print confusion matrix
    print("Confusion Matrix:")
    print(cm)

    # Calculate metrics
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, labels=class_indices, average=None
    )
    accuracy = accuracy_score(y_true, y_pred)

    print(f"\nOverall Accuracy: {accuracy:.4f}")

    for i, class_name in enumerate(CLASS_NAMES):
        print(f"\nClass: {class_name}")
        print(f"Precision: {precision[i]:.4f}")
        print(f"Recall: {recall[i]:.4f}")
        print(f"F1 Score: {f1[i]:.4f}")

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=CLASS_NAMES, yticklabels=CLASS_NAMES)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.savefig(f"{RESULTS_DIR}/confusion_matrix.png")
    plt.close()

    # Save results
    cm_results = {
        "confusion_matrix": cm.tolist(),
        "accuracy": float(accuracy),
        "precision": precision.tolist(),
        "recall": recall.tolist(),
        "f1_score": f1.tolist(),
        "class_names": CLASS_NAMES
    }

    with open(f"{RESULTS_DIR}/confusion_matrix_results.json", 'w') as f:
        json.dump(cm_results, f, indent=4)

    return cm_results

In [None]:
def evaluate_metrics(X_test, y_test, model):
    """Compute one-vs-rest ROC curves and AUC per class plus their macro average.

    Each class's ROC curve is built from column ``i`` of ``y_test`` against
    column ``i`` of the predicted probabilities. The macro curve averages the
    per-class TPRs interpolated onto the union of all FPR points. The plot is
    saved to ``RESULTS_DIR/roc_curves.png``.

    Args:
        X_test: Test inputs accepted by ``model``.
        y_test: One-hot test labels with one column per entry of CLASS_NAMES.
        model: Keras model producing per-class scores.

    Returns:
        dict: ``roc_auc_per_class`` (class name -> AUC) and ``macro_roc_auc``.
    """
    print("\n========== METRICS EVALUATION ==========")

    n_classes = len(CLASS_NAMES)

    # Only the probabilities are needed here; ROC works on scores, not on
    # hard class decisions.
    y_pred_prob = model.predict(X_test)

    # Calculate ROC curves and AUC for each class
    fpr = {}
    tpr = {}
    roc_auc = {}

    for i, class_name in enumerate(CLASS_NAMES):
        fpr[i], tpr[i], _ = roc_curve(y_test[:, i], y_pred_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
        print(f"ROC AUC for {class_name}: {roc_auc[i]:.4f}")

    # Calculate macro-averaged ROC curve and ROC area
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
    mean_tpr = np.zeros_like(all_fpr)

    for i in range(n_classes):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])

    mean_tpr /= n_classes
    macro_roc_auc = auc(all_fpr, mean_tpr)
    print(f"Macro-averaged ROC AUC: {macro_roc_auc:.4f}")

    # Plot ROC curves
    plt.figure(figsize=(10, 8))

    plt.plot(all_fpr, mean_tpr,
             label=f'Macro-average ROC (AUC = {macro_roc_auc:.4f})',
             color='navy', linestyle=':', linewidth=4)

    for i, class_name in enumerate(CLASS_NAMES):
        plt.plot(fpr[i], tpr[i],
                 label=f'ROC for {class_name} (AUC = {roc_auc[i]:.4f})')

    # Diagonal = chance-level classifier
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curves')
    plt.legend(loc="lower right")
    plt.savefig(f"{RESULTS_DIR}/roc_curves.png")
    plt.close()

    # Save results
    metrics_results = {
        "roc_auc_per_class": {class_name: float(roc_auc[i]) for i, class_name in enumerate(CLASS_NAMES)},
        "macro_roc_auc": float(macro_roc_auc)
    }

    return metrics_results

In [None]:
def _mean_std_summary(values):
    """Return the per-fold values with their mean and standard deviation as plain floats."""
    return {
        "values": [float(v) for v in values],
        "mean": float(np.mean(values)),
        "std": float(np.std(values))
    }


def perform_cross_validation(X_train, y_train, model):
    """Evaluate the given model on 5 shuffled folds of a training-data subset.

    The model is NOT retrained per fold: the already trained ``model`` is
    scored on each validation fold, so the spread across folds reflects the
    variability of the data subsets rather than a true cross-validated
    estimate. At most 10000 randomly chosen training samples are used.

    Args:
        X_train: Training inputs accepted by ``model``.
        y_train: One-hot training labels, one row per sample.
        model: Trained Keras model producing per-class scores.

    Returns:
        dict: ``k_folds``, ``sample_size`` and, for accuracy and macro
        precision/recall/F1, the per-fold ``values`` with ``mean`` and ``std``.
    """
    print("\n========== CROSS VALIDATION ==========")

    # Use 5-fold cross validation
    k_folds = 5
    kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    # We'll use a smaller subset for cross-validation to save time
    sample_size = min(10000, X_train.shape[0])
    indices = np.random.choice(X_train.shape[0], sample_size, replace=False)
    X_sample = X_train[indices]
    y_sample = y_train[indices]

    # Store metrics for each fold
    accuracy_scores = []
    precision_scores = []
    recall_scores = []
    f1_scores = []

    print(f"Performing {k_folds}-fold cross-validation on {sample_size} samples...")

    # Only the validation indices are used; the training part of each fold
    # is never materialised because nothing is fitted on it.
    for fold, (_, val_idx) in enumerate(kf.split(X_sample)):
        print(f"Fold {fold+1}/{k_folds}")

        X_fold_val = X_sample[val_idx]
        y_fold_val = y_sample[val_idx]

        # Skip the model retraining part for time efficiency
        # Just evaluate on the validation fold using the original model
        y_fold_pred = np.argmax(model.predict(X_fold_val), axis=1)
        y_fold_true = np.argmax(y_fold_val, axis=1)

        # Calculate metrics
        accuracy = accuracy_score(y_fold_true, y_fold_pred)
        precision, recall, f1, _ = precision_recall_fscore_support(
            y_fold_true, y_fold_pred, average='macro'
        )

        # Store results
        accuracy_scores.append(accuracy)
        precision_scores.append(precision)
        recall_scores.append(recall)
        f1_scores.append(f1)

        print(f"  Accuracy: {accuracy:.4f}")

    # Calculate statistics
    print("\nCross-validation results:")
    print(f"Mean Accuracy: {np.mean(accuracy_scores):.4f} ± {np.std(accuracy_scores):.4f}")
    print(f"Mean Precision: {np.mean(precision_scores):.4f} ± {np.std(precision_scores):.4f}")
    print(f"Mean Recall: {np.mean(recall_scores):.4f} ± {np.std(recall_scores):.4f}")
    print(f"Mean F1 Score: {np.mean(f1_scores):.4f} ± {np.std(f1_scores):.4f}")

    # Save results
    cv_results = {
        "k_folds": k_folds,
        "sample_size": sample_size,
        "accuracy": _mean_std_summary(accuracy_scores),
        "precision": _mean_std_summary(precision_scores),
        "recall": _mean_std_summary(recall_scores),
        "f1_score": _mean_std_summary(f1_scores)
    }

    return cv_results

In [None]:
def test_noise_resilience(X_test, y_test, model):
    """Measure how test accuracy degrades when Gaussian noise is added to the inputs.

    Zero-mean noise with standard deviations 0.01, 0.05, 0.1, 0.2 and 0.5 is
    added to ``X_test``; accuracy at each level is compared with the
    noise-free baseline. A bar chart is saved to
    ``RESULTS_DIR/noise_resilience.png``.

    Returns:
        dict: ``baseline_accuracy`` and ``noise_tests``, a list with the
        noise level, accuracy and accuracy drop for every level.
    """
    print("\n========== NOISE RESILIENCE TESTING ==========")

    # Standard deviations of the additive noise
    sigmas = [0.01, 0.05, 0.1, 0.2, 0.5]

    # Reference performance on the clean inputs
    clean_labels = np.argmax(model.predict(X_test), axis=1)
    y_true = np.argmax(y_test, axis=1)
    baseline_accuracy = accuracy_score(y_true, clean_labels)
    print(f"Baseline accuracy (no noise): {baseline_accuracy:.4f}")

    per_level = []
    for sigma in sigmas:
        print(f"Testing noise level: {sigma}")

        # Perturb every sample with fresh noise of the current strength
        perturbed = X_test + np.random.normal(0, sigma, X_test.shape)
        noisy_labels = np.argmax(model.predict(perturbed), axis=1)

        noisy_accuracy = accuracy_score(y_true, noisy_labels)
        accuracy_drop = baseline_accuracy - noisy_accuracy
        print(f"  Accuracy with noise: {noisy_accuracy:.4f}")
        print(f"  Accuracy drop: {accuracy_drop:.4f}")

        per_level.append({
            "noise_level": sigma,
            "accuracy": float(noisy_accuracy),
            "accuracy_drop": float(accuracy_drop),
        })

    # Bar per noise level, baseline as a dashed reference line
    plt.figure(figsize=(10, 6))
    bar_labels = list(map(str, sigmas))
    bar_heights = [entry["accuracy"] for entry in per_level]

    plt.bar(bar_labels, bar_heights)
    plt.axhline(y=baseline_accuracy, color='r', linestyle='--', label=f'Baseline ({baseline_accuracy:.4f})')
    plt.xlabel('Noise Level (σ)')
    plt.ylabel('Accuracy')
    plt.title('Model Accuracy Under Different Noise Levels')
    plt.legend()
    plt.savefig(f"{RESULTS_DIR}/noise_resilience.png")
    plt.close()

    return {
        "baseline_accuracy": float(baseline_accuracy),
        "noise_tests": per_level,
    }

In [None]:
def perform_adversarial_testing(X_test, y_test, model, n_samples=100):
    """Perform simple adversarial testing using the Fast Gradient Sign Method.

    The first ``n_samples`` test samples are attacked with
    ``x_adv = x + epsilon * sign(dLoss/dx)`` for several epsilons. The
    baseline accuracy is measured on the same samples so that the reported
    accuracy drop compares like with like. Adversarial inputs are clipped to
    the value range observed in ``X_test`` rather than a fixed [-1, 1], which
    would distort inputs that are not normalised to that interval. A bar
    chart is saved to ``RESULTS_DIR/adversarial_testing.png`` when at least
    one epsilon succeeded.

    Args:
        X_test: Test inputs accepted by ``model``.
        y_test: One-hot test labels, one row per sample.
        model: Keras model producing per-class probabilities.
        n_samples: Number of leading test samples to attack (default 100).

    Returns:
        dict: ``baseline_accuracy`` (on the attacked subset) and
        ``adversarial_tests``, one entry per epsilon with ``accuracy`` and
        ``accuracy_drop`` (both ``None`` plus an ``error`` message on failure).
    """
    print("\n========== ADVERSARIAL TESTING ==========")

    # Define epsilon values to test
    epsilons = [0.01, 0.05, 0.1, 0.2]

    # Use a subset for speed; baseline and attack share exactly these samples
    X_subset = X_test[:n_samples]
    y_subset = y_test[:n_samples]
    y_true = np.argmax(y_subset, axis=1)

    # Get baseline accuracy
    print(f"Evaluating on the first {len(X_subset)} test samples")
    y_pred = np.argmax(model.predict(X_subset), axis=1)
    baseline_accuracy = accuracy_score(y_true, y_pred)
    print(f"Baseline accuracy: {baseline_accuracy:.4f}")

    # Valid input range, used to keep adversarial examples realistic
    clip_min = float(np.min(X_test))
    clip_max = float(np.max(X_test))

    # Store results for each epsilon
    accuracy_results = []

    # Create TensorFlow inputs for FGSM
    X_tensor = tf.convert_to_tensor(X_subset)

    # The loss gradient does not depend on epsilon, so compute its sign once.
    signed_grad = None
    gradient_error = None
    try:
        with tf.GradientTape() as tape:
            tape.watch(X_tensor)
            # Inference mode: no dropout / batch-norm updates during the attack
            prediction = model(X_tensor, training=False)
            # Match label dtype to the predictions to avoid dtype errors in the loss
            y_tensor = tf.cast(tf.convert_to_tensor(y_subset), prediction.dtype)
            loss = tf.keras.losses.categorical_crossentropy(y_tensor, prediction)

        # Get the gradients
        gradient = tape.gradient(loss, X_tensor)
        if gradient is None:
            raise RuntimeError("loss has no gradient with respect to the input")
        signed_grad = tf.sign(gradient)
    except Exception as e:
        gradient_error = e

    # Test each epsilon
    for epsilon in epsilons:
        print(f"Testing epsilon: {epsilon}")

        try:
            if signed_grad is None:
                raise gradient_error

            # Create adversarial examples
            adversarial_data = X_tensor + epsilon * signed_grad
            adversarial_data = tf.clip_by_value(adversarial_data, clip_min, clip_max)

            # Get predictions on adversarial examples
            adv_predictions = model.predict(adversarial_data)
            adv_y_pred = np.argmax(adv_predictions, axis=1)

            # Calculate accuracy
            adv_accuracy = accuracy_score(y_true, adv_y_pred)
            print(f"  Adversarial accuracy: {adv_accuracy:.4f}")
            print(f"  Accuracy drop: {baseline_accuracy - adv_accuracy:.4f}")

            # Store results
            accuracy_results.append({
                "epsilon": epsilon,
                "accuracy": float(adv_accuracy),
                "accuracy_drop": float(baseline_accuracy - adv_accuracy)
            })

        except Exception as e:
            # Best effort: record the failure and continue with the next epsilon
            print(f"  Error performing FGSM with epsilon={epsilon}: {str(e)}")
            accuracy_results.append({
                "epsilon": epsilon,
                "accuracy": None,
                "accuracy_drop": None,
                "error": str(e)
            })

    # Plot results (if we have any successful tests)
    successful_tests = [r for r in accuracy_results if r["accuracy"] is not None]
    if successful_tests:
        plt.figure(figsize=(10, 6))
        epsilons_str = [str(r["epsilon"]) for r in successful_tests]
        accuracies = [r["accuracy"] for r in successful_tests]

        plt.bar(epsilons_str, accuracies)
        plt.axhline(y=baseline_accuracy, color='r', linestyle='--', label=f'Baseline ({baseline_accuracy:.4f})')
        plt.xlabel('Epsilon')
        plt.ylabel('Accuracy')
        plt.title('Model Accuracy Under Adversarial Attacks (FGSM)')
        plt.legend()
        plt.savefig(f"{RESULTS_DIR}/adversarial_testing.png")
        plt.close()

    # Save results
    adversarial_results = {
        "baseline_accuracy": float(baseline_accuracy),
        "adversarial_tests": accuracy_results
    }

    return adversarial_results

In [None]:
def _shift_time_axis(X, shift):
    """Return a zero-padded copy of ``X`` shifted along axis 1.

    A positive ``shift`` moves the signal later (right), a negative one moves
    it earlier (left); vacated positions are filled with zeros. A shift of 0
    returns an unmodified copy.
    """
    shifted = np.zeros_like(X)
    length = X.shape[1]
    kept = length - abs(shift)
    if kept <= 0:
        # Everything is shifted out of the window
        return shifted
    # Explicit end indices instead of negative slices: ``:-0`` would be an
    # empty slice and break the assignment when the shift is zero.
    if shift >= 0:
        shifted[:, shift:, :] = X[:, :kept, :]
    else:
        shifted[:, :kept, :] = X[:, -shift:, :]
    return shifted


def test_time_series_shift(X_test, y_test, model):
    """Test model performance with shifts in time series data.

    Every test signal is shifted left and right by 5%, 10%, 20% and 50% of
    its length (zero-padded) and the accuracy on the shifted data is
    compared with the unshifted baseline. A grouped bar chart is saved to
    ``RESULTS_DIR/time_series_shift.png``.

    Args:
        X_test: Test inputs of shape ``(n_samples, sequence_length, channels)``
            as implied by the indexing along axes 1 and 2.
        y_test: One-hot test labels, one row per sample.
        model: Keras model producing per-class scores.

    Returns:
        dict: ``baseline_accuracy`` and ``shift_tests``, one entry per shift
        with the left/right accuracy and accuracy drop.
    """
    print("\n========== TIME SERIES SHIFT TESTING ==========")

    # Define shift amounts to test (as percentage of sequence length)
    shift_percentages = [0.05, 0.1, 0.2, 0.5]

    # Get sequence length
    sequence_length = X_test.shape[1]

    # Get baseline accuracy
    y_pred = np.argmax(model.predict(X_test), axis=1)
    y_true = np.argmax(y_test, axis=1)
    baseline_accuracy = accuracy_score(y_true, y_pred)
    print(f"Baseline accuracy (no shift): {baseline_accuracy:.4f}")

    # Store results for each shift
    shift_results = []

    # Test each shift percentage
    for shift_pct in shift_percentages:
        shift_amount = int(sequence_length * shift_pct)
        print(f"Testing shift of {shift_amount} points ({shift_pct*100:.1f}% of sequence)")

        # Create left-shifted data (move signal earlier)
        X_test_left_shifted = _shift_time_axis(X_test, -shift_amount)

        # Create right-shifted data (move signal later)
        X_test_right_shifted = _shift_time_axis(X_test, shift_amount)

        # Get predictions for left-shifted data
        left_y_pred = np.argmax(model.predict(X_test_left_shifted), axis=1)
        left_accuracy = accuracy_score(y_true, left_y_pred)
        print(f"  Left-shift accuracy: {left_accuracy:.4f}")
        print(f"  Left-shift accuracy drop: {baseline_accuracy - left_accuracy:.4f}")

        # Get predictions for right-shifted data
        right_y_pred = np.argmax(model.predict(X_test_right_shifted), axis=1)
        right_accuracy = accuracy_score(y_true, right_y_pred)
        print(f"  Right-shift accuracy: {right_accuracy:.4f}")
        print(f"  Right-shift accuracy drop: {baseline_accuracy - right_accuracy:.4f}")

        # Store results
        shift_results.append({
            "shift_percentage": shift_pct,
            "shift_points": shift_amount,
            "left_shift": {
                "accuracy": float(left_accuracy),
                "accuracy_drop": float(baseline_accuracy - left_accuracy)
            },
            "right_shift": {
                "accuracy": float(right_accuracy),
                "accuracy_drop": float(baseline_accuracy - right_accuracy)
            }
        })

    # Plot results
    plt.figure(figsize=(12, 6))

    # Set up axis and labels
    shift_labels = [f"{r['shift_percentage']*100:.1f}%" for r in shift_results]
    left_accuracies = [r["left_shift"]["accuracy"] for r in shift_results]
    right_accuracies = [r["right_shift"]["accuracy"] for r in shift_results]

    x = np.arange(len(shift_labels))
    width = 0.35

    # Create grouped bar chart
    plt.bar(x - width/2, left_accuracies, width, label='Left Shift')
    plt.bar(x + width/2, right_accuracies, width, label='Right Shift')
    plt.axhline(y=baseline_accuracy, color='r', linestyle='--', label=f'Baseline ({baseline_accuracy:.4f})')

    plt.xlabel('Shift Amount (% of sequence)')
    plt.ylabel('Accuracy')
    plt.title('Model Accuracy Under Time Series Shifts')
    plt.xticks(x, shift_labels)
    plt.legend()
    plt.tight_layout()
    plt.savefig(f"{RESULTS_DIR}/time_series_shift.png")
    plt.close()

    # Save results
    time_shift_results = {
        "baseline_accuracy": float(baseline_accuracy),
        "shift_tests": shift_results
    }

    return time_shift_results

In [None]:
# ================= MAIN FUNCTION =================

def main():
    """Run the whole testing suite and write the combined JSON report.

    Loads the data and model, executes the data-validation, model-performance
    and robustness tests in that order, and stores all of their reports in
    ``RESULTS_DIR/final_report.json``.
    """
    print("================= DAS MODEL TESTING SUITE =================")
    print(f"Results will be saved to: {RESULTS_DIR}")

    X_train, y_train, X_test, y_test, model = load_data_and_model()

    # Each section is a dict literal, so its tests run top to bottom.

    # Data validation
    data_validation = {
        "integrity": check_dataset_integrity(X_train, y_train, X_test, y_test),
        "distribution": analyze_class_distribution(y_train, y_test),
        "unknown_class": simulate_unknown_class(X_test, model),
    }

    # Model performance
    model_performance = {
        "confusion_matrix": analyze_confusion_matrix(X_test, y_test, model),
        "metrics": evaluate_metrics(X_test, y_test, model),
        "cross_validation": perform_cross_validation(X_train, y_train, model),
    }

    # Robustness and edge cases
    robustness = {
        "noise_resilience": test_noise_resilience(X_test, y_test, model),
        "adversarial_testing": perform_adversarial_testing(X_test, y_test, model),
        "time_series_shift": test_time_series_shift(X_test, y_test, model),
    }

    final_report = {
        "test_timestamp": timestamp,
        "data_validation": data_validation,
        "model_performance": model_performance,
        "robustness": robustness,
    }

    # Persist the combined report next to the plots
    report_path = f"{RESULTS_DIR}/final_report.json"
    with open(report_path, 'w') as report_file:
        json.dump(final_report, report_file, indent=4)

    print("\n================= TESTING COMPLETE =================")
    print(f"All results saved to: {RESULTS_DIR}")
    print(f"Final report: {report_path}")

In [None]:
if __name__ == "__main__":
    # Run the full testing suite. json is already imported in the notebook's
    # import cell, so no additional import is needed here.
    main()

Results will be saved to: /content/drive/MyDrive/das_testing_results_20250304_082741
Loading training data...
Loading test data...
Downsampling data...
Downsampled X_train shape: (64000, 5000, 1)
y_train shape: (64000, 4)
Downsampled X_test shape: (16000, 5000, 1)
y_test shape: (16000, 4)
Loading data and model...
Original X_train shape: (64000, 25000)
Original y_train shape: (64000, 4)
Original X_test shape: (16000, 25000)
Original y_test shape: (16000, 4)


  saveable.load_own_variables(weights_store.get(inner_path))



NaN values in X_train: 0
NaN values in y_train: 0
NaN values in X_test: 0
NaN values in y_test: 0
Infinity values in X_train: 0
Infinity values in X_test: 0
No duplicate rows found in the sampled subset of training data
X_train min: -18.643932342529297, max: 18.516048431396484
X_test min: -18.582998275756836, max: 18.560361862182617

Training set class distribution:
Bike Throttle: 16000 samples (25.00%)
Jackhammer: 16000 samples (25.00%)
Jumping: 16000 samples (25.00%)
Walking: 16000 samples (25.00%)

Test set class distribution:
Bike Throttle: 4000 samples (25.00%)
Jackhammer: 4000 samples (25.00%)
Jumping: 4000 samples (25.00%)
Walking: 4000 samples (25.00%)

Training set imbalance: 0.00%
Test set imbalance: 0.00%

[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 224ms/step
Generated 200 unknown signals
Signals classified as 'unknown' (confidence < 0.7): 200 (100.00%)

[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 97ms/step
Confusion Matrix:
[[3625 