Direct mean update : means were updated directly in an online manner without the use of bayesian statistics


In [None]:
import torch
import torch.nn as nn
import numpy as np

# Classifier using direct class mean update
class MeanUpdateClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(MeanUpdateClassifier, self).__init__()
        self.num_classes = num_classes
        self.input_dim = input_dim
        self.class_means = nn.Parameter(torch.zeros(num_classes, input_dim), requires_grad=False)

    def update_means(self, features, labels):
        # Compute mean feature vector for each class
        for c in range(self.num_classes):
            class_features = features[labels == c]
            if len(class_features) > 0:
                # Update class mean by averaging the features of this class
                self.class_means.data[c] = class_features.mean(dim=0)

    def forward(self, features):
        # Normalize the features and class means (optional step)
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)
        normalized_class_means = self.class_means / (torch.norm(self.class_means, dim=1, keepdim=True) + 1e-6)

        # Compute cosine similarity between input features and class means
        cosine_similarity = torch.mm(normalized_features, normalized_class_means.T)
        return cosine_similarity

# Main Functionality
def train_and_evaluate():
    train_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/train_data'
    eval_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/eval_data'

    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # Resize for ViT input
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])  # Normalize for ViT
    ])

    batch_size = 64
    confidence_threshold = 0.8
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize Feature Extractor
    feature_extractor = ViTFeatureExtractorModel(pretrained=True).to(device)

    accuracy_matrix = np.zeros((10, 10))

    for i in range(1, 11):
        print(f"\n=== Processing Dataset D{i} ===")

        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:  # D1 is labeled
                images, labels = batch
                labels_list.append(labels.to(device))
            else:  # D2 onward
                images = batch

            images = images.to(device)
            features = feature_extractor(images)  # Extract features
            features_list.append(features)

        features = torch.cat(features_list).to(device)  # Combine all features
        if labels_list:
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))
        else:
            labels = None
            num_classes = 10  # Default for unlabeled datasets

        # Initialize Classifier for D1
        if i == 1:
            classifier = MeanUpdateClassifier(input_dim=feature_extractor.feature_dim, num_classes=num_classes).to(device)

        if labels is not None:  # D1: Update class means using true labels
            classifier.update_means(features, labels)
        else:  # D2 onward: Pseudo-labeling and class mean update
            with torch.no_grad():
                logits = classifier(features)  # Predict pseudo-labels
                predictions = logits.argmax(dim=1)
                confidences = logits.max(dim=1).values  # Confidence scores

            # Filter samples based on confidence threshold
            high_confidence_indices = confidences > confidence_threshold
            high_confidence_features = features[high_confidence_indices]
            high_confidence_labels = predictions[high_confidence_indices]

            # Update class means using high-confidence pseudo-labeled samples
            if len(high_confidence_features) > 0:
                classifier.update_means(high_confidence_features, high_confidence_labels)

        print(f"\nEvaluating model f{i} on held-out datasets D̂1 to D̂{i}")
        for j in range(1, i + 1):
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, f'{j}_eval_data.tar.pth'),
                transform=transform,
                labeled=True
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            correct, total = 0, 0
            with torch.no_grad():
                for images, labels in eval_loader:
                    images, labels = images.to(device), labels.to(device)
                    features = feature_extractor(images)
                    logits = classifier(features)
                    predictions = logits.argmax(dim=1)
                    correct += (predictions == labels).sum().item()
                    total += labels.size(0)

            accuracy = 100 * correct / total
            accuracy_matrix[i - 1, j - 1] = accuracy
            print(f"Accuracy of model f{i} on D̂{j}: {accuracy:.2f}%")

    print("\nAccuracy Matrix:")
    for row in accuracy_matrix:
        print(" ".join(f"{val:6.2f}" for val in row))

# Run
if __name__ == "__main__":
    train_and_evaluate()


bayesian approach: we use a previous class means to determine labels for the current dataset and then as per the confidence levels of the label predicted, we decide whether to udpate means using it or not

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import os
import numpy as np

# Dataset Class
class CustomCIFARDataset(Dataset):
    def __init__(self, file_path, transform=None, labeled=True):
        data_dict = torch.load(file_path)
        self.images = data_dict['data']
        self.labels = data_dict['targets'] if labeled else None
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        image = Image.fromarray(image)  # Convert to PIL Image
        if self.transform:
            image = self.transform(image)

        if self.labels is not None:
            return image, self.labels[idx]
        else:
            return image

# Feature Extractor
class FeatureExtractor(nn.Module):
    def __init__(self, pretrained=True):
        super(FeatureExtractor, self).__init__()
        base_model = models.resnet18(pretrained=pretrained)
        self.features = nn.Sequential(*list(base_model.children())[:-1])  # Remove the FC layer

    def forward(self, x):
        x = self.features(x)
        return x.view(x.size(0), -1)  # Flatten the feature map

# Bayesian Classifier
class BayesianClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(BayesianClassifier, self).__init__()
        self.num_classes = num_classes
        self.input_dim = input_dim
        self.prior_mean = torch.zeros(num_classes, input_dim)  # Zero mean
        self.prior_cov = torch.eye(input_dim).repeat(num_classes, 1, 1)  # Identity covariance

    def update_posteriors(self, features, labels):
        for c in range(self.num_classes):
            class_features = features[labels == c]
            if len(class_features) > 0:
                n_c = len(class_features)
                sample_mean = class_features.mean(dim=0)

                prior_cov_inv = torch.linalg.inv(self.prior_cov[c])
                likelihood_cov_inv = torch.eye(self.input_dim, device=features.device) * n_c

                posterior_cov = torch.linalg.inv(prior_cov_inv + likelihood_cov_inv)
                posterior_mean = posterior_cov @ (
                    prior_cov_inv @ self.prior_mean[c] + likelihood_cov_inv @ sample_mean
                )

                self.prior_mean[c] = posterior_mean
                self.prior_cov[c] = posterior_cov

    def forward(self, features):
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)
        normalized_prototypes = self.prior_mean / (torch.norm(self.prior_mean, dim=1, keepdim=True) + 1e-6)

        cosine_similarity = torch.mm(normalized_features, normalized_prototypes.T)
        return cosine_similarity

