In [1]:
import os
import torch
import gc
import numpy as np


import random  # imported here so this cell can seed the RNG used by the dataset's augmentation

# Allocator option for PyTorch's CUDA caching allocator.
# NOTE(review): presumably this must be set before the first CUDA allocation to
# take effect - confirm, since `torch` is already imported at this point.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
# Start from a clean slate when the cell is re-run in a live kernel.
torch.cuda.empty_cache()
gc.collect()


# Seed every random number generator the notebook draws from.  `random` is
# included because DocumentRecaptureDataset picks its augmentation with
# `random.randint`.
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
# NOTE(review): cudnn.benchmark lets cuDNN auto-select kernels, which may make
# GPU runs differ slightly between executions despite the seeds above - confirm
# this speed/reproducibility trade-off is intended.
torch.backends.cudnn.benchmark = True

In [3]:
# Central experiment settings, read by the cells below through string keys.
# NOTE(review): the paths are absolute and machine-specific; 'Methods' lists
# names that do not match the `model_type` strings accepted by
# build_model_and_transforms - confirm whether it is still used.
Configuration = dict(
    # Dataset locations.
    RootPath=r"C:\Users\Utente\Projects\Thesis",
    PositiveSamples=r"C:\Users\Utente\Projects\Thesis\doc_exp\Recaptured1",
    NegativeSamples=r"C:\Users\Utente\Projects\Thesis\doc_exp\Original1",
    DLC_dataset=r"C:\Users\Utente\Projects\Thesis\doc_exp\DLC",
    # Experiment options.
    Methods=['ResNet50', 'EfficientNet', 'MobileNet'],
    Batchsize=32,
    train_size=0.80,
    val_size=0.10,
    test_size=0.10,
    n_workers=1,
    epochs=30,
    # Use the GPU when one is visible to PyTorch, otherwise fall back to CPU.
    device='cuda' if torch.cuda.is_available() else 'cpu',
)



In [4]:
from torch.utils.tensorboard import SummaryWriter


In [6]:
import torch
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from PIL import Image
from torchvision.transforms import ToTensor
import cv2
import torchvision
import pandas as pd
from sklearn.metrics import precision_recall_curve
from sklearn import metrics
import random

class DocumentRecaptureDataset(Dataset):
    """Map-style dataset that loads images from disk and pairs them with labels.

    Each item is a dict with two keys: ``'image'`` (the loaded image after the
    optional transform) and ``'label'`` (an int64 tensor).

    Parameters
    ----------
    samples : sequence of str
        Paths of the image files.
    labels : sequence of int
        One label per entry of ``samples``.
    transforms : callable or None
        Applied to the loaded image when truthy; defaults to ``ToTensor()``.
    oversample : bool
        When True, every access applies one randomly drawn augmentation
        (horizontal flip, vertical flip, 90 degree rotation either way, or
        nothing) before the transform.
    """

    def __init__(self,samples,labels,transforms = ToTensor(),oversample=False):
        self.samples = samples
        self.labels = labels
        self.oversample = oversample
        self.transform = transforms

    def __len__(self):
        """Number of samples (one per path)."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``{'image': ..., 'label': ...}``.

        Raises
        ------
        FileNotFoundError
            If OpenCV cannot read the file at ``self.samples[idx]``.
        """
        path = self.samples[idx]

        # The image is read with OpenCV (not PIL).  cv2.imread signals failure
        # by returning None instead of raising, so fail loudly here rather than
        # deep inside the transform pipeline.
        # NOTE(review): OpenCV returns channels in BGR order while the
        # torchvision transforms used downstream presumably assume RGB.  Left
        # unchanged so previously trained checkpoints keep seeing the same
        # input - confirm whether a BGR->RGB conversion is wanted.
        image = cv2.imread(path)
        if image is None:
            raise FileNotFoundError(f"Could not read image file: {path!r}")

        if self.oversample:
            # Five equally likely outcomes; 4 deliberately leaves the image as is.
            choice = random.randint(0, 4)
            if choice == 0:
                # horizontal flip
                image = cv2.flip(image, 1)
            elif choice == 1:
                # vertical flip
                image = cv2.flip(image, 0)
            elif choice == 2:
                image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
            elif choice == 3:
                image = cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)

        label = torch.tensor(self.labels[idx], dtype=torch.int64)

        if self.transform:
            image = self.transform(image)

        return {'image': image, 'label': label}
    

# UTILS:
In this section I define some utility functions that are used later throughout the deep learning pipeline.

In [7]:

def load_paths(path,group,test_doc,limit=200):

    """
    Collect image paths under `path` and split them into train and test lists.

    The directory tree is walked with `os.walk`; every file ending in .jpg or
    .png is considered.  A file whose full path contains `test_doc` goes to the
    test list, every other file goes to the train list.

    For the "negative" group, directories whose file paths contain "KaleemCam"
    are capped: at most `limit` images are taken from each such directory.
    Files are visited in sorted order so that the capped subset is the same on
    every run and every operating system.

    Parameters:
    ----------
    path : str
        The root directory to walk through.
    group : str
        Group identifier; the cap is only applied when it contains 'negative'.
    test_doc : str
        Substring identifying test documents.  Note that an empty string is
        contained in every path and therefore sends every image to the test
        list.
    limit : int, optional
        Maximum number of images taken per capped directory (default 200).

    Returns:
    -------
    train_samples : list of str
        File paths of the training samples.
    test_samples : list of str
        File paths of the testing samples.
    """

    train_samples = []
    test_samples = []

    for root, _, files in os.walk(path):

        # Number of images already taken from this directory.
        taken = 0
        for file in sorted(files):
            if not file.endswith((".jpg", ".png")):
                continue

            full_path = os.path.join(root, file)

            # Check the cap BEFORE appending, otherwise one image too many
            # (limit + 1) ends up in the result.
            is_capped = 'negative' in group and 'KaleemCam' in full_path
            if is_capped and taken >= limit:
                break

            if test_doc in full_path:
                test_samples.append(full_path)
            else:
                train_samples.append(full_path)
            taken += 1

    return train_samples, test_samples


