In [2]:
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from models.model import SmallCNN
from poisoning import PoisoningAttackCNN
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import torch

DATA_DIR = './train'
CLASSES = ['0', '1', '2']
IMG_SIZE = (32, 32)
SEED = 0  # seeds only the dataset shuffle below, so the train/test split is reproducible

# Read every PNG under DATA_DIR/<class>/ as a flat uint8 pixel vector; the
# folder name is the integer class label.
#
# NOTE(review): np.array(img) of an RGB PIL image is (height, width, channel),
# so each flattened vector is HWC-ordered, while the tensors at the bottom of
# this cell are built with reshape(-1, 3, 32, 32) and no transpose. The rest of
# the notebook (poison loader, PNG export via reshape(32, 32, 3)) relies on the
# same convention, so it is intentionally left unchanged here -- confirm that
# this layout is what the model is meant to see.
X = []
y = []

for class_label in CLASSES:
    class_dir = os.path.join(DATA_DIR, class_label)
    for fname in sorted(os.listdir(class_dir)):
        if fname.endswith('.png'):
            img_path = os.path.join(class_dir, fname)
            # Image.open is lazy and keeps the file open; the context manager
            # releases the handle as soon as the pixels have been copied out.
            with Image.open(img_path) as img:
                img_array = np.array(img.resize(IMG_SIZE).convert('RGB')).flatten()
            X.append(img_array)
            y.append(int(class_label))

X = np.array(X, dtype=np.uint8)
y = np.array(y, dtype=np.uint8)

# Shuffle samples and labels with the same permutation. A local Generator is
# used so the global NumPy random state (used by the poisoning code) is untouched.
indices = np.random.default_rng(SEED).permutation(len(X))
X = X[indices]
y = y[indices]

class0 = 0
class1 = 1
class2 = 2

# Per-class views of the shuffled data.
class0_xdata = X[y == class0]
class1_xdata = X[y == class1]
class2_xdata = X[y == class2]
class0_ydata = y[y == class0]
class1_ydata = y[y == class1]
class2_ydata = y[y == class2]

