# Execution notebook
This notebook serves as a walk-through of the code to execute training of the ViT keypoint tracker using models from the TIMM library on huggingface. This script largely replicates the preprocessing blocks from the Project_execution.ipynb script written to work with the jeonsworld ViT code.


In [1]:
import os


In [2]:
#Prepare root directory

#Mac
root = "/Users/annastuckert/Documents/GitHub/ViT_facemap/ViT-timm"

#Windows
#root = r"C:\Users\avs20\Documents\GitHub\ViT_facemap\ViT-timm"


In [4]:
import timm

timm.list_models()

# Create project folder

In [5]:
import os
import ipywidgets as widgets
from IPython.display import display

# Define the root directory for projects
root = os.getcwd()  # Set to current working directory or customize it
projects_dir = os.path.join(root, "projects")

# Create the 'projects' folder if it doesn't exist
if not os.path.exists(projects_dir):
    os.makedirs(projects_dir)

# Function to create a project folder inside 'projects'
def create_project_folder(project_name):
    # Ensure a valid project name is provided
    if project_name.strip() == "":
        print("Please enter a valid project name.")
        return
    
    # Create the project folder path
    project_folder = os.path.join(projects_dir, project_name)
    
    # Check if the folder already exists
    if not os.path.exists(project_folder):
        os.makedirs(project_folder)
        print(f"Project folder '{project_folder}' created successfully.")
    else:
        print(f"Project folder '{project_folder}' already exists.")
    
    # Return the path to the project folder
    return project_folder

# Create widgets for project name input and button
project_name_input = widgets.Text(
    description="Project Name:",
    placeholder="Enter your project name",
)

# Display the input box and button
display(project_name_input)


Text(value='', description='Project Name:', placeholder='Enter your project name')

In [6]:
project_dict = create_project_folder(project_name_input.value)

Project folder '/Users/annastuckert/Documents/GitHub/ViT_facemap/ViT-timm/projects/Facemap' already exists.


## Data Preprocessing

### Test-train split (incl. dropping NAs)

In [7]:
# Example usage:
base_dir = os.path.join(root, "data", "facemap_multi_video")
output_dir = os.path.join(project_dict, "data")

from utils.train_test_split import split_and_organize_data

# Call the function to process and combine data from multiple folders
split_and_organize_data(base_dir, output_dir)

KeyboardInterrupt: 

## Data augmentation

Arguments:

- rotation = how much rotation (degrees) should be applied to the image
- img_height = input image height (consider changing this to automatically be derived from meta data files if expecting it not to be uniform)
- img_size = size in pixels (ViT expect 224)

## Define transformations to be applied, and input parameters to the arguments

In [None]:
#from utils.Dataaugmentation import Rotate, ZeroPadHeight, Rescale, HorizontalFlip, GaussianBlur
from torchvision import transforms, utils
import importlib
from utils.Dataaugmentation import Rotate, ZeroPadHeight, Rescale, HorizontalFlip, GaussianBlur

#from utils import Dataaugmentation
#importlib.reload(Dataaugmentation)

# Set the parameters for image augmentation
rotation = 10  # Degrees to rotate image
img_width = 846  # Width of the input image to pad the height to match
final_im_size = 224  # Final image size (224x224 pixels)

# Define transformations using transforms.Compose
rotate_rescale = transforms.Compose([
    Rotate(rotation),
    ZeroPadHeight(img_width),
    Rescale(final_im_size)
])

flip_rescale = transforms.Compose([
    HorizontalFlip(),
    ZeroPadHeight(img_width),
    Rescale(final_im_size)
])

pad_rescale = transforms.Compose([
    ZeroPadHeight(img_width),  # Use img_width instead of hardcoded value
    Rescale(final_im_size),
])

rotate_flip_rescale = transforms.Compose([
    HorizontalFlip(),
    Rotate(rotation),
    ZeroPadHeight(img_width),
    Rescale(final_im_size)
])

blur = transforms.Compose([
    GaussianBlur(),
    ZeroPadHeight(img_width),
    Rescale(final_im_size)
])

rescale = transforms.Compose([
    Rescale(final_im_size)
])

In [None]:
# Define a dictionary to associate names with each transformation
transforms_dict = {
    #'rotate_rescale': rotate_rescale,
    #'flip_rescale': flip_rescale,
    #'pad_rescale': pad_rescale,
    #'rotate_flip_rescale': rotate_flip_rescale,
    #'blur': blur,
    'rescale': rescale
}


# Import the AugmentedFaceDataset class
from utils.Dataaugmentation import AugmentedFaceDataset

