In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision import models, transforms
import numpy as np

In [2]:
# Directories holding the part-one training and evaluation shards.
train_dir, eval_dir = (
    os.path.join('dataset', 'part_one_dataset', split_dir)
    for split_dir in ('train_data', 'eval_data')
)

# First shard of each split.
train_path = os.path.join(train_dir, '1_train_data.tar.pth')
eval_path = os.path.join(eval_dir, '1_eval_data.tar.pth')

# NOTE: weights_only=False lets torch.load unpickle arbitrary objects, so only
# point this at files you trust.
t = torch.load(train_path, weights_only=False)

In [3]:
import numpy as np
from sklearn.metrics.pairwise import cosine_distances, manhattan_distances

class LWP:
    """Prototype-based classifier (LWP) with a configurable distance function.

    Every class is summarised by a single prototype: the running mean of all
    feature vectors seen for that class over every ``fit`` call. ``predict``
    assigns each sample the label of the prototype closest to it under
    ``distance_fn``.
    """

    # Built-in pairwise distances, each taking two 1-D vectors and returning a
    # scalar. 'minkowski' additionally accepts the order ``p``.
    DISTANCE_FUNCTIONS = {
        'euclidean': lambda x, y: np.linalg.norm(x - y),
        'cosine': lambda x, y: cosine_distances(x.reshape(1, -1), y.reshape(1, -1))[0][0],
        'manhattan': lambda x, y: manhattan_distances(x.reshape(1, -1), y.reshape(1, -1))[0][0],
        'minkowski': lambda x, y, p=2: np.power(np.sum(np.power(np.abs(x - y), p)), 1/p)
    }

    def __init__(self, distance_metric='euclidean', **distance_params):
        """Create an empty model.

        Args:
            distance_metric (str | callable): Either a key of
                ``DISTANCE_FUNCTIONS`` or a callable ``f(x, y)`` returning the
                distance between two vectors.
            **distance_params: Additional parameters for the distance
                function. Only ``p`` (default 2) is read, and only when
                ``distance_metric == 'minkowski'``.

        Raises:
            ValueError: If ``distance_metric`` is an unknown name, or if the
                Minkowski order ``p`` is not strictly positive.
        """
        # label -> prototype vector (running class mean)
        self.prototypes = {}
        # label -> number of samples folded into that prototype. Pre-seeded
        # for labels 0-9; fit() adds any other label on demand.
        self.class_counts = {i: 0 for i in range(10)}

        if callable(distance_metric):
            self.distance_fn = distance_metric
        elif distance_metric in self.DISTANCE_FUNCTIONS:
            if distance_metric == 'minkowski':
                p = distance_params.get('p', 2)
                # 1/p is evaluated on every call, so reject bad orders now
                # instead of failing deep inside predict().
                if p <= 0:
                    raise ValueError(f"Minkowski order p must be > 0, got {p}")
                minkowski = self.DISTANCE_FUNCTIONS['minkowski']
                self.distance_fn = lambda x, y: minkowski(x, y, p)
            else:
                self.distance_fn = self.DISTANCE_FUNCTIONS[distance_metric]
        else:
            raise ValueError(f"Unknown distance metric: {distance_metric}. " 
                           f"Available metrics: {list(self.DISTANCE_FUNCTIONS.keys())}")

    def fit(self, features, labels):
        """Create or incrementally update the prototype of every label present.

        Can be called repeatedly: a label that already has a prototype gets a
        count-weighted average of the old prototype and the new batch mean,
        which equals the mean over all samples seen so far.

        Args:
            features: Array-like of shape (n_samples, n_features).
            labels: Array-like with one label per sample (flattened to 1-D).

        Raises:
            ValueError: If ``features`` and ``labels`` differ in length.
        """
        features = np.asarray(features)
        # Flatten so column-shaped labels, e.g. (n, 1), still produce a row mask.
        labels = np.asarray(labels).reshape(-1)
        if len(features) != len(labels):
            raise ValueError(
                f"features and labels must have the same length, "
                f"got {len(features)} and {len(labels)}"
            )

        for label in np.unique(labels):
            samples = features[labels == label]
            num_samples = len(samples)
            batch_mean = samples.mean(axis=0)

            if label not in self.prototypes:
                self.prototypes[label] = batch_mean
                self.class_counts[label] = num_samples
            else:
                total = self.class_counts.get(label, 0) + num_samples
                self.prototypes[label] = (
                    (total - num_samples) / total * self.prototypes[label] +
                    num_samples / total * batch_mean
                )
                self.class_counts[label] = total

    def predict(self, features):
        """Return, for each sample, the label of its nearest prototype.

        Args:
            features: Iterable of feature vectors (e.g. a 2-D array).

        Returns:
            np.ndarray: One predicted label per sample, in input order.

        Raises:
            ValueError: If the model has no prototypes yet.
        """
        if not self.prototypes:
            raise ValueError("LWP has no prototypes yet; call fit() before predict().")

        preds = []
        for feature in features:
            feature = np.asarray(feature)
            distances = {
                label: self.distance_fn(feature, proto)
                for label, proto in self.prototypes.items()
            }
            preds.append(min(distances, key=distances.get))
        return np.array(preds)

