In [1]:
### from torchvision import transforms
from PIL import Image
import torch

class SimCLRAugmentations:
    def __init__(self):
        self.augment = transforms.Compose([
            transforms.RandomResizedCrop(size=128),
            transforms.RandomHorizontalFlip(),
            transforms.RandomApply([transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.2)], p=0.8),
            transforms.RandomGrayscale(p=0.2),
            transforms.ToTensor()
        ])

    def __call__(self, x):
        num_channels = x.shape[2]
        augmented_channels_0 = []
        augmented_channels_90 = []
        augmented_channels_180 = []
        augmented_channels_270 = []
        for i in range(num_channels):
            channel_pil = Image.fromarray(x[:, :, i].astype('uint8'))
            channel_augmented_0 = self.augment(channel_pil)
            channel_augmented_90 = self.augment(channel_pil.rotate(90))
            channel_augmented_180 = self.augment(channel_pil.rotate(180))
            channel_augmented_270 = self.augment(channel_pil.rotate(270))
            augmented_channels_0.append(channel_augmented_0)
            augmented_channels_90.append(channel_augmented_90)
            augmented_channels_180.append(channel_augmented_180)
            augmented_channels_270.append(channel_augmented_270)
        augmented_image_0 = torch.cat(augmented_channels_0, dim=0)
        augmented_image_90 = torch.cat(augmented_channels_90, dim=0)
        augmented_image_180 = torch.cat(augmented_channels_180, dim=0)
        augmented_image_270 = torch.cat(augmented_channels_270, dim=0)
        return augmented_image_0, augmented_image_90, augmented_image_180, augmented_image_270



In [2]:
import os
import glob
import numpy as np
import rasterio
from PIL import Image
from torch.utils.data import Dataset

class Sentinel2Dataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.samples = self.load_samples()

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample_path, crop_type = self.samples[idx]
        bands = self.load_bands(sample_path)
        if bands is None:
            print(f"No data found for sample: {sample_path}")
            return None, None
        if self.transform:
            x_0, x_90, x_180, x_270 = self.transform(bands)
            result = ([x_0, x_90, x_180, x_270], crop_type)
            #print(f"__getitem__ output: {result}")  # Debug print
            return result
        return bands, crop_type

    def load_samples(self):
        samples = []
        for crop_type in os.listdir(self.root_dir):
            crop_path = os.path.join(self.root_dir, crop_type)
            if os.path.isdir(crop_path):
                for sample_dir in os.listdir(crop_path):
                    sample_path = os.path.join(crop_path, sample_dir)
                    if os.path.isdir(sample_path):
                        sample = (sample_path, crop_type)
                        samples.append(sample)
        return samples

    #Original target shape used was (128,128) , for hyperparameter tuning i am changing it to 64*64
    def load_bands(self, sample_path, target_shape=(128, 128)):
        contents = os.listdir(sample_path)
        subdirectories = [content for content in contents if os.path.isdir(os.path.join(sample_path, content))]
        if not subdirectories:
            return None
        for subdir in subdirectories:
            subdirectory_path = os.path.join(sample_path, subdir)
            band_files = sorted(glob.glob(os.path.join(subdirectory_path, "*.tif")))
            if len(band_files) == 0:
                print("No band files found.")
                continue
            bands = []
            for band_file in band_files:
                with rasterio.open(band_file) as src:
                    band = src.read(1)
                    band_image = Image.fromarray(band)
                    band_resized = band_image.resize(target_shape, Image.NEAREST)
                    bands.append(np.array(band_resized))
            return np.stack(bands, axis=-1)
        

#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%#

class Sentinel2ValidationDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.samples = self.load_samples()

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample_path = self.samples[idx]
        bands = self.load_bands(sample_path)
        if bands is None:
            print(f"No data found for sample: {sample_path}")
            return None
        if self.transform:
            x_0, x_90, x_180, x_270 = self.transform(bands)
            bands = [x_0, x_90, x_180, x_270]
        return bands

    def load_samples(self):
        samples = []
        for sample_dir in os.listdir(self.root_dir):
            sample_path = os.path.join(self.root_dir, sample_dir)
            if os.path.isdir(sample_path):
                samples.append(sample_path)
        return samples

    def load_bands(self, sample_path, target_shape=(128, 128)):
        band_files = sorted(glob.glob(os.path.join(sample_path, "*.tif")))
        if len(band_files) == 0:
            print("No band files found.")
            return None
        bands = []
        for band_file in band_files:
            with rasterio.open(band_file) as src:
                band = src.read(1)
                band_image = Image.fromarray(band)
                band_resized = band_image.resize(target_shape, Image.NEAREST)
                bands.append(np.array(band_resized))
        return np.stack(bands, axis=-1)


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms

# Define the autoencoder architecture
class Autoencoder(nn.Module):
    def __init__(self, num_input_channels=12):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(num_input_channels, 16, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, num_input_channels, 4, stride=2, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Initialize the autoencoder
autoencoder = Autoencoder(num_input_channels=12)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder.to(device)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.0001)

# Assuming train_dataset is already defined
train_transform = SimCLRAugmentations()
train_dataset = Sentinel2Dataset(root_dir='/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/archive/share/train', transform=train_transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, drop_last=True)
#batch size was 16 i am changing it to 8

val_transform = SimCLRAugmentations()
val_dataset = Sentinel2ValidationDataset(root_dir='/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/archive/share/val', transform=val_transform)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, drop_last=True)
#batch size was 16 i am changing it to 8


In [4]:
import torch

num_epochs = 100
for epoch in range(num_epochs):
    autoencoder.train()
    total_train_loss = 0
    for batch in train_loader:
        # Filter out invalid samples (those with None)
        batch = [item for item in batch if item is not None]
        if not batch:
            continue

        try:
            augmented_images_list, _ = batch
        except Exception as e:
            print(f"Error unpacking batch: {e}")
            continue

        # Flatten the list of lists and convert them into a tensor
        augmented_images_flat = [img for sublist in augmented_images_list for img in sublist]
        augmented_images = torch.stack(augmented_images_flat).to(device)

        optimizer.zero_grad()
        outputs = autoencoder(augmented_images)
        loss = criterion(outputs, augmented_images)
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()

    average_train_loss = total_train_loss / len(train_loader)
    #print(f'Epoch {epoch+1}, Train Loss: {average_train_loss:.4f}')

    # Evaluation on validation data
    autoencoder.eval()  # Set the model to evaluation mode
    total_val_loss = 0
    with torch.no_grad():
        for val_batch in val_loader:
            val_batch = [item for item in val_batch if item is not None]
            if not val_batch:
                continue

            try:
                val_images_list = val_batch
            except Exception as e:
                print(f"Error unpacking validation batch: {e}")
                continue

            val_images_flat = [img for sublist in val_images_list for img in sublist]
            val_images = torch.stack(val_images_flat).to(device)

            val_outputs = autoencoder(val_images)
            val_loss = criterion(val_outputs, val_images)
            total_val_loss += val_loss.item()

    average_val_loss = total_val_loss / len(val_loader)
    print(f'Epoch {epoch+1}, Train Loss: {average_train_loss:.4f},Validation Loss: {average_val_loss:.4f}')


  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


