<a href="https://colab.research.google.com/github/Aymenayachi/4G-3GPP-ETSI-machine-learning-anomaly-detection/blob/main/notebooks/colab-github-demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
from tqdm import tqdm

# Ensure GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
def generate_uap(model, data_loader, max_iter=50, eps=0.1):
    """
    Generate UAP using DeepFool-like method (as in paper).
    """
    model.eval()
    uap = torch.zeros(1, 3, 32, 32).to(device)  # Adjust input shape for your dataset
    for _ in range(max_iter):
        for x, _ in data_loader:
            x = x.to(device)
            x_perturbed = x + uap
            x_perturbed = torch.clamp(x_perturbed, 0, 1)

            # Compute gradient to maximize misclassification
            x_perturbed.requires_grad = True
            outputs = model(x_perturbed)
            loss = -torch.sum(outputs)  # Maximize loss to fool the model
            model.zero_grad()
            loss.backward()

            grad = x_perturbed.grad.data
            uap = uap + eps * torch.sign(grad.mean(dim=0, keepdim=True))
            uap = torch.clamp(uap, -eps, eps)  # L-infinity constraint
    return uap.detach()

In [5]:
def generate_fingerprints(model, uap, dataset, n_clusters=100):
    """
    Generate fingerprints using K-means clustering on the victim model's last layer.
    """
    # Extract features from the last layer
    features = []
    model.eval()
    with torch.no_grad():
        for x, _ in dataset:
            x = x.to(device)
            feat = model.features(x)  # Adjust to extract features before classification head
            features.append(feat.cpu().numpy())
    features = np.concatenate(features, axis=0)

    # Cluster features using K-means
    kmeans = KMeans(n_clusters=n_clusters)
    cluster_labels = kmeans.fit_predict(features)

    # Select one sample per cluster
    selected_indices = []
    for i in range(n_clusters):
        cluster_samples = np.where(cluster_labels == i)[0]
        selected_indices.append(np.random.choice(cluster_samples))

    # Generate fingerprints
    fingerprints = []
    for idx in selected_indices:
        x, _ = dataset[idx]
        x_perturbed = x + uap
        with torch.no_grad():
            logits_clean = model(x.unsqueeze(0).to(device))
            logits_perturbed = model(x_perturbed.unsqueeze(0).to(device))
        fingerprints.append(torch.cat([logits_clean, logits_perturbed], dim=1))

    return torch.cat(fingerprints, dim=0)

In [6]:
class ContrastiveEncoder(nn.Module):
    def __init__(self, input_dim=200, hidden_dim=128, output_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.net(x)

def contrastive_loss(z1, z2, temperature=0.5):
    """
    Supervised contrastive loss (Eq. 7 in the paper).
    """
    z = torch.cat([z1, z2], dim=0)
    similarity = torch.mm(z, z.T) / temperature
    labels = torch.cat([torch.arange(z1.size(0)) for _ in range(2)], dim=0)
    loss = nn.CrossEntropyLoss()(similarity, labels)
    return loss

# Training loop
encoder = ContrastiveEncoder().to(device)
optimizer = optim.Adam(encoder.parameters(), lr=1e-3)

for epoch in range(100):
    for batch in fingerprint_dataloader:
        # Assume batch contains victim, piracy, and homologous fingerprints
        victim_fp, piracy_fp, homo_fp = batch
        z_victim = encoder(victim_fp)
        z_piracy = encoder(piracy_fp)
        z_homo = encoder(homo_fp)

        # Positive pairs: victim-piracy, Negative pairs: victim-homo
        loss = contrastive_loss(z_victim, z_piracy) + contrastive_loss(z_victim, z_homo, neg_weight=2.0)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

NameError: name 'fingerprint_dataloader' is not defined

In [2]:
# ... [Previous imports and setup code] ...
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision
import torchvision.transforms as transforms
import numpy as np
from sklearn.cluster import KMeans

# ========== Device Setup ==========
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ========== Dataset Preparation ==========
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=64, shuffle=False, num_workers=2)

# ========== Model Definition ==========
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 8 * 8, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x



Files already downloaded and verified
Files already downloaded and verified


In [3]:
# ========== Improved UAP Generation ==========
def generate_uap(model, data_loader, max_iter=50, eps=0.1, overshoot=0.02):
    model.eval()
    uap = torch.zeros(1, 3, 32, 32).to(device)
    for _ in range(max_iter):
        for x, _ in data_loader:
            x = x.to(device)
            x_perturbed = torch.clamp(x + uap, 0, 1)
            x_perturbed.requires_grad = True

            outputs = model(x_perturbed)
            _, preds = torch.max(outputs, 1)

            loss = torch.sum(outputs[:, preds])
            model.zero_grad()
            loss.backward()

            grad = x_perturbed.grad.data
            perturbation = (grad.mean(dim=0, keepdim=True) + 1e-8) * eps
            uap = torch.clamp(uap + perturbation, -eps, eps)

    return uap.detach()

