In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
import torch.utils.data as data
import torchvision
from torch.autograd import Variable
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.metrics import accuracy_score
import pickle

In [None]:
# for CRNN
class Dataset_CRNN(data.Dataset):
    "Characterizes a dataset for PyTorch"
    def __init__(self, data_path, transform=None):
        "Initialization"
        self.transform = transform
        #self.frames = frames
        self.folders = data_path

    def __len__(self):
        "Denotes the total number of samples"
        return len(os.listdir(self.folders))

    def read_images(self, data_path, selected_folder, use_transform):
        X = []
        for i in os.listdir(data_path):
            #print("file name is ",i)
            image = Image.open(os.path.join(data_path,i))
            
            #print(image.shape)
            if use_transform is not None:
                image = use_transform(image)
                #print(image.size)
            image = torch.from_numpy(np.asarray(image))
            X.append(image)
        X = torch.stack(X, dim=0)

        return X

    def __getitem__(self, index):
        data_path = os.path.join(self.folders,os.listdir(self.folders)[index])
              
        # Load data
        X = self.read_images(data_path, self.transform)                     # (input) spatial images
        
        y = 1
        if 'orig' in data_path:
            y = 0
        # print(X.shape)
        return X, torch.from_numpy(np.array(y)).type(torch.LongTensor)

In [None]:
TRANSFORM_IMG = transforms.Compose([
    transforms.Resize(256),
    #transforms.CenterCrop(256),
    #transforms.ToTensor()
    #transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         #std=[0.229, 0.224, 0.225] )
    ])

In [None]:
train_path = '/home/chinmay/datatset/train/'
train_data = Dataset_CRNN(train_path, transform=TRANSFORM_IMG)
# for step, (x, y) in enumerate(data):
#     print(x.shape)
val_path = '/home/chinmay/datatset/val/'
val_data = Dataset_CRNN(val_path, transform=TRANSFORM_IMG)

In [None]:
class Meso4(nn.Module):
    def __init__(self):
        super(Meso1,self).__init__(in_channel=3, number_of_classes=2)
        self.conv1 = nn.Conv2d(in_channel,8, kernel_size=(3,3), stride = 1, padding= 1)
        self.batch_norm_1 = nn.BatchNorm2d(8)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(8,8, kernel_size=(5,5),stride=1, padding=2)
        self.batch_norm_2 = nn.BatchNorm2d(8)
        self.conv3 = nn.Conv2d(8,16, kernel_size=(5,5),stride=1, padding=2)
        self.batch_norm_3 = nn.BatchNorm2d(16)
        self.conv4 = nn.Conv2d(16,16, kernel_size=(5,5),stride=1, padding=2)
        self.batch_norm_4 = nn.BatchNorm2d(16)
        self.max_pool_2 = nn.MaxPooling2D(kernel_size=(2, 2), stride=2)
        self.max_pool_4 = nn.MaxPooling2D(kernel_size=(4, 4), stride=4)
        
    def forward(self,x):
        x = self.conv1(x)
        x = self.batch_norm_1(x)
        x = self.relu(x)
        x = self.max_pool_2(x)
        x = self.conv2(x)
        x = self.batch_norm_2(x)
        x = self.relu(x)
        x = self.max_pool_2(x)
        x = self.conv3(x)
        x = self.batch_norm_3(x)
        x = self.relu(x)
        x = self.conv4(x)
        x = self.batch_norm_4(x)
        x = self.max_pool_2(x)
        x = x.view(x.size[0], -1)
        return x

In [None]:
X,y = train_loader[0]
model = Meso4()
y_pred = model(X)
print (y_pred.shape)