def log_distrinution(No_Positive_Samples,No_Negative_samples):
    """
    Render a two-bar chart of the class counts and return it as an image array.

    The chart is drawn on its own figure, saved to 'temp.png' in the current
    working directory and read back with OpenCV.

    Parameters:
    ----------
    No_Positive_Samples : int
        Number of recaptured (positive) samples.
    No_Negative_samples : int
        Number of original (negative) samples.

    Returns:
    -------
    The array returned by `cv2.imread('temp.png')`.
    NOTE(review): OpenCV presumably returns it in BGR channel order and HxWxC
    layout - confirm before passing it to `writer.add_image`.
    """

    labels = ['Recaptured_samples', 'Original_samples']

    # Use a dedicated figure and always close it: drawing on the implicit
    # global pyplot figure makes repeated calls pile bars onto the same axes
    # and keeps the figure alive in memory.
    fig, ax = plt.subplots()
    try:
        ax.bar(labels, [No_Positive_Samples, No_Negative_samples], color=['blue', 'green'])
        fig.savefig('temp.png')
    finally:
        plt.close(fig)

    return cv2.imread('temp.png')


def log_samples(train_loader):
    """
    Collect up to five positive and five negative images and build image grids.

    The function indexes `train_loader.dataset` item by item (so every visited
    item is fully loaded) and sorts the images by label: label 0 is negative,
    any other label is positive.  It stops as soon as five images of each kind
    have been collected.  The grids are built with
    `torchvision.utils.make_grid()`.

    Parameters:
    ----------
    train_loader : DataLoader
        Loader whose `.dataset` yields dictionaries with 'image' and 'label'
        keys.

    Returns:
    -------
    positive_grid : Tensor or None
        Grid of the collected positive images, or None if none were found.
    negative_grid : Tensor or None
        Grid of the collected negative images, or None if none were found.

    Notes:
    ------
    - If fewer than five images of a category exist, a warning is printed and
      the grid contains whatever was collected.
    - `make_grid` cannot handle an empty list, hence None is returned for a
      category without any image instead of raising.
    """

    # Required number of samples for each category
    max_samples = 5

    positive_images = []
    negative_images = []

    dataset = train_loader.dataset
    for i in range(len(dataset)):
        sample = dataset[i]
        label = sample['label']
        image = sample['image']

        # Add to negative or positive samples while the category is not full
        if label == 0 and len(negative_images) < max_samples:
            negative_images.append(image)
        elif label != 0 and len(positive_images) < max_samples:
            positive_images.append(image)

        # Stop early once both categories are complete
        if len(negative_images) == max_samples and len(positive_images) == max_samples:
            break

    if len(negative_images) < max_samples:
        print(f"Warning: Only {len(negative_images)} negative samples found.")
    if len(positive_images) < max_samples:
        print(f"Warning: Only {len(positive_images)} positive samples found.")

    # Build a grid only for categories that actually contain images
    positive_grid = torchvision.utils.make_grid(positive_images) if positive_images else None
    negative_grid = torchvision.utils.make_grid(negative_images) if negative_images else None

    return positive_grid, negative_grid



def calculate_metrics(y_true, functional_margin, thresholds, tag=None, model_type=None, writer=None):
    """
    Threshold continuous scores, compute binary metrics and optionally log them.

    Scores strictly below `thresholds` become class 0, all others class 1.
    Accuracy, precision, recall and macro-averaged F1 are then computed with
    scikit-learn.

    Parameters:
    ----------
    y_true : list or array
        Ground truth binary labels.
    functional_margin : list or array
        Continuous scores, one per sample.  Inputs of shape (N, 1) - e.g. the
        list of one-element arrays returned by `test()` - are flattened to
        (N,) so that the predictions line up with `y_true`.
    thresholds : float
        Decision threshold applied to `functional_margin`.
    tag : str, optional
        Prefix for the TensorBoard scalar names and part of the CSV file name.
    model_type : str, optional
        Model name, used as part of the CSV file name.
    writer : torch.utils.tensorboard.SummaryWriter, optional
        If given, the four metrics are logged as scalars at step 0.

    Returns:
    -------
    accuracy, precision, recall, f1 : float
        The computed metrics (f1 is macro-averaged).

    Notes:
    ------
    - If `tag` or `model_type` is provided, ground truth and predictions are
      written to "results[_<tag>][_<model_type>].csv" in the working directory.
    """

    # Flatten first: a (N, 1) prediction array cannot be stored as a single
    # DataFrame column and would break the CSV export below.
    scores = np.asarray(functional_margin).reshape(-1)
    y_true = np.asarray(y_true).reshape(-1)
    y_pred = np.where(scores < thresholds, 0, 1)

    # Calculate classification metrics
    accuracy = metrics.accuracy_score(y_true, y_pred)
    precision = metrics.precision_score(y_true, y_pred, zero_division=0)
    recall = metrics.recall_score(y_true, y_pred, zero_division=0)
    f1 = metrics.f1_score(y_true, y_pred, average='macro', zero_division=0)

    # Save results to CSV if tag and/or model_type are provided; only the
    # parts that were actually given appear in the file name (no "None").
    if tag or model_type:
        name_parts = ["results"] + [str(part) for part in (tag, model_type) if part]
        csv_filename = "_".join(name_parts) + ".csv"
        pd.DataFrame({'gt': y_true, 'pred': y_pred}).to_csv(csv_filename, index=False)

    # Log metrics to TensorBoard if a writer is provided
    if writer:
        prefix = f'{tag}/' if tag else ''
        writer.add_scalar(f'{prefix}Test Accuracy', accuracy, 0)
        writer.add_scalar(f'{prefix}Test Precision', precision, 0)
        writer.add_scalar(f'{prefix}Test Recall', recall, 0)
        writer.add_scalar(f'{prefix}Test F1 score', f1, 0)

    return accuracy, precision, recall, f1

