# Training Service (development)

In [1]:
import torch
import torchxrayvision as xrv
import pandas as pd
from PIL import Image
from torchvision.io import read_image
from torchvision.transforms import Resize
from torch.utils.data import DataLoader 
import torch.optim as optim 
import os
from torch.cuda.amp import autocast, GradScaler
from tqdm import tqdm

In [3]:
import torch

# Report the library versions and the accelerator this kernel will use.
# `xrv` is torchxrayvision (imported in the first cell), not torchvision,
# so the label has to say so.
print(f"PyTorch version: {torch.__version__}")
print(f"TorchXRayVision version: {xrv.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU device: {torch.cuda.get_device_name(0)}")

PyTorch version: 2.6.0+cpu
Torchvision version: 1.2.0
CUDA available: False


## Custom Dataset for finetuning


In [None]:







def annotation_creation(image_path):
    """Build label annotations for every readable JPEG in one class folder.

    The class is inferred from the folder path itself: a path containing
    "NORMAL" (case-insensitive) yields label 0, otherwise a path containing
    "PNEUMONIA" yields label 1. Files that cannot be opened and verified as
    images are reported and skipped.

    Args:
        image_path: Directory holding the images of a single class.

    Returns:
        A tuple ``(normal, pneumonia)`` of lists of
        ``{'filename': str, 'label': int}`` dicts. At most one of the two
        lists is populated, depending on which class name appears in
        ``image_path``; both are empty if neither name appears.
    """
    annotations_list_normal = []
    annotations_list_pneumonia = []

    # The label depends only on the folder path, so resolve it once instead
    # of re-checking the path for every file.
    upper_path = image_path.upper()
    if "NORMAL" in upper_path:
        target, label = annotations_list_normal, 0
    elif "PNEUMONIA" in upper_path:
        target, label = annotations_list_pneumonia, 1
    else:
        target, label = None, None

    # Sorted so the annotation order does not depend on the file system.
    for filename in sorted(os.listdir(image_path)):
        # Accept .jpg/.jpeg regardless of case (e.g. "IMG_001.JPG").
        if not filename.lower().endswith((".jpg", ".jpeg")):
            continue

        image_new_path = os.path.join(image_path, filename)
        try:
            # The context manager releases the file handle even when
            # verify() raises on a corrupt file.
            with Image.open(image_new_path) as img:
                img.verify()  # verify that it is, in fact, an image
        except Exception as e:
            print(f'Error loading image {filename}: {e}')
            continue

        if target is not None:
            target.append({'filename': filename, 'label': label})

    return annotations_list_normal, annotations_list_pneumonia



            
            
            



In [None]:
normal_train = r"C:\Users\helto\OneDrive\Documentos\Mestrado Ciência de Dados\2º Semestre\Cloud Computing\Projeto\chest_xray\train\NORMAL"
pneumonia_train = r"C:\Users\helto\OneDrive\Documentos\Mestrado Ciência de Dados\2º Semestre\Cloud Computing\Projeto\chest_xray\train\PNEUMONIA"

# annotation_creation returns (normal, pneumonia); only the list matching the
# folder name is populated, so keep the relevant half of each result.
annotations_train_normal,_ = annotation_creation(normal_train)
_,annotations_train_pneumonia = annotation_creation(pneumonia_train)

n_train_normal = len(annotations_train_normal)
n_train_pneumonia = len(annotations_train_pneumonia)
n_train_total = n_train_normal + n_train_pneumonia
# Guard the ratio so an empty or missing split does not raise ZeroDivisionError.
pct_train_normal = n_train_normal*100/ n_train_total if n_train_total else 0.0

print(f'Nº normal images for training {n_train_normal}')
print(f'Nº pneumonia images for training {n_train_pneumonia}')
print(f'Total images for training {n_train_total}')
print(f' Proportion of normal images for traing {pct_train_normal}%')

annotaded_train_images = annotations_train_normal + annotations_train_pneumonia

# Preview a few entries instead of dumping the whole list into the output.
print(annotaded_train_images[:5])
print(len(annotaded_train_images))


In [None]:
normal_val = r'C:\Users\helto\OneDrive\Documentos\Mestrado Ciência de Dados\2º Semestre\Cloud Computing\Projeto\chest_xray\val\NORMAL'
pneumonia_val = r'C:\Users\helto\OneDrive\Documentos\Mestrado Ciência de Dados\2º Semestre\Cloud Computing\Projeto\chest_xray\val\PNEUMONIA'

