In [1]:
import torch 
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader,random_split
import torch.nn as nn 
import torch.nn.functional as fn 
from torch.utils.data import DataLoader
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt 

In [2]:
##Checking if GPU is available 
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Selected device :{device}")

Selected device :cpu


In [3]:
##louding data 

train_set = torchvision.datasets.MNIST(root ='./.data/',train=True, download=True,transform=transforms.ToTensor())
test_set = torchvision.datasets.MNIST(root='./.data/',train=False,download=True,transform=transforms.ToTensor())

In [4]:
m = len(train_set)
print(m)

60000


In [5]:
##splititng the dataset 
train_set ,val_set = random_split(train_set, [int(m-m*0.2), int(m*0.2)])

In [6]:
##Datalouders
train_louder = DataLoader(train_set,batch_size= 256)
test_louder = DataLoader(test_set,batch_size=256)

In [7]:
##Encoder Class
class Encoder(nn.Module):
    
    def __init__(self, encoded_space_dim,fc2_input_dim):
        super().__init__()
    
        self.encoder_cnn = nn.Sequential(
            nn.Conv2d(1, 8, 3, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(8, 16, 3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.Conv2d(16, 32, 3, stride=2, padding=0),
            nn.ReLU(True)
        )
        
        ### Flatten layer
        self.flatten = nn.Flatten(start_dim=1)
        self.encoder_lin = nn.Sequential(
            nn.Linear(3 * 3 * 32, 128),
            nn.ReLU(True),
            nn.Linear(128, encoded_space_dim)
        )
        
    def forward(self, x):
        x = self.encoder_cnn(x)
        x = self.flatten(x)
        x = self.encoder_lin(x)
        return x

In [8]:
##Decoder Class
class Decoder(nn.Module):
    
    def __init__(self, encoded_space_dim,fc2_input_dim):
        super().__init__()
        self.decoder_lin = nn.Sequential(
            nn.Linear(encoded_space_dim, 128),
            nn.ReLU(True),
            nn.Linear(128, 3 * 3 * 32),
            nn.ReLU(True)
        )

        self.unflatten = nn.Unflatten(dim=1, 
        unflattened_size=(32, 3, 3))

        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose2d(32, 16, 3, 
            stride=2, output_padding=0),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 8, 3, stride=2, 
            padding=1, output_padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(True),
            nn.ConvTranspose2d(8, 1, 3, stride=2, 
            padding=1, output_padding=1)
        )
        
    def forward(self, x):
        x = self.decoder_lin(x)
        x = self.unflatten(x)
        x = self.decoder_conv(x)
        x = torch.sigmoid(x)
        return x

In [9]:
##
criterian = torch.nn.MSELoss()
encoder = Encoder(encoded_space_dim=4, fc2_input_dim=128)
decoder = Decoder(encoded_space_dim=4,fc2_input_dim=128)

params_to_optimize = [
    {'params': encoder.parameters()},
    {'params': decoder.parameters()}
]

optimizer = torch.optim.Adam(params_to_optimize,lr=0.001,weight_decay=1e-05)

encoder.to(device)
decoder.to(device)

Decoder(
  (decoder_lin): Sequential(
    (0): Linear(in_features=4, out_features=128, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=128, out_features=288, bias=True)
    (3): ReLU(inplace=True)
  )
  (unflatten): Unflatten(dim=1, unflattened_size=(32, 3, 3))
  (decoder_conv): Sequential(
    (0): ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): ConvTranspose2d(16, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (4): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): ConvTranspose2d(8, 1, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
  )
)

In [17]:
im = test_set[0][0].unsqueeze(0)
a = im + torch.randn_like(im)*0.3
b = torch.clip(a,0.,1.)
 
##Noise function 

def add_noise(inputs,noise_factor=0.3):
     noisy = inputs+torch.randn_like(inputs) * noise_factor
     noisy = torch.clip(noisy,0.,1.)
     return noisy

In [25]:
##Train function 
def training (encoder,decoder,device,datalouder,loss_fn ,optimizer,noise_factor=0.3):
    ##train mode
    encoder.train()
    decoder.train()
    train_loss = []

    ##iteration of data louder, in here we don't need the labels coz we are dealing unsupervised learning 
    for image_batch,_ in datalouder:
        image_noisy = add_noise(image_batch,noise_factor)
       ## image_batch= image_batch.to(device)
        image_noisy = image_noisy.to(device)

        ##Encoded &Decoded data 
        encoded_data = encoder(image_noisy)
        decoded_data = decoder(encoded_data)

        loss = loss_fn(decoded_data,image_noisy)
        ##backward pas
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print('\t partial train loss (single batch): %f' % (loss.data))
        train_loss.append(loss.detach().cpu().numpy())
    return np.mean(train_loss)

In [20]:
### Testing function
def test_epoch_den(encoder, decoder, device, dataloader, loss_fn,noise_factor=0.3):
    encoder.eval()
    decoder.eval()
    with torch.no_grad(): 
        conc_out = []
        conc_label = []
        for image_batch, _ in dataloader:
            image_noisy = add_noise(image_batch,noise_factor)
            image_noisy = image_noisy.to(device)
            encoded_data = encoder(image_noisy)
            decoded_data = decoder(encoded_data)
            conc_out.append(decoded_data.cpu())
            conc_label.append(image_batch.cpu())
        conc_out = torch.cat(conc_out)
        conc_label = torch.cat(conc_label) 
        val_loss = loss_fn(conc_out, conc_label)
    return val_loss.data

In [29]:
### Training cycle
noise_factor = 0.3
num_epochs = 20
history_da={'train_loss':[],'val_loss':[]}
for epoch in range(num_epochs):
    print('EPOCH %d/%d' % (epoch + 1, num_epochs))
    train_loss=training(
        encoder=encoder, 
        decoder=decoder, 
        device=device, 
        datalouder=train_louder, 
        loss_fn=criterian, 
        optimizer=optimizer,noise_factor=noise_factor)
    val_loss = test_epoch_den(
        encoder=encoder, 
        decoder=decoder, 
        device=device, 
        dataloader=test_louder, 
        loss_fn=criterian,noise_factor=noise_factor)
    history_da['train_loss'].append(train_loss)
    history_da['val_loss'].append(val_loss)
    print('\n EPOCH {}/{} \t train loss {:.3f} \t val loss {:.3f}'.format(epoch + 1, num_epochs,train_loss,val_loss))

EPOCH 1/20
	 partial train loss (single batch): 0.065181
	 partial train loss (single batch): 0.064257
	 partial train loss (single batch): 0.065377
	 partial train loss (single batch): 0.064776
	 partial train loss (single batch): 0.064117
	 partial train loss (single batch): 0.065475
	 partial train loss (single batch): 0.064667
	 partial train loss (single batch): 0.063450
	 partial train loss (single batch): 0.064459
	 partial train loss (single batch): 0.064201
	 partial train loss (single batch): 0.063959
	 partial train loss (single batch): 0.063309
	 partial train loss (single batch): 0.064172
	 partial train loss (single batch): 0.064160
	 partial train loss (single batch): 0.064325
	 partial train loss (single batch): 0.064318
	 partial train loss (single batch): 0.063097
	 partial train loss (single batch): 0.064361
	 partial train loss (single batch): 0.063554
	 partial train loss (single batch): 0.063812
	 partial train loss (single batch): 0.062965
	 partial train loss (s