def train_and_evaluate():
    train_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/train_data'
    eval_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/eval_data'

    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    input_dim = 512
    batch_size = 64
    confidence_threshold = 0.75  # Confidence threshold for pseudo-labeling
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    feature_extractor = FeatureExtractor().to(device)

    accuracy_matrix = np.zeros((10, 10))

    for i in range(1, 11):
        print(f"\n=== Processing Dataset D{i} ===")

        # Load the dataset
        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)  # Only D1 is labeled
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:  # D1 is labeled
                images, labels = batch
                labels_list.append(labels)
            else:  # D2 onward
                images = batch

            images = images.to(device)
            features = feature_extractor(images)
            features_list.append(features.cpu())

        # Combine all features and labels
        features = torch.cat(features_list).to(device)
        if labels_list:  # Only for D1
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))  # Dynamically determine num_classes
        else:
            labels = None
            num_classes = 10  # Default for unlabeled datasets

        # Initialize the classifier on D1
        if i == 1:
            classifier = BayesianClassifier(input_dim=input_dim, num_classes=num_classes).to(device)

        if labels is not None:  # D1: Update posterior using true labels
            classifier.update_posteriors(features, labels)
        else:  # D2 onward: Pseudo-labeling and posterior update
            with torch.no_grad():
                logits = classifier(features)  # Predict using current means
                predictions = logits.argmax(dim=1)  # Pseudo-labels
                confidences = logits.max(dim=1).values  # Confidence scores

            # Filter samples based on confidence threshold
            high_confidence_indices = confidences > confidence_threshold
            high_confidence_features = features[high_confidence_indices]
            high_confidence_labels = predictions[high_confidence_indices]

            # Update posterior using pseudo-labeled high-confidence samples
            if len(high_confidence_features) > 0:
                classifier.update_posteriors(high_confidence_features, high_confidence_labels)

        print(f"\nEvaluating model f{i} on held-out datasets D̂1 to D̂{i}")
        for j in range(1, i + 1):
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, f'{j}_eval_data.tar.pth'),
                transform=transform,
                labeled=True
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            correct, total = 0, 0
            with torch.no_grad():
                for images, labels in eval_loader:
                    images, labels = images.to(device), labels.to(device)
                    features = feature_extractor(images)
                    logits = classifier(features)
                    predictions = logits.argmax(dim=1)
                    correct += (predictions == labels).sum().item()
                    total += labels.size(0)

            accuracy = 100 * correct / total
            accuracy_matrix[i - 1, j - 1] = accuracy
            print(f"Accuracy of model f{i} on D̂{j}: {accuracy:.2f}%")

    print("\nAccuracy Matrix:")
    for row in accuracy_matrix:
        print(" ".join(f"{val:6.2f}" for val in row))

# Run
if __name__ == "__main__":
    train_and_evaluate()


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 64.8MB/s]
  data_dict = torch.load(file_path)



=== Processing Dataset D1 ===

Evaluating model f1 on held-out datasets D̂1 to D̂1
Accuracy of model f1 on D̂1: 32.72%

=== Processing Dataset D2 ===

Evaluating model f2 on held-out datasets D̂1 to D̂2
Accuracy of model f2 on D̂1: 31.28%
Accuracy of model f2 on D̂2: 30.40%

=== Processing Dataset D3 ===

Evaluating model f3 on held-out datasets D̂1 to D̂3
Accuracy of model f3 on D̂1: 30.88%
Accuracy of model f3 on D̂2: 30.72%
Accuracy of model f3 on D̂3: 31.48%

=== Processing Dataset D4 ===

Evaluating model f4 on held-out datasets D̂1 to D̂4
Accuracy of model f4 on D̂1: 29.24%
Accuracy of model f4 on D̂2: 30.28%
Accuracy of model f4 on D̂3: 31.36%
Accuracy of model f4 on D̂4: 30.12%

=== Processing Dataset D5 ===

Evaluating model f5 on held-out datasets D̂1 to D̂5
Accuracy of model f5 on D̂1: 29.40%
Accuracy of model f5 on D̂2: 29.80%
Accuracy of model f5 on D̂3: 30.48%
Accuracy of model f5 on D̂4: 30.64%
Accuracy of model f5 on D̂5: 29.76%

=== Processing Dataset D6 ===

Evaluati

KeyboardInterrupt: 

using same model as above just with a efficient-net as feature extractor

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torchvision.models import efficientnet_b0
from PIL import Image
import os
import numpy as np
from sklearn.decomposition import PCA

# Dataset Class
class CustomCIFARDataset(Dataset):
    def __init__(self, file_path, transform=None, labeled=True):
        try:
            data_dict = torch.load(file_path, weights_only=False)
        except TypeError:
            print("Warning: Falling back to weights_only=False. Ensure the file is trusted.")
            data_dict = torch.load(file_path)

        self.images = data_dict['data']
        self.labels = data_dict['targets'] if labeled else None
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        image = Image.fromarray(image)  # Convert to PIL Image
        if self.transform:
            image = self.transform(image)

        if self.labels is not None:
            return image, self.labels[idx]
        else:
            return image

# Feature Extractor using EfficientNet
class EfficientNetFeatureExtractor(nn.Module):
    def __init__(self, pretrained=True):
        super(EfficientNetFeatureExtractor, self).__init__()
        base_model = efficientnet_b0(pretrained=pretrained)
        self.features = nn.Sequential(*list(base_model.children())[:-2])  # Remove the classifier head

        # Freeze the weights of the feature extractor
        for param in self.features.parameters():
            param.requires_grad = False

    def forward(self, x):
        x = self.features(x)  # Extract features
        return x.view(x.size(0), -1)  # Flatten the feature map

class BayesianClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(BayesianClassifier, self).__init__()
        self.num_classes = num_classes
        self.input_dim = input_dim
        self.prior_mean = nn.Parameter(torch.zeros(num_classes, input_dim, dtype=torch.float32), requires_grad=False)  # Specify dtype as float32
        self.prior_cov = nn.Parameter(torch.eye(input_dim, dtype=torch.float32).repeat(num_classes, 1, 1), requires_grad=False)  # Specify dtype as float32

    def update_posteriors(self, features, labels):
        # Ensure features is float32
        features = features.type(torch.float32)
        for c in range(self.num_classes):
            class_features = features[labels == c]
            if len(class_features) > 0:
                n_c = len(class_features)
                sample_mean = class_features.mean(dim=0)

                # Ensure prior_cov_inv is float32
                prior_cov_inv = torch.linalg.inv(self.prior_cov[c].type(torch.float32))
                likelihood_cov_inv = torch.eye(self.input_dim, device=features.device, dtype=torch.float32) * n_c # Specify dtype as float32

                posterior_cov = torch.linalg.inv(prior_cov_inv + likelihood_cov_inv)
                posterior_mean = posterior_cov @ (
                    prior_cov_inv @ self.prior_mean[c] + likelihood_cov_inv @ sample_mean
                )

                self.prior_mean.data[c] = posterior_mean
                self.prior_cov.data[c] = posterior_cov

    def forward(self, features):
        # Convert features to float32
        features = features.type(torch.float32)
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)
        normalized_prototypes = self.prior_mean / (torch.norm(self.prior_mean, dim=1, keepdim=True) + 1e-6)

        cosine_similarity = torch.mm(normalized_features, normalized_prototypes.T)
        return cosine_similarity