In [None]:
#create augmented train set

# Define the paths
source_folder = os.path.join(project_dict, "data")

# Define paths using os.path.join for consistency
train_csv_file = os.path.join(source_folder, "train", "train_data.csv")
train_folder = os.path.join(source_folder, "train")
train_output_dir = os.path.join(source_folder, "train", "augmented_data")


# Initialize the dataset with defined transformations
face_dataset = AugmentedFaceDataset(csv_file=train_csv_file, root_dir=train_folder, output_dir=train_output_dir)

# Apply the transformations and save
face_dataset.apply_transforms_and_save(transforms_dict)

In [None]:
#create augmented test set

# Define the paths
source_folder = os.path.join(project_dict, "data")

# Define paths using os.path.join for consistency
test_csv_file = os.path.join(source_folder, "test", "test_data.csv")
test_folder = os.path.join(source_folder, "test")
test_output_dir = os.path.join(source_folder, "test", "augmented_data")


# Initialize the dataset with defined transformations
face_dataset = AugmentedFaceDataset(csv_file=test_csv_file, root_dir=test_folder, output_dir=test_output_dir)

# Apply the transformations and save
face_dataset.apply_transforms_and_save(transforms_dict)

In [None]:
# set paths to train and test augmented data
# Train
train_output_dir = os.path.join(source_folder, "train", "augmented_data")
train_csv = os.path.join(train_output_dir, "augmented_labels.csv")

# Test
test_output_dir = os.path.join(source_folder, "test", "augmented_data")
test_csv = os.path.join(test_output_dir, "augmented_labels.csv")

print(train_output_dir)


# Set up train function

In [None]:
import os
import torch
import pandas as pd
import numpy as np

def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs, device, save_dir, test_csv_file, patience=5):
    model.to(device)  # Move model to the appropriate device (CPU or GPU)
    
    # Initialize lists to store the losses for plotting later
    training_losses = []
    validation_losses = []
    
    # Initialize variables for early stopping
    best_val_loss = float('inf')  # Best validation loss starts at infinity
    epochs_no_improve = 0         # Counter for how many epochs since the last improvement
    early_stop = False            # Flag to indicate early stopping
    
    # Create save directory if it doesn't exist
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    loss_curve_path = os.path.join(save_dir, "lossCurve.csv")
    
    # If the CSV file already exists, we read the data in case the training was interrupted
    if os.path.exists(loss_curve_path):
        loss_data = pd.read_csv(loss_curve_path)
        training_losses = loss_data['training_loss'].tolist()
        validation_losses = loss_data['validation_loss'].tolist()

    for epoch in range(num_epochs):
        if early_stop:
            print("Early stopping triggered. Ending training.")
            break

        model.train()  # Set the model to training mode
        running_loss = 0.0

        # Get the total number of batches
        total_batches = len(train_loader)

        for step, (images, labels) in enumerate(train_loader):
            labels = labels.view(20, -1)  # TODO: Adjust batch size dynamically if needed
            images, labels = images.to(device), labels.to(device)  # Move to device
            
            optimizer.zero_grad()  # Clear previous gradients
            outputs = model(images)  # Forward pass
            loss = criterion(outputs, labels)  # Calculate loss
            loss.backward()  # Backward pass
            optimizer.step()  # Update weights

            running_loss += loss.item() * images.size(0)  # Accumulate loss

            # Print step progress
            if step % 10 == 0 or step == total_batches - 1:  # Print every 10 steps and the last step
                print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{step + 1}/{total_batches}], Loss: {loss.item():.4f}")

        # Calculate average training loss for the epoch
        epoch_loss = running_loss / len(train_loader.dataset)
        training_losses.append(epoch_loss)  # Log training loss
        print(f"Epoch [{epoch + 1}/{num_epochs}], Average Training Loss: {epoch_loss:.4f}")

        # Validation step
        if test_loader is not None:
            model.eval()  # Set the model to evaluation mode
            val_loss = 0.0
            all_outputs = []  # List to hold outputs
            image_names = []  # List to hold image names
            
            with torch.no_grad():
                for step, (images, labels) in enumerate(test_loader):
                    labels = labels.view(images.size(0), -1)  # Dynamically reshape based on batch size
                    images, labels = images.to(device), labels.to(device)
                    
                    # Get the predictions from the model
                    outputs = model(images)
                    all_outputs.append(outputs.cpu().detach().numpy())  # Collect predictions

                    # Assuming test_csv_file contains the image names corresponding to the test set
                    if step == 0:  # Assuming the same image names for every step; typically you would read it outside the loop
                        image_names.extend(pd.read_csv(test_csv_file)["image_name"].tolist())

                    loss = criterion(outputs, labels)
                    val_loss += loss.item() * images.size(0)  # Accumulate loss
            
            val_loss /= len(test_loader.dataset)
            validation_losses.append(val_loss)  # Log validation loss
            print(f"Validation Loss: {val_loss:.4f}")

            # Save outputs to CSV after processing all batches
            all_outputs = np.vstack(all_outputs)  # Stack all outputs into a single array
            d_preds = pd.DataFrame(all_outputs)  # Convert outputs to DataFrame
            d_preds["image_names"] = image_names  # Add image names to DataFrame

            # Save DataFrame to CSV
            predictions_csv_path = os.path.join(save_dir, "predictions.csv")
            d_preds.to_csv(predictions_csv_path, index=False)
            print(f"Validation outputs saved to {predictions_csv_path}")

            # Early stopping logic
            if val_loss < best_val_loss:
                best_val_loss = val_loss  # Update the best validation loss
                epochs_no_improve = 0     # Reset the counter when improvement occurs
                # Save the model whenever the validation loss improves
                best_model_path = os.path.join(save_dir, "best_model.pth")
                torch.save(model.state_dict(), best_model_path)
                print(f"Validation loss improved. Model saved at {best_model_path}")
            else:
                epochs_no_improve += 1
                print(f"No improvement in validation loss for {epochs_no_improve} epochs.")
            
            # Check if we should stop early
            if epochs_no_improve >= patience:
                print(f"Early stopping triggered after {patience} epochs without improvement.")
                early_stop = True

        # Save the loss curve after each epoch
        loss_data = pd.DataFrame({
            'epoch': range(1, len(training_losses) + 1),
            'training_loss': training_losses,
            'validation_loss': validation_losses
        })
        loss_data.to_csv(loss_curve_path, index=False)
        print(f"Loss curves updated and saved at {loss_curve_path}")

        # Save the model after every 10 epochs
        if (epoch + 1) % 10 == 0:
            save_path = os.path.join(save_dir, f"model_epoch_{epoch + 1}.pth")
            torch.save(model.state_dict(), save_path)
            print(f"Model saved at: {save_path}")

    print("Training complete.")


