In [None]:
#CycleGAN Implementation
#Running with Nvidia Cuda GPU each 200 epoch training takes ~45min
import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
import random
import matplotlib.pyplot as plt

#check if can use GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

#randomizing input image transforms
class SpiralDataset(Dataset):
    def __init__(self, folder, img_size=(256, 256)):
        self.paths = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.jpg')]
        resize_size = random.randint(286, 300)
        self.transform = transforms.Compose([
            transforms.Resize((resize_size, resize_size)),
            transforms.RandomCrop(img_size),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5]*3, std=[0.5]*3)
        ])

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        img = Image.open(self.paths[idx]).convert("RGB")
        return self.transform(img)

class ResnetBlock(nn.Module):
    def __init__(self, dim):
        super(ResnetBlock, self).__init__()
        self.block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(dim, dim, kernel_size=3, padding=0),
            nn.InstanceNorm2d(dim),
            nn.ReLU(True),

            nn.ReflectionPad2d(1),
            nn.Conv2d(dim, dim, kernel_size=3, padding=0),
            nn.InstanceNorm2d(dim)
        )

    def forward(self, x):
        return x + self.block(x)  # Skip connection

#Generator block for generation of new images
class ResnetGenerator(nn.Module):
    def __init__(self, input_nc=3, output_nc=3, ngf=64, n_blocks=6):
        super(ResnetGenerator, self).__init__()
        model = []

        # Initial conv
        model += [
            nn.ReflectionPad2d(3),
            nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0),
            nn.InstanceNorm2d(ngf),
            nn.ReLU(True)
        ]

        # Downsampling
        in_features = ngf
        out_features = ngf * 2
        for _ in range(2):  # Two downsamples
            model += [
                nn.Conv2d(in_features, out_features, kernel_size=3, stride=2, padding=1),
                nn.InstanceNorm2d(out_features),
                nn.ReLU(True)
            ]
            in_features = out_features
            out_features = in_features * 2

        # Residual blocks
        for _ in range(n_blocks):
            model += [ResnetBlock(in_features)]

        # Upsampling
        out_features = in_features // 2
        for _ in range(2):
            model += [
                nn.ConvTranspose2d(in_features, out_features, kernel_size=3, stride=2, padding=1, output_padding=1),
                nn.InstanceNorm2d(out_features),
                nn.ReLU(True)
            ]
            in_features = out_features
            out_features = in_features // 2

        # Output layer
        model += [
            nn.ReflectionPad2d(3),
            nn.Conv2d(ngf, output_nc, kernel_size=7),
            nn.Tanh()
        ]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)


#discriminator block for determining domain of images
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, 4, 2, 1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, 4, 2, 1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, 4, 2, 1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 1, 4, 1, 1)
        )

    def forward(self, x):
        return self.model(x)

bce_loss = nn.BCEWithLogitsLoss()
l1_loss = nn.L1Loss()

#training step of CycleGAN model
def train_step(real_A, real_B, G_AB, G_BA, D_A, D_B, opt_G, opt_D_A, opt_D_B, lambda_cycle=10, lambda_identity=0.5):
    # Move data to device
    real_A, real_B = real_A.to(device, non_blocking=True), real_B.to(device, non_blocking=True)

    # === Generators ===
    opt_G.zero_grad()

    fake_B = G_AB(real_A)
    fake_A = G_BA(real_B)
    rec_A = G_BA(fake_B)
    rec_B = G_AB(fake_A)

    same_A = G_BA(real_A)
    same_B = G_AB(real_B)

    # Adversarial loss
    loss_G_AB = bce_loss(D_B(fake_B), torch.ones_like(D_B(fake_B)))
    loss_G_BA = bce_loss(D_A(fake_A), torch.ones_like(D_A(fake_A)))

    # Cycle loss
    loss_cycle_A = l1_loss(rec_A, real_A)
    loss_cycle_B = l1_loss(rec_B, real_B)

    # Identity loss
    loss_identity_A = l1_loss(same_A, real_A)
    loss_identity_B = l1_loss(same_B, real_B)

    # Total generator loss
    total_G = loss_G_AB + loss_G_BA + lambda_cycle * (loss_cycle_A + loss_cycle_B) + lambda_identity * (loss_identity_A + loss_identity_B)
    total_G.backward()
    opt_G.step()

    # === Discriminator A ===
    opt_D_A.zero_grad()
    loss_D_A = bce_loss(D_A(real_A), torch.ones_like(D_A(real_A))) + bce_loss(D_A(fake_A.detach()), torch.zeros_like(D_A(fake_A)))
    loss_D_A.backward()
    opt_D_A.step()

    # === Discriminator B ===
    opt_D_B.zero_grad()
    loss_D_B = bce_loss(D_B(real_B), torch.ones_like(D_B(real_B))) + bce_loss(D_B(fake_B.detach()), torch.zeros_like(D_B(fake_B)))
    loss_D_B.backward()
    opt_D_B.step()

    return total_G.item(), loss_D_A.item(), loss_D_B.item()