# Main Functionality
def train_and_evaluate():
    train_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/train_data'
    eval_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/eval_data'

    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.Resize((224, 224)),  # Resize for EfficientNet input
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize for EfficientNet
    ])

    batch_size = 64
    reduced_dim = 128  # Target PCA dimensions
    confidence_threshold = 0.8
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize Feature Extractor
    feature_extractor = EfficientNetFeatureExtractor(pretrained=True).to(device)
    pca = None  # PCA will be fitted on the first dataset (D1)

    accuracy_matrix = np.zeros((10, 10))

    for i in range(1, 11):
        print(f"\n=== Processing Dataset D{i} ===")

        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:  # D1 is labeled
                images, labels = batch
                labels_list.append(labels.to(device))
            else:  # D2 onward
                images = batch

            images = images.to(device)
            features = feature_extractor(images)  # Extract high-dimensional features
            features_list.append(features)

        features = torch.cat(features_list).to(device)  # Combine all features
        if labels_list:
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))
        else:
            labels = None
            num_classes = 10  # Default for unlabeled datasets

        # Apply PCA to reduce dimensions
        if i == 1:
            print("Fitting PCA on Dataset D1...")
            pca = PCA(n_components=reduced_dim)
            features = torch.tensor(pca.fit_transform(features.cpu())).to(device)
        else:
            print(f"Transforming Dataset D{i} with PCA...")
            features = torch.tensor(pca.transform(features.cpu())).to(device)

        # Initialize Classifier for D1
        if i == 1:
            classifier = BayesianClassifier(input_dim=reduced_dim, num_classes=num_classes).to(device)

        if labels is not None:  # D1: Update posterior using true labels
            classifier.update_posteriors(features, labels)
        else:  # D2 onward: Pseudo-labeling and posterior update
            with torch.no_grad():
                logits = classifier(features)  # Predict pseudo-labels
                predictions = logits.argmax(dim=1)
                confidences = logits.max(dim=1).values  # Confidence scores

            # Filter samples based on confidence threshold
            high_confidence_indices = confidences > confidence_threshold
            high_confidence_features = features[high_confidence_indices]
            high_confidence_labels = predictions[high_confidence_indices]

            # Update posterior using high-confidence pseudo-labeled samples
            if len(high_confidence_features) > 0:
                classifier.update_posteriors(high_confidence_features, high_confidence_labels)

        print(f"\nEvaluating model f{i} on held-out datasets D̂1 to D̂{i}")
        for j in range(1, i + 1):
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, f'{j}_eval_data.tar.pth'),
                transform=transform,
                labeled=True
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            correct, total = 0, 0
            with torch.no_grad():
                for images, labels in eval_loader:
                    images, labels = images.to(device), labels.to(device)
                    features = feature_extractor(images)
                    features = torch.tensor(pca.transform(features.cpu())).to(device)  # Apply PCA
                    logits = classifier(features)
                    predictions = logits.argmax(dim=1)
                    correct += (predictions == labels).sum().item()
                    total += labels.size(0)

            accuracy = 100 * correct / total
            accuracy_matrix[i - 1, j - 1] = accuracy
            print(f"Accuracy of model f{i} on D̂{j}: {accuracy:.2f}%")

    print("\nAccuracy Matrix:")
    for row in accuracy_matrix:
        print(" ".join(f"{val:6.2f}" for val in row))

# Run
if __name__ == "__main__":
    train_and_evaluate()





=== Processing Dataset D1 ===
Fitting PCA on Dataset D1...

Evaluating model f1 on held-out datasets D̂1 to D̂1
Accuracy of model f1 on D̂1: 56.80%

=== Processing Dataset D2 ===
Transforming Dataset D2 with PCA...

Evaluating model f2 on held-out datasets D̂1 to D̂2
Accuracy of model f2 on D̂1: 59.04%
Accuracy of model f2 on D̂2: 55.36%

=== Processing Dataset D3 ===
Transforming Dataset D3 with PCA...

Evaluating model f3 on held-out datasets D̂1 to D̂3
Accuracy of model f3 on D̂1: 57.56%
Accuracy of model f3 on D̂2: 57.32%
Accuracy of model f3 on D̂3: 56.32%

=== Processing Dataset D4 ===
Transforming Dataset D4 with PCA...

Evaluating model f4 on held-out datasets D̂1 to D̂4
Accuracy of model f4 on D̂1: 57.76%
Accuracy of model f4 on D̂2: 56.24%
Accuracy of model f4 on D̂3: 55.60%
Accuracy of model f4 on D̂4: 56.68%

=== Processing Dataset D5 ===
Transforming Dataset D5 with PCA...

Evaluating model f5 on held-out datasets D̂1 to D̂5
Accuracy of model f5 on D̂1: 56.92%
Accuracy of

Using a kernalized PCA(gaussian PCA) for dimension reduction of the features obtained from the feature extractor

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torchvision.models import efficientnet_b0
from PIL import Image
import os
import numpy as np
from sklearn.decomposition import KernelPCA  # Import KernelPCA for dimensionality reduction with a Gaussian kernel
import matplotlib.pyplot as plt  # For visualizing the accuracy matrix

# Custom dataset class for CIFAR-like data
class CustomCIFARDataset(Dataset):
    def __init__(self, file_path, transform=None, labeled=True):
        # Load data from the specified file
        try:
            data_dict = torch.load(file_path)
        except Exception as e:
            print(f"Error loading file {file_path}: {e}")
            raise e

        self.images = data_dict['data']  # Image data
        self.labels = data_dict['targets'] if labeled else None  # Labels if dataset is labeled
        self.transform = transform  # Transformations to apply to images

    def __len__(self):
        # Return the total number of samples in the dataset
        return len(self.images)

    def __getitem__(self, idx):
        # Retrieve an image and optionally its label
        image = self.images[idx]
        image = Image.fromarray(image)  # Convert numpy array to PIL Image
        if self.transform:
            image = self.transform(image)  # Apply transformations

        if self.labels is not None:
            return image, self.labels[idx]  # Return image and label
        else:
            return image  # For unlabeled data, return only the image

# Feature extractor based on EfficientNet
class EfficientNetFeatureExtractor(nn.Module):
    def __init__(self, pretrained=True):
        super(EfficientNetFeatureExtractor, self).__init__()
        # Load EfficientNet model and remove the classification head
        base_model = efficientnet_b0(pretrained=pretrained)
        self.features = nn.Sequential(*list(base_model.children())[:-1])

        # Freeze the weights of the feature extractor
        for param in self.features.parameters():
            param.requires_grad = False

    def forward(self, x):
        # Extract features and flatten the output
        x = self.features(x)
        x = torch.flatten(x, 1)
        return x

# Bayesian classifier for updating posteriors
class BayesianClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(BayesianClassifier, self).__init__()
        self.num_classes = num_classes  # Number of classes
        self.input_dim = input_dim  # Dimensionality of the feature space

        # Initialize prior means and covariances
        self.prior_mean = nn.Parameter(torch.zeros(num_classes, input_dim, dtype=torch.float32), requires_grad=False)
        self.prior_cov = nn.Parameter(torch.eye(input_dim, dtype=torch.float32).unsqueeze(0).repeat(num_classes, 1, 1), requires_grad=False)

    def update_posteriors(self, features, labels):
        # Update posterior mean and covariance for each class based on features
        features = features.float()
        for c in range(self.num_classes):
            class_features = features[labels == c]  # Features of the current class
            if len(class_features) > 0:
                n_c = len(class_features)
                sample_mean = class_features.mean(dim=0)

                prior_cov_inv = torch.linalg.inv(self.prior_cov[c].float())
                likelihood_cov_inv = torch.eye(self.input_dim, device=features.device, dtype=torch.float32) * n_c

                posterior_cov = torch.linalg.inv(prior_cov_inv + likelihood_cov_inv)
                posterior_mean = posterior_cov @ (prior_cov_inv @ self.prior_mean[c] + likelihood_cov_inv @ sample_mean)

                # Update posterior mean and covariance
                self.prior_mean.data[c] = posterior_mean
                self.prior_cov.data[c] = posterior_cov

    def forward(self, features):
        # Compute class-wise cosine similarity for classification
        features = features.float()
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)
        normalized_prototypes = self.prior_mean / (torch.norm(self.prior_mean, dim=1, keepdim=True) + 1e-6)

        # Cosine similarity between features and class prototypes
        cosine_similarity = torch.mm(normalized_features, normalized_prototypes.T)
        return cosine_similarity

