In [12]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np

# Set data directories
train_data_dir = 'Dataset/augmented_training'
val_data_dir = 'Dataset/validation'

# ImageDataGenerator for training and validation sets
train_datagen = ImageDataGenerator(rescale=1./255)
val_datagen = ImageDataGenerator(rescale=1./255)

# Create the data generators
train_generator = train_datagen.flow_from_directory(
    train_data_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode=None,  # No labels for the encoder
    shuffle=True
)

validation_generator = val_datagen.flow_from_directory(
    val_data_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode=None,  # No labels for the encoder
    shuffle=False
)

# Define the encoder model
input_img = layers.Input(shape=(224, 224, 3))

# Encoder (Feature extraction)
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)

# Latent Space
latent_space = layers.Flatten()(x)
latent_space = layers.Dense(512, activation='relu')(latent_space)  # Latent space size of 512

# Create the encoder model
encoder = models.Model(input_img, latent_space)

# Compile the encoder model
encoder.compile(optimizer='adam', loss='mean_squared_error')

# Show the model architecture
encoder.summary()

# Custom data generator for training
def custom_data_generator(generator):
    while True:
        batch_x = next(generator)  # Get the next batch of images
        batch_y = np.zeros((batch_x.shape[0], 512))  # Dummy labels with shape (batch_size, 512)
        yield batch_x, batch_y  # Yield the images and dummy labels

# Train the encoder using the custom generator
history_encoder = encoder.fit(
    custom_data_generator(train_generator),
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=50,
    validation_data=custom_data_generator(validation_generator),
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    verbose=1
)

# Print losses for the encoder
print("Encoder Training Losses:", history_encoder.history['loss'])
print("Encoder Validation Losses:", history_encoder.history['val_loss'])


Found 96163 images belonging to 10 classes.
Found 16132 images belonging to 10 classes.