In [4]:
# ========== Enhanced Fingerprint Generation ==========
def generate_fingerprints(model, uap, dataset, n_clusters=100):
    features = []
    model.eval()
    with torch.no_grad():
        for x, _ in DataLoader(dataset, batch_size=100):
            x = x.to(device)
            feat = model.features(x)
            features.append(feat.cpu())
    features = torch.cat(features).numpy()

    kmeans = KMeans(n_clusters=n_clusters)
    kmeans.fit(features)
    cluster_centers = kmeans.cluster_centers_

    selected_indices = [np.argmin(np.linalg.norm(features - center, axis=1)) for center in cluster_centers]

    fingerprints = []
    for idx in selected_indices:
        x, _ = dataset[idx]
        x = x.to(device)
        x_perturbed = torch.clamp(x + uap, 0, 1)

        with torch.no_grad():
            logits_clean = model(x.unsqueeze(0))
            logits_perturbed = model(x_perturbed.unsqueeze(0))

        fingerprints.append(torch.cat([logits_clean, logits_perturbed], dim=1))

    return torch.cat(fingerprints, dim=0)

In [5]:
# ========== Model Training Functions ==========
def train_model(model, train_loader, epochs=10):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f'Epoch {epoch+1}, Loss: {running_loss/len(train_loader):.4f}')
    return model

# ========== Contrastive Encoder ==========
class ContrastiveEncoder(nn.Module):
    def __init__(self):
        super(ContrastiveEncoder, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(20, 128),
            nn.ReLU(),
            nn.Linear(128, 64)
        )

    def forward(self, x):
        return self.fc(x)



In [6]:
# ========== Main Workflow ==========
if __name__ == "__main__":
    victim_model = SimpleCNN().to(device)
    piracy_model = SimpleCNN().to(device)
    homo_model = SimpleCNN().to(device)

    print("Training victim model...")
    victim_model = train_model(victim_model, trainloader, epochs=10)

    print("Training homologous model...")
    homo_model = train_model(homo_model, trainloader, epochs=10)

    print("Generating UAP...")
    victim_uap = generate_uap(victim_model, trainloader)

    print("Generating fingerprints...")
    victim_fps = generate_fingerprints(victim_model, victim_uap, trainset)
    piracy_fps = generate_fingerprints(piracy_model, victim_uap, trainset)
    homo_fps = generate_fingerprints(homo_model, victim_uap, trainset)

    class FingerprintDataset(Dataset):
        def __init__(self, victim_fps, piracy_fps, homo_fps):
            self.positive_pairs = [(victim_fps[i], piracy_fps[i]) for i in range(len(victim_fps))]
            self.negative_pairs = [(victim_fps[i], homo_fps[i]) for i in range(len(victim_fps))]
            self.all_pairs = self.positive_pairs + self.negative_pairs
            self.labels = [1]*len(self.positive_pairs) + [0]*len(self.negative_pairs)

        def __len__(self):
            return len(self.all_pairs)

        def __getitem__(self, idx):
            return self.all_pairs[idx], self.labels[idx]

    dataset = FingerprintDataset(victim_fps, piracy_fps, homo_fps)
    fingerprint_dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

    encoder = ContrastiveEncoder().to(device)
    optimizer = optim.Adam(encoder.parameters(), lr=1e-3)

    for epoch in range(50):
        total_loss = 0
        for (anchor, compare), labels in fingerprint_dataloader:
            anchor, compare, labels = anchor.to(device), compare.to(device), labels.to(device)

            z_anchor = encoder(anchor)
            z_compare = encoder(compare)

            logits = torch.cosine_similarity(z_anchor, z_compare, dim=-1)
            loss = nn.BCEWithLogitsLoss()(logits, labels.float())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f'Epoch {epoch+1}, Loss: {total_loss/len(fingerprint_dataloader):.4f}')

    def verify_ownership(suspect_model, victim_uap, encoder, threshold=0.8):
        suspect_fps = generate_fingerprints(suspect_model, victim_uap, testset)
        with torch.no_grad():
            z_victim = encoder(victim_fps.to(device))
            z_suspect = encoder(suspect_fps.to(device))
            similarities = torch.cosine_similarity(z_victim, z_suspect, dim=1)
        return similarities.mean().item() > threshold

    print("\nVerification results:")
    print(f"Piracy model: {verify_ownership(piracy_model, victim_uap, encoder)}")
    print(f"Homologous model: {verify_ownership(homo_model, victim_uap, encoder)}")


Training victim model...
Epoch 1, Loss: 2.0818
Epoch 2, Loss: 1.6856
Epoch 3, Loss: 1.4857
Epoch 4, Loss: 1.3577
Epoch 5, Loss: 1.2727
Epoch 6, Loss: 1.1988
Epoch 7, Loss: 1.1371
Epoch 8, Loss: 1.0795
Epoch 9, Loss: 1.0298
Epoch 10, Loss: 0.9833
Training homologous model...
Epoch 1, Loss: 2.0603
Epoch 2, Loss: 1.6724
Epoch 3, Loss: 1.4670
Epoch 4, Loss: 1.3443
Epoch 5, Loss: 1.2617
Epoch 6, Loss: 1.2005
Epoch 7, Loss: 1.1418
Epoch 8, Loss: 1.0878
Epoch 9, Loss: 1.0391
Epoch 10, Loss: 0.9934
Generating UAP...
Generating fingerprints...


ValueError: Found array with dim 4. KMeans expected <= 2.