In [None]:
import torch as T
import torch.nn.functional as F
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torch.autograd import Variable
from torchvision import transforms
import torchvision as vision
import numpy as np
import os
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
import datetime

class stairdata(Dataset):
    def __init__(self, datafolder, transforms):
        super(stairdata, self).__init__()
        self.images = list(map(lambda x: datafolder+x, os.listdir(datafolder)))
        self.transform = transforms
    
    def get_label(string): 
        return int((string.split('.')[-2]).split('_')[-1])

    def __len__(self):
        return len(os.listdir(self.datafolder))

    def __getitem__(self, idx):
        image = Image.open(self.images[idx])
        image = self.transform(image)
        return (image, self.get_label(self.images[idx]))


class trainer(object):
    def __init__(self, net,  loss_function, train_folder, val_folder, batch_size=100, optimizer=T.optim.Adam, lr=0.001, start_epoch= 1, nb_epochs=10,
                    save_dir='model/'):
        self.net = net
        self.batch_size = batch_size
        self.optimizer = optimizer(self.net.parameters(), lr=lr)
        self.loss_function = loss_function
        self.save_dir = save_dir
        self.train_folder = train_folder
        self.val_folder = val_folder
        self.running_loss_train = list()
        self.running_loss_val = list()
        self.accuracy = list()

    def fit(self, nb_epochs, start_epoch=1, print_after=20):
        train_loader, val_loader = self.get_dataloaders()
        print('staring training ********')
        last_loss = 0
        for epoch in range(start_epoch, nb_epochs):
            self.net.train()
            running_loss = 0

            for step, batch in enumerate(train_loader, 0):
                x, y = batch
                x = Variable(x).cuda()
                y = Variable(y.float()).cuda()
                predictions = self.net(x)

                loss = self.loss_function(predictions, y)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()
                if step % print_after == 0:
                    print('epoch: {} | step: {} | loss: {}'.format(epoch, step, loss.item()))

            self.running_loss_train.append(running_loss/step)
            print('avg training loss: {:.5f}'.format(running_loss/step))
            running_loss = 0

            self.net.eval()
            print("----------------------------------------------------validating------------------------------------------------")
            for step, batch in enumerate(val_loader, 0):
                
                x, y = batch
                x = Variable(x).cuda()
                y = Variable(y.float()).cuda()
                predictions = self.net(x)
                loss = self.loss_function(predictions, y)
                running_loss += loss.item()
                if step % print_after == 0:
                    print('epoch: {} | step: {} | loss: {}'.format(epoch, step, loss.item()))

            self.running_loss_val.append(running_loss/step)
            print('avg val loss: {:.5f}'.format(running_loss/step))

            self.save_loss_img()
            if last_loss == 0 or running_loss < last_loss:
                T.save(self.net, self.save_dir + f'model_best.pth')
                last_loss = running_loss

        T.save(self.net, self.save_dir + f'model_{epoch}.pth')

    def get_loss(self):
        return [self.running_loss_train, self.running_loss_val]

    def plot_loss(self):
        train_loss = pd.DataFrame(self.get_loss()).transpose()
        train_loss.plot()
        plt.show()

    def save_loss_img(self):
        train_loss = pd.DataFrame(self.get_loss()).transpose()
        train_loss.plot()
        plt.savefig('train_loss.png')

    def get_dataloaders(self):
        train_stairdata,val_stairdata = self.create_stairdata()
        train_loader = DataLoader(train_stairdata, batch_size=self.batch_size, shuffle=True, num_workers=2)
        val_loader = DataLoader(sval_stairdata, batch_size=self.batch_size, shuffle=True, num_workers=2)
        print('DataLoader returned')
        return (train_loader, val_loader)

    def create_stairdata(self):
        return stairdata(self.train_folder, self.get_transforms()), stairdata(self.val_folder, self.get_transforms())

    def get_transforms(self):
        return transforms.Compose(transforms.ToTensor(),transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]))

    def get_accuracy(self, predictions, targets):
        predictions = predictions.argmax(dim=1)
        accuracy = sum(predictions == target) * 100 / len(predictions)
        self.accuracy.append(accuracy)
        return accuracy

  


In [1]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.lyr1 = nn.Sequential(nn.Conv2d(3, 36, kernel_size = 5, stride = 2), nn.BatchNorm2d(36), nn.ELU())
        self.lyr2 = nn.Sequential(nn.Conv2d(36, 48, kernel_size = 5, stride = 2), nn.BatchNorm2d(48), nn.ELU())
        self.lyr3 = nn.Sequential(nn.Conv2d(48, 64, kernel_size = 5, stride = 2), nn.BatchNorm2d(64), nn.ELU())
        self.lyr4 = nn.Sequential(nn.Conv2d(64, 64, kernel_size = 3), nn.BatchNorm2d(64), nn.ELU())
        self.lyr5 = nn.Sequential(nn.Conv2d(64, 64, kernel_size = 3), nn.BatchNorm2d(64), nn.ELU())

        self.fc1 = nn.Sequential(nn.Linear(64*73*53, 100), nn.ELU())
        self.fc2 = nn.Sequential(nn.Linear(100, 50), nn.ELU())
        self.fc3 = nn.Sequential(nn.Linear(50, 10), nn.ELU())
        self.fc4 = nn.Linear(10, 4)

    def forward(self, x):
        out = self.lyr1(x)
        out = self.lyr2(out)
        out = self.lyr3(out)
        out = self.lyr4(out)
        out = self.lyr5(out)
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.fc2(out)
        out = self.fc3(out)
        out = self.fc4(out)
        return out
    
model = CNN().cuda()

In [None]:
train_folder = 'data/cloning/'
val_folder = ''
batch_size = 
loss_function = nn.CrossEntropyLoss()
nb_epochs = 

In [2]:
stairnet_trainer = trainer(model, loss_function, train_folder, val_folder, batch_size=batch_size)
stairnet_trainer.fit(args.nb_epochs)

In [3]:
get_label(string)

'3'

In [1]:
import numpy as np
np.random.seed(38)

def split_train_val(path, pct=0.9):
    os.mkdir(path +'/train', exist_ok=True)
    os.mkdir(path +'/val', exist_ok=True)
    lst = list(np.random.shuffle(os.listdir(path)))
    lst_train, lst_val= lst[:int(len(lst)*pct)], lst[int(len(lst)*pct):]
    for i in lst_train:
        os.rename(f'{path}/{i}', f'{path}/train/{i}')
    for i in lst_val:
        os.rename(f'{path}/{i}', f'{path}/val/{i}')
    print('done!')
        
    