# annotation_creation returns (normal, pneumonia); only the list matching the
# folder name is populated, so keep the relevant half of each result.
annotations_val_normal,_ = annotation_creation(normal_val)
_,annotations_val_pneumonia = annotation_creation(pneumonia_val)

n_val_normal = len(annotations_val_normal)
n_val_pneumonia = len(annotations_val_pneumonia)
n_val_total = n_val_normal + n_val_pneumonia
# Guard the ratio so an empty or missing split does not raise ZeroDivisionError.
pct_val_normal = n_val_normal*100/ n_val_total if n_val_total else 0.0

print(f'Nº normal images for validation {n_val_normal}')
print(f'Nº pneumonia images for validation {n_val_pneumonia}')
print(f'Total images for validation {n_val_total}')
print(f' Proportion of normal images for validation {pct_val_normal}%')

annotaded_val_images = annotations_val_normal + annotations_val_pneumonia

# Preview a few entries instead of dumping the whole list into the output.
print(annotaded_val_images[:5])
print(len(annotaded_val_images))

In [None]:
normal_test = r'C:\Users\helto\OneDrive\Documentos\Mestrado Ciência de Dados\2º Semestre\Cloud Computing\Projeto\chest_xray\test\NORMAL'
pneumonia_test = r'C:\Users\helto\OneDrive\Documentos\Mestrado Ciência de Dados\2º Semestre\Cloud Computing\Projeto\chest_xray\test\PNEUMONIA'

# annotation_creation returns (normal, pneumonia); only the list matching the
# folder name is populated, so keep the relevant half of each result.
annotations_test_normal,_ = annotation_creation(normal_test)
_,annotations_test_pneumonia = annotation_creation(pneumonia_test)

n_test_normal = len(annotations_test_normal)
n_test_pneumonia = len(annotations_test_pneumonia)
n_test_total = n_test_normal + n_test_pneumonia
# Guard the ratio so an empty or missing split does not raise ZeroDivisionError.
pct_test_normal = n_test_normal*100/ n_test_total if n_test_total else 0.0

print(f'Nº normal images for test {n_test_normal}')
print(f'Nº pneumonia images for test {n_test_pneumonia}')
print(f'Total images for test {n_test_total}')
print(f' Proportion of normal images for test {pct_test_normal}%')

annotaded_test_images = annotations_test_normal + annotations_test_pneumonia

# Preview a few entries instead of dumping the whole list into the output.
print(annotaded_test_images[:5])
print(len(annotaded_test_images))



In [None]:
# One integer label per annotation, in the same order as the annotation
# lists, so labels and image paths stay index-aligned.
labels_train, labels_val, labels_test = (
    [int(annotation['label']) for annotation in split]
    for split in (annotaded_train_images, annotaded_val_images, annotaded_test_images)
)

In [None]:



# Earlier approach, kept commented out for reference only: it eagerly loaded
# every image into a single tensor.
#
# def get_images(annotaded_images,image_list,normal_path,pneumonia_path):
#     resize_transform = Resize((224,224))
#     for entry in annotaded_images:
#         if entry['label'] == 0:
#             imaage_path = os.path.join(normal_path, entry['filename'])
#             image = read_image(imaage_path)
#             image = image.float()
#             if image.shape[0] == 3:  # RGB image
#                 image = image.mean(dim=0, keepdim=True)  # Convert to grayscale by averaging channels
#             image = resize_transform(image)
#             image_list.append(image)
#         elif entry['label'] == 1:
#             imaage_path = os.path.join(pneumonia_path, entry['filename'])
#             image = read_image(imaage_path)
#             image = image.float()
#             if image.shape[0] == 3:  # RGB image
#                 image = image.mean(dim=0, keepdim=True)  # Convert to grayscale by averaging channels
#             image = resize_transform(image)
#             image_list.append(image)
#
#     image_tensor = torch.stack(image_list)
#     return image_tensor
#
# Why it was dropped: with this function I was having trouble loading the
# training images and applying transformations dynamically (to increase image
# variety and reduce overfitting).
#
# A better solution is to collect the path of every image according to its
# label and use a class that inherits from PyTorch's Dataset to load the
# images on demand.
#
# That is the standard PyTorch workflow.

