<a href="https://colab.research.google.com/github/ShounakDas101/AIML_Hari/blob/main/DeepLensE2CNN_Hari.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Equivariant CNN


In [1]:
# try:
#     done
# except:
#     import os

#     os.chdir("../../")
#     from utils.download import download
#     from utils.extract import extract

#     args = {"model": "Model-1"}
#     download(args)
#     extract("data/Model_I.tgz", "data/")
#     extract("data/Model_I_test.tgz", "data/")
#     done = True

In [2]:
import gc
import copy
import warnings

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms

from itertools import cycle
from PIL import Image
from sklearn.metrics import (
    auc,
    confusion_matrix,
    ConfusionMatrixDisplay,
    roc_auc_score,
    roc_curve,
)
from torch import optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import RandomRotation, Pad, Resize, ToTensor, Compose

from tqdm import tqdm

warnings.filterwarnings("ignore")

In [3]:
# images are padded to have shape 129x129.
# this allows to use odd-size filters with stride 2 when downsampling a feature map in the model
pad = Pad((0, 0, 1, 1), fill=0)
# to reduce interpolation artifacts (e.g. when testing the model on rotated images),
# we upsample an image by a factor of 3, rotate it and finally downsample it again
resize1 = Resize(387)
resize2 = Resize(129)
totensor = ToTensor()
togray = transforms.Grayscale(num_output_channels=1)

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Data Preparation


In [5]:
# train_transforms = transforms.Compose([
#     transforms.Resize(128),
#     transforms.Grayscale(num_output_channels=1),
#     transforms.ToTensor()
# ])

# test_transforms = transforms.Compose([
#     transforms.Resize(128),
#     transforms.Grayscale(num_output_channels=1),
#     transforms.ToTensor()
# ])

train_transforms = transforms.Compose([
    transforms.RandomCrop(128),
    pad,
    resize1,
    # RandomRotation(180, resample=Image.BILINEAR, expand=False), # this line is original
    RandomRotation(180, interpolation=Image.BILINEAR, expand=False),
    resize2,
    totensor,
    togray,
])
test_transforms = transforms.Compose([
    transforms.RandomCrop(128),
    pad,
    totensor,
    togray,
])

In [6]:
def npy_loader(path):
    if 'axion' in path:
        sample = np.load(path, allow_pickle=True)[0]
    else:
        sample = np.load(path, allow_pickle=True)
    sample = 255 * (sample / sample.max())
    sample = Image.fromarray(sample.astype('uint8')).convert("RGB")
    return sample


trainset = datasets.DatasetFolder(
    root='/content/drive/MyDrive/Model_I',
    loader=npy_loader,
    extensions=['.npy'],
    transform = train_transforms
)

testset = datasets.DatasetFolder(
    root='/content/drive/MyDrive/Model_I',
    loader=npy_loader,
    extensions=['.npy'],
    transform = test_transforms
)
train_loader = torch.utils.data.DataLoader(trainset, batch_size=64,shuffle=True)
test_loader = torch.utils.data.DataLoader(testset, batch_size=64,shuffle=True)

In [7]:
del(trainset)
del(testset)

In [8]:
lr = 0.0001
epochs = 30
gamma = 0.5
batch_size = 64
device = "cuda" if torch.cuda.is_available() else "cpu"

# Model


In [9]:
!pip install e2cnn