Epoch 1, Train Loss: 0.0592,Validation Loss: 0.0589
Epoch 2, Train Loss: 0.0593,Validation Loss: 0.0591
Epoch 3, Train Loss: 0.0589,Validation Loss: 0.0588
Epoch 4, Train Loss: 0.0585,Validation Loss: 0.0577
Epoch 5, Train Loss: 0.0579,Validation Loss: 0.0573
Epoch 6, Train Loss: 0.0575,Validation Loss: 0.0570
Epoch 7, Train Loss: 0.0572,Validation Loss: 0.0563
Epoch 8, Train Loss: 0.0564,Validation Loss: 0.0558
Epoch 9, Train Loss: 0.0555,Validation Loss: 0.0548
Epoch 10, Train Loss: 0.0545,Validation Loss: 0.0537
Epoch 11, Train Loss: 0.0534,Validation Loss: 0.0533
Epoch 12, Train Loss: 0.0529,Validation Loss: 0.0523
Epoch 13, Train Loss: 0.0522,Validation Loss: 0.0520
Epoch 14, Train Loss: 0.0520,Validation Loss: 0.0519
Epoch 15, Train Loss: 0.0514,Validation Loss: 0.0515
Epoch 16, Train Loss: 0.0513,Validation Loss: 0.0513
Epoch 17, Train Loss: 0.0514,Validation Loss: 0.0507
Epoch 18, Train Loss: 0.0513,Validation Loss: 0.0508
Epoch 19, Train Loss: 0.0508,Validation Loss: 0.0508
Ep

In [5]:
import torch.nn.functional as F

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder.to(device)

with torch.no_grad():
    total_mse = 0.0
    total_samples = 0  # Keep track of the number of samples processed
    for batch in train_loader:
        batch = [item for item in batch if item is not None]
        if not batch:
            continue
        
        augmented_images_list, _ = batch

        # Flatten the list of lists and convert them into a tensor
        augmented_images_flat = [img for sublist in augmented_images_list for img in sublist]
        augmented_images = torch.stack(augmented_images_flat).to(device)
        
        # Get the reconstructed images
        reconstructed_images = autoencoder(augmented_images)
        
        # Calculate the MSE for each reconstructed image
        mse = F.mse_loss(reconstructed_images, augmented_images, reduction='sum')
        total_mse += mse.item()
        
        # Update the number of samples processed
        total_samples += len(augmented_images_flat)

    # Compute mean squared error
    mean_mse = total_mse / total_samples
    print(f'Accuracy (MSE): {mean_mse:.4f}')


Accuracy (MSE): 8788.3898


In [6]:
import torch.nn.functional as F
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder.to(device)

with torch.no_grad():
    total_psnr = 0.0
    total_ssim = 0.0
    total_samples = 0
    
    for batch in train_loader:
        batch = [item for item in batch if item is not None]
        if not batch:
            continue
        
        augmented_images_list, _ = batch

        # Flatten the list of lists and convert them into a tensor
        augmented_images_flat = [img for sublist in augmented_images_list for img in sublist]
        augmented_images = torch.stack(augmented_images_flat).to(device)
        
        # Get the reconstructed images
        reconstructed_images = autoencoder(augmented_images)
        
        # Move back to CPU for metric calculation
        reconstructed_images = reconstructed_images.cpu()
        augmented_images = augmented_images.cpu()

        # Calculate PSNR and SSIM for each image
        for i in range(augmented_images.shape[0]):
            psnr_value = psnr(
                augmented_images[i].numpy(), 
                reconstructed_images[i].numpy(), 
                data_range=augmented_images[i].max().numpy() - augmented_images[i].min().numpy()
            )
            ssim_value = ssim(
                augmented_images[i].numpy().transpose((1, 2, 0)), 
                reconstructed_images[i].numpy().transpose((1, 2, 0)), 
                data_range=augmented_images[i].max().numpy() - augmented_images[i].min().numpy(), 
                multichannel=True
            )
            total_psnr += psnr_value
            total_ssim += ssim_value
        
        # Update the number of samples processed
        total_samples += augmented_images.shape[0]
    
    # Compute mean PSNR and SSIM
    mean_psnr = total_psnr / total_samples
    mean_ssim = total_ssim / total_samples
    print(f'PSNR: {mean_psnr:.4f}, SSIM: {mean_ssim:.4f}')


PSNR: 13.5532, SSIM: 0.1692


In [7]:
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

# Define the root directory
root_dir = "/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/ICPR02/kaggle/"

# Define the labels and corresponding directories
labels = ["Aphid", "Blast", "RPH", "Rust"]
label_dirs = ["Aphid", "Blast", "RPH", "Rust"]

# Define the desired shape for resizing
target_shape = (128, 128)  # Adjust according to your requirements

# Initialize empty lists to store stacked images and labels
stacked_images = []
stacked_labels = []

# Initialize a dictionary to count the number of images for each class
class_counts = {label: 0 for label in labels}

def augment_image(image_array, angles=[0, 90, 180, 270]):
    augmented_images = []
    for angle in angles:
        rotated_channels = [Image.fromarray(image_array[:,:,i]).rotate(angle) for i in range(image_array.shape[2])]
        rotated_image = np.stack([np.array(channel) for channel in rotated_channels], axis=-1)
        augmented_images.append(rotated_image)
    return augmented_images

# Iterate through each label directory
for label, label_dir in zip(labels, label_dirs):
    # Get the path to the current label directory
    label_path = os.path.join(root_dir, label_dir)
    
    # Initialize an empty list to store individual image arrays
    image_arrays = []
    
    # Iterate through each subdirectory containing images
    for subdir in os.listdir(label_path):
        subdir_path = os.path.join(label_path, subdir)
        
        # Initialize an empty list to store individual band images
        band_images = []
        
        # Iterate through each band image file
        for file in os.listdir(subdir_path):
            if file.endswith(".tif"):
                # Open the image file
                image = Image.open(os.path.join(subdir_path, file))
                
                # Resize the image to the target shape
                resized_image = image.resize(target_shape)
                
                # Convert the resized image to numpy array
                band_images.append(np.array(resized_image).astype(np.uint8))
        
        # Stack the band images along the third axis to create a single array
        stacked_image = np.stack(band_images, axis=-1)
        
        # Perform augmentation for 'Blast' and 'Rust' classes
        if label in ['Blast', 'Rust']:
            augmented_images = augment_image(stacked_image, angles=[0, 90, 180, 270])
            for img in augmented_images:
                image_arrays.append(img)
                class_counts[label] += 1  # Count each augmented image
        else:
            image_arrays.append(stacked_image)
            class_counts[label] += 1  # Count each original image
    
    # Convert the list to numpy array
    stacked_label_images = np.array(image_arrays)
    
    # Append the stacked images to the list of all stacked images
    stacked_images.append(stacked_label_images)
    
    # Create an array of labels corresponding to the stacked images
    stacked_label = np.full((stacked_label_images.shape[0],), label)
    stacked_labels.append(stacked_label)