#training of CycleGAN
def train(domain_A,domain_B):
    dataset_A = DataLoader(SpiralDataset(domain_A), batch_size=1, shuffle=True, pin_memory=True)
    dataset_B = DataLoader(SpiralDataset(domain_B), batch_size=1, shuffle=True, pin_memory=True)

    G_AB = ResnetGenerator().to(device)
    G_BA = ResnetGenerator().to(device)
    D_A = Discriminator().to(device)
    D_B = Discriminator().to(device)

    opt_G = torch.optim.Adam(list(G_AB.parameters()) + list(G_BA.parameters()), lr=0.0002, betas=(0.5, 0.999))
    opt_D_A = torch.optim.Adam(D_A.parameters(), lr=0.0002, betas=(0.5, 0.999))
    opt_D_B = torch.optim.Adam(D_B.parameters(), lr=0.0002, betas=(0.5, 0.999))

    for epoch in range(200):

        for real_A, real_B in zip(dataset_A, dataset_B):
            g_loss, dA_loss, dB_loss = train_step(real_A, real_B, G_AB, G_BA, D_A, D_B, opt_G, opt_D_A, opt_D_B)
        print(f"Epoch {epoch} - Gen Loss: {g_loss:.4f} | Disc A: {dA_loss:.4f} | Disc B: {dB_loss:.4f}")
        if epoch % 10 == 0:
            G_AB.eval()
            with torch.no_grad():
                sample_input = real_A.to(device)
                fake_B = G_AB(sample_input)
                # Denormalize real A (input)
                real_A_img = sample_input.squeeze(0).cpu() * 0.5 + 0.5
                real_A_img = real_A_img.permute(1, 2, 0).clamp(0, 1).numpy()
                fake_B = fake_B.squeeze(0).cpu() * 0.5 + 0.5  # Denormalize
                fake_B = fake_B.permute(1, 2, 0).clamp(0, 1).numpy()

            plt.figure(figsize=(10, 5))

            plt.subplot(1, 2, 1)
            plt.imshow(real_A_img)
            plt.title(f"Epoch {epoch}: Real A (Input)")
            plt.axis("off")

            plt.subplot(1, 2, 2)
            plt.imshow(fake_B)
            plt.title(f"Epoch {epoch}: Fake B (Output)")
            plt.axis("off")

            plt.tight_layout()
            plt.show()
            G_AB.train()
    return G_AB, G_BA

#cleaned control 0305 deleted patient 0009 0153 0202 0219 0261
#cleaned meander control 0305 deleted patient 0023 0024 0153 0202

G_Spiral_Control_Patient, G_Spiral_Control_Patient_Back = train("FDM Resources/SpiralControl - Cleaned","FDM Resources/SpiralPatients - Cleaned")
#G_Spiral_Patient_Control, G_Spiral_Patient_Control_Back = train("FDM Resources/SpiralPatients - Cleaned","FDM Resources/SpiralControl - Cleaned")
#G_Meander_Control_Patient, G_Meander_Control_Patient_Back = train("FDM Resources/MeanderControl","FDM Resources/MeanderPatients")
#G_Meander_Patient_Control, G_Meander_Patient_Control_Back = train("FDM Resources/SpiralPatients","FDM Resources/MeanderControl")

In [None]:
#Using generators created in CycleGAN block to create images
from torchvision import transforms
from PIL import Image
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the same preprocessing transform used during training
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Input and output folders
input_folder = "FDM Resources/SpiralControl"
output_folder = "generated_patient"
os.makedirs(output_folder, exist_ok=True)

