In [None]:
import os
import random
import math
import json
import pickle
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.model_selection import train_test_split

from tensorflow import keras
from keras import layers, models, callbacks
#from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard

from keras import mixed_precision
mixed_precision.set_global_policy('mixed_float16')

# 1. Set Seeds
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# 2. Define Directories & Params
DATA_DIR = '/home/giannisstavrakis/Documents/Pure processed full'  
SEQUENCE_LENGTH = 30
IMAGE_HEIGHT = 224
IMAGE_WIDTH = 224
IMAGE_CHANNELS = 3
BATCH_SIZE = 32
SHUFFLE_BUFFER_SIZE = 1000

# 3. Split the dataset
all_folders = sorted([
    folder for folder in os.listdir(DATA_DIR)
    if os.path.isdir(os.path.join(DATA_DIR, folder))
])
print(f"Total folders found: {len(all_folders)}")

# Train/Val/Test split
train_folders, temp_folders = train_test_split(
    all_folders, test_size=0.3, random_state=SEED, shuffle=True
)
val_folders, test_folders = train_test_split(
    temp_folders, test_size=0.5, random_state=SEED, shuffle=True
)

print(f"Training folders: {len(train_folders)}")
print(f"Validation folders: {len(val_folders)}")
print(f"Test folders: {len(test_folders)}")

# 4. Utility function to build (seq_image_paths, label) pairs
def get_image_label_pairs(folder_list, sequence_length, data_dir):
    """
    Returns a list of (list_of_image_paths, label).
    Each `list_of_image_paths` has exactly SEQUENCE_LENGTH file paths,
    and `label` is the pulse rate of the last frame in the sequence.
    """
    pairs = []
    for folder_name in folder_list:
        folder_path = os.path.join(data_dir, folder_name)
        if not os.path.isdir(folder_path):
            continue

        json_path = os.path.join(folder_path, 'aligned_pulse_rate.json')
        if not os.path.exists(json_path):
            continue

        # Load JSON data
        with open(json_path, 'r') as f:
            try:
                pulse_data = json.load(f)
            except json.JSONDecodeError:
                continue

        # Filter and sort
        valid_data = [d for d in pulse_data if 'ImageFile' in d and 'PulseRate' in d]
        valid_data = sorted(valid_data, key=lambda x: x['ImageFile'])

        # Build all images & labels
        images = []
        labels = []
        for entry in valid_data:
            img_file = entry['ImageFile']
            label = entry['PulseRate']
            full_img_path = os.path.join(folder_path, img_file)
            if os.path.exists(full_img_path):
                images.append(full_img_path)
                labels.append(label)

        # Create sliding windows
        num_sequences = len(images) - sequence_length + 1
        if num_sequences <= 0:
            continue

        for i in range(num_sequences):
            seq_image_paths = images[i : i + sequence_length]
            seq_label = labels[i + sequence_length - 1]
            pairs.append((seq_image_paths, seq_label))

    return pairs

# 5. Generator function to yield pairs (needed for from_generator)
def pairs_generator(pairs_list):
    for seq_image_paths, label in pairs_list:
        yield (seq_image_paths, label)

# 6. parse_sequence function using tf.io & tf.image
def parse_sequence(seq_image_paths, seq_label):
    """
    seq_image_paths: string tensor of shape (sequence_length,)
    seq_label: scalar float
    Returns: (images, label)
        - images: float32 tensor [sequence_length, H, W, C]
        - label: float32 scalar
    """
    def _load_image(path):
        # Read file & decode
        img = tf.io.read_file(path)
        img = tf.image.decode_jpeg(img, channels=IMAGE_CHANNELS)
        # Resize
        #img = tf.image.resize(img, [IMAGE_HEIGHT, IMAGE_WIDTH])
        # Normalize
        img = tf.cast(img, tf.float32) #process the input for ResNet, from unit8 to float32
        img = keras.applications.resnet.preprocess_input(img)
        return img

    # Apply _load_image to each path in seq_image_paths
    images = tf.map_fn(_load_image, seq_image_paths, fn_output_signature=tf.float32)
    label = tf.cast(seq_label, tf.float32)
    return images, label

