# ASL Neural Network Pipeline Notebook

This notebook contains all the steps necessary to train a neural network for the ASL Neural Network App project located at [this repository](https://github.com/TWilliamsA7/asl-neural-app/tree/main). Utility functions can also be found in the above repository under the src directory.

1. Setup: Configuration & Authentication
2. Environment: Initialization & Imports
3. Data: Acquisition & Preprocessing
4. Data: Loading & Splitting
5. Model: Architecture
6. Model: Training
7. Model: Evaluation

## Setup: Configuration & Authentication

This section of the notebook sets up the authentication and configuration required by the Colab environment.

In [None]:
# Import necessary modules for setup

from google.colab import userdata, auth, files
import os
import sys

### Create github connection via colab variables

In [None]:
# Repository coordinates used to build the clone URL and the local folder name.
USERNAME = "TWilliamsA7"
REPO_NAME = "asl-neural-app.git"
# NOTE(review): BRANCH_NAME is not referenced by any other cell visible in this notebook.
BRANCH_NAME = "main"

# Read the GitHub Personal Access Token (PAT) from Colab Secrets and stop
# immediately when the returned value is empty.
PAT = userdata.get("GITHUB_PAT")
if not PAT:
    raise ValueError("GITHUB_PAT secret not found!")

# Build the authenticated HTTPS URL used to access the repository.
# The URL embeds the PAT, so avoid printing or logging AUTHENTICATED_URL.
AUTHENTICATED_URL = f"https://{PAT}@github.com/{USERNAME}/{REPO_NAME}"
# Local folder name of the clone: the repository name without the ".git" suffix.
REPO_FOLDER = REPO_NAME.replace(".git", "")

# Set the global Git identity (commit email and user name).
!git config --global user.email "twilliamsa776@gmail.com"
!git config --global user.name "{USERNAME}"

print("Setup github connection and authenticated url successfully!")

### Google Cloud Authentication

In [None]:
print("--- GCS Authentication ---")

# Run the Colab user-authentication flow; the gsutil commands in later cells
# presumably rely on these credentials (TODO confirm).
auth.authenticate_user()

print("Google Cloud authentication complete.")

## Environment: Initialization and Imports

### Clone Github Repository

In [None]:
# Clean up any existing clone
if os.path.isdir(REPO_FOLDER):
    print(f"Removing old {REPO_FOLDER} folder...")
    !rm -rf {REPO_FOLDER}

# Clone the repository using the authenticated URL
print(f"Cloning repository: {REPO_NAME}...")
!git clone {AUTHENTICATED_URL}

# Change directory into the cloned repository
%cd {REPO_FOLDER}
print(f"Current working directory: {os.getcwd()}")

### Install Dependencies

In [None]:
# Upgrade the packaging tool chain first (quiet mode).
print("Upgrading pip, setuptools, and wheel...")
!pip install --upgrade pip setuptools wheel -q

# Informational only: this cell does not install numpy or tensorflow itself.
print("Using preinstalled numpy and tensorflow dependencies")

# Install the project requirements from the repository's requirements.txt
# (resolved relative to the current working directory).
print("Installing remaining project dependencies from requirements.txt...")
!pip install -r requirements.txt -q

# crcmod is installed separately from requirements.txt and without -q.
# NOTE(review): presumably needed by gsutil for checksums - confirm.
!pip install crcmod

# NOTE: this message is printed unconditionally; the pip exit codes are not checked.
print("Dependencies installed successfully.")

### Setup .Kaggle Directory

- Must upload kaggle.json file



In [None]:
# Install the Kaggle API credentials (kaggle.json) and verify that the CLI works.
import shutil
import subprocess

KAGGLE_DIR = os.path.expanduser('~/.kaggle')
KAGGLE_JSON_PATH = os.path.join(KAGGLE_DIR, 'kaggle.json')

# Only ask for an upload when the credentials file is not already in place.
if not os.path.exists(KAGGLE_JSON_PATH):
    print("Uploading kaggle.json file...")
    # This will open a dialog for you to select and upload your file
    uploaded = files.upload()

    if not uploaded:
        print("ERROR: kaggle.json was not uploaded.")
    else:
        # The key of the returned dictionary is the name under which the file
        # was stored in the current working directory. Use that name instead of
        # assuming it is exactly "kaggle.json".
        uploaded_name = next(iter(uploaded))

        os.makedirs(KAGGLE_DIR, exist_ok=True)
        shutil.move(uploaded_name, KAGGLE_JSON_PATH)

        # Permissions MUST be 600 (owner read/write only) for security.
        os.chmod(KAGGLE_JSON_PATH, 0o600)

        print("Kaggle authentication file set up successfully!")
else:
    print("Kaggle credentials already found at ~/.kaggle/kaggle.json.")

# --- Verification Step ---
# A shell magic never raises on failure, so run the CLI through subprocess and
# inspect its exit status; success is only reported when the command succeeded.
print("\nAttempting to list datasets (Verification)...")
try:
    verification = subprocess.run(
        ["kaggle", "datasets", "list", "-s", "asl_alphabet"],
        capture_output=True,
        text=True,
    )
except OSError as e:
    # Raised when the kaggle executable itself cannot be started.
    print(f"\nERROR: Verification failed. Please check the content of your kaggle.json file. Details: {e}")
else:
    if verification.returncode == 0:
        # Show only the first three lines of the listing.
        print("\n".join(verification.stdout.splitlines()[:3]))
        print("\nSUCCESS: Kaggle API authenticated and is functional.")
    else:
        details = (verification.stderr or verification.stdout).strip()
        print(f"\nERROR: Verification failed. Please check the content of your kaggle.json file. Details: {details}")

### Connect Src directory for access to utility functions

In [None]:
# Make the repository's src directory importable. The path is relative, so it
# resolves against the current working directory at import time.
sys.path.append('src')
print("Setup Complete. Colab environment is ready.")

## Data: Acquisition & Preprocessing

### Include necessary imports

In [None]:
import numpy as np
import cv2
import gc
import shutil

# If earlier cells are not ran
import os
import sys

# Ensure src accessibility
sys.path.append('src')

# Import utility functions
from data_utils import extract_keypoints

### Setup directories and constants

In [None]:
# Kaggle dataset identifier passed to the Kaggle CLI in the download cell.
KAGGLE_DATASET_ID = "grassknoted/asl-alphabet"
# Local folder the dataset is downloaded and unzipped into.
DESTINATION_PATH = "sample_data"
# Local folder for processed output.
PROCESSED_OUTPUT_DIR = 'processed_data'
# Name of the training-image folder inside the downloaded dataset.
DATA_ROOT_FOLDER_NAME = 'asl_alphabet_train'

# Create both folders up front; exist_ok makes the cell safe to re-run.
os.makedirs(DESTINATION_PATH, exist_ok=True)
os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)