# Loop over all image files in the input folder
for filename in os.listdir(input_folder):
    if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        img_path = os.path.join(input_folder, filename)
        image = Image.open(img_path).convert("RGB")

        input_tensor = transform(image).unsqueeze(0).to(device)

        with torch.no_grad():
            fake = G_Spiral_Control_Patient(input_tensor)

        output_img = fake.squeeze(0).cpu()
        output_img = (output_img * 0.5) + 0.5  # Denormalize

        # Save output
        output_pil = transforms.ToPILImage()(output_img)
        output_file = os.path.join(output_folder, f"generated_patient_{filename}")
        output_pil.save(output_file)

        print(f"Saved: {output_file}")


In [None]:
#Handcrafted Features
import os
import cv2
import numpy as np
import pandas as pd

# Set your folders here
spiral_control_dir = '/content/drive/MyDrive/Financial Data Mining Project/FDM Resources/SpiralControl'
# 'spiralcontrol'
spiral_patients_dir = '/content/drive/MyDrive/Financial Data Mining Project/FDM Resources/SpiralPatients'
#'spiralpd'

sprial_gen_control = "generated_control"
spiral_gen_patient = "generated_patient"
meander_gen_control = "generated_meander_control"
meander_gen_patient = "generated_meander_patient"

# Functions
def calculate_radial_deviation(image):
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    _, thresh = cv2.threshold(blur, 50, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if contours:
        cnt = max(contours, key=cv2.contourArea)
        M = cv2.moments(cnt)
        if M['m00'] == 0:
            return 0
        cx = int(M['m10'] / M['m00'])
        cy = int(M['m01'] / M['m00'])
        deviations = [np.sqrt((pt[0][0] - cx)**2 + (pt[0][1] - cy)**2) for pt in cnt]
        return np.std(deviations)
    return 0

def calculate_stroke_length(image):
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
    length = sum(cv2.arcLength(cnt, False) for cnt in contours)
    return length

def calculate_curvature(image):
  def curvature(x, y): #calculating curvature as mag of r' - r'' / mag r' ^3
    dx = np.gradient(x)
    dy = np.gradient(y)
    ddx = np.gradient(dx)
    ddy = np.gradient(dy)
    curv = (dx * ddy - dy * ddx) / ((dx**2 + dy**2)+1e-10)**(3/2) #+ 1e-10 because getting NaN for really small values
    return curv
  gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  blur = cv2.GaussianBlur(gray,(5,5),0)
  _,thresh = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY_INV + cv2. THRESH_OTSU)
  contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
  cnt = max(contours, key=cv2.contourArea)
  contour = cnt.squeeze() #extracts coords of contour points and remove uneccesary dimensions
  x = contour[:, 0] #extracts x coords
  y = contour[:, 1] #extract y coords
  curvature = curvature(x,y) # calculate curvature
  curv_variance = np.var(curvature) #find variance
  return curv_variance

def stroke_density(image):
  gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  blur = cv2.GaussianBlur(gray,(5,5),0)
  _,thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2. THRESH_OTSU)
  #Stroke Density
  #caluclate amount of white pixels in image after binary
  white_pixels = np.sum(thresh == 255)
  #calculate how many pixels
  total_pixels = thresh.size
  #density of the image can be viewed as how many white pixels are left after thresholding
  stroke_density = white_pixels / total_pixels
  return stroke_density

# Analyze images
results = []

def process_directory(folder, label):
    for filename in os.listdir(folder):
        if filename.lower().endswith(('.jpg', '.png', '.jpeg')):
            path = os.path.join(folder, filename)
            img = cv2.imread(path)
            if img is not None:
                rd = calculate_radial_deviation(img)
                sl = calculate_stroke_length(img)
                c = calculate_curvature(img)
                sd = stroke_density(img)
                results.append({
                    'Filename': filename,
                    'Group': label,
                    'Radial Deviation': rd,
                    'Stroke Length': sl,
                    'Curvature': c,
                    'Stroke Density': sd
                })

control_spiral = "FDM Resources/Images - wCycleGAN/Control"
patient_spiral = "FDM Resources/Images - wCycleGAN/Patient"

# Run analysis
# process_directory(spiral_control_dir, 'Control')
# process_directory(spiral_patients_dir, 'Patient')
process_directory(control_spiral, 'Control')
process_directory(patient_spiral, 'Patient')
# process_directory(meander_gen_control, 'Control')
# process_directory(meander_gen_patient, 'Patient')



# Export or view results
df = pd.DataFrame(results)
print(df)