# 7. Build the final tf.data Datasets
def make_dataset(pairs_list, sequence_length, batch_size, shuffle_buffer_size, seed=42):
    """
    Build a tf.data.Dataset using from_generator -> parse_sequence -> batch
    """
    output_signature = (
        tf.TensorSpec(shape=(sequence_length,), dtype=tf.string),  # image paths
        tf.TensorSpec(shape=(), dtype=tf.float32),                 # label
    )

    ds = tf.data.Dataset.from_generator(
        lambda: pairs_generator(pairs_list),
        output_signature=output_signature
    )

    ds = ds.shuffle(buffer_size=shuffle_buffer_size, seed=seed)
    ds = ds.map(parse_sequence, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.repeat()
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

# 8. Create train/val/test pairs + datasets
train_pairs = get_image_label_pairs(train_folders, SEQUENCE_LENGTH, DATA_DIR)
val_pairs = get_image_label_pairs(val_folders, SEQUENCE_LENGTH, DATA_DIR)
test_pairs = get_image_label_pairs(test_folders, SEQUENCE_LENGTH, DATA_DIR)

print(f"Number of train sequences: {len(train_pairs)}")
print(f"Number of val sequences:   {len(val_pairs)}")
print(f"Number of test sequences:  {len(test_pairs)}")

train_dataset = make_dataset(train_pairs, SEQUENCE_LENGTH, BATCH_SIZE, SHUFFLE_BUFFER_SIZE, seed=SEED)
val_dataset   = make_dataset(val_pairs, SEQUENCE_LENGTH, BATCH_SIZE, SHUFFLE_BUFFER_SIZE, seed=SEED)
test_dataset  = make_dataset(test_pairs, SEQUENCE_LENGTH, BATCH_SIZE, SHUFFLE_BUFFER_SIZE, seed=SEED)

steps_per_epoch = math.ceil(len(train_pairs) / BATCH_SIZE)
validation_steps = math.ceil(len(val_pairs) / BATCH_SIZE)
train_dataset = train_dataset.repeat(steps_per_epoch) 

# Optional: verify shapes
print("Verifying Training Dataset:")
for batch_x, batch_y in train_dataset.take(1):
    print("  Train Batch X:", batch_x.shape)  # (batch_size, SEQUENCE_LENGTH, H, W, C)
    print("  Train Batch Y:", batch_y.shape)  # (batch_size,)

print("Verifying Validation Dataset:")
for batch_x, batch_y in val_dataset.take(1):
    print("  Val Batch X:", batch_x.shape)  # (batch_size, SEQUENCE_LENGTH, H, W, C)
    print("  Val Batch Y:", batch_y.shape)  # (batch_size,)

print("Verifying Testing Dataset:")
for batch_x, batch_y in test_dataset.take(1):
    print("  Test Batch X:", batch_x.shape)  # (batch_size, SEQUENCE_LENGTH, H, W, C)
    print("  Test Batch Y:", batch_y.shape)  # (batch_size,)

In [None]:
import tensorflow as tf
import keras
from keras import layers, models


def spatial_encoder_block(input_shape, custom_weights_path='/home/giannisstavrakis/Downloads/vggface2_Keras/vggface2_Keras/model/resnet50_softmax_dim512/weights.h5'):
    # Load ResNet50 without top layers; weights not loaded by default.
    base_model = tf.keras.applications.ResNet50(include_top=False, input_shape=input_shape, weights=None)

    # If custom weights are provided, load them with by_name=True to resolve layer mismatches.
    if custom_weights_path is not None:
        base_model.load_weights(custom_weights_path, by_name=True)

    # Optionally, freeze the base model to preserve pretrained features.
    base_model.trainable = False

    # Extract an intermediate feature map that preserves spatial dimensions.
    # For a 224x224 input, "conv4_block6_out" typically gives a 14x14 feature map.
    feature_layer_name = "conv4_block6_out"
    feature_output = base_model.get_layer(feature_layer_name).output

    encoder_model = models.Model(inputs=base_model.input, outputs=feature_output, name="spatial_encoder")
    return encoder_model

def se_block(inputs, ratio=16):
  
    channel = inputs.shape[-1]
    se = layers.GlobalAveragePooling2D()(inputs)
    se = layers.Reshape((1, 1, channel))(se)
    se = layers.Dense(channel // ratio, activation='relu', kernel_initializer='he_normal')(se)
    se = layers.Dense(channel, activation='sigmoid', kernel_initializer='he_normal')(se)
    x = layers.multiply([inputs, se])
    return x

def build_hr_estimation_model(SEQUENCE_LENGTH, frame_shape):
   
    video_input = layers.Input(shape=(SEQUENCE_LENGTH, *frame_shape), name="video_input")

    # Apply the spatial encoder to each frame via TimeDistributed
    spatial_encoder = spatial_encoder_block(frame_shape)
    encoded_frames = layers.TimeDistributed(spatial_encoder, name="time_distributed_encoder")(video_input)

    # Stacked ConvLSTM layers for spatiotemporal feature extraction
    x = layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), padding='same',
                          return_sequences=True, activation='tanh', name="convLSTM1")(encoded_frames)
    x = layers.BatchNormalization()(x)
    x = layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), padding='same',
                          return_sequences=False, activation='tanh', name="convLSTM2")(x)
    x = layers.BatchNormalization()(x)

    # Apply an attention mechanism (SE block)
    x = se_block(x)

    # Global pooling and fully connected layers for regression
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dense(64, activation='relu')(x)
    hr_output = layers.Dense(1, activation='linear', name="hr_output")(x)

    model = models.Model(inputs=video_input, outputs=hr_output, name="HR_Estimation_Model")
    return model