In [4]:
# Largest label value in the first training shard.
print(max(t['targets']))

9


In [5]:
# Shape of the raw sample array in the first training shard.
print(t['data'].shape)

(2500, 32, 32, 3)


In [6]:
# Pull samples and labels out of the loaded shard, then flatten every sample
# into a single feature row.
targets = t['targets']
data = t['data']
data = data.reshape(len(data), -1)
print(data.shape)

(2500, 3072)


In [7]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from sklearn.preprocessing import normalize

In [8]:
# Flatten the first shard, normalise each row (sklearn's `normalize`), and fit
# the prototype model on it using cosine distance.
data1 = t['data']
targets1 = t['targets']
data1 = normalize(data1.reshape(len(data1), -1))
dataloader = DataLoader(data1, batch_size=32, shuffle=False)

lwp_model = LWP(distance_metric='cosine')
lwp_model.fit(data1, targets1)

In [9]:
import os
import numpy as np
import torch
import torch.nn.functional as F
from sklearn.preprocessing import normalize
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics.pairwise import cosine_distances

In [None]:
def _nearest_prototypes(model, samples):
    """Pseudo-label each sample with the label of its nearest prototype.

    Args:
        model: Object exposing ``prototypes`` (label -> vector) and
            ``distance_fn(sample, prototype)``.
        samples: 2-D array with one flattened sample per row.

    Returns:
        tuple[list, list]: ``(labels, distances)`` in input order, where
        ``distances[k]`` is the distance from ``samples[k]`` to the prototype
        of ``labels[k]``.
    """
    labels, distances = [], []
    for sample in samples:
        dist_to_prototypes = {
            label: model.distance_fn(sample, proto)
            for label, proto in model.prototypes.items()
        }
        closest_label = min(dist_to_prototypes, key=dist_to_prototypes.get)
        labels.append(closest_label)
        distances.append(dist_to_prototypes[closest_label])
    return labels, distances


# Fail early with a clear message instead of an opaque `min()` error on the
# first sample when the model was never fitted in this kernel.
if not lwp_model.prototypes:
    raise ValueError("lwp_model has no prototypes; run the cell that fits it on dataset 1 first.")

for i in range(2, 11):
    train_path = os.path.join(train_dir, f'{i}_train_data.tar.pth')
    print(f"Processing dataset {i} from {train_path}")

    # NOTE: weights_only=False unpickles arbitrary objects; only load trusted files.
    dataset = torch.load(train_path, weights_only=False)
    data = dataset['data']
    # Flatten each sample to one row and normalise it (important for distance
    # calculations). An intermediate reshape to (-1, 3, 32, 32) would be
    # undone by this flatten, so it is omitted.
    data = normalize(data.reshape(data.shape[0], -1))

    # Step 1-3: pseudo-label every sample and turn its distance to the chosen
    # prototype into a confidence in (0, 1] (smaller distance -> higher score).
    # The rows are iterated directly; batching them through a DataLoader and
    # converting back to numpy yields the same rows.
    predictions, closest_distances = _nearest_prototypes(lwp_model, data)
    confidence_scores = (1 / (1 + np.array(closest_distances))).tolist()
    embeddings = data

    # Step 4: Select top 50% most confident samples
    sorted_indices = np.argsort(confidence_scores)[::-1]  # descending confidence
    top_50_percent_indices = sorted_indices[:len(sorted_indices) // 2]

    top_50_embeddings = embeddings[top_50_percent_indices]
    top_50_predictions = np.array(predictions)[top_50_percent_indices]

    # Step 5: Construct class centroids. Labels come from np.unique, so every
    # selected class has at least one sample.
    class_centroids = {
        label: top_50_embeddings[top_50_predictions == label].mean(axis=0)
        for label in np.unique(top_50_predictions)
    }
    # NOTE(review): these centroids are only printed; lwp_model is never
    # updated with them, so every dataset is labelled with the dataset-1
    # prototypes. If an incremental update was intended, something like
    # lwp_model.fit(top_50_embeddings, top_50_predictions) belongs here - confirm.

    # Print centroids
    print(f"Class centroids calculated for dataset {i}:")
    for label, centroid in class_centroids.items():
        print(f"Class {label}: Centroid = {centroid[:5]}...")
   


Processing dataset 2 from dataset/part_one_dataset/train_data/2_train_data.tar.pth


In [None]:
# Display the (label, prototype vector) pairs currently held by the model.
# NOTE(review): the saved output of this cell is empty, which presumably means
# it ran on a kernel where lwp_model had not been fitted - re-check after
# Restart & Run All.
lwp_model.prototypes.items()

dict_items([])