### Download Data via Kaggle API

In [None]:
# Download the dataset with the Kaggle CLI and unzip it into DESTINATION_PATH.
print(f"Downloading dataset: {KAGGLE_DATASET_ID}")
!kaggle datasets download -d {KAGGLE_DATASET_ID} -p {DESTINATION_PATH} --unzip

# Define the exact root path to the image subfolders (A, B, C, etc.)
# The folder name appears twice in the path.
# NOTE(review): presumably the archive unpacks into a folder nested inside a
# folder of the same name - verify against the extracted layout.
DATA_ROOT = os.path.join(DESTINATION_PATH, DATA_ROOT_FOLDER_NAME, DATA_ROOT_FOLDER_NAME)
print(f"Image data root set to: {DATA_ROOT}")

### Feature Extraction and Array Storage

In [None]:
# Google Cloud Storage destination used by the upload steps of the later cells.
GCS_BUCKET_NAME = "gs://asl-keypoint-data-storage-2025"
GCS_DESTINATION_FOLDER = "processed_features_v1"

# 1. Get all unique class folder names and sort them alphabetically
# (only sub-directories of DATA_ROOT are considered).
class_names = sorted([d for d in os.listdir(DATA_ROOT) if os.path.isdir(os.path.join(DATA_ROOT, d))])

# 2. Create the dictionary mapping class name -> integer label
# (the position of the name in the sorted list).
label_map = {name: i for i, name in enumerate(class_names)}

# Folder that receives the per-class .npy files.
# NOTE: the literal 'processed_data' repeats the value of PROCESSED_OUTPUT_DIR.
FEATURE_OUTPUT_DIR = os.path.join('processed_data', 'class_splits')
os.makedirs(FEATURE_OUTPUT_DIR, exist_ok=True) # Ensure the directory exists

def create_and_save_features():
    """Extract features for every class folder and save them as per-class .npy files.

    For each name in ``class_names`` the images under ``DATA_ROOT/<class>`` are
    passed to ``extract_keypoints``. Images for which it returns ``None``
    keypoints are dropped. The remaining keypoints, resized images and integer
    labels are written to ``FEATURE_OUTPUT_DIR`` as ``keypoints_<class>.npy``,
    ``cnn_<class>.npy`` and ``labels_<class>.npy``.

    Classes are processed one at a time and their buffers released afterwards,
    so only a single class is held in memory at any moment.

    Returns:
        list[tuple[str, str, str]]: one ``(keypoint_file, cnn_file, label_file)``
        tuple per class that produced at least one sample.
    """
    # Paths of the per-class files, returned so callers can merge them later.
    all_class_files = []

    for class_name in class_names:
        class_path = os.path.join(DATA_ROOT, class_name)
        label_index = label_map[class_name]

        print(f"Processing Class: {class_name} (Label: {label_index})")

        # Per-class buffers (released at the end of each iteration).
        class_keypoints = []
        class_images = []
        class_labels = []

        # Sorted so that the sample order is reproducible across runs.
        for image_name in sorted(os.listdir(class_path)):
            image_path = os.path.join(class_path, image_name)

            keypoints, resized_img = extract_keypoints(image_path)

            if keypoints is not None:
                class_keypoints.append(keypoints)
                class_images.append(resized_img)
                class_labels.append(label_index)

        # A class without usable samples would be saved as empty 1-D arrays,
        # which cannot be concatenated with the 2-D/4-D arrays of the other
        # classes later on. Skip it instead.
        if not class_labels:
            print(f"WARNING: No usable samples for class {class_name}. Skipping.")
            continue

        # Convert to arrays (the memory-intensive part, done one class at a time).
        X_key_class = np.array(class_keypoints, dtype=np.float32)
        X_cnn_class = np.array(class_images, dtype=np.float32)
        y_class = np.array(class_labels, dtype=np.int32)

        key_file = os.path.join(FEATURE_OUTPUT_DIR, f'keypoints_{class_name}.npy')
        cnn_file = os.path.join(FEATURE_OUTPUT_DIR, f'cnn_{class_name}.npy')
        label_file = os.path.join(FEATURE_OUTPUT_DIR, f'labels_{class_name}.npy')

        np.save(key_file, X_key_class)
        np.save(cnn_file, X_cnn_class)
        np.save(label_file, y_class)
        all_class_files.append((key_file, cnn_file, label_file))

        print(f"Processed and saved {class_name}. Freeing memory...")

        # Drop the large per-class objects before moving on to the next class.
        del X_key_class, X_cnn_class, y_class, class_keypoints, class_images, class_labels
        gc.collect()

    return all_class_files