# Example usage:
if __name__ == "__main__":
    # Define hyperparameters
    SEQUENCE_LENGTH = SEQUENCE_LENGTH   # Number of frames in each input sequence
    frame_shape = (224, 224, 3)  # e.g., 64x64 RGB images; adjust based on your dataset

    # Build and compile the model
    model = build_hr_estimation_model(SEQUENCE_LENGTH, frame_shape)
    base_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
    optimizer = tf.keras.mixed_precision.LossScaleOptimizer(base_optimizer, dynamic=True)

    model.compile(optimizer=optimizer,
                  loss='mean_squared_error',
                  metrics=['mae'])

    # Print model summary to verify the architecture
    model.summary()


In [7]:
import numpy as np
import tensorflow as tf

class OneCycleScheduler(tf.keras.callbacks.Callback):
    def __init__(self, max_lr, total_steps, start_lr=None, end_lr=None, verbose=0):
        super(OneCycleScheduler, self).__init__()
        self.max_lr = max_lr
        self.total_steps = total_steps
        self.start_lr = start_lr if start_lr is not None else max_lr / 10.0
        self.end_lr = end_lr if end_lr is not None else self.start_lr / 100.0
        self.verbose = verbose
        self.iterations = 0

    def set_optimizer_lr(self, value):
        optimizer = self.model.optimizer
        # For LossScaleOptimizer, update the inner optimizer's learning rate.
        if hasattr(optimizer, '_optimizer'):
            target_lr = optimizer._optimizer.learning_rate
        else:
            target_lr = optimizer.learning_rate
        # Directly assign the new learning rate.
        target_lr.assign(value)

    def on_train_begin(self, logs=None):
        self.set_optimizer_lr(self.start_lr)
        if self.verbose > 0:
            print(f"Starting learning rate: {self.start_lr:.6f}")

    def on_batch_end(self, batch, logs=None):
        self.iterations += 1
        lr = self.compute_lr()
        self.set_optimizer_lr(lr)
        if self.verbose > 0:
            print(f"\nIteration {self.iterations}: Learning rate is {lr:.6f}")

    def compute_lr(self):
        progress = self.iterations / self.total_steps
        if progress < 0.5:
            # Linear warm-up from start_lr to max_lr.
            lr = self.start_lr + (self.max_lr - self.start_lr) * (progress / 0.5)
        else:
            # Cosine decay from max_lr to end_lr.
            cosine_progress = (progress - 0.5) / 0.5
            lr = self.end_lr + (self.max_lr - self.end_lr) * 0.5 * (1 + np.cos(np.pi * cosine_progress))
        return lr



# Variables
epochs = 40
total_steps = steps_per_epoch * epochs

