In [None]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score
from tqdm import tqdm
from torchvision import transforms, models
from torchvision.models import ConvNeXt_Tiny_Weights
import torch.nn as nn
from sklearn.decomposition import PCA

# 1. Load data functions
def load_data(file_path):
    """Load a labeled dataset file and return its images and labels.

    The file is read with ``torch.load`` and must deserialize to a mapping
    that has a ``'data'`` entry and a ``'targets'`` entry.

    Returns:
        A ``(data, targets)`` tuple taken from those two entries.
    """
    bundle = torch.load(file_path)
    samples = bundle['data']
    labels = bundle['targets']
    return samples, labels

def load_data_unlabeled(file_path):
    """Load an unlabeled dataset file and return only its images.

    The file is read with ``torch.load`` and must deserialize to a mapping
    with a ``'data'`` entry; any other entries are ignored.
    """
    # Unlabeled files carry no 'targets', so only 'data' is looked up.
    return torch.load(file_path)['data']

# 2. Transformations
# Per-image preprocessing applied before the ConvNeXt embedder:
# resize to 224x224, then normalize with the standard ImageNet channel stats.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

def apply_transforms(data):
    """Preprocess every image in ``data`` and stack the results.

    Each image is divided by 255.0, passed through the module-level
    ``transform`` pipeline, and the outputs are combined with
    ``torch.stack`` into a single tensor.
    """
    processed = []
    for image in data:
        scaled = image / 255.0  # presumably maps 0-255 pixel values to [0, 1]
        processed.append(transform(scaled))
    return torch.stack(processed)

# 3. ConvNeXt Embedder
class ConvNeXtEmbedder(nn.Module):
    """Feature extractor built from an ImageNet-pretrained ConvNeXt-Tiny.

    The network is truncated to all but its last two child modules and the
    remaining output is flattened to one vector per input sample.
    """

    def __init__(self):
        super().__init__()
        backbone = models.convnext_tiny(weights=ConvNeXt_Tiny_Weights.IMAGENET1K_V1)
        # Drop the last TWO children, not just the classifier.
        # NOTE(review): in torchvision's ConvNeXt these are presumably the
        # pooling layer and the classification head, which would leave the
        # un-pooled convolutional feature map -- confirm this is intended.
        kept_layers = list(backbone.children())[:-2]
        self.feature_extractor = nn.Sequential(*kept_layers)

    def forward(self, x):
        """Return the truncated network's output flattened to (batch, -1)."""
        features = self.feature_extractor(x)
        batch_size = features.size(0)
        return features.reshape(batch_size, -1)

# Shared embedder instance, switched to evaluation mode for inference.
embedder = ConvNeXtEmbedder()
embedder.eval()

