In [0]:
import numpy as np
import matplotlib.pyplot as plt
import os
import cv2
%matplotlib inline

import torchvision.transforms as transforms
from torch.utils.data.sampler import SubsetRandomSampler
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, Dataset
import torchvision
import torch.optim as optim
import torchvision.models as models

# Approach
We want to identify the anomalies. The idea is to use a convolutional autoencoder, as mentioned above. The autoencoder learns a representation of the dark matter: its latent space is that representation. We then try to reconstruct each image from the latent space. The images with the highest reconstruction error (mean absolute error) are the ones that can be considered anomalies. We run this on both datasets and try to identify the anomalies.

In [0]:
class Lenses(Dataset):
  """Unlabelled image dataset backed by the files of a single directory.

  The directory listing is sorted by file name and divided at ``split``: the
  training view keeps the first ``split`` entries and the evaluation view keeps
  the remaining ones. Every item is read with OpenCV, converted from BGR to
  RGB and resized to 128x128 before the optional transform is applied.

  Args:
    img_dir: Directory that contains the image files. A trailing path
      separator is accepted but not required.
    split: Index into the sorted listing at which the directory is divided.
    transforms: Optional callable applied to the resized RGB array.
    train: If True use entries ``[:split]``, otherwise entries ``[split:]``.
  """

  def __init__(self,img_dir,split,transforms=None,train=True):
    super(Lenses, self).__init__()
    self.img_dir = img_dir
    self.transforms = transforms
    # List the directory once; sorting makes the split deterministic.
    file_names = sorted(os.listdir(self.img_dir))
    self.imgs = file_names[:split] if train else file_names[split:]

  def __len__(self):
    """Return the number of images in this view of the directory."""
    return len(self.imgs)

  def __getitem__(self,index):
    """Return the image at ``index`` after resizing and the optional transform.

    Raises:
      FileNotFoundError: If OpenCV cannot read the file (missing, unreadable
        or not an image).
    """
    # os.path.join keeps the path valid when img_dir has no trailing separator.
    path = os.path.join(self.img_dir, self.imgs[index])
    img = cv2.imread(path)
    if img is None:
      # cv2.imread reports failure by returning None instead of raising, which
      # would otherwise surface as an opaque error inside cvtColor.
      raise FileNotFoundError("Could not read image: {}".format(path))
    img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (128,128), interpolation = cv2.INTER_AREA)
    if self.transforms is not None:
      img = self.transforms(img)
    return img
    

In [0]:
# Training preprocessing: ndarray -> PIL image -> tensor, then per-channel
# normalisation with the mean/std values below.
train_transforms = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Training view: the first 4000 entries of the sorted directory listing.
train_data = Lenses(
    img_dir="drive/My Drive/lenses/sub/",
    split=4000,
    transforms=train_transforms,
    train=True,
)

In [0]:
def init_weights(m):
    """Xavier-initialise a layer's weight and set its bias to 0.01.

    Intended to be passed to ``nn.Module.apply``. Modules are selected by what
    they own rather than by an exclusion list of types: anything without a
    weight tensor of at least two dimensions (containers, activations, pooling,
    upsampling, batch-norm, ...) is left untouched, and a layer created with
    ``bias=False`` only has its weight initialised.

    Args:
        m: The module visited by ``apply``.
    """
    weight = getattr(m, "weight", None)
    # xavier_uniform_ needs fan-in/fan-out, i.e. a tensor with >= 2 dimensions;
    # this also skips the 1-D scale vector of normalisation layers.
    if weight is None or weight.dim() < 2:
        return
    nn.init.xavier_uniform_(weight)
    bias = getattr(m, "bias", None)
    if bias is not None:
        bias.data.fill_(0.01)


class AutoEncoder(nn.Module):
    """Convolutional autoencoder for 3-channel images.

    The encoder is three Conv(3x3) -> ReLU -> MaxPool(2) stages (3 -> 16 -> 32
    -> 64 channels), so each spatial dimension shrinks by a factor of 8. The
    decoder mirrors it with three ConvTranspose(3x3) stages, each followed by
    a x2 bilinear upsampling (64 -> 32 -> 16 -> 3 channels), so the output has
    the same shape as the input.

    The last decoder stage has no activation: the reconstruction is compared
    against mean/std-normalised images, which contain negative values that a
    ReLU-terminated decoder could never produce. Parameter names in the
    state dict are unaffected, because only a parameter-free layer was removed.
    """

    def __init__(self):
        super(AutoEncoder,self).__init__()
        # Input is (N, 3, H, W) with H and W divisible by 8, e.g. 3x128x128.
        self.encoder = nn.Sequential(nn.Conv2d(3,16,kernel_size=3,padding=1),
                                     nn.ReLU(),
                                     nn.MaxPool2d(2),
                                     nn.Conv2d(16,32,kernel_size=3,padding=1),
                                     nn.ReLU(),
                                     nn.MaxPool2d(2),
                                     nn.Conv2d(32,64,kernel_size=3,padding=1),
                                     nn.ReLU(),
                                     nn.MaxPool2d(2))
        self.decoder = nn.Sequential(nn.ConvTranspose2d(64,32,kernel_size=3,padding=1),
                                     nn.ReLU(),
                                     nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
                                     nn.ConvTranspose2d(32,16,kernel_size=3,padding=1),
                                     nn.ReLU(),
                                     nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
                                     # Linear output stage so negative
                                     # (normalised) pixel values are reachable.
                                     nn.ConvTranspose2d(16,3,kernel_size=3,padding=1),
                                     nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
                                     )

    def initialise_weights(self):
        """Apply ``init_weights`` to every sub-module of decoder and encoder."""
        self.decoder.apply(init_weights)
        self.encoder.apply(init_weights)

    def forward(self,img):
        """Encode ``img`` to the latent volume and return its reconstruction."""
        encoded = self.encoder(img)
        decoded = self.decoder(encoded)
        return decoded