# --- EXECUTION ---
# Assigned to a name so the notebook does not echo the whole list as cell output.
class_feature_files = create_and_save_features()

### Final Concatenation

In [None]:
print("Starting memory-optimized final concatenation...")

# 1. Collect the per-class files to merge, grouped by file-name prefix.
# The directory listing is sorted once, so all three groups share one ordering.
temp_files = sorted(os.listdir(FEATURE_OUTPUT_DIR))

keypoint_files, cnn_files, label_files = (
    [os.path.join(FEATURE_OUTPUT_DIR, entry) for entry in temp_files if entry.startswith(prefix)]
    for prefix in ('keypoints_', 'cnn_', 'labels_')
)

# Stop at the first group (keypoint, cnn, label - in that order) that is empty.
for _kind, _found in (("keypoint", keypoint_files), ("cnn", cnn_files), ("label", label_files)):
    if not _found:
        raise FileNotFoundError(f"No temporary {_kind} files found. Check FEATURE_OUTPUT_DIR path.")

# 2. Memory-Optimized Concatenation (Loading one-by-one and overwriting)

def merge_files_efficiently(file_list, final_name):
    """Concatenate the arrays stored in ``file_list`` into a single .npy file.

    Every file is loaded with ``np.load``, the arrays are joined along the
    first axis with ``np.concatenate`` and the result is saved as
    ``FEATURE_OUTPUT_DIR/<final_name>``. All inputs and the merged result are
    held in memory at the same time until the save completes.

    Args:
        file_list: paths of the .npy files to merge, in the desired order.
        final_name: file name of the merged array inside FEATURE_OUTPUT_DIR.

    Returns:
        str: path of the merged file.
    """
    destination = os.path.join(FEATURE_OUTPUT_DIR, final_name)
    print(f"Merging {len(file_list)} files into {final_name}...")

    parts = []
    for part_path in file_list:
        parts.append(np.load(part_path))

    combined = np.concatenate(parts)
    np.save(destination, combined)

    # Release the loaded arrays before the next merge starts.
    del parts, combined
    gc.collect()

    print(f"Successfully saved {final_name}.")
    return destination

# Execute the merges
final_keypoints_path = merge_files_efficiently(keypoint_files, 'X_keypoints.npy')
final_labels_path = merge_files_efficiently(label_files, 'y_labels.npy')

print("\nAll final feature files created successfully on local disk.")

# 3. Upload to GCS
GCS_PATH = f"{GCS_BUCKET_NAME}/{GCS_DESTINATION_FOLDER}"
print(f"Uploading final processed features from {FEATURE_OUTPUT_DIR} to {GCS_PATH}...")

print(f"Uploading final feature files to {GCS_PATH}...")

# Upload X_keypoints.npy
!gsutil cp {FEATURE_OUTPUT_DIR}/X_keypoints.npy {GCS_PATH}/X_keypoints.npy

# Upload y_labels.npy
!gsutil cp {FEATURE_OUTPUT_DIR}/y_labels.npy {GCS_PATH}/y_labels.npy

print("\nUpload to GCS complete. Only final files were uploaded.")

print("\nUpload to GCS complete. Data processing pipeline finished! ðŸŽ‰")

In [None]:
print("--- Starting Memory-Mapped Merge for X_cnn_images (with Disk Cleanup) ---")

# 1. Identify all temporary files and verify paths
try:
    temp_files = sorted(os.listdir(FEATURE_OUTPUT_DIR))
    # We load these lists for reference, they are NOT deleted yet.
    label_files = [os.path.join(FEATURE_OUTPUT_DIR, f) for f in temp_files if f.startswith('labels_')]
    cnn_files = [os.path.join(FEATURE_OUTPUT_DIR, f) for f in temp_files if f.startswith('cnn_')]
except FileNotFoundError:
    print(f"Error: The directory {FEATURE_OUTPUT_DIR} was not found. Please check REPO_NAME.")
    exit()

if not cnn_files or not label_files:
    print("Error: No intermediate 'cnn_*.npy' or 'labels_*.npy' files found. Cannot proceed.")
    exit()

# 2. Calculate the required final shape (metadata only)
print(f"Found {len(cnn_files)} intermediate files.")

# Calculate the total number of samples (rows)
total_samples = sum(np.load(f).shape[0] for f in label_files)

# Get the shape of a single image (e.g., (224, 224, 3))
cnn_image_shape = np.load(cnn_files[0]).shape[1:]

print(f"Total Samples to Merge: {total_samples}")
print(f"Image Feature Shape: {cnn_image_shape}")

# 3. Create and Populate the Memory-Mapped Array
FINAL_CNN_PATH = os.path.join(FEATURE_OUTPUT_DIR, 'X_cnn_images.npy')
current_row = 0

print(f"Creating memory-mapped file at: {FINAL_CNN_PATH}")

# Create the destination memory-mapped array (mode='w+' means create/write)
X_cnn_final_map = np.memmap(
    FINAL_CNN_PATH,
    dtype=np.float32,
    mode='w+',
    shape=(total_samples, *cnn_image_shape)
)