def find_threshold(y_true, functional_margin):
    """
    Pick the decision threshold with the highest F1 score.

    The precision-recall curve is computed with scikit-learn, the F1 score
    (harmonic mean of precision and recall) is derived for every point of the
    curve, and the threshold belonging to the best point is returned.

    Parameters:
    ----------
    y_true : list or array
        Ground truth binary labels (0 or 1).
    functional_margin : list or array
        Continuous prediction scores.

    Returns:
    -------
    best_threshold : float or None
        Threshold of the best curve point; None when the curve has no
        thresholds at all.
    best_f1_score : float
        F1 score reached at that point.
    """

    precision, recall, thresholds = precision_recall_curve(y_true, functional_margin)

    # F1 = 2PR / (P + R); points where P + R == 0 keep the pre-filled 0.
    numerator = 2 * recall * precision
    denominator = recall + precision
    f1_scores = np.zeros_like(precision)
    np.divide(numerator, denominator, out=f1_scores, where=denominator != 0)

    # Position of the first maximum along the curve
    best_index = np.argmax(f1_scores)

    if len(thresholds) > 0:
        best_threshold = thresholds[best_index]
    else:
        best_threshold = None

    return best_threshold, f1_scores[best_index]
    

def dlc_dataset(path, test_doc='esp_id', negative_marker='or', max_negatives_per_dir=10):
    """
    Build train/test path and label lists for the DLC dataset stored under `path`.

    Every .jpg/.png file found by walking `path` is classified twice:

    * negative (label 0) if `negative_marker` occurs in the name of the folder
      that directly contains the file or in the file name, positive (label 1)
      otherwise;
    * test sample if `test_doc` occurs in the full path, train sample
      otherwise.

    The marker is deliberately NOT searched in the whole path: a short marker
    such as 'or' also occurs in unrelated ancestor directory names (for example
    "work" or "passport"), which would label every image below them as
    negative.
    NOTE(review): assumes images sit directly inside a per-clip folder whose
    name carries the marker, as the '... in folder' check elsewhere in this
    notebook suggests - confirm against the dataset layout.

    Per directory, at most `max_negatives_per_dir` negative images are taken
    (files are visited in sorted order); positives are not capped.

    Parameters:
    ----------
    path : str
        Root directory of the DLC dataset.
    test_doc : str, optional
        Substring of the path identifying the held-out document type.
    negative_marker : str, optional
        Substring identifying original (negative) captures.
    max_negatives_per_dir : int, optional
        Cap on the negative images taken from a single directory.

    Returns:
    -------
    train_samples, train_labels, test_samples, test_labels : list
        Paths and 0/1 labels; within each split positives come first.
    """
    positive_train_samples = []
    negative_train_samples = []
    positive_test_samples = []
    negative_test_samples = []

    for root, _, files in os.walk(path):
        folder_name = os.path.basename(root)
        negatives_taken = 0
        for file in sorted(files):
            if file.endswith((".jpg", ".png")):
                full_path = os.path.join(root, file)
                is_negative = negative_marker in folder_name or negative_marker in file
                is_test = test_doc in full_path

                if is_negative:
                    negatives_taken += 1
                    target = negative_test_samples if is_test else negative_train_samples
                else:
                    target = positive_test_samples if is_test else positive_train_samples
                target.append(full_path)

            # Stop scanning this directory once the negative cap is reached
            if negatives_taken >= max_negatives_per_dir:
                break

    print(len(positive_test_samples),len(negative_test_samples),len(positive_train_samples),len(negative_train_samples))

    train_samples = positive_train_samples + negative_train_samples
    train_labels = [1] * len(positive_train_samples) + [0] * len(negative_train_samples)

    test_samples = positive_test_samples + negative_test_samples
    test_labels = [1] * len(positive_test_samples) + [0] * len(negative_test_samples)
    return train_samples , train_labels, test_samples, test_labels
    

# Model Definition:

In this section we define a comprehensive function that returns the model and its corresponding transformations. It receives several parameters, such as `pretrained`, which indicates whether pretrained weights should be loaded, along with others that are self-explanatory.

In [8]:
import torch
import torchvision.models as models
import torch.nn as nn
import torchvision.transforms as T
from torchvision.transforms import v2

