In [4]:
from google.colab import drive
import shutil
import torch
from torchvision import transforms
import torchvision
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from tqdm import tqdm

In [5]:
import zipfile
import os

Dataset

In [6]:
# Attach Google Drive to the Colab filesystem; the dataset is read from under this mount point.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [7]:
# Hyper-parameters
# Number of images per mini-batch, shared by the train / test / validation loaders.
batch_size = 4

Loading the dataset

In [8]:
# Preprocessing applied to every image before it reaches the network.
img_size = 256  # images are resized to img_size x img_size

transformer = transforms.Compose(
    [
        # Force a fixed square resolution regardless of the source image size.
        transforms.Resize((img_size, img_size)),
        # Convert the loaded image to a float tensor with values scaled from 0-255 to 0-1.
        transforms.ToTensor(),
    ]
)

In [9]:
# Root folder on the mounted drive; the 'train', 'test' and 'val' sub-folder names are appended to it.
dataset_path = '/content/gdrive/MyDrive/Train-Test-Val/'

In [10]:
def load_dataset(d_path):
    """Build an ImageFolder over ``d_path`` and wrap it in a default DataLoader.

    Args:
        d_path: Directory laid out one sub-folder per class, as expected by
            ``torchvision.datasets.ImageFolder``.

    Returns:
        A ``torch.utils.data.DataLoader`` created with default settings.
        The underlying ImageFolder (which applies the module-level
        ``transformer``) is reachable through its ``.dataset`` attribute.
    """
    folder_images = torchvision.datasets.ImageFolder(d_path, transform=transformer)
    return torch.utils.data.DataLoader(folder_images)

In [11]:
# Load the three splits. Each name holds the DataLoader returned by load_dataset at this point.
train_dataset = load_dataset(f'{dataset_path}train')
test_dataset = load_dataset(f'{dataset_path}test')
valid_dataset = load_dataset(f'{dataset_path}val')

In [12]:
# Replace each DataLoader returned by load_dataset with the dataset it wraps.
# getattr with a fallback keeps this cell idempotent: once a name already holds
# the bare dataset (which has no `.dataset` attribute), re-running the cell
# leaves it untouched instead of raising AttributeError.
train_dataset = getattr(train_dataset, 'dataset', train_dataset)
test_dataset = getattr(test_dataset, 'dataset', test_dataset)
valid_dataset = getattr(valid_dataset, 'dataset', valid_dataset)

In [13]:
# Mini-batch loaders for the three splits.
# Only the training split is shuffled: reshuffling each epoch helps SGD-style
# optimisation, whereas evaluation results do not depend on sample order, so
# the test / validation loaders keep a fixed, reproducible order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, num_workers=2, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=2, shuffle=False)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, num_workers=2, shuffle=False)

In [14]:
# Number of samples in the training and test splits (used later as metric denominators).
train_count, test_count = len(train_dataset), len(test_dataset)

In [15]:
# Report, for every split, how many images it holds and how many mini-batches that yields.
print(f'Train Set- {len(train_dataset)} images in {len(train_loader)} batches')
print(f'Testing Set - {len(test_dataset)} images in {len(test_loader)} batches')
print(f'Validation Set - {len(valid_dataset)} images in {len(valid_loader)} batches')

Train Set- 4168 images in 1042 batches
Testing Set - 1397 images in 350 batches
Validation Set - 1388 images in 347 batches


In [16]:
# Sanity check: pull a single validation batch and show the tensor shapes it produces.
for images, labels in valid_loader:
    print(f'Image batch dimensions: {images.shape}\nImage label dimensions: {labels.shape}')
    break  # one batch is enough

Image batch dimensions: torch.Size([4, 3, 256, 256])
Image label dimensions: torch.Size([4])


In [17]:
# Sanity check: show the integer class labels of the first training batch.
for images, labels in train_loader:
    print(labels)
    break  # one batch is enough

tensor([3, 1, 2, 2])


Network

