In [None]:
import os
import torch
from torchvision import models
from torch import nn
from torchvision.io import read_image
from torch.utils.data import Dataset,random_split,DataLoader
from torchvision.transforms import ToPILImage
from torchvision import transforms
import matplotlib.pyplot as plt
from torchvision import models
from torch import optim
from torch.nn import CrossEntropyLoss
from sklearn.metrics import precision_score, recall_score, f1_score

# Pick the compute device once: CUDA when PyTorch can see a GPU, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Run configuration.
BATCHSIZE = 16  # batch size handed to both DataLoaders
EPOCHS = 10  # number of epochs requested from train_model
# Per-backbone sizes, only collected into sizeMapping further down.
# Presumably the feature width of each network in front of its classifier
# head — TODO confirm before relying on these values.
RESNETSIZE = 2048
GOOGLENETSIZE = 1024
DENSENETSIZE = 1024
VGG19SIZE = 512

In [None]:
# Echo the selected device and batch size so the run log records them.
print(f'Device being used is {device}')
print(f'Batch size is {BATCHSIZE}')

In [None]:
class CustomImageDataset(Dataset):
    """Folder-per-class image dataset.

    Every sub-directory of ``img_dir`` is one class, and every file found
    (recursively) below it is one sample of that class.

    Attributes:
        classes: Sorted class (sub-directory) names. A sample's integer label
            is the position of its class name in this list.
        images: List of ``(file path, class name)`` pairs in a deterministic
            (sorted) order.
        class_counts: Number of files found for each class.

    Args:
        img_dir: Root directory containing one sub-directory per class.
        transform: Optional callable applied to the image.
        target_transform: Optional callable applied to the class *name*
            before it is converted to an index; it must return a name that
            is present in ``classes``.
    """

    def __init__(self, img_dir, transform=None, target_transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform
        # os.listdir returns entries in arbitrary order and also lists plain
        # files; keep directories only and sort them so the class -> index
        # mapping is the same on every machine and every run.
        self.classes = sorted(
            entry for entry in os.listdir(img_dir)
            if os.path.isdir(os.path.join(img_dir, entry))
        )
        print(self.classes)
        self.images = []
        self.class_counts = {class_: 0 for class_ in self.classes}  # Initialize class counts

        for class_ in self.classes:
            for dirpath, dirnames, filenames in os.walk(os.path.join(img_dir, class_)):
                dirnames.sort()  # fixes the traversal order of nested folders
                for filename in sorted(filenames):
                    self.images.append((os.path.join(dirpath, filename), class_))
                    self.class_counts[class_] += 1  # Increment class count

        print("Class counts:", self.class_counts)  # Print class counts

    def __len__(self):
        """Return the number of samples found under ``img_dir``."""
        return len(self.images)

    @staticmethod
    def _as_three_channels(img):
        """Return a 3-channel version of a ``(C, H, W)`` image tensor."""
        channels = img.shape[0]
        if channels == 3:
            return img
        if channels in (1, 2):  # grayscale, optionally with an alpha channel
            return img[:1].repeat(3, 1, 1)
        return img[:3]  # e.g. RGBA: drop the alpha channel

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is a float tensor with three channels, scaled to [0, 1]
        before ``transform`` is applied. The label is a 0-dim integer tensor
        holding the class index.
        """
        img_path, label = self.images[idx]
        # read_image yields uint8 values in 0..255; scale to [0, 1], which is
        # the range float-image transforms such as Normalize(0.5, 0.5) expect.
        img = read_image(img_path).float() / 255.0
        # Blindly repeating the channel axis would turn RGB files into 9 channels.
        img = self._as_three_channels(img)
        if self.transform:
            img = self.transform(img)
        if self.target_transform:
            label = self.target_transform(label)
        # Convert class name to class index
        label = torch.tensor(self.classes.index(label))
        return img, label

    def __getitemImg__(self, idx):
        """Return ``(PIL image, class name)`` for sample ``idx``."""
        img_path, label = self.images[idx]
        # Hand ToPILImage the uint8 tensor directly: it interprets float input
        # as [0, 1], so 0..255 floats would be scaled again and wrap around.
        img = ToPILImage()(read_image(img_path))
        if self.transform:
            img = self.transform(img)
        if self.target_transform:
            label = self.target_transform(label)
        return img, label

    def show_first_images(self, num_images=5):
        """Plot up to ``num_images`` images of every class, one row per class."""
        fig = plt.figure(figsize=(10, num_images * len(self.classes)))
        for i, class_ in enumerate(self.classes):
            class_images = [path for path, label in self.images if label == class_]
            # Slicing tolerates classes that hold fewer than num_images files.
            for j, path in enumerate(class_images[:num_images]):
                img = ToPILImage()(read_image(path))
                if self.transform:
                    img = self.transform(img)
                ax = fig.add_subplot(len(self.classes), num_images, i * num_images + j + 1)
                ax.imshow(img, cmap='gray')
                ax.set_title(class_)
                ax.axis('off')
        plt.show()


In [None]:
# Preprocessing pipeline applied to each tensor image by the dataset.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the images to 224 x 224
    # With mean = std = 0.5 this computes (x - 0.5) / 0.5, i.e. [0, 1] -> [-1, 1].
    # NOTE(review): assumes float inputs already scaled to [0, 1] and relies on the
    # single-element mean/std being applied to every channel — confirm against the dataset.
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize the images
])

In [None]:
# First pass without a transform: used for the size check and the raw preview in the next cells.
dataset = CustomImageDataset(img_dir='./DataSetkaggle')

In [None]:
# Total number of samples found; use the len() protocol rather than calling the dunder directly.
len(dataset)

In [None]:
# Visual sanity check: plot the first images of every class (num_images defaults to 5).
dataset.show_first_images()

In [None]:
# Rebuild the dataset with the preprocessing pipeline; this instance is the one that gets split and trained on.
dataset = CustomImageDataset(img_dir='./DataSetkaggle', transform=transform)

In [None]:
# Split the dataset into training and validation sets (80 % / 20 %)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size  # remainder, so the two sizes always add up
print("Training Size : ",train_size," Validation Size : ",val_size)
# A seeded generator makes the split identical on every run; without it the
# train/validation membership changes each time the cell is executed.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(dataset, [train_size, val_size], generator=split_generator)

In [None]:
# Batch both subsets; only the training loader reshuffles its samples, the
# validation loader iterates in the subset's fixed order.
train_dataloader = DataLoader(train_dataset, batch_size=BATCHSIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCHSIZE)

In [None]:
def get_models(name):
    """Return a torchvision classifier with ImageNet weights, selected by name.

    Args:
        name: Case-insensitive key. Supported values are ``'resnet50'``,
            ``'vgg19'`` (the batch-norm variant ``vgg19_bn``),
            ``'densenet121'``, ``'googlenet'`` and ``'mobilenet'``
            (``mobilenet_v3_large``).

    Returns:
        The instantiated model with its ``IMAGENET1K_V1`` weights loaded.

    Raises:
        ValueError: If ``name`` is not one of the supported keys.
    """
    # Lookup key -> builder function name in torchvision.models. Names are
    # resolved lazily so an unknown key is rejected before touching torchvision.
    builder_names = {
        'resnet50': 'resnet50',
        'vgg19': 'vgg19_bn',
        'densenet121': 'densenet121',
        'googlenet': 'googlenet',
        'mobilenet': 'mobilenet_v3_large',
    }
    key = name.lower()
    if key not in builder_names:
        raise ValueError(f'Model {name} not found')
    # `weights=True` is the deprecated boolean form; naming the weight set
    # explicitly loads the same legacy-default weights without the warning.
    return getattr(models, builder_names[key])(weights='IMAGENET1K_V1')


In [None]:
# Concatenate the feature extraction layers
class Concatenate(nn.Module):
    """Run two sub-models on the same input and join their flattened outputs.

    Args:
        model1: First module; called as ``model1(x)``.
        model2: Second module; called as ``model2(x)``.
    """

    def __init__(self, model1, model2):
        super().__init__()
        self.model1 = model1
        self.model2 = model2

    def forward(self, x):
        """Return ``[flatten(model1(x)), flatten(model2(x))]`` joined along dim 1.

        Each branch output is reshaped to ``(batch, -1)`` before concatenation.
        """
        x1 = self.model1(x)
        x2 = self.model2(x)
        # reshape (unlike view) also accepts non-contiguous branch outputs,
        # e.g. a tensor that was permuted/transposed inside the branch.
        x1 = x1.reshape(x1.size(0), -1)
        x2 = x2.reshape(x2.size(0), -1)
        return torch.cat((x1, x2), dim=1)
    
# Combine into a new model
class fusedModel(nn.Module):
    """Feature stage followed by a head: ``output(concatenated(x))``.

    Args:
        concatenated: Module producing the joined features.
        output: Module mapping those features to the final result.
    """

    def __init__(self, concatenated, output):
        super().__init__()
        self.concatenated = concatenated
        self.output = output

    def forward(self, x):
        """Pass ``x`` through the feature stage, then through the head."""
        features = self.concatenated(x)
        return self.output(features)
    

In [None]:
# Build the two backbones that will be fused.
model1 = get_models('mobilenet')
model2 = get_models('googlenet')
# The class names are reused later to name the output directory and the saved file.
model1Name = model1.__class__.__name__
model2Name = model2.__class__.__name__
print(model1Name,model2Name)
# NOTE: `.parameters` is not called here, so this prints the bound methods' repr
# rather than the parameter tensors themselves.
print(model1.parameters,"\n\n\n",model2.parameters)


In [None]:
# Remove output layers (currently DISABLED): the two lines below are commented
# out, so both backbones keep their original final layers and the fused model
# concatenates their full outputs.
#model1 = nn.Sequential(*list(model1.children())[:-1])
#model2 = nn.Sequential(*list(model2.children())[:-1])
# Same print as in the previous cell; with the removal disabled the models are unchanged.
print(model1.parameters,"\n\n\n",model2.parameters)

In [None]:
# Lookup of the per-backbone size constants by model key.
# NOTE(review): there is no 'mobilenet' entry although get_models supports it,
# and in the visible cells this mapping is only printed, never used — confirm
# whether it is still needed.
sizeMapping = {'resnet50': RESNETSIZE, 'googlenet': GOOGLENETSIZE, 'densenet121': DENSENETSIZE, 'vgg19': VGG19SIZE}
print(sizeMapping)

In [None]:
# Feature stage: runs both backbones on the same batch and joins their flattened outputs.
concatenated = Concatenate(model1,model2) 

In [None]:
# Add a new output layer
# The head returns raw logits. The notebook trains with CrossEntropyLoss, which
# applies log-softmax internally, so a trailing nn.Softmax would squash the
# scores twice and flatten the gradients. argmax predictions are unaffected;
# apply torch.softmax(logits, dim=1) explicitly when probabilities are needed.
output = nn.Sequential(
    nn.Linear(2000, 100),  # 2000 = joined width of both backbone outputs (presumably 1000 each, heads are not removed)
    nn.ReLU(),
    nn.Linear(100, 3),  # one logit per class
)

In [None]:
import matplotlib.pyplot as plt
import os
from sklearn.metrics import confusion_matrix
import seaborn as sns

def train_model(model, dataloader, optimizer, criterion, num_epochs=25, patience=5,model_name_Input = None):
    """Train ``model`` and save per-epoch diagnostic plots.

    For every epoch the function runs one pass over ``dataloader``, prints
    loss/accuracy/precision/recall/F1, and writes two PNG files (batch curves
    and confusion matrix) into a directory named after the model. Training
    stops early once the epoch loss has failed to improve ``patience`` times
    in a row.

    Args:
        model: Module to train; it is moved into training mode each epoch.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        optimizer: Optimizer holding the model's parameters.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        num_epochs: Maximum number of epochs.
        patience: Number of consecutive non-improving epochs tolerated.
        model_name_Input: Directory name for the plots. When ``None`` the
            model's class name is used.

    Returns:
        The same ``model`` object, trained in place.

    Raises:
        ValueError: If an epoch yields no samples.
        RuntimeError: Re-raised from the forward pass after logging the input shape.
    """
    best_loss = float('inf')
    no_improvement_count = 0
    # The declared default (None) cannot be passed to os.makedirs; fall back
    # to the class name so the function is callable without a name.
    model_name = model_name_Input if model_name_Input is not None else model.__class__.__name__
    os.makedirs(model_name, exist_ok=True)  # Create a directory named after the model

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Make sure dropout / batch-norm layers behave as in training, even if
        # the model was switched to eval mode before this call.
        model.train()

        running_loss = 0.0
        running_corrects = 0
        samples_seen = 0
        running_preds = []
        running_labels = []
        losses = []  # per-batch values, reset every epoch for the plots
        accuracies = []

        # Iterate over data
        for i, (inputs, labels) in enumerate(dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            batch_size = inputs.size(0)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            try:
                outputs = model(inputs)
            except RuntimeError as e:
                print(f"Error: {e}")
                print(f"Input shape to the linear layer: {inputs.shape}")
                raise  # bare raise keeps the original traceback

            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            # Batch statistics, computed once and reused below
            batch_loss = loss.item()
            batch_correct = (preds == labels).sum().item()
            batch_acc = batch_correct / batch_size

            running_loss += batch_loss * batch_size
            running_corrects += batch_correct
            samples_seen += batch_size
            running_preds.extend(preds.cpu().numpy())
            running_labels.extend(labels.cpu().numpy())
            losses.append(batch_loss)
            accuracies.append(batch_acc)
            if i % 10 == 0:  # Print every 10 batches
                print(f'Batch {i} Loss: {batch_loss:.4f} Acc: {batch_acc:.4f}')

        if samples_seen == 0:
            raise ValueError('dataloader yielded no samples; nothing to train on')

        # Plot loss and accuracy vs batch
        plt.figure(figsize=(12, 4))
        plt.subplot(121)
        plt.plot(losses)
        plt.title('Loss vs Batch')
        plt.subplot(122)
        plt.plot(accuracies)
        plt.title('Accuracy vs Batch')
        plt.savefig(f'{model_name}/epoch_{epoch+1}_loss_accuracy.png')
        plt.close()

        # Print epoch results; averages use the samples actually processed,
        # which stays correct when the loader drops an incomplete last batch.
        epoch_loss = running_loss / samples_seen
        epoch_acc = running_corrects / samples_seen
        epoch_precision = precision_score(running_labels, running_preds, average='macro')
        epoch_recall = recall_score(running_labels, running_preds, average='macro')
        epoch_f1 = f1_score(running_labels, running_preds, average='macro')
        print(f'Epoch Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        print(f'Epoch Precision: {epoch_precision:.4f} Recall: {epoch_recall:.4f} F1-score: {epoch_f1:.4f}\n')

        # Plot confusion matrix
        cm = confusion_matrix(running_labels, running_preds)
        plt.figure(figsize=(10, 10))
        sns.heatmap(cm, annot=True, fmt='d')
        plt.title('Confusion Matrix')
        plt.savefig(f'{model_name}/epoch_{epoch+1}_confusion_matrix.png')
        plt.close()

        # Check for early stopping (based on the training loss of this epoch)
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            no_improvement_count = 0
        else:
            no_improvement_count += 1
            if no_improvement_count >= patience:
                print(f'Early stopping after {epoch+1} epochs without improvement.')
                return model

    return model

In [None]:
def create_fused_model(concatenated,output):
    """Wrap a feature stage and a head into a single ``fusedModel`` instance."""
    return fusedModel(concatenated, output)

In [None]:
#fusedModel = fusedModel(concatenated, output)
# NOTE(review): this assignment rebinds the name `fusedModel` from the class to
# an instance. create_fused_model calls fusedModel(...), so it only builds a
# model before this cell has run; re-running this cell in the same kernel calls
# the instance instead of the class. Consider a distinct variable name.
fusedModel = create_fused_model(concatenated=concatenated,output=output)


In [None]:
# Move the fused model to the selected device before creating the optimizer.
fusedModel = fusedModel.to(device)
# Classification loss over integer class-index labels.
criterion = CrossEntropyLoss()
# SGD with momentum over every parameter of the fused model (both backbones and the head).
optimizer_fusedModel = optim.SGD(fusedModel.parameters(), lr=0.0001, momentum=0.9)

In [None]:
# Train on the training split only; plots are written to a directory named "<model1 class>_<model2 class>".
fusedModel = train_model(fusedModel, train_dataloader, optimizer_fusedModel, criterion, num_epochs=EPOCHS,model_name_Input=model1Name+"_"+model2Name)

In [None]:
#torch.save(fusedModel.state_dict(),f'/home/krishnatejaswis/Files/VSCode/LungScan/fusedModels/{model1Name+"_"+model2Name+"_Test"}.pth')

import dill

# Destination directory. The default is the original location; set the
# FUSED_MODEL_DIR environment variable to save elsewhere on another machine.
save_dir = os.environ.get('FUSED_MODEL_DIR', '/home/krishnatejaswis/Files/VSCode/LungScan/fusedModels')
# Create it up front so a missing folder cannot abort the save after training.
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, f'{model1Name+"_"+model2Name}.pth')

# dill pickles the whole model object (not just a state_dict).
# SECURITY: unpickling executes arbitrary code — only load this file from a trusted source.
with open(save_path, 'wb') as f:
    dill.dump(fusedModel, f)
