In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torchvision import datasets, transforms
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split

from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
import os

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# DATA STANDARDIZATION

# import libraries
import os
import shutil
from collections import defaultdict

# Directories
# change to the [AgNO3][NaBH4] kinetic filter directory
# Every folder below lives under the same experiment root, so the root is spelled out only once.
experiment_root = r'C:\Users\Public\PartIIB project 2023_2024\Image collection without reaction\00AgNO3_mole_fraction'
sized_images_root = experiment_root + r'\Outputs_Grayscale_Labelled_Images_Sizes'
data_selector_root = experiment_root + r'\dataSelectorHelical5'

targetDir = sized_images_root + r'\size_folder3'
input_folder = sized_images_root + r'\size_folder'
output_folder_train = data_selector_root + r'\train'
output_folder_test_val = data_selector_root + r'\testVal'
output_folder_test_val_disc = data_selector_root + r'\testValDisc'

# Self-defined parameters

# number of images per input
input_num = 1

# number of size bins
num_bins = 68

# bins holding fewer than disregardFactor * x_avg contours are discarded
# (x_avg = average number of contours per bin)
disregardFactor = 0.5

# proportion of a bin used for training when
# disregardFactor * x_avg < x < x_avg * upperFactor / trainProportion
trainProportion = 0.8

# bins holding more than x_avg * upperFactor / trainProportion contours contribute
# x_avg * upperFactor training images; smaller bins contribute trainProportion of their contours
upperFactor = 2

# the size range covered by each bin
bin_width = 0.5


def obtainSizeRange(targetDir, input_num, num_bins):
    """Count how many images in ``targetDir`` fall into each 0.5-wide size bin.

    File names are expected to contain ``t-<int>_`` (the time index) and to carry
    the size as a number in characters ``[-17:-5]`` of the name.

    Only files whose time index lies in ``[200 - 200/input_num + 1, 200]`` are counted.
    NOTE(review): 200 looks like the largest time index of a series - confirm.

    Args:
        targetDir: Folder whose file names are inspected.
        input_num: Number of sections the 200 time indices are divided into;
            only the last section is counted.
        num_bins: Number of bins; bin keys are 0.0, 0.5, ..., (num_bins - 1) / 2.

    Returns:
        dict mapping each bin's lower edge to its file count, ordered by bin edge.
        Files whose size falls outside the bins are skipped (and reported)
        instead of raising ``KeyError``.
    """
    # One zero-initialised counter per bin, keyed by the bin's lower edge.
    sizeDict = {i/2: 0 for i in range(num_bins)}
    lowerT = 200 - (200/input_num) + 1
    skipped = 0

    for name in os.listdir(targetDir):
        t_number = int(name.split("t-")[1].split("_")[0])
        if not lowerT <= t_number <= 200:
            continue
        size = float(name[-17:-5])
        # Truncate to the nearest lower multiple of 0.5 to obtain the bin key.
        sizeRange = int(size*2)/2
        if sizeRange not in sizeDict:
            # Size lies outside the requested bins; do not crash on it.
            skipped += 1
            continue
        sizeDict[sizeRange] += 1

    if skipped:
        print(f"obtainSizeRange: skipped {skipped} file(s) with a size outside the {num_bins} bins")

    sortedBySize = dict(sorted(sizeDict.items()))
    print(sizeDict)
    print(sortedBySize)
    print(dict(sorted(sizeDict.items(), key=lambda item: item[1])))
    return sortedBySize

# Count the images per size bin in the target directory.
sizeDict = obtainSizeRange(
    targetDir=targetDir,
    input_num=input_num,
    num_bins=num_bins,
)