In [18]:
class depthwise_separable_conv(nn.Module):
    """Depthwise-separable convolution: a per-channel 3x3 conv followed by a 1x1 conv.

    Args:
        nin: Number of input channels (also the number of depthwise groups).
        nout: Number of output channels produced by the 1x1 pointwise conv.
    """

    def __init__(self, nin, nout):
        super().__init__()
        # groups=nin gives every input channel its own 3x3 filter; padding=1 keeps H and W.
        self.depthwise = nn.Conv2d(
            in_channels=nin, out_channels=nin, kernel_size=3, padding=1, groups=nin
        )
        # The 1x1 convolution mixes information across channels and sets the output width.
        self.pointwise = nn.Conv2d(in_channels=nin, out_channels=nout, kernel_size=1)

    def forward(self, x):
        """Apply the depthwise conv, then the pointwise conv, and return the result."""
        return self.pointwise(self.depthwise(x))

In [19]:
class channel_shuffle(nn.Module):
    """Interleave the channels of a 4-D tensor across ``groups`` channel groups.

    Args:
        groups: Number of groups the channel axis is split into before the
            group and per-group axes are swapped.
    """

    def __init__(self, groups):
        super().__init__()
        self.groups = groups

    def forward(self, x):
        """Return ``x`` with its channels shuffled; the tensor shape is unchanged."""
        n, c, h, w = x.shape
        per_group = c // self.groups

        # (N, C, H, W) -> (N, groups, C/groups, H, W)
        grouped = x.view(n, self.groups, per_group, h, w)
        # Swap the two channel axes and materialise the new memory layout.
        swapped = grouped.transpose(1, 2).contiguous()
        # Collapse back to a single channel axis.
        return swapped.view(n, -1, h, w)

In [20]:
class GDSW(nn.Module):
    """Grouped conv -> channel shuffle -> depthwise-separable conv -> grouped conv.

    Args:
        dim_in: Input channel count; must be divisible by 3 because the first
            convolution uses ``groups=3``.
        dim_out: Output channel count; must be divisible by 3 for the same
            reason on the last convolution.

    The intermediate widths are fixed: 6 channels after the first grouped
    convolution and 12 channels after the depthwise-separable convolution.
    Every convolution is 3x3 with padding 1 (or 1x1), so H and W are preserved.
    """

    def __init__(self, dim_in, dim_out):
        super().__init__()

        self.gc1 = nn.Conv2d(dim_in, 6, kernel_size=(3, 3), padding=1, groups=3)
        self.cs = channel_shuffle(groups=3)
        self.DSWC = depthwise_separable_conv(6, 12)
        self.gc2 = nn.Conv2d(12, dim_out, kernel_size=(3, 3), padding=1, groups=3)

    def forward(self, x):
        """Run the four stages in order and return the final feature map."""
        for stage in (self.gc1, self.cs, self.DSWC, self.gc2):
            x = stage(x)
        return x