# Main training and evaluation function
def train_and_evaluate():
    # Paths for training and evaluation datasets
    train_data_path = '/content/drive/MyDrive/CS771 A-2 /dataset/dataset/part_one_dataset/train_data'
    eval_data_path = '/content/drive/MyDrive/CS771 A-2 /dataset/dataset/part_one_dataset/eval_data'

    # Data transformations
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.Resize((224, 224)),  # Resize for EfficientNet input size
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalization
    ])

    # Hyperparameters
    batch_size = 64
    reduced_dim = 128  # Dimensionality for Kernel PCA
    confidence_threshold = 0.8
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize feature extractor
    feature_extractor = EfficientNetFeatureExtractor(pretrained=True).to(device)
    kernel_pca = None  # Kernel PCA object
    accuracy_matrix = np.zeros((10, 10))  # Store accuracies across datasets

    classifier = None  # Bayesian classifier object

    for i in range(1, 11):
        print(f"\n=== Processing Dataset D{i} ===")

        # Load training dataset
        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)  # Only Dataset D1 is labeled
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Extract features and labels
        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:  # D1 is labeled
                images, labels = batch
                labels_list.append(labels.to(device))
            else:
                images = batch

            images = images.to(device)
            features = feature_extractor(images)  # Extract features
            features_list.append(features)

        features = torch.cat(features_list).to(device)
        if labels_list:
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))
        else:
            labels = None
            num_classes = 10

        # Apply Gaussian Kernel PCA
        if i == 1:
            print("Fitting Gaussian Kernel PCA on Dataset D1...")
            kernel_pca = KernelPCA(n_components=reduced_dim, kernel='rbf', gamma=0.1)
            features_np = features.cpu().numpy()
            features_pca = kernel_pca.fit_transform(features_np)
            features = torch.from_numpy(features_pca).float().to(device)
        else:
            print(f"Transforming Dataset D{i} with Gaussian Kernel PCA...")
            features_np = features.cpu().numpy()
            features_pca = kernel_pca.transform(features_np)
            features = torch.from_numpy(features_pca).float().to(device)

        # Initialize Bayesian classifier for D1
        if i == 1:
            classifier = BayesianClassifier(input_dim=reduced_dim, num_classes=num_classes).to(device)

        if labels is not None:  # D1: Update posterior using true labels
            classifier.update_posteriors(features, labels)
        else:  # D2 onward: Pseudo-labeling
            with torch.no_grad():
                logits = classifier(features)
                predictions = logits.argmax(dim=1)
                confidences = logits.max(dim=1).values

            high_conf_indices = confidences > confidence_threshold
            high_conf_features = features[high_conf_indices]
            high_conf_labels = predictions[high_conf_indices]

            if len(high_conf_features) > 0:
                classifier.update_posteriors(high_conf_features, high_conf_labels)

        # Evaluate the model
        print(f"\nEvaluating model f{i} on held-out datasets D̂1 to D̂{i}")
        for j in range(1, i + 1):
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, f'{j}_eval_data.tar.pth'),
                transform=transform,
                labeled=True
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            correct, total = 0, 0
            with torch.no_grad():
                for eval_batch in eval_loader:
                    eval_images, eval_labels = eval_batch
                    eval_images, eval_labels = eval_images.to(device), eval_labels.to(device)

                    eval_features = feature_extractor(eval_images)
                    eval_features_np = eval_features.cpu().numpy()
                    eval_features_pca = kernel_pca.transform(eval_features_np)
                    eval_features_pca = torch.from_numpy(eval_features_pca).float().to(device)
                    logits = classifier(eval_features_pca)
                    predictions = logits.argmax(dim=1)
                    correct += (predictions == eval_labels).sum().item()
                    total += eval_labels.size(0)

            accuracy = 100 * correct / total
            accuracy_matrix[i - 1][j - 1] = accuracy
            print(f"Accuracy on D̂{j}: {accuracy:.2f}%")

    print("\nFinal Accuracy Matrix:")
    print(accuracy_matrix)

    plt.imshow(accuracy_matrix, cmap='viridis', interpolation='nearest')
    plt.colorbar()
    plt.title("Accuracy Matrix")
    plt.xlabel("Datasets D̂j")
    plt.ylabel("Datasets Di")
    plt.show()

# Entry point
if __name__ == '__main__':
    train_and_evaluate()


Using polynimical kernel PCA

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torchvision.models import efficientnet_b0
from PIL import Image
import os
import numpy as np
from sklearn.decomposition import KernelPCA

# Custom dataset class to handle CIFAR-like dataset loading and transformations
class CustomCIFARDataset(Dataset):
    def __init__(self, file_path, transform=None, labeled=True):
        """
        Initialize the dataset with file path and transformations.
        :param file_path: Path to the dataset file.
        :param transform: Transformations to be applied on the images.
        :param labeled: Indicates if the dataset contains labels.
        """
        try:
            data_dict = torch.load(file_path, weights_only=False)
        except TypeError:
            print("Warning: Falling back to weights_only=False. Ensure the file is trusted.")
            data_dict = torch.load(file_path)

        # Load images and optionally labels
        self.images = data_dict['data']
        self.labels = data_dict['targets'] if labeled else None
        self.transform = transform

    def __len__(self):
        """Return the total number of images in the dataset."""
        return len(self.images)

    def __getitem__(self, idx):
        """
        Retrieve an image (and its label if available) by index.
        :param idx: Index of the desired sample.
        :return: Transformed image and label (if available).
        """
        image = self.images[idx]
        image = Image.fromarray(image)  # Convert numpy array to PIL Image
        if self.transform:
            image = self.transform(image)

        if self.labels is not None:
            return image, self.labels[idx]
        else:
            return image

# Feature extractor using pre-trained EfficientNet model
class EfficientNetFeatureExtractor(nn.Module):
    def __init__(self, pretrained=True):
        """
        Initialize EfficientNet-based feature extractor.
        :param pretrained: Use pre-trained weights if True.
        """
        super(EfficientNetFeatureExtractor, self).__init__()
        base_model = efficientnet_b0(pretrained=pretrained)
        # Extract all layers except the classification head
        self.features = nn.Sequential(*list(base_model.children())[:-2])

        # Freeze the feature extractor layers to avoid training them
        for param in self.features.parameters():
            param.requires_grad = False

    def forward(self, x):
        """
        Forward pass through the feature extractor.
        :param x: Input images.
        :return: Flattened feature maps.
        """
        x = self.features(x)  # Extract features
        return x.view(x.size(0), -1)  # Flatten the feature map