def obtainDesiredDict(sizeDict, disregardFactor, trainProportion, upperFactor):
    """Decide how many images of each size bin should go into the training set.

    With ``x`` the count of a bin and ``x_avg`` the average count per bin:

    * ``x < x_avg * disregardFactor``                       -> 0 (bin is discarded)
    * ``x <= x_avg * upperFactor / trainProportion``        -> ``int(trainProportion * x)``
    * otherwise                                             -> ``int(x_avg * upperFactor)``

    The two thresholds are inclusive on the middle branch, so every bin in
    ``sizeDict`` receives an entry (a count exactly on a threshold used to be
    left out of the result).

    Args:
        sizeDict: Mapping of bin key -> number of images in that bin.
        disregardFactor: Fraction of the average below which a bin is discarded.
        trainProportion: Fraction of a mid-sized bin used for training.
        upperFactor: Multiple of the average that caps the training count of large bins.

    Returns:
        dict mapping every bin key of ``sizeDict`` (in ascending key order) to
        the desired number of training images; ``{}`` for an empty ``sizeDict``.
    """
    if not sizeDict:
        return {}

    # Average over the bins that actually exist (not len + 1).
    x_avg = sum(sizeDict.values()) / len(sizeDict)
    x_lower = x_avg*disregardFactor
    x_upper = (x_avg*upperFactor)/trainProportion

    desiredDict = {}
    for j in sorted(sizeDict):
        # x = number of contours per bin
        x = sizeDict[j]
        if x < x_lower:
            desiredDict[j] = 0
        elif x <= x_upper:
            desiredDict[j] = int(trainProportion*x)
        else:
            desiredDict[j] = int(x_avg*upperFactor)
    return desiredDict

# Turn the per-bin counts into the per-bin number of training images.
desiredDict = obtainDesiredDict(
    sizeDict,
    disregardFactor=disregardFactor,
    trainProportion=trainProportion,
    upperFactor=upperFactor,
)

def data_standardisation(input_folder, output_folder_train, output_folder_test_val, output_folder_test_val_disc, desiredDict, input_num, bin_width):
    """Copy the images of ``input_folder`` into train / test-val / discarded folders.

    File names are expected to look like ``Fig_<n>__t-<int>_...`` and to carry the
    size as a number in characters ``[-17:-5]`` of the name.

    Pass 1 (training set): every file whose time index is greater than
    ``(input_num - 1) / input_num * 200`` is assigned to the bin ``[j, j + bin_width)``
    containing its size and copied to ``output_folder_train`` until that bin holds
    ``desiredDict[j]`` files. For each copied file the ``input_num - 1`` earlier
    frames of the same figure (time index reduced in steps of ``200 / input_num``)
    are copied alongside it.

    Pass 2 (everything else): each input file that is not present in
    ``output_folder_train`` is copied to ``output_folder_test_val_disc`` when its
    bin has a desired count of 0, otherwise to ``output_folder_test_val``.
    Files whose size lies in no bin are not copied anywhere.

    Args:
        input_folder: Folder holding the source images.
        output_folder_train: Destination for training images.
        output_folder_test_val: Destination for test/validation images.
        output_folder_test_val_disc: Destination for images of discarded bins.
        desiredDict: Mapping of bin lower edge -> number of training images wanted.
        input_num: Number of frames that make up one input.
        bin_width: Width of each size bin.

    Raises:
        FileNotFoundError: if an earlier frame belonging to a training image is missing.
    """
    last_time_section = ((input_num-1)/(input_num))*200
    section_length = 200/input_num
    for folder in (output_folder_train, output_folder_test_val, output_folder_test_val_disc):
        os.makedirs(folder, exist_ok=True)

    # List the input folder once instead of once per file.
    all_files = os.listdir(input_folder)
    bin_starts = sorted(desiredDict)
    counterDict = {start: 0 for start in bin_starts}

    def find_bin(size):
        """Return the lower edge of the bin containing ``size`` (None if no bin does)."""
        for start in bin_starts:
            # Lower edge inclusive, matching the int(size*2)/2 binning used for the counts.
            if start <= size < start + bin_width:
                return start
        return None

    def copy_into(name, destination):
        """Copy ``name`` from the input folder into ``destination``."""
        shutil.copy(os.path.join(input_folder, name), os.path.join(destination, name))

    # Pass 1: fill the training folder bin by bin.
    for filename in all_files:
        size = float(filename[-17:-5])
        t_number = int(filename.split("t-")[1].split("_")[0])
        if t_number <= last_time_section:
            continue
        bin_start = find_bin(size)
        if bin_start is None or counterDict[bin_start] >= desiredDict[bin_start]:
            continue
        copy_into(filename, output_folder_train)
        counterDict[bin_start] += 1

        # Bring along the earlier frames that belong to the same input.
        fig_number = filename.split("_")[1]
        for step in range(input_num-1):
            t_number_new = str(int(t_number-(step+1)*section_length))
            # The trailing "_" stops e.g. "t-10" from also matching "t-100".
            prefix = f"Fig_{fig_number}__t-{t_number_new}_"
            matches = [item for item in all_files if item.startswith(prefix)]
            if not matches:
                raise FileNotFoundError(f"No file starting with {prefix!r} in {input_folder!r}")
            copy_into(matches[0], output_folder_train)

    # Pass 2: route every file that did not end up in the training folder.
    train_files = set(os.listdir(output_folder_train))
    for filename2 in all_files:
        if filename2 in train_files:
            continue
        bin_start = find_bin(float(filename2[-17:-5]))
        if bin_start is None:
            continue
        if desiredDict[bin_start] == 0:
            copy_into(filename2, output_folder_test_val_disc)
        else:
            copy_into(filename2, output_folder_test_val)
      
               