In [21]:
class FPN(nn.Module):
    """Encoder-decoder with skip connections over an 18-channel feature map.

    The encoder has three conv -> ReLU -> BatchNorm -> 2x2 max-pool stages
    (24, 32 and 64 channels) followed by a 128-channel bottleneck conv. The
    decoder upsamples by 2 three times; before each decoder conv the upsampled
    map is concatenated with the pre-pool encoder activation of the same
    resolution.

    Input:  ``(N, 18, H, W)`` with H and W divisible by 8 so that the
            upsampled maps line up with the skip connections.
    Output: ``(N, 32, H, W)``.
    """

    def __init__(self):
        super().__init__()
        # ---- encoder ----
        self.enc_conv0 = nn.Conv2d(in_channels=18, out_channels=24, kernel_size=(3, 3), padding=1)
        self.act0 = nn.ReLU()
        self.bn0 = nn.BatchNorm2d(24)
        self.pool0 = nn.MaxPool2d(kernel_size=(2, 2))

        self.enc_conv1 = nn.Conv2d(in_channels=24, out_channels=32, kernel_size=(3, 3), padding=1)
        self.act1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2))

        self.enc_conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), padding=1)
        self.act2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2))

        # ---- bottleneck ----
        self.bottleneck_conv = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 3), padding=1)

        # ---- decoder ----
        # in_channels = upsampled channels + skip channels: 128+64, 32+32, 16+24.
        self.upsample2 = nn.UpsamplingBilinear2d(scale_factor=2)
        self.dec_conv2 = nn.Conv2d(in_channels=192, out_channels=32, kernel_size=(3, 3), padding=1)
        self.dec_act2 = nn.ReLU()
        self.dec_bn2 = nn.BatchNorm2d(32)

        self.upsample3 = nn.UpsamplingBilinear2d(scale_factor=2)
        self.dec_conv3 = nn.Conv2d(in_channels=64, out_channels=16, kernel_size=(3, 3), padding=1)
        self.dec_act3 = nn.ReLU()
        self.dec_bn3 = nn.BatchNorm2d(16)

        self.upsample4 = nn.UpsamplingBilinear2d(scale_factor=2)
        self.dec_conv4 = nn.Conv2d(in_channels=40, out_channels=32, kernel_size=(1, 1))
        self.dec_act4 = nn.ReLU()
        self.dec_bn4 = nn.BatchNorm2d(32)

        # Not used by forward(); kept so existing attribute access keeps working.
        self.poolg = nn.MaxPool2d(kernel_size=(2, 2))
        self.avgpool = nn.AdaptiveAvgPool2d(8)

    def forward(self, x):
        """Run the encoder, bottleneck and decoder; return the final decoder map."""
        # Each encoder stage is evaluated exactly once. Its pre-pool activation is
        # the skip connection and its pooled version feeds the next stage.
        # Evaluating the stage a second time just to obtain the skip tensor would
        # double the encoder cost and, in training mode, update the BatchNorm
        # running statistics twice per step.
        cat0 = self.bn0(self.act0(self.enc_conv0(x)))
        e0 = self.pool0(cat0)

        cat1 = self.bn1(self.act1(self.enc_conv1(e0)))
        e1 = self.pool1(cat1)

        cat2 = self.bn2(self.act2(self.enc_conv2(e1)))
        e2 = self.pool2(cat2)

        b = self.bottleneck_conv(e2)

        # Decoder: upsample, concatenate the matching skip tensor on the channel axis, refine.
        d2 = self.dec_bn2(self.dec_act2(self.dec_conv2(torch.cat((self.upsample2(b), cat2), dim=1))))
        d3 = self.dec_bn3(self.dec_act3(self.dec_conv3(torch.cat((self.upsample3(d2), cat1), dim=1))))
        d4 = self.dec_bn4(self.dec_act4(self.dec_conv4(torch.cat((self.upsample4(d3), cat0), dim=1))))

        return d4

In [22]:

class CNN_Branch(nn.Module):
    """Convolutional branch: three GDSW stages followed by the FPN.

    Each GDSW stage is followed by BatchNorm and a 2x2 max-pool, so the three
    stage outputs sit at 1/2, 1/4 and 1/8 of the input resolution with 6, 9
    and 12 channels. The first-stage output is pooled by a further factor of 4
    and concatenated with the third-stage output (6 + 12 = 18 channels) before
    the FPN; the FPN output is then max-pooled by 4.

    ``avgpool`` and ``fc`` are constructed but are not used by ``forward``.
    """

    def __init__(self):
        super().__init__()

        # Feature extractors; channel widths grow 3 -> 6 -> 9 -> 12.
        self.GDSW1 = GDSW(dim_in=3, dim_out=6)
        self.GDSW2 = GDSW(dim_in=6, dim_out=9)
        self.GDSW3 = GDSW(dim_in=9, dim_out=12)

        # One BatchNorm per GDSW output.
        self.bnn1 = nn.BatchNorm2d(6)
        self.bnn2 = nn.BatchNorm2d(9)
        self.bnn3 = nn.BatchNorm2d(12)

        self.FPN = FPN()

        self.poolg = nn.MaxPool2d(kernel_size=(2, 2))
        self.poolg4 = nn.MaxPool2d(kernel_size=(4, 4))
        self.avgpool = nn.AdaptiveAvgPool2d(8)

        # Sized from the notebook-level img_size constant.
        self.fc = nn.Linear(128 * img_size, 7)

    def forward(self, x):
        """Return the pooled FPN feature map computed from image batch ``x``."""
        halve = self.poolg
        quarter = self.poolg4

        stage1 = halve(self.bnn1(self.GDSW1(x)))
        stage2 = halve(self.bnn2(self.GDSW2(stage1)))
        stage3 = halve(self.bnn3(self.GDSW3(stage2)))

        # Bring stage1 down to stage3's resolution, then fuse along the channel axis.
        fused = torch.cat((quarter(stage1), stage3), dim=1)

        return quarter(self.FPN(fused))