Epoch 1/50
[1m3005/3005[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3163s[0m 1s/step - loss: 1.5594e-06 - val_loss: 0.0000e+00
Epoch 2/50
[1m3005/3005[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3153s[0m 1s/step - loss: 0.0000e+00 - val_loss: 0.0000e+00
Epoch 3/50
[1m3005/3005[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3144s[0m 1s/step - loss: 0.0000e+00 - val_loss: 0.0000e+00
Epoch 4/50
[1m1899/3005[0m [32m━━━━━━━━━━━━[0m[37m━━━━━━━━[0m [1m18:26[0m 1s/step - loss: 0.0000e+00

KeyboardInterrupt: 

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader

# Hyperparameters
batch_size = 32
num_epochs = 50
latent_space_size = 1024  # You can adjust this if needed

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Dataset paths
train_dir = 'Dataset/augmented_training'  # Replace with your training directory path
val_dir = 'Dataset/validation'      # Replace with your validation directory path

# Datasets
train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
val_dataset = datasets.ImageFolder(root=val_dir, transform=transform)

# Data loaders
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

# ResNet Encoder
class ResNetEncoder(nn.Module):
    def __init__(self):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(pretrained=True)
        # Remove the fully connected layer
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        # Use the number of features from resnet.fc
        num_features = models.resnet50().fc.in_features
        self.fc = nn.Linear(num_features, latent_space_size)

    def forward(self, x):
        with torch.no_grad():
            x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten the output
        x = self.fc(x)              # Map to latent space
        return x

# Instantiate model, loss function, and optimizer
encoder = ResNetEncoder().to(device)
criterion = nn.MSELoss()  # Dummy loss since we're not using true labels
optimizer = optim.Adam(encoder.parameters(), lr=0.001)

# Training loop
for epoch in range(num_epochs):
    encoder.train()
    total_loss = 0

    for images, _ in train_loader:
        images = images.to(device)

        optimizer.zero_grad()
        latent_space = encoder(images)  # Get latent space
        loss = criterion(latent_space, torch.zeros_like(latent_space))  # Dummy target
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.6f}')

    # Validation loop
    encoder.eval()
    with torch.no_grad():
        val_loss = 0
        for images, _ in val_loader:
            images = images.to(device)
            latent_space = encoder(images)  # Get latent space
            loss = criterion(latent_space, torch.zeros_like(latent_space))  # Dummy target
            val_loss += loss.item()

        avg_val_loss = val_loss / len(val_loader)
        print(f'Validation Loss: {avg_val_loss:.6f}')


Epoch [1/50], Loss: 0.002775
Validation Loss: 0.001146
Epoch [2/50], Loss: 0.000725
Validation Loss: 0.000860
Epoch [3/50], Loss: 0.000644
Validation Loss: 0.000420
Epoch [4/50], Loss: 0.000634
Validation Loss: 0.001048
Epoch [5/50], Loss: 0.000633
Validation Loss: 0.000644
Epoch [6/50], Loss: 0.000631
Validation Loss: 0.001204
Epoch [7/50], Loss: 0.000630
Validation Loss: 0.000562
Epoch [8/50], Loss: 0.000631
Validation Loss: 0.001536
Epoch [9/50], Loss: 0.000632
Validation Loss: 0.000313
Epoch [10/50], Loss: 0.000630
Validation Loss: 0.001348
Epoch [11/50], Loss: 0.000631
Validation Loss: 0.000690
Epoch [12/50], Loss: 0.000635
Validation Loss: 0.001239
Epoch [13/50], Loss: 0.000629
Validation Loss: 0.000424
Epoch [14/50], Loss: 0.000629
Validation Loss: 0.000545
Epoch [15/50], Loss: 0.000632
Validation Loss: 0.001293
Epoch [16/50], Loss: 0.000633
Validation Loss: 0.000869
Epoch [17/50], Loss: 0.000631
Validation Loss: 0.000486
Epoch [18/50], Loss: 0.000632
Validation Loss: 0.000457
E

In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Encoder Model with 8 Convolutional Layers and latent space of size 1024
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.conv_layers = nn.Sequential(
            # First Convolution Block
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 224 -> 112

            # Second Convolution Block
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 112 -> 56

            # Third Convolution Block
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 56 -> 28

            # Fourth Convolution Block
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 28 -> 14

            # Fifth Convolution Block
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 14 -> 7

            # Sixth Convolution Block
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            # No pooling to avoid shrinking the dimensions too much

            # Seventh Convolution Block
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),

            # Eighth Convolution Block
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)  # 7 -> 3 (final pooling)
        )

        # Fully connected layer to generate latent space of size 1024
        self.fc = nn.Linear(512 * 3 * 3, 1024)  # Adapt to final size after convs

    def forward(self, x):
        x = self.conv_layers(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Data Loading
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Ensure consistent input size
    transforms.ToTensor(),
])

train_data = datasets.ImageFolder(root='Dataset/augmented_training', transform=transform)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)

val_data = datasets.ImageFolder(root='Dataset/validation', transform=transform)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)

# Training and Validation
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

encoder = Encoder().to(device)
optimizer = optim.Adam(encoder.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Training Loop
def train_encoder(model, train_loader, val_loader, epochs):
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, _ in train_loader:
            inputs = inputs.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, torch.zeros(outputs.size()).to(device))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        epoch_loss = running_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {epoch_loss:.4f}")

        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, _ in val_loader:
                inputs = inputs.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, torch.zeros(outputs.size()).to(device))
                val_loss += loss.item()
        val_loss /= len(val_loader)
        print(f"Epoch [{epoch+1}/{epochs}], Validation Loss: {val_loss:.4f}")

# Start training
train_encoder(encoder, train_loader, val_loader, epochs=50)


Epoch [1/50], Training Loss: 0.0023
Epoch [1/50], Validation Loss: 0.0027
Epoch [2/50], Training Loss: 0.0000
Epoch [2/50], Validation Loss: 0.0000
Epoch [3/50], Training Loss: 0.0000
Epoch [3/50], Validation Loss: 0.0000
Epoch [4/50], Training Loss: 0.0000
Epoch [4/50], Validation Loss: 0.0000
Epoch [5/50], Training Loss: 0.0000
Epoch [5/50], Validation Loss: 0.0000
Epoch [6/50], Training Loss: 0.0000
Epoch [6/50], Validation Loss: 0.0000
Epoch [7/50], Training Loss: 0.0000
Epoch [7/50], Validation Loss: 0.0000
Epoch [8/50], Training Loss: 0.0000
Epoch [8/50], Validation Loss: 0.0000
Epoch [9/50], Training Loss: 0.0000
Epoch [9/50], Validation Loss: 0.0000
Epoch [10/50], Training Loss: 0.0000
Epoch [10/50], Validation Loss: 0.0000
Epoch [11/50], Training Loss: 0.0000
Epoch [11/50], Validation Loss: 0.0000
Epoch [12/50], Training Loss: 0.0000
Epoch [12/50], Validation Loss: 0.0000
Epoch [13/50], Training Loss: 0.0000
Epoch [13/50], Validation Loss: 0.0000
Epoch [14/50], Training Loss: 

KeyboardInterrupt: 

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

class Encoder(nn.Module):
    def __init__(self, input_channels=3, latent_dim=10):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(256 * 14 * 14, 512),
            nn.ReLU(),
            nn.Linear(512, latent_dim)
        )
        
    def forward(self, x):
        return self.encoder(x)

latent_dim = 10
learning_rate = 0.001
num_epochs = 50

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_data = datasets.ImageFolder(root='Dataset/augmented_training', transform=transform)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_data = datasets.ImageFolder(root='Dataset/validation', transform=transform)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
print("Dataset successfully loaded")

encoder = Encoder(input_channels=3, latent_dim=latent_dim)

criterion = nn.MSELoss()
optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
encoder.to(device)

for epoch in range(num_epochs):
    encoder.train()
    train_loss = 0
    for batch in train_loader:
        inputs, _ = batch
        inputs = inputs.to(device)

        optimizer.zero_grad()
        latent = encoder(inputs)
        loss = criterion(latent, torch.zeros_like(latent))
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)

    encoder.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            inputs, _ = batch
            inputs = inputs.to(device)

            latent = encoder(inputs)
            loss = criterion(latent, torch.zeros_like(latent))

            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Training Loss: {avg_train_loss:.4f}, "
          f"Validation Loss: {avg_val_loss:.4f}")

print("Training completed!")

torch.save(encoder.state_dict(), 'encoder_model.pth')
print("Model saved as 'encoder_model.pth'")

Dataset successfully loaded
Epoch [1/50], Training Loss: 0.0005, Validation Loss: 0.0000
Epoch [2/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [3/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [4/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [5/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [6/50], Training Loss: 0.0000, Validation Loss: 0.0000


KeyboardInterrupt: 

In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

class Encoder(nn.Module):
    def __init__(self, input_channels=3, latent_dim=10):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),  # LeakyReLU instead of ReLU
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Flatten(),
            nn.Linear(256 * 14 * 14, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, latent_dim)
        )
        
    def forward(self, x):
        return self.encoder(x)

# Hyperparameters
latent_dim = 10
learning_rate = 0.001
num_epochs = 50

# Data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Datasets and DataLoaders
train_data = datasets.ImageFolder(root='Dataset/augmented_training', transform=transform)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_data = datasets.ImageFolder(root='Dataset/validation', transform=transform)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)

print("Dataset successfully loaded")

# Model, Loss function, and Optimizer
encoder = Encoder(input_channels=3, latent_dim=latent_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
encoder.to(device)

# Training Loop
for epoch in range(num_epochs):
    encoder.train()
    train_loss = 0
    for batch in train_loader:
        inputs, _ = batch
        inputs = inputs.to(device)

        optimizer.zero_grad()
        latent = encoder(inputs)
        loss = criterion(latent, torch.zeros_like(latent))  # MSE with zero target
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)

    # Validation Phase
    encoder.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            inputs, _ = batch
            inputs = inputs.to(device)

            latent = encoder(inputs)
            loss = criterion(latent, torch.zeros_like(latent))

            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Training Loss: {avg_train_loss:.4f}, "
          f"Validation Loss: {avg_val_loss:.4f}")

# Save the model
torch.save(encoder.state_dict(), 'encoder_model.pth')
print("Model saved as 'encoder_model.pth'")


Dataset successfully loaded
Epoch [1/50], Training Loss: 0.0018, Validation Loss: 0.0000
Epoch [2/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [3/50], Training Loss: 8.9115, Validation Loss: 0.0588
Epoch [4/50], Training Loss: 0.0258, Validation Loss: 0.0038
Epoch [5/50], Training Loss: 0.0061, Validation Loss: 0.0063
Epoch [6/50], Training Loss: 0.0041, Validation Loss: 0.0008
Epoch [7/50], Training Loss: 28.4725, Validation Loss: 0.0635
Epoch [8/50], Training Loss: 0.0460, Validation Loss: 0.0154
Epoch [9/50], Training Loss: 0.0819, Validation Loss: 0.0037
Epoch [10/50], Training Loss: 28.6809, Validation Loss: 2.8777
Epoch [11/50], Training Loss: 7.4135, Validation Loss: 0.0906
Epoch [12/50], Training Loss: 0.0461, Validation Loss: 0.1116
Epoch [13/50], Training Loss: 506.8120, Validation Loss: 307.3792
Epoch [14/50], Training Loss: 5.0674, Validation Loss: 0.6708
Epoch [15/50], Training Loss: 2.3500, Validation Loss: 0.4614
Epoch [16/50], Training Loss: 0.6360, Validat

KeyboardInterrupt: 

In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

class Encoder(nn.Module):
    def __init__(self, input_channels=3, latent_dim=10):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Flatten(),
            nn.Linear(256 * 14 * 14, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, latent_dim)
        )
        self.apply(self._init_weights)  # Apply weight initialization

    def forward(self, x):
        return self.encoder(x)
    
    def _init_weights(self, m):
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.zeros_(m.bias)

# Hyperparameters
latent_dim = 10
learning_rate = 0.0001  # Reduced learning rate
num_epochs = 50

# Data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Datasets and DataLoaders
train_data = datasets.ImageFolder(root='Dataset/augmented_training', transform=transform)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
val_data = datasets.ImageFolder(root='Dataset/validation', transform=transform)
val_loader = DataLoader(val_data, batch_size=64, shuffle=False)

print("Dataset successfully loaded")

# Model, Loss function, and Optimizer
encoder = Encoder(input_channels=3, latent_dim=latent_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
encoder.to(device)

# Training Loop
for epoch in range(num_epochs):
    encoder.train()
    train_loss = 0
    for batch in train_loader:
        inputs, _ = batch
        inputs = inputs.to(device)

        optimizer.zero_grad()
        latent = encoder(inputs)
        loss = criterion(latent, torch.zeros_like(latent))  # MSE with zero target
        loss.backward()

        # Clip gradients to prevent gradient explosion
        torch.nn.utils.clip_grad_norm_(encoder.parameters(), max_norm=1.0)

        optimizer.step()

        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)

    # Validation Phase
    encoder.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            inputs, _ = batch
            inputs = inputs.to(device)

            latent = encoder(inputs)
            loss = criterion(latent, torch.zeros_like(latent))

            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Training Loss: {avg_train_loss:.4f}, "
          f"Validation Loss: {avg_val_loss:.4f}")

# Save the model
torch.save(encoder.state_dict(), 'encoder_model.pth')
print("Model saved as 'encoder_model.pth'")


Dataset successfully loaded
Epoch [1/50], Training Loss: 0.0002, Validation Loss: 0.0000
Epoch [2/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [3/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [4/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [5/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [6/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [7/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [8/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [9/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [10/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [11/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [12/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [13/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [14/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [15/50], Training Loss: 0.0000, Validation Loss: 0.0000
Epoch [16/50], Training Loss: 0.0000, Validation Lo

In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.nn.functional as F

# Encoder Model with 8 Convolutional Layers and latent space of size 1024
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.conv_layers = nn.Sequential(
            # First Convolution Block
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 224 -> 112

            # Second Convolution Block
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 112 -> 56

            # Third Convolution Block
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 56 -> 28

            # Fourth Convolution Block
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 28 -> 14

            # Fifth Convolution Block
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 14 -> 7

            # Sixth Convolution Block
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),

            # Seventh Convolution Block
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),

            # Eighth Convolution Block
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 7 -> 3 (final pooling)

            # Dropout for regularization
            nn.Dropout(0.2)
        )

        # Fully connected layer to generate latent space of size 1024
        self.fc = nn.Linear(512 * 3 * 3, 1024)  # Adjust to final size after conv layers

    def forward(self, x):
        x = self.conv_layers(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Custom weight initialization function
def init_weights(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight)

# Data Loading
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Ensure consistent input size
    transforms.ToTensor(),
])

train_data = datasets.ImageFolder(root='Dataset/augmented_training', transform=transform)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)

val_data = datasets.ImageFolder(root='Dataset/validation', transform=transform)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)

# Training and Validation
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

encoder = Encoder().to(device)
encoder.apply(init_weights)  # Apply weight initialization

# Using a smaller learning rate and weight decay for regularization
optimizer = optim.Adam(encoder.parameters(), lr=1e-5, weight_decay=1e-5)
criterion = nn.MSELoss()  # Keep MSELoss as per your request

# Training Loop
def train_encoder(model, train_loader, val_loader, epochs):
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, _ in train_loader:
            inputs = inputs.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, torch.zeros(outputs.size()).to(device))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        epoch_loss = running_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {epoch_loss:.6f}")

        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, _ in val_loader:
                inputs = inputs.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, torch.zeros(outputs.size()).to(device))
                val_loss += loss.item()
        val_loss /= len(val_loader)
        print(f"Epoch [{epoch+1}/{epochs}], Validation Loss: {val_loss:.6f}")

# Start training
train_encoder(encoder, train_loader, val_loader, epochs=50)


Epoch [1/50], Training Loss: 0.002909
Epoch [1/50], Validation Loss: 0.000001
Epoch [2/50], Training Loss: 0.000002
Epoch [2/50], Validation Loss: 0.000000


KeyboardInterrupt: 

In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import os

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations for training and validation data
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Define Encoder using ResNet50 without fully connected layer
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(pretrained=True)
        # Remove the fully connected layer
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        # Add linear layer for the latent space
        self.fc = nn.Linear(2048, latent_space_size)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten
        latent = self.fc(x)
        return latent

# Define Decoder
class Decoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(Decoder, self).__init__()
        self.fc = nn.Linear(latent_space_size, 2048 * 7 * 7)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(2048, 1024, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(1024),
            nn.ReLU(True),
            nn.ConvTranspose2d(1024, 512, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.ConvTranspose2d(512, 256, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 3, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  # Use Sigmoid to normalize pixel values between [0,1]
        )

    def forward(self, x):
        x = self.fc(x)
        x = x.view(x.size(0), 2048, 7, 7)
        reconstructed = self.decoder(x)
        return reconstructed

# Combine Encoder and Decoder into an Autoencoder
class Autoencoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(Autoencoder, self).__init__()
        self.encoder = ResNetEncoder(latent_space_size)
        self.decoder = Decoder(latent_space_size)

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return latent, reconstructed

# Instantiate model, loss function, and optimizer
latent_space_size = 1024
autoencoder = Autoencoder(latent_space_size).to(device)
criterion = nn.MSELoss()  # Mean Squared Error Loss
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

# Training function
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=50):
    model.train()
    
    for epoch in range(num_epochs):
        train_loss = 0.0
        val_loss = 0.0
        
        # Training loop
        for images, _ in train_loader:
            images = images.to(device)
            optimizer.zero_grad()
            latent, reconstructed = model(images)
            loss = criterion(reconstructed, images)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * images.size(0)

        # Validation loop
        model.eval()
        with torch.no_grad():
            for images, _ in val_loader:
                images = images.to(device)
                latent, reconstructed = model(images)
                loss = criterion(reconstructed, images)
                val_loss += loss.item() * images.size(0)
        
        # Print losses
        train_loss = train_loss / len(train_loader.dataset)
        val_loss = val_loss / len(val_loader.dataset)
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.6f}, Validation Loss: {val_loss:.6f}')

# Train the model
train_model(autoencoder, criterion, optimizer, train_loader, val_loader, num_epochs=50)

# Function to save latent space and reconstructed images
def save_latent_and_reconstructed(model, data_loader, output_dir='output_images', save_all=True):
    model.eval()  # Set the model to evaluation mode

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    with torch.no_grad():
        for i, (images, _) in enumerate(data_loader):
            images = images.to(device)

            # Forward pass to get latent space and reconstructed images
            latent, reconstructed = model(images)

            # Save the latent space representations
            latent_np = latent.cpu().numpy()
            np.save(os.path.join(output_dir, f'latent_space_{i}.npy'), latent_np)

            # Save reconstructed images
            reconstructed_images = reconstructed.cpu().numpy().transpose(0, 2, 3, 1)  # Convert to (N, H, W, C)
            reconstructed_images = np.clip(reconstructed_images, 0, 1)  # Ensure pixel values are within [0, 1]
            
            for j in range(reconstructed_images.shape[0]):
                plt.imsave(os.path.join(output_dir, f'reconstructed_image_{i}_{j}.png'), reconstructed_images[j])

    print(f"Latent space and reconstructed images saved in '{output_dir}'")

# Save latent space and reconstructed images from the validation set
save_latent_and_reconstructed(autoencoder, val_loader, output_dir='output_images', save_all=True)




Epoch [1/50], Train Loss: 0.015895, Validation Loss: 0.005262
Epoch [2/50], Train Loss: 0.015692, Validation Loss: 0.004495
Epoch [3/50], Train Loss: 0.007316, Validation Loss: 0.003436
Epoch [4/50], Train Loss: 0.006209, Validation Loss: 0.002929
Epoch [5/50], Train Loss: 0.005646, Validation Loss: 0.003540
Epoch [6/50], Train Loss: 0.005366, Validation Loss: 0.002551
Epoch [7/50], Train Loss: 0.005169, Validation Loss: 0.002349
Epoch [8/50], Train Loss: 0.004983, Validation Loss: 0.002284
Epoch [9/50], Train Loss: 0.004857, Validation Loss: 0.002161
Epoch [10/50], Train Loss: 0.004760, Validation Loss: 0.002149
Epoch [11/50], Train Loss: 0.004678, Validation Loss: 0.002155
Epoch [12/50], Train Loss: 0.004612, Validation Loss: 0.002063
Epoch [13/50], Train Loss: 0.004536, Validation Loss: 0.001976
Epoch [14/50], Train Loss: 0.004474, Validation Loss: 0.001985
Epoch [15/50], Train Loss: 0.004419, Validation Loss: 0.001913
Epoch [16/50], Train Loss: 0.004377, Validation Loss: 0.001863
E

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.io import read_image
from torchvision import transforms
import numpy as np

# Model definition
class CBAMModel(nn.Module):
    def __init__(self, latent_dim, cbam_block):
        super(CBAMModel, self).__init__()
        self.channel_adjust = nn.Conv2d(latent_dim, 3, kernel_size=1)
        self.cbam = cbam_block

    def forward(self, latent_space, reconstructed_images):
        # Ensure both tensors have the same batch size
        if latent_space.size(0) != reconstructed_images.size(0):
            raise RuntimeError("Batch size mismatch between latent space and reconstructed images.")
        
        # Resize latent space to match the spatial dimensions of reconstructed images
        latent_space = F.interpolate(latent_space.unsqueeze(-1).unsqueeze(-1), size=reconstructed_images.shape[2:], mode='bilinear', align_corners=False)
        
        # Adjust latent space channels to match image channels (3 for RGB)
        latent_space = self.channel_adjust(latent_space)
        
        # Concatenate along the channel dimension
        combined_input = torch.cat((latent_space, reconstructed_images), dim=1)
        output = self.cbam(combined_input)
        return output

# CBAM block definition (simplified for demonstration)
class CBAMBlock(nn.Module):
    def __init__(self, channels):
        super(CBAMBlock, self).__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.conv2(x)
        return x

# Load tensor or latent space
def load_tensor_from_path(path, transform=None, is_latent=False):
    try:
        if is_latent:
            # For latent space, use numpy.load and convert to a torch tensor
            np_array = np.load(path)
            tensor = torch.tensor(np_array, dtype=torch.float32)  # Convert to tensor
        else:
            image = read_image(path).float() / 255.0  # Normalize image
            if transform:
                tensor = transform(image)
            else:
                tensor = image
        return tensor
    except Exception as e:
        print(f"Error loading file {path}: {e}")
        return None  # Return None if there is an error loading the file

# Paths
latent_space_path = 'latentspace'  # Path to latent space files (in .npy format)
reconstructed_images_path = 'output_images'  # Path to reconstructed images
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load filenames
latent_space_files = sorted(os.listdir(latent_space_path))
reconstructed_image_files = sorted(os.listdir(reconstructed_images_path))

# Check if paths have equal number of items
assert len(latent_space_files) == len(reconstructed_image_files), "Mismatch in latent and image files."

# Initialize Model, Loss, Optimizer
latent_dim = 1024  # Latent space size
cbam_block = CBAMBlock(channels=6)  # 3 channels for latent, 3 for reconstructed, so 6 total channels
model = CBAMModel(latent_dim, cbam_block).to(device)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop
transform = transforms.Resize((224, 224))  # Resize images to 224x224 if needed
num_epochs = 10

for epoch in range(num_epochs):
    running_loss = 0.0
    for i in range(len(latent_space_files)):
        latent_file = latent_space_files[i]
        recon_file = reconstructed_image_files[i]

        latent_space = load_tensor_from_path(os.path.join(latent_space_path, latent_file), is_latent=True).unsqueeze(0).to(device)
        reconstructed_images = load_tensor_from_path(os.path.join(reconstructed_images_path, recon_file), transform).unsqueeze(0).to(device)

        # Skip if either tensor is None
        if latent_space is None or reconstructed_images is None:
            print(f"Skipping pair {latent_file} and {recon_file} due to loading error.")
            continue

        # Forward pass
        output = model(latent_space, reconstructed_images)
        loss = criterion(output, reconstructed_images)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(latent_space_files):.4f}")

print("Training finished.")


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import os

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations for training and validation data
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Define Encoder using ResNet50 without fully connected layer
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(pretrained=True)
        # Remove the fully connected layer
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        # Add linear layer for the latent space
        self.fc = nn.Linear(2048, latent_space_size)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten
        latent = self.fc(x)
        return latent

# Define Decoder
class Decoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(Decoder, self).__init__()
        self.fc = nn.Linear(latent_space_size, 2048 * 7 * 7)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(2048, 1024, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(1024),
            nn.ReLU(True),
            nn.ConvTranspose2d(1024, 512, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.ConvTranspose2d(512, 256, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 3, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  # Use Sigmoid to normalize pixel values between [0,1]
        )

    def forward(self, x):
        x = self.fc(x)
        x = x.view(x.size(0), 2048, 7, 7)
        reconstructed = self.decoder(x)
        return reconstructed

# Combine Encoder and Decoder into an Autoencoder
class Autoencoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(Autoencoder, self).__init__()
        self.encoder = ResNetEncoder(latent_space_size)
        self.decoder = Decoder(latent_space_size)

    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return latent, reconstructed

# Instantiate model, loss function, and optimizer
latent_space_size = 1024
autoencoder = Autoencoder(latent_space_size).to(device)
criterion = nn.MSELoss()  # Mean Squared Error Loss
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

# Training function
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=50):
    model.train()
    
    for epoch in range(num_epochs):
        train_loss = 0.0
        val_loss = 0.0
        
        # Training loop
        for images, _ in train_loader:
            images = images.to(device)
            optimizer.zero_grad()
            latent, reconstructed = model(images)
            loss = criterion(reconstructed, images)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * images.size(0)

        # Validation loop
        model.eval()
        with torch.no_grad():
            for images, _ in val_loader:
                images = images.to(device)
                latent, reconstructed = model(images)
                loss = criterion(reconstructed, images)
                val_loss += loss.item() * images.size(0)
        
        # Print losses
        train_loss = train_loss / len(train_loader.dataset)
        val_loss = val_loss / len(val_loader.dataset)
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.6f}, Validation Loss: {val_loss:.6f}')

# Train the model
train_model(autoencoder, criterion, optimizer, train_loader, val_loader, num_epochs=50)

# Function to save latent space, reconstructed images, and labels
# Function to save latent space, reconstructed images, and labels
def save_latent_and_reconstructed(model, data_loader):
    model.eval()

    latent_space_dir = 'latent_space'
    output_images_dir = 'output_images'
    os.makedirs(latent_space_dir, exist_ok=True)
    os.makedirs(output_images_dir, exist_ok=True)

    with torch.no_grad():
        for images, labels in data_loader:
            images = images.to(device)
            latent, reconstructed = model(images)

            # Save latent space and labels for each class
            for i in range(images.size(0)):
                class_label = train_dataset.classes[labels[i].item()]  # Get class name

                # Save latent space
                class_latent_dir = os.path.join(latent_space_dir, class_label)
                os.makedirs(class_latent_dir, exist_ok=True)
                latent_np = latent[i].cpu().numpy()
                np.save(os.path.join(class_latent_dir, f'latent_{i}.npy'), latent_np)

                # Save latent label
                latent_label_np = np.array(labels[i].cpu().numpy())
                np.save(os.path.join(class_latent_dir, f'label_{i}.npy'), latent_label_np)

                # Save reconstructed images
                class_dir = os.path.join(output_images_dir, class_label)
                os.makedirs(class_dir, exist_ok=True)
                reconstructed_image = reconstructed[i].cpu().numpy().transpose(1, 2, 0)
                reconstructed_image = np.clip(reconstructed_image, 0, 1)  # Ensure values are between 0 and 1
                plt.imsave(os.path.join(class_dir, f'reconstructed_image_{i}.png'), reconstructed_image)

    print(f"Latent space, labels, and reconstructed images saved in '{latent_space_dir}' and '{output_images_dir}'")



# Save latent space, reconstructed images, and labels from the validation set
save_latent_and_reconstructed(autoencoder, val_loader)




Epoch [1/50], Train Loss: 0.013823, Validation Loss: 0.004844
Epoch [2/50], Train Loss: 0.008752, Validation Loss: 0.003465
Epoch [3/50], Train Loss: 0.006174, Validation Loss: 0.003020
Epoch [4/50], Train Loss: 0.005465, Validation Loss: 0.002523
Epoch [5/50], Train Loss: 0.005082, Validation Loss: 0.002270
Epoch [6/50], Train Loss: 0.004844, Validation Loss: 0.002116
Epoch [7/50], Train Loss: 0.004671, Validation Loss: 0.002020
Epoch [8/50], Train Loss: 0.004546, Validation Loss: 0.001950
Epoch [9/50], Train Loss: 0.004449, Validation Loss: 0.001883
Epoch [10/50], Train Loss: 0.004370, Validation Loss: 0.001842
Epoch [11/50], Train Loss: 0.004308, Validation Loss: 0.001791
Epoch [12/50], Train Loss: 0.004249, Validation Loss: 0.001795
Epoch [13/50], Train Loss: 0.004202, Validation Loss: 0.001729
Epoch [14/50], Train Loss: 0.004158, Validation Loss: 0.001737
Epoch [15/50], Train Loss: 0.004121, Validation Loss: 0.001686
Epoch [16/50], Train Loss: 0.004085, Validation Loss: 0.001686
E

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, confusion_matrix
import numpy as np

# CBAM Components
class SAM(nn.Module):
    def __init__(self, bias=False):
        super(SAM, self).__init__()
        self.bias = bias
        self.conv = nn.Conv2d(in_channels=2, out_channels=1, kernel_size=7, stride=1, padding=3, bias=self.bias)

    def forward(self, x):
        max = torch.max(x, 1)[0].unsqueeze(1)
        avg = torch.mean(x, 1).unsqueeze(1)
        concat = torch.cat((max, avg), dim=1)
        output = self.conv(concat)
        output = torch.sigmoid(output) * x
        return output

class CAM(nn.Module):
    def __init__(self, channels, r):
        super(CAM, self).__init__()
        self.channels = channels
        self.r = r
        self.linear = nn.Sequential(
            nn.Linear(in_features=self.channels, out_features=self.channels // self.r, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=self.channels // self.r, out_features=self.channels, bias=True)
        )

    def forward(self, x):
        max = nn.functional.adaptive_max_pool2d(x, output_size=1)
        avg = nn.functional.adaptive_avg_pool2d(x, output_size=1)
        b, c, _, _ = x.size()
        linear_max = self.linear(max.view(b, c)).view(b, c, 1, 1)
        linear_avg = self.linear(avg.view(b, c)).view(b, c, 1, 1)
        output = linear_max + linear_avg
        output = torch.sigmoid(output) * x
        return output

class CBAM(nn.Module):
    def __init__(self, channels, r):
        super(CBAM, self).__init__()
        self.channels = channels
        self.r = r
        self.sam = SAM(bias=False)
        self.cam = CAM(channels=self.channels, r=self.r)

    def forward(self, x):
        output = self.cam(x)
        output = self.sam(output)
        return output + x

# CBAM ResNet Model
class CBAMResNet(nn.Module):
    def __init__(self, num_classes):
        super(CBAMResNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            # ... Add your ResNet layers and CBAM layers here ...
        )
        self.cbam = CBAM(512, 16)  # Adjust based on your architecture
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.features(x)
        x = self.cbam(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Dataset class for Latent Space
class LatentSpaceDataset(Dataset):
    def __init__(self, latent_data, labels):
        self.latent_data = latent_data
        self.labels = labels

    def __len__(self):
        return len(self.latent_data)

    def __getitem__(self, idx):
        return self.latent_data[idx], self.labels[idx]

# Hyperparameters
batch_size = 32
num_epochs = 50
learning_rate = 0.001
num_classes = 10

# DataLoader for Reconstructed Images
reconstructed_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
reconstructed_dataset = datasets.ImageFolder(root='output_images2', transform=reconstructed_transform)
reconstructed_loader = DataLoader(reconstructed_dataset, batch_size=batch_size, shuffle=True)

# Load Latent Space Data (e.g., from a .npy file)
latent_data = np.load('new_latent_space/0/latent_space.npy')  # Adjust the path
latent_labels = np.load('new_latent_space/0/latent_labels.npy')  # Adjust the path
latent_dataset = LatentSpaceDataset(latent_data, latent_labels)
latent_loader = DataLoader(latent_dataset, batch_size=batch_size, shuffle=True)

# Initialize Model, Loss Function, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CBAMResNet(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training function
def train_model(data_loader, num_epochs):
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        train_preds = []
        train_labels = []

        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            train_preds.extend(preds.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())

        # Print metrics
        train_loss /= len(data_loader.dataset)
        train_accuracy = accuracy_score(train_labels, train_preds)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}")

# Train on reconstructed images
print("Training on Reconstructed Images...")
train_model(reconstructed_loader, num_epochs)

# Fine-tuning on Latent Space
print("Fine-tuning on Latent Space...")
train_model(latent_loader, num_epochs)

print("Training complete!")


Training on Reconstructed Images...


RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x64 and 512x32)

In [2]:
import os
import numpy as np
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.utils.data import Dataset, DataLoader

# Custom Dataset class to handle loading of latent spaces and labels
class LatentSpaceDataset(Dataset):
    def __init__(self, latent_space_dir):
        self.latent_space_dir = latent_space_dir
        self.latent_paths = []
        self.labels = []

        # Read the class folders (10 classes) from latent space directory
        for class_folder in sorted(os.listdir(latent_space_dir)):
            latent_class_folder = os.path.join(latent_space_dir, class_folder)

            if os.path.isdir(latent_class_folder):
                for latent_file in os.listdir(latent_class_folder):
                    if latent_file.startswith("latent_") and latent_file.endswith('.npy'):
                        latent_path = os.path.join(latent_class_folder, latent_file)
                        label_file = latent_file.replace("latent_", "label_")  # Change filename to get the label
                        label_path = os.path.join(latent_class_folder, label_file)

                        if os.path.exists(label_path):  # Ensure the corresponding label exists
                            self.latent_paths.append(latent_path)
                            label_data = np.load(label_path)  # Load label data
                            self.labels.append(label_data.item())  # Assuming labels are single values

    def __len__(self):
        return len(self.latent_paths)

    def __getitem__(self, idx):
        # Load the latent space
        latent_path = self.latent_paths[idx]
        latent_space = np.load(latent_path)

        # Get the label
        label = self.labels[idx]

        return latent_space, label

# Load dataset
latent_space_dir = 'latent_space'  # Path to latent spaces

# Create the dataset and dataloader
dataset = LatentSpaceDataset(latent_space_dir)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Initialize SVM model
svm_model = SVC(kernel='linear')

# Training parameters
num_epochs = 50

# Train SVM on latent spaces for 50 epochs
for epoch in range(num_epochs):
    all_preds = []
    all_labels = []

    for latent_spaces, labels in dataloader:
        latent_spaces_np = latent_spaces.numpy()
        labels_np = labels.numpy()

        # Train SVM on latent spaces
        svm_model.fit(latent_spaces_np, labels_np)

        # Predict on the same batch (or you can use a separate validation set)
        preds = svm_model.predict(latent_spaces_np)

        # Store predictions and true labels for metrics calculation
        all_preds.extend(preds)
        all_labels.extend(labels_np)

    # Calculate metrics after each epoch
    acc = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f'Epoch {epoch+1}/{num_epochs}')
    print(f'Accuracy: {acc * 100:.2f}%')
    print(f'Precision: {precision * 100:.2f}%')
    print(f'Recall: {recall * 100:.2f}%')
    print(f'F1 Score: {f1 * 100:.2f}%')


Epoch 1/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 2/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 3/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 4/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 5/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 6/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 7/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 8/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 9/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 10/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 11/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 12/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 13/50
A

In [5]:
import os
import numpy as np
from PIL import Image
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.utils.data import Dataset, DataLoader
import torch

# Custom Dataset class to handle loading of images and labels
class ImageDataset(Dataset):
    def __init__(self, output_images_dir):
        self.output_images_dir = output_images_dir
        self.image_paths = []
        self.labels = []

        # Read the class folders from the output images directory
        for class_folder in sorted(os.listdir(output_images_dir)):
            class_folder_path = os.path.join(output_images_dir, class_folder)

            if os.path.isdir(class_folder_path):
                for image_file in os.listdir(class_folder_path):
                    if image_file.endswith('.png'):
                        image_path = os.path.join(class_folder_path, image_file)
                        self.image_paths.append(image_path)
                        self.labels.append(class_folder)  # Use the folder name as the label

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Load the image
        image_path = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')
        image_array = np.array(image)  # No resizing

        # Get the label
        label = self.labels[idx]

        return image_array, label

# Load dataset
output_images_dir = 'output_images'  # Path to output images

# Create the dataset and dataloader
dataset = ImageDataset(output_images_dir)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Initialize SVM model
svm_model = SVC(kernel='linear')

# Training parameters
num_epochs = 50

# Train SVM on reconstructed images for 50 epochs
for epoch in range(num_epochs):
    all_preds = []
    all_labels = []

    for images, labels in dataloader:
        images_np = images.numpy().reshape(images.size(0), -1)  # Flatten the images

        # Convert string labels to numerical labels
        unique_labels = list(set(labels))  # Use the list of labels directly
        label_mapping = {label: idx for idx, label in enumerate(unique_labels)}
        labels_np = np.array([label_mapping[label] for label in labels])

        # Train SVM on images
        svm_model.fit(images_np, labels_np)

        # Predict on the same batch
        preds = svm_model.predict(images_np)

        # Store predictions and true labels for metrics calculation
        all_preds.extend(preds)
        all_labels.extend(labels_np)

    # Calculate metrics after each epoch
    acc = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f'Epoch {epoch+1}/{num_epochs}')
    print(f'Accuracy: {acc * 100:.2f}%')
    print(f'Precision: {precision * 100:.2f}%')
    print(f'Recall: {recall * 100:.2f}%')
    print(f'F1 Score: {f1 * 100:.2f}%')


Epoch 1/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 2/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 3/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 4/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 5/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 6/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 7/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 8/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 9/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 10/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 11/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 12/50
Accuracy: 100.00%
Precision: 100.00%
Recall: 100.00%
F1 Score: 100.00%
Epoch 13/50
A

KeyboardInterrupt: 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
import numpy as np
from sklearn import svm
from sklearn.metrics import classification_report, accuracy_score

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
print('dataset successfully loaded')
# Define Encoder
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(weights='DEFAULT')  # Use updated weights parameter
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        self.fc = nn.Linear(2048, latent_space_size)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten
        latent = self.fc(x)
        return latent

# Instantiate the encoder
encoder = ResNetEncoder().to(device)

# Function to generate latent space for a dataset
def generate_latent_space(model, data_loader):
    model.eval()  # Set the model to evaluation mode
    latents = []
    labels = []
    
    with torch.no_grad():
        for images, label in data_loader:
            images = images.to(device)
            latent = model(images)
            latents.append(latent.cpu().numpy())
            labels.extend(label.numpy())
    
    return np.concatenate(latents), np.array(labels)

# Generate latent spaces for training and validation datasets
train_latent, train_labels = generate_latent_space(encoder, train_loader)
val_latent, val_labels = generate_latent_space(encoder, val_loader)

# Train SVM model on the latent space
svm_model = svm.SVC(kernel='linear', C=1.0)
svm_model.fit(train_latent, train_labels)

# Evaluate the SVM model on the validation set
val_predictions = svm_model.predict(val_latent)

# Calculate accuracy
val_accuracy = accuracy_score(val_labels, val_predictions)
print(f'Validation Accuracy: {val_accuracy:.4f}')

# Print classification report
print(classification_report(val_labels, val_predictions, target_names=train_dataset.classes))


In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
import numpy as np
from sklearn import svm
from sklearn.metrics import classification_report, accuracy_score

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
print('dataset successfully loaded')
# Define Encoder
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(weights='DEFAULT')  # Use updated weights parameter
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        self.fc = nn.Linear(2048, latent_space_size)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten
        latent = self.fc(x)
        return latent

# Instantiate the encoder
encoder = ResNetEncoder().to(device)

# Function to generate latent space for a dataset
def generate_latent_space(model, data_loader):
    model.eval()  # Set the model to evaluation mode
    latents = []
    labels = []
    
    with torch.no_grad():
        for images, label in data_loader:
            images = images.to(device)
            latent = model(images)
            latents.append(latent.cpu().numpy())
            labels.extend(label.numpy())
    
    return np.concatenate(latents), np.array(labels)

# Generate latent spaces for training and validation datasets
train_latent, train_labels = generate_latent_space(encoder, train_loader)
val_latent, val_labels = generate_latent_space(encoder, val_loader)

# Train SVM model on the latent space
svm_model = svm.SVC(kernel='linear', C=1.0)
svm_model.fit(train_latent, train_labels)

# Evaluate the SVM model on the validation set
val_predictions = svm_model.predict(val_latent)

# Calculate accuracy
val_accuracy = accuracy_score(val_labels, val_predictions)
print(f'Validation Accuracy: {val_accuracy:.4f}')

# Print classification report
print(classification_report(val_labels, val_predictions, target_names=train_dataset.classes))


dataset successfully loaded
Validation Accuracy: 0.8569
                  precision    recall  f1-score   support

    Angioectasia       0.65      0.48      0.55       497
        Bleeding       0.73      0.73      0.73       359
         Erosion       0.48      0.51      0.50      1155
        Erythema       0.40      0.32      0.35       297
    Foreign Body       0.78      0.65      0.71       340
Lymphangiectasia       0.66      0.54      0.59       343
          Normal       0.93      0.95      0.94     12287
           Polyp       0.47      0.40      0.43       500
           Ulcer       0.90      0.84      0.87       286
           Worms       0.98      0.88      0.93        68

        accuracy                           0.86     16132
       macro avg       0.70      0.63      0.66     16132
    weighted avg       0.85      0.86      0.85     16132



In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
import numpy as np
from sklearn.metrics import classification_report, accuracy_score
import torch.nn.functional as F

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

print('Dataset successfully loaded')

# Define Encoder
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(weights='DEFAULT')  # Updated weights parameter
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        self.fc = nn.Linear(2048, latent_space_size)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten
        latent = self.fc(x)
        return latent

# Instantiate the encoder
encoder = ResNetEncoder().to(device)

# Function to generate latent space for a dataset
def generate_latent_space(model, data_loader):
    model.eval()  # Set the model to evaluation mode
    latents = []
    labels = []
    
    with torch.no_grad():
        for images, label in data_loader:
            images = images.to(device)
            latent = model(images)
            latents.append(latent.cpu().numpy())
            labels.extend(label.numpy())
    
    return np.concatenate(latents), np.array(labels)

# Generate latent spaces for training and validation datasets
train_latent, train_labels = generate_latent_space(encoder, train_loader)
val_latent, val_labels = generate_latent_space(encoder, val_loader)

# Define DNN
class LatentSpaceDNN(nn.Module):
    def __init__(self, input_size=1024, num_classes=10):
        super(LatentSpaceDNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

# Instantiate DNN model
dnn_model = LatentSpaceDNN(input_size=1024, num_classes=len(train_dataset.classes)).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(dnn_model.parameters(), lr=0.001)

# Function to train the DNN
def train_dnn(model, train_latent, train_labels, val_latent, val_labels, num_epochs=10):
    train_latent_tensor = torch.tensor(train_latent, dtype=torch.float32).to(device)
    train_labels_tensor = torch.tensor(train_labels, dtype=torch.long).to(device)

    val_latent_tensor = torch.tensor(val_latent, dtype=torch.float32).to(device)
    val_labels_tensor = torch.tensor(val_labels, dtype=torch.long).to(device)

    for epoch in range(num_epochs):
        model.train()
        
        # Forward pass
        outputs = model(train_latent_tensor)
        loss = criterion(outputs, train_labels_tensor)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Validation
        model.eval()
        with torch.no_grad():
            val_outputs = model(val_latent_tensor)
            _, val_predicted = torch.max(val_outputs, 1)
            val_accuracy = (val_predicted == val_labels_tensor).float().mean().item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Validation Accuracy: {val_accuracy:.4f}')

# Train the DNN model
train_dnn(dnn_model, train_latent, train_labels, val_latent, val_labels, num_epochs=10)

# Final evaluation on validation set
val_outputs = dnn_model(torch.tensor(val_latent, dtype=torch.float32).to(device))
_, val_predicted = torch.max(val_outputs, 1)
val_accuracy = (val_predicted.cpu().numpy() == val_labels).mean()

print(f'Final Validation Accuracy: {val_accuracy:.4f}')
print(classification_report(val_labels, val_predicted.cpu().numpy(), target_names=train_dataset.classes))


Dataset successfully loaded
Epoch [1/10], Loss: 2.2882, Validation Accuracy: 0.7617
Epoch [2/10], Loss: 2.2657, Validation Accuracy: 0.7617
Epoch [3/10], Loss: 2.2337, Validation Accuracy: 0.7617
Epoch [4/10], Loss: 2.1848, Validation Accuracy: 0.7617
Epoch [5/10], Loss: 2.1151, Validation Accuracy: 0.7617
Epoch [6/10], Loss: 2.0301, Validation Accuracy: 0.7617
Epoch [7/10], Loss: 1.9587, Validation Accuracy: 0.7617
Epoch [8/10], Loss: 1.9200, Validation Accuracy: 0.7564
Epoch [9/10], Loss: 1.8704, Validation Accuracy: 0.6835
Epoch [10/10], Loss: 1.8293, Validation Accuracy: 0.6568
Final Validation Accuracy: 0.6568
                  precision    recall  f1-score   support

    Angioectasia       0.00      0.00      0.00       497
        Bleeding       0.00      0.00      0.00       359
         Erosion       0.00      0.00      0.00      1155
        Erythema       0.06      0.80      0.11       297
    Foreign Body       1.00      0.01      0.03       340
Lymphangiectasia       0.00 

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
import numpy as np
from sklearn.metrics import classification_report, accuracy_score
from collections import Counter

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
print('Dataset successfully loaded')

# Calculate class weights to handle class imbalance
class_counts = Counter([label for _, label in train_dataset.imgs])
class_weights = {cls: len(train_dataset) / count for cls, count in class_counts.items()}
weights = [class_weights[label] for _, label in train_dataset.imgs]
class_weights_tensor = torch.tensor([class_weights[i] for i in range(len(train_dataset.classes))]).to(device)

# Define Encoder
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(weights='DEFAULT')  # Use updated weights parameter
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        self.fc = nn.Linear(2048, latent_space_size)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten
        latent = self.fc(x)
        return latent

# Instantiate the encoder
encoder = ResNetEncoder().to(device)

# Define a simple DNN classifier on top of the latent space
class DNNClassifier(nn.Module):
    def __init__(self, input_size=1024, num_classes=len(train_dataset.classes)):
        super(DNNClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Instantiate the classifier
dnn_classifier = DNNClassifier().to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)  # Using class weights to handle imbalance
optimizer = optim.Adam(dnn_classifier.parameters(), lr=0.0001)  # Lower learning rate

# Function to generate latent space for a dataset
def generate_latent_space(model, data_loader):
    model.eval()  # Set the model to evaluation mode
    latents = []
    labels = []
    
    with torch.no_grad():
        for images, label in data_loader:
            images = images.to(device)
            latent = model(images)
            latents.append(latent.cpu().numpy())
            labels.extend(label.numpy())
    
    return np.concatenate(latents), np.array(labels)

# Function to train the DNN
def train_dnn(encoder, classifier, train_loader, val_loader, num_epochs=50):
    best_val_accuracy = 0
    early_stopping_counter = 0
    patience = 5

    for epoch in range(num_epochs):
        classifier.train()
        total_loss = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # Get latent space from encoder
            latents = encoder(images)

            # Forward pass through the classifier
            outputs = classifier(latents)
            loss = criterion(outputs, labels)

            # Backpropagation and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Validation step
        classifier.eval()
        val_latents, val_labels = generate_latent_space(encoder, val_loader)
        val_outputs = classifier(torch.tensor(val_latents).to(device))
        val_predictions = torch.argmax(val_outputs, dim=1).cpu().numpy()

        val_accuracy = accuracy_score(val_labels, val_predictions)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

        # Early stopping check
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1

        if early_stopping_counter >= patience:
            print(f"Early stopping triggered at epoch {epoch+1}.")
            break

    print(f'Final Validation Accuracy: {best_val_accuracy:.4f}')
    print(classification_report(val_labels, val_predictions, target_names=train_dataset.classes))

# Train the model
train_dnn(encoder, dnn_classifier, train_loader, val_loader, num_epochs=50)


Dataset successfully loaded
Epoch [1/50], Loss: 4267.1018, Validation Accuracy: 0.7517
Epoch [2/50], Loss: 3282.1992, Validation Accuracy: 0.7892
Epoch [3/50], Loss: 2985.2504, Validation Accuracy: 0.7771
Epoch [4/50], Loss: 2789.3512, Validation Accuracy: 0.8180


KeyboardInterrupt: 

In [5]:
!pip install xgboost
import torch
import torch.nn as nn
import numpy as np
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
from xgboost import XGBClassifier
from sklearn.metrics import classification_report, accuracy_score

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations for the images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset using ImageFolder
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Define the Encoder using a pre-trained ResNet model
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(weights='DEFAULT')  # Using ResNet50
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))  # Removing the classification layer
        self.fc = nn.Linear(2048, latent_space_size)  # Fully connected layer to get latent space

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten the output
        latent = self.fc(x)  # Get the latent space
        return latent

# Instantiate the encoder and move it to the device (GPU/CPU)
encoder = ResNetEncoder().to(device)

# Function to generate latent spaces for a dataset
def generate_latent_space(model, data_loader):
    model.eval()  # Set model to evaluation mode
    latents = []
    labels = []
    
    with torch.no_grad():  # Disable gradient calculation
        for images, label in data_loader:
            images = images.to(device)
            latent = model(images)  # Pass images through the encoder
            latents.append(latent.cpu().numpy())  # Move latent space to CPU and store it
            labels.extend(label.numpy())  # Store labels
    
    return np.concatenate(latents), np.array(labels)

# Generate latent spaces for training and validation datasets
print("Generating latent space for training data...")
train_latent, train_labels = generate_latent_space(encoder, train_loader)
print("Generating latent space for validation data...")
val_latent, val_labels = generate_latent_space(encoder, val_loader)

# Define and train the XGBoost model using the latent space
print("Training XGBoost classifier...")
xgb_model = XGBClassifier(
    objective='multi:softmax',  # Multi-class classification
    num_class=len(train_dataset.classes),  # Number of classes
    n_estimators=100,  # Number of trees
    learning_rate=0.1,  # Learning rate
    max_depth=6,  # Maximum depth of a tree
    use_label_encoder=False  # Suppress warning about label encoding
)

# Fit the XGBoost model using latent spaces and corresponding labels
xgb_model.fit(train_latent, train_labels)

# Evaluate the model on validation latent space
print("Evaluating XGBoost model on validation data...")
val_predictions = xgb_model.predict(val_latent)

# Calculate accuracy
val_accuracy = accuracy_score(val_labels, val_predictions)
print(f'Validation Accuracy: {val_accuracy:.4f}')

# Print detailed classification report
print(classification_report(val_labels, val_predictions, target_names=train_dataset.classes))


Collecting xgboost
  Downloading xgboost-2.1.2-py3-none-manylinux_2_28_x86_64.whl.metadata (2.1 kB)
Downloading xgboost-2.1.2-py3-none-manylinux_2_28_x86_64.whl (153.9 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.9/153.9 MB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m0m eta [36m0:00:01[0m[36m0:00:01[0m
[?25hInstalling collected packages: xgboost
Successfully installed xgboost-2.1.2
Generating latent space for training data...
Generating latent space for validation data...
Training XGBoost classifier...


Parameters: { "use_label_encoder" } are not used.



Evaluating XGBoost model on validation data...
Validation Accuracy: 0.8336
                  precision    recall  f1-score   support

    Angioectasia       0.83      0.38      0.53       497
        Bleeding       0.76      0.59      0.66       359
         Erosion       0.44      0.49      0.47      1155
        Erythema       0.37      0.25      0.30       297
    Foreign Body       0.79      0.50      0.61       340
Lymphangiectasia       0.62      0.37      0.46       343
          Normal       0.90      0.95      0.93     12287
           Polyp       0.39      0.30      0.34       500
           Ulcer       0.87      0.58      0.70       286
           Worms       0.79      0.96      0.87        68

        accuracy                           0.83     16132
       macro avg       0.68      0.54      0.59     16132
    weighted avg       0.83      0.83      0.82     16132



In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.models as models
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, confusion_matrix

# CBAM Components
class SAM(nn.Module):
    def __init__(self, bias=False):
        super(SAM, self).__init__()
        self.bias = bias
        self.conv = nn.Conv2d(in_channels=2, out_channels=1, kernel_size=7, stride=1, padding=3, dilation=1, bias=self.bias)

    def forward(self, x):
        max = torch.max(x, 1)[0].unsqueeze(1)
        avg = torch.mean(x, 1).unsqueeze(1)
        concat = torch.cat((max, avg), dim=1)
        output = self.conv(concat)
        output = torch.sigmoid(output) * x
        return output

class CAM(nn.Module):
    def __init__(self, channels, r):
        super(CAM, self).__init__()
        self.channels = channels
        self.r = r
        self.linear = nn.Sequential(
            nn.Linear(in_features=self.channels, out_features=self.channels // self.r, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=self.channels // self.r, out_features=self.channels, bias=True)
        )

    def forward(self, x):
        max = F.adaptive_max_pool2d(x, output_size=1)
        avg = F.adaptive_avg_pool2d(x, output_size=1)
        b, c, _, _ = x.size()
        linear_max = self.linear(max.view(b, c)).view(b, c, 1, 1)
        linear_avg = self.linear(avg.view(b, c)).view(b, c, 1, 1)
        output = linear_max + linear_avg
        output = torch.sigmoid(output) * x
        return output

class CBAM(nn.Module):
    def __init__(self, channels, r):
        super(CBAM, self).__init__()
        self.channels = channels
        self.r = r
        self.sam = SAM(bias=False)
        self.cam = CAM(channels=self.channels, r=self.r)

    def forward(self, x):
        output = self.cam(x)
        output = self.sam(output)
        return output + x

# ResNet + CBAM Model
class CBAMResNet(nn.Module):
    def __init__(self, num_classes):
        super(CBAMResNet, self).__init__()
        self.features = models.resnet18(pretrained=True)
        self.features.fc = nn.Identity()  # Remove the final fully connected layer
        self.cbam = CBAM(512, 16)  # Add CBAM for the final feature map (before the pooling layer)
        self.fc = nn.Linear(512, num_classes)  # Add a new fully connected layer for classification

    def forward(self, x):
        x = self.features.conv1(x)
        x = self.features.bn1(x)
        x = self.features.relu(x)
        x = self.features.maxpool(x)
        
        # Apply CBAM to intermediate feature maps
        x = self.features.layer1(x)
        x = self.features.layer2(x)
        x = self.features.layer3(x)
        x = self.features.layer4(x)
        
        x = self.cbam(x)  # Apply CBAM to the feature maps

        x = self.features.avgpool(x)  # Global average pooling
        x = torch.flatten(x, 1)  # Flatten for fully connected layer
        x = self.fc(x)  # Final classification layer
        return x

# Hyperparameters
batch_size = 32
num_epochs = 50
learning_rate = 0.001
num_classes = 10
train_data_dir = 'Dataset/new_train'

# Data Augmentation and Normalization
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Datasets and Dataloaders
train_dataset = datasets.ImageFolder(root=train_data_dir, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Model, Loss, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CBAMResNet(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Function to compute metrics
def compute_metrics(preds, targets):
    accuracy = accuracy_score(targets, preds)
    recall = recall_score(targets, preds, average='weighted')
    precision = precision_score(targets, preds, average='weighted')
    f1 = f1_score(targets, preds, average='weighted')
    
    # Confusion matrix for specificity
    cm = confusion_matrix(targets, preds)
    tn = cm.diagonal()
    fp = cm.sum(axis=1) - tn
    specificity = tn / (tn + fp)
    specificity = specificity.mean()  # Average across classes
    
    return accuracy, recall, precision, f1, specificity

# Training Loop without checkpoints or validation
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    print("=" * 50)
    
    # Training phase
    model.train()
    train_loss = 0.0
    train_preds = []
    train_labels = []
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        train_preds.extend(preds.cpu().numpy())
        train_labels.extend(labels.cpu().numpy())
    
    # Metrics
    train_loss /= len(train_loader.dataset)
    train_accuracy, train_recall, train_precision, train_f1, train_specificity = compute_metrics(train_preds, train_labels)
    print(f"Train Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}, Recall: {train_recall:.4f}, Precision: {train_precision:.4f}, F1 Score: {train_f1:.4f}, Specificity: {train_specificity:.4f}")

print("Training complete!")




Epoch 1/50
Train Loss: 1.1129, Accuracy: 0.6283, Recall: 0.6283, Precision: 0.6178, F1 Score: 0.6179, Specificity: 0.5644
Epoch 2/50
Train Loss: 0.7984, Accuracy: 0.7300, Recall: 0.7300, Precision: 0.7269, F1 Score: 0.7260, Specificity: 0.6825
Epoch 3/50
Train Loss: 0.6737, Accuracy: 0.7699, Recall: 0.7699, Precision: 0.7681, F1 Score: 0.7676, Specificity: 0.7304
Epoch 4/50
Train Loss: 0.5949, Accuracy: 0.7963, Recall: 0.7963, Precision: 0.7950, F1 Score: 0.7946, Specificity: 0.7614
Epoch 5/50
Train Loss: 0.5334, Accuracy: 0.8166, Recall: 0.8166, Precision: 0.8156, F1 Score: 0.8153, Specificity: 0.7864
Epoch 6/50
Train Loss: 0.4870, Accuracy: 0.8318, Recall: 0.8318, Precision: 0.8302, F1 Score: 0.8304, Specificity: 0.8038
Epoch 7/50
Train Loss: 0.4429, Accuracy: 0.8470, Recall: 0.8470, Precision: 0.8458, F1 Score: 0.8460, Specificity: 0.8226
Epoch 8/50
Train Loss: 0.4118, Accuracy: 0.8572, Recall: 0.8572, Precision: 0.8561, F1 Score: 0.8563, Specificity: 0.8345
Epoch 9/50
Train Loss: 0

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.models as models
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, confusion_matrix

# CBAM Components
class SAM(nn.Module):
    def __init__(self, bias=False):
        super(SAM, self).__init__()
        self.bias = bias
        self.conv = nn.Conv2d(in_channels=2, out_channels=1, kernel_size=7, stride=1, padding=3, dilation=1, bias=self.bias)

    def forward(self, x):
        max = torch.max(x, 1)[0].unsqueeze(1)
        avg = torch.mean(x, 1).unsqueeze(1)
        concat = torch.cat((max, avg), dim=1)
        output = self.conv(concat)
        output = torch.sigmoid(output) * x
        return output

class CAM(nn.Module):
    def __init__(self, channels, r):
        super(CAM, self).__init__()
        self.channels = channels
        self.r = r
        self.linear = nn.Sequential(
            nn.Linear(in_features=self.channels, out_features=self.channels // self.r, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=self.channels // self.r, out_features=self.channels, bias=True)
        )

    def forward(self, x):
        max = F.adaptive_max_pool2d(x, output_size=1)
        avg = F.adaptive_avg_pool2d(x, output_size=1)
        b, c, _, _ = x.size()
        linear_max = self.linear(max.view(b, c)).view(b, c, 1, 1)
        linear_avg = self.linear(avg.view(b, c)).view(b, c, 1, 1)
        output = linear_max + linear_avg
        output = torch.sigmoid(output) * x
        return output

class CBAM(nn.Module):
    def __init__(self, channels, r):
        super(CBAM, self).__init__()
        self.channels = channels
        self.r = r
        self.sam = SAM(bias=False)
        self.cam = CAM(channels=self.channels, r=self.r)

    def forward(self, x):
        output = self.cam(x)
        output = self.sam(output)
        return output + x

# ResNet + CBAM Model
class CBAMResNet(nn.Module):
    def __init__(self, num_classes):
        super(CBAMResNet, self).__init__()
        self.features = models.resnet18(pretrained=True)
        self.features.fc = nn.Identity()  # Remove the final fully connected layer
        self.cbam = CBAM(512, 16)  # Add CBAM for the final feature map (before the pooling layer)
        self.fc = nn.Linear(512, num_classes)  # Add a new fully connected layer for classification

    def forward(self, x):
        x = self.features.conv1(x)
        x = self.features.bn1(x)
        x = self.features.relu(x)
        x = self.features.maxpool(x)
        
        # Apply CBAM to intermediate feature maps
        x = self.features.layer1(x)
        x = self.features.layer2(x)
        x = self.features.layer3(x)
        x = self.features.layer4(x)
        
        x = self.cbam(x)  # Apply CBAM to the feature maps

        x = self.features.avgpool(x)  # Global average pooling
        x = torch.flatten(x, 1)  # Flatten for fully connected layer
        x = self.fc(x)  # Final classification layer
        return x

# Hyperparameters
batch_size = 32
num_epochs = 50
learning_rate = 0.001
num_classes = 10
train_data_dir = 'Dataset/new_train'
val_data_dir = 'Dataset/validation'

# Data Augmentation and Normalization
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Datasets and Dataloaders
train_dataset = datasets.ImageFolder(root=train_data_dir, transform=transform)
val_dataset = datasets.ImageFolder(root=val_data_dir, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Model, Loss, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CBAMResNet(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Function to compute metrics
def compute_metrics(preds, targets):
    accuracy = accuracy_score(targets, preds)
    recall = recall_score(targets, preds, average='weighted')
    precision = precision_score(targets, preds, average='weighted')
    f1 = f1_score(targets, preds, average='weighted')
    
    # Confusion matrix for specificity
    cm = confusion_matrix(targets, preds)
    tn = cm.diagonal()
    fp = cm.sum(axis=1) - tn
    specificity = tn / (tn + fp)
    specificity = specificity.mean()  # Average across classes
    
    return accuracy, recall, precision, f1, specificity

# Training and Evaluation Loop with Checkpoints
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    print("=" * 50)
    
    # Training phase
    model.train()
    train_loss = 0.0
    train_preds = []
    train_labels = []
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        train_preds.extend(preds.cpu().numpy())
        train_labels.extend(labels.cpu().numpy())
    
    # Metrics
    train_loss /= len(train_loader.dataset)
    train_accuracy, train_recall, train_precision, train_f1, train_specificity = compute_metrics(train_preds, train_labels)
    print(f"Train Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}, Recall: {train_recall:.4f}, Precision: {train_precision:.4f}, F1 Score: {train_f1:.4f}, Specificity: {train_specificity:.4f}")

    # Validation phase
    model.eval()
    val_loss = 0.0
    val_preds = []
    val_labels = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(labels.cpu().numpy())
    
    # Metrics
    val_loss /= len(val_loader.dataset)
    val_accuracy, val_recall, val_precision, val_f1, val_specificity = compute_metrics(val_preds, val_labels)
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}, Recall: {val_recall:.4f}, Precision: {val_precision:.4f}, F1 Score: {val_f1:.4f}, Specificity: {val_specificity:.4f}")
    
    # Save checkpoint
    checkpoint_path = f"checkpoint_epoch_{epoch+1}.pth"
    torch.save(model.state_dict(), checkpoint_path)
    print(f"Checkpoint saved at {checkpoint_path}")

print("Training complete!")



Epoch 1/50
Train Loss: 1.1068, Accuracy: 0.6304, Recall: 0.6304, Precision: 0.6196, F1 Score: 0.6198, Specificity: 0.5673
Validation Loss: 0.7915, Accuracy: 0.7282, Recall: 0.7282, Precision: 0.7742, F1 Score: 0.7427, Specificity: 0.5803
Checkpoint saved at checkpoint_epoch_1.pth
Epoch 2/50
Train Loss: 0.7937, Accuracy: 0.7314, Recall: 0.7314, Precision: 0.7275, F1 Score: 0.7272, Specificity: 0.6851
Validation Loss: 0.7422, Accuracy: 0.7628, Recall: 0.7628, Precision: 0.8302, F1 Score: 0.7852, Specificity: 0.6706
Checkpoint saved at checkpoint_epoch_2.pth
Epoch 3/50
Train Loss: 0.6734, Accuracy: 0.7691, Recall: 0.7691, Precision: 0.7670, F1 Score: 0.7666, Specificity: 0.7296
Validation Loss: 0.4646, Accuracy: 0.8412, Recall: 0.8412, Precision: 0.8612, F1 Score: 0.8489, Specificity: 0.7080
Checkpoint saved at checkpoint_epoch_3.pth
Epoch 4/50
Train Loss: 0.5919, Accuracy: 0.7962, Recall: 0.7962, Precision: 0.7948, F1 Score: 0.7945, Specificity: 0.7620
Validation Loss: 0.4388, Accuracy: 

In [10]:
import torch
import torch.nn as nn
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
import numpy as np
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, classification_report

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = datasets.ImageFolder('Dataset/augmented_training', transform=transform)
val_dataset = datasets.ImageFolder('Dataset/validation', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
print('Dataset successfully loaded')

# Define Encoder (ResNet50)
class ResNetEncoder(nn.Module):
    def __init__(self, latent_space_size=1024):
        super(ResNetEncoder, self).__init__()
        self.resnet = models.resnet50(weights='DEFAULT')  # Use updated weights parameter
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))  # Remove the last FC layer
        self.fc = nn.Linear(2048, latent_space_size)

    def forward(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)  # Flatten
        latent = self.fc(x)
        return latent

# Instantiate the encoder and move to device
encoder = ResNetEncoder().to(device)

# Function to generate latent space for a dataset
def generate_latent_space(model, data_loader):
    model.eval()  # Set model to evaluation mode
    latents = []
    labels = []
    
    with torch.no_grad():
        for images, label in data_loader:
            images = images.to(device)
            latent = model(images)
            latents.append(latent.cpu().numpy())
            labels.extend(label.numpy())
    
    return np.concatenate(latents), np.array(labels)

# Generate latent spaces for training and validation datasets
print("Generating latent space for training data...")
train_latent, train_labels = generate_latent_space(encoder, train_loader)
print("Generating latent space for validation data...")
val_latent, val_labels = generate_latent_space(encoder, val_loader)

# Define individual models
svm_model = SVC(kernel='linear', C=1.0)  # SVM
knn_model = KNeighborsClassifier(n_neighbors=7)  # KNN
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)  # Random Forest
xgb_model = XGBClassifier(objective='multi:softmax', num_class=len(train_dataset.classes),
                          n_estimators=100, learning_rate=0.1, max_depth=6, use_label_encoder=False)  # XGBoost

# Train each model on the latent space
print("Training SVM...")
svm_model.fit(train_latent, train_labels)
print("Training KNN...")
knn_model.fit(train_latent, train_labels)
print("Training Random Forest...")
rf_model.fit(train_latent, train_labels)
print("Training XGBoost...")
xgb_model.fit(train_latent, train_labels)

# Ensemble using VotingClassifier (hard voting)
voting_clf = VotingClassifier(estimators=[
    ('svm', svm_model),
    ('knn', knn_model),
    ('rf', rf_model),
    ('xgb', xgb_model)
], voting='hard')

# Train the ensemble classifier on the training latent space
print("Training ensemble with hard voting...")
voting_clf.fit(train_latent, train_labels)

# Predict on the validation latent space
print("Predicting on validation data...")
val_predictions = voting_clf.predict(val_latent)

# Evaluate the ensemble on validation data
val_accuracy = accuracy_score(val_labels, val_predictions)
print(f'Validation Accuracy (Hard Voting Ensemble): {val_accuracy:.4f}')

# Print classification report
print("Classification Report:")
print(classification_report(val_labels, val_predictions, target_names=train_dataset.classes))


Dataset successfully loaded
Generating latent space for training data...
Generating latent space for validation data...
Training SVM...
Training KNN...
Training Random Forest...
Training XGBoost...


Parameters: { "use_label_encoder" } are not used.



Training ensemble with hard voting...


Parameters: { "use_label_encoder" } are not used.



Predicting on validation data...
Validation Accuracy (Hard Voting Ensemble): 0.8580
Classification Report:
                  precision    recall  f1-score   support

    Angioectasia       0.79      0.44      0.57       497
        Bleeding       0.75      0.69      0.72       359
         Erosion       0.46      0.59      0.52      1155
        Erythema       0.45      0.27      0.34       297
    Foreign Body       0.90      0.65      0.75       340
Lymphangiectasia       0.80      0.48      0.60       343
          Normal       0.92      0.96      0.94     12287
           Polyp       0.53      0.31      0.39       500
           Ulcer       0.97      0.64      0.77       286
           Worms       0.91      0.99      0.94        68

        accuracy                           0.86     16132
       macro avg       0.75      0.60      0.65     16132
    weighted avg       0.86      0.86      0.85     16132



In [15]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, confusion_matrix
import pandas as pd
from PIL import Image

# Define CBAM Components
class SAM(nn.Module):
    def __init__(self, bias=False):
        super(SAM, self).__init__()
        self.conv = nn.Conv2d(2, 1, 7, padding=3, bias=bias)

    def forward(self, x):
        max_pool = torch.max(x, dim=1, keepdim=True)[0]
        avg_pool = torch.mean(x, dim=1, keepdim=True)
        concat = torch.cat([max_pool, avg_pool], dim=1)
        output = torch.sigmoid(self.conv(concat)) * x
        return output

class CAM(nn.Module):
    def __init__(self, channels, r):
        super(CAM, self).__init__()
        self.fc1 = nn.Linear(channels, channels // r)
        self.fc2 = nn.Linear(channels // r, channels)

    def forward(self, x):
        max_pool = torch.max(x, dim=2, keepdim=True)[0].max(dim=3, keepdim=True)[0]
        avg_pool = torch.mean(x, dim=(2, 3), keepdim=True)
        max_out = torch.relu(self.fc1(max_pool.view(x.size(0), -1)))
        avg_out = torch.relu(self.fc1(avg_pool.view(x.size(0), -1)))
        max_out = torch.sigmoid(self.fc2(max_out)).view(x.size(0), -1, 1, 1)
        avg_out = torch.sigmoid(self.fc2(avg_out)).view(x.size(0), -1, 1, 1)
        return (max_out + avg_out) * x

class CBAM(nn.Module):
    def __init__(self, channels, r=16):
        super(CBAM, self).__init__()
        self.cam = CAM(channels, r)
        self.sam = SAM()

    def forward(self, x):
        x = self.cam(x)
        x = self.sam(x)
        return x

# Define CBAM-Enhanced ResNet Model
class CBAMResNet(nn.Module):
    def __init__(self, num_classes=10):
        super(CBAMResNet, self).__init__()
        self.resnet = models.resnet18(pretrained=True)
        self.resnet.fc = nn.Identity()  # Remove original FC layer
        self.cbam = CBAM(512)  # 512 is the output channels from ResNet18
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.resnet(x)  # Output shape: (batch_size, 512)
        x = x.unsqueeze(-1).unsqueeze(-1)  # Add height and width dimensions
        x = self.cbam(x)  # Apply CBAM
        x = x.view(x.size(0), -1)  # Flatten for FC layer
        return self.fc(x)

# Custom Dataset for Unlabelled Test Data
class CustomImageDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.image_names = os.listdir(img_dir)

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img_name = self.image_names[idx]
        img_path = os.path.join(self.img_dir, img_name)
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, img_name

# Hyperparameters
batch_size = 32
num_epochs = 50
learning_rate = 0.001
num_classes = 10
train_data_dir = 'Dataset/new_train'
val_data_dir = 'Dataset/validation'
test_data_dir = 'Dataset/testing/images'

# Data Transformations
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Datasets and Dataloaders
train_dataset = datasets.ImageFolder(root=train_data_dir, transform=transform)
val_dataset = datasets.ImageFolder(root=val_data_dir, transform=transform)
test_dataset = CustomImageDataset(img_dir=test_data_dir, transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Initialize Model, Loss Function, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CBAMResNet(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Function to Compute Metrics
def compute_metrics(preds, targets):
    accuracy = accuracy_score(targets, preds)
    recall = recall_score(targets, preds, average='weighted')
    precision = precision_score(targets, preds, average='weighted')
    f1 = f1_score(targets, preds, average='weighted')
    cm = confusion_matrix(targets, preds)
    specificity = cm.diagonal() / (cm.sum(axis=1) + 1e-6)
    return accuracy, recall, precision, f1, specificity.mean()

# Training Loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_preds, train_labels = [], []
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
        train_preds.extend(outputs.argmax(dim=1).cpu().numpy())
        train_labels.extend(labels.cpu().numpy())
    train_loss = running_loss / len(train_loader.dataset)
    accuracy, recall, precision, f1, specificity = compute_metrics(train_preds, train_labels)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}, Acc: {accuracy:.4f}, Rec: {recall:.4f}, Prec: {precision:.4f}, F1: {f1:.4f}, Spec: {specificity:.4f}")

# Validation Loop
model.eval()
with torch.no_grad():
    val_preds, val_labels = [], []
    val_loss = 0.0
    for inputs, labels in val_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        val_loss += criterion(outputs, labels).item() * inputs.size(0)
        val_preds.extend(outputs.argmax(dim=1).cpu().numpy())
        val_labels.extend(labels.cpu().numpy())
    val_loss /= len(val_loader.dataset)
    accuracy, recall, precision, f1, specificity = compute_metrics(val_preds, val_labels)
    print(f"Validation Loss: {val_loss:.4f}, Acc: {accuracy:.4f}, Rec: {recall:.4f}, Prec: {precision:.4f}, F1: {f1:.4f}, Spec: {specificity:.4f}")

# Test Prediction and Report Generation
test_results = []
with torch.no_grad():
    for images, img_name in test_loader:
        images = images.to(device)
        outputs = model(images)
        probabilities = torch.softmax(outputs, dim=1).cpu().numpy().flatten()
        predicted_class = probabilities.argmax()
        result = {
            'image_name': img_name[0],
            **{f'class_{i}': prob for i, prob in enumerate(probabilities)},
            'predicted_class': predicted_class
        }
        test_results.append(result)

# Save Test Predictions to CSV
df = pd.DataFrame(test_results)
df.to_csv('test_results_CBAM.csv', index=False)
print("Test predictions saved to 'test_results_CBAM.csv'.")


Epoch [1/50], Loss: 1.0939, Acc: 0.6341, Rec: 0.6341, Prec: 0.6244, F1: 0.6241, Spec: 0.5730
Epoch [2/50], Loss: 0.7837, Acc: 0.7337, Rec: 0.7337, Prec: 0.7309, F1: 0.7304, Spec: 0.6879
Epoch [3/50], Loss: 0.6638, Acc: 0.7717, Rec: 0.7717, Prec: 0.7703, F1: 0.7697, Spec: 0.7329
Epoch [4/50], Loss: 0.5897, Acc: 0.7966, Rec: 0.7966, Prec: 0.7954, F1: 0.7951, Spec: 0.7626
Epoch [5/50], Loss: 0.5242, Acc: 0.8185, Rec: 0.8185, Prec: 0.8175, F1: 0.8174, Spec: 0.7888
Epoch [6/50], Loss: 0.4737, Acc: 0.8362, Rec: 0.8362, Prec: 0.8355, F1: 0.8354, Spec: 0.8104
Epoch [7/50], Loss: 0.4408, Acc: 0.8471, Rec: 0.8471, Prec: 0.8462, F1: 0.8463, Spec: 0.8227
Epoch [8/50], Loss: 0.4066, Acc: 0.8585, Rec: 0.8585, Prec: 0.8575, F1: 0.8577, Spec: 0.8360
Epoch [9/50], Loss: 0.3813, Acc: 0.8678, Rec: 0.8678, Prec: 0.8672, F1: 0.8673, Spec: 0.8479
Epoch [10/50], Loss: 0.3532, Acc: 0.8776, Rec: 0.8776, Prec: 0.8770, F1: 0.8771, Spec: 0.8588
Epoch [11/50], Loss: 0.3395, Acc: 0.8815, Rec: 0.8815, Prec: 0.8809, 

In [16]:
torch.save(model.state_dict(), 'cbam.pth')