## DEBUG

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import logging
import matplotlib.pyplot as plt

from pkg import c, m, f 
import torch
import logging
from pkg import c, m, f 

# classes 
paths = c.PathManager()
data = c.PeakImageDataset(paths)
prep = c.DataPreparation(paths, data, batch_size=1)
water_h5 = data.load_h5(paths.water_background_h5)
ip = c.ImageProcessor(water_h5)
p = c.PeakThresholdProcessor(threshold_value=10)

# Check if CUDA is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.is_available())


Number of peak images: 20
Number of water images: 20
Number of label images: 20
<class 'numpy.ndarray'>
False


In [4]:
# if not already generated 
# ip.process_directory(paths, p.threshold_value) 
print(dir(m))


['BasicCNN1', 'BasicCNN2', 'DenseNet121_Weights', 'DenseNetBraggPeakClassifier', 'F', 'ResNet50BraggPeakClassifier', 'ResNet50_Weights', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'models', 'nn', 'np', 'optim', 'os', 'torch']


In [4]:

# model = m.ResNet50BraggPeakClassifier()
model = m.ResNet50BraggPeakClassifier().to(device)
# loss function/ combines a Sigmoid layer and the BCELoss in one single class
criterion = nn.BCEWithLogitsLoss()
# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
print("Criterion: ", criterion)
print("Optimizer: \n", optimizer)
print("Learning rate: ", optimizer.param_groups[0]['lr'])
# data loaders 
train_loader, test_loader = prep.prep_data()

Criterion:  BCEWithLogitsLoss()
Optimizer: 
 Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)
Learning rate:  0.001
Data prepared.
Train size: 16
Test size: 4
Batch size: 1
Number of batches in train_loader: 16 




In [5]:
"""Training the model"""
num_epochs = 10
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
train_losses = []
test_losses = []

# logging.info('Staring training...')
print('Staring training...')
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader: 
        peak_images, water_images = inputs
        # Move data to GPU
        peak_images = peak_images.to(device)
        labels = labels.to(device)
        # zero parameter gradients
        optimizer.zero_grad()
        # forward pass
        outputs = model(peak_images)
        loss = criterion(outputs, labels)
        # backward pass/optimize
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # logging.info(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')

2024-03-15 22:07:32,603 - INFO - Staring training...


KeyboardInterrupt: 

In [None]:
def train_test_model(model, train_loader, test_loader, num_epochs, device, criterion, optimizer):
    print(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    train_losses = test_losses = []

    print('Staring training...')
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_predictions = 0
        
        for inputs, labels in train_loader: 
            peak_images, water_images = inputs[0].to(device), inputs[1].to(device)
            labels = labels.to(device)
            
            # zero parameter gradients
            optimizer.zero_grad()
            # forward pass
            outputs = model(peak_images)
            loss = criterion(outputs, labels)
            # backward pass/optimize
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            # calculate accuracy
            predicted = (torch.sigmoid(outputs) > 0.5).float()
            correct_predictions += (predicted == labels).float().sum()
            total_predictions += labels.size(0)
        
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct_predictions / total_predictions
        train_losses.append(epoch_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss}, Accuracy: {epoch_acc}')
        
        print('Starting testing...')
        
    model.eval()  
    test_loss = correct_peaks = total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            peak_images, water_images = inputs[0].to(device), inputs[1].to(device)
            # forward pass prediction
            peak_pred = model(peak_images) 
            # compute loss
            loss = criterion(peak_pred, labels)
            test_loss += loss.item()
            # convert predictions to binary to compare with labels 
            peak_prediction = torch.sigmoid(peak_pred) > 0.5 
            # calculate accuracy of peak detection
            correct_peaks += (peak_prediction == labels).sum().item()
            total += np.prod(labels.shape) 

        avg_test_loss = test_loss / len(test_loader)
        peak_detection_accuracy = correct_peaks / total
        test_losses.append(avg_test_loss)
        # log results 
        print(f"Test Loss: {avg_test_loss:.4f}, Bragg Peak Detection Accuracy: {peak_detection_accuracy:.4f}")
        print(f'Test loss: {test_loss}')
        
    print('Testing completed.')
    
    plt.plot(train_losses)
    plt.plot(test_losses, color='r')
    plt.xlabel('Epoch')
    plt.ylabel('Training Loss')
    plt.title('Training Losses')
    plt.show()