def build_model_and_transforms(pretrained=True, fine_tune=True, num_classes=1, initialize=True,model_type='Resnet34'):
    """
    Build a torchvision classifier with a fresh head plus its input transforms.

    Parameters:
    ----------
    pretrained : bool
        Load the torchvision 'DEFAULT' weights when True, otherwise build the
        architecture without pre-trained weights.
    fine_tune : bool
        True: make sure the classification head is trainable and leave the
        rest of the network as constructed.
        False: freeze every parameter except the new classification head
        (freezing the head as well would leave nothing to train).
    num_classes : int
        Output size of the replacement classification layer.
    initialize : bool
        When True and `pretrained` is False, re-initialise every parameter
        with more than one dimension using Kaiming-uniform initialisation.
    model_type : str
        One of 'Resnet18', 'Resnet34', 'Resnet50', 'EfficientnetM',
        'EfficientnetS', 'Efficientnetb0'.

    Returns:
    -------
    model, model_type, train_transforms, test_transforms
        The model moved to `Configuration['device']` (module-level setting),
        the model name, and two identical-looking but separate transform
        pipelines.

    Raises:
    ------
    ValueError
        If `model_type` is not one of the supported names.
    """
    constructors = {
        'Resnet18': models.resnet18,
        'Resnet34': models.resnet34,
        'Resnet50': models.resnet50,
        'EfficientnetM': models.efficientnet_v2_m,
        'EfficientnetS': models.efficientnet_v2_s,
        'Efficientnetb0': models.efficientnet_b0,
    }
    if model_type not in constructors:
        raise ValueError(
            "Unknown model_type {!r}; expected one of {}".format(model_type, sorted(constructors))
        )

    if pretrained:
        print("[INFO]: Loading pre-trained weights for : {}".format(model_type))
    else:
        print("[INFO]: Building {} without pre-trained weights".format(model_type))
    # `weights=None` is the current spelling of the deprecated `pretrained=False`.
    model = constructors[model_type](weights='DEFAULT' if pretrained else None, progress=True)

    # Two configurations have always frozen the backbone before the head is
    # replaced; this is kept so existing experiments behave the same.
    # NOTE(review): freezing a randomly initialised Resnet34 looks like a
    # copy-paste leftover - confirm whether it is intended.
    if (model_type, bool(pretrained)) in (('Efficientnetb0', True), ('Resnet34', False)):
        for param in model.parameters():
            param.requires_grad = False

    # Replace the classification layer with one sized for `num_classes`.
    if 'Resnet' in model_type:
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        head = model.fc
    else:
        model.classifier[1] = nn.Linear(in_features=model.classifier[1].in_features, out_features=num_classes)
        head = model.classifier[1]

    if fine_tune:
        # Only the head is touched here; backbone layers keep whatever
        # requires_grad state they got above.
        print("[INFO]: Fine-tuning all layers...")
        for param in head.parameters():
            param.requires_grad = True
    else:
        print("[INFO]: Freezing hidden layers...")
        for param in model.parameters():
            param.requires_grad = False
        # Keep the new head trainable so the model can still learn.
        for param in head.parameters():
            param.requires_grad = True

    if initialize and not pretrained:
        print("[INFO]: Initializing parameters...")
        for param in model.parameters():
            if len(param.shape) > 1:  # Exclude biases and other 1-D parameters
                nn.init.kaiming_uniform_(param, mode='fan_in', nonlinearity='relu')

    def _make_transforms(pad):
        """Centre-crop to 224x224, optionally zero-pad by 12 px, convert to tensor."""
        steps = [T.ToPILImage(), T.CenterCrop((224,224))]
        if pad:
            steps.append(T.Pad(padding=12, fill=0, padding_mode='constant'))
        steps.append(T.ToTensor())
        return T.Compose(steps)

    # ResNet inputs get the extra 12 px border; EfficientNet inputs do not.
    use_padding = 'Resnet' in model_type
    train_transforms = _make_transforms(use_padding)
    test_transforms = _make_transforms(use_padding)

    return model.to(Configuration['device']),model_type,train_transforms,test_transforms


In [9]:
def get_model(checkpoint_path):
    """
    Rebuild an EfficientNetV2-S with a single-output head from a checkpoint.

    Used to reload an already trained model that is then fine-tuned on a
    dataset of a different resolution.

    Parameters:
    ----------
    checkpoint_path : str
        File written by `torch.save` containing a dict with a 'state_dict'
        entry.

    Returns:
    -------
    The model (on CPU) with the checkpoint weights loaded and every parameter
    except the last eight parameter tensors frozen.

    Notes:
    ------
    `torch.load` unpickles the file; only load checkpoints from trusted
    sources.
    """

    # No ImageNet weights are fetched: the checkpoint overwrites every
    # parameter and buffer anyway (load_state_dict is strict by default).
    model = models.efficientnet_v2_s(weights=None, progress=True)
    model.classifier[1] = nn.Linear(in_features=model.classifier[1].in_features, out_features=1)

    # map_location lets a checkpoint saved on a GPU be opened on a CPU-only
    # machine; the model itself lives on the CPU at this point.
    checkpoint = torch.load(checkpoint_path, map_location='cpu')
    model.load_state_dict(checkpoint['state_dict'])

    # Freeze everything except the last eight parameter tensors.
    # NOTE(review): which layers those eight tensors belong to depends on the
    # architecture's parameter order - confirm this is the intended cut.
    for param in list(model.parameters())[:-8]:
        param.requires_grad = False

    return model

In [10]:
import torch.utils
from torch.utils.data import DataLoader
import os
import torch.utils.data