# Define your learning rate parameters
start_lr = 0.0001   # starting learning rate
max_lr = 0.001     # maximum learning rate during warm-up
end_lr = 0.00001    # final learning rate at the end of training

one_cycle = OneCycleScheduler(max_lr=max_lr, total_steps=total_steps, start_lr=start_lr, end_lr=end_lr, verbose=1)




In [8]:
# ------------------------------------------------------------------------------
# 8. Define Callbacks
# ------------------------------------------------------------------------------
checkpoint_dir = 'model_checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, mode='min')

model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(checkpoint_dir, '03_04/best_cnn_lstm_model.weights.h5'),
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
    save_weights_only=True,
    mode='min'
)

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    verbose=1,
    min_lr=1e-7,
    mode='min'
)

tensorboard = tf.keras.callbacks.TensorBoard(log_dir='logs', histogram_freq=1, write_graph=True, write_images=True)


In [None]:


# ------------------------------------------------------------------------------
# 10. Train the Model
# ------------------------------------------------------------------------------
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps,
    callbacks=[early_stopping, model_checkpoint,tensorboard,one_cycle]
)

In [None]:
# ------------------------------------------------------------------------------
# 11. Plot the Results
# ------------------------------------------------------------------------------
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['mae'], label='Train MAE')
plt.plot(history.history['val_mae'], label='Val MAE')
plt.title('Model MAE')
plt.xlabel('Epoch')
plt.ylabel('MAE')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
import os
import shutil
from datetime import datetime

base_path = "weights"

# Generate a folder name using the current date and time
folder_name = datetime.now().strftime("%d%m%Y_%H%M")
# Optionally, add a prefix or suffix
folder_name = f"folder_{folder_name}"

# Create the full path by joining the base path with the folder name
full_path = os.path.join(base_path, folder_name)

# Create the new folder; exist_ok=True avoids errors if the folder already exists
os.makedirs(full_path, exist_ok=True)

source_path = "model_checkpoints/03_04/best_cnn_lstm_model.weights.h5"

shutil.copy(source_path, full_path)


In [14]:
model.save("03_04/model.keras")

In [None]:
import os
import shutil
from datetime import datetime

base_path = "weights"

# Generate a folder name using the current date and time
folder_name = datetime.now().strftime("%d%m%Y_%H%M")
# Optionally, add a prefix or suffix
folder_name = f"folder_{folder_name}"

# Create the full path by joining the base path with the folder name
full_path = os.path.join(base_path, folder_name)

# Create the new folder; exist_ok=True avoids errors if the folder already exists
os.makedirs(full_path, exist_ok=True)

source_path = "03_04/model.keras"

shutil.copy(source_path, full_path)

In [None]:
model.save_weights('/content/drive/MyDrive/5thYear/weights/folder_14022025_1936/model_weights.weights.h5')


In [None]:
import numpy as np

# 1. (Optional) Load the best weights from your checkpoint
best_weights_path = "model_checkpoints/03_04/best_cnn_lstm_model.weights.h5"
model.load_weights(best_weights_path)

# 2. Evaluate on the test dataset
    #This uses the same metrics you specified in model.compile(...)
test_loss, test_mae = model.evaluate(test_dataset)
print(f"Test Loss (e.g. MSE): {test_loss:.4f}")
print(f"Test MAE: {test_mae:.4f}")

# 3. (Optional) Get actual predictions and compare to ground truth
#    - We'll collect predictions for every batch
predictions = model.predict(test_dataset)  # shape (total_samples, 1) for Dense(1)

#    - Also collect labels from the test dataset
all_labels = []
for _, y in test_dataset:
    all_labels.append(y.numpy())

all_labels = np.concatenate(all_labels, axis=0)   # shape (total_samples,)
predictions = predictions.squeeze(axis=-1)        # shape (total_samples,)

# 4. Compute a manual MSE or any other metric you like
manual_mse = np.mean((predictions - all_labels)**2)
print(f"Manual MSE on Test Set: {manual_mse:.4f}")

# 5. (Optional) Inspect a few predictions vs. ground truths
for i in range(5):
    print(f"Sample {i}: Prediction={predictions[i]:.3f}, Ground Truth={all_labels[i]:.3f}")
