In [1]:
import numpy as np 
import random 
import torch 
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
import pandas as pd
import pdb 
import torchvision.models as models 

from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset
from torch import nn
from torch.autograd import Variable
from torch.optim import Adam, AdamW
from sklearn.cluster import MiniBatchKMeans
from scipy.cluster.vq import vq, kmeans
from qqdm import qqdm, format_str
from tqdm import tqdm

  warn(


In [2]:
train = np.load('./data-bin/trainingset.npy', allow_pickle=True)
test = np.load('./data-bin/testingset.npy', allow_pickle=True)
print(train.shape, test.shape)

(140001, 64, 64, 3) (19999, 64, 64, 3)


In [3]:
train[1].shape, test[1].shape

((64, 64, 3), (64, 64, 3))

In [4]:
train = train[:10000]

In [5]:
def same_seeds(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
same_seeds(2023)

In [6]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [7]:
class fcn_autoencoder(nn.Module):
    def __init__(self):
        super(fcn_autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(64 * 64 * 3, 128),
            nn.ReLU(True),
            nn.Linear(128, 64),
            nn.ReLU(True),
            nn.Linear(64, 12),
            nn.ReLU(True),
            nn.Linear(12, 3)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(True),
            nn.Linear(12, 64),
            nn.ReLU(True),
            nn.Linear(64, 128),
            nn.ReLU(True),
            nn.Linear(128, 64 * 64 * 3),
            nn.Tanh()
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [8]:
class conv_autoencoder(nn.Module):
    def __init__(self):
        super(conv_autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(48, 96, 4, stride=2, padding=1),
            nn.ReLU()
        )
        
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(96, 48, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(12, 3, 4, stride=2, padding=1),
            nn.Tanh()
        )
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [9]:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),
            nn.ReLU()
        )
        self.enc_out_1 = nn.Sequential(
            nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU()
        )
        self.enc_out_2 = nn.Sequential(
            nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(12, 3, 4, stride=2, padding=1),
            nn.Tanh()
        )
        
    def encode(self, x):
        h1 = self.encoder(x)
        return self.enc_out_1(h1), self.enc_out_2(h1)
    
    def reparametrize(self, mu, logvar):
        std = logvar.mul(0.5).exp_()
        if torch.cuda.is_available():
            eps = torch.cuda.FloatTensor(std.size()).normal_()
        else:
            eps = torch.FloatTensor(std.size()).normal_()
        eps = Variable(eps)
        return eps.mul(std).add_(mu)
    
    def decode(self, z):
        return self.decoder(z)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        return self.decode(z), mu, logvar
    
def loss_vae(recon_x, x, mu, logvar, criterion):
    mse = criterion(recon_x, x)
    kl = mu.pow(2).add_(logvar.exp()).mul_(-1).add_(1).add_(logvar)
    kld = torch.sum(kl).mul_(-0.5)
    return mse + kld

In [10]:
class Resnet(nn.Module):
    def __init__(self, fc_hidden1=1024, fc_hidden2=768, drop_p=0.3, CNN_embed_dim=256):
        super(Resnet, self).__init__()
        self.fc_hidden1 = fc_hidden1
        self.fc_hidden2 = fc_hidden2
        self.CNN_embed_dim = CNN_embed_dim
        self.ch1, self.ch2, self.ch3, self.ch4 = 16, 32, 64, 128
        self.k1, self.k2, self.k3, self.k4 = (5, 5), (3, 3), (3, 3), (3, 3)
        self.s1, self.s2, self.s3, self.s4 = (2, 2), (2, 2), (2, 2), (2, 2)
        self.pd1, self.pd2, self.pd3, self.pd4 = (0, 0), (0, 0), (0, 0), (0, 0)
        
        resnet = models.resnet18(weights=None)
        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        self.fc1 = nn.Linear(resnet.fc.in_features, self.fc_hidden1)
        self.bn1 = nn.BatchNorm1d(self.fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(self.fc_hidden1, self.fc_hidden2)
        self.bn2 = nn.BatchNorm1d(self.fc_hidden2, momentum=0.01)
        self.fc3_mu = nn.Linear(self.fc_hidden2, self.CNN_embed_dim)
        self.fc4 = nn.Linear(self.CNN_embed_dim, self.fc_hidden2)
        self.fc_bn4 = nn.BatchNorm1d(self.fc_hidden2)
        self.fc5 = nn.Linear(self.fc_hidden2, 64 * 4 * 4)
        self.fc_bn5 = nn.BatchNorm1d(64 * 4 * 4)
        self.relu = nn.ReLU(inplace=True)
        
        self.convTrans6 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=self.k4, stride=self.s4, padding=self.pd4),
            nn.BatchNorm2d(32, momentum=0.01),
            nn.ReLU(inplace=True)
        )
        self.convTrans7 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=32, out_channels=8, kernel_size=self.k3, stride=self.s3, padding=self.pd3),
            nn.BatchNorm2d(8, momentum=0.01),
            nn.ReLU(inplace=True)
        )
        self.convTrans8 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=8, out_channels=3, kernel_size=self.k2, stride=self.s2, padding=self.pd2),
            nn.BatchNorm2d(3, momentum=0.01),
            nn.Sigmoid()
        )
        
    def encode(self, x):
        x = self.resnet(x)
        x = x.view(x.size(0), -1)
        
        if x.shape[0] > 1:
            x = self.bn1(self.fc1(x))
        else:
            x = self.fc1(x)
            
        x = self.relu(x)
        
        if x.shape[0] > 1:
            x = self.bn2(self.fc2(x))
        else:
            x = self.fc2(x)
            
        x = self.relu(x)
        x = self.fc3_mu(x)
        return x
            
    def decode(self, z):
        if z.shape[0] > 1:
            x = self.relu(self.fc_bn4(self.fc4(z)))
            x = self.relu(self.fc_bn5(self.fc5(x))).view(-1, 64, 4, 4)
        else:
            x = self.relu(self.fc4(z))
            x = self.relu(self.fc5(x)).view(-1, 64, 4, 4)
        x = self.convTrans6(x)
        x = self.convTrans7(x)
        x = self.convTrans8(x)
        x = F.interpolate(x, size=(64, 64), mode='bilinear', align_corners=True)
        return x
    
    def forward(self, x):
        z = self.encode(x)
        x_reconst = self.decode(z)
        return x_reconst

