In [None]:
# ================================================================
# VGG-16 on CIFAR-10 + Filter Fault Injection Benchmark (JSON Logging)
# ================================================================
# colab env
#!pip install --quiet torch torchvision tqdm

import math, random, os, pathlib, torch, torch.nn as nn, torch.optim as optim
import torchvision, torchvision.transforms as T
from torch.utils.data import DataLoader
from tqdm import tqdm
import json

# ----------------------------- CONFIG ---------------------------
NUM_EPOCHS        = 1
BATCH_SIZE        = 128
LR                = 0.1
DEVICE            = 'cuda' if torch.cuda.is_available() else 'cpu'

FAULT_PERCENT     = 40          # % of *all* filters to random-re-draw
RANDOM_SEED       = 42
PRETRAINED_PATH   = ''          # leave empty to train from scratch
SAVE_CHECKPOINT_TO = "model/baseline_vgg16.pt"
torch.manual_seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

# -------------------------- DATASET -----------------------------
transform_train = T.Compose([
    T.RandomHorizontalFlip(),
    T.RandomCrop(32, padding=4),
    T.ToTensor(),
    T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test  = T.Compose([
    T.ToTensor(),
    T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
testset  = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

train_loader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True,  num_workers=2, pin_memory=True)
test_loader  = DataLoader(testset,  batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=True)

# --------------------------- MODEL ------------------------------
def make_vgg16():
    # VGG-16 without pretraining
    vgg16 = torchvision.models.vgg16_bn(pretrained=False)

    # Replace first conv to adapt CIFAR-10 input (3x32x32)
    vgg16.features[0] = nn.Conv2d(3, 64, kernel_size=3, padding=1)

    # Add Adaptive Pooling to get fixed output shape
    vgg16.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # Output will be [B, 512, 1, 1]

    # Replace classifier
    vgg16.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(512, 512),
        nn.ReLU(True),
        nn.Dropout(),
        nn.Linear(512, 512),
        nn.ReLU(True),
        nn.Dropout(),
        nn.Linear(512, 10),  # 10 CIFAR-10 classes
    )
    return vgg16


model = make_vgg16().to(DEVICE)

# ---------------------- TRAIN / LOAD ----------------------------
def accuracy(net, loader):
    net.eval()
    correct = total = 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            preds = net(images).argmax(1)
            correct += (preds == labels).sum().item()
            total   += labels.size(0)
    return 100. * correct / total

if PRETRAINED_PATH and pathlib.Path(PRETRAINED_PATH).exists():
    model.load_state_dict(torch.load(PRETRAINED_PATH, map_location=DEVICE))
    print(f'Loaded pretrained weights from {PRETRAINED_PATH}')
else:
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=LR, momentum=0.9, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.MultiStepLR(optimizer,
                    milestones=[NUM_EPOCHS//2, int(NUM_EPOCHS*0.75)], gamma=0.1)

    for epoch in range(NUM_EPOCHS):
        model.train()
        pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{NUM_EPOCHS}')
        for images, labels in pbar:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            loss = criterion(model(images), labels)
            loss.backward()
            optimizer.step()
            pbar.set_postfix({'loss': f'{loss.item():.3f}'})
        scheduler.step()

    torch.save(model.state_dict(), SAVE_CHECKPOINT_TO)
    print(f'Saved checkpoint to {SAVE_CHECKPOINT_TO}')

base_acc = accuracy(model, test_loader)
print(f'\nBaseline accuracy: {base_acc:5.2f} %')

# ------------------ FILTER FAULT INJECTION ----------------------
import json, math, random, torch
from torch import nn

def redraw_filters(
    net,
    percent: float,
    sigma: float = 0.05,
    save_json_path: str = "content/filter_faults.json",
    deterministic: bool = False,   # ← set to True if you *want* repeatability
):
    """
    Redraw `percent` % of filters *in every Conv2d layer* from 𝒩(0,σ²).
    Logs all changes to JSON and prints one example per layer.
    """
    if deterministic:
        random.seed(42)           # or any constant you like
        torch.manual_seed(42)
    else:
        random.seed()             # system-time seed → different each run
        torch.random.manual_seed(torch.randint(0, 2**31, ()).item())

    changed_filters = []
    conv_layers = [m for m in net.modules() if isinstance(m, nn.Conv2d)]

    for layer_idx, layer in enumerate(conv_layers):
        n_filters = layer.weight.size(0)
        n_fault   = math.floor(n_filters * percent / 100 + 1e-6)
        if n_fault == 0:                         # skip tiny layers
            continue

        idx_list = random.sample(range(n_filters), n_fault)

        for idx in idx_list:
            original = layer.weight[idx].detach().cpu().numpy().tolist()
            noise    = torch.randn_like(layer.weight[idx]) * sigma
            layer.weight.data[idx] = noise
            modified = layer.weight[idx].detach().cpu().numpy().tolist()

            changed_filters.append({
                "layer"         : layer_idx,
                "filter_index"  : idx,
                "original_filter": original,
                "modified_filter": modified,
            })

    # ---------- save ----------
    with open(save_json_path, "w") as f:
        json.dump(changed_filters, f, indent=2)

    # ---------- nicer preview ----------
    print(f"\nSaved filter fault info to: {save_json_path}")
    print("Per-layer injection preview (one example each):")
    seen = set()
    for entry in changed_filters:
        L = entry['layer']
        if L in seen:                     # already showed one for this layer
            continue
        seen.add(L)
        print(f"\nLayer {L} | Filter {entry['filter_index']}")
        print("Original[0][0][:3] →", entry['original_filter'][0][0][:3])
        print("Modified[0][0][:3] →", entry['modified_filter'][0][0][:3])
        if len(seen) == 3:                # don’t spam the console
            break
    print(f"\nTotal layers affected: {len(seen)}/{len(conv_layers)}")
    print(f"Total filters modified: {len(changed_filters)}")




print(f'\nInjecting random-re-draw fault into {FAULT_PERCENT} % of all filters …')
redraw_filters(model, FAULT_PERCENT, save_json_path='filter_faults.json')
faulty_acc = accuracy(model, test_loader)
print(f'Post-fault accuracy: {faulty_acc:5.2f} %')

print(f'\nAccuracy drop: {base_acc - faulty_acc:5.2f} percentage points')
# save the faulty model (badan line ro avaz kon)
torch.save(model.state_dict(), "model/faulty_vgg16.pt")



Files already downloaded and verified
Files already downloaded and verified


Epoch 1/1: 100%|██████████| 391/391 [01:03<00:00,  6.12it/s, loss=1.891]


Saved checkpoint to model/baseline_vgg16.pt

Baseline accuracy: 33.80 %

Injecting random-re-draw fault into 40 % of all filters …

Saved filter fault info to: filter_faults.json
Per-layer injection preview (one example each):

Layer 0 | Filter 61
Original[0][0][:3] → [-0.0362093560397625, -0.1769891232252121, 0.17965148389339447]
Modified[0][0][:3] → [0.04680362343788147, -0.12242104113101959, -0.06647156924009323]

Layer 1 | Filter 27
Original[0][0][:3] → [-0.08169501274824142, -0.03102574683725834, -0.11332571506500244]
Modified[0][0][:3] → [0.054369598627090454, -0.015161635354161263, -0.10016524791717529]

Layer 2 | Filter 11
Original[0][0][:3] → [-0.0022287238389253616, 0.03973657637834549, -0.047957923263311386]
Modified[0][0][:3] → [-0.048091307282447815, 0.10355325788259506, 0.005390446167439222]

Total layers affected: 3/13
Total filters modified: 1682
Post-fault accuracy: 10.05 %

Accuracy drop: 23.75 percentage points


In [11]:
import collections, json, pprint
with open("filter_faults.json") as f:
    log = json.load(f)

cnt = collections.Counter([e["layer"] for e in log])
pprint.pprint(cnt)          # prints {layer_idx: how_many_faults}


Counter({7: 204,
         8: 204,
         9: 204,
         10: 204,
         11: 204,
         12: 204,
         4: 102,
         5: 102,
         6: 102,
         2: 51,
         3: 51,
         0: 25,
         1: 25})