# Bayesian Classifier with prior updates for pseudo-labeling
class BayesianClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        """
        Initialize the Bayesian Classifier.
        :param input_dim: Dimensionality of the input features.
        :param num_classes: Number of classes for classification.
        """
        super(BayesianClassifier, self).__init__()
        self.num_classes = num_classes
        self.input_dim = input_dim
        # Initialize prior mean and covariance for each class
        self.prior_mean = nn.Parameter(torch.zeros(num_classes, input_dim, dtype=torch.float32), requires_grad=False)
        self.prior_cov = nn.Parameter(torch.eye(input_dim, dtype=torch.float32).repeat(num_classes, 1, 1), requires_grad=False)

    def update_posteriors(self, features, labels):
        """
        Update the posterior mean and covariance based on new samples.
        :param features: Feature vectors of the samples.
        :param labels: Labels corresponding to the samples.
        """
        features = features.type(torch.float32)  # Ensure correct dtype
        for c in range(self.num_classes):
            class_features = features[labels == c]
            if len(class_features) > 0:
                n_c = len(class_features)
                sample_mean = class_features.mean(dim=0)

                prior_cov_inv = torch.linalg.inv(self.prior_cov[c].type(torch.float32))
                likelihood_cov_inv = torch.eye(self.input_dim, device=features.device, dtype=torch.float32) * n_c

                posterior_cov = torch.linalg.inv(prior_cov_inv + likelihood_cov_inv)
                posterior_mean = posterior_cov @ (
                    prior_cov_inv @ self.prior_mean[c] + likelihood_cov_inv @ sample_mean
                )

                self.prior_mean.data[c] = posterior_mean
                self.prior_cov.data[c] = posterior_cov

    def forward(self, features):
        """
        Perform classification using cosine similarity.
        :param features: Input features for classification.
        :return: Cosine similarity scores with the class prototypes.
        """
        features = features.type(torch.float32)  # Ensure correct dtype
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)
        normalized_prototypes = self.prior_mean / (torch.norm(self.prior_mean, dim=1, keepdim=True) + 1e-6)
        cosine_similarity = torch.mm(normalized_features, normalized_prototypes.T)
        return cosine_similarity

# Main training and evaluation routine
def train_and_evaluate():
    """
    Train and evaluate the Bayesian classifier on a sequence of datasets
    with feature extraction, dimensionality reduction, and pseudo-labeling.
    """
    # Dataset paths
    train_data_path = '/content/drive/MyDrive/CS771 A-2 /dataset/dataset/part_one_dataset/train_data'
    eval_data_path = '/content/drive/MyDrive/CS771 A-2 /dataset/dataset/part_one_dataset/eval_data'

    # Define data transformations for image preprocessing
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.Resize((224, 224)),  # Resize for EfficientNet input
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # EfficientNet normalization
    ])

    # Configuration parameters
    batch_size = 64
    reduced_dim = 128  # Dimensionality after Kernel PCA
    confidence_threshold = 0.8  # Minimum confidence for pseudo-labeling
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize feature extractor and variables
    feature_extractor = EfficientNetFeatureExtractor(pretrained=True).to(device)
    pca = None  # Kernel PCA object
    accuracy_matrix = np.zeros((10, 10))  # Accuracy storage matrix

    for i in range(1, 11):
        print(f"\n=== Processing Dataset D{i} ===")

        # Load training dataset
        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)  # Labels only for D1
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:
                images, labels = batch
                labels_list.append(labels.to(device))
            else:
                images = batch

            images = images.to(device)
            features = feature_extractor(images)
            features_list.append(features)

        features = torch.cat(features_list).to(device)
        if labels_list:
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))
        else:
            labels = None
            num_classes = 10

        if i == 1:
            print("Fitting Polynomial Kernel PCA on Dataset D1...")
            pca = KernelPCA(n_components=reduced_dim, kernel='poly', degree=3, coef0=1)
            features = torch.tensor(pca.fit_transform(features.cpu())).to(device)
            classifier = BayesianClassifier(input_dim=reduced_dim, num_classes=num_classes).to(device)
        else:
            print(f"Transforming Dataset D{i} with Polynomial Kernel PCA...")
            features = torch.tensor(pca.transform(features.cpu())).to(device)

        if labels is not None:
            classifier.update_posteriors(features, labels)
        else:
            with torch.no_grad():
                logits = classifier(features)
                predictions = logits.argmax(dim=1)
                confidences = logits.max(dim=1).values

            high_conf_indices = confidences > confidence_threshold
            high_conf_features = features[high_conf_indices]
            high_conf_labels = predictions[high_conf_indices]

            if len(high_conf_features) > 0:
                classifier.update_posteriors(high_conf_features, high_conf_labels)

        print(f"\nEvaluating model f{i} on held-out datasets D̂1 to D̂{i}")
        for j in range(1, i + 1):
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, f'{j}_eval_data.tar.pth'),
                transform=transform,
                labeled=True
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            correct, total = 0, 0
            with torch.no_grad():
                for images, labels in eval_loader:
                    images, labels = images.to(device), labels.to(device)
                    features = feature_extractor(images)
                    features = torch.tensor(pca.transform(features.cpu())).to(device)
                    logits = classifier(features)
                    predictions = logits.argmax(dim=1)
                    correct += (predictions == labels).sum().item()
                    total += labels.size(0)

            accuracy = correct / total
            accuracy_matrix[i - 1][j - 1] = accuracy
            print(f"f{i} accuracy on D̂{j}: {accuracy:.4f}")

    np.save('accuracy_matrix.npy', accuracy_matrix)
    print("\nTraining and evaluation completed.")


Using RBF kernel PCA

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torchvision.models import efficientnet_b0
from PIL import Image
import os
import numpy as np
from sklearn.decomposition import KernelPCA  # For dimensionality reduction using RBF Kernel PCA

# Custom Dataset Class for CIFAR-like data
class CustomCIFARDataset(Dataset):
    """
    Dataset class to handle CIFAR-like data.

    Args:
        file_path (str): Path to the dataset file.
        transform (callable, optional): Transformations to apply to images.
        labeled (bool): Whether the dataset has labels (for training).
    """
    def __init__(self, file_path, transform=None, labeled=True):
        try:
            # Load the dataset file
            data_dict = torch.load(file_path, weights_only=False)
        except TypeError:
            print("Warning: Falling back to weights_only=False. Ensure the file is trusted.")
            data_dict = torch.load(file_path)

        self.images = data_dict['data']  # Image data
        self.labels = data_dict['targets'] if labeled else None  # Labels, if available
        self.transform = transform  # Transformations to apply to the images

    def __len__(self):
        return len(self.images)  # Total number of images

    def __getitem__(self, idx):
        """
        Retrieve an image and its label (if available) by index.
        """
        image = self.images[idx]
        image = Image.fromarray(image)  # Convert NumPy array to PIL Image
        if self.transform:
            image = self.transform(image)  # Apply transformations

        if self.labels is not None:
            return image, self.labels[idx]  # Return image and label
        else:
            return image  # Return only the image for unlabeled datasets