# set up model specifications

In [None]:
import os
from timm import create_model
import torch.nn as nn
import torch.optim as optim
import torch
from data_utils_timm import get_loader

# Set your paths and parameters directly
current_dir = os.getcwd()
train_csv_file = os.path.join(current_dir,"projects/Facemap/data/train/augmented_data/augmented_labels.csv")
train_data_dir = os.path.join(current_dir, "projects/Facemap/data/train/augmented_data")
test_csv_file = os.path.join(current_dir,"projects/Facemap/data/test/augmented_data/augmented_labels.csv")
test_data_dir = os.path.join(current_dir, "projects/Facemap/data/test/augmented_data")
save_dir = os.path.join(current_dir, "projects/Facemap/data/output")
train_batch_size = 20
eval_batch_size = 20
num_epochs = 300  # Set your desired number of epochs

# Initialize the DataLoader
train_loader, test_loader = get_loader(train_csv_file, train_data_dir, test_csv_file, test_data_dir, train_batch_size, eval_batch_size)

# Create the model
#model = create_model('vit_base_r50_s16_224', pretrained=True)
model = create_model('vit_base_patch16_224', pretrained=True)
# Modify the last layer for regression
num_keypoints = 12
num_coordinates = num_keypoints * 2

# Check the current head of the model
print(model.head)

# Replace the head appropriately
if isinstance(model.head, nn.Identity):
    # Directly replace with a new linear layer
    model.head = nn.Linear(model.num_features, num_coordinates)  # Use num_features instead of in_features
else:
    # This handles the cases where the head is not Identity
    model.head = nn.Linear(model.head.in_features, num_coordinates)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.MSELoss()

# Move the model to the appropriate device
model.to(device)



# Running Training



In [None]:
# Train the model
train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs, device, save_dir=save_dir, test_csv_file=test_csv_file)

# Plot predictions

In [9]:
from timm import create_model
import torch.nn as nn
# Create the model
model = create_model('vit_base_patch16_224', pretrained=True)
import torch

# Modify the last layer for regression
#num_keypoints = 12
#num_coordinates = num_keypoints * 2
#model.head = nn.Linear(model.head.in_features, num_coordinates)

#hybrid model?
#model = create_model('vit_base_r50_s16_224', pretrained=True)

