原理：其实我们就是通过encoder和decoder去将一张图片重新编码和解码，对于model见过的图片，经历过整个流程，所以model知道如何还原这张图片，所以经由模型生成的图片和原来的图片是比较相近的；而对于model未曾见过的图片，他不知道该如何复原这张图片，所以经由模型生成的图片和原来的图片差别会比较大


在test的过程中，我们对原图与model生成的图取平方差，理想的结果是train过的图片对的方差较小，没有train过的图片对方差较大，根据这点不同可以判断做异常检测

In [2]:
!git clone https://github.com/wkentaro/gdown
!cd gdown
!pip install gdown

fatal: destination path 'gdown' already exists and is not an empty directory.


In [3]:
!gdown --id '1_zT3JOpvXFGr7mkxs3XJDeGxTn_8pItq' --output train.npy 
!gdown --id '11Y_6JDjlhIY-M5-jW1rLRshDMqeKi9Kr' --output test.npy 

import numpy as np

train = np.load('train.npy', allow_pickle=True)
test = np.load('test.npy', allow_pickle=True)

Downloading...
From: https://drive.google.com/uc?id=1_zT3JOpvXFGr7mkxs3XJDeGxTn_8pItq
To: /kaggle/working/train.npy
983MB [00:06, 142MB/s]  
Downloading...
From: https://drive.google.com/uc?id=11Y_6JDjlhIY-M5-jW1rLRshDMqeKi9Kr
To: /kaggle/working/test.npy
246MB [00:01, 199MB/s]  


In [5]:
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset
import torch
data = torch.tensor(train, dtype=torch.float)
train_dataset = TensorDataset(data)
train_sampler = RandomSampler(train_dataset)
train_dataloader = DataLoader(train_dataset, sampler=train_sampler, batch_size=2)

In [6]:
task = 'ae'

In [7]:
import torch
from torch import nn
import torch.nn.functional as F