# Iteratively write data into the memory-mapped file
for i, cnn_file in enumerate(cnn_files):
    # Load one small class array into RAM
    X_cnn_class = np.load(cnn_file)
    num_samples = X_cnn_class.shape[0]

    # Write the small array directly into the correct slice of the large file on disk
    X_cnn_final_map[current_row:current_row + num_samples] = X_cnn_class

    # Update the row counter
    current_row += num_samples

    print(f"  -> Wrote file {i+1}/{len(cnn_files)} ({num_samples} samples).")

    # Crucial: Delete objects and force garbage collection after each loop
    del X_cnn_class
    gc.collect()

    # Flush ensures data is written to disk immediately
    X_cnn_final_map.flush()

    # --- DISK CLEANUP STEP ---
    os.remove(cnn_file)
    print(f"  -> Deleted source file: {os.path.basename(cnn_file)}")

print("\nStep 1 of 2: X_cnn_images successfully merged and saved locally.")

# Final cleanup of the memmap object before GCS upload
del X_cnn_final_map
gc.collect()

# 4. Upload the final file to GCS
GCS_PATH = f"{GCS_BUCKET_NAME}/{GCS_DESTINATION_FOLDER}"
GCS_DESTINATION_FILE = os.path.basename(FINAL_CNN_PATH)

print(f"\nStep 2 of 2: Uploading {GCS_DESTINATION_FILE} to {GCS_PATH}...")
# Use gsutil cp to copy the local file to the GCS path
!gsutil cp {FINAL_CNN_PATH} {GCS_PATH}/{GCS_DESTINATION_FILE}

print("\nSUCCESS: X_cnn_images.npy uploaded to GCS.")

## Data: Loading and Splitting

In [None]:
import os
import numpy as np
from sklearn.model_selection import train_test_split

In [None]:
# Constants

# Switch the working directory to /content; the relative paths below
# (e.g. LOCAL_FEATURE_DIR) resolve against it.
%cd /content

# Location of the processed feature files in Google Cloud Storage.
GCS_BUCKET_NAME = "gs://asl-keypoint-data-storage-2025"
# The folder name ends with "/" here, so GCS_PATH below also ends with "/" and
# file names are appended to it directly (without an extra separator).
GCS_DESTINATION_FOLDER = "processed_features_v1/"
# Names of the three feature files.
CNN_FILE_NAME = "X_cnn_images.npy"
KEY_FILE_NAME = "X_keypoints.npy"
LABELS_FILE_NAME = "y_labels.npy"
# Local folder the files are copied into.
LOCAL_FEATURE_DIR = 'gcs_loaded_data'
GCS_PATH = f"{GCS_BUCKET_NAME}/{GCS_DESTINATION_FOLDER}"



In [None]:
def setup_gcs_data():
    """Authenticates GCS access and copies large files to the local Colab SSD."""
    print("Authenticating Google Cloud Storage...")
    try:
        # Authenticate the user for GCS access
        auth.authenticate_user()
    except Exception as e:
        print(f"Authentication failed: {e}")
        return False

    # Create the local directory
    os.makedirs(LOCAL_FEATURE_DIR, exist_ok=True)
    print(f"Local storage directory created at: {LOCAL_FEATURE_DIR}")

    # Use gsutil to copy the files to the local SSD
    print(f"Copying {CNN_FILE_NAME} (38 GB) from GCS to local SSD...")
    # It is crucial to use the local SSD for fast I/O during training.
    # The 'gsutil cp' command is optimized for this transfer.
    try:
        # Copy the large feature file
        !gsutil cp {GCS_PATH}{CNN_FILE_NAME} {LOCAL_FEATURE_DIR}/

        # Copy the much smaller labels file
        print(f"Copying {LABELS_FILE_NAME} from GCS to local SSD...")
        !gsutil cp {GCS_PATH}{LABELS_FILE_NAME} {LOCAL_FEATURE_DIR}/

        print(f"Copying {KEY_FILE_NAME} from GCS to local SSD...")
        !gsutil -m cp {GCS_PATH}{KEY_FILE_NAME} {LOCAL_FEATURE_DIR}/

        print("Data transfer complete.")
        return True
    except Exception as e:
        print(f"Data transfer failed: {e}")
        return False

setup_gcs_data()

In [None]:
print("Initializing data loading...")

# Path to files
cnn_path = os.path.join(LOCAL_FEATURE_DIR, CNN_FILE_NAME)
key_path = os.path.join(LOCAL_FEATURE_DIR, KEY_FILE_NAME)
label_path = os.path.join(LOCAL_FEATURE_DIR, LABELS_FILE_NAME)

# --- STEP 1: Load Labels to determine Total Samples (N) ---
# np.load is tried first. If it fails with ValueError the file is treated as a
# raw binary dump and N is derived from the file size.
try:
    y_labels = np.load(label_path)
    total_samples = y_labels.shape[0]
    print(f"Loaded Labels via np.load. Total samples: {total_samples}")
except ValueError:
    print("Labels file is raw binary. Loading via memmap...")
    # Assuming labels are int32 (4 bytes)
    file_size = os.path.getsize(label_path)
    total_samples = file_size // 4
    y_labels = np.memmap(label_path, dtype=np.int32, mode='r', shape=(total_samples,))
    print(f"Loaded Labels via memmap. Total samples: {total_samples}")

# --- STEP 2: Load Keypoints ---
try:
    X_keypoints = np.load(key_path)