# Per-class tensors: pixels scaled to [0, 1] as float32, labels as int64.
X0_tensor = torch.tensor(class0_xdata, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0
y0_tensor = torch.tensor(class0_ydata, dtype=torch.long)
X1_tensor = torch.tensor(class1_xdata, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0
y1_tensor = torch.tensor(class1_ydata, dtype=torch.long)
X2_tensor = torch.tensor(class2_xdata, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0
y2_tensor = torch.tensor(class2_ydata, dtype=torch.long)


In [3]:
import torch 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class SmallCNN(nn.Module):
    """Two conv/batch-norm/pool stages followed by a three-layer classifier.

    Each pooling step halves the spatial size, and the first linear layer
    expects 64 * 8 * 8 features, so inputs must be 3-channel 32x32 images.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # Feature extractor: 3 -> 32 -> 64 channels, 3x3 kernels with padding 1
        # (spatial size is only changed by the pooling layer).
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(num_features=32)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(num_features=64)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Classifier head: 4096 -> 128 -> 64 -> num_classes.
        self.fc1 = nn.Linear(in_features=64 * 8 * 8, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=64)
        self.fc3 = nn.Linear(in_features=64, out_features=num_classes)

        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes)."""
        features = self.pool(F.relu(self.bn1(self.conv1(x))))
        features = self.pool(F.relu(self.bn2(self.conv2(features))))

        # Flatten everything except the batch dimension.
        flat = features.view(features.size(0), -1)

        # Dropout is applied only after the first fully connected layer.
        hidden = self.dropout(F.relu(self.fc1(flat)))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [7]:
def train_eval_test_model(X_train, y_train, X_test, y_test,
                          num_classes=3, epochs=50, batch_size=64, lr=0.001):
    """Train a fresh SmallCNN and return its test accuracy in percent.

    The training set is shuffled once up front; every epoch then walks the
    same mini-batches in the same order. Evaluation runs on the whole test
    set in a single forward pass.

    Args:
        X_train: Float tensor of training images, shape (N, 3, 32, 32).
        y_train: Long tensor of training labels in [0, num_classes).
        X_test: Float tensor of test images, same layout as X_train.
        y_test: Long tensor of test labels.
        num_classes: Number of output classes of the model.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size.
        lr: Adam learning rate.

    Returns:
        Test accuracy as a float in [0, 100].

    Raises:
        ValueError: If the test set is empty (accuracy would be undefined).
    """
    # Fail before spending time on training rather than with a
    # ZeroDivisionError after it.
    if len(y_test) == 0:
        raise ValueError("y_test is empty; cannot compute accuracy")

    model = SmallCNN(num_classes=num_classes)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # One-off shuffle so appended (poisoned) samples are mixed into the batches.
    train_indices = torch.randperm(X_train.size(0))
    X_train = X_train[train_indices]
    y_train = y_train[train_indices]

    model.train()
    for epoch in range(epochs):
        for i in range(0, len(X_train), batch_size):
            x_batch = X_train[i:i+batch_size]
            y_batch = y_train[i:i+batch_size]

            optimizer.zero_grad()
            outputs = model(x_batch)
            loss = loss_fn(outputs, y_batch)
            loss.backward()
            optimizer.step()

    # eval() switches batch norm to running statistics and disables dropout.
    model.eval()
    with torch.no_grad():
        outputs = model(X_test)
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y_test).sum().item() / len(y_test)
        return accuracy * 100


# Hold-out split on the already shuffled data: the first 4000 samples are used
# for training, everything after them for testing.
X_train, X_test = X[:4000], X[4000:]
y_train, y_test = y[:4000], y[4000:]

def original():
    """Baseline: train on the clean training split and return test accuracy (%)."""

    def as_images(flat_pixels):
        # uint8 rows -> float32 (N, 3, 32, 32) tensor scaled to [0, 1].
        return torch.tensor(flat_pixels, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0

    def as_labels(labels):
        return torch.tensor(labels, dtype=torch.long)

    return train_eval_test_model(
        as_images(X_train),
        as_labels(y_train),
        as_images(X_test),
        as_labels(y_test),
    )

def random_flip(n_poison=400, num_classes=3):
    """Poison with randomly mislabelled duplicates and return test accuracy (%).

    Draws ``n_poison`` distinct training samples, gives each copy a label
    chosen uniformly among the *other* classes, and appends the copies to the
    clean training set (the original samples keep their correct labels).

    Args:
        n_poison: Number of mislabelled duplicates to append.
        num_classes: Total number of classes labels are drawn from.

    Returns:
        Test accuracy in percent of a model trained on the poisoned set.
    """
    poison_indices = np.random.choice(len(X_train), n_poison, replace=False)
    X_poisoned = X_train[poison_indices]

    # Adding an offset in [1, num_classes - 1] modulo num_classes always lands
    # on a different class, uniformly. Cast first: the labels are uint8.
    true_labels = y_train[poison_indices].astype(np.int64)
    offsets = np.random.randint(1, num_classes, size=n_poison)
    y_poisoned = (true_labels + offsets) % num_classes

    X_train_poisoned = np.concatenate([X_train, X_poisoned], axis=0)
    y_train_poisoned = np.concatenate([y_train, y_poisoned], axis=0)

    return train_eval_test_model(
        torch.tensor(X_train_poisoned, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_train_poisoned, dtype=torch.long),
        torch.tensor(X_test, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_test, dtype=torch.long)
    )

def random_flip_12_21(n_poison=400):
    """Poison by swapping labels 1 <-> 2 on duplicates; return test accuracy (%).

    Draws ``n_poison`` distinct training samples from classes 1 and 2, swaps
    the label of each copy (1 becomes 2, 2 becomes 1), and appends the copies
    to the clean training set.

    Args:
        n_poison: Number of mislabelled duplicates to append. Must not exceed
            the number of class-1 and class-2 training samples, otherwise
            ``np.random.choice`` raises ``ValueError``.

    Returns:
        Test accuracy in percent of a model trained on the poisoned set.
    """
    candidates = np.where((y_train == 1) | (y_train == 2))[0]
    poison_indices = np.random.choice(candidates, n_poison, replace=False)
    X_poisoned = X_train[poison_indices]
    # Candidates are only ever 1 or 2, so "not 1" means 2.
    y_poisoned = np.where(y_train[poison_indices] == 1, 2, 1)

    X_train_poisoned = np.concatenate([X_train, X_poisoned], axis=0)
    y_train_poisoned = np.concatenate([y_train, y_poisoned], axis=0)

    return train_eval_test_model(
        torch.tensor(X_train_poisoned, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_train_poisoned, dtype=torch.long),
        torch.tensor(X_test, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_test, dtype=torch.long)
    )

def random_flip_12(n_poison=400):
    """Poison with class-1 duplicates relabelled as class 2; return test accuracy (%).

    Draws ``n_poison`` distinct class-1 training samples and appends copies of
    them, all labelled 2, to the clean training set.

    Args:
        n_poison: Number of mislabelled duplicates to append. Must not exceed
            the number of class-1 training samples, otherwise
            ``np.random.choice`` raises ``ValueError``.

    Returns:
        Test accuracy in percent of a model trained on the poisoned set.
    """
    class1_indices = np.where(y_train == 1)[0]
    poison_indices = np.random.choice(class1_indices, n_poison, replace=False)
    X_poisoned = X_train[poison_indices]
    # A single parameter keeps the number of labels in sync with the samples.
    y_poisoned = np.full(n_poison, 2)

    X_train_poisoned = np.concatenate([X_train, X_poisoned], axis=0)
    y_train_poisoned = np.concatenate([y_train, y_poisoned], axis=0)

    return train_eval_test_model(
        torch.tensor(X_train_poisoned, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_train_poisoned, dtype=torch.long),
        torch.tensor(X_test, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_test, dtype=torch.long)
    )

def cnn_poisoned(poison_dir="./poisoned/cnn-0"):
    """Train with pre-generated poison images appended; return test accuracy (%).

    Loads every PNG under ``poison_dir/0`` and ``poison_dir/1`` the same way
    the clean data is loaded (resize to 32x32, convert to RGB, flatten). The
    folder index is shifted by one to get the label, so folder ``0`` is added
    as class 1 and folder ``1`` as class 2.

    Args:
        poison_dir: Directory containing the ``0`` and ``1`` sub-folders of
            poison images.

    Returns:
        Test accuracy in percent of a model trained on clean + poison data.
    """
    poison_folders = ['0', '1']
    img_size = (32, 32)

    X_poisoned = []
    y_poisoned = []

    for class_label in poison_folders:
        class_dir = os.path.join(poison_dir, class_label)
        for fname in sorted(os.listdir(class_dir)):
            if fname.endswith('.png'):
                img_path = os.path.join(class_dir, fname)
                # Close the file handle as soon as the pixels are copied out;
                # this function is called in a long loop.
                with Image.open(img_path) as img:
                    img_array = np.array(img.resize(img_size).convert('RGB')).flatten()
                X_poisoned.append(img_array)
                y_poisoned.append(int(class_label))

    X_poisoned = np.array(X_poisoned, dtype=np.uint8)
    # Folder index -> label in the 3-class problem (0 -> 1, 1 -> 2).
    y_poisoned = np.array(y_poisoned, dtype=np.uint8) + 1

    indices = np.random.permutation(len(X_poisoned))
    X_poisoned = X_poisoned[indices]
    y_poisoned = y_poisoned[indices]

    X_train_poisoned = np.concatenate([X_train, X_poisoned], axis=0)
    y_train_poisoned = np.concatenate([y_train, y_poisoned], axis=0)

    return train_eval_test_model(
        torch.tensor(X_train_poisoned, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_train_poisoned, dtype=torch.long),
        torch.tensor(X_test, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
        torch.tensor(y_test, dtype=torch.long)
    )


# Confidence-based flipping
# acc_conf = train_eval_test_model(
#     torch.tensor(np.concatenate([X_train, X_train_sorted.reshape(10000, 3072)[:400]]), dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
#     torch.tensor(np.concatenate([y_train, 2 - y_train_sorted[:400]]), dtype=torch.long),
#     torch.tensor(X_test, dtype=torch.float32).reshape(-1, 3, 32, 32) / 255.0,
#     torch.tensor(y_test, dtype=torch.long)
# )




# print(f"Original Model Accuracy: {acc_original:.2f}%")                              # 81.47, 82.97, 82.73, 80.70, 82.54, 82.27, 82.80, 83.33, 83.90
# print(f"Randomly Flipped Labels Accuracy: {acc_random_flip:.2f}%")                  # 79.63, 80.87, 73.73, 78.90, 79.43, 80.10, 78.80, 78.80, 78.87
# print(f"Flipped Class 1 to 2 and 2 to 1 Accuracy: {acc_flip_1_2_both_ways:.2f}%")   # 81.87, 78.63, 81.77, 81.53, 80.40, 79.83, 81.50, 80.20, 80.63
# print(f"Flipped Class 1 to 2 Accuracy: {acc_flip_1_to_2:.2f}%")                     # 83.67, 81.97, 81.93, 77.63, 82.53, 81.50, 72.77, 79.93, 74.23
# print(f"Confidence-based Flipping Accuracy: {acc_conf:.2f}%")                       # 83.27, 83.40, 78.10, 83.03, 82.40, 81.30, 82.67, 80.37, 82.67

# 
# orig = [ 78.49, 78.39, 80.32, 76.22, 79.11, 76.08, 73.95, 77.13, 79.37 ]
# rand = [ 73.94, 76.88, 75.12, 76.30, 70.46, 75.34, 75.95, 77.50, 77.65 ]
# flip_12_21 = [ 77.65, 77.84, 74.35, 71.12, 77.34, 77.53, 75.98, 68.22, 72.92 ]
# flip_12 = [ 79.46, 76.75, 78.24, 79.16, 78.50, 73.03, 78.51, 75.58, 79.20 ]
# Accuracy (%) of each repeated run with the pre-generated CNN poison set.
cnn_poison = []
# 78.91, 79.15, 77.71, 77.27, 78.41, 79.81, 77.57, 76.46, 76.93

# Each iteration trains one model from scratch; the other experiments are kept
# below, commented out, so they can be re-enabled one at a time.
for run_idx in range(500):
    # orig_i = original()
    # print(f"Original Model Accuracy: {orig_i:.2f}%")
    # orig.append(orig_i)

    # rand_i = random_flip()
    # print(f"Randomly Flipped Labels Accuracy: {rand_i:.2f}%")
    # rand.append(rand_i)

    # flip_12_21_i = random_flip_12_21()
    # print(f"Flipped Class 1 to 2 and 2 to 1 Accuracy: {flip_12_21_i:.2f}%")
    # flip_12_21.append(flip_12_21_i)

    # flip_12_i = random_flip_12()
    # print(f"Flipped Class 1 to 2 Accuracy: {flip_12_i:.2f}%")
    # flip_12.append(flip_12_i)

    run_accuracy = cnn_poisoned()
    print(f"CNN Poisoned Model Accuracy: {run_accuracy:.2f}%")
    cnn_poison.append(run_accuracy)

    # TODO: compute the loss for each class

# with open('results2.txt', 'w') as f:
#     for i, arr in enumerate([orig, rand, flip_12_21, flip_12], start=1):
#         f.write(f"arr{i} = {arr}\n")
#         mean = np.mean(arr)
#         std = np.std(arr)
#         leng = len(arr)
#         print(f"arr{i} - mean: {mean:.2f}, std: {std:.2f}, length: {leng}")

    

CNN Poisoned Model Accuracy: 80.21%
CNN Poisoned Model Accuracy: 78.95%
CNN Poisoned Model Accuracy: 78.72%
CNN Poisoned Model Accuracy: 80.50%
CNN Poisoned Model Accuracy: 78.65%
CNN Poisoned Model Accuracy: 73.39%
CNN Poisoned Model Accuracy: 75.03%
CNN Poisoned Model Accuracy: 75.04%
CNN Poisoned Model Accuracy: 78.57%
CNN Poisoned Model Accuracy: 78.96%
CNN Poisoned Model Accuracy: 76.38%
CNN Poisoned Model Accuracy: 70.76%
CNN Poisoned Model Accuracy: 77.54%
CNN Poisoned Model Accuracy: 78.72%
CNN Poisoned Model Accuracy: 76.82%
CNN Poisoned Model Accuracy: 77.78%
CNN Poisoned Model Accuracy: 78.24%
CNN Poisoned Model Accuracy: 76.91%
CNN Poisoned Model Accuracy: 78.88%
CNN Poisoned Model Accuracy: 78.37%
CNN Poisoned Model Accuracy: 76.48%
CNN Poisoned Model Accuracy: 73.79%
CNN Poisoned Model Accuracy: 77.49%
CNN Poisoned Model Accuracy: 73.35%
CNN Poisoned Model Accuracy: 76.74%
CNN Poisoned Model Accuracy: 78.89%
CNN Poisoned Model Accuracy: 71.20%
CNN Poisoned Model Accuracy:

KeyboardInterrupt: 

In [8]:
# Persist the raw per-run accuracies, then report summary statistics.
arr = cnn_poison
with open('results3.txt', 'w') as f:
    f.write(f"arr = {arr}\n")

mean, std, leng = np.mean(arr), np.std(arr), len(arr)
print(f"arr - mean: {mean:.2f}, std: {std:.2f}, length: {leng}")

arr - mean: 76.92, std: 2.47, length: 131


In [None]:
import torch 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Train a two-class model on classes 1 and 2 only; its confidences are used in
# the following cells to pick samples for confidence-based label flipping.
device = "cpu"
model = SmallCNN(num_classes=2).to(device)

# NOTE: this rebinds the global X_train / y_train (NumPy arrays in the earlier
# experiment cell) to tensors holding only classes 1 and 2, with labels
# shifted to {0, 1}. Re-run the split cell before calling the poisoning
# functions again.
X_train = torch.cat((X1_tensor, X2_tensor), dim=0)
y_train = torch.cat((y1_tensor, y2_tensor), dim=0) - 1

# Shuffle once; the batches below are the same in every epoch.
train_indices = torch.randperm(X_train.size(0))
X_train, y_train = X_train[train_indices], y_train[train_indices]

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
batch_size = 64
epochs = 50

model.train()
for epoch in range(epochs):
    for i in range(0, len(X_train), batch_size):
        batch = slice(i, i + batch_size)
        x_batch, y_batch = X_train[batch], y_train[batch]

        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = loss_fn(outputs, y_batch)
        loss.backward()
        optimizer.step()


In [None]:
# Score every training sample with the trained binary model and order the
# training set from least to most confident prediction.
model.eval()
loss_fn = nn.CrossEntropyLoss(reduction='none')  # keep one loss value per sample

with torch.no_grad():
    outputs = model(X_train)
    losses = loss_fn(outputs, y_train)
    probs = F.softmax(outputs, dim=1)
    # Confidence = probability assigned to the predicted (arg-max) class.
    confidences = probs.max(dim=1).values

loss_scores = losses.numpy()
confidence_scores = confidences.numpy()

# Ascending order: the least confident samples come first.
sorted_indices = torch.argsort(confidences)
sorted_indices_np = sorted_indices.numpy()

X_train_sorted, y_train_sorted = X_train[sorted_indices], y_train[sorted_indices]
confidence_scores_sorted = confidence_scores[sorted_indices_np]
loss_scores_sorted = loss_scores[sorted_indices_np]

In [None]:
import os
from PIL import Image
import numpy as np

base_dir = './poisoned/conf-flip'

# Export at most 1500 of the least-confident samples; bounding by the dataset
# size avoids an IndexError part-way through when fewer samples exist.
n_to_save = min(1500, len(X_train_sorted))


def _to_uint8_image(sample):
    """Convert one stored sample into a (32, 32, 3) uint8 array for PIL.

    The loader flattens the (32, 32, 3) image array and the tensors are built
    from that buffer with reshape(-1, 3, 32, 32) and no transpose, so
    reshape(32, 32, 3) restores the original pixel order.
    """
    if isinstance(sample, torch.Tensor):
        sample = sample.detach().cpu().numpy()
    pixels = np.asarray(sample).reshape(32, 32, 3)
    if np.issubdtype(pixels.dtype, np.floating):
        # Round before casting: astype(np.uint8) truncates, so a value such as
        # 56.99999 produced by the /255 * 255 round trip would become 56.
        pixels = np.rint(pixels * 255).clip(0, 255).astype(np.uint8)
    return pixels


for i in range(n_to_save):
    img = Image.fromarray(_to_uint8_image(X_train_sorted[i]))

    # Labels here are the original class minus 1 (0 or 1), so 2 - label is the
    # flipped original class: class 1 is saved under "2", class 2 under "1".
    label = y_train_sorted[i].item()
    save_class = 2 - label

    save_dir = os.path.join(base_dir, str(save_class))
    os.makedirs(save_dir, exist_ok=True)
    img.save(os.path.join(save_dir, f'{i:05d}.png'))