In [1]:
import os
import numpy as np
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import TimeDistributed, Conv2D, MaxPooling2D, Flatten, LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, LambdaCallback, LearningRateScheduler
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence
from tensorflow.keras.regularizers import l2
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
import time
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.callbacks import EarlyStopping

In [3]:
# **Step 1: Define data generator**
class DataGenerator(Sequence):
    """Batch loader over memory-mapped sample and label files.

    The sample file is opened read-only as float32 with shape
    ``(num_samples, *sample_shape)`` and the label file as int32 with shape
    ``(num_samples,)``. Only the rows listed in ``indices`` are served.

    Args:
        X_path: Path to the raw float32 sample file.
        y_path: Path to the raw int32 label file.
        indices: Row indices (into the memmaps) served by this generator.
        batch_size: Number of samples per batch; must be positive.
        num_samples: Number of rows stored in both files.
        sample_shape: Shape of a single sample.
        num_classes: Width of the one-hot label vectors.
        **kwargs: Forwarded unchanged to the ``Sequence`` constructor.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, X_path, y_path, indices, batch_size,
                 num_samples=5082, sample_shape=(10, 224, 224, 3),
                 num_classes=2, **kwargs):
        # Initialise the Keras base class before touching any state.
        super().__init__(**kwargs)
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.X = np.memmap(X_path, dtype='float32', mode='r',
                           shape=(num_samples, *sample_shape))
        self.y = np.memmap(y_path, dtype='int32', mode='r', shape=(num_samples,))
        self.indices = indices
        self.batch_size = batch_size
        self.num_classes = num_classes

    def __len__(self):
        """Return the number of full batches; a trailing partial batch is dropped."""
        return len(self.indices) // self.batch_size

    def __getitem__(self, idx):
        """Return ``(X_batch, y_batch)`` for batch number ``idx``."""
        start = idx * self.batch_size
        batch_indices = self.indices[start:start + self.batch_size]
        X_batch = self.X[batch_indices]
        # Row lookup into an identity matrix turns integer labels into one-hot rows.
        y_batch = np.eye(self.num_classes)[self.y[batch_indices]]
        return X_batch, y_batch

In [None]:
# **Step 1: Define data generator**
class DataGenerator(Sequence):
    """Serve batches from memory-mapped sample/label files for Keras training.

    Samples are read from a float32 memmap of shape
    ``(num_samples, *sample_shape)`` and labels from an int32 memmap of shape
    ``(num_samples,)``; both are opened read-only. The generator only yields
    the rows named in ``indices``.

    Args:
        X_path: Path to the raw float32 sample file.
        y_path: Path to the raw int32 label file.
        indices: Row indices (into the memmaps) served by this generator.
        batch_size: Number of samples per batch; must be positive.
        num_samples: Number of rows stored in both files.
        sample_shape: Shape of a single sample.
        num_classes: Width of the one-hot label vectors.
        **kwargs: Forwarded unchanged to the ``Sequence`` constructor.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, X_path, y_path, indices, batch_size,
                 num_samples=5082, sample_shape=(10, 224, 224, 3),
                 num_classes=2, **kwargs):
        # The Keras base class must be initialised by subclasses.
        super().__init__(**kwargs)
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.X = np.memmap(X_path, dtype='float32', mode='r',
                           shape=(num_samples, *sample_shape))
        self.y = np.memmap(y_path, dtype='int32', mode='r', shape=(num_samples,))
        self.indices = indices
        self.batch_size = batch_size
        self.num_classes = num_classes

    def __len__(self):
        """Number of full batches per epoch (the remainder is discarded)."""
        return len(self.indices) // self.batch_size

    def __getitem__(self, idx):
        """Build batch ``idx`` as a ``(samples, one_hot_labels)`` pair."""
        first = idx * self.batch_size
        batch_indices = self.indices[first:first + self.batch_size]
        X_batch = self.X[batch_indices]
        # Indexing an identity matrix by label yields the one-hot encoding.
        y_batch = np.eye(self.num_classes)[self.y[batch_indices]]
        return X_batch, y_batch

