In [1]:
import pandas as pd
import torch
import torch.nn as nn

from torch.utils.data import Dataset
from torchvision import models
from typing import Tuple

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 1
MEMBERSHIP_DATASET_PATH = "data/priv_out.pt"       # Path to priv_out_.pt
MIA_CKPT_PATH = "data/01_MIA_69.pt"                 # Path to 01_MIA_69.pt
PUB_DATASET_PATH = "data/pub.pt"              # Path to pub.pt


allowed_models = {
    "resnet18": models.resnet18,
    "resnet34": models.resnet34,
    "resnet50": models.resnet50,
}


class TaskDataset(Dataset):
    def __init__(self, transform=None):

        self.ids = []
        self.imgs = []
        self.labels = []

        self.transform = transform

    def __getitem__(self, index) -> Tuple[int, torch.Tensor, int]:
        id_ = self.ids[index]
        img = self.imgs[index]
        if not self.transform is None:
            img = self.transform(img)
        label = self.labels[index]
        return id_, img, label

    def __len__(self):
        return len(self.ids)


class MembershipDataset(TaskDataset):
    def __init__(self, transform=None):
        super().__init__(transform)
        self.membership = []

    def __getitem__(self, index) -> Tuple[int, torch.Tensor, int, int]:
        id_, img, label = super().__getitem__(index)
        return id_, img, label, self.membership[index]

def get_dataset(dataset_path):
    from torchvision import transforms
    
    MEAN = [0.2980, 0.2962, 0.2987]
    STD = [0.2886, 0.2875, 0.2889]
    
    transform = transforms.Compose([
        transforms.Normalize(mean=MEAN, std=STD)  # Apply normalization
    ])

    with torch.serialization.safe_globals([MembershipDataset]):
        dataset: MembershipDataset = torch.load(dataset_path)
        dataset.transform = transform

    for i in range(len(dataset)):
            if dataset.membership[i] is None:
                dataset.membership[i] = 0
    
    return dataset

def inference_dataloader(dataset: MembershipDataset, batch_size):
    
    return torch.utils.data.DataLoader(dataset, batch_size, shuffle=False)


def load_model(model_name, model_path):
    try:
        model: nn.Module = allowed_models[model_name](weights=None)
        model.fc = nn.Linear(model.fc.weight.shape[1], 44)
    except Exception as e:
        raise Exception(
            f"Invalid model class, {e}, only {allowed_models.keys()} are allowed"
        )

    try:
        model_state_dict = torch.load(model_path, map_location=DEVICE, weights_only=False)
        model.load_state_dict(model_state_dict, strict=True)
        model.eval()
    except Exception as e:
        raise Exception(f"Invalid model, {e}")

    return model

def get_loss_value(model, img, label):
    criterion = nn.CrossEntropyLoss()
    output = model(img)
    loss = criterion(output, label)
    return loss.item()


In [2]:
def get_loss_value(model, img, label):
    criterion = nn.CrossEntropyLoss()
    output = model(img)
    loss = criterion(output, label)
    return loss.item()

from tqdm import tqdm
import numpy as np
from scipy.stats import gaussian_kde