except ValueError:
    print("Keypoints file is raw binary. Loading via memmap...")
    # Shape is (N, 42), float32
    X_keypoints = np.memmap(key_path, dtype=np.float32, mode='r', shape=(total_samples, 42))

print(f"X_keypoints shape:  {X_keypoints.shape}")

# Keypoints and labels are indexed with the same sample indices later on, so
# their row counts must agree.
if X_keypoints.shape[0] != total_samples:
    raise ValueError(
        f"Keypoint rows ({X_keypoints.shape[0]}) do not match label count ({total_samples})."
    )

# --- STEP 3: Load CNN Images ---
# The image file is a raw float32 dump without any header, so it is mapped with
# np.memmap using an explicit shape of (N, 224, 224, 3).
print("Mapping large CNN image file (Read-Only)...")
img_shape = (224, 224, 3)

# np.memmap trusts the requested shape. Check the file size first so that a
# file with a header or a different sample count is rejected instead of being
# mapped with silently shifted data.
expected_cnn_bytes = total_samples * int(np.prod(img_shape)) * np.dtype(np.float32).itemsize
actual_cnn_bytes = os.path.getsize(cnn_path)
if actual_cnn_bytes != expected_cnn_bytes:
    raise ValueError(
        f"{cnn_path} is {actual_cnn_bytes} bytes but {expected_cnn_bytes} bytes are expected for "
        f"{total_samples} float32 images of shape {img_shape}."
    )

X_cnn_mmap = np.memmap(
    cnn_path,
    dtype=np.float32,
    mode='r',
    shape=(total_samples, *img_shape)
)

print(f"X_cnn_mmap mapped. Shape: {X_cnn_mmap.shape}")
print("Data ready for splitting.")

In [None]:
import numpy as np
import collections

# Load labels: standard np.load for a saved .npy file, raw int32 memmap otherwise.
label_path = os.path.join(LOCAL_FEATURE_DIR, LABELS_FILE_NAME)
try:
    y_labels = np.load(label_path)
except ValueError:
    # Only the "not a valid .npy file" case falls back to the raw reader; any
    # other problem (for example a missing file) is allowed to surface.
    file_size = os.path.getsize(label_path)
    total_samples = file_size // 4
    y_labels = np.memmap(label_path, dtype=np.int32, mode='r', shape=(total_samples,))

# Count samples per class. tolist() converts the labels to plain Python ints,
# which is faster to count than iterating numpy scalars and prints cleanly.
counter = collections.Counter(np.asarray(y_labels).tolist())
print("--- Sample Counts Per Class ---")
print(f"Total Samples: {len(y_labels)}")
print(f"Total Classes: {len(counter)}")
print("-" * 30)

# Print every class and remember those with dangerously few samples (< 10).
low_data_classes = []
for label, count in sorted(counter.items()):
    print(f"Class {label}: {count} samples")
    if count < 10:
        low_data_classes.append(label)

if low_data_classes:
    print(f"\nCRITICAL WARNING: Classes {low_data_classes} have insufficient data!")
    print("Stratified splitting will fail for these classes.")
else:
    print("\nData counts look robust. The error might be an edge case.")

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Split sample INDICES (not the data itself) into train / validation / test.
# Generate a list of indices [0, 1, 2, ... N-1]
num_samples = y_labels.shape[0]
indices = np.arange(num_samples)

print("Attempting Robust Data Split...")

try:
    # 1. Split Indices into Train (80%) and Temp (20% = Validation + Test),
    # stratified by label.
    train_idx, temp_idx, y_train_sparse, y_temp_sparse = train_test_split(
        indices, y_labels,
        test_size=0.2,
        random_state=42,
        stratify=y_labels
    )
    print("Primary split (Train/Temp) successful with stratification.")

    # 2. Split Temp Indices into Validation and Test
    try:
        val_idx, test_idx, y_val_sparse, y_test_sparse = train_test_split(
            temp_idx, y_temp_sparse,
            test_size=0.5, # 50% of 20% = 10% total
            random_state=42,
            stratify=y_temp_sparse
        )
        print("Secondary split (Val/Test) successful with stratification.")
    except ValueError as e:
        print(f"WARNING: Stratified split failed for Val/Test ({e}).")
        print("Falling back to random split (non-stratified) for Validation/Test sets.")
        val_idx, test_idx, y_val_sparse, y_test_sparse = train_test_split(
            temp_idx, y_temp_sparse,
            test_size=0.5,
            random_state=42,
            stratify=None # Disable stratification to prevent crash
        )

except ValueError as e:
    print(f"CRITICAL ERROR: Even primary split failed. Data is extremely sparse. {e}")
    # Fallback to a completely random split for both stages.
    train_idx, temp_idx, y_train_sparse, y_temp_sparse = train_test_split(
        indices, y_labels, test_size=0.2, random_state=42, stratify=None
    )
    val_idx, test_idx, y_val_sparse, y_test_sparse = train_test_split(
        temp_idx, y_temp_sparse, test_size=0.5, random_state=42, stratify=None
    )

# 3. One-Hot Encode Labels
# The one-hot width must cover the LARGEST label id, not just the number of
# distinct labels: when a label id is missing from the data (a class without
# any samples) the two differ and a too-small width makes to_categorical fail.
num_distinct_labels = len(np.unique(y_labels))
num_classes = int(np.max(y_labels)) + 1
if num_classes != num_distinct_labels:
    print(f"WARNING: Labels are not contiguous ({num_distinct_labels} distinct labels, "
          f"largest label id {num_classes - 1}). Using {num_classes} output classes.")