# Concatenate all stacked images and labels to create a single dataset
dataset_images = np.concatenate(stacked_images, axis=0)
dataset_labels = np.concatenate(stacked_labels, axis=0)

# Print the shapes of the dataset
print("Dataset Images Shape:", dataset_images.shape)  # Expected Output: (1245, 128, 128, 12)
print("Dataset Labels Shape:", dataset_labels.shape)

# Print the number of images for each class
print("Number of images per class:")
for label, count in class_counts.items():
    print(f"{label}: {count}")


Dataset Images Shape: (1245, 128, 128, 12)
Dataset Labels Shape: (1245,)
Number of images per class:
Aphid: 290
Blast: 300
RPH: 495
Rust: 160


In [8]:
from sklearn.model_selection import train_test_split

train_images, val_images, train_labels, val_labels = train_test_split(dataset_images, dataset_labels, test_size=0.2, stratify=dataset_labels, random_state=42)

# Print the shapes of the training and validation sets
print("Training Images Shape:", train_images.shape)
print("Training Labels Shape:", train_labels.shape)
print("Validation Images Shape:", val_images.shape)
print("Validation Labels Shape:", val_labels.shape)


Training Images Shape: (996, 128, 128, 12)
Training Labels Shape: (996,)
Validation Images Shape: (249, 128, 128, 12)
Validation Labels Shape: (249,)


In [9]:
import numpy as np

# Count the occurrences of each class in train_labels and val_labels
train_class_counts = {class_name: np.sum(train_labels == class_name) for class_name in np.unique(train_labels)}
val_class_counts = {class_name: np.sum(val_labels == class_name) for class_name in np.unique(val_labels)}

# Print the class counts
print("Class counts in training set:")
for class_name, count in train_class_counts.items():
    print(f"{class_name}: {count}")

print("\nClass counts in validation set:")
for class_name, count in val_class_counts.items():
    print(f"{class_name}: {count}")


Class counts in training set:
Aphid: 232
Blast: 240
RPH: 396
Rust: 128

Class counts in validation set:
Aphid: 58
Blast: 60
RPH: 99
Rust: 32


In [10]:
class CustomDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# Define transformations for data augmentation (if needed)
transform = transforms.Compose([
    transforms.ToTensor(),
    # Add more transformations as needed
])

# Create custom datasets
train_dataset = CustomDataset(train_images, train_labels, transform=transform)
val_dataset = CustomDataset(val_images, val_labels, transform=transform)

# Create DataLoader for training and validation
train_loader_new = DataLoader(train_dataset, batch_size=16, shuffle=True)
eval_loader_new = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Define a dictionary to map class names to indices
class_to_index = {"Aphid": 0, "Rust": 1, "Blast": 2, "RPH": 3}

# Fine-tune the autoencoder for the classification task
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder.to(device)

# Here, you would proceed with fine-tuning the autoencoder as shown in the previous code
# fine_tuned_autoencoder = fine_tune_model(autoencoder, train_loader, val_loader, num_classes=4, num_epochs=50, device=device)

Autoencoder(
  (encoder): Sequential(
    (0): Conv2d(12, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(64, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): ConvTranspose2d(32, 16, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): ConvTranspose2d(16, 12, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): Sigmoid()
  )
)

In [11]:

total_size_train = len(train_loader_new.dataset)
print("Total size of the train_loader:", total_size_train)

total_size_val = len(eval_loader_new.dataset)
print("Total size of the val_loader: ", total_size_val)

Total size of the train_loader: 996
Total size of the val_loader:  249


In [12]:
#### import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score
import torch.nn.functional as F

class FineTunedAutoencoder(nn.Module):
    def __init__(self, autoencoder, num_classes):
        super(FineTunedAutoencoder, self).__init__()
        self.encoder = autoencoder.encoder
        num_features = self._get_num_features(autoencoder.encoder, (12, 128, 128))
        self.fc = nn.Linear(num_features, num_classes)

    def forward(self, x):
        x = self.encoder(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x
    
    def _get_num_features(self, model, input_size):
        with torch.no_grad():
            output = model(torch.zeros(1, *input_size).to(next(model.parameters()).device))
            num_features = output.view(1, -1).size(1)
        return num_features

def fine_tune_model(autoencoder, train_loader, val_loader, num_classes, num_epochs=50, device='cuda'):
    model = FineTunedAutoencoder(autoencoder, num_classes)
    model.to(device)

    criterion = nn.CrossEntropyLoss(weight=calculate_class_weights(train_loader, num_classes).to(device))  # Adjusted
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for images, labels in train_loader:
            images = images.float().to(device)
            labels = [class_to_index[label_str] for label_str in labels]
            labels = torch.tensor(labels, dtype=torch.long).to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        val_acc, val_f1 = evaluate_model(model, val_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader)}, Val Acc: {val_acc}, Val F1: {val_f1}")

    return model


def calculate_class_weights(data_loader, num_classes):
    class_counts = torch.zeros(num_classes)
    total_samples = 0

    for _, labels in data_loader:
        for label in labels:
            if isinstance(label, str):
                label = class_to_index[label]
            class_counts[label] += 1
            total_samples += 1

    class_weights = total_samples / (num_classes * class_counts)
    return class_weights


def evaluate_model(model, data_loader, device='cuda'):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in data_loader:
            images = images.to(device)
            labels = [class_to_index[label_str] for label_str in labels]
            labels = torch.tensor(labels, dtype=torch.long).to(device)
            all_labels.extend(labels.cpu().numpy())

            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())

    # Map predictions back to class names
    pred_class_names = [list(class_to_index.keys())[index] for index in all_preds]
    true_class_names = [list(class_to_index.keys())[index] for index in all_labels]

    # Print the predictions and true labels for debugging
    print("Predictions:\n", pred_class_names)
    print("True Labels:\n", true_class_names)

    # Calculate accuracy and F1 score
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    return accuracy, f1

# Fine-tune the autoencoder for the classification task
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder.to(device)