# 4. Extract embeddings
def extract_embeddings(data, embedder, batch_size=32):
    """Extract embeddings for a dataset using the given embedder.

    The input is processed in slices of ``batch_size`` along its first axis.
    Only the current slice is moved to the compute device, so the full
    dataset never has to fit in GPU memory at once.

    Args:
        data: Tensor of preprocessed inputs, indexed by sample on axis 0.
        embedder: ``nn.Module`` mapping a batch to a 2-D embedding tensor.
            It is moved to the selected device; its train/eval mode is left
            unchanged.
        batch_size: Number of samples per forward pass.

    Returns:
        A NumPy array with the per-batch outputs stacked vertically.

    Raises:
        ValueError: If ``data`` is empty (there is nothing to stack).
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    embedder = embedder.to(device)

    embeddings = []
    with torch.no_grad():
        for i in tqdm(range(0, len(data), batch_size), desc="Extracting Embeddings"):
            # Transfer one batch at a time instead of the whole dataset.
            batch = data[i:i + batch_size].to(device)
            embeddings.append(embedder(batch).cpu().numpy())
    return np.vstack(embeddings)

# 5. Gaussian Generative Classifier (GGC)
class GaussianGenerativeClassifier:
    """Gaussian generative classifier with one full-covariance Gaussian per class.

    ``fit`` estimates a mean vector, covariance matrix and prior for every
    class; ``predict`` assigns each sample to the class with the highest
    log-likelihood plus log-prior. A small multiple of the identity is added
    to every covariance before it is used so that it can be factorized.
    """

    # Ridge added to the covariance diagonal to avoid singular matrices.
    _REGULARIZATION = 1e-6

    def __init__(self):
        self.means = {}  # Mean vector for each class
        self.covariances = {}  # Covariance matrix for each class
        self.priors = {}  # Prior probability for each class

    def fit(self, X, y):
        """
        Fit the generative model by estimating the means, covariances, and priors.

        Args:
            X: 2-D array of samples, one row per sample.
            y: 1-D array of class labels aligned with the rows of ``X``.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        # Start from a clean state so a refit never keeps stale classes.
        self.means = {}
        self.covariances = {}
        self.priors = {}

        for cls in np.unique(y):
            class_points = X[y == cls]
            self.means[cls] = np.mean(class_points, axis=0)
            if class_points.shape[0] > 1:
                self.covariances[cls] = np.cov(class_points, rowvar=False)
            else:
                # np.cov is undefined (NaN) for a single sample; fall back to
                # a zero matrix, which the regularization turns into a tiny
                # isotropic covariance.
                n_features = class_points.shape[1]
                self.covariances[cls] = np.zeros((n_features, n_features))
            self.priors[cls] = class_points.shape[0] / X.shape[0]

    def _log_densities(self, X, mean, covariance):
        """Return the Gaussian log-density of every row of ``X``."""
        d = mean.shape[0]
        diff = X - mean
        regularized = covariance + self._REGULARIZATION * np.eye(d)
        # slogdet works in log space; det() under/overflows for larger d and
        # would turn the normalization term into +/-inf.
        _, log_det = np.linalg.slogdet(regularized)
        # Solve the linear system once instead of forming an explicit inverse.
        solved = np.linalg.solve(regularized, diff.T)
        mahalanobis = np.einsum("ij,ji->i", diff, solved)
        return -0.5 * (d * np.log(2 * np.pi) + log_det + mahalanobis)

    def gaussian_log_density(self, x, mean, covariance):
        """
        Compute the logarithm of the Gaussian density function.

        Args:
            x: 1-D sample vector.
            mean: 1-D mean vector of the same length.
            covariance: Covariance matrix; it is regularized before use.

        Returns:
            The log-density of ``x`` as a scalar.
        """
        sample = np.asarray(x)[np.newaxis, :]
        return self._log_densities(sample, mean, covariance)[0]

    def predict(self, X):
        """
        Predict the class labels for the given data using log-densities.

        Each class covariance is factorized once and applied to all samples,
        so the progress bar advances per class rather than per sample.

        Args:
            X: 2-D array of samples, one row per sample.

        Returns:
            Array of predicted labels, one per row of ``X``.

        Raises:
            ValueError: If the model has not been fitted and ``X`` is non-empty.
        """
        X = np.asarray(X)
        if X.shape[0] == 0:
            return np.array([])

        classes = list(self.means)
        if not classes:
            raise ValueError("predict() called before fit(): no classes available")

        scores = np.empty((X.shape[0], len(classes)))
        for k, cls in enumerate(tqdm(classes, desc="Predicting Labels")):
            log_likelihood = self._log_densities(X, self.means[cls], self.covariances[cls])
            log_prior = np.log(self.priors[cls])
            scores[:, k] = log_likelihood + log_prior

        # argmax keeps the first class on ties, like max() over the dict did.
        return np.array(classes)[np.argmax(scores, axis=1)]

# 6. PCA Wrapper for Dimensionality Reduction
class PCAEmbedder:
    """Small wrapper that owns a ``PCA`` instance for dimensionality reduction."""

    def __init__(self, n_components=50):
        # The wrapped estimator; n_components is forwarded unchanged.
        self.pca = PCA(n_components=n_components)

    def fit_transform(self, embeddings):
        """Fit the wrapped PCA on ``embeddings`` and return their projection."""
        reduced = self.pca.fit_transform(embeddings)
        return reduced

    def transform(self, embeddings):
        """Project ``embeddings`` with the already-fitted PCA."""
        projected = self.pca.transform(embeddings)
        return projected