# **Step 2: Build the CNN-LSTM model**
def build_cnn_lstm_model(seq_length, height, width, channels, num_classes):
    """Assemble and compile the CNN-LSTM sequence classifier.

    Three TimeDistributed Conv2D -> BatchNormalization -> MaxPooling2D stages
    (32, 64 and 128 filters) are applied along the sequence axis, the result
    is flattened per step, two LSTM layers (128 then 64 units) follow, and a
    Dense(32) + softmax head emits ``num_classes`` outputs. Every Conv2D,
    LSTM and Dense kernel carries an L2 penalty of 0.01, and Dropout(0.5)
    follows each LSTM and the hidden Dense layer.

    Args:
        seq_length: Length of the sequence axis of one input sample.
        height: Height of each element of the sequence.
        width: Width of each element of the sequence.
        channels: Channel count of each element of the sequence.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The model compiled with Adam (learning rate 0.0001), categorical
        cross-entropy loss and the accuracy metric.
    """
    sample_shape = (seq_length, height, width, channels)
    net = Sequential()

    # Convolutional feature extractor, one stage per filter count.
    for stage_no, n_filters in enumerate((32, 64, 128)):
        conv = Conv2D(n_filters, (3, 3), activation='relu', padding='same',
                      kernel_regularizer=l2(0.01))
        # Only the very first layer declares the model's input shape.
        wrapper_kwargs = {'input_shape': sample_shape} if stage_no == 0 else {}
        net.add(TimeDistributed(conv, **wrapper_kwargs))
        net.add(TimeDistributed(BatchNormalization()))
        net.add(TimeDistributed(MaxPooling2D(pool_size=(2, 2))))

    net.add(TimeDistributed(Flatten()))

    # Recurrent part: the first LSTM keeps the sequence, the second collapses it.
    net.add(LSTM(128, return_sequences=True, kernel_regularizer=l2(0.01)))
    net.add(Dropout(0.5))
    net.add(LSTM(64, kernel_regularizer=l2(0.01)))
    net.add(Dropout(0.5))

    # Classification head.
    net.add(Dense(32, activation='relu', kernel_regularizer=l2(0.01)))
    net.add(Dropout(0.5))
    net.add(Dense(num_classes, activation='softmax', kernel_regularizer=l2(0.01)))

    adam = tf.keras.optimizers.Adam(learning_rate=0.0001)
    net.compile(optimizer=adam, loss='categorical_crossentropy', metrics=['accuracy'])
    return net

# Build the network and print its layer table
cnn_lstm_model = build_cnn_lstm_model(
    seq_length=10,
    height=224,
    width=224,
    channels=3,
    num_classes=2,
)
cnn_lstm_model.summary()

# **Step 3: Split dataset into train and validation**
# 80/20 split over the row indices, seeded so the split is repeatable
indices = np.arange(5082)
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

train_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final.dat',
    r'E:\PosePerfect\Dataset Creation\y_final.dat',
    train_indices,
    batch_size=8,
)
val_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final.dat',
    r'E:\PosePerfect\Dataset Creation\y_final.dat',
    val_indices,
    batch_size=8,
)

# **Step 4: Set up checkpoint callback**
checkpoint_dir = './Checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# The file name template records the epoch number and its validation loss
checkpoint_path = os.path.join(checkpoint_dir, 'model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.keras')
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=False,     # write a file after every epoch
    save_weights_only=False,  # store the whole model, not only the weights
)

# **Step 5: Set up real-time accuracy and loss callback**
def on_batch_end(batch, logs=None):
    """Print a one-line progress report after a training batch.

    The line is built only from what ``logs`` actually contains, so a missing
    'epoch', 'steps', 'loss' or 'accuracy' entry is skipped instead of raising
    ``KeyError``. When every key is present the output is
    ``Epoch: E, Batch: B/S, Loss: x.xxxx, Accuracy: x.xxxx``.

    Args:
        batch: Index of the batch that just finished (printed 1-based).
        logs: Mapping of values reported for the batch; may be None.
    """
    logs = logs or {}
    fields = []

    if 'epoch' in logs:
        fields.append(f"Epoch: {logs['epoch'] + 1}")

    if 'steps' in logs:
        fields.append(f"Batch: {batch + 1}/{logs['steps']}")
    else:
        fields.append(f"Batch: {batch + 1}")

    for label, key in (("Loss", "loss"), ("Accuracy", "accuracy")):
        value = logs.get(key)
        if value is not None:
            fields.append(f"{label}: {value:.4f}")

    print(", ".join(fields))