y_labels_categorical = to_categorical(y_labels, num_classes=num_classes)

print("-" * 20)
print(f"Training Indices:   {len(train_idx)}")
print(f"Validation Indices: {len(val_idx)}")
print(f"Test Indices:       {len(test_idx)}")

In [None]:
import tensorflow as tf
import math
import numpy as np

class MultiInputGenerator(tf.keras.utils.Sequence):
    """Keras Sequence that serves batches of (image, keypoint) inputs with labels.

    Each batch is assembled by indexing the three backing arrays with a slice
    of the index list and is returned as
    ``({"image_input": ..., "keypoint_input": ...}, labels)``.

    Args:
        indices: sample indices this generator serves. A private copy is kept,
            so shuffling never reorders the caller's array.
        x_cnn_mmap: image array (or memory map) indexed by sample index.
        x_keypoints: keypoint array indexed by sample index.
        y_labels_cat: label array indexed by sample index.
        batch_size: number of samples per batch.
        shuffle: when True the index order is reshuffled on construction and
            in ``on_epoch_end``.
        **kwargs: forwarded to the base class constructor.

    Raises:
        ValueError: if ``indices`` is empty.
    """
    def __init__(self, indices, x_cnn_mmap, x_keypoints, y_labels_cat, batch_size=64, shuffle=True, **kwargs):
        # Initialise the base class so Keras can set up its own state.
        super().__init__(**kwargs)

        # Validate before doing any other work.
        if len(indices) == 0:
            raise ValueError("Indices list is empty. Check your data splitting in Cell 4.")

        # Copy: np.random.shuffle works in place and must not modify the
        # caller's split indices, which are reused by other cells.
        self.indices = np.array(indices)
        self.x_cnn_mmap = x_cnn_mmap
        self.x_keypoints = x_keypoints
        self.y_labels_cat = y_labels_cat
        self.batch_size = batch_size
        self.shuffle = shuffle
        # Apply the initial shuffle (no-op when shuffle is False).
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        return math.ceil(len(self.indices) / self.batch_size)

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(inputs_dict, labels_tensor)``."""
        start_index = index * self.batch_size
        end_index = (index + 1) * self.batch_size
        batch_indices = self.indices[start_index:end_index]

        # Gather data (NumPy arrays/memory maps are accessed here)
        batch_images = self.x_cnn_mmap[batch_indices]
        batch_keypoints = self.x_keypoints[batch_indices]
        batch_labels = self.y_labels_cat[batch_indices]

        # Explicitly convert to tf.Tensor
        tensor_images = tf.convert_to_tensor(batch_images)
        tensor_keypoints = tf.convert_to_tensor(batch_keypoints)
        tensor_labels = tf.convert_to_tensor(batch_labels)

        # The dictionary keys must match the names of the model's input layers.
        inputs_dict = {
            "image_input": tensor_images,
            "keypoint_input": tensor_keypoints
        }

        return inputs_dict, tensor_labels

    def on_epoch_end(self):
        """Reshuffle the private index array when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

# --- 1. Create Keras Sequence Objects ---
BATCH_SIZE = 64
num_classes = y_labels_categorical.shape[1]

# One Sequence per split, built in train / validation / test order.
# Only the training split is shuffled.
train_gen_seq, val_gen_seq, test_gen_seq = (
    MultiInputGenerator(split_idx, X_cnn_mmap, X_keypoints, y_labels_categorical, BATCH_SIZE, shuffle=do_shuffle)
    for split_idx, do_shuffle in ((train_idx, True), (val_idx, False), (test_idx, False))
)

# --- 2. Create Optimized tf.data.Dataset ONLY FOR VALIDATION/TEST (Optional) ---
# Element structure produced by the generators: a dict with the two model
# inputs, followed by the one-hot labels. The leading None is the batch size.
output_signature = (
    dict(
        image_input=tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),
        keypoint_input=tf.TensorSpec(shape=(None, 42), dtype=tf.float32),
    ),
    tf.TensorSpec(shape=(None, num_classes), dtype=tf.float32),
)

def sequence_to_dataset(sequence, signature):
    """Wrap a Keras Sequence in a finite, prefetching ``tf.data.Dataset``.

    Args:
        sequence: object supporting ``len()`` and integer indexing, where each
            item is one batch matching ``signature``.
        signature: ``output_signature`` describing a single batch.

    Returns:
        tf.data.Dataset yielding exactly ``len(sequence)`` batches per pass.
    """
    def batch_generator():
        # Iterate explicitly over the valid batch indices. This does not depend
        # on the Sequence providing its own iterator and guarantees that the
        # dataset ends after the last batch instead of indexing past the end.
        for batch_index in range(len(sequence)):
            yield sequence[batch_index]

    dataset = tf.data.Dataset.from_generator(
        batch_generator,
        output_signature=signature
    )

    # The former identity map() stage did not change the elements and is omitted.
    return dataset.prefetch(tf.data.AUTOTUNE)

# The training set is NOT converted to tf.data.Dataset
val_ds = sequence_to_dataset(val_gen_seq, output_signature)
test_ds = sequence_to_dataset(test_gen_seq, output_signature)

print("Training Data: Keras Sequence (train_gen_seq)")
print("Validation Data: Optimized tf.data.Dataset (val_ds)")
print("This configuration forces reliable epoch-end state management.")

In [None]:
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Input, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