In [None]:
def get_image_path(annotaded_images, normal_path, pneumonia_path):
    """Resolve the full file path of every annotated image.

    Args:
        annotaded_images: Iterable of ``{'filename': str, 'label': int}``
            dicts.
        normal_path: Folder containing the label-0 (normal) images.
        pneumonia_path: Folder containing the label-1 (pneumonia) images.

    Returns:
        List of paths in the same order as ``annotaded_images``. Entries
        whose label is neither 0 nor 1 are skipped.
    """
    folder_by_label = {0: normal_path, 1: pneumonia_path}

    images_paths = []
    # A distinct loop name: the original reused the parameter name as the
    # loop variable, shadowing the argument inside the loop.
    for entry in annotaded_images:
        label = entry['label']
        if label in folder_by_label:
            images_paths.append(os.path.join(folder_by_label[label], entry['filename']))

    return images_paths
    


In [None]:
from torchvision import transforms

# Random augmentations, applied to the PIL image on every fetch so each epoch
# sees slightly different versions of the same X-ray.
augmentation_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
]

# Deterministic part: fixed 224x224 resolution, tensor conversion, then
# (x - 0.5) / 0.5 on the single grayscale channel.
# NOTE(review): torchxrayvision documents its pretrained weights as expecting
# inputs scaled to [-1024, 1024] (xrv.datasets.normalize); this pipeline
# produces a different range -- confirm before relying on the pretrained
# features.
tensor_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
]

train_transform = transforms.Compose(augmentation_steps + tensor_steps)