# Save to Excel or CSV
df.to_csv("spiral_gen_analysis_results.csv", index=False)
#boxplot
import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
sns.boxplot(data=df, x='Group', y='Radial Deviation')
plt.title("Radial Deviation by Group")
plt.show()

plt.figure(figsize=(10, 5))
sns.boxplot(data=df, x='Group', y='Stroke Length')
plt.title("Stroke Length by Group")
plt.show()

plt.figure(figsize=(10, 5))
sns.boxplot(data=df, x='Group', y='Curvature')
plt.title("Curvature by Group")
plt.show()

plt.figure(figsize=(10, 5))
sns.boxplot(data=df, x='Group', y='Stroke Density')
plt.title("Stroke Density by Group")
plt.show()


#pca visualization
from sklearn.decomposition import PCA

features = df[['Radial Deviation', 'Stroke Length', "Curvature", "Stroke Density"]]
pca = PCA(n_components=4)
pca_result = pca.fit_transform(features)

df['PCA-RD'] = pca_result[:, 0] #rd
df['PCA-SL'] = pca_result[:, 1] #sl
df['PCA-C'] = pca_result[:, 2] #c
df['PCA-SD'] = pca_result[:, 3] #sd

plt.figure(figsize=(8,6))
sns.scatterplot(data=df, x='PCA-RD', y='PCA-SL', hue='Group', style='Group', s=100)
plt.title('PCA Visualization of Handwriting Features (RD & SL)')
plt.grid(True)
plt.show()

plt.figure(figsize=(8,6))
sns.scatterplot(data=df, x='PCA-C', y='PCA-SL', hue='Group', style='Group', s=100)
plt.title('PCA Visualization of Handwriting Features (C & SL)')
plt.grid(True)
plt.show()

plt.figure(figsize=(8,6))
sns.scatterplot(data=df, x='PCA-SD', y='PCA-C', hue='Group', style='Group', s=100)
plt.title('PCA Visualization of Handwriting Features (SD & C)')
plt.grid(True)
plt.show()

from scipy.stats import ttest_ind

control_rd = df[df['Group'] == 'Control']['Radial Deviation']
patient_rd = df[df['Group'] == 'Patient']['Radial Deviation']
rd_tstat, rd_pval = ttest_ind(control_rd, patient_rd)

control_sl = df[df['Group'] == 'Control']['Stroke Length']
patient_sl = df[df['Group'] == 'Patient']['Stroke Length']
sl_tstat, sl_pval = ttest_ind(control_sl, patient_sl)

control_c = df[df['Group'] == 'Control']['Curvature']
patient_c = df[df['Group'] == 'Patient']['Curvature']
c_tstat, c_pval = ttest_ind(control_c, patient_c)

control_sd = df[df['Group'] == 'Control']['Stroke Density']
patient_sd = df[df['Group'] == 'Patient']['Stroke Density']
sd_tstat, sd_pval = ttest_ind(control_sd, patient_sd)

print(f"Radial Deviation t-test p-value: {rd_pval:.4f}")
print(f"Stroke Length t-test p-value: {sl_pval:.4f}")
print(f"Curvature t-test p-value: {c_pval:.4f}")
print(f"Stroke Density t-test p-value: {sd_pval:.4f}")

In [None]:
#Handcrafted features modeling
#model building 3 diff models for comparison, Logistic Regression, Decision Tree, K-nearest-neigbors
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# X = handcrafted features, y = labels
X = df[['Radial Deviation', 'Stroke Length', 'Curvature', 'Stroke Density']].values
y = df['Group'].map({'Control': 0, 'Patient': 1}).values

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

models = {
    "Logistic Regression": LogisticRegression(max_iter=1000),
    "Decision Tree": DecisionTreeClassifier(),
    "KNN": KNeighborsClassifier(n_neighbors=5)
}

for name, model in models.items():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    print(f"\n=== {name} ===")
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print("Classification Report:\n", classification_report(y_test, y_pred))
    print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

#confusion matrix for final comparison
ConfusionMatrixDisplay.from_estimator(model, X_test, y_test, display_labels=["Control", "Patient"], cmap='Blues')
plt.title(f"Confusion Matrix - {name}")
plt.show()

In [None]:
#Vision Transformer (ViT)
#Using Pre-Trained Model from Imagenet-21k, Transfer Learning
import torch
import torchvision
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score, accuracy_score
from PIL import Image
import shutil
import os
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from transformers import ViTForImageClassification, ViTFeatureExtractor
import numpy as np

