In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

import numpy as np
from sklearn import metrics

import matplotlib.pyplot as plt


In [2]:
label = 9
p = 0.25

transform = transforms.ToTensor()

# Download and load the MNIST training data
train_data = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_data = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

selected_label = label

# Filter the dataset to include only images with label normal and 5
label_normal_indices = [i for i, (_, label) in enumerate(train_data) if label == selected_label]
label_anomalie_indices = [i for i, (_, label) in enumerate(train_data) if label != selected_label]

num_label_normal =  len(label_normal_indices)
num_label_anomalie = int((num_label_normal/(1-p))-num_label_normal)

print(num_label_normal, num_label_anomalie, num_label_normal + num_label_anomalie)

selected_label_anomalie_indices = np.random.choice(label_anomalie_indices, num_label_anomalie, replace=False)

# Combine the selected indices
selected_indices_train = np.concatenate([label_normal_indices, selected_label_anomalie_indices])

# Create a Subset of the original dataset with the selected indices
filtered_dataset = torch.utils.data.Subset(train_data, selected_indices_train)

print("Len Train:", len(filtered_dataset))
print("Len Test:", len(test_data))

# Create a DataLoader to iterate through the filtered dataset
batch_size = 64
train_loader = torch.utils.data.DataLoader(filtered_dataset, batch_size=batch_size, shuffle=True)
train_loader_without_batch = torch.utils.data.DataLoader(filtered_dataset, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, shuffle=True)


KeyboardInterrupt: 

In [None]:
class CAE_pytorch(nn.Module):
    def __init__(self, in_channels = 3, rep_dim = 256):
        super(CAE_pytorch, self).__init__()
        nf = 16
        self.nf = nf

        # Encoder
        self.enc_conv1 = nn.Conv2d(in_channels=in_channels, out_channels=nf, kernel_size=3, stride=2, padding=1)
        self.enc_bn1 = nn.BatchNorm2d(num_features=nf)
        self.enc_act1 = nn.ReLU(inplace=True)

        self.enc_conv2 = nn.Conv2d(in_channels=nf, out_channels=nf * 2, kernel_size=3, stride=2, padding=1)
        self.enc_bn2 = nn.BatchNorm2d(num_features=nf * 2)
        self.enc_act2 = nn.ReLU(inplace=True)

        self.enc_conv3 = nn.Conv2d(in_channels=nf * 2, out_channels=nf * 4, kernel_size=3, stride=2, padding=1)
        self.enc_bn3 = nn.BatchNorm2d(num_features=nf * 4)
        self.enc_act3 = nn.ReLU(inplace=True)

        self.enc_fc = nn.Linear(nf * 4 * 4 * 4, rep_dim)
        self.rep_act = nn.Tanh()

        # Decoder
        self.dec_fc = nn.Linear(rep_dim, nf * 4 * 4 * 4)
        self.dec_bn0 = nn.BatchNorm1d(num_features=nf * 4 * 4 *4)
        self.dec_act0 = nn.ReLU(inplace=True)

        self.dec_conv1 = nn.ConvTranspose2d(in_channels=nf * 4, out_channels=nf * 2, kernel_size=3, stride=2, padding=1)
        self.dec_bn1 = nn.BatchNorm2d(num_features=nf * 2)
        self.dec_act1 = nn.ReLU(inplace=True)

        self.dec_conv2 = nn.ConvTranspose2d(in_channels=nf * 2, out_channels=nf, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.dec_bn2 = nn.BatchNorm2d(num_features=nf)
        self.dec_act2 = nn.ReLU(inplace=True)

        self.dec_conv3 = nn.ConvTranspose2d(in_channels=nf, out_channels=in_channels, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.output_act = nn.Tanh()

    def encode(self, x):
        #print(len(x[0]), len(x[0][0]))
        x = self.enc_act1(self.enc_bn1(self.enc_conv1(x)))
        #print(len(x[0]), len(x[0][0]))
        x = self.enc_act2(self.enc_bn2(self.enc_conv2(x)))
        #print(len(x[0]), len(x[0][0]))
        x = self.enc_act3(self.enc_bn3(self.enc_conv3(x)))
        #print(len(x[0]), len(x[0][0]))
        rep = self.rep_act(self.enc_fc(x.view(x.size(0), -1)))
        #print(len(rep[0]))
        return rep

    def decode(self, rep):
        #print("Decode")
        #print(len(rep[0]))
        x = self.dec_act0(self.dec_bn0(self.dec_fc(rep)))
        #print(len(x[0]))
        x = x.view(-1, self.nf * 4, 4, 4)
        #print(len(x[0]), len(x[0][0]))
        x = self.dec_act1(self.dec_bn1(self.dec_conv1(x)))
        #print(len(x[0]), len(x[0][0]))
        x = self.dec_act2(self.dec_bn2(self.dec_conv2(x)))
        #print(len(x[0]), len(x[0][0]))
        x = self.output_act(self.dec_conv3(x))
        #print(len(x[0]), len(x[0][0]))
        return x

    def forward(self, x):
        return self.decode(self.encode(x))

model = CAE_pytorch(in_channels=1)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),
                            lr=1e-4, 
                            weight_decay=1e-5)

In [None]:
# Point to training loop video
num_epochs = 30
outputs = []
losses = []
for epoch in range(num_epochs):
    imgs = []
    for (img, _) in train_loader:
        recon = model(img)
        loss = criterion(recon, img)
        imgs.append((img, loss))

    imgs.sort(key=lambda x: x[1])
    l = int(train_loss_percent * len(imgs))
    imgs = imgs[:l]

    optimizer.zero_grad()

    for (img, _) in imgs:
        recon = model(img)
        loss = criterion(recon, img)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    losses.append(loss)
    # print(f'Epoch:{epoch+1}, Loss:{loss.item():.4f}')
    outputs.append((epoch, img, recon))

NameError: name 'train_loss_percent' is not defined