class MembershipScoreCalculation:
    def __init__(self, in_losses, out_losses, prior_in=0.5, prior_out=0.5):
            # Convert inputs to numpy arrays and apply transformations
            in_losses = self.apply_transformations(np.array(in_losses))
            out_losses = self.apply_transformations(np.array(out_losses))

            # Check if we have enough data for KDE
            if in_losses.size <= 1 or out_losses.size <= 1:
                self.use_kde = False
                self.kde_in = self.kde_out = None  # No KDE available
                self.in_mean = in_losses.mean() if in_losses.size > 0 else 0.0
                self.out_mean = out_losses.mean() if out_losses.size > 0 else 0.0
            else:
                self.use_kde = True
                self.kde_in = gaussian_kde(in_losses)
                self.kde_out = gaussian_kde(out_losses)

            self.prior_in = prior_in
            self.prior_out = prior_out

    @staticmethod
    def apply_transformations(loss):
        # check if any of loss values is nan
        loss = np.exp(-loss)
        eps = 1e-9  
        loss = np.clip(loss, eps, 1 - eps)
        loss = np.log(loss / (1 - loss))
        return loss
    
    def get_score(self, loss: float) -> float:
        """
        Oblicza prawdopodobieństwo, że loss pochodzi z "in".
        :param loss: Wartość loss do oceny.
        :return: Prawdopodobieństwo przynależności do "in" w zakresie [0, 1].
        """
        loss = self.apply_transformations(loss)
        
        if not self.use_kde:
            if hasattr(self, 'in_mean') and hasattr(self, 'out_mean'):
                    midpoint = (self.in_mean + self.out_mean) / 2 if self.in_mean != self.out_mean else 0.0
                    score = 1 / (1 + np.exp(loss - midpoint))  # Sigmoid function
                    return float(np.clip(score, 0, 1))
            else:
                # If no data at all, map transformed_loss to [0, 1] directly
                score = 1 / (1 + np.exp(loss))  # Sigmoid without reference point
                return float(np.clip(score, 0, 1))

        p_in = self.kde_in(loss) * self.prior_in
        p_out = self.kde_out(loss) * self.prior_out
        
        if p_in + p_out == 0:
            return 0.5 
        
        res = p_in / (p_in + p_out)
        return res[0]




In [4]:

model = load_model(model_name="resnet18", model_path=MIA_CKPT_PATH)


class_losses_in = {c: [] for c in range(44)}
class_losses_out = {c: [] for c in range(44)}

dataset_pub = get_dataset(PUB_DATASET_PATH)
dataloader_pub = inference_dataloader(dataset_pub, BATCH_SIZE)

for id_, img, label, membership in tqdm(dataloader_pub):
    loss = get_loss_value(model, img, label)
    if membership == 1:
        class_losses_in[label.item()].append(loss)
    else:
        class_losses_out[label.item()].append(loss)

class_scorers = dict()
for i in range(44):
    print(f"Class {i}")
    class_scorers[i] = MembershipScoreCalculation(class_losses_in[i], class_losses_out[i])

Exception: Invalid model, PytorchStreamReader failed reading zip archive: failed finding central directory

In [3]:
def membership_prediction(model, dataset_path):
    dataloader = inference_dataloader(get_dataset(dataset_path), BATCH_SIZE)

    outputs_list = []

    for _, img, label, _ in tqdm(dataloader):
        img = img.to(DEVICE)

        with torch.no_grad():
            loss = get_loss_value(model, img, label)
        
        membership_score = [class_scorers[label.item()].get_score(loss)]

        outputs_list += membership_score

    return pd.DataFrame(
        {
            "ids": dataset.ids,
            "score": outputs_list,
        }
    )


if __name__ == '__main__':
    model = load_model(model_name="resnet18", model_path=MIA_CKPT_PATH)
    preds = membership_prediction(model, MEMBERSHIP_DATASET_PATH)
    preds.to_csv("data/pub_submission.csv", index=False)

    print("Outputs saved to pub_submission.csv")

Exception: Invalid model, PytorchStreamReader failed reading zip archive: failed finding central directory

In [None]:

result = requests.post(
    URL,
    headers={"token": TOKEN},
    files={
        "csv_file": ("submission.csv", open("./submission.csv", "rb"))
    }
)

print(result.status_code, result.text)

In [11]:
class_scorers = dict()
for i in range(44):
    print(f"Class {i}")
    class_scorers[i] = MembershipScoreCalculation(class_losses_in[i], class_losses_out[i])



Class 0
Class 1
Class 2
Class 3
Class 4
Class 5
Class 6
Class 7
Class 8
Class 9
Class 10
Class 11
Class 12
Class 13
Class 14
Class 15
Class 16
Class 17
Class 18
Class 19
Class 20
Class 21


ValueError: `dataset` input should have multiple elements.