# Define whether to use GPU or CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

batch_size = 16
num_epochs = 10
learning_rate = 1e-4
patch_size = 4

root_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/Images - wCycleGAN'

# Loading a pre-trained ViT model and feature extractor
model_name = "google/vit-base-patch16-224-in21k"  # ViT model name from VIT paper
model = ViTForImageClassification.from_pretrained(model_name, num_labels=len(os.listdir(root_dir)))
feature_extractor = ViTFeatureExtractor.from_pretrained(model_name)

# Move the model to GPU if available
model.to(device)

# Transform to match ViT input size and normalization
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ViT expects 224x224 input images
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pre-trained ViT normalization
])

#setting up the image folders for data loading
class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir  # Path to the root directory containing categories
        self.transform = transform
        self.image_paths = []  # List to store paths of all images
        self.labels = []  # List to store corresponding labels

        # Get all the subdirectories (i.e., categories) in the root directory
        self.class_names = os.listdir(root_dir)  # This gives you folder names (labels)

        # Create a mapping from class name to class index (label)
        self.class_to_idx = {class_name: idx for idx, class_name in enumerate(self.class_names)}

        # Loop through each category folder
        for class_name in self.class_names:
            category_folder = os.path.join(root_dir, class_name)

            # Ensure it's a folder (not a file)
            if os.path.isdir(category_folder):
                # Loop through each image in the folder
                for img_name in os.listdir(category_folder):
                    img_path = os.path.join(category_folder, img_name)

                    # Only consider image files
                    if img_path.endswith(('.jpg')):
                        self.image_paths.append(img_path)
                        self.labels.append(self.class_to_idx[class_name])  # Assign label based on folder name

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Get the image and label based on the index
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path)

        # Apply any transformations (like ToTensor, normalization, etc.)
        if self.transform:
            image = self.transform(image)

        return image, label

dataset = CustomImageDataset(root_dir=root_dir, transform=transform)

train_paths, test_paths = train_test_split(dataset.image_paths, test_size=0.2, random_state=15)
train_paths, val_paths = train_test_split(train_paths, test_size=0.25, random_state=15)  # 0.25 * 0.8 = 0.2 validation

# Function to copy files into the appropriate directories
def copy_images_to_dir(image_paths, target_dir):
    for path in image_paths:
        class_name = path.split('\\')[-2]
        target_class_dir = os.path.join(target_dir, class_name)
        if not os.path.exists(target_class_dir):
            os.makedirs(target_class_dir)
            #print(f"Created directory: {target_class_dir}")
        #print(f"Copying image: {path} to {target_class_dir}")
        shutil.copy(path, target_class_dir)

train_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/train'
val_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/val'
test_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/test'

#FOR SEPARATION OF TRAINING,TEST,VALIDATION SETS INTO FILES

copy_images_to_dir(train_paths, train_dir)
copy_images_to_dir(val_paths, val_dir)
copy_images_to_dir(test_paths, test_dir)

train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
val_dataset = datasets.ImageFolder(root=val_dir, transform=transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
valid_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)
test_loader  = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

#viewing labels
# print(train_dataset.class_to_idx)

# for images, labels in train_loader:
#   print(images.shape)
#   print(labels)

optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
criterion = torch.nn.CrossEntropyLoss()
num_epochs = 50



# Model evaluation function
def evaluate_model(model, dataloader):
    model.eval()  # Set model to evaluation mode
    correct_predictions = 0
    total_predictions = 0
    true_labels = []
    predicted_values = []
    with torch.no_grad():
        for images, labels in dataloader:
            # Move images and labels to GPU if available
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.logits, 1)
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()
            true_labels.extend(predicted.cpu().numpy())
            predicted_values.extend(labels.cpu().numpy())
    accuracy = correct_predictions / total_predictions
    return accuracy, true_labels, predicted_values

# Early stopping parameters
patience = 5  # Number of epochs with no improvement after which training will stop
best_val_accuracy = 0.0
epochs_without_improvement = 0

training_loss_logger = []
validation_acc_logger = []
training_acc_logger = []