def create_fusion_model(num_classes, learning_rate=1e-4):
    """Build and compile the two-branch (image + keypoint) classification model.

    The image branch feeds a 224x224x3 input through MobileNetV2 (ImageNet
    weights, no top, frozen) followed by global average pooling and a dense
    layer. The keypoint branch is a small MLP on a 42-value input. Both
    branches are concatenated and passed to a softmax classification head.

    Args:
        num_classes: number of units in the softmax output layer.
        learning_rate: learning rate of the Adam optimizer.

    Returns:
        The compiled Keras Model with inputs named "image_input" and
        "keypoint_input" and an output layer named "class_output".
    """
    # NOTE(review): layers are created in a fixed order; auto-generated layer
    # names may depend on it, which can matter when loading saved weights - confirm
    # before reordering.

    # --- Branch 1: CNN for Images (MobileNetV2) ---
    # Input shape: 224x224x3 RGB
    input_image = Input(shape=(224, 224, 3), name="image_input")

    # Load MobileNetV2, exclude top classification layer
    base_mobilenet = MobileNetV2(weights='imagenet', include_top=False, input_tensor=input_image)

    # Freeze the base model initially (optional, but recommended for stability)
    base_mobilenet.trainable = False

    # Pool the backbone feature map and project it to 128 features.
    x1 = base_mobilenet.output
    x1 = GlobalAveragePooling2D()(x1)
    x1 = Dense(128, activation='relu')(x1)
    x1 = Dropout(0.3)(x1)

    # --- Branch 2: MLP for Keypoints ---
    # Input shape: 42 (21 points * 2 coordinates)
    input_keypoints = Input(shape=(42,), name="keypoint_input")

    x2 = Dense(64, activation='relu')(input_keypoints)
    x2 = Dropout(0.3)(x2)
    x2 = Dense(32, activation='relu')(x2)

    # --- Fusion ---
    # Concatenate the 128 image features with the 32 keypoint features.
    combined = Concatenate()([x1, x2])

    # Final Classification Head
    z = Dense(64, activation='relu')(combined)
    z = Dropout(0.2)(z)
    output = Dense(num_classes, activation='softmax', name="class_output")(z)

    # Create Model
    model = Model(inputs=[input_image, input_keypoints], outputs=output)

    # categorical_crossentropy matches the one-hot labels fed during training.
    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model

# Initialize Model
# num_classes is the module-level value set by an earlier cell.
model = create_fusion_model(num_classes=num_classes)
model.summary()

## Load previous weights

In [None]:
import tensorflow as tf
import numpy as np

# --- 1. Load the Best Weights ---
# Requires the compiled `model` object created by the model-architecture cell.
try:
    # Weights file is resolved relative to the current working directory.
    # NOTE(review): no cell visible in this notebook writes a file with this
    # name - confirm where it comes from.
    model.load_weights('processed_features_v1_latest.weights.h5')
    print("Successfully loaded model weights.")
except Exception as e:
    print(f"ERROR: Could not load weights file. Please check the path: {e}")
    # Halt execution if weights cannot be loaded, as the rest of the process is meaningless.
    raise

# --- 2. Fix the Optimizer State Mismatch ---

print("Initializing Adam optimizer state to match loaded weights...")

# The optimizer state is initialised by running one dummy training step
# (see the function defined directly below).
# NOTE(review): the variable counts quoted in this cell's messages ("2 vs. 22")
# are taken from the original author's notes and cannot be verified from this
# notebook alone.

@tf.function
def initialize_optimizer_state():
    """Run one dummy gradient step so the optimizer allocates its state variables.

    Builds a single all-zero sample for both model inputs and an all-zero label,
    runs a forward pass in training mode, computes the loss with
    ``model.compute_loss`` and applies the resulting gradients through
    ``model.optimizer``.

    Reads the module-level ``model`` and ``num_classes``; returns nothing.
    """
    # Shapes of one sample per input; they mirror the shapes used by the data
    # generators: images (224, 224, 3), keypoints (42,), labels (num_classes,).
    image_shape = (1, 224, 224, 3)
    keypoint_shape = (1, 42)
    label_shape = (1, num_classes) # num_classes should be defined from y_labels_categorical.shape[1]

    # Create a batch of Zeros with size 1
    dummy_input = {
        "image_input": tf.zeros(image_shape, dtype=tf.float32),
        "keypoint_input": tf.zeros(keypoint_shape, dtype=tf.float32)
    }
    # NOTE(review): the label is all zeros - presumably intended to keep this
    # step from changing the loaded weights; confirm.
    dummy_label = tf.zeros(label_shape, dtype=tf.float32)

    with tf.GradientTape() as tape:
        # Forward pass
        predictions = model(dummy_input, training=True)
        # The loss value itself is not used; it only needs to exist so that
        # gradients can be computed.
        loss = model.compute_loss(y=dummy_label, y_pred=predictions)

    # Compute and apply gradients for the trainable variables only.
    gradients = tape.gradient(loss, model.trainable_variables)
    model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))

# Execute the initialization
initialize_optimizer_state()
print("Optimizer state successfully initialized. The 2 vs. 22 variable mismatch is resolved.")

# --- 3. Resume Training ---
# Training continues with the model.fit() call in the next cell. Only the
# weights were loaded above, so the optimizer starts without saved momentum.

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, BackupAndRestore

# Callback that backs up the training state once per epoch so an interrupted
# run can resume (backup_dir MUST be a GCS path or local folder).
backup_restore = BackupAndRestore(backup_dir=GCS_PATH, save_freq='epoch')