# Build the train / test-val / discarded folders.
# output_folder_test_val_disc is a required parameter of data_standardisation.
data_standardisation(
    input_folder=input_folder,
    output_folder_train=output_folder_train,
    output_folder_test_val=output_folder_test_val,
    output_folder_test_val_disc=output_folder_test_val_disc,
    desiredDict=desiredDict,
    input_num=input_num,
    bin_width=bin_width,
)




In [None]:
# Folder of images the dataset is built from.
# For a direct comparison this needs to be the [AgNO3][NaBH4] kinetic filter contours.
data_dir = (
    r'C:\Users\Public\PartIIB project 2023_2024'
    r'\Image collection without reaction'
    r'\00AgNO3_mole_fraction'
    r'\Outputs_Grayscale_Labelled_Images_Sizes'
    r'\size_folder'
)

In [None]:
class CustomImageDataset(Dataset):
    """Images stored directly in one folder, labelled from their file names.

    The label of an image is the number found in characters ``[-17:-5]`` of its
    file name (see ``extract_label``).

    Attributes set by the constructor:
        root_dir: Folder that holds the images.
        transform: Optional callable applied to each opened image.
        images: Sorted names of the regular files in ``root_dir``.
        labels: Float label of each entry in ``images``, in the same order.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform

        # Collect regular files only, so sub-folders are left out.
        file_names = []
        for entry in os.listdir(root_dir):
            if os.path.isfile(os.path.join(root_dir, entry)):
                file_names.append(entry)
        file_names.sort()

        self.images = file_names
        self.labels = list(map(self.extract_label, file_names))

    def __len__(self):
        """Number of image files found in ``root_dir``."""
        return len(self.images)

    def __getitem__(self, idx):
        """Open image ``idx``, apply the transform if one was given, and return ``(image, label)``."""
        image = Image.open(os.path.join(self.root_dir, self.images[idx]))
        if self.transform:
            image = self.transform(image)
        return image, self.labels[idx]

    def extract_label(self, img_name):
        """Parse the label from the 12 characters preceding the last 5 characters of ``img_name``."""
        return float(img_name[-17:-5])

In [None]:
# Convert each image to a tensor, then normalise with mean 0.45 and std 0.25.
# One-element tuples: a single value per channel (assumes single-channel images - TODO confirm).
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.45,), (0.25,)),
])

custom_dataset = CustomImageDataset(root_dir=data_dir, transform=data_transform)

# # Accessing the data
# for img, label in custom_dataset:
#     print(f"Image shape: {img.shape}, Label: {label}")

print(len(custom_dataset))

# 75 % of the images train the model; the remainder is the test set.
# The test size is derived by subtraction so the two lengths always sum to
# len(custom_dataset), which random_split requires (int(n*0.75) + int(n*0.25003)
# falls short for many n and raises ValueError).
n_train = int(len(custom_dataset)*0.75)
n_test = len(custom_dataset) - n_train
# A seeded generator keeps the train/test membership identical between runs.
train_set, test_set = random_split(
    custom_dataset,
    [n_train, n_test],
    generator=torch.Generator().manual_seed(0),
)
print(len(train_set))
print(len(test_set))

In [None]:
# Hyper-parameters
num_epochs = 60          # previously tried: 30
batch_size = 1
learning_rate = 0.0005   # previously tried: 0.001

# Only the training loader shuffles; the test loader keeps a fixed order.
# (A validation loader would be built the same way from val_set with shuffle=False.)
train = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
test = DataLoader(dataset=test_set, batch_size=batch_size, shuffle=False)

In [None]:
class ConvNet(nn.Module):
    """Convolutional regressor mapping a single-channel image to one scalar.

    Layout:
        conv(1->8, k=10)  + batch-norm + ReLU + 10x10 max-pool
        conv(8->16, k=10) + batch-norm + ReLU + 2x2 max-pool
        conv(16->32, k=10) + batch-norm + ReLU
        conv(32->32, k=10) + batch-norm + ReLU   (applied twice)
        dropout(0.2) + linear(32*5*5 -> 1)

    All convolutions use ``padding='same'``, so only the two pooling layers shrink
    the feature map. The linear layer expects a 5x5 map, i.e. inputs whose height
    and width reduce to 5 after the /10 and /2 pooling (for example 100x100).

    NOTE(review): ``forward`` reuses ``normalise3`` after three different
    convolutions and applies ``conv4`` twice, so those weights and batch-norm
    statistics are shared between stages. This is kept as-is; confirm it is
    intentional rather than a missing ``normalise4`` / ``conv5``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 8, 10, padding='same')  # in_channels, out_channels, kernel_size
        self.normalise1 = nn.BatchNorm2d(8)
        # Max-pooling is used here; AvgPool2d(10, stride=10) was the alternative tried.
        self.pool1 = nn.MaxPool2d(10, stride=10)
        self.conv2 = nn.Conv2d(8, 16, 10, padding='same')
        self.normalise2 = nn.BatchNorm2d(16)
        # Max-pooling is used here; AvgPool2d(2, stride=2) was the alternative tried.
        self.pool2 = nn.MaxPool2d(2, stride=2)
        self.conv3 = nn.Conv2d(16, 32, 10, padding='same')
        self.normalise3 = nn.BatchNorm2d(32)
        self.conv4 = nn.Conv2d(32, 32, 10, padding='same')
        # 32 channels on a 5x5 feature map feed the single regression output.
        self.fc = nn.Linear(32*5*5, 1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Return a ``(batch, 1)`` tensor of predictions for a ``(batch, 1, H, W)`` input.

        Raises:
            RuntimeError: from the linear layer when the input size does not
                reduce to a 5x5 feature map.
        """
        x = self.pool1(F.relu(self.normalise1(self.conv1(x))))
        x = self.pool2(F.relu(self.normalise2(self.conv2(x))))
        x = F.relu(self.normalise3(self.conv3(x)))
        x = F.relu(self.normalise3(self.conv4(x)))
        x = F.relu(self.normalise3(self.conv4(x)))
        # Flatten per sample. view(-1, 32*5*5) would silently move surplus
        # spatial positions into the batch dimension for wrong-sized inputs.
        x = x.flatten(1)
        x = self.dropout(x)
        x = self.fc(x)

        return x

In [None]:
model = ConvNet().to(device)

# loss and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# training loop
n_total_steps = len(train)
model.train()  # dropout active, batch-norm uses batch statistics
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train):
        images = images.to(device)
        labels = labels.to(device).float()

        # forward
        # The model returns (batch, 1); drop the trailing dimension so the
        # prediction and the (batch,) labels are compared element-wise
        # instead of being broadcast against each other inside MSELoss.
        outputs = model(images).squeeze(1)
        loss = criterion(outputs, labels)

        # backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 10 == 0:
            print(f'epoch {epoch+1}/{num_epochs}, step {i+1}/{n_total_steps}, loss = {loss.item():.4f}')

print("Finished Training")

# test: root-mean-square error of the continuous predictions
model.eval()  # disable dropout and use the running batch-norm statistics
squared_difference = 0.0
n_test_samples = 0
with torch.no_grad():  # no need to calculate gradients
    for images, labels in test:
        images = images.to(device)
        labels = labels.to(device).float()

        # The regression output itself is the prediction. torch.max(outputs, 1)
        # would return the argmax index, which is always 0 for a single output.
        predictions = model(images).squeeze(1)
        squared_difference += torch.sum((predictions - labels) ** 2).item()
        n_test_samples += labels.numel()

# Average over samples (not batches) so the value stays correct for any batch size.
rmse = (squared_difference / n_test_samples) ** 0.5 if n_test_samples else float('nan')
print(f'RMSE = {rmse}')

# The data set is split into training and test sets (75% and 25%).