# EfficientNet-based Feature Extractor
class EfficientNetFeatureExtractor(nn.Module):
    """
    Feature extractor using EfficientNet B0. Pretrained weights are used for
    transfer learning, and the classification head is removed.
    """
    def __init__(self, pretrained=True):
        super(EfficientNetFeatureExtractor, self).__init__()
        base_model = efficientnet_b0(pretrained=pretrained)
        self.features = nn.Sequential(*list(base_model.children())[:-2])  # Remove the classifier head

        # Freeze all parameters to prevent training
        for param in self.features.parameters():
            param.requires_grad = False

    def forward(self, x):
        """
        Forward pass to extract features from input images.
        """
        x = self.features(x)  # Extract feature maps
        return x.view(x.size(0), -1)  # Flatten the feature maps into 1D vectors

# Bayesian Classifier with Posterior Updates
class BayesianClassifier(nn.Module):
    """
    Bayesian Classifier to predict class probabilities using learned posteriors.

    Args:
        input_dim (int): Dimensionality of input features.
        num_classes (int): Number of target classes.
    """
    def __init__(self, input_dim, num_classes):
        super(BayesianClassifier, self).__init__()
        self.num_classes = num_classes
        self.input_dim = input_dim

        # Initialize prior mean and covariance for each class
        self.prior_mean = nn.Parameter(torch.zeros(num_classes, input_dim, dtype=torch.float32), requires_grad=False)
        self.prior_cov = nn.Parameter(torch.eye(input_dim, dtype=torch.float32).repeat(num_classes, 1, 1), requires_grad=False)

    def update_posteriors(self, features, labels):
        """
        Update the posterior distribution for each class using labeled data.
        """
        features = features.type(torch.float32)  # Ensure features are float32 for computation
        for c in range(self.num_classes):
            class_features = features[labels == c]  # Extract features of class `c`
            if len(class_features) > 0:
                n_c = len(class_features)  # Number of samples in the class
                sample_mean = class_features.mean(dim=0)  # Compute mean of the class features

                # Compute posterior covariance and mean using Bayes' rule
                prior_cov_inv = torch.linalg.inv(self.prior_cov[c].type(torch.float32))
                likelihood_cov_inv = torch.eye(self.input_dim, dtype=torch.float32, device=features.device) * n_c
                posterior_cov = torch.linalg.inv(prior_cov_inv + likelihood_cov_inv)
                posterior_mean = posterior_cov @ (
                    prior_cov_inv @ self.prior_mean[c] + likelihood_cov_inv @ sample_mean
                )

                # Update the prior with the computed posterior
                self.prior_mean.data[c] = posterior_mean
                self.prior_cov.data[c] = posterior_cov

    def forward(self, features):
        """
        Forward pass to compute similarity scores between features and class prototypes.
        """
        features = features.type(torch.float32)
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)  # Normalize input features
        normalized_prototypes = self.prior_mean / (torch.norm(self.prior_mean, dim=1, keepdim=True) + 1e-6)  # Normalize prototypes

        cosine_similarity = torch.mm(normalized_features, normalized_prototypes.T)  # Compute cosine similarity
        return cosine_similarity

# Training and Evaluation Workflow
def train_and_evaluate():
    """
    Main training and evaluation pipeline. This includes:
    - Feature extraction with EfficientNet.
    - Dimensionality reduction using RBF Kernel PCA.
    - Training Bayesian Classifier on labeled and pseudo-labeled data.
    - Evaluating models on test datasets.
    """
    train_data_path = '/path/to/train_data'
    eval_data_path = '/path/to/eval_data'

    # Image transformations for data augmentation and normalization
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.Resize((224, 224)),  # EfficientNet input size
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # EfficientNet normalization
    ])

    batch_size = 64
    reduced_dim = 128  # Target dimensionality after RBF Kernel PCA
    confidence_threshold = 0.8
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize feature extractor
    feature_extractor = EfficientNetFeatureExtractor(pretrained=True).to(device)
    rbf_kernel_pca = None  # RBF Kernel PCA instance

    accuracy_matrix = np.zeros((10, 10))  # Matrix to store accuracy results

    for i in range(1, 11):  # Iterate over datasets D1 to D10
        print(f"\n=== Processing Dataset D{i} ===")

        # Load training data
        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)  # Only D1 is labeled
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Extract features for training data
        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:
                images, labels = batch
                labels_list.append(labels.to(device))
            else:
                images = batch

            images = images.to(device)
            features = feature_extractor(images)
            features_list.append(features)

        features = torch.cat(features_list).to(device)
        if labels_list:
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))
        else:
            labels = None
            num_classes = 10  # Default number of classes for unlabeled data

        # Apply RBF Kernel PCA
        if i == 1:
            print("Fitting RBF Kernel PCA on Dataset D1...")
            rbf_kernel_pca = KernelPCA(n_components=reduced_dim, kernel="rbf", gamma=0.1)
            features = torch.tensor(rbf_kernel_pca.fit_transform(features.cpu())).to(device)
        else:
            print(f"Transforming Dataset D{i} with RBF Kernel PCA...")
            features = torch.tensor(rbf_kernel_pca.transform(features.cpu())).to(device)

        # Initialize the classifier for D1
        if i == 1:
            classifier = BayesianClassifier(input_dim=reduced_dim, num_classes=num_classes).to(device)

        # Update classifier posteriors
        if labels is not None:  # For D1
            classifier.update_posteriors(features, labels)
        else:  # Pseudo-labeling for D2 onward
            with torch.no_grad():
                logits = classifier(features)
                predictions = logits.argmax(dim=1)
                confidences = logits.max(dim=1).values

            high_confidence_indices = confidences > confidence_threshold
            high_confidence_features = features[high_confidence_indices]
            high_confidence_labels = predictions[high_confidence_indices]

            if len(high_confidence_features) > 0:
                classifier.update_posteriors(high_confidence_features, high_confidence_labels)

        # Evaluate the model
        print(f"\nEvaluating model f{i} on held-out datasets D̂1 to D̂{i}")
        for j in range(1, i + 1):
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, f'{j}_eval_data.tar.pth'),
                transform=transform,
                labeled=False  # Test data is unlabeled
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            predictions = []
            for images in eval_loader:
                images = images.to(device)
                features = feature_extractor(images)
                features = torch.tensor(rbf_kernel_pca.transform(features.cpu())).to(device)
                logits = classifier(features)
                predictions.extend(logits.argmax(dim=1).cpu().numpy())

            eval_predictions = np.array(predictions)
            accuracy_matrix[i - 1][j - 1] = 0  # Replace with actual accuracy computation
            print(f"Accuracy of f{i} on D̂{j}: {accuracy_matrix[i - 1][j - 1]:.3f}")

    return accuracy_matrix


Using Cosine similarity PCA

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torchvision.models import efficientnet_b0
from PIL import Image
import os
import numpy as np
from sklearn.decomposition import KernelPCA

# Dataset Class
class CustomCIFARDataset(Dataset):
    def __init__(self, file_path, transform=None, labeled=True):
        """
        Custom Dataset class to load CIFAR-like data.

        :param file_path: Path to the dataset file.
        :param transform: Transformations to apply to images.
        :param labeled: Boolean indicating if the dataset contains labels.
        """
        try:
            data_dict = torch.load(file_path, weights_only=False)
        except TypeError:
            print("Warning: Falling back to weights_only=False. Ensure the file is trusted.")
            data_dict = torch.load(file_path)

        # Load the images and labels from the dataset
        self.images = data_dict['data']
        self.labels = data_dict['targets'] if labeled else None
        self.transform = transform

    def __len__(self):
        """
        Returns the total number of images in the dataset.
        """
        return len(self.images)

    def __getitem__(self, idx):
        """
        Retrieves the image and its label (if available) at the given index.

        :param idx: Index of the sample to retrieve.
        :return: Transformed image and its corresponding label (if available).
        """
        image = self.images[idx]
        image = Image.fromarray(image)  # Convert to PIL Image
        if self.transform:
            image = self.transform(image)  # Apply any transformations

        if self.labels is not None:
            return image, self.labels[idx]
        else:
            return image

