Recurrent neural networks (RNNs) are applied to sequences of points to predict, for each point, whether the user intended the pencil to be up or down.

In [1]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import wandb
import cv2
from os import listdir
from contextlib import ExitStack
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Dataset

In [2]:
class DrawingsDS(torch.utils.data.Dataset):
    """In-memory dataset of labelled drawing recordings, one CSV file per item.

    Every file found in ``folder`` is read eagerly at construction time. The
    six feature columns are then standardised in place with the mean and
    standard deviation computed over the rows of all files.

    ``folder`` must end with a path separator: file paths are built by plain
    string concatenation.

    Attributes set by ``__init__``:
        folder: directory prefix the files are read from.
        ds: list of per-file dicts (see ``load_file``).
        mean, std: per-feature statistics, arrays of length 6.
        y_mean: mean of the ``label`` column over all rows.
    """

    def __init__(self, folder="../../data/processed_labeled/"):
        self.folder = folder
        # listdir() returns names in arbitrary order; sorting keeps item
        # indices (and therefore seeded random splits) reproducible.
        self.ds = [self.load_file(f) for f in sorted(listdir(self.folder))]

        self.compute_mean()
        self.compute_std()

        for f in self.ds:
            f['input'] = (f['input'] - self.mean) / self.std

    def compute_mean(self):
        """Set ``self.mean`` (per feature) and ``self.y_mean`` over all rows."""
        self.mean = np.zeros(6)
        self.y_mean = 0
        tot = 0
        for f in self.ds:
            x = f['input']
            self.mean += np.sum(x, axis=0)
            self.y_mean += np.sum(f['output'])
            tot += x.shape[0]
        self.mean /= tot
        self.y_mean /= tot

    def compute_std(self):
        """Set ``self.std`` to the population standard deviation per feature.

        Must run after ``compute_mean`` because it centres on ``self.mean``.
        """
        variance = np.zeros(6)
        tot = 0
        for f in self.ds:
            centred = f['input'] - self.mean
            variance += np.sum(np.square(centred), axis=0)
            tot += centred.shape[0]
        variance /= tot
        self.std = np.sqrt(variance)

    def load_file(self, f):
        """Read one CSV file from ``self.folder`` and split it into arrays.

        Args:
            f: file name relative to ``self.folder``.

        Returns:
            dict with
              'raw_input': integer array of the ``x``/``y`` columns,
              'input':     float64 array of ``vx, vy, v, ax, ay, a``,
              'output':    float64 array of the ``label`` column,
              'ttf':       float64 array of the ``ttf`` column,
              'name':      the file name ``f``.
        """
        df = pd.read_csv(self.folder + f, index_col=0)
        # np.int was only an alias of the builtin int and was removed in
        # NumPy 1.24; the builtin gives the same dtype on every version.
        raw_input = df[['x', 'y']].to_numpy().astype(int)
        inputs = df[['vx', 'vy', 'v', 'ax', 'ay', 'a']].to_numpy().astype(np.double)
        output = df[['label']].to_numpy().astype(np.double)
        ttf = df[['ttf']].to_numpy().astype(np.double)
        return {
            'raw_input': raw_input,
            'input': inputs,
            'output': output,
            'ttf': ttf,
            'name': f
        }

    def __len__(self):
        """Number of files loaded."""
        return len(self.ds)

    def __getitem__(self, idx):
        """Return the dict built by ``load_file`` for item ``idx``."""
        return self.ds[idx]

In [3]:
# Load every recording and show the normalisation statistics and the array
# shapes of the first item.
dataset = DrawingsDS()
print(dataset.mean)
print(dataset.y_mean)
print(dataset.std)
print(dataset[0]['input'].shape)
print(dataset[0]['output'].shape)
# Hold out 5 recordings for testing. The train size is derived from the
# dataset so the split does not raise when the folder does not contain
# exactly 40 files (for 40 files this is the same (35, 5) split).
n_test = 5
train_set, test_set = torch.utils.data.random_split(
    dataset, (len(dataset) - n_test, n_test)
)

[ 0.33941933 -0.22546204  9.42321543  0.02886129  0.1246964   6.15246857]
0.581060116354234
[11.0053177  10.57159029 12.01018687 10.97142049  9.97192017 13.48980253]
(295, 6)
(295, 1)


