# CNN Example

Reference video: https://www.youtube.com/watch?v=wnK3uWv_WkU&list=PLhhyoLH6IjfxeoooqP9rhU3HJIAVAJ3Vz&index=4

In [None]:
import torch
import torch.nn as nn                            # Neural Network Models
import torch.optim as optim                      # Optimization algorithms
import torch.nn.functional as F                  # Functions that don't have parameters
from torch.utils.data import DataLoader          # Easier dataset management

import torchvision.datasets as datasets          # Standard datasets
import torchvision.transforms as transforms      # Transformations for dataset

## CNN

In [None]:
class CNN(nn.Module):
    """Small convolutional encoder/decoder that prints the tensor shape after every stage.

    The encoder applies two 3x3 "same" convolutions (kernel 3, stride 1,
    padding 1 keeps the spatial size), each followed by ReLU and a 2x2
    max-pool, then flattens to ``16*7*7`` values and projects down to
    ``num_classes`` through two linear layers.  The decoder mirrors this
    with two linear layers back up to ``16*7*7`` and two stride-2 transposed
    convolutions.

    The flattened size is fixed at ``16*7*7``, so the input must be 7x7
    after the two pooling steps (e.g. 28x28 images).

    Args:
        in_channels: Number of channels of the input images.
        num_classes: Width of the narrowest linear layer.
    """

    def __init__(self, in_channels=1, num_classes=10):
        super().__init__()

        flat_features = 16 * 7 * 7
        hidden_units = 20
        # Shared settings for both "same" convolutions.
        same_conv = dict(kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

        # Layers are created in the same order and under the same attribute
        # names as before so state_dict keys and seeded initialisation match.
        self.conv1 = nn.Conv2d(in_channels, 8, **same_conv)
        self.pool = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.conv2 = nn.Conv2d(8, 16, **same_conv)
        self.fc1 = nn.Linear(flat_features, hidden_units)
        self.fc2 = nn.Linear(hidden_units, num_classes)
        self.fci1 = nn.Linear(num_classes, hidden_units)
        self.fci2 = nn.Linear(hidden_units, flat_features)
        self.convt1 = nn.ConvTranspose2d(16, 8, kernel_size=2, stride=2)
        self.convt2 = nn.ConvTranspose2d(8, 8, kernel_size=2, stride=2)

    def forward(self, x):
        """Run the full encode/decode pass, printing the shape after each step.

        Returns:
            The output of the last transposed convolution (8 channels).
        """

        def traced(tensor):
            # Debug aid: report the shape produced by the stage just applied.
            print(tensor.shape)
            return tensor

        batch = x.shape[0]

        # Convolutional encoder.
        x = traced(F.relu(self.conv1(x)))
        x = traced(self.pool(x))
        x = traced(F.relu(self.conv2(x)))
        x = traced(self.pool(x))

        # Fully connected squeeze and expansion (no activation in between).
        x = traced(x.reshape(batch, -1))
        for layer in (self.fc1, self.fc2, self.fci1, self.fci2):
            x = traced(layer(x))

        # Back to a feature map, then upsample twice.
        x = traced(x.reshape(batch, 16, 7, 7))
        for layer in (self.convt1, self.convt2):
            x = traced(layer(x))

        return x

In [None]:
# Smoke test: push one random batch (64 single-channel 28x28 images) through
# the CNN. forward() prints the tensor shape after every stage.
model = CNN()
x = torch.randn(64, 1, 28, 28)
y = model(x)

torch.Size([64, 8, 28, 28])
torch.Size([64, 8, 14, 14])
torch.Size([64, 16, 14, 14])
torch.Size([64, 16, 7, 7])
torch.Size([64, 784])
torch.Size([64, 20])
torch.Size([64, 10])
torch.Size([64, 20])
torch.Size([64, 784])
torch.Size([64, 16, 7, 7])
torch.Size([64, 8, 14, 14])
torch.Size([64, 8, 28, 28])


In [None]:
# Number of values per sample: product of the three non-batch dimensions.
x = torch.randn(64, 1, 28, 28)
s = x.shape[1:]  # drop the batch dimension
s[0] * s[1] * s[2]

784

In [None]:
# Try EncoderFC on a random (64, features, 2, 45) feature map.
# NOTE(review): EncoderFC is defined in a later cell of this file, so that
# cell has to be run before this one.
# NOTE(review): the EncoderFC visible in this file sizes its first Linear for
# features*1*22 inputs, while this tensor flattens to features*2*45 values per
# sample -- confirm which version of the class this cell was written for.
features = 512
model = EncoderFC(features)
x = torch.randn(64, features, 2, 45)
y = model(x)

torch.Size([64, 46080])
torch.Size([64, 1024])
torch.Size([64, 512])
torch.Size([64, 1024])
torch.Size([64, 46080])
torch.Size([64, 512, 2, 45])


In [None]:
import torch
import torch.nn as nn

from torchsummary import summary

import numpy as np
import matplotlib.pyplot as plt

from os import listdir
from os.path import isfile, join

from tqdm import tqdm

import random

import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import pandas as pd

from os import listdir
from os.path import isfile, join

from tqdm import tqdm

import random

import torch
from torch import nn, sigmoid
from torch.nn.modules.upsampling import Upsample
from torch.nn.functional import interpolate
from torch.autograd import Variable
from torch.nn import MaxPool2d
from torch.nn.modules.conv import Conv2d
from torch.nn.modules.activation import Sigmoid, ReLU

from torchsummary import summary

from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import KMeans
from sklearn.cluster import SpectralClustering
from sklearn.preprocessing import normalize
from scipy.spatial.distance import cosine as cosine_distance

from torchsummary import summary
import torchvision.transforms.functional as TF

In [None]:
# Define model of double convolution

class DoubleConv(nn.Module):
    """Two back-to-back 3x3 convolution stages, each with batch norm and ReLU.

    Both convolutions use stride 1 and padding 1, so the spatial size is
    preserved; only the channel count changes (``in_channels`` ->
    ``out_channels`` -> ``out_channels``).  The convolutions have no bias
    term.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by both convolutions.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        stages = []
        # The first stage consumes in_channels, the second consumes the first's output.
        for source_channels in (in_channels, out_channels):
            stages += [
                nn.Conv2d(
                    source_channels,
                    out_channels,
                    kernel_size=3,
                    stride=1,
                    padding=1,
                    bias=False,
                ),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]

        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both convolution stages to ``x``."""
        return self.conv(x)

In [None]:
class Encoder(nn.Module):
    """Contracting path: a stack of DoubleConv blocks, each followed by 2x2 max-pooling.

    After every forward pass ``skip_connections`` holds the pre-pooling
    output of each level of *that* pass, shallowest level first.

    Args:
        in_channels: Channels of the input tensor.
        features: Output channel count of each level, shallowest first.
    """

    # The default is a tuple rather than a list so it cannot be mutated and
    # shared between instances.
    def __init__(self, in_channels, features=(64, 128, 256, 512)):
        super(Encoder, self).__init__()

        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature  # the next level consumes this level's output

        self.skip_connections = []

    def forward(self, x):
        """Encode ``x`` and record the per-level activations.

        Returns:
            The output of the last level after pooling.
        """
        # Build a fresh list on every call. Appending to the same list across
        # calls made it grow without bound and kept the activations (and
        # autograd graphs) of all previous batches alive.
        skips = []

        for down in self.downs:
            x = down(x)
            skips.append(x)  # stored before pooling, at full level resolution
            x = self.pool(x)

        self.skip_connections = skips
        return x

In [None]:
class UNet(nn.Module):
    """U-Net with a fully connected encode/decode step before the bottleneck.

    Data flow: Encoder -> EncoderFC -> DecoderFC -> bottleneck DoubleConv ->
    one (ConvTranspose2d, DoubleConv) pair per level with the matching encoder
    activation concatenated in -> 1x1 output convolution.

    EncoderFC/DecoderFC flatten to and from a ``(features[-1], 1, 22)`` map,
    so the encoder output must have spatial size 1x22 (e.g. a 23x360 input
    with four levels).

    After a forward pass:
        * ``encoder_skip_connections`` holds that pass's encoder activations,
          deepest level first.
        * ``decoder_skip_connections`` holds the bottleneck output followed by
          the output of each transposed convolution (before any resize).

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels of the output tensor.
        features: Channel count of each encoder level, shallowest first.
    """

    def __init__(self, in_channels, out_channels, features):
        super(UNet, self).__init__()

        self.encoder = Encoder(in_channels, features)
        self.ups     = nn.ModuleList()

        # Size the FC blocks from the deepest encoder level instead of a
        # literal 512, so other `features` lists work as well.
        self.fce     = EncoderFC(features[-1])
        self.fcd     = DecoderFC(features[-1])

        # self.ups alternates: even index = upsampling, odd index = DoubleConv
        # applied to the concatenation [skip, upsampled] (hence feature*2 in).
        for feature in reversed(features):
            self.ups.append(nn.ConvTranspose2d(feature*2, feature, kernel_size=2, stride=2))
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

        # Both are refreshed on every forward pass; defined here so they can
        # be inspected before the first call.
        self.encoder_skip_connections = []
        self.decoder_skip_connections = []

    def forward(self, x):
        """Run the network on ``x`` and return the output of the final 1x1 convolution."""

        x = self.encoder(x)

        s = x.shape[0]  # batch size, needed by the FC blocks to reshape
        x = self.fce(x, s)
        x = self.fcd(x, s)

        x = self.bottleneck(x)

        # Start a new list each pass; appending to one list across calls made
        # it grow without bound and kept old activations alive.
        self.decoder_skip_connections = [x]

        # Only the most recent entry of each level belongs to this pass.
        # Reverse so index 0 is the deepest level, matching the order of
        # the decoder stages.
        n_levels = len(self.encoder.downs)
        self.encoder_skip_connections = self.encoder.skip_connections[-n_levels:][::-1]

        for level, skip_connection in enumerate(self.encoder_skip_connections):
            upsample = self.ups[2 * level]
            refine   = self.ups[2 * level + 1]

            x = upsample(x)
            self.decoder_skip_connections.append(x)

            # Pooling floors odd sizes, so the upsampled map can be smaller
            # than the skip tensor; resize it to match before concatenating.
            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = refine(concat_skip)

        return self.final_conv(x)

In [None]:
class EncoderFC(nn.Module):
    """Flatten a ``(features, 1, 22)`` feature map and compress it to ``features`` values.

    Two Linear+ReLU layers: ``features*1*22 -> features*2 -> features``.
    The shape is printed before flattening, after flattening and after the
    linear layers.

    Args:
        features: Channel count of the incoming feature map; also the size of
            the returned vector per sample.
    """

    def __init__(self, features=512):
        super(EncoderFC, self).__init__()

        self.features = features

        self.fci1  = nn.Sequential(nn.Linear(features*1*22, features*2),
                                   nn.ReLU(),
                                   nn.Linear(features*2, features),
                                   nn.ReLU())

    def forward(self, x, s=None):
        """Encode ``x``.

        Args:
            x: Tensor whose non-batch dimensions flatten to ``features*1*22``.
            s: Batch size used for the flattening reshape. Optional; defaults
                to ``x.shape[0]``, so the module can be called as ``model(x)``.

        Returns:
            Tensor of shape ``(s, features)``.
        """
        if s is None:
            s = x.shape[0]

        print(x.shape)
        x = x.reshape(s, -1)  # one flat vector per sample
        print(x.shape)

        x = self.fci1(x)
        print(x.shape)

        return x

In [None]:
class DecoderFC(nn.Module):
    """Expand a ``features``-long vector back into a ``(features, 1, 22)`` feature map.

    Two Linear+ReLU layers: ``features -> features*2 -> features*1*22``,
    followed by a reshape. The shape is printed after the linear layers and
    after the reshape.

    Args:
        features: Length of the incoming vector; also the channel count of the
            returned feature map.
    """

    def __init__(self, features=512):
        super(DecoderFC, self).__init__()

        self.features = features
        self.fci2 = nn.Sequential(nn.Linear(features, features*2),
                                  nn.ReLU(),
                                  nn.Linear(features*2, features*1*22),
                                  nn.ReLU())

    def forward(self, x, s=None):
        """Decode ``x``.

        Args:
            x: Tensor of shape ``(batch, features)``.
            s: Batch size used for the final reshape. Optional; defaults to
                ``x.shape[0]``, so the module can be called as ``model(x)``.

        Returns:
            Tensor of shape ``(s, features, 1, 22)``.
        """
        if s is None:
            s = x.shape[0]

        x = self.fci2(x)
        print(x.shape)

        x = x.reshape(s, self.features, 1, 22)  # back to a feature map
        print(x.shape)

        return x

In [None]:
# Run the UNet once on a random batch of eight 3-channel 23x360 inputs and
# compare the input and output shapes.
x     = torch.randn((8, 3, 23, 360))
model = UNet(3, 3, [64, 128, 256, 512])
preds = model(x)

print(x.shape)
print(preds.shape)

# UNet.forward stores the encoder activations deepest-first, so reversing
# prints them from the shallowest level down.
print(f'\nConnections:\n')
for i in reversed(model.encoder_skip_connections):
    print(i.shape)

print(f'')    

# The first decoder entry is the bottleneck output; a blank line separates it
# from the upsampled tensors that follow.
for i, m in enumerate(model.decoder_skip_connections):
    print(m.shape)
    if i == 0:
        print(f'')
        
# summary(model, input_size=(1, 360, 68))

torch.Size([8, 512, 1, 22])
torch.Size([8, 11264])
torch.Size([8, 512])
torch.Size([8, 11264])
torch.Size([8, 512, 1, 22])
torch.Size([8, 3, 23, 360])
torch.Size([8, 3, 23, 360])

Connections:

torch.Size([8, 64, 23, 360])
torch.Size([8, 128, 11, 180])
torch.Size([8, 256, 5, 90])
torch.Size([8, 512, 2, 45])

torch.Size([8, 1024, 1, 22])

torch.Size([8, 512, 2, 44])
torch.Size([8, 256, 4, 90])
torch.Size([8, 128, 10, 180])
torch.Size([8, 64, 22, 360])


In [None]:
# Run only the encoder and the fully connected encoder of the UNet to inspect
# the shape of the resulting code vector.
output = model.encoder(x)
output = model.fce(output, output.shape[0])  # second argument is the batch size
output.shape

torch.Size([8, 512, 1, 22])
torch.Size([8, 11264])
torch.Size([8, 512])


torch.Size([8, 512])

In [None]:
# NOTE(review): none of the model classes visible in this file define an `fc`
# attribute (CNN has fc1/fc2, UNet has fce/fcd) -- confirm which model version
# this cell was written for. The recorded output below ends in a RuntimeError.
output = model.fc(x)
output.shape

torch.Size([64, 3, 23, 360])
torch.Size([64, 24840])


RuntimeError: ignored

## Set Device

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

## Hyperparameters

In [None]:
in_channels = 1       # passed to CNN(in_channels=...)
num_classes = 10      # passed to CNN(num_classes=...)
lr          = 0.001   # learning rate handed to the Adam optimizer
batch_size  = 64      # batch size of both DataLoaders
n_epochs    = 1       # number of passes over train_loader in the training loop

## Load Data

In [None]:
# MNIST training and test splits, downloaded into 'dataset/' when missing.
# ToTensor() converts each image to a tensor.
train_dataset = datasets.MNIST(root='dataset/',
                               train=True,
                               transform=transforms.ToTensor(),
                               download=True)

test_dataset  = datasets.MNIST(root='dataset/',
                               train=False,
                               transform=transforms.ToTensor(),
                               download=True)

  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


In [None]:
# Batched, shuffled iterators over both splits.
# NOTE(review): the test loader is shuffled too; check_accuracy only counts
# matches, so ordering should not matter there -- confirm this is intended.
train_loader = DataLoader(train_dataset, 
                          batch_size=batch_size,
                          shuffle=True)
test_loader  = DataLoader(test_dataset, 
                          batch_size=batch_size,
                          shuffle=True)

## Initialize the Network

In [None]:
# Build the CNN from the hyperparameters above and move it to the selected device.
model = CNN(in_channels=in_channels,
            num_classes=num_classes)

model = model.to(device)

## Loss and Optimizer

In [None]:
# Cross-entropy loss and an Adam optimizer over all model parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

## Train the Network

In [None]:
# Standard training loop: one optimizer step per mini-batch.
# NOTE(review): the CNN defined earlier in this file returns a
# (N, 8, 28, 28) tensor, which does not match class-index targets of
# shape (N,) under CrossEntropyLoss -- confirm which model version this loop
# is meant to train.
for epoch in range(n_epochs):
    
    for batch_idx, (data, targets) in enumerate(train_loader):
        
        # Inputs and labels must live on the same device as the model.
        data    = data.to(device=device)
        targets = targets.to(device=device)
        
        # Forward
        targets_pred = model(data)
        loss         = criterion(targets_pred, targets)
        
        # Backward
        optimizer.zero_grad()  # clear gradients left over from the previous step
        loss.backward()
        optimizer.step()

## Test Accuracy

In [None]:
def check_accuracy(loader, model):
    """Print and return the classification accuracy of ``model`` on ``loader``.

    The prediction for each sample is the index of the largest value along
    dimension 1 of the model output. Inputs are moved to the module-level
    ``device`` before the forward pass.

    Args:
        loader: DataLoader yielding ``(inputs, labels)`` batches. Its dataset
            must expose a boolean ``train`` attribute, which selects the
            header line that is printed.
        model: Module to evaluate. It is switched to eval mode for the
            measurement and put back into its previous mode afterwards.

    Returns:
        Accuracy in percent as a float (0.0 when the loader yields no samples).
    """
    if loader.dataset.train:
        print('Checking accuracy on training data: \n')
    else:
        print('Checking accuracy on test data: \n')

    num_corrects = 0
    num_samples  = 0

    # Remember the current mode so evaluation does not silently leave the
    # model in eval mode for any training that follows.
    was_training = model.training
    model.eval()

    try:
        with torch.no_grad():

            for x, y in loader:

                x = x.to(device=device)
                y = y.to(device=device)

                score = model(x)
                _, pred = score.max(dim=1)

                num_corrects += (pred == y).sum()
                num_samples  += pred.size(0)
    finally:
        model.train(was_training)

    # Convert once at the end (avoids a device sync per batch) so the results
    # are plain Python numbers.
    num_corrects = int(num_corrects)
    # Guard against an empty loader, which used to raise ZeroDivisionError.
    acc = (num_corrects*100)/num_samples if num_samples else 0.0

    print(f'Corrects: {num_corrects} \nSamples: {num_samples} \nAccuracy: {acc:0.4f}')

    return acc

In [None]:
# Accuracy on the training split.
check_accuracy(train_loader, model)

Checking accuracy on training data: 

Corrects: 57708 
Samples: 60000 
Accuracy: 96.1800


In [None]:
# Accuracy on the test split.
check_accuracy(test_loader, model)

Checking accuracy on test data: 

Corrects: 9658 
Samples: 10000 
Accuracy: 96.5800


In [1]:
import numpy as np
from google.colab import drive

In [None]:
# Mount Google Drive at /content/drive (requires the Colab runtime).
drive.mount("/content/drive")