# Feature Extractor using EfficientNet
class EfficientNetFeatureExtractor(nn.Module):
    def __init__(self, pretrained=True):
        """
        Initializes the EfficientNet feature extractor by removing the classifier head.

        :param pretrained: Boolean indicating whether to load pretrained weights.
        """
        super(EfficientNetFeatureExtractor, self).__init__()
        base_model = efficientnet_b0(pretrained=pretrained)
        # Remove the classifier head by selecting all layers except the last two
        self.features = nn.Sequential(*list(base_model.children())[:-2])

        # Freeze the weights of the feature extractor (no backpropagation)
        for param in self.features.parameters():
            param.requires_grad = False

    def forward(self, x):
        """
        Forward pass to extract features from the input image.

        :param x: Input image tensor.
        :return: Flattened feature tensor.
        """
        x = self.features(x)  # Extract features
        return x.view(x.size(0), -1)  # Flatten the feature map to a vector

# Bayesian Classifier using Cosine Similarity
class BayesianClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        """
        Initializes the Bayesian Classifier with prior mean and covariance.

        :param input_dim: Dimensionality of the feature space.
        :param num_classes: Number of output classes.
        """
        super(BayesianClassifier, self).__init__()
        self.num_classes = num_classes
        self.input_dim = input_dim
        # Initialize prior mean and covariance matrices as non-trainable parameters
        self.prior_mean = nn.Parameter(torch.zeros(num_classes, input_dim, dtype=torch.float32), requires_grad=False)
        self.prior_cov = nn.Parameter(torch.eye(input_dim, dtype=torch.float32).repeat(num_classes, 1, 1), requires_grad=False)

    def update_posteriors(self, features, labels):
        """
        Updates the posterior mean and covariance for each class based on the input features and labels.

        :param features: The feature vectors of the training samples.
        :param labels: The true labels of the samples.
        """
        features = features.type(torch.float32)  # Ensure features are in float32
        for c in range(self.num_classes):
            class_features = features[labels == c]  # Select features corresponding to class c
            if len(class_features) > 0:
                n_c = len(class_features)
                sample_mean = class_features.mean(dim=0)

                # Inverse of prior covariance matrix for class c
                prior_cov_inv = torch.linalg.inv(self.prior_cov[c].type(torch.float32))
                likelihood_cov_inv = torch.eye(self.input_dim, device=features.device, dtype=torch.float32) * n_c

                posterior_cov = torch.linalg.inv(prior_cov_inv + likelihood_cov_inv)
                posterior_mean = posterior_cov @ (
                    prior_cov_inv @ self.prior_mean[c] + likelihood_cov_inv @ sample_mean
                )

                # Update the class's prior mean and covariance with posterior values
                self.prior_mean.data[c] = posterior_mean
                self.prior_cov.data[c] = posterior_cov

    def forward(self, features):
        """
        Performs a forward pass to compute the cosine similarity between features and class prototypes.

        :param features: Input feature tensor to classify.
        :return: Cosine similarity between input features and class prototypes.
        """
        features = features.type(torch.float32)  # Ensure features are in float32
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)
        normalized_prototypes = self.prior_mean / (torch.norm(self.prior_mean, dim=1, keepdim=True) + 1e-6)

        # Compute cosine similarity between features and class prototypes
        cosine_similarity = torch.mm(normalized_features, normalized_prototypes.T)
        return cosine_similarity

# Main Functionality
def train_and_evaluate():
    """
    Main function to train and evaluate the model using KernelPCA and Bayesian Classification.
    """
    train_data_path = '/content/drive/MyDrive/CS771 A-2 /dataset/dataset/part_one_dataset/train_data'
    eval_data_path = '/content/drive/MyDrive/CS771 A-2 /dataset/dataset/part_one_dataset/eval_data'

    # Define transformations to apply on the images
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.Resize((224, 224)),  # Resize for EfficientNet input
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize for EfficientNet
    ])

    batch_size = 64
    reduced_dim = 128  # Target KernelPCA dimensions
    confidence_threshold = 0.8  # Confidence threshold for pseudo-labeling
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize Feature Extractor and KernelPCA (empty at first)
    feature_extractor = EfficientNetFeatureExtractor(pretrained=True).to(device)
    kernel_pca = None  # KernelPCA will be fitted on the first dataset (D1)

    accuracy_matrix = np.zeros((10, 10))

    # Process each dataset from D1 to D10
    for i in range(1, 11):
        print(f"\n=== Processing Dataset D{i} ===")

        # Load training data for the current dataset
        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)  # Only D1 is labeled
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:  # D1 is labeled
                images, labels = batch
                labels_list.append(labels.to(device))
            else:  # D2 onward is unlabeled
                images = batch

            images = images.to(device)
            features = feature_extractor(images)  # Extract high-dimensional features
            features_list.append(features)

        features = torch.cat(features_list).to(device)  # Combine all extracted features
        if labels_list:
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))
        else:
            labels = None
            num_classes = 10  # Default for unlabeled datasets

        # Apply Kernel PCA to reduce dimensions
        if i == 1:
            print("Fitting KernelPCA on Dataset D1...")
            kernel_pca = KernelPCA(n_components=reduced_dim, kernel="cosine")
            features = torch.tensor(kernel_pca.fit_transform(features.cpu())).to(device)  # Fit on D1
        else:
            print(f"Transforming Dataset D{i} with KernelPCA...")
            features = torch.tensor(kernel_pca.transform(features.cpu())).to(device)  # Apply to other datasets

        # Initialize Bayesian Classifier for D1
        if i == 1:
            classifier = BayesianClassifier(input_dim=reduced_dim, num_classes=num_classes).to(device)

        # Update posteriors for D1 with true labels
        if labels is not None:  # D1
            classifier.update_posteriors(features, labels)
        else:  # D2 onward: Pseudo-labeling and posterior update
            with torch.no_grad():
                logits = classifier(features)  # Predict pseudo-labels
                predictions = logits.argmax(dim=1)
                confidences = logits.max(dim=1).values  # Confidence scores

            # Filter samples based on confidence threshold
            high_confidence_indices = confidences > confidence_threshold
            high_confidence_features = features[high_confidence_indices]
            high_confidence_predictions = predictions[high_confidence_indices]

            # Update classifier with high-confidence predictions
            classifier.update_posteriors(high_confidence_features, high_confidence_predictions)

        # Evaluate using the classifier for test set D10
        if i == 10:
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, 'eval_data.tar.pth'),
                transform=transform,
                labeled=True  # Evaluation dataset is labeled
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            all_preds, all_labels = [], []
            for batch in eval_loader:
                images, labels = batch
                images = images.to(device)
                features = feature_extractor(images)
                features = torch.tensor(kernel_pca.transform(features.cpu())).to(device)  # Apply KernelPCA
                logits = classifier(features)
                predictions = logits.argmax(dim=1)

                all_preds.append(predictions.cpu().numpy())
                all_labels.append(labels.cpu().numpy())

            # Compute accuracy for D10
            all_preds = np.concatenate(all_preds)
            all_labels = np.concatenate(all_labels)
            accuracy = np.sum(all_preds == all_labels) / len(all_labels)
            accuracy_matrix[i - 1, i - 1] = accuracy  # Store accuracy for D10

            print(f"Accuracy for D10: {accuracy * 100:.2f}%")

    print("\nAccuracy Matrix (D1 to D10):")
    print(accuracy_matrix)