def get_data(batch_size, positive_samples_path, negative_samples_path, train_transforms, test_transforms,test_doc,writer=None):
    """
    Create train/validation/test loaders for the recapture dataset.

    Paths are gathered with `load_paths`; images whose path contains
    `test_doc` form the held-out split, which serves as BOTH the validation
    and the test set (the two loaders wrap the same dataset object).
    Positive (recaptured) images get label 1, negative (original) images
    label 0.  When `writer` is given, the class counts of both splits are
    logged as text.

    Returns:
    -------
    train_loader, val_loader, test_loader : DataLoader
        Only the training loader shuffles.
    """
    train_positive, test_positive = load_paths(positive_samples_path,'positive',test_doc)
    train_negative, test_negative = load_paths(negative_samples_path,'negative',test_doc)

    # Positives first, then negatives - labels are built in the same order.
    train_samples = list(train_positive) + list(train_negative)
    train_labels = [1] * len(train_positive) + [0] * len(train_negative)
    test_samples = list(test_positive) + list(test_negative)
    test_labels = [1] * len(test_positive) + [0] * len(test_negative)

    # Placeholders for up-sampling the minority (recaptured) class.  They used
    # to be filled from a second load_paths() call on the positive folder;
    # that step is disabled, so the "oversampled" datasets below are empty.
    train_samples_transformed = []
    train_labels_transformed = []
    test_samples_transformed = []
    test_labels_transformed = []

    if writer:
        train_counts = {'positive': len(train_positive),'negative':len(train_negative)}
        test_counts = {'positive': len(test_positive),'negative':len(test_negative)}
        writer.add_text('No# distrubtion in training Images',str(train_counts),0)
        writer.add_text('No# distrubtion in testing Images',str(test_counts),0)

    # Training set: (empty) augmented copy followed by the plain samples
    train_data = torch.utils.data.ConcatDataset([
        DocumentRecaptureDataset(train_samples_transformed,labels = train_labels_transformed,oversample=True,transforms=train_transforms),
        DocumentRecaptureDataset(train_samples, labels=train_labels, transforms=train_transforms,oversample=False),
    ])

    # Held-out set, built the same way
    val_data = torch.utils.data.ConcatDataset([
        DocumentRecaptureDataset(test_samples_transformed,labels= test_labels_transformed,oversample=True,transforms=test_transforms),
        DocumentRecaptureDataset(test_samples, labels=test_labels, transforms=test_transforms,oversample=False),
    ])
    test_data = val_data

    train_loader = DataLoader(train_data, batch_size=batch_size,shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size)
    test_loader = DataLoader(test_data, batch_size=batch_size)

    return train_loader,val_loader,test_loader


In [11]:
import torch.utils
from torch.utils.data import DataLoader
import os
import torch.utils.data


def get_data_dlc(batch_size, positive_samples_path, negative_samples_path, train_transforms, test_transforms,dlc_path,writer=None,test_doc=''):
    """
    Create loaders for cross-dataset experiments: train on DLC, test on our data.

    * train loader: DLC training split returned by `dlc_dataset(dlc_path)`
    * validation loader: DLC test split returned by `dlc_dataset(dlc_path)`
    * test loader: images of our own dataset (positive and negative folders)
      that `load_paths` assigns to its test list.

    Parameters:
    ----------
    batch_size : int
        Batch size of all three loaders.
    positive_samples_path, negative_samples_path : str
        Folders with the recaptured (label 1) and original (label 0) images.
    train_transforms, test_transforms : callable
        Transform pipelines for the training and the evaluation datasets.
    dlc_path : str
        Root folder of the DLC dataset.
    writer : SummaryWriter, optional
        If given, the class counts of our own dataset's split are logged.
    test_doc : str, optional
        Forwarded to `load_paths`, which requires it.  The default '' is
        contained in every path, so ALL images of our own dataset end up in
        the test loader.
        NOTE(review): `load_paths` used to be called here without this
        argument, which raises a TypeError; confirm that "evaluate on the
        whole dataset" is the intended default.

    Returns:
    -------
    train_loader, val_loader, test_loader : DataLoader
        Only the training loader shuffles.
    """
    train_samples_dlc ,train_labels_dlc,test_samples_dlc,test_labels_dlc = dlc_dataset(dlc_path)

    # Split of our own dataset; only its test part feeds a loader, the train
    # part is merely counted for logging.
    train_positive, test_positive = load_paths(positive_samples_path,'positive',test_doc)
    train_negative, test_negative = load_paths(negative_samples_path,'negative',test_doc)

    test_samples = list(test_positive) + list(test_negative)
    test_labels = [1] * len(test_positive) + [0] * len(test_negative)

    if writer:
        # These counts describe our own dataset, not the DLC training data.
        writer.add_text('No# distrubtion in training Images',str({'positive': len(train_positive),'negative':len(train_negative)}),0)
        writer.add_text('No# distrubtion in testing Images',str({'positive': len(test_positive),'negative':len(test_negative)}),0)

    # Creating datasets
    train_data = DocumentRecaptureDataset(train_samples_dlc, labels=train_labels_dlc, transforms=train_transforms,oversample=False)
    val_data = DocumentRecaptureDataset(test_samples_dlc, labels=test_labels_dlc, transforms=test_transforms,oversample=False)
    test_data = DocumentRecaptureDataset(test_samples,test_labels,transforms=test_transforms,oversample=False)

    # DataLoaders
    train_loader = DataLoader(train_data, batch_size=batch_size,shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size)
    test_loader = DataLoader(test_data, batch_size=batch_size)

    return train_loader,val_loader,test_loader


