In [1]:
import gc

In [2]:
train_Dataset = None
test_Dataset = None
model = None

try:
  gc.collect()
  torch.cuda.empty_cache()
  del torch
  gc.collect()
  print("torch deleted...")

except:
  print("torch not defined")

torch not defined


In [3]:
import torch
from torchvision import transforms
import torch.nn as nn
import sklearn
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from PIL import Image
import cv2
import os
import gc
os.environ["TORCH_CUDA_ALLOC_CONF"] =  "caching_allocator"

In [4]:
if torch.cuda.is_available():
  device = torch.device("cuda:0")
  print("using gpu")
else:
  device = torch.device("cpu")
  print("cuda not available")

using gpu


In [5]:
# from google.colab import drive
# drive.mount('/content/drive')

In [6]:
def get_image_paths(image_folder_path):
    paths = []
    files = os.listdir(image_folder_path)
    for f in files:
        paths.append(image_folder_path+f)

    return paths

In [7]:
class PlotDataset(Dataset):
    def __init__(self, x,y, transform=None):

        self.x = x
        self.y = y
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        img = Image.open(self.x[index])
        #img = cv2.imread(self.x[index])
        # print(img.shape)
        label = self.y[index]
        if self.transform:
            img = (self.transform(img))
        label = torch.tensor(label,dtype=torch.float32)

        return img, label


In [8]:
batch_size = 64
test_batch_size = 64
# image_folder_path = './drive/MyDrive/train_data/'
# labels_path = './drive/MyDrive/labels.npy'
# checkpoint_path = './model.pt'

image_folder_path = './train_data_2/'
labels_path = './labels_2.npy'
checkpoint_path = './model_2.pt'

image_paths = get_image_paths(image_folder_path)
labels = np.load(labels_path).reshape(-1,4)

train_x, test_x, train_y, test_y = train_test_split(image_paths,labels,test_size=.256,random_state=42)

transform = transforms.Compose([
    transforms.Grayscale(),
    transforms.Resize((270,360)),
    transforms.ToTensor()
])

train_Dataset = DataLoader((PlotDataset(train_x,train_y,transform)),batch_size=batch_size,shuffle=True)
test_Dataset = DataLoader(PlotDataset(test_x,test_y,transform),batch_size=test_batch_size,shuffle=True)

In [9]:
class CNN(nn.Module):
    def __init__(self, num_channels):
        super(CNN, self).__init__()
        
        self.feature_maps = {}
        self.activation = nn.ReLU()
        self.conv1 = nn.Conv2d(num_channels, 16, kernel_size=3)
        self.conv2 = nn.Conv2d(16, 64, kernel_size=5)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=5)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=7)

        self.maxPool1 = nn.MaxPool2d(kernel_size=2, stride=3)
        self.maxPool2 = nn.MaxPool2d(kernel_size=2, stride=3)
        self.maxPool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.flatten = nn.Flatten(start_dim=1, end_dim=-1)
        self.FC = nn.Linear(3840, 1024)
        self.FC2 = nn.Linear(1024, 512)

        self.output = nn.Linear(512, 4)

        self.dropout = nn.Dropout(p=0.1)

        self.bn1 = nn.BatchNorm2d(16)
        self.bn2 = nn.BatchNorm2d(64)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(256)
        
        self.conv1.register_forward_hook(self.feature_hook)
        self.conv2.register_forward_hook(self.feature_hook)
        
    def feature_hook(self, module, input, output):
    # Access the feature maps of the conv1 layer here
        self.feature_maps[module] = output.detach()
        #print("Feature maps of conv1 layer:", feature_maps.shape)
        
    def forward(self, x):
        x = x.to(torch.float32)

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.activation(x)
        x = self.maxPool1(x)

        x = self.conv2(x)
        x = self.bn2(x)

        x = self.activation(x)
        x = self.maxPool2(x)

        x = self.conv3(x)
        x = self.bn3(x)
        
        x = self.activation(x)
        x = self.maxPool3(x)

        x = self.conv4(x)
        x = self.bn4(x)
        
        x = self.activation(x)
        x = self.maxPool3(x)
        
        # x = self.flatten(x)
        x = torch.flatten(x, 1)
        
        x = self.FC(x)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.FC2(x)
        x = self.activation(x)
        x = self.dropout(x)

        x = self.output(x)
        return x

    def trainer(self, epochs):
        lr = 1e-1
        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        loss_func = nn.MSELoss()
        best_loss = -1

        for i in range(epochs):
            train_loss = 0.0
            counter_train = 0
            for images, labels in train_Dataset:
                counter_train += 1
                images = images.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()
                y = self(images)
                loss = loss_func(y, labels)

                train_loss += loss.detach()
                
                loss.backward()

                optimizer.step()


            with torch.no_grad():
                self.eval()
                test_loss = 0.0
                counter_test = 0
                for images, labels in test_Dataset:
                    counter_test += 1
                    images = images.to(device)
                    labels = labels.to(device)

                    y = self(images)

                    for layer, feature_map in self.feature_maps.items():
                        print(f"Feature maps of layer:", feature_map.shape)
                    
                    loss = loss_func(y, labels)
                    test_loss += loss.detach()
                    

                train_loss /= counter_train
                test_loss /= counter_test

                if best_loss < 0 or best_loss > test_loss * 1.07:
                    best_loss = test_loss
                    torch.save(
                        {
                            "epoch": i,
                            "model_state_dict": self.state_dict(),
                            "optimizer_state_dict": optimizer.state_dict(),
                            "loss": best_loss,
                        },
                        checkpoint_path,
                    )
                    print("checkpoint saved!")

                print(
                    f"epoch: {i} -- train-loss: {train_loss} -- \
test-loss: {test_loss}"
                )


#               with open(drive_dir,'a') as f:
#                   f.write(f'epoch: {i} -- train-loss: {train_loss} --\
#   test-loss: {test_loss}\n')
#               self.train()

In [10]:
# drive_dir = './drive/MyDrive/log.txt'
# with open(drive_dir,'w') as f:
#     f.write(f'logging:\n')

In [11]:
model = CNN(1).to(device)
model.trainer(6000)

Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature maps of layer: torch.Size([64, 64, 85, 115])
Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature maps of layer: torch.Size([64, 64, 85, 115])
Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature maps of layer: torch.Size([64, 64, 85, 115])
Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature maps of layer: torch.Size([64, 64, 85, 115])
Feature maps of layer: torch.Size([7, 16, 268, 358])
Feature maps of layer: torch.Size([7, 64, 85, 115])
checkpoint saved!
epoch: 0 -- train-loss: 1647974144.0 -- test-loss: 1072349.875
Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature maps of layer: torch.Size([64, 64, 85, 115])
Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature maps of layer: torch.Size([64, 64, 85, 115])
Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature maps of layer: torch.Size([64, 64, 85, 115])
Feature maps of layer: torch.Size([64, 16, 268, 358])
Feature map

KeyboardInterrupt: 

In [None]:
1/0

ZeroDivisionError: division by zero