In [None]:
class PneumoniaDataset(torch.utils.data.Dataset):
    """Map-style dataset that lazily loads chest X-ray images from disk.

    Args:
        labels: Sequence of integer class labels, index-aligned with
            ``images_path``.
        images_path: Sequence of image file paths.
        transform: Optional callable applied to the grayscale PIL image
            before it is returned.

    Raises:
        ValueError: If ``labels`` and ``images_path`` differ in length, which
            would otherwise pair images with the wrong label or fail late
            with an IndexError.
    """

    def __init__(self, labels, images_path, transform=None):
        if len(labels) != len(images_path):
            raise ValueError(
                f"labels ({len(labels)}) and images_path ({len(images_path)}) "
                "must have the same length"
            )

        self.images_path = images_path
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.images_path)

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index``; the image is grayscale."""
        # convert('L') loads the pixel data into a new image, so the file can
        # be closed as soon as the block exits.
        with Image.open(self.images_path[index]) as img:
            image = img.convert('L')
        label = self.labels[index]

        if self.transform:
            image = self.transform(image)

        return image, label
           
       
        



In [None]:
# Full image paths per split, index-aligned with the matching labels_* list.
train_path, val_path, test_path = (
    get_image_path(annotations, normal_dir, pneumonia_dir)
    for annotations, normal_dir, pneumonia_dir in (
        (annotaded_train_images, normal_train, pneumonia_train),
        (annotaded_val_images, normal_val, pneumonia_val),
        (annotaded_test_images, normal_test, pneumonia_test),
    )
)



Debugging: quick sanity checks that the dataset and the DataLoader return what we expect.


In [None]:
# Training dataset with augmentation enabled.
dataset = PneumoniaDataset(
    labels=labels_train,
    images_path=train_path,
    transform=train_transform,
)

# Smoke test: fetch the first five samples one at a time and report the
# label and tensor shape of each.
for i in range(0, 5):
    image, label = dataset[i]
    print(f"Fetched image {i}, label: {label}, image shape: {image.size()}")

In [None]:
from torch.utils.data import DataLoader

# Create the DataLoader with num_workers=0 so batches are loaded in the main
# process. Pinned memory only helps host-to-GPU copies, so request it only
# when CUDA is present; asking for it on a CPU-only install just triggers a
# warning.
train_loader = DataLoader(
    dataset,
    batch_size=16,
    shuffle=True,
    num_workers=0,
    pin_memory=torch.cuda.is_available(),
)

# Test fetching a single batch
for batch in train_loader:
    images, label = batch
    print(f"Batch loaded: images shape {images.size()}, labels shape {label.size()}")
    break  # Stop after the first batch

In [None]:
from torch.utils.data import DataLoader
import time

# Create the DataLoader. Pinned memory is requested only when CUDA is
# present, because it has no use (and warns) on a CPU-only install.
train_loader = DataLoader(
    dataset,
    batch_size=16,
    shuffle=True,
    num_workers=0,
    pin_memory=torch.cuda.is_available(),
)

# Time how long it takes to fetch the first batch. perf_counter() is a
# monotonic, high-resolution clock intended for measuring intervals, unlike
# the wall-clock time.time().
start_time = time.perf_counter()
for batch in train_loader:
    images, labels = batch
    print(f"Batch loaded: images shape {images.shape}, labels shape {labels.shape}")
    break
end_time = time.perf_counter()
print(f"Time to load first batch: {end_time - start_time:.2f} seconds")

## Training Production



In [None]:
# Pretrained chest X-ray DenseNet used as the backbone for fine-tuning.
model = xrv.models.DenseNet(weights="densenet121-res224-rsna", op_threshs= None)
model.eval()

# Replace the pretrained classification head with a fresh two-class head.
num_features = model.classifier.in_features
model.classifier = torch.nn.Sequential(
    torch.nn.Dropout(p=0.33),  # Added to reduce the overfitting seen in earlier runs
    torch.nn.Linear(num_features, 2),  # Two logits: binary classification
)

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Device: {device}")

# Clear the operating-point thresholds on the model after loading.
# NOTE(review): presumably this stops the xrv forward pass from
# post-processing the outputs so the new head yields raw logits -- confirm.
model.op_threshs = None
model = model.to(device)

# Freeze the pretrained feature extractor so only the new head is trained.
for param in model.features.parameters():
    param.requires_grad = False

criterion = torch.nn.CrossEntropyLoss()
# Hand the optimizer only the parameters that can actually receive gradients;
# the frozen backbone weights are left out of its parameter groups.
trainable_params = [param for param in model.parameters() if param.requires_grad]
optimizer = optim.Adam(trainable_params, lr= 1e-3, weight_decay=1e-4)



    
    

Debugging: check that a single training batch makes it through the forward pass.

In [None]:
model.to(device)  # Ensure the model is on the correct device
model.eval()  # Use eval mode to disable dropout, etc., for debugging

# This is only a shape check, so skip building the autograd graph.
with torch.no_grad():
    for images, labels in tqdm(train_loader):
        images, labels = images.to(device), labels.to(device)
        print(f"Batch moved to device: {device}")
        outputs = model(images)  # Forward pass
        print(f"Forward pass completed: outputs shape {outputs.shape}")
        break  # One batch is enough

In [None]:
import torch

# Re-check the accelerator, this time with a hint when CUDA is missing.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if not torch.cuda.is_available():
    print("CUDA is not available. Check your PyTorch installation and GPU drivers.")
else:
    print(f"GPU device: {torch.cuda.get_device_name(0)}")

In [None]:
# IPython shell escape: run the nvidia-smi command-line tool from inside the
# notebook to inspect the GPU and driver.
!nvidia-smi

In [None]:

def train(epochs=5, patience=3, batch_size=16, num_workers=None):
    """Fine-tune the module-level ``model`` and record accuracy per epoch.

    Relies on names defined in earlier cells: ``model``, ``criterion``,
    ``optimizer``, ``device``, ``train_transform``, ``PneumoniaDataset`` and
    the ``labels_*`` / ``*_path`` lists for the train, val and test splits.

    Args:
        epochs: Maximum number of passes over the training set.
        patience: Stop after this many consecutive epochs without an
            improvement in validation accuracy.
        batch_size: Batch size used by all three DataLoaders.
        num_workers: DataLoader worker processes. ``None`` picks 0 on Windows
            and 4 elsewhere.

    Returns:
        dict with ``"train_acc"`` and ``"val_acc"`` (lists of percentages,
        one entry per completed epoch) and ``"test_acc"`` (percentage on the
        test split after training).
    """
    # Published at module level because the plotting cell further down reads
    # `num_epochs`, `train_acc` and `val_acc` as globals.
    global num_epochs, train_acc, val_acc

    from torchvision import transforms

    if num_workers is None:
        # On Windows, DataLoader workers are separate spawned processes that
        # must re-import the dataset class; a class defined inside a notebook
        # is typically not importable there, so stay single-process.
        num_workers = 0 if os.name == "nt" else 4

    # Mixed precision and pinned memory only make sense on a CUDA device.
    use_cuda = device.type == "cuda"

    # Validation/test images need the deterministic part of the training
    # pipeline: without a transform the dataset returns PIL images, which the
    # default collate function cannot batch.
    eval_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ])

    dataset_train = PneumoniaDataset(labels_train, train_path, train_transform)
    dataset_val = PneumoniaDataset(labels_val, val_path, eval_transform)
    dataset_test = PneumoniaDataset(labels_test, test_path, eval_transform)

    loader_options = dict(batch_size=batch_size, num_workers=num_workers, pin_memory=use_cuda)
    train_loader = DataLoader(dataset_train, shuffle=True, **loader_options)
    val_loader = DataLoader(dataset_val, shuffle=False, **loader_options)
    test_loader = DataLoader(dataset_test, shuffle=False, **loader_options)

    def _accuracy(loader):
        """Return the percentage of samples in ``loader`` classified correctly."""
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in loader:
                images, labels = images.to(device), labels.to(device)
                predictions = model(images).argmax(dim=1)
                total += labels.size(0)
                correct += (predictions == labels).sum().item()
        return correct * 100 / total if total else 0.0

    num_epochs = epochs
    train_acc = []
    val_acc = []
    scaler = GradScaler(enabled=use_cuda)
    margin = 1e-5  # Minimum gain in validation accuracy that counts as progress
    best_val_acc = 0
    no_improvement_epochs = 0

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct_train = 0
        total_train = 0
        for images, labels in tqdm(train_loader):
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            with autocast(enabled=use_cuda):
                outputs = model(images)
                loss = criterion(outputs, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            predictions_train = outputs.argmax(dim=1)
            total_train += labels.size(0)
            correct_train += (predictions_train == labels).sum().item()

            # loss.item() is the mean loss over the batch; scaling by the
            # batch size gives the batch total, so the epoch average stays
            # correct when the last batch is smaller.
            running_loss += loss.item() * images.size(0)

        # Dividing by the samples actually seen avoids ZeroDivisionError on
        # an empty training set.
        epoch_loss = running_loss / max(total_train, 1)
        epoch_acc = correct_train * 100 / total_train if total_train else 0.0
        train_acc.append(epoch_acc)
        print(f'Epoch {epoch+1} / {epochs}, \n Loss: {epoch_loss}, \n Accuracy: {epoch_acc}')

        epoch_acc_val = _accuracy(val_loader)
        val_acc.append(epoch_acc_val)
        print(f'Validation accuracy: {epoch_acc_val}')

        # Early stopping as a last measure against overfitting
        if epoch_acc_val > best_val_acc + margin:
            best_val_acc = epoch_acc_val
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1
            if no_improvement_epochs >= patience:
                print('Finished training due to early stopping')
                break

    # Report the epochs actually completed, so an x axis built from
    # `num_epochs` matches the history length after early stopping.
    num_epochs = len(train_acc)

    test_acc = _accuracy(test_loader)
    print(f'Test accuracy: {test_acc}')

    return {"train_acc": train_acc, "val_acc": val_acc, "test_acc": test_acc}

In [None]:
# Start training only when this code runs as the main module.
# NOTE(review): presumably the guard is here because train() builds
# DataLoaders with worker processes, which re-import the main module on
# platforms that spawn -- confirm.
if __name__ == "__main__":
    train()

In [None]:
## Viewing the validation and training accuracies

import matplotlib.pyplot as plt
import numpy as np

# `train_acc` and `val_acc` must exist at module level when this cell runs.
# Build the x axis from the recorded history rather than from the requested
# epoch count, so the plot still works when early stopping ends training
# before the last epoch.
x = np.arange(1, len(train_acc) + 1)

plt.figure(figsize=(10, 6))  # Single figure, adjustable size
plt.plot(x, train_acc, label='Training Accuracy', color='red')
plt.plot(x, val_acc, label='Validation Accuracy', color='green')
plt.title('Training and Validation Accuracies')
plt.xlabel('Epoch')
plt.ylabel('Accuracy(%)')
plt.legend()  # Show the labels
plt.grid(False)  # Grid lines switched off
plt.show()

In [None]:
# Persist only the weights (state_dict) of the fine-tuned model.
model_path = r'C:\Users\helto\OneDrive\Documentos\Mestrado Ciência de Dados\2º Semestre\Cloud Computing\Projeto\Training Service\models_saved\model01.pth'
# torch.save does not create missing folders, so make sure the target
# directory exists first.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
torch.save(model.state_dict(), model_path)