# Training
Below we define our training logic. Depending on the requirements, it trains and saves the chosen model.

In [12]:

import time
from tqdm import tqdm


def find_correct_pred(gt,pred):
    """Count the positions at which ground truth and prediction are equal.

    The two sequences are compared pairwise; comparison stops at the end of
    the shorter one.
    """
    return sum(1 for truth, guess in zip(gt, pred) if truth == guess)

def test(net, test_loader, device='cpu'):
    net.eval()

    original = []
    outputs_pr = []

    with torch.no_grad():
        for index, batch in enumerate(tqdm(test_loader)):
            images = batch['image'].to(device)
            outputs = net(images)
            outputs_pr.extend(outputs.cpu().numpy()) 

            original.extend(list(batch['label'].type(torch.int32).numpy()))

    return {'gt': original,'raw_outputs':outputs_pr}


def train(net, train_dataloader, valid_dataloader, criterion, optimizer, scheduler=None, epochs=20, device='cpu', checkpoint_epochs=5,writer = None,model_type=None):
    """
    Train `net`, optionally validate after every epoch, log and checkpoint.

    Parameters:
    ----------
    net : nn.Module
        Model producing one raw score (logit) per sample in its first output
        column.
    train_dataloader, valid_dataloader : iterable
        Yield dicts with 'image' and 'label' entries and expose `.dataset`
        (its length is used for averaging).  `valid_dataloader` may be None,
        in which case validation is skipped.
    criterion : callable
        Loss taking (predictions, float labels of shape (B, 1)), both on CPU.
        Its `reduction` attribute (assumed 'mean' when absent) decides how
        batch losses are accumulated.
    optimizer, scheduler
        Optimizer stepped per batch; optional scheduler stepped per epoch.
    epochs : int
        Number of epochs.
    device : str
        Device the images are moved to for the forward pass.
    checkpoint_epochs : int
        A checkpoint is written to './kfold/' every `checkpoint_epochs`
        epochs (the directory is created on demand); 0/None disables it.
    writer : SummaryWriter, optional
        Receives the per-epoch loss and accuracy scalars.
    model_type : str, optional
        Used in the checkpoint file name.

    Returns:
    -------
    The trained `net`.

    Notes:
    ------
    - The validation loss is computed on the validation batches themselves
      (it used to repeat the loss of the last training batch).
    - Accuracy thresholds the sigmoid of the outputs at 0.5, because the
      losses used in this notebook operate on raw logits.
    """
    start = time.time()
    print(f'Training for {epochs} epochs on {device}')

    def _summed_loss(loss, batch_len):
        """Batch loss as a sum over samples, whatever reduction the criterion uses."""
        if getattr(criterion, 'reduction', 'mean') == 'mean':
            return loss.item() * batch_len
        return loss.sum().item()

    def _count_correct(preds, labels):
        """Number of samples whose thresholded probability matches the label."""
        predicted = (torch.sigmoid(preds[:, 0]) > 0.5).long()
        return (predicted == labels[:, 0].long()).sum().item()

    def _mean(total, count):
        """Average that tolerates an empty dataset."""
        return total / count if count else float('nan')

    n_train = len(train_dataloader.dataset)
    has_valid = valid_dataloader is not None
    n_valid = len(valid_dataloader.dataset) if has_valid else 0

    for epoch in range(1,epochs+1):
        print(f"Epoch {epoch}/{epochs}")

        net.train()
        train_loss = 0.0
        train_correct = 0
        for batch in tqdm(train_dataloader):
            images = batch['image'].to(device)
            labels = batch['label'].unsqueeze(1).float()

            # The loss is evaluated on the CPU so that criteria holding CPU
            # tensors (e.g. a pos_weight) keep working.
            preds = net(images).cpu()
            loss = criterion(preds, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            with torch.no_grad():
                train_loss += _summed_loss(loss, labels.size(0))
                train_correct += _count_correct(preds, labels)

        valid_loss = 0.0
        valid_correct = 0
        if has_valid:
            net.eval()
            with torch.no_grad():
                for batch in tqdm(valid_dataloader):
                    images = batch['image'].to(device)
                    labels = batch['label'].unsqueeze(1).float()

                    preds = net(images).cpu()
                    loss = criterion(preds, labels)

                    valid_loss += _summed_loss(loss, labels.size(0))
                    valid_correct += _count_correct(preds, labels)

        if scheduler is not None:
            scheduler.step()

        mean_train_loss = _mean(train_loss, n_train)
        train_accuracy = _mean(100 * train_correct, n_train)
        mean_valid_loss = _mean(valid_loss, n_valid)
        valid_accuracy = _mean(100 * valid_correct, n_valid)

        # Print out what's happening
        summary = (
          f"Epoch: {epoch} | "
          f"train_loss: {mean_train_loss:.4f} | "
          f"train_accuracy: {train_accuracy:.4f}"
        )
        if has_valid:
            summary += (
              f" | valid_loss: {mean_valid_loss:.4f} | "
              f"valid_accuracy: {valid_accuracy:.4f}"
            )
        print(summary)

        if writer is not None:
            # Experiment tracking.  The tag "Test_accuracy" holds the
            # validation accuracy; the name is kept so existing runs stay
            # comparable.
            writer.add_scalar(tag="Train_loss", scalar_value = mean_train_loss,global_step = epoch)
            writer.add_scalar(tag="Train_accuracy", scalar_value = train_accuracy,global_step = epoch)
            if has_valid:
                writer.add_scalar(tag="Valid_loss", scalar_value = mean_valid_loss,global_step = epoch)
                writer.add_scalar(tag="Test_accuracy", scalar_value = valid_accuracy,global_step = epoch)

        if checkpoint_epochs and epoch % checkpoint_epochs == 0:
            # Make sure the target folder exists so a finished run is not
            # lost to a missing directory.
            os.makedirs('./kfold', exist_ok=True)
            torch.save({
                'epoch': epoch,
                'state_dict': net.state_dict(),
                'optimizer': optimizer.state_dict(),
            }, f'./kfold/{model_type}_{epoch}.pth.tar')

    end = time.time()
    print(f'Total training time: {end-start:.1f} seconds')
    return net

# Loss
During my thesis I had difficulty finding a suitable library implementation of focal loss, so I copied the following code from GitHub.

In [13]:
import torch
import torch.nn.functional as F


class FOCALLOSS(nn.Module):
    """Binary (sigmoid) focal loss operating on raw logits.

    Parameters
    ----------
    alpha : float
        Weighting factor in range (0,1) to balance positive vs negative
        examples; a negative value disables the weighting. Default = 0.25
    gamma : float
        Exponent of the modulating factor (1 - p_t) to balance easy vs hard
        examples.
    reduction : None | 'none' | 'mean' | 'sum'
        None / 'none': no reduction, the element-wise loss is returned.
        'mean': the loss is averaged.
        'sum': the loss is summed.

    Raises
    ------
    ValueError
        If `reduction` is none of the values above (a misspelt value used to
        fall back silently to "no reduction").
    """

    _REDUCTIONS = (None, 'none', 'mean', 'sum')

    def __init__(self, alpha= 0.25, gamma= 2,reduction = None) -> None:
        super(FOCALLOSS,self).__init__()

        if reduction not in self._REDUCTIONS:
            raise ValueError(
                f"reduction must be one of {self._REDUCTIONS}, got {reduction!r}"
            )

        self.alpha = alpha
        self.gamma = gamma
        self.reduction=reduction

    def forward(self,inputs,targets):
        """
        Original implementation from https://github.com/facebookresearch/fvcore/blob/master/fvcore/nn/focal_loss.py .
        Loss used in RetinaNet for dense detection: https://arxiv.org/abs/1708.02002.
        Args:
            inputs: A float tensor of arbitrary shape.
                    The predictions (logits) for each example.
            targets: A float tensor with the same shape as inputs. Stores the binary
                    classification label for each element in inputs
                    (0 for the negative class and 1 for the positive class).
        Returns:
            Loss tensor with the reduction option applied.
        """
        p = torch.sigmoid(inputs)
        ce_loss = F.binary_cross_entropy_with_logits(
            inputs, targets, reduction="none"
        )
        # p_t is the probability assigned to the true class of each element
        p_t = p * targets + (1 - p) * (1 - targets)
        # Down-weight easy examples (p_t close to 1)
        loss = ce_loss * ((1 - p_t) ** self.gamma)

        if self.alpha >= 0:
            alpha_t = self.alpha * targets + (1 - self.alpha) * (1 - targets)
            loss = alpha_t * loss

        if self.reduction == "mean":
            loss = loss.mean()
        elif self.reduction == "sum":
            loss = loss.sum()

        return loss

In [None]:
# Build the network used in the experiment below together with its input
# pipelines: an ImageNet-pretrained EfficientNetV2-S with a single-logit head.
model, model_type, train_transforms, test_transforms = build_model_and_transforms(
    pretrained=True,
    fine_tune=True,
    num_classes=1,
    initialize=False,
    model_type='EfficientnetS',
)

#

In [None]:
from tqdm import tqdm
import pandas as pd


# Train on every document type except `test_doc`, evaluate on the held-out split,
# then evaluate the same network on the Spanish-ID clips of the DLC dataset.
for test_doc in ['French']:
    # NOTE(review): `model` is created once, outside this loop. If more documents are
    # added to the list, each fold would keep training the previous fold's weights —
    # rebuild the model per fold before extending the list.
    run_name = f'efficient_focalloss_downsampled_{test_doc}'
    writer = SummaryWriter(f'kfold_runs/{run_name}')

    train_loader, val_loader, test_loader = get_data(
        Configuration['Batchsize'],
        Configuration['PositiveSamples'],
        Configuration['NegativeSamples'],
        train_transforms,
        test_transforms,
        test_doc,
        writer=writer,
    )
    device = Configuration['device']

    print(len(train_loader.dataset))
    print(len(test_loader.dataset))
    # NOTE(review): these two values are not passed to the optimizer below, so Adam
    # runs with its default hyper-parameters — confirm whether that is intended.
    lr, weight_decay = 5e-2, 5e-5
    model = model.to(device)

    criterion = FOCALLOSS(alpha=0.25,gamma=2,reduction='sum')
    #criterion = BCEWithLogitsLoss(pos_weight=torch.tensor(2.),reduction='sum')

    # Every parameter goes into a single optimizer group.
    params_1x = list(model.parameters())
    optimizer = torch.optim.Adam([{'params':params_1x}])

    #writer.add_image('Positive images', positive_images, 0)
    #writer.add_image('Negative images', negative_images, 0)

    net = train(
        model,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        scheduler=None,
        epochs=10,
        device=device,
        writer=writer,
        model_type=run_name,
    )
    results = test(net, test_loader, device)

    Accuracy, Precision, Recall, F1_score = calculate_metrics(
        results['gt'], results['raw_outputs'], 0.5,
        tag='our', model_type=model_type, writer=writer,
    )

    print(f'Accuracy: {Accuracy}\nPrecision: {Precision}\nRecall: {Recall}\nF1 Score: {F1_score}')

    # Evaluate on a chosen document type from the DLC dataset.
    dlc_root = os.path.join(Configuration['DLC_dataset'], 'clips', 'clips', 'images', 'esp_id')

    positive_files = []
    negative_files = []

    for folder in os.listdir(dlc_root):
        folder_path = os.path.join(dlc_root, folder)
        for file in os.listdir(folder_path):
            # Folders whose name contains 'or' are labelled negative, all others positive.
            if 'or' in folder:
                negative_files.append(os.path.join(folder_path, file))
            else:
                positive_files.append(os.path.join(folder_path, file))

    all_files = positive_files + negative_files
    labels = [1] * len(positive_files) + [0] * len(negative_files)

    dlc_dataset = DocumentRecaptureDataset(all_files,labels,transforms=test_transforms)
    dlc_dataloader = DataLoader(dlc_dataset, batch_size=32, shuffle=False)

    results = test(net,dlc_dataloader,device=device)

    Accuracy, Precision, Recall, F1_score = calculate_metrics(
        results['gt'], results['raw_outputs'], threshold=0.5,
        tag='DLC', model_type=model_type, writer=writer,
    )

    print(f'Accuracy: {Accuracy}\nPrecision: {Precision}\nRecall: {Recall}\nF1 Score: {F1_score}')

    # Flush pending events and release the TensorBoard log file for this run.
    writer.close()

    



In [17]:

# Evaluate a saved checkpoint on several document types of the DLC dataset.
device = Configuration['device']

net = get_model(r'C:\Users\Utente\Projects\Thesis\Evaluations\Deep Learning Based\kfold\efficient_focalloss_downsampled_ahan_crop150_30.pth.tar')
net.to(device)
positive_files = []
negative_files = []

# Build every path with os.path.join instead of concatenating a backslash by hand.
dlc_images_root = os.path.join(Configuration['DLC_dataset'], 'clips', 'clips', 'images')

for doc_type in ['esp_id','iva_passport','alb_id','est_id','aze_passport']:
    doc_root = os.path.join(dlc_images_root, doc_type)
    for folder in sorted(os.listdir(doc_root)):
        folder_path = os.path.join(doc_root, folder)
        for file in sorted(os.listdir(folder_path)):
            # Folders whose name contains 'or' are labelled negative, all others positive.
            if 'or' in folder:
                negative_files.append(os.path.join(folder_path, file))
            else:
                positive_files.append(os.path.join(folder_path, file))


all_files = positive_files + negative_files

print(len(positive_files))
print(len(negative_files))
labels = [1] * len(positive_files) + [0] * len(negative_files)

dlc_dataset = DocumentRecaptureDataset(all_files,labels,transforms=test_transforms)

dlc_dataloader = DataLoader(dlc_dataset, batch_size=32, shuffle=False)

results = test(net,dlc_dataloader,device=device)

# NOTE(review): `model_type` and `test_transforms` come from the model-building cell,
# not from the checkpoint loaded above — run that cell first.
Accuracy, Precision, Recall, F1_score = calculate_metrics(
    results['gt'], results['raw_outputs'], 0.5,
    tag='DLC', model_type=model_type, writer=None,
)

print(f'Accuracy: {Accuracy}\nPrecision: {Precision}\nRecall: {Recall}\nF1 Score: {F1_score}')


7913
8153


100%|██████████| 503/503 [15:19<00:00,  1.83s/it]


Accuracy: 0.7135565791111664
Precision: 0.7754117451339212
Recall0.5890307089599394
F1 Score: 0.7083727398647642


In [20]:
import glob 

# Evaluate the currently loaded network on the personal (webcam) dataset.
positive_files = []
negative_files = []

personal_root = r"C:\Users\Utente\Projects\Thesis\doc_exp\personal_dataset"

# Sorted so that the sample order is the same on every run.
jpg_files = sorted(glob.glob(os.path.join(personal_root, '**', '*.jpg'), recursive=True))

for jpg_file in jpg_files:
    # Look for the 'Or' marker only in the part of the path below the dataset root,
    # so the name of a parent directory can never decide the label.
    if 'Or' in os.path.relpath(jpg_file, personal_root):
        negative_files.append(jpg_file)
    else:
        positive_files.append(jpg_file)


all_files = positive_files + negative_files

labels = [1] * len(positive_files) + [0] * len(negative_files)

# NOTE(review): the `dlc_*` names are kept as in the original cell, but they hold the
# personal dataset here.
dlc_dataset = DocumentRecaptureDataset(all_files,labels,transforms=test_transforms)

dlc_dataloader = DataLoader(dlc_dataset, batch_size=32, shuffle=False)

results = test(net,dlc_dataloader,device=device)

Accuracy, Precision, Recall, F1_score = calculate_metrics(
    results['gt'], results['raw_outputs'], 0.5,
    tag='DLC_webcam', model_type=model_type, writer=None,
)

print(f'Accuracy: {Accuracy}\nPrecision: {Precision}\nRecall: {Recall}\nF1 Score: {F1_score}')


100%|██████████| 54/54 [00:38<00:00,  1.41it/s]

Accuracy: 0.3346938775510204
Precision: 0.5470588235294118
Recall0.08038029386343994
F1 Score: 0.29880376868585123



