# In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import cv2
import numpy as np
import os

# Locations of the two image folders, one per class.
normal_path = r"C:\Users\umasr\Downloads\Normal"
pneumonia_path = r"C:\Users\umasr\Downloads\pneumonia"

# Preprocessing pipeline handed to the dataset: a single ToTensor step.
# NOTE: this statement rebinds the name `transforms` from the imported
# torchvision module to the composed pipeline, so the module is no longer
# reachable under that name afterwards.
transforms = transforms.Compose([transforms.ToTensor()])

# Define a custom dataset class
class XRayDataset(Dataset):
    """Two-class image dataset built from a pair of folders.

    Every entry of ``normal_path`` gets label 0 and every entry of
    ``pneumonia_path`` gets label 1.  Indices ``0 .. len(normal) - 1`` address
    the normal folder; the remaining indices address the pneumonia folder.

    Args:
        normal_path: Directory holding the label-0 images.
        pneumonia_path: Directory holding the label-1 images.
        transforms: Optional callable applied to the ``(256, 256, 1)``
            grayscale array before it is returned.
    """

    def __init__(self, normal_path, pneumonia_path, transforms=None):
        self.normal_path = normal_path
        self.pneumonia_path = pneumonia_path
        self.transforms = transforms

        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> file mapping reproducible across runs and machines.
        self.normal_images = sorted(os.listdir(normal_path))
        self.pneumonia_images = sorted(os.listdir(pneumonia_path))

    def __len__(self):
        """Return the total number of entries across both folders."""
        return len(self.normal_images) + len(self.pneumonia_images)

    def __getitem__(self, idx):
        """Load, preprocess and return ``(image, label)`` for ``idx``.

        The image is read with OpenCV, converted to grayscale, resized to
        256x256 and given a trailing channel axis before ``transforms`` (if
        any) is applied.

        Raises:
            ValueError: If the file cannot be decoded as an image.
        """
        normal_count = len(self.normal_images)
        if idx < normal_count:
            img_path = os.path.join(self.normal_path, self.normal_images[idx])
            label = 0
        else:
            img_path = os.path.join(
                self.pneumonia_path, self.pneumonia_images[idx - normal_count]
            )
            label = 1

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread reports failure by returning None rather than
            # raising; fail here with the offending path instead of letting
            # cvtColor raise an opaque assertion error.
            raise ValueError(f"Could not read image file: {img_path}")

        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = cv2.resize(img, (256, 256))
        img = np.expand_dims(img, axis=2)  # (256, 256) -> (256, 256, 1)

        if self.transforms:
            img = self.transforms(img)

        return img, label