In [11]:
def to_float32(x):
    return x.to(torch.float32)

def normalize(x):
    return 2.0 * x / 255.0 - 1.0

class CustomTensorDataset(TensorDataset):
    def __init__(self, tensors):
        self.tensors = tensors
        if tensors.shape[-1] == 3:
            self.tensors = tensors.permute(0, 3, 1, 2)
        self.transform = transforms.Compose([
            transforms.Lambda(to_float32),
            transforms.Lambda(normalize)
        ])
    
    def __getitem__(self, index):
        x = self.tensors[index]
        if self.transform:
            x = self.transform(x)
        return x 
    
    def __len__(self):
        return len(self.tensors)

In [12]:
num_epochs = 1
batch_size = 64
learning_rate = 1e-3

x = torch.from_numpy(train)
train_dataset = CustomTensorDataset(x)
train_sampler = RandomSampler(train_dataset)
train_dataloader = DataLoader(train_dataset, sampler=train_sampler, batch_size=batch_size)

model_type = 'cnn'
model_classes = {
    'resnet': Resnet(),
    'fcn': fcn_autoencoder(),
    'cnn': conv_autoencoder(),
    'vae': VAE()
}

model = model_classes[model_type].to(device)
    
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [13]:
# best_loss = np.inf 
# model.train()

# qqdm_train = qqdm(range(num_epochs), desc=formatCustomTensorDatasetr('bold', 'Description'))

# for epoch in range(num_epochs):
#     total_loss = list()
#     for data in tqdm(train_dataloader):
#         if model_type in ['cnn', 'vae', 'resnet']:
#             img = data.float().to(device)
#         elif model_type in ['fcn']:
#             img = data.float().to(device)
#             img = img.view(img.shape[0], -1)
            
#         output = model(img)
        
#         if model_type in ['vae']:
#             loss = loss_vae(output[0], img, output[1], output[2], criterion)
#         else:
#             loss = criterion(output, img)
#         total_loss.append(loss.item())
        
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
        
#     mean_loss = np.mean(total_loss)
#     if mean_loss < best_loss:
#         best_loss = mean_loss
#         torch.save(model, './data-bin/best_model_{}.pt'.format(model_type))
    
#     print('Epoch:', epoch + 1, '|', 'Mean loss:', mean_loss)
        
#     qqdm_train.set_infos({
#         'epoch': f'{epoch + 1:.0f} / {num_epochs:.0f}',
#         'loss': f'{mean_loss:.4f}'
#     })

In [14]:
eval_batch_size = 64 

data = torch.tensor(test, dtype=torch.float32)
test_dataset = CustomTensorDataset(data)
test_sampler = SequentialSampler(test_dataset)
test_dataloader = DataLoader(test_dataset, sampler=test_sampler, batch_size=eval_batch_size, num_workers=0)
eval_loss = nn.MSELoss(reduction='none')

model = torch.load('./data-bin/best_model_{}.pt'.format(model_type), map_location=torch.device('cpu'))
model.eval()

conv_autoencoder(
  (encoder): Sequential(
    (0): Conv2d(3, 12, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(12, 24, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(24, 48, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
    (6): Conv2d(48, 96, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (7): ReLU()
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(96, 48, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): ConvTranspose2d(48, 24, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): ConvTranspose2d(24, 12, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
    (6): ConvTranspose2d(12, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (7): Tanh()
  )
)

In [15]:
anomality = list()

with torch.no_grad():
    for data in tqdm(test_dataloader):
        if model_type in ['cnn', 'vae', 'resnet']:
            img = data.float().to(device)
        elif model_type in ['fcn']:
            img = data.float().to(device)
            img = img.view(img.shape[0], -1)
        else:
            img = data[0].to(device)
        
        output = model(img)
        
        if model_type in ['cnn', 'resnet', 'fcn']:
            output = output 
        elif model_type in ['res_vae']:
            output = output[0]
        elif model_type in ['vae']:
            output = output[0]
        
        if model_type in ['fcn']:
            loss = eval_loss(output, img).sum(-1)
        else:
            loss = eval_loss(output, img).sum([1, 2, 3])
        anomality.append(loss)
        
anomality = torch.cat(anomality, axis=0)
anomality = torch.sqrt(anomality).reshape(len(test), 1).cpu().numpy()

df = pd.DataFrame(anomality, columns=['Predicted'])
df.to_csv('./data-bin/submission.csv', index_label='Id')

100%|████████████████████████████████████████████████████████████████████████████████| 313/313 [00:53<00:00,  5.85it/s]