# Keep only the model with the best validation accuracy.
checkpoint = ModelCheckpoint('best_fusion_model.keras', monitor='val_accuracy', save_best_only=True, verbose=1)

# Stop after 5 epochs without validation-loss improvement and restore the best weights.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)

# Multiply the learning rate by 0.2 after 3 stagnant epochs, never going below 1e-6.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6, verbose=1)

# Batch size and split sizes; these must agree with the generator cell.
BATCH_SIZE = 64
TRAIN_SAMPLES = len(train_idx)
VAL_SAMPLES = len(val_idx)

# Steps per pass = ceiling of samples / batch size (the last batch may be partial).
STEPS_PER_EPOCH = math.ceil(TRAIN_SAMPLES / BATCH_SIZE)
VALIDATION_STEPS = math.ceil(VAL_SAMPLES / BATCH_SIZE)

# Train the model (adjust `epochs` as needed).
fit_options = dict(
    validation_data=val_ds,
    steps_per_epoch=STEPS_PER_EPOCH,
    validation_steps=VALIDATION_STEPS,
    epochs=25,
    callbacks=[backup_restore, checkpoint, early_stopping, reduce_lr],
)
history = model.fit(train_gen_seq, **fit_options)

In [None]:
import tensorflow as tf
import math
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import pandas as pd # Used for better visualization of the confusion matrix

# --- 1. Define Constants (must match the generator and training cells) ---
BATCH_SIZE = 64
TEST_SAMPLES = len(test_idx)

TEST_STEPS = math.ceil(TEST_SAMPLES / BATCH_SIZE)

# --- 2. Load the Best Model ---
MODEL_PATH = 'best_fusion_model.keras'

try:
    # Load the best model saved by the ModelCheckpoint callback during training.
    final_model = tf.keras.models.load_model(MODEL_PATH)
    print(f"Successfully loaded the best model for evaluation from: {MODEL_PATH}")
except Exception as e:
    print(f"Error loading model from {MODEL_PATH}: {e}")
    print("WARNING: Using the last state of the currently loaded model instead.")
    # Fall back to the in-memory `model` object from the training cell.
    final_model = model

print("-" * 50)
print(f"Evaluation Details: {TEST_SAMPLES} samples in {TEST_STEPS} steps.")
print("-" * 50)

# --- 3. Evaluate the Model (Standard Metrics) ---
print("Starting final model evaluation on the unseen test dataset...")

results = final_model.evaluate(
    test_ds,
    steps=TEST_STEPS,
    verbose=1,
    return_dict=True # Return results as a dictionary for clear output
)

# --- 4. Display Standard Results ---
print("\n--- Final Test Evaluation Results ---")
for name, value in results.items():
    if name == 'accuracy':
        print(f"TEST ACCURACY: {value:.4f} (This is the final performance metric)")
    else:
        print(f"Test {name.capitalize()}: {value:.4f}")
print("-------------------------------------")


# --- 5. DETAILED CLASSIFICATION ANALYSIS ---

# 5a. Get Raw Predictions
print("\n--- Generating Raw Predictions ---")
test_predictions = final_model.predict(
    test_ds,
    steps=TEST_STEPS,
    verbose=1
)

# 5b. Get True Labels
# The test generator is not shuffled, so predictions follow the order of test_idx.
y_test_one_hot = y_labels_categorical[test_idx]

# 5c. Convert one-hot labels and predicted probabilities to class indices.
y_true_classes = np.argmax(y_test_one_hot, axis=1)
y_pred_classes = np.argmax(test_predictions, axis=1)

# Ensure the lengths match
if len(y_true_classes) != len(y_pred_classes):
    # Truncate both to the common length so they can be compared.
    min_len = min(len(y_true_classes), len(y_pred_classes))
    y_pred_classes = y_pred_classes[:min_len]
    y_true_classes = y_true_classes[:min_len]
    print(f"Warning: Prediction length truncated to {min_len} to match true label count.")

# 5d. Generate and Print Classification Report
print("\n\n--- CLASSIFICATION REPORT ---")
# Report over the full label range. Passing `labels` explicitly keeps the report
# and the confusion matrix aligned with the name list even when some class does
# not occur in the test split.
num_eval_classes = y_test_one_hot.shape[1]
label_ids = list(range(num_eval_classes))

# Use the real class names when an earlier cell in this session defined
# `class_names` with a matching length; otherwise fall back to numeric names.
# A separate variable is used so that the global `class_names` is not overwritten.
# NOTE(review): assumes label id i corresponds to class_names[i] - confirm that
# the labels were produced from the same class list.
known_class_names = globals().get('class_names')
if known_class_names is not None and len(known_class_names) == num_eval_classes:
    report_class_names = [str(name) for name in known_class_names]
else:
    report_class_names = [str(i) for i in label_ids]

print(classification_report(
    y_true_classes,
    y_pred_classes,
    labels=label_ids,
    target_names=report_class_names,
    zero_division=0
))

# 5e. Generate and Print Confusion Matrix
print("\n\n--- CONFUSION MATRIX ---")
cm = confusion_matrix(y_true_classes, y_pred_classes, labels=label_ids)
print("Rows = True Class, Columns = Predicted Class")

# Display the confusion matrix using pandas for better formatting
cm_df = pd.DataFrame(cm, index=report_class_names, columns=report_class_names)
print(cm_df)

print("\nDetailed analysis complete.")