# Wrap the per-batch printer so it can be passed in a `callbacks` list
realtime_metrics_callback = LambdaCallback(
    on_batch_end=on_batch_end,
)


# **Step 6: Learning Rate Scheduler**
def scheduler(epoch, lr):
    """Learning-rate schedule: hold the rate at first, then shrink it by 10%.

    Args:
        epoch: Epoch index supplied by the caller.
        lr: Learning rate currently in effect.

    Returns:
        ``lr`` unchanged while ``epoch < 2``, otherwise ``lr * 0.9``.
    """
    hold_epochs = 2
    if epoch >= hold_epochs:
        return lr * 0.9  # 10% smaller than the incoming rate
    return lr

lr_callback = LearningRateScheduler(scheduler)

# Step 7: Train the model
# Checkpointing, the learning-rate schedule and the real-time metrics printer
# all run as callbacks.
cnn_lstm_model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=10,
    steps_per_epoch=len(train_gen),
    validation_steps=len(val_gen),
    # lr_callback has to be listed here for the schedule to take effect.
    callbacks=[checkpoint_callback, lr_callback, realtime_metrics_callback],
    verbose=1,
)


In [None]:
# Rebuild both generators over the same files with 10 samples per batch
train_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final.dat',
    r'E:\PosePerfect\Dataset Creation\y_final.dat',
    train_indices,
    batch_size=10,
)
val_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final.dat',
    r'E:\PosePerfect\Dataset Creation\y_final.dat',
    val_indices,
    batch_size=10,
)

In [3]:
class StepTimerCallback(Callback):
    """Callback that prints wall-clock timings for epochs and training steps."""

    def on_epoch_begin(self, epoch, logs=None):
        """Print the epoch banner (1-based) and start the epoch timer."""
        banner = f"\n--- Starting Epoch {epoch + 1} ---"
        print(banner)
        self.epoch_start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        """Print how long the epoch took, measured from ``on_epoch_begin``."""
        elapsed = time.time() - self.epoch_start_time
        print(f"--- Epoch {epoch + 1} completed in {elapsed:.2f} seconds ---\n")

    def on_train_batch_begin(self, batch, logs=None):
        """Start the step timer and print the ``Step i/N - `` prefix without a newline."""
        self.step_start_time = time.time()
        total_steps = self.params['steps']
        print(f"Step {batch + 1}/{total_steps} - ", end="")

    def on_train_batch_end(self, batch, logs=None):
        """Finish the step line with the 'loss' and 'accuracy' entries of ``logs`` and the step time."""
        elapsed = time.time() - self.step_start_time
        loss = logs['loss']
        accuracy = logs['accuracy']
        print(f"Loss: {loss:.4f}, Accuracy: {accuracy:.4f}, Time: {elapsed:.2f} seconds")