# Call the main function to run the training and evaluation process
train_and_evaluate()


Final Model : using ViT for feature extraction and with a Bayesian classifier

In [4]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import ViTModel, ViTFeatureExtractor
from PIL import Image
import os
import numpy as np

# Dataset Class
class CustomCIFARDataset(Dataset):
    def __init__(self, file_path, transform=None, labeled=True):
        try:
            data_dict = torch.load(file_path)
        except TypeError:
            print("Warning: Falling back to non-weighted mode. Ensure the file is trusted.")
            data_dict = torch.load(file_path)

        self.images = data_dict['data']
        self.labels = data_dict['targets'] if labeled else None
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        image = Image.fromarray(image)  # Convert to PIL Image
        if self.transform:
            image = self.transform(image)

        if self.labels is not None:
            return image, self.labels[idx]
        else:
            return image

# Feature Extractor using Vision Transformer
class ViTFeatureExtractorModel(nn.Module):
    def __init__(self, pretrained=True):
        super(ViTFeatureExtractorModel, self).__init__()
        self.vit = ViTModel.from_pretrained("google/vit-base-patch16-224-in21k")
        self.feature_dim = self.vit.config.hidden_size  # Output size of ViT

        # Freeze ViT weights
        if pretrained:
            for param in self.vit.parameters():
                param.requires_grad = False

    def forward(self, x):
        outputs = self.vit(pixel_values=x, return_dict=True)
        return outputs.last_hidden_state[:, 0, :]  # Use the [CLS] token embedding

# Bayesian Classifier
class BayesianClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(BayesianClassifier, self).__init__()
        self.num_classes = num_classes
        self.input_dim = input_dim
        self.prior_mean = nn.Parameter(torch.zeros(num_classes, input_dim), requires_grad=False)
        self.prior_cov = nn.Parameter(torch.eye(input_dim).repeat(num_classes, 1, 1), requires_grad=False)

    def update_posteriors(self, features, labels):
        for c in range(self.num_classes):
            class_features = features[labels == c]
            if len(class_features) > 0:
                n_c = len(class_features)
                sample_mean = class_features.mean(dim=0)
                prior_cov_inv = torch.linalg.inv(self.prior_cov[c])
                likelihood_cov_inv = torch.eye(self.input_dim, device=features.device) * n_c

                posterior_cov = torch.linalg.inv(prior_cov_inv + likelihood_cov_inv)
                posterior_mean = posterior_cov @ (
                    prior_cov_inv @ self.prior_mean[c] + likelihood_cov_inv @ sample_mean
                )

                self.prior_mean.data[c] = posterior_mean
                self.prior_cov.data[c] = posterior_cov

    def forward(self, features):
        normalized_features = features / (torch.norm(features, dim=1, keepdim=True) + 1e-6)
        normalized_prototypes = self.prior_mean / (torch.norm(self.prior_mean, dim=1, keepdim=True) + 1e-6)
        cosine_similarity = torch.mm(normalized_features, normalized_prototypes.T)
        return cosine_similarity

# Main Functionality
def train_and_evaluate():
    train_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/train_data'
    eval_data_path = '/content/drive/MyDrive/ML/dataset/dataset/part_one_dataset/eval_data'

    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # Resize for ViT input
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])  # Normalize for ViT
    ])

    batch_size = 64
    confidence_threshold = 0.8
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize Feature Extractor
    feature_extractor = ViTFeatureExtractorModel(pretrained=True).to(device)

    accuracy_matrix = np.zeros((10, 10))

    for i in range(1, 11):
        print(f"\n=== Processing Dataset D{i} ===")

        train_dataset = CustomCIFARDataset(
            os.path.join(train_data_path, f'{i}_train_data.tar.pth'),
            transform=transform,
            labeled=(i == 1)
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        features_list, labels_list = [], []
        for batch in train_loader:
            if i == 1:  # D1 is labeled
                images, labels = batch
                labels_list.append(labels.to(device))
            else:  # D2 onward
                images = batch

            images = images.to(device)
            features = feature_extractor(images)  # Extract features
            features_list.append(features)

        features = torch.cat(features_list).to(device)  # Combine all features
        if labels_list:
            labels = torch.cat(labels_list).to(device)
            num_classes = len(torch.unique(labels))
        else:
            labels = None
            num_classes = 10  # Default for unlabeled datasets

        # Initialize Classifier for D1
        if i == 1:
            classifier = BayesianClassifier(input_dim=feature_extractor.feature_dim, num_classes=num_classes).to(device)

        if labels is not None:  # D1: Update posterior using true labels
            classifier.update_posteriors(features, labels)
        else:  # D2 onward: Pseudo-labeling and posterior update
            with torch.no_grad():
                logits = classifier(features)  # Predict pseudo-labels
                predictions = logits.argmax(dim=1)
                confidences = logits.max(dim=1).values  # Confidence scores

            # Filter samples based on confidence threshold
            high_confidence_indices = confidences > confidence_threshold
            high_confidence_features = features[high_confidence_indices]
            high_confidence_labels = predictions[high_confidence_indices]

            # Update posterior using high-confidence pseudo-labeled samples
            if len(high_confidence_features) > 0:
                classifier.update_posteriors(high_confidence_features, high_confidence_labels)

        print(f"\nEvaluating model f{i} on held-out datasets D̂1 to D̂{i}")
        for j in range(1, i + 1):
            eval_dataset = CustomCIFARDataset(
                os.path.join(eval_data_path, f'{j}_eval_data.tar.pth'),
                transform=transform,
                labeled=True
            )
            eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)

            correct, total = 0, 0
            with torch.no_grad():
                for images, labels in eval_loader:
                    images, labels = images.to(device), labels.to(device)
                    features = feature_extractor(images)
                    logits = classifier(features)
                    predictions = logits.argmax(dim=1)
                    correct += (predictions == labels).sum().item()
                    total += labels.size(0)

            accuracy = 100 * correct / total
            accuracy_matrix[i - 1, j - 1] = accuracy
            print(f"Accuracy of model f{i} on D̂{j}: {accuracy:.2f}%")

    print("\nAccuracy Matrix:")
    for row in accuracy_matrix:
        print(" ".join(f"{val:6.2f}" for val in row))

# Run
if __name__ == "__main__":
    train_and_evaluate()



=== Processing Dataset D1 ===


  data_dict = torch.load(file_path)


KeyboardInterrupt: 