class fcn_autoencoder(nn.Module):
    def __init__(self):
        super(fcn_autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(32*32*3, 128),
            nn.ReLU(True),
            nn.Linear(128, 64),
            nn.ReLU(True),
            nn.Linear(64, 12),
            nn.ReLU(True),
            nn.Linear(12, 3),
        )
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.ReLU(True),
            nn.Linear(12, 64),
            nn.ReLU(True),
            nn.Linear(64, 128),
            nn.ReLU(True),
            nn.Linear(128, 32*32*3),
            nn.Tanh()
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [8]:
class conv_autoencoder(nn.Module):
    def __init__(self):
        super(conv_autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(12, 24, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(24, 48, 4, stride=2, padding=1),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(48, 24, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(24, 12, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(12, 3, 4, stride=2, padding=1),
            nn.Tanh(),
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [9]:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.fc1 = nn.Linear(32*32*3,  400)
        self.fc21 = nn.Linear(400, 20)
        self.fc22 = nn.Linear(400, 20)
        self.fc3 = nn.Linear(20, 400)
        self.fc4 = nn.Linear(400, 32*32*3)
        
    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)
    
    def reparametrize(self, mu, logvar):
        std = logvar.mul(0.5).exp_()
        if torch.cuda.is_available():
            eps = torch.cuda.FloatTensor(std.size()).normal_()
        else:
            eps = torch.FloatTensor(std.size()).normal_()
        eps = Variable(eps)
        return eps.mul(std).add_(mu)
    
    def decode(self, z):
        h3 = F.relu(self.fc3(z))
        return F.sigmoid(self.fc4(h3))
    
    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        return self.decode(z), mu, logvar
    
def loss_vae(recon_x, x, mu, logvar, criterion):
    mse = criterion(recon_x, x)
    KLD_element = mu.pow(2).add_(logvar.exp()).mul_(-1).add_(1).add_(logvar)
    KLD = torch.sum(KLD_element).mul_(-0.5)
    return mse+KLD

In [15]:
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.optim import Adam, AdamW
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset

if task == 'ae':
    num_epochs = 1000
    batch_size = 128
    learning_rate = 1e-3
    
    model_type = 'cnn'
    x = train
    if model_type == 'fcn' or model_type == 'vae':
        x =x.reshape(len(x), -1)
        
    data = torch.tensor(x, dtype=torch.float)
    train_sampler = RandomSampler(train_dataset)
    train_dataloader = DataLoader(train_dataset, sampler=train_sampler, batch_size=batch_size)
    
    model_classes = {'fcn':fcn_autoencoder(), 'cnn':conv_autoencoder(), 'vae':VAE()}
    model = model_classes[model_type].cuda()
    criterion = nn.MSELoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    
    best_loss = 1000
    model.train()
    
    for epoch in range(num_epochs):
        for data in train_dataloader:
            if model_type == 'cnn':
                img = data[0].transpose(3, 1).cuda()
            else:
                img = data[0].cuda()
            
            output = model(img)
            if model_type == 'vae':
                loss = loss_vae(output[0], img, output[1], output[2], criterion)
            else:
                loss = criterion(output, img)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
        if loss.item() < best_loss:
            best_loss = loss.item()
            torch.save(model, 'best_model_{}.pt'.format(model_type))
        print('epoch [{}/{}], loss:{:.4f}'.format(epoch+1, num_epochs, loss.item()))

epoch [1/1000], loss:0.0367
epoch [2/1000], loss:0.0272
epoch [3/1000], loss:0.0280
epoch [4/1000], loss:0.0164
epoch [5/1000], loss:0.0179
epoch [6/1000], loss:0.0162
epoch [7/1000], loss:0.0125
epoch [8/1000], loss:0.0136
epoch [9/1000], loss:0.0118
epoch [10/1000], loss:0.0128
epoch [11/1000], loss:0.0120
epoch [12/1000], loss:0.0119
epoch [13/1000], loss:0.0106
epoch [14/1000], loss:0.0103
epoch [15/1000], loss:0.0105
epoch [16/1000], loss:0.0107
epoch [17/1000], loss:0.0095
epoch [18/1000], loss:0.0108
epoch [19/1000], loss:0.0105
epoch [20/1000], loss:0.0090
epoch [21/1000], loss:0.0104
epoch [22/1000], loss:0.0104
epoch [23/1000], loss:0.0101
epoch [24/1000], loss:0.0091
epoch [25/1000], loss:0.0083
epoch [26/1000], loss:0.0086
epoch [27/1000], loss:0.0085
epoch [28/1000], loss:0.0086
epoch [29/1000], loss:0.0084
epoch [30/1000], loss:0.0064
epoch [31/1000], loss:0.0072
epoch [32/1000], loss:0.0070
epoch [33/1000], loss:0.0071
epoch [34/1000], loss:0.0077
epoch [35/1000], loss:0

In [18]:
if task == 'ae':
    if model_type == 'fcn' or model_type == 'vae':
        y = test.reshape(len(test), -1)
    else:
        y = test
        
    data = torch.tensor(y, dtype=torch.float)
    test_dataset = TensorDataset(data)
    test_sampler = SequentialSampler(test_dataset)
    test_dataloader = DataLoader(test_dataset, sampler=test_sampler, batch_size=batch_size)
    
    model.eval()
    reconstructed = list()
    for i, data in enumerate(test_dataloader):
        if model_type == 'cnn':
            img = data[0].transpose(3, 1).cuda()
        else:
            img = data[0].cuda()
        output = model(img)
        if model_type == 'cnn':
            output = output.transpose(3, 1)
        elif model_type == 'vae':
            output = output[0]
        reconstructed.append(output.cpu().detach().numpy())
    
    reconstructed = np.concatenate(reconstructed, axis=0)
    anomality = np.sqrt(np.sum(np.square(reconstructed - y).reshape(len(y), -1), axis=1))
    y_pred = anomality
    with open('prediction.csv', 'w') as f:
        f.write('id, anomaly\n')
        for i in range(len(y_pred)):
            f.write('{}, {}\n'.format(i+1, y_pred[i]))