In [None]:
x1 = Conv2D(8, (3, 3), padding='same', activation = 'relu')(x)
        x1 = BatchNormalization()(x1)
        x1 = MaxPooling2D(pool_size=(2, 2), padding='same')(x1)
        
        x2 = Conv2D(8, (5, 5), padding='same', activation = 'relu')(x1)
        x2 = BatchNormalization()(x2)
        x2 = MaxPooling2D(pool_size=(2, 2), padding='same')(x2)
        
        x3 = Conv2D(16, (5, 5), padding='same', activation = 'relu')(x2)
        x3 = BatchNormalization()(x3)
        x3 = MaxPooling2D(pool_size=(2, 2), padding='same')(x3)
        
        x4 = Conv2D(16, (5, 5), padding='same', activation = 'relu')(x3)
        x4 = BatchNormalization()(x4)
        x4 = MaxPooling2D(pool_size=(4, 4), padding='same')(x4)
        
        y = Flatten()(x4)
        y = Dropout(0.5)(y)
        y = Dense(16)(y)
        y = LeakyReLU(alpha=0.1)(y)
        y = Dropout(0.5)(y)
        y = Dense(1, activation = 'sigmoid')(y)


In [None]:
loss_function = nn.CrossEntropy()

In [None]:
def train(log_interval, model, device, train_loader, optimizer, epoch):
    # set model as training mode
    cnn_encoder, rnn_decoder = model
    cnn_encoder.train()
    rnn_decoder.train()

    losses = []
    scores = []
    N_count = 0   # counting total trained sample in one epoch
    for batch_idx, (X, y) in enumerate(train_loader):
        # distribute data to device
        X, y = X.to(device), y.to(device)

        N_count += X.size(0)

        optimizer.zero_grad()
        output = rnn_decoder(cnn_encoder(X))   # output has dim = (batch, number of classes)

        loss = loss_function(output, y)
        losses.append(loss.item())

        # to compute accuracy
        y_pred = torch.max(output, 1)[1]  # y_pred != output
        step_score = accuracy_score(y.cpu().data.squeeze().numpy(), y_pred.cpu().data.squeeze().numpy())
        scores.append(step_score)         # computed on CPU

        loss.backward()
        optimizer.step()

        # show information
        if (batch_idx + 1) % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}, Accu: {:.2f}%'.format(
                epoch + 1, N_count, len(train_loader.dataset), 100. * (batch_idx + 1) / len(train_loader), loss.item(), 100 * step_score))

    return losses, scores

In [None]:
def validation(model, device, optimizer, test_loader):
    # set model as testing mode
    cnn_encoder, rnn_decoder = model
    cnn_encoder.eval()
    rnn_decoder.eval()

    test_loss = 0
    all_y = []
    all_y_pred = []
    with torch.no_grad():
        for X, y in test_loader:
            # distribute data to device
            X, y = X.to(device), y.to(device)

            output = rnn_decoder(cnn_encoder(X))

            loss = F.cross_entropy(output, y, reduction='sum')
            test_loss += loss.item()                 # sum up batch loss
            y_pred = output.max(1, keepdim=True)[1]  # (y_pred != output) get the index of the max log-probability

            # collect all y and y_pred in all batches
            all_y.extend(y)
            all_y_pred.extend(y_pred)

    test_loss /= len(test_loader.dataset)

    # compute accuracy
    all_y = torch.stack(all_y, dim=0)
    all_y_pred = torch.stack(all_y_pred, dim=0)
    test_score = accuracy_score(all_y.cpu().data.squeeze().numpy(), all_y_pred.cpu().data.squeeze().numpy())

    # show information
    print('\nTest set ({:d} samples): Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(len(all_y), test_loss, 100* test_score))

    # save Pytorch models of best record
#     torch.save(cnn_encoder.state_dict(), os.path.join(save_model_path, 'cnn_encoder_epoch{}.pth'.format(epoch + 1)))  # save spatial_encoder
#     torch.save(rnn_decoder.state_dict(), os.path.join(save_model_path, 'rnn_decoder_epoch{}.pth'.format(epoch + 1)))  # save motion_encoder
#     torch.save(optimizer.state_dict(), os.path.join(save_model_path, 'optimizer_epoch{}.pth'.format(epoch + 1)))      # save optimizer
#     print("Epoch {} model saved!".format(epoch + 1))

    return test_loss, test_score