In [None]:
# **Step 1: Define data generator**
class DataGenerator(Sequence):
    """Batch loader over memory-mapped sample and label files.

    The sample file is opened read-only as float32 with shape
    ``(num_samples, *sample_shape)`` and the label file as int32 with shape
    ``(num_samples,)``. Only the rows listed in ``indices`` are served.

    Args:
        X_path: Path to the raw float32 sample file.
        y_path: Path to the raw int32 label file.
        indices: Row indices (into the memmaps) served by this generator.
        batch_size: Number of samples per batch; must be positive.
        num_samples: Number of rows stored in both files.
        sample_shape: Shape of a single sample.
        num_classes: Width of the one-hot label vectors.
        **kwargs: Forwarded unchanged to the ``Sequence`` constructor.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, X_path, y_path, indices, batch_size,
                 num_samples=2041, sample_shape=(10, 224, 224, 3),
                 num_classes=2, **kwargs):
        # Initialise the Keras base class before touching any state.
        super().__init__(**kwargs)
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.X = np.memmap(X_path, dtype='float32', mode='r',
                           shape=(num_samples, *sample_shape))
        self.y = np.memmap(y_path, dtype='int32', mode='r', shape=(num_samples,))
        self.indices = indices
        self.batch_size = batch_size
        self.num_classes = num_classes

    def __len__(self):
        """Return the number of full batches; a trailing partial batch is dropped."""
        return len(self.indices) // self.batch_size

    def __getitem__(self, idx):
        """Return ``(X_batch, y_batch)`` for batch number ``idx``."""
        start = idx * self.batch_size
        batch_indices = self.indices[start:start + self.batch_size]
        X_batch = self.X[batch_indices]
        # Row lookup into an identity matrix turns integer labels into one-hot rows.
        y_batch = np.eye(self.num_classes)[self.y[batch_indices]]
        return X_batch, y_batch

# **Step 2: Build the CNN-LSTM model**
def build_cnn_lstm_model(seq_length, height, width, channels, num_classes):
    """Create the compiled CNN-LSTM classifier.

    Layer stack, in order: three TimeDistributed blocks of
    Conv2D(3x3, relu, same padding) -> BatchNormalization -> MaxPooling2D(2x2)
    with 32, 64 and 128 filters; a TimeDistributed Flatten; LSTM(128,
    return_sequences=True); LSTM(64); Dense(32, relu); Dense(num_classes,
    softmax). Dropout(0.5) follows each LSTM and the hidden Dense layer, and
    every Conv2D/LSTM/Dense kernel uses an L2 regularizer of 0.01.

    Args:
        seq_length: Length of the sequence axis of one input sample.
        height: Height of each element of the sequence.
        width: Width of each element of the sequence.
        channels: Channel count of each element of the sequence.
        num_classes: Number of softmax outputs.

    Returns:
        A ``Sequential`` model compiled with Adam (learning rate 0.0001),
        'categorical_crossentropy' loss and the 'accuracy' metric.
    """

    def conv_stage(filters, **wrapper_kwargs):
        # One Conv -> BatchNorm -> MaxPool group applied along the sequence axis;
        # wrapper_kwargs lets the first stage carry the model's input shape.
        conv = Conv2D(filters, (3, 3), activation='relu', padding='same',
                      kernel_regularizer=l2(0.01))
        return [
            TimeDistributed(conv, **wrapper_kwargs),
            TimeDistributed(BatchNormalization()),
            TimeDistributed(MaxPooling2D(pool_size=(2, 2))),
        ]

    stack = [
        *conv_stage(32, input_shape=(seq_length, height, width, channels)),
        *conv_stage(64),
        *conv_stage(128),
        TimeDistributed(Flatten()),
        LSTM(128, return_sequences=True, kernel_regularizer=l2(0.01)),
        Dropout(0.5),
        LSTM(64, kernel_regularizer=l2(0.01)),
        Dropout(0.5),
        Dense(32, activation='relu', kernel_regularizer=l2(0.01)),
        Dropout(0.5),
        Dense(num_classes, activation='softmax', kernel_regularizer=l2(0.01)),
    ]
    model = Sequential(stack)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# Create the model and show its layer summary
cnn_lstm_model = build_cnn_lstm_model(
    seq_length=10, height=224, width=224, channels=3, num_classes=2
)
cnn_lstm_model.summary()

# **Step 3: Split dataset into train and validation**
# Seeded 80/20 split over the 2041 row indices
indices = np.arange(2041)
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

train_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final_org.dat',
    r'E:\PosePerfect\Dataset Creation\y_final_org.dat',
    train_indices,
    batch_size=10,
)
val_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final_org.dat',
    r'E:\PosePerfect\Dataset Creation\y_final_org.dat',
    val_indices,
    batch_size=10,
)

# **Step 4: Set up checkpoint callback**
checkpoint_dir = './Checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Epoch number and validation loss are formatted into each file name
checkpoint_path = os.path.join(checkpoint_dir, 'model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.keras')
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=False,     # save after every epoch, not only on improvement
    save_weights_only=False,  # save the entire model
)

# **Step 6: Learning Rate Scheduler**
def scheduler(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    The incoming rate is passed through untouched while ``epoch`` is below 2;
    from then on each call returns 90% of the rate it was given.

    Args:
        epoch: Epoch index supplied by the caller.
        lr: Learning rate currently in effect.

    Returns:
        ``lr`` or ``lr * 0.9``.
    """
    return lr if epoch < 2 else lr * 0.9