In [23]:
class Transformer_Branch(nn.Module):
    """Wrapper around torchvision's ``swin_b`` with its pooling and head disabled.

    The model is created with the constructor's default arguments, and its
    ``avgpool``, ``flatten`` and ``head`` attributes are each replaced by
    ``nn.Identity``.
    """

    def __init__(self):
        super().__init__()

        self.SWIN = torchvision.models.swin_b()

        # Neutralise the tail of the network so the backbone output passes through unchanged.
        for tail_name in ('avgpool', 'flatten', 'head'):
            setattr(self.SWIN, tail_name, nn.Identity())

    def forward(self, x):
        """Return the output of the modified Swin model for ``x``."""
        return self.SWIN(x)

In [24]:
import torch
import torch.nn as nn
from torch.nn.parameter import Parameter


class sa_layer(nn.Module):
    """Grouped attention block with a channel-gating half and a spatial-gating half.

    The channel axis is split into ``groups`` groups and each group is cut in
    two. One half is gated by a learned affine function of its global average;
    the other half is gated by a learned affine function of its
    group-normalised activations. The halves are concatenated again and the
    channels are shuffled across two groups.

    Args:
        channel: Number of input channels; ``channel // (2 * groups)`` is the
            width of each half-group.
        groups: Number of channel groups (default 66; a previous signature
            used 64).
    """

    def __init__(self, channel, groups=66):
        super().__init__()
        half_width = channel // (2 * groups)
        param_shape = (1, half_width, 1, 1)

        self.groups = groups
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        # Affine parameters for both gates start at weight 0 / bias 1.
        self.cweight = Parameter(torch.zeros(*param_shape))
        self.cbias = Parameter(torch.ones(*param_shape))
        self.sweight = Parameter(torch.zeros(*param_shape))
        self.sbias = Parameter(torch.ones(*param_shape))

        self.sigmoid = nn.Sigmoid()
        self.gn = nn.GroupNorm(half_width, half_width)

    @staticmethod
    def channel_shuffle(x, groups):
        """Interleave the channels of ``x`` across ``groups`` groups; shape is unchanged."""
        n, _, h, w = x.shape
        regrouped = x.reshape(n, groups, -1, h, w).transpose(1, 2)
        # Merge the two channel axes back into one.
        return regrouped.reshape(n, -1, h, w)

    def forward(self, x):
        """Apply both gates to ``x`` and return a tensor of the same shape."""
        n, _, h, w = x.shape

        # Fold the groups into the batch axis, then split each group in half.
        per_group = x.reshape(n * self.groups, -1, h, w)
        chan_half, spat_half = per_group.chunk(2, dim=1)

        # Channel attention: gate from the globally averaged descriptor.
        chan_gate = self.sigmoid(self.cweight * self.avg_pool(chan_half) + self.cbias)
        chan_out = chan_half * chan_gate

        # Spatial attention: gate from the group-normalised activations.
        spat_gate = self.sigmoid(self.sweight * self.gn(spat_half) + self.sbias)
        spat_out = spat_half * spat_gate

        # Re-join the halves, restore the batch axis, and mix channels.
        merged = torch.cat([chan_out, spat_out], dim=1).reshape(n, -1, h, w)
        return self.channel_shuffle(merged, 2)

In [34]:
class Overall_Arch(nn.Module):
    """Top-level 7-class classifier built on the CNN branch.

    ``forward`` runs the CNN branch, flattens its feature map and applies a
    single linear layer. For a 256x256 input the CNN branch yields a
    32 x 8 x 8 map, i.e. the 2048 features the linear layer expects.

    The model returns raw, unnormalised class scores (logits), which is what
    ``nn.CrossEntropyLoss`` expects because that loss applies log-softmax
    internally. Feeding it softmax outputs would apply softmax twice and
    flatten the gradients. The arg-max class is the same for logits and
    probabilities; apply ``torch.softmax(logits, dim=1)`` at the call site
    when probabilities are required.

    ``Transformer_Branch``, ``SA_Block`` and ``gap`` are constructed but are
    not used by ``forward``.
    """

    def __init__(self):
        super().__init__()

        self.CNN_Branch = CNN_Branch()
        self.Transformer_Branch = Transformer_Branch()
        self.SA_Block = sa_layer(channel=1056)

        self.gap = nn.AvgPool2d(kernel_size=(8, 8))
        # Kept as a Sequential so the layer's parameter names stay `fc.0.weight` / `fc.0.bias`.
        self.fc = nn.Sequential(nn.Linear(2048, 7))

    def forward(self, x):
        """Return class logits of shape ``(N, 7)`` for the image batch ``x``."""
        local_f = self.CNN_Branch(x)

        # Collapse (C, H, W) into one feature vector per sample.
        f = torch.flatten(local_f, 1)

        return self.fc(f)