In [4]:
def visualize(sample, pred, shape=(720, 1280)):
    """Draw the points selected by ``pred`` as white pixels and display them.

    Args:
        sample: mapping; only ``sample['raw_input']`` is read. Column 0 is
            used as the horizontal index and column 1 as the vertical index
            into the canvas.
        pred: per-point mask. After ``np.squeeze`` it must have one entry per
            row of ``sample['raw_input']``; entries equal to 1 / True select
            the point.
        shape: (height, width) of the canvas. Defaults to 720x1280.

    The call blocks until a key is pressed in the OpenCV window, then closes
    the window.
    """
    mask = np.squeeze(pred) == 1
    pts = sample['raw_input'][mask]

    img = np.zeros(shape, dtype=np.uint8)
    img[pts[:, 1], pts[:, 0]] = 255
    # flipCode=1 flips around the vertical axis (horizontal mirror).
    img = cv2.flip(img, 1)

    cv2.imshow('frame', img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

# Model

In [5]:
class RNN(torch.nn.Module):
    """Stacked ``torch.nn.RNN`` followed by a linear layer on every time step.

    Inputs are batch-first, ``(batch, time, input_size)``; the output is
    ``(batch, time, output_size)``.
    """

    def __init__(self, input_size=6, output_size=1, hidden_size=64, num_layers=3):
        super().__init__()
        # Creation order (rnn, then fc) fixes the order in which the random
        # initial weights are drawn.
        recurrent_kwargs = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.1,
        )
        self.rnn = torch.nn.RNN(**recurrent_kwargs)
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the linear read-out of the RNN output sequence for ``x``."""
        hidden_seq = self.rnn(x)[0]  # final hidden state is not needed
        return self.fc(hidden_seq)
    
class LSTM(torch.nn.Module):
    """Two-layer MLP -> stacked LSTM -> two-layer MLP, applied per time step.

    Inputs are batch-first, ``(batch, time, input_size)``; the output is
    ``(batch, time, output_size)``.

    Args:
        input_size: number of features per time step.
        output_size: number of outputs per time step.
        hidden_size_in: width of the MLP in front of the recurrent layer and
            input width of the recurrent layer.
        hidden_size_out: hidden size of the recurrent layer and width of the
            MLP behind it.
        num_layers: number of stacked recurrent layers.
    """

    def __init__(self, input_size=6, output_size=1, hidden_size_in=64, hidden_size_out=32, num_layers=2):
        super().__init__()
        # The recurrent core is a real LSTM, matching the class and attribute
        # name (it was previously built from torch.nn.GRU).
        self.lstm = torch.nn.LSTM(
            input_size=hidden_size_in,
            hidden_size=hidden_size_out,
            num_layers=num_layers,
            batch_first=True
        )
        self.fc1 = torch.nn.Linear(
            in_features=input_size,
            out_features=hidden_size_in
        )
        self.fc2 = torch.nn.Linear(
            in_features=hidden_size_in,
            out_features=hidden_size_in
        )
        self.fc3 = torch.nn.Linear(
            in_features=hidden_size_out,
            out_features=hidden_size_out
        )
        self.fc4 = torch.nn.Linear(
            in_features=hidden_size_out,
            out_features=output_size
        )
        self.relu = torch.nn.ReLU()

    def forward(self, x):
        """Return per-time-step outputs for the batch-first sequence ``x``."""
        output = self.fc2(self.relu(self.fc1(x)))
        # The second return value (final hidden and cell state) is not used.
        output, _ = self.lstm(output)
        output = self.fc4(self.relu(self.fc3(output)))
        return output
    
class GRU(torch.nn.Module):
    """Linear embedding -> stacked GRU -> linear read-out, per time step.

    Inputs are batch-first, ``(batch, time, input_size)``; the output is
    ``(batch, time, output_size)``.
    """

    def __init__(self, input_size=6, output_size=1, hidden_size_in=64, hidden_size_out=32, num_layers=2):
        super().__init__()
        # Creation order (gru, fc1, fc2) fixes the order in which the random
        # initial weights are drawn.
        self.gru = torch.nn.GRU(
            hidden_size_in,
            hidden_size_out,
            num_layers,
            batch_first=True,
            dropout=0.3,
        )
        self.fc1 = torch.nn.Linear(input_size, hidden_size_in)
        self.fc2 = torch.nn.Linear(hidden_size_out, output_size)

    def forward(self, x):
        """Embed ``x``, run it through the GRU and project every time step."""
        embedded = self.fc1(x)
        hidden_seq = self.gru(embedded)[0]  # final hidden state is not needed
        return self.fc2(hidden_seq)
      
class WeibullGRU(torch.nn.Module):
    """Stacked GRU with a linear head producing ``output_size`` raw values
    per time step.

    ``forward`` returns the raw linear outputs; no activation is applied
    inside the module.

    Args:
        input_size: number of features per time step.
        output_size: number of raw outputs per time step.
        hidden_size: hidden size of the GRU.
        num_layers: number of stacked GRU layers.
    """

    def __init__(self, input_size=6, output_size=3, hidden_size=64, num_layers=3):
        super().__init__()
        self.gru = torch.nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        self.fc = torch.nn.Linear(
            in_features=hidden_size,
            out_features=output_size
        )
        # Kept as an attribute for compatibility; forward() does not call it.
        self.softplus = torch.nn.Softplus()

    def forward(self, x):
        """Return the raw head outputs for the batch-first sequence ``x``.

        For a batch of one sequence the batch axis is dropped, giving
        ``(time, output_size)``; larger batches keep all three axes.
        """
        output, _ = self.gru(x)
        output = self.fc(output)
        # Squeeze only the batch axis. A bare squeeze() would also remove a
        # length-1 time axis and break column indexing such as pred[:, 0].
        return torch.squeeze(output, 0)

In [6]:
# Shape check: one float64 instance of every architecture, fed with the first
# training sequence as a batch of one.
model1 = LSTM().double()
model2 = GRU().double()
model3 = WeibullGRU().double()
model4 = RNN().double()

sample = torch.tensor(train_set[0]['input']).unsqueeze(0)
sample_truth = torch.tensor(train_set[0]['ttf'])

for tensor in (sample, sample_truth):
    print(tensor.size())
for net in (model1, model2, model3, model4):
    print(net(sample).size())

torch.Size([1, 301, 6])
torch.Size([301, 1])
torch.Size([1, 301, 1])
torch.Size([1, 301, 1])
torch.Size([301, 3])
torch.Size([1, 301, 1])


In [7]:
softplus = torch.nn.Softplus()
def weibull_loss(pred,y):
    """Mean negative Weibull log-likelihood of ``y``.

    Column 0 of ``pred`` is mapped through softplus and used as the exponent
    ``beta``; column 1 is exponentiated and used as the divisor ``alpha``.
    A small epsilon keeps the logarithms finite.
    """
    eps = 1e-8
    scale = torch.exp(pred[:, 1])
    shape = softplus(pred[:, 0])
    ratio = (y + eps) / scale
    log_lik = (
        torch.log(shape + eps)
        + shape * torch.log(ratio)
        - torch.log(y + eps)
        - ratio ** shape
    )
    return -log_lik.mean()

def compute_ttf_from_weibull(pred):
    """Predict the ttf (time to failure) as the mean of each Weibull
    distribution, ``alpha * Gamma(1 + 1/beta)``.

    ``beta`` is softplus of column 0 of ``pred`` and ``alpha`` is exp of
    column 1.
    """
    shape = softplus(pred[:, 0])
    scale = torch.exp(pred[:, 1])
    # Gamma(.) is evaluated as exp(lgamma(.)).
    gamma_term = torch.exp(torch.lgamma(1. + 1. / shape))
    return scale * gamma_term

def visualize_weibull_pred(pred,truth):
    """Plot the Weibull-mean ttf prediction and the ground truth on the
    current matplotlib axes.

    The prediction is multiplied by 40 before plotting.
    NOTE(review): the meaning of the factor 40 is not visible here - confirm.
    """
    predicted_ttf = compute_ttf_from_weibull(pred).detach().numpy()
    plt.plot(40 * predicted_ttf)
    target_ttf = truth.detach().numpy()
    plt.plot(target_ttf)

# print(sample.shape)
# print(compute_ttf_from_weibull(model3(sample)[:,1:]).shape)
# visualize_weibull_pred(model3(sample)[:,1:],sample_truth)

# Training

In [8]:
def compute_metrics(pred,y):
    """Return ``(accuracy, precision, recall, f1)`` of ``pred`` against ``y``.

    Both tensors are squeezed, detached and converted to NumPy before being
    handed to scikit-learn. Precision, recall and F1 are reported as 0
    where they would otherwise be undefined (``zero_division=0``).
    """
    predicted = pred.squeeze().detach().numpy()
    target = y.squeeze().detach().numpy()
    scores = [accuracy_score(target, predicted)]
    for metric in (precision_score, recall_score, f1_score):
        scores.append(metric(target, predicted, zero_division=0))
    return tuple(scores)
    
def epoch_weibull(loader,optimizer,model,loss,w_loss,iteration_type = 'train'):
    """Run one pass over ``loader`` for a model with a classification output
    and Weibull outputs, and return batch-averaged metrics.

    Args:
        loader: sized iterable of batches; each batch is a mapping with
            'input', 'output' and 'ttf' entries.
        optimizer: optimizer stepped after every batch when training.
        model: network called as ``model(x)``. Column 0 of its output is
            treated as the classification logit, the remaining columns are
            passed to ``w_loss``.
        loss: callable ``loss(logits, y)`` returning a scalar tensor.
        w_loss: callable ``w_loss(pred[:, 1:], ttf)`` returning a scalar
            tensor; it is added to ``loss`` with weight 1/5.
        iteration_type: 'train' puts the model in training mode and updates
            the weights; 'test' puts it in eval mode and disables gradient
            tracking.

    Returns:
        dict with the per-batch average of 'loss', 'accuracy', 'precision',
        'recall' and 'f1'.
    """
    is_training = iteration_type == 'train'
    # Use the model/optimizer that were passed in, not module-level globals.
    if is_training:
        model.train()
    if iteration_type == 'test':
        model.eval()

    metrics = {
        'loss': 0.,
        'accuracy': 0.,
        'precision': 0.,
        'recall': 0.,
        'f1': 0.
    }

    n = len(loader)
    # Gradient tracking is switched off for the 'test' pass only.
    with torch.set_grad_enabled(iteration_type != 'test'):
        for sample in loader:
            # make predictions
            x = sample['input']
            y = sample['output'].squeeze()
            ttf = sample['ttf'].squeeze()
            pred = model(x)
            # compute losses
            l_bce = loss(pred[:,0],y)
            l_weibull = w_loss(pred[:,1:],ttf)
            l = l_bce + l_weibull/5.
            # apply backprop
            if is_training:
                optimizer.zero_grad()
                l.backward()
                optimizer.step()

            acc, prec, rec, f1 = compute_metrics(torch.sigmoid(pred[:,0])>0.5,y)
            metrics['loss'] += l.item()/n
            metrics['accuracy'] += acc/n
            metrics['precision'] += prec/n
            metrics['recall'] += rec/n
            metrics['f1'] += f1/n
    return metrics

def epoch(loader,optimizer,model,loss,iteration_type = 'train'):
    """Run one pass over ``loader`` and return batch-averaged metrics.

    Args:
        loader: sized iterable of batches; each batch is a mapping with
            'input' and 'output' entries.
        optimizer: optimizer stepped after every batch when training.
        model: network called as ``model(x)``; its squeezed output is
            treated as logits.
        loss: callable ``loss(logits, y)`` returning a scalar tensor.
        iteration_type: 'train' puts the model in training mode and updates
            the weights; 'test' puts it in eval mode and disables gradient
            tracking.

    Returns:
        dict with the per-batch average of 'loss', 'accuracy', 'precision',
        'recall' and 'f1'.
    """
    is_training = iteration_type == 'train'
    # Use the model/optimizer that were passed in, not module-level globals.
    if is_training:
        model.train()
    if iteration_type == 'test':
        model.eval()

    metrics = {
        'loss': 0.,
        'accuracy': 0.,
        'precision': 0.,
        'recall': 0.,
        'f1': 0.
    }

    n = len(loader)
    # Gradient tracking is switched off for the 'test' pass only.
    with torch.set_grad_enabled(iteration_type != 'test'):
        for sample in loader:
            # make predictions
            x = sample['input']
            y = sample['output'].squeeze()
            pred = model(x).squeeze()
            # compute losses
            l = loss(pred,y)
            # apply backprop
            if is_training:
                optimizer.zero_grad()
                l.backward()
                optimizer.step()

            acc, prec, rec, f1 = compute_metrics(torch.sigmoid(pred)>0.5,y)
            metrics['loss'] += l.item()/n
            metrics['accuracy'] += acc/n
            metrics['precision'] += prec/n
            metrics['recall'] += rec/n
            metrics['f1'] += f1/n
    return metrics

In [9]:
# Hyper-parameters of the run; the whole dict is sent to Weights & Biases.
config = {
    "EPOCHS" : 1000,
    "BATCH_SIZE" : 1,
    "LEARNING_RATE" : 3e-4,
    "NUM_WORKERS" : 2,
    "PIN_MEMORY" : True,
    "MODEL_HIDDEN_SIZE_IN" : 128,
    "MODEL_HIDDEN_SIZE_OUT" : 128,
    "MODEL_NUM_LAYERS" : 2,
    "WEIGHT_DECAY" : 0.,
    # Decay factor of the learning-rate scheduler. The training cell builds a
    # MultiStepLR with a hard-coded gamma of 0.1, so record that value here
    # instead of the stale 1.0 to keep the logged config truthful.
    "SCHEDULER_GAMMA" : 0.1,
    "SEED" : 23421467
}
# Set to False to run without Weights & Biases logging.
log = True
if log:
    run = wandb.init(project="r-drawing",config=config)

wandb: Currently logged in as: lmagne (use `wandb login --relogin` to force relogin)


In [10]:
# %%wandb

# Seed both RNGs so weight initialisation, the split and shuffling repeat.
torch.manual_seed(config["SEED"])
np.random.seed(config["SEED"])

dataset = DrawingsDS()
# Hold out 5 recordings for testing. The train size is derived from the
# dataset so the split does not raise when the folder does not contain
# exactly 40 files (for 40 files this is the same (35, 5) split).
n_test = 5
train_set, test_set = torch.utils.data.random_split(
    dataset, (len(dataset) - n_test, n_test)
)

MODEL = LSTM(
    hidden_size_in = config["MODEL_HIDDEN_SIZE_IN"],
    hidden_size_out = config["MODEL_HIDDEN_SIZE_OUT"],
    num_layers = config["MODEL_NUM_LAYERS"]
).double()

# NOTE(review): the BCEWithLogitsLoss documentation suggests
# pos_weight = #negatives / #positives, i.e. (1 - y_mean) / y_mean.
# 1 / y_mean is kept so training is unchanged - confirm the intent.
LOSS = torch.nn.BCEWithLogitsLoss(pos_weight = torch.tensor([1./dataset.y_mean]))

W_LOSS = weibull_loss

OPTIMIZER = torch.optim.Adam(
    MODEL.parameters(),
    lr = config["LEARNING_RATE"],
    weight_decay = config["WEIGHT_DECAY"]
)

# Alternative optimizer (disabled):
# OPTIMIZER = torch.optim.SGD(
#     MODEL.parameters(),
#     lr = config["LEARNING_RATE"],
#     momentum = 0.9
# )

# Alternative scheduler (disabled):
# SCHEDULER = torch.optim.lr_scheduler.StepLR(
#     OPTIMIZER,
#     step_size = 100,
#     gamma = config["SCHEDULER_GAMMA"]
# )

# NOTE(review): config["SCHEDULER_GAMMA"] is not read here; the decay factor
# is hard-coded.
SCHEDULER = torch.optim.lr_scheduler.MultiStepLR(
    OPTIMIZER,
    milestones=[250,500,750,1000],
    gamma=0.1
)

train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size = config["BATCH_SIZE"],
    num_workers = config["NUM_WORKERS"],
    pin_memory = config["PIN_MEMORY"],
    shuffle = True
)

test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size = config["BATCH_SIZE"],
    num_workers = config["NUM_WORKERS"],
    pin_memory = config["PIN_MEMORY"],
    shuffle = True
)

train_loss = []
test_loss = []
train_acc = []
test_acc = []

if log:
    wandb.watch(MODEL)

try:
    for k in tqdm(range(config["EPOCHS"])):
        train_metrics = epoch(train_loader,OPTIMIZER,MODEL,LOSS,'train')
        test_metrics = epoch(test_loader,OPTIMIZER,MODEL,LOSS,'test')
        if log:
            wandb.log({
                "loss_train" : train_metrics["loss"],
                "loss_test" : test_metrics["loss"],
                "accuracy_train" : train_metrics["accuracy"],
                "accuracy_test" : test_metrics["accuracy"],
                "precision_train" : train_metrics["precision"],
                "precision_test" : test_metrics["precision"],
                "recall_train" : train_metrics["recall"],
                "recall_test" : test_metrics["recall"],
                "f1_train" : train_metrics["f1"],
                "f1_test" : test_metrics["f1"]
            })
        SCHEDULER.step()
finally:
    # Close the W&B run even when training stops early, e.g. on a
    # KeyboardInterrupt from the notebook's stop button.
    if log:
        run.finish()

  return torch._C._cuda_getDeviceCount() > 0
  4%|▍         | 39/1000 [06:50<2:48:31, 10.52s/it]


KeyboardInterrupt: 

In [None]:
# Qualitative check on one held-out recording: show the predicted
# pencil-down points, then the labelled ones.
sample = test_set[1]
print(sample['name'])
x = torch.tensor(sample['input']).unsqueeze(0)
# Inference only: make sure the model is in eval mode (training may have been
# interrupted mid-epoch) and skip gradient tracking.
MODEL.eval()
with torch.no_grad():
    pred = (torch.sigmoid(MODEL(x)) > 0.5).numpy()
visualize(sample,pred)
visualize(sample,sample['output'])