fine_tuned_autoencoder = fine_tune_model(autoencoder, train_loader_new, eval_loader_new, num_classes=4, num_epochs=50, device=device)


Predictions:
 ['RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'RPH', 'RPH', 'Rust', 'Rust', 'RPH', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'R

In [13]:
# Evaluate the fine-tuned model
accuracy, f1 = evaluate_model(fine_tuned_autoencoder, eval_loader_new, device=device)
print(f"Final Evaluation - Accuracy: {accuracy}, F1 Score: {f1}")

Predictions:
 ['Aphid', 'Aphid', 'RPH', 'Aphid', 'Blast', 'Aphid', 'Aphid', 'Blast', 'Blast', 'Blast', 'Blast', 'Blast', 'RPH', 'Aphid', 'Blast', 'RPH', 'Rust', 'Blast', 'Aphid', 'RPH', 'RPH', 'Rust', 'RPH', 'Blast', 'Blast', 'Rust', 'Rust', 'RPH', 'Rust', 'Aphid', 'RPH', 'Rust', 'RPH', 'Aphid', 'RPH', 'RPH', 'Aphid', 'Rust', 'Rust', 'Aphid', 'Aphid', 'Blast', 'Aphid', 'RPH', 'Blast', 'Aphid', 'RPH', 'RPH', 'RPH', 'Blast', 'Rust', 'Rust', 'RPH', 'RPH', 'Blast', 'Aphid', 'Aphid', 'RPH', 'RPH', 'Aphid', 'Blast', 'Rust', 'Aphid', 'RPH', 'Aphid', 'RPH', 'Blast', 'Blast', 'Aphid', 'Aphid', 'Blast', 'RPH', 'RPH', 'Blast', 'Blast', 'Blast', 'Rust', 'RPH', 'RPH', 'Blast', 'Aphid', 'Blast', 'Rust', 'RPH', 'RPH', 'RPH', 'Blast', 'Rust', 'RPH', 'Aphid', 'RPH', 'Aphid', 'Rust', 'RPH', 'RPH', 'Aphid', 'Aphid', 'Aphid', 'Rust', 'RPH', 'Aphid', 'RPH', 'Blast', 'Aphid', 'Aphid', 'Blast', 'Aphid', 'Aphid', 'RPH', 'RPH', 'Rust', 'Aphid', 'RPH', 'Aphid', 'Aphid', 'Aphid', 'Aphid', 'RPH', 'Aphid', 'RPH', 

In [14]:
train_transform = SimCLRAugmentations()
pretext_task_dataset = Sentinel2Dataset(root_dir='/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/archive/share/train', transform=train_transform)
pretext_task_loader  = DataLoader(pretext_task_dataset, batch_size=16, shuffle=False, drop_last=False)
#batch size was 16 i am changing it to 8

val_transform = SimCLRAugmentations()
pretext_val_dataset = Sentinel2ValidationDataset(root_dir='/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/archive/share/val', transform=val_transform)
pretext_val_loader = DataLoader(pretext_val_dataset, batch_size=16, shuffle=False, drop_last=False)

In [15]:
def predict_pretext_labels(model, data_loader, device='cuda'):
    model.eval()
    all_preds = []

    with torch.no_grad():
        for batch in data_loader:
            batch = [item for item in batch if item is not None]  # Filter out None values
            if not batch:
                continue

            try:
                augmented_images_list, _ = batch
            except Exception as e:
                print(f"Error unpacking batch: {e}")
                continue

            augmented_images_flat = [img for sublist in augmented_images_list for img in sublist]
            images = torch.stack(augmented_images_flat).to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())

    pred_class_names = [list(class_to_index.keys())[index] for index in all_preds]
    return pred_class_names

# Predict labels for the pretext task data
predicted_labels = predict_pretext_labels(fine_tuned_autoencoder, pretext_task_loader, device=device)

# Print predicted labels
print("Predicted Labels for Pretext Task Data:", predicted_labels)

Predicted Labels for Pretext Task Data: ['RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'Blast', 'RPH', 'Aphid', 'Blast', 'RPH', 'RPH', 'Aphid', 'Aphid', 'Aphid', 'RPH', 'RPH', 'RPH', 'Aphid', 'RPH', 'RPH', 'Rust', 'Aphid', 'Aphid', 'Aphid', 'Rust', 'RPH', 'Aphid', 'Rust', 'RPH', 'RPH', 'RPH', 'RPH', 'Aphid', 'Blast', 'RPH', 'RPH', 'Blast', 'Rust', 'RPH', 'Aphid', 'Rust', 'RPH', 'RPH', 'RPH', 'Aphid', 'Aphid', 'Blast', 'Aphid', 'Aphid', 'RPH', 'RPH', 'RPH', 'RPH', 'Blast', 'Blast', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'Blast', 'RPH', 'RPH', 'RPH', 'Aphid', 'Blast', 'RPH', 'RPH', 'Aphid', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'Aphid', 'RPH', 'Rust', 'Rust', 'RPH', 'Blast', 'Rust', 'RPH', 'Rust', 'Rust', 'Aphid', 'RPH', 'RPH', 'Blast', 'RPH', 'Aphid', 'RPH', 'Rust', 'RPH', 'RPH', 'Rust', 'RPH', 'Blast', 'Blast', 'Aphid', 'RPH', 'Aphid', 'RPH', 'RPH', 'RPH', 'Aphid', 'Aphid', 'Blast', 'Blast', 'RPH', 'Blast', 'RPH', 'Rust', 'Blast', 'RPH', 'Aphid', 'Blast', 'RPH', 'Rust'

In [16]:
print("Predicted Labels for Pretext Task Data:",len(predicted_labels))

Predicted Labels for Pretext Task Data: 2928


In [17]:
import os
import glob
import numpy as np
import rasterio
from PIL import Image
from torch.utils.data import Dataset
import torch
from torchvision import transforms

class Sentinel2Dataset(Dataset):
    def __init__(self, root_dir, model, device='cuda', transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.model = model
        self.device = device
        self.samples = self.load_samples()

    def __len__(self):
        return len(self.samples) * 4  # Adjust for 4 augmentations per sample

    def __getitem__(self, idx):
        original_idx = idx // 4
        transform_idx = idx % 4

        sample_path, crop_type = self.samples[original_idx]
        bands = self.load_bands(sample_path)

        if bands is None:
            print(f"No data found for sample: {sample_path}")
            return None, None

        if self.transform:
            x_0, x_90, x_180, x_270 = self.transform(bands)
            augmented_images = (x_0, x_90, x_180, x_270)
            augmented_image = augmented_images[transform_idx]

            image_tensor = torch.unsqueeze(torch.tensor(augmented_image, dtype=torch.float32), 0).to(self.device)
            with torch.no_grad():
                outputs = self.model(image_tensor)
                _, preds = torch.max(outputs, 1)
            predicted_label = list(class_to_index.keys())[preds.item()]

            return augmented_image, predicted_label
        
        return bands, crop_type

    def load_samples(self):
        samples = []
        for crop_type in os.listdir(self.root_dir):
            crop_path = os.path.join(self.root_dir, crop_type)
            if os.path.isdir(crop_path):
                for sample_dir in os.listdir(crop_path):
                    sample_path = os.path.join(crop_path, sample_dir)
                    if os.path.isdir(sample_path):
                        sample = (sample_path, crop_type)
                        samples.append(sample)
        return samples

    def load_bands(self, sample_path, target_shape=(128, 128)):
        contents = os.listdir(sample_path)
        subdirectories = [content for content in contents if os.path.isdir(os.path.join(sample_path, content))]
        if not subdirectories:
            return None
        for subdir in subdirectories:
            subdirectory_path = os.path.join(sample_path, subdir)
            band_files = sorted(glob.glob(os.path.join(subdirectory_path, "*.tif")))
            if len(band_files) == 0:
                print("No band files found.")
                continue
            bands = []
            for band_file in band_files:
                with rasterio.open(band_file) as src:
                    band = src.read(1)
                    band_image = Image.fromarray(band)
                    band_resized = band_image.resize(target_shape, Image.NEAREST)
                    bands.append(np.array(band_resized))
            return np.stack(bands, axis=-1)


In [18]:
# import os
# import glob
# import numpy as np
# import rasterio
# from PIL import Image
# from torch.utils.data import Dataset
# import torch
# from torchvision import transforms

# class Sentinel2Dataset(Dataset):
#     def __init__(self, root_dir, model, device='cuda', transform=None):
#         self.root_dir = root_dir
#         self.transform = transform
#         self.model = model
#         self.device = device
#         self.samples = self.load_samples()

#     def __len__(self):
#         return len(self.samples)

#     def __getitem__(self, idx):
#         sample_path, crop_type = self.samples[idx]
#         bands = self.load_bands(sample_path)

#         if bands is None:
#             print(f"No data found for sample: {sample_path}")
#             return None, None

#         if self.transform:
#             augmented_images_flat = []
#             predicted_labels = []
            
#             transform_fn = self.transform
#             x_0, x_90, x_180, x_270 = transform_fn(bands)
#             augmented_images = (x_0, x_90, x_180, x_270)
#             augmented_images_flat.append(augmented_images)

#             # Convert each augmented image to tensor and perform inference
#             for image in augmented_images:
#                 image_tensor = torch.unsqueeze(torch.tensor(image, dtype=torch.float32), 0).to(self.device)
#                 with torch.no_grad():
#                     outputs = self.model(image_tensor)
#                     _, preds = torch.max(outputs, 1)
#                 predicted_label = list(class_to_index.keys())[preds.item()]
#                 predicted_labels.append(predicted_label)
             
#             return augmented_images_flat, predicted_labels
        
#         return bands, crop_type


#     def load_samples(self):
#         samples = []
#         for crop_type in os.listdir(self.root_dir):
#             crop_path = os.path.join(self.root_dir, crop_type)
#             if os.path.isdir(crop_path):
#                 for sample_dir in os.listdir(crop_path):
#                     sample_path = os.path.join(crop_path, sample_dir)
#                     if os.path.isdir(sample_path):
#                         sample = (sample_path, crop_type)
#                         samples.append(sample)
#         return samples

#     def load_bands(self, sample_path, target_shape=(64, 64)):
#         contents = os.listdir(sample_path)
#         subdirectories = [content for content in contents if os.path.isdir(os.path.join(sample_path, content))]
#         if not subdirectories:
#             return None
#         for subdir in subdirectories:
#             subdirectory_path = os.path.join(sample_path, subdir)
#             band_files = sorted(glob.glob(os.path.join(subdirectory_path, "*.tif")))
#             if len(band_files) == 0:
#                 print("No band files found.")
#                 continue
#             bands = []
#             for band_file in band_files:
#                 with rasterio.open(band_file) as src:
#                     band = src.read(1)
#                     band_image = Image.fromarray(band)
#                     band_resized = band_image.resize(target_shape, Image.NEAREST)
#                     bands.append(np.array(band_resized))
#             return np.stack(bands, axis=-1)


pretext_task_root_dir = '/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/archive/share/train'

# Create the DataLoader for pretext task data
train_transform = SimCLRAugmentations()
pretext_task_dataset = Sentinel2Dataset(root_dir=pretext_task_root_dir, model=fine_tuned_autoencoder, device=device, transform=train_transform)
pretext_task_loader = DataLoader(pretext_task_dataset, batch_size=16, shuffle=False)

# Predict labels for the pretext task data
predicted_labels = []

print(len(pretext_task_loader))


# Print predicted labels
# print("Predicted Labels for Pretext Task Data:")
# for i, label in enumerate(predicted_labels):
#     print(f"Sample {i+1}: {label}")


183


In [19]:
all_images = []
all_labels = []

for batch in pretext_task_loader:
    augmented_images_batch, predicted_labels_batch = batch
    for augmented_images, predicted_label in zip(augmented_images_batch, predicted_labels_batch):
        if augmented_images is not None:
            all_images.append(augmented_images)
            all_labels.append(predicted_label)


  image_tensor = torch.unsqueeze(torch.tensor(augmented_image, dtype=torch.float32), 0).to(self.device)


In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

class AutoencoderWithClassifier(nn.Module):
    def __init__(self, num_input_channels=12, num_classes=4):
        super(AutoencoderWithClassifier, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(num_input_channels, 16, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1),
            nn.ReLU(),
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128 * 8 * 16, 256),  # Assuming the final feature map is 8x8
            nn.ReLU(),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.classifier(x)
        return x

# Define a custom dataset for the pretext autoencoder training
class PretextAutoencoderDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        # Ensure image is a float tensor and label is a long tensor
        image = torch.tensor(image, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.long)

        return image, label

# Example labels (modify this according to your actual label set)
label_classes = ["Aphid", "Blast", "RPH", "Rust"]  # Your label set

# Create a mapping from string labels to integers
label_mapping = {label: idx for idx, label in enumerate(label_classes)}
print("Label mapping:", label_mapping)

# Convert string labels to integers using the mapping
all_labels_int = np.array([label_mapping[label] for label in all_labels], dtype=np.int64)

# Ensure all_images is a numpy array of type float32
all_images = np.array(all_images, dtype=np.float32)

# Define transformation (if any)
transform = None  # You can add transforms like ToTensor(), Normalize(), etc., as needed

# Create dataset and dataloader
pretext_dataset = PretextAutoencoderDataset(all_images, all_labels_int, transform=transform)
batch_size = 16  # Adjust as per your memory and GPU capacity
pretext_loader = DataLoader(pretext_dataset, batch_size=batch_size, shuffle=True)

# Initialize the autoencoder with classifier
num_classes = len(label_mapping)  # Number of unique labels
autoencoder_pretext = AutoencoderWithClassifier(num_input_channels=12, num_classes=num_classes)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(autoencoder_pretext.parameters(), lr=1e-3)

# Training loop
num_epochs = 25  # Adjust as needed
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder_pretext.to(device)

for epoch in range(num_epochs):
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for batch_images, batch_labels in pretext_loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass
        outputs = autoencoder_pretext(batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == batch_labels).sum().item()
        total_samples += batch_labels.size(0)

    accuracy = correct_predictions / total_samples

    # Print epoch loss and accuracy
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss / len(pretext_loader):.4f}, Accuracy: {accuracy:.4f}")

# Optionally, save the trained autoencoder model
torch.save(autoencoder_pretext.state_dict(), 'pretext_autoencoder_with_classifier.pth')
print("Pretext autoencoder model with classifier saved.")


Label mapping: {'Aphid': 0, 'Blast': 1, 'RPH': 2, 'Rust': 3}
Epoch [1/25], Loss: 1.2531, Accuracy: 0.4908
Epoch [2/25], Loss: 1.2447, Accuracy: 0.4935
Epoch [3/25], Loss: 1.2254, Accuracy: 0.4935
Epoch [4/25], Loss: 1.2009, Accuracy: 0.4935
Epoch [5/25], Loss: 1.0626, Accuracy: 0.5464
Epoch [6/25], Loss: 0.7721, Accuracy: 0.6998
Epoch [7/25], Loss: 0.4937, Accuracy: 0.8064
Epoch [8/25], Loss: 0.2175, Accuracy: 0.9191
Epoch [9/25], Loss: 0.0837, Accuracy: 0.9740
Epoch [10/25], Loss: 0.0179, Accuracy: 0.9969
Epoch [11/25], Loss: 0.0046, Accuracy: 0.9997
Epoch [12/25], Loss: 0.0013, Accuracy: 1.0000
Epoch [13/25], Loss: 0.0005, Accuracy: 1.0000
Epoch [14/25], Loss: 0.0003, Accuracy: 1.0000
Epoch [15/25], Loss: 0.0002, Accuracy: 1.0000
Epoch [16/25], Loss: 0.0002, Accuracy: 1.0000
Epoch [17/25], Loss: 0.0001, Accuracy: 1.0000
Epoch [18/25], Loss: 0.0001, Accuracy: 1.0000
Epoch [19/25], Loss: 0.0001, Accuracy: 1.0000
Epoch [20/25], Loss: 0.0001, Accuracy: 1.0000
Epoch [21/25], Loss: 0.0001,

In [21]:
unique_labels = list(set(all_labels))

# Print the unique labels
print("Unique labels:", unique_labels)

Unique labels: ['Aphid', 'RPH', 'Blast', 'Rust']


**FineTuning on Labeled Pretext Task**

In [22]:
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

# Define the root directory
root_dir = "/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/ICPR02/kaggle/"

# Define the labels and corresponding directories
labels = ["Aphid", "Blast", "RPH", "Rust"]
label_dirs = ["Aphid", "Blast", "RPH", "Rust"]

# Define the desired shape for resizing
target_shape = (128, 128)  # Adjust according to your requirements

# Initialize empty lists to store stacked images and labels
stacked_images = []
stacked_labels = []

# Initialize a dictionary to count the number of images for each class
class_counts = {label: 0 for label in labels}

def augment_image(image_array, angles=[0, 90, 180, 270]):
    augmented_images = []
    for angle in angles:
        rotated_channels = [Image.fromarray(image_array[:,:,i]).rotate(angle) for i in range(image_array.shape[2])]
        rotated_image = np.stack([np.array(channel) for channel in rotated_channels], axis=-1)
        augmented_images.append(rotated_image)
    return augmented_images

# Iterate through each label directory
for label, label_dir in zip(labels, label_dirs):
    # Get the path to the current label directory
    label_path = os.path.join(root_dir, label_dir)
    
    # Initialize an empty list to store individual image arrays
    image_arrays = []
    
    # Iterate through each subdirectory containing images
    for subdir in os.listdir(label_path):
        subdir_path = os.path.join(label_path, subdir)
        
        # Initialize an empty list to store individual band images
        band_images = []
        
        # Iterate through each band image file
        for file in os.listdir(subdir_path):
            if file.endswith(".tif"):
                # Open the image file
                image = Image.open(os.path.join(subdir_path, file))
                
                # Resize the image to the target shape
                resized_image = image.resize(target_shape)
                
                # Convert the resized image to numpy array
                band_images.append(np.array(resized_image).astype(np.uint8))
        
        # Stack the band images along the third axis to create a single array
        stacked_image = np.stack(band_images, axis=-1)
        
        # Perform augmentation for 'Blast' and 'Rust' classes
        if label in ['Blast', 'Rust']:
            augmented_images = augment_image(stacked_image, angles=[0, 90, 180, 270])
            for img in augmented_images:
                image_arrays.append(img)
                class_counts[label] += 1  # Count each augmented image
        else:
            image_arrays.append(stacked_image)
            class_counts[label] += 1  # Count each original image
    
    # Convert the list to numpy array
    stacked_label_images = np.array(image_arrays)
    
    # Append the stacked images to the list of all stacked images
    stacked_images.append(stacked_label_images)
    
    # Create an array of labels corresponding to the stacked images
    stacked_label = np.full((stacked_label_images.shape[0],), label)
    stacked_labels.append(stacked_label)

# Concatenate all stacked images and labels to create a single dataset
dataset_images = np.concatenate(stacked_images, axis=0)
dataset_labels = np.concatenate(stacked_labels, axis=0)

# Print the shapes of the dataset
print("Dataset Images Shape:", dataset_images.shape)  # Expected Output: (1245, 128, 128, 12)
print("Dataset Labels Shape:", dataset_labels.shape)

# Print the number of images for each class
print("Number of images per class:")
for label, count in class_counts.items():
    print(f"{label}: {count}")


Dataset Images Shape: (1245, 128, 128, 12)
Dataset Labels Shape: (1245,)
Number of images per class:
Aphid: 290
Blast: 300
RPH: 495
Rust: 160


In [23]:
from sklearn.model_selection import train_test_split

train_images, val_images, train_labels, val_labels = train_test_split(dataset_images, dataset_labels, test_size=0.2, stratify=dataset_labels, random_state=42)

# Print the shapes of the training and validation sets
print("Training Images Shape:", train_images.shape)
print("Training Labels Shape:", train_labels.shape)
print("Validation Images Shape:", val_images.shape)
print("Validation Labels Shape:", val_labels.shape)


Training Images Shape: (996, 128, 128, 12)
Training Labels Shape: (996,)
Validation Images Shape: (249, 128, 128, 12)
Validation Labels Shape: (249,)


In [24]:
import numpy as np

# Count the occurrences of each class in train_labels and val_labels
train_class_counts = {class_name: np.sum(train_labels == class_name) for class_name in np.unique(train_labels)}
val_class_counts = {class_name: np.sum(val_labels == class_name) for class_name in np.unique(val_labels)}

# Print the class counts
print("Class counts in training set:")
for class_name, count in train_class_counts.items():
    print(f"{class_name}: {count}")

print("\nClass counts in validation set:")
for class_name, count in val_class_counts.items():
    print(f"{class_name}: {count}")


Class counts in training set:
Aphid: 232
Blast: 240
RPH: 396
Rust: 128

Class counts in validation set:
Aphid: 58
Blast: 60
RPH: 99
Rust: 32


In [25]:
class CustomDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# Define transformations for data augmentation (if needed)
transform = transforms.Compose([
    transforms.ToTensor(),
    # Add more transformations as needed
])

# Create custom datasets
train_dataset = CustomDataset(train_images, train_labels, transform=transform)
val_dataset = CustomDataset(val_images, val_labels, transform=transform)

# Create DataLoader for training and validation
train_loader_new = DataLoader(train_dataset, batch_size=16, shuffle=True)
eval_loader_new = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Define a dictionary to map class names to indices
class_to_index = {"Aphid": 0, "Rust": 1, "Blast": 2, "RPH": 3}

# Fine-tune the autoencoder for the classification task
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder_pretext.to(device)

# Here, you would proceed with fine-tuning the autoencoder as shown in the previous code
# fine_tuned_autoencoder = fine_tune_model(autoencoder, train_loader, val_loader, num_classes=4, num_epochs=50, device=device)

AutoencoderWithClassifier(
  (encoder): Sequential(
    (0): Conv2d(12, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=16384, out_features=256, bias=True)
    (2): ReLU()
    (3): Linear(in_features=256, out_features=4, bias=True)
  )
)

In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score
import torch.nn.functional as F

class FineTunedAutoencoder(nn.Module):
    def __init__(self, autoencoder, num_classes):
        super(FineTunedAutoencoder, self).__init__()
        self.encoder = autoencoder.encoder
        num_features = self._get_num_features(autoencoder.encoder, (12, 128, 128))
        self.fc = nn.Linear(num_features, num_classes)

    def forward(self, x):
        x = self.encoder(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x
    
    def _get_num_features(self, model, input_size):
        with torch.no_grad():
            output = model(torch.zeros(1, *input_size).to(next(model.parameters()).device))
            num_features = output.view(1, -1).size(1)
        return num_features

def fine_tune_model(autoencoder, train_loader, val_loader, num_classes, num_epochs=50, device='cuda'):
    model = FineTunedAutoencoder(autoencoder, num_classes)
    model.to(device)

    criterion = nn.CrossEntropyLoss(weight=calculate_class_weights(train_loader, num_classes).to(device))  # Adjusted
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for images, labels in train_loader:
            images = images.float().to(device)
            labels = [class_to_index[label_str] for label_str in labels]
            labels = torch.tensor(labels, dtype=torch.long).to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        val_acc, val_f1 = evaluate_model(model, val_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader)}, Val Acc: {val_acc}, Val F1: {val_f1}")

    return model


def calculate_class_weights(data_loader, num_classes):
    class_counts = torch.zeros(num_classes)
    total_samples = 0

    for _, labels in data_loader:
        for label in labels:
            if isinstance(label, str):
                label = class_to_index[label]
            class_counts[label] += 1
            total_samples += 1

    class_weights = total_samples / (num_classes * class_counts)
    return class_weights


def evaluate_model(model, data_loader, device='cuda'):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in data_loader:
            images = images.to(device)
            labels = [class_to_index[label_str] for label_str in labels]
            labels = torch.tensor(labels, dtype=torch.long).to(device)
            all_labels.extend(labels.cpu().numpy())

            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())

    # Map predictions back to class names
    pred_class_names = [list(class_to_index.keys())[index] for index in all_preds]
    true_class_names = [list(class_to_index.keys())[index] for index in all_labels]

    # Print the predictions and true labels for debugging
    print("Predictions:\n", pred_class_names)
    print("True Labels:\n", true_class_names)

    # Calculate accuracy and F1 score
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    return accuracy, f1

# Fine-tune the autoencoder for the classification task
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder_pretext.to(device)

fine_tuned_autoencoder_pretext = fine_tune_model(autoencoder_pretext, train_loader_new, eval_loader_new, num_classes=4, num_epochs=50, device=device)


Predictions:
 ['Blast', 'Blast', 'Aphid', 'Aphid', 'Rust', 'RPH', 'Rust', 'RPH', 'Rust', 'Aphid', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Aphid', 'Aphid', 'RPH', 'Blast', 'Rust', 'Rust', 'RPH', 'RPH', 'RPH', 'RPH', 'Rust', 'Rust', 'Aphid', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Rust', 'Aphid', 'Rust', 'Aphid', 'Rust', 'RPH', 'RPH', 'Aphid', 'RPH', 'Rust', 'Rust', 'RPH', 'Rust', 'RPH', 'Rust', 'Rust', 'Aphid', 'RPH', 'Blast', 'Rust', 'Rust', 'RPH', 'Aphid', 'Rust', 'RPH', 'Aphid', 'RPH', 'Rust', 'RPH', 'Rust', 'RPH', 'Rust', 'Rust', 'Aphid', 'Aphid', 'RPH', 'RPH', 'Rust', 'RPH', 'RPH', 'Rust', 'RPH', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Blast', 'Rust', 'RPH', 'Rust', 'Rust', 'RPH', 'Rust', 'RPH', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Aphid', 'Rust', 'RPH', 'Rust', 'Blast', 'Blast', 'Rust', 'Rust', 'Rust', 'Rust', 'RPH', 'Rust', 'Rust', 'Rust', 'Aphid', 'RPH', 'Rust', 'Aphid

In [27]:
# Evaluate the fine-tuned model
accuracy, f1 = evaluate_model(fine_tuned_autoencoder_pretext, eval_loader_new, device=device)
print(f"Final Evaluation - Accuracy: {accuracy}, F1 Score: {f1}")

Predictions:
 ['Blast', 'Blast', 'Aphid', 'Aphid', 'Blast', 'Aphid', 'Aphid', 'Rust', 'Rust', 'Blast', 'Blast', 'RPH', 'RPH', 'Aphid', 'Blast', 'RPH', 'Rust', 'Blast', 'Rust', 'RPH', 'RPH', 'RPH', 'RPH', 'Blast', 'Blast', 'Aphid', 'Aphid', 'RPH', 'Rust', 'RPH', 'RPH', 'Rust', 'Blast', 'Aphid', 'RPH', 'RPH', 'RPH', 'Blast', 'Rust', 'Aphid', 'Aphid', 'Blast', 'Aphid', 'RPH', 'Blast', 'Aphid', 'Aphid', 'RPH', 'RPH', 'Blast', 'RPH', 'Rust', 'RPH', 'RPH', 'Blast', 'Blast', 'RPH', 'Blast', 'RPH', 'RPH', 'Blast', 'Rust', 'Aphid', 'RPH', 'Aphid', 'RPH', 'Aphid', 'Blast', 'Blast', 'Aphid', 'Blast', 'RPH', 'RPH', 'Blast', 'Rust', 'Blast', 'Rust', 'RPH', 'RPH', 'RPH', 'RPH', 'RPH', 'Blast', 'RPH', 'RPH', 'RPH', 'RPH', 'Blast', 'RPH', 'RPH', 'RPH', 'Blast', 'Aphid', 'RPH', 'RPH', 'Aphid', 'Aphid', 'Aphid', 'Rust', 'RPH', 'Aphid', 'RPH', 'Blast', 'Rust', 'Rust', 'Blast', 'RPH', 'Aphid', 'RPH', 'RPH', 'Rust', 'Blast', 'RPH', 'RPH', 'Blast', 'Aphid', 'RPH', 'RPH', 'RPH', 'Rust', 'Blast', 'Aphid', 'RP

**Evaluation Dataset**

In [28]:
import os
import numpy as np
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import pandas as pd

# Define a custom dataset class for the evaluation dataset
class EvaluationDataset(Dataset):
    def __init__(self, images, transform=None):
        self.images = images
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        if self.transform:
            image = self.transform(image)
        return image

# Define a function to stack and resize evaluation images
def stack_and_resize_evaluation_images(root_dir, target_shape):
    evaluation_dir = os.path.join(root_dir, "evaluation")
    evaluation_images = []
    image_ids = []

    for subdir in os.listdir(evaluation_dir):
        subdir_path = os.path.join(evaluation_dir, subdir)
        band_images = []

        for file in os.listdir(subdir_path):
            if file.endswith(".tif"):
                image = Image.open(os.path.join(subdir_path, file))
                resized_image = image.resize(target_shape)
                band_images.append(np.array(resized_image).astype(np.uint8))

        stacked_image = np.stack(band_images, axis=-1)
        evaluation_images.append(stacked_image)
        image_ids.append(subdir)

    return np.array(evaluation_images), image_ids

# Stack and resize evaluation images
root_dir = '/kaggle/input/beyond-visible-spectrum-ai-for-agriculture-2024p2/ICPR02/kaggle'
target_shape = (128, 128)
eval_images_stacked, eval_image_ids = stack_and_resize_evaluation_images(root_dir, target_shape)
print("Stacked and Resized Evaluation Images Shape:", eval_images_stacked.shape)

# Define transformations for the evaluation dataset
eval_transform = transforms.Compose([
    transforms.ToTensor(),
    # Add more transformations as needed
])

# Define a function to get predictions for the evaluation dataset
def get_predictions(model, data_loader, device='cuda'):
    model.eval()
    all_preds = []

    with torch.no_grad():
        for images in data_loader:
            images = images.to(device)
            outputs = model(images)
            if isinstance(outputs, tuple):
                outputs = outputs[0]  # Only use the main output
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())

    return all_preds

# Create an evaluation dataset
eval_dataset = EvaluationDataset(eval_images_stacked, transform=eval_transform)

# Create a DataLoader for the evaluation dataset
eval_loader = DataLoader(eval_dataset, batch_size=16, shuffle=False)

# Assuming class_to_index is a dictionary mapping class names to indices
class_to_index = {'Aphid': 0, 'Blast': 1, 'RPH': 2, 'Rust': 3}

# Assuming you have a trained fine-tuned autoencoder model loaded as `fine_tuned_autoencoder`
fine_tuned_autoencoder_pretext = fine_tuned_autoencoder_pretext.to(device)

# Evaluate the fine-tuned model on the evaluation dataset
eval_predictions = get_predictions(fine_tuned_autoencoder_pretext, eval_loader, device=device)

# Map predictions back to class names
pred_class_names = []
for pred in eval_predictions:
    # Clip predicted indices to ensure they fall within the valid range
    pred = max(0, min(pred, len(class_to_index) - 1))
    pred_class_names.append(list(class_to_index.keys())[pred])

# Create a DataFrame for submission
submission_df = pd.DataFrame({
    'Id': eval_image_ids,
    'Category': pred_class_names
})

# Print the submission DataFrame
print(submission_df)

# Save the DataFrame to a CSV file
submission_file_path = '/kaggle/working/submission.csv'  # Change this path if needed
submission_df.to_csv(submission_file_path, index=False)


Stacked and Resized Evaluation Images Shape: (40, 128, 128, 12)
                                  Id Category
0   994b5409c8e946538d87109a99897659    Blast
1   1a419acc1ecc467897d5477a47353fa8      RPH
2   8662df21b2c94788adce4a885ae2b4dc    Blast
3   a564868c3d8c4d4fabde67a536f178ad     Rust
4   796e611aaf8a4f0db57cb79be058f3ae    Blast
5   e77d3a0965fe46d9b3275a7d7f34dbe2      RPH
6   a39dcd0a21824289bb38b40ddf98da89     Rust
7   e427f07618794fd58dfc9e6c786e3743    Aphid
8   13739e32e7a84f669e6ef1284715e93b     Rust
9   b6eeb2bfd281476883fc273b61133e60     Rust
10  c283cbe9d0ae46aaa9adf354b714f68f    Aphid
11  2fb5f497ae1b4b1eb7e8d7ced143aa46      RPH
12  8a9d2c25f8f44309ac7d2318ba5f2d1d    Aphid
13  fe481b0935fd4043b964c287a76321c2    Aphid
14  5bf370118f3043f1bbeafbb91bd78f32      RPH
15  e66d89ba472d42cf91508a2182553b60    Aphid
16  e3a69009935d4e93b93a72554fb8a51e    Aphid
17  4da1b698cdad4c8db6c1716a51a56bd4     Rust
18  05835a9764364429b5ac3e11b052649d     Rust
19  7f4ecf086b6b