Model

In [35]:
# When a GPU is present, ask cuDNN to use deterministic algorithms.
# Note that no random seed is set here.
if torch.cuda.is_available():
    torch.backends.cudnn.deterministic = True

In [36]:
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cuda', index=0)

In [37]:
# Build the network, then move its parameters and buffers to the selected device.
model = Overall_Arch()
model = model.to(device)

In [38]:
#save_path = '/content/gdrive/MyDrive/Train-Test-Val/Epochn35'
#model = torch.load(save_path)

In [39]:
# Put the model in evaluation mode; as the cell's last expression, the call's
# return value (the module itself) is displayed as the layer tree below.
model.eval()

Overall_Arch(
  (CNN_Branch): CNN_Branch(
    (GDSW1): GDSW(
      (gc1): Conv2d(3, 6, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=3)
      (cs): channel_shuffle()
      (DSWC): depthwise_separable_conv(
        (depthwise): Conv2d(6, 6, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=6)
        (pointwise): Conv2d(6, 12, kernel_size=(1, 1), stride=(1, 1))
      )
      (gc2): Conv2d(12, 6, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=3)
    )
    (GDSW2): GDSW(
      (gc1): Conv2d(6, 6, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=3)
      (cs): channel_shuffle()
      (DSWC): depthwise_separable_conv(
        (depthwise): Conv2d(6, 6, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=6)
        (pointwise): Conv2d(6, 12, kernel_size=(1, 1), stride=(1, 1))
      )
      (gc2): Conv2d(12, 9, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=3)
    )
    (GDSW3): GDSW(
      (gc1): Conv2d(9, 6, kernel_size=(3, 3), stride=

Running

In [40]:
# Training schedule and optimiser settings.
num_epochs = 10        # full passes over the training set
LEARNING_RATE = 1e-4   # optimiser step size
WEIGHT_DECAY = 1e-4    # weight-decay coefficient passed to the optimiser

In [41]:
# Multi-class classification objective.
loss_function = nn.CrossEntropyLoss()

# Adam over every model parameter, using the learning rate and weight decay set above.
optimizer = optim.Adam(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
)

In [None]:
# Train for `num_epochs` epochs; after each epoch, evaluate on the validation split.
#
# Metrics are accumulated per sample and divided by the number of samples
# actually seen in that pass:
#   * the loss returned by the criterion is a batch mean, so it is weighted by
#     the batch size before summing, making the reported value a true
#     per-sample mean even when the last batch is smaller;
#   * validation metrics are normalised by the validation sample count rather
#     than by the size of the test split.
for epoch in range(num_epochs):

    # ---- Training pass ----
    model.train()

    train_correct = 0
    train_loss_sum = 0.0
    train_seen = 0

    for images, labels in train_loader:
        # Tensors are moved to the same device as the model (no Variable wrapper needed).
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(images)

        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()

        n_batch = labels.size(0)
        train_loss_sum += loss.item() * n_batch
        train_correct += (outputs.argmax(dim=1) == labels).sum().item()
        train_seen += n_batch

    # max(..., 1) avoids a ZeroDivisionError should a loader be empty.
    train_accuracy = train_correct / max(train_seen, 1)
    train_loss = train_loss_sum / max(train_seen, 1)

    # ---- Validation pass ----
    model.eval()

    valid_correct = 0
    valid_loss_sum = 0.0
    valid_seen = 0

    # No gradients are needed for evaluation; disabling autograd saves memory and time.
    with torch.no_grad():
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = loss_function(outputs, labels)

            n_batch = labels.size(0)
            valid_loss_sum += loss.item() * n_batch
            valid_correct += (outputs.argmax(dim=1) == labels).sum().item()
            valid_seen += n_batch

    valid_accuracy = valid_correct / max(valid_seen, 1)
    valid_loss = valid_loss_sum / max(valid_seen, 1)

    print('Epoch: %d Train Accuracy: %.5f Train Loss: %.5f Validation Accuracy: %.5f  Validation Loss: %.5f' % (epoch, train_accuracy, train_loss, valid_accuracy, valid_loss))


In [None]:
#save_path = '/content/gdrive/MyDrive/Train-Test-Val/Epochn45'
#torch.save(model, save_path)