# Autoencoder
Our autoencoder consists of an encoder and a decoder. The encoder consists of three Conv + MaxPool layers, which compress the 128x128x3 input to a 16x16x64 volume. The decoder then tries to reconstruct the original image from that volume.

In [105]:

# Seed NumPy and PyTorch first so that the weight initialisation, the
# train/validation split and the sampler order are reproducible after a
# kernel restart.
SEED = 0
np.random.seed(SEED)
torch.manual_seed(SEED)

train_on_gpu = torch.cuda.is_available()

# Build the model and move it to the GPU once, if one is available.
model = AutoEncoder()
model.initialise_weights()
if train_on_gpu:
    print("Yes Gpu is on")
    model = model.cuda()
else:
    print("Time to sleep")

# Reconstruction loss (mean absolute error) and optimiser.
error = nn.L1Loss()
opt = optim.Adam(model.parameters(), lr = 0.001,weight_decay=5e-4)

# Hold out `valid_size` of the training data for validation by shuffling the
# indices and cutting the list at `split`.
batch = 8
valid_size = 0.2
num = len(train_data)
indices = list(range(num))
np.random.shuffle(indices)
split = int(np.floor(valid_size*num))
train_idx,valid_idx = indices[split:], indices[:split]

train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)

# Both loaders read from train_data; the samplers keep the subsets disjoint.
train_loader = DataLoader(train_data, batch_size = batch, sampler = train_sampler)
valid_loader = DataLoader(train_data, batch_size = batch, sampler = valid_sampler)

Yes Gpu is on


In [0]:
# Train for n_epochs, tracking the per-sample mean L1 loss on the training and
# validation subsets, and checkpoint the weights whenever the validation loss
# does not get worse.
train_losses =[]
valid_losses =[]
n_epochs = 20
valid_loss_min = np.inf  # np.Inf was removed in NumPy 2.0
# Number of samples each loader draws per epoch; max() guards an empty subset.
n_train = max(len(train_loader.sampler), 1)
n_valid = max(len(valid_loader.sampler), 1)
for epoch in range(n_epochs):
  train_loss = 0.0
  valid_loss = 0.0

  model.train()
  for data in train_loader:
      if train_on_gpu:
          data = data.cuda()
      opt.zero_grad()
      output = model(data)
      loss = error(output,data)
      loss.backward()
      opt.step()
      # The loss is a batch mean; weight it by the batch size so the last
      # (possibly smaller) batch is accounted for correctly.
      train_loss += loss.item()*data.size(0)

  model.eval()
  # No gradients are needed for validation; skipping autograd saves memory.
  with torch.no_grad():
      for data in valid_loader:
          if train_on_gpu:
              data = data.cuda()
          output = model(data)
          loss = error(output,data)
          valid_loss += loss.item()*data.size(0)

  # Turn the weighted sums into per-sample means so that training and
  # validation losses are comparable despite the different subset sizes.
  train_loss = train_loss/n_train
  valid_loss = valid_loss/n_valid
  train_losses.append(train_loss)
  valid_losses.append(valid_loss)
  print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
        epoch, train_loss, valid_loss))

  if valid_loss <= valid_loss_min:
    print("Validation Loss decreased {:0.6f} -> {:0.6f}".format(valid_loss_min,valid_loss))
    valid_loss_min = valid_loss
    torch.save(model.state_dict(), 'drive/My Drive/best_model_so_far_lenses.pt')

In [0]:
# Evaluation preprocessing: same steps and statistics as used for training.
test_transforms= transforms.Compose([transforms.ToPILImage(),
                                      transforms.ToTensor(),
                                      transforms.Normalize((0.485,0.456,0.406),(0.229,0.224,0.225)),
                                      ])
# Evaluation view: every file from index 1500 onwards in the sorted listing.
# Pass test_transforms (previously train_transforms was passed by mistake, so
# the pipeline defined above was never used).
test_data = Lenses("drive/My Drive/small_lenses/sub/",1500,test_transforms,False)

# batch_size=1 so that each reconstruction error belongs to a single image.
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)

In [0]:
# Score every test image by its reconstruction error using the best checkpoint.
# Work on whatever device the model already lives on, so the cell also runs on
# a CPU-only machine.
device = next(model.parameters()).device
# map_location lets a checkpoint saved from the GPU be loaded on the CPU.
model.load_state_dict(torch.load('drive/My Drive/best_model_so_far_lenses.pt',
                                 map_location=device))

model.eval()

# One record per image: the L1 reconstruction error and the (preprocessed)
# input as a NumPy array.
preds = []
with torch.no_grad():  # inference only; no autograd graph needed
    for batch_i, data in enumerate(test_loader):
        data = data.to(device)
        output = model(data)
        s = dict()
        s['error'] = error(output,data).item()
        s['img'] = data.detach().cpu().numpy()
        preds.append(s)