# Modify the last layer for regression
num_keypoints = 12
num_coordinates = num_keypoints * 2

# Check the current head of the model
print(model.head)

# Replace the head appropriately
if isinstance(model.head, nn.Identity):
    # Directly replace with a new linear layer
    model.head = nn.Linear(model.num_features, num_coordinates)  # Use num_features instead of in_features
else:
    # This handles the cases where the head is not Identity
    model.head = nn.Linear(model.head.in_features, num_coordinates)


# Load the trained model
model.load_state_dict(torch.load('model_epoch_100.pth'))

print(model)

# Check if GPU is available and move model to GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)  # Move model to the appropriate device
model.eval()  # Set to evaluation mode

from torchvision import transforms

# Define the transform (assuming this is required elsewhere)
transform = transforms.Compose([
    #transforms.Resize((224, 224)),  # Uncomment if resizing is needed
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

from pathlib import Path
from PIL import Image

# Define the data path in an OS-independent way
data_path = Path("projects") / "Facemap" / "data" / "test" / "augmented_data"

# Define the image name

#imageName = 'cam1_G7c1_1_img0703_pad_rescale_augmented.jpg'
#imageName = 'cam1_G7c1_1_img0703_rotate_rescale_augmented.jpg'
imageName = 'cam1_G7c1_1_img0703_flip_rescale_augmented.jpg'


# Open the image (using the / operator to concatenate the path and file name)
im = Image.open(data_path / imageName)

x = transform(im)

#print(x)[0]


import pandas as pd

labels = pd.read_csv(data_path /'augmented_labels.csv')
labelsKepoints = labels.loc[labels['image_name'] == imageName].values.flatten().tolist()[1:]

pred_path = Path("projects") / "Facemap" / "data" / "output"
preds = pd.read_csv(pred_path /'predictions.csv')
predsKeypoints = preds.loc[labels['image_name'] == imageName].values.flatten().tolist()[0:-1]

print(predsKeypoints)

import torch

# Assuming 'transform' and 'model' are defined somewhere in your code
# Check if a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load and transform the image
x = transform(im).to(device)  # Move the transformed image to the appropriate device

# Move the model to the same device
model.to(device)

# Set model to evaluation mode
model.eval()

# Get model output
with torch.no_grad():  # Disable gradient calculation for inference
    out = model(x.unsqueeze(0))  # Ensure input is batched
    #attention_weights = model.get_attention_maps(x.unsqueeze(0))

#print(out.shape)
# Convert the model output to a format suitable for plotting
keypoints = out[0].detach().cpu().numpy()  # Detach and move to CPU if using a GPU

# Access the first row of keypoints since shape is (1, 24)
#keypoints = keypoints[0]

print(keypoints)

import matplotlib.pyplot as plt


plt.imshow(im)
# Loop through the keypoints and plot them
for i in range(0, len(keypoints), 2):
    x_coord = keypoints[i]
    y_coord = keypoints[i + 1]
    plt.scatter(x_coord, y_coord, s=10, c='blue', marker='x')  # Plot each keypoint
for i in range(0, 23, 2):
    plt.plot(labelsKepoints[i], labelsKepoints[i+1], 'ro')
    plt.plot(predsKeypoints[i], predsKeypoints[i+1], 'yo', markerfacecolor='none', markersize=10) #allows us to plot x+y coordinate of each key point (i+1) and loops over the 24 keypoints, skipping every second step since we plot both x+y 

plt.title("Image with Key Points")
plt.axis('off')  # Turn off axis labels
plt.show()
 

Linear(in_features=768, out_features=1000, bias=True)


FileNotFoundError: [Errno 2] No such file or directory: 'model_epoch_100.pth'

# plot loss curve

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

# Path to the loss curve data
losscurve_path = Path("projects") / "Facemap" / "data" / "output"

# Load the loss data from CSV
d_lossCurve = pd.read_csv(losscurve_path / 'lossCurve.csv')

# Define colors for different metrics
colors = {'training_loss': 'blue', 'validation_loss': 'orange'}  
labels2 = {'validation_loss': 'Test Loss', 'training_loss': 'Training Loss'}  # Rename metrics

# Plot each metric
for metric, color in colors.items():
    steps = d_lossCurve['epoch']  # Extract the epoch column
    loss = d_lossCurve[metric]     # Extract the corresponding loss values

    # Plot the loss curve
    plt.plot(steps, loss, label=labels2[metric], color=color)

# Labels and title
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Curve')
plt.legend()

# Set the y-axis to logarithmic scale for better visualization
plt.yscale('log')

# Show the plot
plt.show()