lr_callback = LearningRateScheduler(scheduler)

step_timer_callback = StepTimerCallback()

# Step 7: Train the model
# Checkpoint every epoch, apply the LR schedule and print per-step timings
training_callbacks = [checkpoint_callback, lr_callback, step_timer_callback]
cnn_lstm_model.fit(
    train_gen,
    epochs=10,
    steps_per_epoch=len(train_gen),
    validation_data=val_gen,
    validation_steps=len(val_gen),
    callbacks=training_callbacks,
    verbose=1,  # Ensure verbose is enabled
)


In [None]:

# **Step 1: Data Generator**
class DataGenerator(Sequence):
    """Serve batches from memory-mapped sample/label files for Keras training.

    Samples are read from a float32 memmap of shape
    ``(num_samples, *sample_shape)`` and labels from an int32 memmap of shape
    ``(num_samples,)``; both are opened read-only. The generator only yields
    the rows named in ``indices``.

    Args:
        X_path: Path to the raw float32 sample file.
        y_path: Path to the raw int32 label file.
        indices: Row indices (into the memmaps) served by this generator.
        batch_size: Number of samples per batch; must be positive.
        num_samples: Number of rows stored in both files.
        sample_shape: Shape of a single sample.
        num_classes: Width of the one-hot label vectors.
        **kwargs: Forwarded unchanged to the ``Sequence`` constructor.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, X_path, y_path, indices, batch_size,
                 num_samples=2041, sample_shape=(10, 224, 224, 3),
                 num_classes=2, **kwargs):
        # The Keras base class must be initialised by subclasses.
        super().__init__(**kwargs)
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.X = np.memmap(X_path, dtype='float32', mode='r',
                           shape=(num_samples, *sample_shape))
        self.y = np.memmap(y_path, dtype='int32', mode='r', shape=(num_samples,))
        self.indices = indices
        self.batch_size = batch_size
        self.num_classes = num_classes

    def __len__(self):
        """Number of full batches per epoch (the remainder is discarded)."""
        return len(self.indices) // self.batch_size

    def __getitem__(self, idx):
        """Build batch ``idx`` as a ``(samples, one_hot_labels)`` pair."""
        first = idx * self.batch_size
        batch_indices = self.indices[first:first + self.batch_size]
        X_batch = self.X[batch_indices]
        # Indexing an identity matrix by label yields the one-hot encoding.
        y_batch = np.eye(self.num_classes)[self.y[batch_indices]]
        return X_batch, y_batch
    
class StepTimerCallback(Callback):
    """Report elapsed time per epoch and per training step on stdout.

    Start times are stored on the instance (``epoch_start_time`` and
    ``step_start_time``) by the ``*_begin`` hooks and read back by the
    matching ``*_end`` hooks.
    """

    def on_epoch_begin(self, epoch, logs=None):
        """Print the epoch header, then record when the epoch started."""
        epoch_number = epoch + 1
        print(f"\n--- Starting Epoch {epoch_number} ---")
        self.epoch_start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        """Print the epoch duration in seconds."""
        finished_at = time.time()
        epoch_time = finished_at - self.epoch_start_time
        epoch_number = epoch + 1
        print(f"--- Epoch {epoch_number} completed in {epoch_time:.2f} seconds ---\n")

    def on_train_batch_begin(self, batch, logs=None):
        """Record the step start time and open the step's output line."""
        self.step_start_time = time.time()
        step_number = batch + 1
        # end="" keeps the cursor on this line so the batch-end hook can finish it.
        print(f"Step {step_number}/{self.params['steps']} - ", end="")

    def on_train_batch_end(self, batch, logs=None):
        """Complete the step's line with loss, accuracy and duration."""
        finished_at = time.time()
        step_time = finished_at - self.step_start_time
        print(f"Loss: {logs['loss']:.4f}, Accuracy: {logs['accuracy']:.4f}, Time: {step_time:.2f} seconds")
        