# Create data loaders for the training and validation sets.
# The labelled images are split 80/20 so that validation loss is measured on
# images the model is not trained on; the fixed generator seed makes the
# index split reproducible.
full_dataset = XRayDataset(normal_path, pneumonia_path, transforms=transforms)
val_size = len(full_dataset) // 5
train_size = len(full_dataset) - val_size
train_dataset, val_dataset = torch.utils.data.random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# Evaluation does not need shuffling.
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolution -> batch-norm -> ReLU stages.

    The first stage maps ``in_channels`` to ``out_channels``; the second keeps
    ``out_channels``.  ``padding=1`` preserves the spatial size.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # Same layer order as a hand-written Sequential, so parameter names
        # (conv.0, conv.1, conv.3, conv.4) are unchanged.
        for stage_in in (in_channels, out_channels):
            stages.append(nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1))
            stages.append(nn.BatchNorm2d(out_channels))
            stages.append(nn.ReLU(inplace=True))
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` and return the result."""
        features = self.conv(x)
        return features
    
class Up(nn.Module):
    """Decoder stage: upsample, join with a second tensor, then refine.

    A transposed convolution (kernel 2, stride 2) maps ``in_channels`` to
    ``out_channels``.  Its output is concatenated along the channel axis with
    ``x2`` and the result is passed through
    ``DoubleConv(in_channels, out_channels)``, so the concatenated tensor must
    carry ``in_channels`` channels.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.up = nn.ConvTranspose2d(
            in_channels, out_channels, kernel_size=2, stride=2
        )
        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        """Upsample ``x1``, concatenate it after ``x2`` and convolve."""
        upsampled = self.up(x1)
        merged = torch.cat((x2, upsampled), dim=1)
        return self.conv(merged)
    
class UNet(nn.Module):
    """U-Net with a four-level encoder/decoder for single-channel images.

    The encoder applies ``DoubleConv`` blocks (64, 128, 256, 512 channels)
    separated by 2x2 max-pooling, followed by a 1024-channel bottleneck.  The
    decoder mirrors it with four ``Up`` stages that consume the matching
    encoder outputs as skip connections, and a final 1x1 convolution produces
    a single-channel map of raw (un-activated) values.

    Because the input is halved four times and doubled four times, its height
    and width should be multiples of 16 so the skip tensors line up.
    """

    def __init__(self):
        super().__init__()
        # Encoder
        self.conv1 = DoubleConv(1, 64)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = DoubleConv(64, 128)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = DoubleConv(128, 256)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = DoubleConv(256, 512)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Bottleneck
        self.conv5 = DoubleConv(512, 1024)
        # Decoder.  Each Up stage already ends in its own DoubleConv and
        # outputs `out_channels` channels, so no further DoubleConv follows
        # it: one expecting twice as many input channels would fail with a
        # channel mismatch.
        self.up6 = Up(1024, 512)
        self.up7 = Up(512, 256)
        self.up8 = Up(256, 128)
        self.up9 = Up(128, 64)
        # 1x1 projection to a single output channel
        self.conv10 = nn.Conv2d(64, 1, kernel_size=1)

    def forward(self, x):
        """Return a single-channel map with the same spatial size as ``x``."""
        # Encoder path; x1..x4 are kept for the skip connections.
        x1 = self.conv1(x)
        x2 = self.conv2(self.pool1(x1))
        x3 = self.conv3(self.pool2(x2))
        x4 = self.conv4(self.pool3(x3))
        x5 = self.conv5(self.pool4(x4))
        # Decoder path
        x6 = self.up6(x5, x4)
        x7 = self.up7(x6, x3)
        x8 = self.up8(x7, x2)
        x9 = self.up9(x8, x1)
        return self.conv10(x9)


# Device used for the model and for every batch.  This must be a module-level
# name (not a class attribute of UNet) because the script refers to it as a
# bare `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the network and move its parameters to the selected device
# (nn.Module.to returns the module itself).
model = UNet().to(device)

# Adam with its default hyper-parameters, and a binary cross-entropy loss
# that takes raw logits.
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()

def train(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader`` and return the mean loss.

    Args:
        model: Module to train; switched to training mode.
        dataloader: Iterable yielding ``(inputs, labels)`` tensor pairs.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor.  Labels are passed through unchanged, so their dtype
            and shape must already be what the criterion expects.
        optimizer: Optimizer stepping the model's parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The unweighted mean of the per-batch loss values, or ``nan`` when
        ``dataloader`` yields no batches.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0  # counted while iterating so len(dataloader) is not required
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        num_batches += 1
    if num_batches == 0:
        # No data: the mean is undefined.  Return NaN rather than dividing by
        # zero or reporting a misleading 0.0 loss.
        return float("nan")
    return running_loss / num_batches

def validate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without gradients.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        dataloader: Iterable yielding ``(inputs, labels)`` tensor pairs.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor.  Labels are passed through unchanged, so their dtype
            and shape must already be what the criterion expects.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The unweighted mean of the per-batch loss values, or ``nan`` when
        ``dataloader`` yields no batches.
    """
    model.eval()
    running_loss = 0.0
    num_batches = 0  # counted while iterating so len(dataloader) is not required
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            running_loss += criterion(outputs, labels).item()
            num_batches += 1
    if num_batches == 0:
        # No data: the mean is undefined.  Return NaN rather than dividing by
        # zero or reporting a misleading 0.0 loss.
        return float("nan")
    return running_loss / num_batches

# Per-epoch loss history.
train_losses = []
valid_losses = []

# Number of passes over the training data.  This name was not assigned
# anywhere in the visible script; 10 is a placeholder -- TODO tune.
num_epochs = 10

for epoch in range(num_epochs):
    train_loss = train(model, train_loader, criterion, optimizer, device)
    # The validation loader is named `val_loader` where it is created.
    valid_loss = validate(model, val_loader, criterion, device)
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Valid Loss = {valid_loss:.4f}")

# Persist only the learned parameters (state dict), not the whole module.
torch.save(model.state_dict(), "unet_xray_segmentation.pth")