valid_acc = 0
train_acc = 0
early_epoch = 0

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    for images, labels in train_loader:
        # Move images and labels to GPU if available
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()  # Clear gradients
        outputs = model(images)  # Forward pass
        loss = criterion(outputs.logits, labels)  # Calculate loss
        loss.backward()  # Backpropagation
        training_loss_logger.append(loss.item())
        optimizer.step()  # Optimizer step

    # Evaluate after each epoch
    val_accuracy, _, _ = evaluate_model(model, valid_loader)
    training_accuracy,_,_ = evaluate_model(model, train_loader)

    validation_acc_logger.append(val_accuracy)
    training_acc_logger.append(training_accuracy)

    # Early stopping based on validation accuracy
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1

    if epochs_without_improvement >= patience:
        early_epoch = epoch
        print(f"Early stopping on epoch {epoch + 1}")
        break
    print(f"Epoch {epoch + 1}/{num_epochs}, Validation Accuracy: {val_accuracy * 100:.2f}%, Training Accuracy: {training_accuracy * 100:.2f}%")

#plotting training loss
plt.figure(figsize = (10,5))
train_x = np.linspace(0, early_epoch, len(training_loss_logger))
plt.plot(train_x, training_loss_logger)
_ = plt.title("ViT Training Loss")

#plotting training and validation accuracy
plt.figure(figsize = (10,5))
train_x = np.linspace(0, early_epoch, len(training_acc_logger))
plt.plot(train_x, training_acc_logger, c = "y")
valid_x = np.linspace(0, early_epoch, len(validation_acc_logger))
plt.plot(valid_x, validation_acc_logger, c = "k")

plt.title("ViT")
_ = plt.legend(["Training accuracy", "Validation accuracy"])

# Test the model on the test set
test_accuracy, test_labels, test_predicted = evaluate_model(model, test_loader)
print(test_labels)
print(test_predicted)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

#find model scores
accuracy = accuracy_score(test_labels, test_predicted)
print(f"Test Accuracy: {accuracy}%")
precision = precision_score(test_labels, test_predicted, average='weighted')
print(f"Test Precision: {precision}%")
recall = recall_score(test_labels, test_predicted, average='weighted')
print(f"Test Recall: {recall}%")
f1 = f1_score(test_labels, test_predicted, average='weighted')
print(f"Test F1 Score: {f1}%")

#plot confusion matrix
cm = confusion_matrix(test_labels, test_predicted)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.show()

In [None]:
#Re-Training Existing ViT Model with HandPD data
import torch
import torchvision
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score, accuracy_score
from PIL import Image
import shutil
import os
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from transformers import ViTForImageClassification, ViTFeatureExtractor
import numpy as np

# Define whether to use GPU or CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

batch_size = 16
num_epochs = 10
learning_rate = 1e-4
patch_size = 4

root_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/Images - wCycleGAN'

# Loading a pre-trained ViT model and feature extractor
model_name = "google/vit-base-patch16-224-in21k"  # ViT model name
model = ViTForImageClassification.from_pretrained(model_name, num_labels=len(os.listdir(root_dir)))
#, hidden_dropout_prob=0.5
feature_extractor = ViTFeatureExtractor.from_pretrained(model_name)

# Move the model to GPU if available
model.to(device)

# Transform to match ViT input size and normalization
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ViT expects 224x224 input images
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pre-trained ViT normalization
])

class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir  # Path to the root directory containing categories
        self.transform = transform
        self.image_paths = []  # List to store paths of all images
        self.labels = []  # List to store corresponding labels

        # Get all the subdirectories (i.e., categories) in the root directory
        self.class_names = os.listdir(root_dir)  # This gives you folder names (labels)

        # Create a mapping from class name to class index (label)
        self.class_to_idx = {class_name: idx for idx, class_name in enumerate(self.class_names)}

        # Loop through each category folder
        for class_name in self.class_names:
            category_folder = os.path.join(root_dir, class_name)

            # Ensure it's a folder (not a file)
            if os.path.isdir(category_folder):
                # Loop through each image in the folder
                for img_name in os.listdir(category_folder):
                    img_path = os.path.join(category_folder, img_name)

                    # Only consider image files
                    if img_path.endswith(('.jpg')):
                        self.image_paths.append(img_path)
                        self.labels.append(self.class_to_idx[class_name])  # Assign label based on folder name

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Get the image and label based on the index
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path)

        # Apply any transformations (like ToTensor, normalization, etc.)
        if self.transform:
            image = self.transform(image)

        return image, label

dataset = CustomImageDataset(root_dir=root_dir, transform=transform)

train_paths, test_paths = train_test_split(dataset.image_paths, test_size=0.2, random_state=15)
train_paths, val_paths = train_test_split(train_paths, test_size=0.25, random_state=15)  # 0.25 * 0.8 = 0.2 validation

