In [1]:
import os
import cv2
import numpy as np

def load_patch_data(polyp_folder, non_polyp_folder, polyp_folder_testing, non_polyp_folder_testing, image_size=(224, 224)):
    train_images = []
    train_labels = []  # 1 for polyp, 0 for non-polyp

    test_images = []
    test_labels = []
    
    
    # Training
    # Load polyp images
    for filename in os.listdir(polyp_folder):
        image = cv2.imread(os.path.join(polyp_folder, filename))
        image = cv2.resize(image, image_size)
        train_images.append(image)
        train_labels.append(1)
    
    # Load non-polyp images
    for filename in os.listdir(non_polyp_folder):
        image = cv2.imread(os.path.join(non_polyp_folder, filename))
        image = cv2.resize(image, image_size)
        train_images.append(image)
        train_labels.append(0)
    
    # Testing
    # for filename in os.listdir(polyp_folder_testing):
    #     image = cv2.imread(os.path.join(polyp_folder_testing, filename))
    #     image = cv2.resize(image, image_size)
    #     train_images.append(image)
    #     train_labels.append(0)
        
    # for filename in os.listdir(non_polyp_folder_testing):
    #     image = cv2.imread(os.path.join(non_polyp_folder_testing, filename))
    #     image = cv2.resize(image, image_size)
    #     train_images.append(image)
    #     train_labels.append(0)
    
    
    return np.array(train_images), np.array(train_labels)
# , np.array(test_images), np.array(test_labels)

# Patches
polyp_folder = 'src/output_patches_new_2/abnormal' # Training/abnormal_training_patches
non_polyp_folder = 'src/output_patches_new_2/normal' # Training/normal_training_patches
polyp_folder_testing = 'Testing/Patches/abnormal_testing_patches'
non_polyp_folder_testing = 'Testing/Patches/normal_testing_patches'



train_patches, train_patch_labels = load_patch_data(polyp_folder, non_polyp_folder, polyp_folder_testing, non_polyp_folder_testing)

In [3]:
import os
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
import torch.nn as nn
from torch.optim import SGD, lr_scheduler
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

# Check for MPS support
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

# Custom dataset class
class CustomDataset(Dataset):
    def __init__(self, patches, labels, transform=None):
        self.patches = patches
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.patches)

    def __getitem__(self, idx):
        image = self.patches[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Define transformations
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Split the data
train_patches, test_patches, train_patch_labels, test_patch_labels = train_test_split(
    train_patches, train_patch_labels, test_size=0.2, random_state=42)

# Datasets and Loaders
train_dataset = CustomDataset(train_patches, train_patch_labels, transform=transform)
test_dataset = CustomDataset(test_patches, test_patch_labels, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=10)

# Load a pretrained ResNet model
def create_resnet_model(num_classes=2, pretrained=True):
    model = models.resnet50(pretrained=pretrained)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)
    return model

# Create and setup the model
model = create_resnet_model().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-4)
scheduler = lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Training loop
model.train()
for epoch in range(50):
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    scheduler.step()
    print(f'Epoch {epoch+1} completed')

# Validation
model.eval()
val_predictions = []
val_targets = []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        val_predictions.extend(predicted.cpu().numpy())
        val_targets.extend(labels.cpu().numpy())

    accuracy = accuracy_score(val_targets, val_predictions)
    precision = precision_score(val_targets, val_predictions, average='weighted')
    recall = recall_score(val_targets, val_predictions, average='weighted')

    print(f'Validation - Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}')

# Save the trained model
torch.save(model, 'Patch_Model_All_Data')




Epoch 1 completed
Epoch 2 completed
Epoch 3 completed
Epoch 4 completed
Epoch 5 completed


KeyboardInterrupt: 