In [114]:
import numpy as np
from scipy.stats import gaussian_kde

class MembershipScoreCalculation:
    def __init__(self, in_losses, out_losses, prior_in=0.5, prior_out=0.5):

        in_losses = self.apply_transformations(np.array(in_losses))
        out_losses = self.apply_transformations( np.array(out_losses))

        self.kde_in = gaussian_kde(in_losses)
        self.kde_out = gaussian_kde(out_losses)

        self.prior_in = prior_in
        self.prior_out = prior_out

    @staticmethod
    def apply_transformations(loss):
        # check if any of loss values is nan
        loss = np.exp(-loss)
        eps = 1e-9  
        loss = np.clip(loss, eps, 1 - eps)
        loss = np.log(loss / (1 - loss))
        return loss
    
    def get_score(self, loss: float) -> float:
        """
        Oblicza prawdopodobieństwo, że loss pochodzi z "in".
        :param loss: Wartość loss do oceny.
        :return: Prawdopodobieństwo przynależności do "in" w zakresie [0, 1].
        """
        loss = self.apply_transformations(loss)

        p_in = self.kde_in(loss) * self.prior_in
        p_out = self.kde_out(loss) * self.prior_out
        
        if p_in + p_out == 0:
            return 0.5 
        
        return p_in / (p_in + p_out)

# Przykład użycia:
in_losses = np.random.normal(3, 4, 1000)  # Symulowane straty dla "in"
out_losses = np.random.normal(5.7, 4, 1000)  # Symulowane straty dla "out"

calculator = MembershipScoreCalculation(in_losses, out_losses)

loss_value = 4
prob_in = calculator.get_score(loss_value)
print(prob_in)


[0.4629805]


In [47]:
membership

tensor([1])

In [None]:
import torch.nn.functional as F

# Assuming label is a tensor with class indices
num_classes = 44  # Number of classes in your dataset
one_hot_label = F.one_hot(label, num_classes=num_classes)
print(one_hot_label)

In [56]:
import torch.nn.functional as F

softmax_output = F.softmax(output, dim=1)
print(softmax_output)

tensor([[1.0114e-09, 1.6680e-08, 3.8089e-11, 1.0310e-10, 1.3761e-06, 8.8811e-14,
         2.4352e-06, 9.8979e-01, 2.9824e-05, 4.0334e-13, 3.4454e-09, 7.0554e-03,
         1.8168e-13, 6.5499e-08, 7.0310e-08, 1.4095e-07, 4.9365e-10, 1.3497e-09,
         3.9538e-04, 6.9777e-09, 1.4155e-08, 1.7135e-10, 3.2620e-06, 1.8935e-08,
         1.3199e-09, 2.1556e-03, 1.2281e-07, 1.1705e-10, 1.8560e-08, 1.0162e-06,
         4.4139e-16, 6.7755e-12, 1.6703e-07, 8.4817e-05, 6.9779e-05, 4.4324e-10,
         1.7834e-11, 4.0862e-06, 2.6119e-05, 3.8731e-06, 1.4408e-07, 6.1190e-12,
         1.2935e-09, 3.7189e-04]], grad_fn=<SoftmaxBackward0>)


In [54]:
output

tensor([[-15.0871, -12.2842, -18.3662, -17.3704,  -7.8714, -24.4274,  -7.3006,
           5.6146,  -4.7953, -22.9141, -13.8614,   0.6709, -23.7117, -10.9164,
         -10.8455, -10.1500, -15.8043, -14.7985,  -2.2108, -13.1557, -12.4483,
         -16.8624,  -7.0083, -12.1574, -14.8208,  -0.5148, -10.2877, -17.2435,
         -12.1774,  -8.1745, -29.7317, -20.0928,  -9.9802,  -3.7501,  -3.9453,
         -15.9120, -19.1250,  -6.7830,  -4.9280,  -6.8366, -10.1280, -20.1947,
         -14.8411,  -2.2720]], grad_fn=<AddmmBackward0>)