class BatchEarlyStopping(Callback):
    """Stop training mid-epoch once a metric stays below a threshold.

    After every training batch the monitored value is read from ``logs``.
    Each consecutive batch whose value is strictly below ``threshold``
    increments a counter; a batch at or above the threshold resets it. When
    the counter reaches ``patience``, ``self.model.stop_training`` is set to
    True.

    The test is ``value < threshold``, so this is meant for metrics where
    lower is better (e.g. 'loss').
    """

    def __init__(self, monitor='loss', threshold=0.1, patience=5):
        """
        Args:
            monitor: Key of the metric to read from the batch logs.
            threshold: Value the metric has to fall below.
            patience: Number of consecutive batches below ``threshold``
                required before training is stopped.
        """
        super().__init__()
        self.monitor = monitor
        self.threshold = threshold
        self.patience = patience
        self.wait = 0

    def on_train_begin(self, logs=None):
        """Reset the counter so the same instance can be reused across fit() calls."""
        self.wait = 0

    def on_train_batch_end(self, batch, logs=None):
        """Update the consecutive-batch counter and request a stop when it reaches ``patience``."""
        current_value = (logs or {}).get(self.monitor)
        if current_value is None:
            # Metric not reported for this batch: leave the counter untouched.
            return

        if current_value < self.threshold:
            self.wait += 1
            if self.wait >= self.patience:
                print(f"\nEarly stopping triggered at batch {batch + 1}: {self.monitor} = {current_value:.4f}")
                self.model.stop_training = True
        else:
            self.wait = 0  # The streak is broken; start counting again

# Batch-level early stopping: monitor the training loss and stop once it has
# been below 0.1 for 2 consecutive batches
batch_early_stopping_callback = BatchEarlyStopping(monitor='loss', threshold=0.1, patience=2)

# **Step 2: Learning Rate Scheduler**
def scheduler(epoch, lr):
    """Compute the next learning rate from the current one.

    Args:
        epoch: Epoch index supplied by the caller.
        lr: Learning rate currently in effect.

    Returns:
        ``lr`` itself for epochs below 2; ``lr * 0.9`` for every later epoch.
    """
    keep_current_rate = epoch < 2
    if keep_current_rate:
        return lr
    decayed = lr * 0.9
    return decayed

# Callback that applies `scheduler` to the optimizer's learning rate
lr_callback = LearningRateScheduler(
    scheduler,
)

# **Step 3: Real-time Accuracy and Loss Callback**
def on_batch_end(batch, logs):
    """Print epoch, batch, loss and accuracy for a finished batch.

    The epoch shown is ``logs['epoch'] + 1`` when that key exists and 1
    otherwise. ``logs`` must contain 'loss' and 'accuracy'.

    Args:
        batch: Index of the finished batch (printed 1-based).
        logs: Mapping of values reported for the batch.
    """
    epoch_number = logs.get('epoch', 0) + 1
    batch_number = batch + 1
    loss = logs['loss']
    accuracy = logs['accuracy']
    print(f"Epoch: {epoch_number}, Batch: {batch_number}, Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")

# Callback object that invokes the printer above after each batch
realtime_metrics_callback = LambdaCallback(
    on_batch_end=on_batch_end,
)

# **Step 4: Split Dataset**
# Seeded 80/20 split over the 2041 row indices
indices = np.arange(2041)
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

train_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final_org.dat',
    r'E:\PosePerfect\Dataset Creation\y_final_org.dat',
    train_indices,
    batch_size=10,
)
val_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final_org.dat',
    r'E:\PosePerfect\Dataset Creation\y_final_org.dat',
    val_indices,
    batch_size=10,
)

# **Step 5: Load Checkpoint and Resume Training**
# Saved model to continue from
checkpoint_path = r'E:\PosePerfect\Model\Checkpoints\model_epoch_01_val_loss_6.53.keras'
cnn_lstm_model = load_model(checkpoint_path)
cnn_lstm_model.summary()  # Verify model structure

# New checkpoint callback for the epochs trained from here on
new_checkpoint_dir = './Checkpoints'
os.makedirs(new_checkpoint_dir, exist_ok=True)

new_checkpoint_path = os.path.join(new_checkpoint_dir, 'model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.keras')
new_checkpoint_callback = ModelCheckpoint(
    filepath=new_checkpoint_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=False,     # write a file after every epoch
    save_weights_only=False,  # store the whole model
)