# Initialize variables
train_datasets = [f"/kaggle/input/cs771-mp2/dataset/part_one_dataset/train_data/{i}_train_data.tar.pth" for i in range(1, 11)] #change path to the location of the dataset
eval_datasets = [f"/kaggle/input/cs771-mp2/dataset/part_one_dataset/eval_data/{i}_eval_data.tar.pth" for i in range(1, 11)] #change path to the location of the dataset
# NOTE(review): this name shadows the `torchvision.models` module imported at
# the top of the file. It is kept because later code reads `models[-1]`.
models = []
accuracy_matrix = np.zeros((10, 10))  # Rows: Models f1 to f10, Columns: Held-out datasets D̂1 to D̂10
pca_embedder = PCAEmbedder(n_components=50)

all_train_embeddings = []
all_train_labels = []


def _embed_images(images):
    """Turn a raw image array into ConvNeXt embeddings.

    The array is converted to a tensor, permuted with (0, 3, 1, 2)
    (presumably NHWC -> NCHW), preprocessed, and embedded. The large
    intermediate tensor is local, so it is released when this returns.
    """
    batch = apply_transforms(torch.tensor(images).permute(0, 3, 1, 2))
    return extract_embeddings(batch, embedder)


def _evaluate_on_heldout(model, pca, eval_files, model_index, results):
    """Score ``model`` on each held-out file and record it in ``results``.

    Accuracy for the j-th file is written to ``results[model_index, j]`` and
    printed. ``pca`` must be the projection the model was trained with.
    """
    for j, eval_file in enumerate(eval_files):
        eval_data, eval_targets = load_data(eval_file)
        reduced_eval_embeddings = pca.transform(_embed_images(eval_data))
        eval_targets = torch.tensor(eval_targets)

        predictions = model.predict(reduced_eval_embeddings)
        accuracy = accuracy_score(eval_targets.numpy(), predictions)
        results[model_index, j] = accuracy
        print(f"Accuracy of f{model_index+1} on D̂{j+1}: {accuracy:.4f}")


# Train f1 with PCA (the only model fitted on ground-truth labels)
print("Training model f1...")
train_data, train_targets = load_data(train_datasets[0])
train_targets = torch.tensor(train_targets)

train_embeddings = _embed_images(train_data)
all_train_embeddings.append(train_embeddings)
all_train_labels.append(train_targets.numpy())

reduced_train_embeddings = pca_embedder.fit_transform(np.vstack(all_train_embeddings))

model_f1 = GaussianGenerativeClassifier()
model_f1.fit(reduced_train_embeddings, np.hstack(all_train_labels))
models.append(model_f1)

_evaluate_on_heldout(model_f1, pca_embedder, eval_datasets[:1], 0, accuracy_matrix)

# Train f2 to f10 with updated PCA
for i in range(1, 10):
    print(f"Training model f{i+1}...")
    train_embeddings = _embed_images(load_data_unlabeled(train_datasets[i]))

    # Pseudo-label the new data with the previous model, using the PCA that
    # model was trained in (pca_embedder is only replaced further down).
    current_model = models[-1]
    reduced_train_embeddings = pca_embedder.transform(train_embeddings)
    predicted_labels = current_model.predict(reduced_train_embeddings)

    all_train_embeddings.append(train_embeddings)
    all_train_labels.append(predicted_labels)

    # Refit PCA and the classifier on everything seen so far.
    all_embeddings_stack = np.vstack(all_train_embeddings)
    pca_embedder = PCAEmbedder(n_components=50)
    reduced_all_embeddings = pca_embedder.fit_transform(all_embeddings_stack)

    updated_model = GaussianGenerativeClassifier()
    updated_model.fit(reduced_all_embeddings, np.hstack(all_train_labels))
    models.append(updated_model)

    _evaluate_on_heldout(updated_model, pca_embedder, eval_datasets[:i + 1], i, accuracy_matrix)

# Print final accuracy matrix
print("Final Accuracy Matrix (Models vs Held-out Datasets):")
print(accuracy_matrix)
# Save the accuracy matrix as a CSV file
np.savetxt("accuracy_matrix_part_one.csv", accuracy_matrix, delimiter=",")