Collecting e2cnn
  Downloading e2cnn-0.2.3-py3-none-any.whl (225 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/225.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━[0m [32m174.1/225.3 kB[0m [31m5.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m225.3/225.3 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: e2cnn
Successfully installed e2cnn-0.2.3


In [10]:
print(len(train_loader))

184


In [11]:
import torch
from e2cnn import gspaces
from e2cnn import nn

In [12]:
#Defining the equivariant neural network model
class Equivariant_Network(torch.nn.Module):

    def __init__(self, n_classes=3, sym_group = "Dihyderal", N = 4):

        super(Equivariant_Network, self).__init__()

        # Dihyderal Equivariance
        if sym_group == 'Dihyderal':
            self.r2_act = gspaces.FlipRot2dOnR2(N=N)

        # Circular Equivariance
        elif sym_group == 'Circular':
            self.r2_act = gspaces.Rot2dOnR2(N=N)


        # the input image is a scalar field, corresponding to the trivial representation
        in_type = nn.FieldType(self.r2_act, [self.r2_act.trivial_repr])

        # we store the input type for wrapping the images into a geometric tensor during the forward pass
        self.input_type = in_type

        # convolution 1
        # first specify the output type of the convolutional layer
        # we choose 24 feature fields, each transforming under the regular representation of C8
        out_type = nn.FieldType(self.r2_act, 24*[self.r2_act.regular_repr])
        self.block1 = nn.SequentialModule(
            nn.MaskModule(in_type, 129, margin=1),
            nn.R2Conv(in_type, out_type, kernel_size=7, padding=1, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )

        # convolution 2
        # the old output type is the input type to the next layer
        in_type = self.block1.out_type
        # the output type of the second convolution layer are 48 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 48*[self.r2_act.regular_repr])
        self.block2 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )
        self.pool1 = nn.SequentialModule(
            nn.PointwiseAvgPoolAntialiased(out_type, sigma=0.66, stride=2)
        )

        # convolution 3
        # the old output type is the input type to the next layer
        in_type = self.block2.out_type
        # the output type of the third convolution layer are 48 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 48*[self.r2_act.regular_repr])
        self.block3 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )

        # convolution 4
        # the old output type is the input type to the next layer
        in_type = self.block3.out_type
        # the output type of the fourth convolution layer are 96 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 96*[self.r2_act.regular_repr])
        self.block4 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )
        self.pool2 = nn.SequentialModule(
            nn.PointwiseAvgPoolAntialiased(out_type, sigma=0.66, stride=2)
        )

        # convolution 5
        # the old output type is the input type to the next layer
        in_type = self.block4.out_type
        # the output type of the fifth convolution layer are 96 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 96*[self.r2_act.regular_repr])
        self.block5 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )

        # convolution 6
        # the old output type is the input type to the next layer
        in_type = self.block5.out_type
        # the output type of the sixth convolution layer are 64 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 64*[self.r2_act.regular_repr])
        self.block6 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=1, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )
        self.pool3 = nn.PointwiseAvgPoolAntialiased(out_type, sigma=0.66, stride=1, padding=0)

        self.gpool = nn.GroupPooling(out_type)

        # number of output channels
        c = self.gpool.out_type.size

        # Fully Connected
        self.fully_net = torch.nn.Sequential(
            torch.nn.Linear(43264, 64),
            torch.nn.BatchNorm1d(64),
            torch.nn.ELU(inplace=True),
            torch.nn.Linear(64, n_classes),
            # torch.nn.Linear(64, 1),
            # torch.nn.Sigmoid(),
        )

    def forward(self, input: torch.Tensor):
        # wrap the input tensor in a GeometricTensor
        # (associate it with the input type)
        x = nn.GeometricTensor(input, self.input_type)

        # apply each equivariant block

        # Each layer has an input and an output type
        # A layer takes a GeometricTensor in input.
        # This tensor needs to be associated with the same representation of the layer's input type
        #
        # The Layer outputs a new GeometricTensor, associated with the layer's output type.
        # As a result, consecutive layers need to have matching input/output types
        x = self.block1(x)
        x = self.block2(x)
        x = self.pool1(x)

        x = self.block3(x)
        x = self.block4(x)
        x = self.pool2(x)

        x = self.block5(x)
        x = self.block6(x)

        # pool over the spatial dimensions
        x = self.pool3(x)

        # pool over the group
        x = self.gpool(x)

        # unwrap the output GeometricTensor
        # (take the Pytorch tensor and discard the associated representation)
        x = x.tensor

        # classify with the final fully connected layers)
        # print(x.reshape(x.shape[0], -1).size())
        x = self.fully_net(x.reshape(x.shape[0], -1))

        return x

In [13]:
use_cuda = True # True, if use cuda
cuda_idx = 0 # Set idx of cuda device to be used, default is 0
data_dir = 'images_f' # Path of the data directory
sym_group = 'Circular' # Symmetry group to be used:{'Circular', 'Dihyderal'}
N = 4 # Order of the symmetry group
epochs = 40 # Number of Epochs
batch_size = 64 # Batch Size
lr = 5e-5 # learning rate
n_classes = 3 # Number of classes to be classified

In [14]:
model = Equivariant_Network(n_classes=n_classes, sym_group = sym_group, N =N).to(device)

In [15]:
print(model)

Equivariant_Network(
  (block1): SequentialModule(
    (0): MaskModule()
    (1): R2Conv([4-Rotations: {irrep_0}], [4-Rotations: {regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular}], kernel_size=7, stride=1, padding=1, bias=False)
    (2): InnerBatchNorm([4-Rotations: {regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular}], eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): ReLU(inplace=True, type=[4-Rotations: {regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular, regular}])
  )
  (block2): S

In [16]:
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
total_epochs = epochs
log_interval = 100  #Intervals after which results are displayed

In [17]:
# clearing cuda cache memory
import gc
torch.cuda.empty_cache()
gc.collect()

12

In [None]:
all_train_loss = []
all_test_loss = []
all_train_accuracy = []
all_test_accuracy = []

best_accuracy = 0

for epoch in range(total_epochs):
    model.train()
    tr_loss_epoch = []
    test_loss_epoch = []
    total = 0
    correct = 0
    for i, (x, t) in enumerate(train_loader):
        optimizer.zero_grad()

        x = x.to(device)
        t = t.to(device)
        y = model(x)
        y_pred = y.flatten().to(torch.float64)

        _, prediction = torch.max(y.data, 1)
        total += t.shape[0]
        correct += (prediction == t).sum().item()
        loss = loss_function(y, t)
        tr_loss_epoch.append(loss.item())
        if i % log_interval == 0:
            print('Loss: ',loss.item())
        loss.backward()

        optimizer.step()

    all_train_loss.append(np.asarray(tr_loss_epoch))
    all_train_accuracy.append(correct/total*100)


    if epoch % 1 == 0:
        total = 0
        correct = 0
        with torch.no_grad():
            model.eval()
            for i, (x, t) in enumerate(test_loader):
                x = x.to(device)
                t = t.to(device)
                y = model(x)

                loss = loss_function(y, t)
                test_loss_epoch.append(loss.item())

                _, prediction = torch.max(y.data, 1)
                total += t.shape[0]
                correct += (prediction == t).sum().item()

        all_test_loss.append(np.asarray(test_loss_epoch))
        all_test_accuracy.append(correct/total*100)
        print("epoch {} | test accuracy: {}".format(epoch,correct/total*100))

        test_accuracy = correct/total*100

        if test_accuracy > best_accuracy:
            best_accuracy = test_accuracy
            best_model = copy.deepcopy(model)

all_epochs = [i for i in range (total_epochs)]

all_train_loss_mean = [j.mean() for j in all_train_loss]
all_test_loss_mean = [j.mean() for j in all_test_loss]

Loss:  0.5359084010124207