step_timer_callback = StepTimerCallback()


# **Step 6: Resume Training**
# `epochs` is the overall target; `initial_epoch=1` makes fit() continue from there
resume_callbacks = [
    new_checkpoint_callback,
    step_timer_callback,
    lr_callback,
    batch_early_stopping_callback,
]
cnn_lstm_model.fit(
    train_gen,
    validation_data=val_gen,
    initial_epoch=1,
    epochs=10,
    steps_per_epoch=len(train_gen),
    validation_steps=len(val_gen),
    callbacks=resume_callbacks,
    verbose=1,
)


In [2]:
from tensorflow.keras.models import load_model

# Saved model file from epoch 1
checkpoint_path = r'E:\PosePerfect\Model\Checkpoints\model_epoch_01_val_loss_6.53.keras'

# Restore the model, then print its layer summary as a sanity check
cnn_lstm_model = load_model(
    checkpoint_path,
)
cnn_lstm_model.summary()


In [5]:
# **Step 1: Define data generator**
class DataGenerator(Sequence):
    """Batch loader over memory-mapped sample and label files.

    The sample file is opened read-only as float32 with shape
    ``(num_samples, *sample_shape)`` and the label file as int32 with shape
    ``(num_samples,)``. Only the rows listed in ``indices`` are served.

    Args:
        X_path: Path to the raw float32 sample file.
        y_path: Path to the raw int32 label file.
        indices: Row indices (into the memmaps) served by this generator.
        batch_size: Number of samples per batch; must be positive.
        num_samples: Number of rows stored in both files.
        sample_shape: Shape of a single sample.
        num_classes: Width of the one-hot label vectors.
        **kwargs: Forwarded unchanged to the ``Sequence`` constructor.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, X_path, y_path, indices, batch_size,
                 num_samples=2041, sample_shape=(10, 224, 224, 3),
                 num_classes=2, **kwargs):
        # Initialise the Keras base class before touching any state.
        super().__init__(**kwargs)
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.X = np.memmap(X_path, dtype='float32', mode='r',
                           shape=(num_samples, *sample_shape))
        self.y = np.memmap(y_path, dtype='int32', mode='r', shape=(num_samples,))
        self.indices = indices
        self.batch_size = batch_size
        self.num_classes = num_classes

    def __len__(self):
        """Return the number of full batches; a trailing partial batch is dropped."""
        return len(self.indices) // self.batch_size

    def __getitem__(self, idx):
        """Return ``(X_batch, y_batch)`` for batch number ``idx``."""
        start = idx * self.batch_size
        batch_indices = self.indices[start:start + self.batch_size]
        X_batch = self.X[batch_indices]
        # Row lookup into an identity matrix turns integer labels into one-hot rows.
        y_batch = np.eye(self.num_classes)[self.y[batch_indices]]
        return X_batch, y_batch
    
# Seeded 80/20 split over the 2041 row indices
indices = np.arange(2041)
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

# Training and validation generators over the same pair of files
train_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final_org.dat',
    r'E:\PosePerfect\Dataset Creation\y_final_org.dat',
    train_indices,
    batch_size=10,
)
val_gen = DataGenerator(
    r'E:\PosePerfect\Dataset Creation\X_final_org.dat',
    r'E:\PosePerfect\Dataset Creation\y_final_org.dat',
    val_indices,
    batch_size=10,
)

In [6]:
# Score the loaded model on the validation generator and unpack the two results
eval_results = cnn_lstm_model.evaluate(val_gen)
val_loss, val_accuracy = eval_results
print(f"Validation Loss: {val_loss}, Validation Accuracy: {val_accuracy}")

In [10]:
# Open the example file read-only; both names refer to the same memmap
X_data = np.memmap(
    r'E:\PosePerfect\Model\Process Example\X_exam.dat',
    dtype='float32',
    mode='r',
    shape=(40, 10, 224, 224, 3),
)
example_input = X_data

# Model outputs per example, then the index of the largest output in each row
predictions = cnn_lstm_model.predict(example_input)
predicted_class = predictions.argmax(axis=1)

print(f"Predicted class: {predicted_class}")