In [None]:
# File paths for part two datasets D11 to D20 (unlabeled)
train_datasets_part_two = [f"/kaggle/input/cs771-mp2/dataset/part_two_dataset/train_data/{i}_train_data.tar.pth" for i in range(1, 11)] #change path to the location of the dataset
eval_datasets_part_two = [f"/kaggle/input/cs771-mp2/dataset/part_two_dataset/eval_data/{i}_eval_data.tar.pth" for i in range(1, 11)] #change path to the location of the dataset

# Combine all train and evaluation datasets (D1 to D20)
all_train_datasets = train_datasets + train_datasets_part_two
all_eval_datasets = eval_datasets + eval_datasets_part_two

# Accuracy matrix of size 10x20 for f11 to f20 vs D̂1 to D̂20
accuracy_matrix_part_two = np.zeros((10, 20))

# Drop any models left over from a previous run of this cell so that
# models[-1] is f10 again and matches the Task 1 PCA below.
del models[10:]

# Start from the PCA fitted at the end of Task 1: it defines the feature space
# that f10 (models[-1]) was trained in, so it must project D11 for labeling.
pca_embedder_part_two = pca_embedder

# Accumulate embeddings and labels from Task 1, kept as one array per dataset
# (shallow copies, so the Task 1 lists are not mutated).
all_train_embeddings_part_two = list(all_train_embeddings)  # Embeddings from Task 1
all_train_labels_part_two = list(all_train_labels)  # Labels from Task 1

# Sequential training for D11 to D20
for i in range(10, 20):  # Start from D11 (index 10)
    print(f"Training model f{i+1}...")

    # Load the unlabeled dataset D11 to D20
    train_data = load_data_unlabeled(all_train_datasets[i])  # Unlabeled datasets
    train_data = apply_transforms(torch.tensor(train_data).permute(0, 3, 1, 2))

    # Use the last trained model (e.g., f10 for D11) to predict pseudo-labels
    current_model = models[-1]  # Start with f10
    train_embeddings = extract_embeddings(train_data, embedder)

    # Project with the PCA that current_model was trained with. Using a newly
    # refitted PCA here would feed the model a different feature space.
    reduced_train_embeddings = pca_embedder_part_two.transform(train_embeddings)

    # Predict pseudo-labels
    predicted_labels = current_model.predict(reduced_train_embeddings)

    # Add new embeddings and their pseudo-labels to the accumulated data
    all_train_embeddings_part_two.append(train_embeddings)
    all_train_labels_part_two.append(predicted_labels)

    # Refit PCA with all embeddings seen so far (only after pseudo-labeling)
    pca_embedder_part_two = PCAEmbedder(n_components=50)  # Reset PCA embedder
    reduced_all_embeddings = pca_embedder_part_two.fit_transform(np.vstack(all_train_embeddings_part_two))

    # Train the updated model with pseudo-labeled data
    updated_model = GaussianGenerativeClassifier()
    updated_model.fit(reduced_all_embeddings, np.hstack(all_train_labels_part_two))  # Fit with all reduced embeddings and labels
    models.append(updated_model)

    # Evaluate the updated model on all held-out datasets up to the current one
    for j, eval_file in enumerate(all_eval_datasets[:i + 1]):  # Evaluate on D̂1 to D̂i+1
        eval_data, eval_targets = load_data(eval_file)
        eval_data = apply_transforms(torch.tensor(eval_data).permute(0, 3, 1, 2))
        eval_embeddings = extract_embeddings(eval_data, embedder)

        # Transform evaluation data using updated PCA
        reduced_eval_embeddings = pca_embedder_part_two.transform(eval_embeddings)

        # Predict and calculate accuracy
        predictions = updated_model.predict(reduced_eval_embeddings)
        accuracy = accuracy_score(eval_targets, predictions)
        accuracy_matrix_part_two[i - 10, j] = accuracy  # Adjusted indexing for f11 to f20
        print(f"Accuracy of f{i+1} on D̂{j+1}: {accuracy:.4f}")

# Print final accuracy matrix for Task 2
print("Final Accuracy Matrix for Models f11 to f20 vs Held-out Datasets D̂1 to D̂20:")
print(accuracy_matrix_part_two)
# Save the accuracy matrix as a CSV file
np.savetxt("accuracy_matrix_part_two.csv", accuracy_matrix_part_two, delimiter=",")