# Function to copy files into the appropriate directories
def copy_images_to_dir(image_paths, target_dir):
    for path in image_paths:
        class_name = path.split('\\')[-2]
        target_class_dir = os.path.join(target_dir, class_name)
        if not os.path.exists(target_class_dir):
            os.makedirs(target_class_dir)
            #print(f"Created directory: {target_class_dir}")
        #print(f"Copying image: {path} to {target_class_dir}")
        shutil.copy(path, target_class_dir)

train_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/train'
val_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/val'
test_dir = 'C:/Users/Thomas/coding/FDMProj/FDM Resources/test'

#FOR SEPARATION OF TRAINING,TEST,VALIDATION SETS INTO FILES

copy_images_to_dir(train_paths, train_dir)
copy_images_to_dir(val_paths, val_dir)
copy_images_to_dir(test_paths, test_dir)

train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
val_dataset = datasets.ImageFolder(root=val_dir, transform=transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
valid_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)
test_loader  = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

#viewing labels
# print(train_dataset.class_to_idx)

# for images, labels in train_loader:
#   print(images.shape)
#   print(labels)

optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
criterion = torch.nn.CrossEntropyLoss()
num_epochs = 50



# Model evaluation function
def evaluate_model(model, dataloader):
    model.eval()  # Set model to evaluation mode
    correct_predictions = 0
    total_predictions = 0
    true_labels = []
    predicted_values = []
    with torch.no_grad():
        for images, labels in dataloader:
            # Move images and labels to GPU if available
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.logits, 1)
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()
            true_labels.extend(predicted.cpu().numpy())
            predicted_values.extend(labels.cpu().numpy())
    accuracy = correct_predictions / total_predictions
    return accuracy, true_labels, predicted_values

# Early stopping parameters
patience = 5  # Number of epochs with no improvement after which training will stop
best_val_accuracy = 0.0
epochs_without_improvement = 0

training_loss_logger = []
validation_acc_logger = []
training_acc_logger = []

valid_acc = 0
train_acc = 0
early_epoch = 0

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    for images, labels in train_loader:
        # Move images and labels to GPU if available
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()  # Clear gradients
        outputs = model(images)  # Forward pass
        loss = criterion(outputs.logits, labels)  # Calculate loss
        loss.backward()  # Backpropagation
        training_loss_logger.append(loss.item())
        optimizer.step()  # Optimizer step

    # Evaluate after each epoch
    val_accuracy, _, _ = evaluate_model(model, valid_loader)
    training_accuracy,_,_ = evaluate_model(model, train_loader)

    validation_acc_logger.append(val_accuracy)
    training_acc_logger.append(training_accuracy)

    # Early stopping based on validation accuracy
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1

    if epochs_without_improvement >= patience:
        early_epoch = epoch
        print(f"Early stopping on epoch {epoch + 1}")
        break
    print(f"Epoch {epoch + 1}/{num_epochs}, Validation Accuracy: {val_accuracy * 100:.2f}%, Training Accuracy: {training_accuracy * 100:.2f}%")

plt.figure(figsize = (10,5))
train_x = np.linspace(0, early_epoch, len(training_loss_logger))
plt.plot(train_x, training_loss_logger)
_ = plt.title("ViT Training Loss")

plt.figure(figsize = (10,5))
train_x = np.linspace(0, early_epoch, len(training_acc_logger))
plt.plot(train_x, training_acc_logger, c = "y")
valid_x = np.linspace(0, early_epoch, len(validation_acc_logger))
plt.plot(valid_x, validation_acc_logger, c = "k")

plt.title("ViT")
_ = plt.legend(["Training accuracy", "Validation accuracy"])

# Test the model on the test set
test_accuracy, test_labels, test_predicted = evaluate_model(model, test_loader)
print(test_labels)
print(test_predicted)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

accuracy = accuracy_score(test_labels, test_predicted)
print(f"Test Accuracy: {accuracy}%")
precision = precision_score(test_labels, test_predicted, average='weighted')
print(f"Test Precision: {precision}%")
recall = recall_score(test_labels, test_predicted, average='weighted')
print(f"Test Recall: {recall}%")
f1 = f1_score(test_labels, test_predicted, average='weighted')
print(f"Test F1 Score: {f1}%")

cm = confusion_matrix(test_labels, test_predicted)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.show()