RNNs are used over point sequence to predict whether the user wanted to have the pencil up or pencil down

In [1]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import wandb
import cv2
from os import listdir
from contextlib import ExitStack
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

In [2]:
print(torch.__version__)

1.9.0+cu102


# Dataset

In [3]:
class DrawingsDS(torch.utils.data.Dataset):
    def __init__(self, folder="../../data/processed_labeled/"):
        self.folder = folder
        self.ds = []
        self.n = 6
        for f in listdir(self.folder):
            self.ds.append(self.load_file(f))
        
        self.compute_mean()
        self.compute_std()
        
        for f in self.ds:
            f['input'] = (f['input'] - self.mean)/self.std
        
    def compute_mean(self):
        self.mean = np.zeros(self.n)
        tot = 0
        self.y_mean = 0
        for f in self.ds:
            x = f['input']
            self.mean += np.sum(x,axis=0)
            self.y_mean += np.sum(f['output'])
            tot += x.shape[0]
        self.mean /= tot
        self.y_mean /= tot
        
        
    def compute_std(self):
        variance = np.zeros(self.n)
        tot = 0
        for f in self.ds:
            x = f['input'] - self.mean
            x = np.square(x)
            variance += np.sum(x,axis=0)
            tot += x.shape[0]
        variance /= tot
        self.std = np.sqrt(variance)
        
    def load_file(self,f):
        df = pd.read_csv(self.folder+f,index_col=0)
        raw_pos = df[['x','y']].to_numpy().astype(np.int)
        raw_inputs = df[['vx','vy','v','ax','ay','a']].to_numpy().astype(np.double).copy()
        inputs = df[['vx','vy','v','ax','ay','a']].to_numpy().astype(np.double).copy()
        output = df[['label']].to_numpy().astype(np.double)
        distance = df[['dist']].to_numpy().astype(np.double)
        return {
            'raw_pos' : raw_pos,
            'raw_input' : raw_inputs,
            'input' : inputs,
            'output' : output,
            'name': f,
            'dist': distance
        }
    
    def __len__(self):
        return len(self.ds)
    
    def __getitem__(self, idx):
        return self.ds[idx]

In [4]:
dataset = DrawingsDS()
print(dataset.mean)
print(dataset.y_mean)
print(dataset.std)
print(dataset[0]['input'].shape)
print(dataset[0]['output'].shape)
n = len(dataset)
n_test = n//10
n_train = n - n_test
print(f'n_train : {n_train}, n_test : {n_test}')
train_set, test_set = torch.utils.data.random_split(dataset,(n_train,n_test))

[-0.24121178  0.24171754  9.58346554  0.01237983  0.09380086  6.99433956]
0.56051923751608
[12.08068013 10.86206077 13.12253769 11.87022658  9.88225856 13.77132816]
(262, 6)
(262, 1)
n_train : 216, n_test : 24


In [5]:
def visualize(sample,pred):
    sample_output = np.squeeze(pred)
    pts = sample['raw_pos'][sample_output == True]

    img = np.zeros((720,1280), dtype=np.uint8)
    img[pts.T[1],pts.T[0]]=255
    img = cv2.flip(img, 1)

    cv2.imshow('frame', img)
    key = cv2.waitKey(0)
    cv2.destroyAllWindows()

# Model

In [6]:
class AttnBlock(torch.nn.Module):
    def __init__(self,in_features,num_heads,out_features): 
        super(AttnBlock, self).__init__()
        self.attn = torch.nn.MultiheadAttention(
            embed_dim = in_features,
            num_heads = num_heads,
            batch_first = True
        )
        self.fwd = torch.nn.Sequential(
            torch.nn.Linear(in_features=in_features,out_features=out_features),
            torch.nn.ReLU()
        )
        self.relu = torch.nn.ReLU()
    def forward(self,x):
        return self.relu(self.fwd(self.attn(x,x,x)[0]))
    
class ConvBlock(torch.nn.Module):
    def __init__(self,in_features,out_features,kernel_size=7):
        super(ConvBlock, self).__init__()
        self.conv = torch.nn.Conv1d(
            in_channels=in_features,
            out_channels=out_features,
            kernel_size=kernel_size,
            padding=kernel_size//2
        )
    def forward(self,x):
        output = torch.transpose(x,1,2)
        output = self.conv(output)
        output = torch.transpose(output,1,2)
        return output

In [7]:
class FullyConvModel(torch.nn.Module):
    def __init__(
        self,
        input_size=6,
        output_size=1,
        conv_seq=[16,32,64,128,64,32,16]
    ):
        super(FullyConvModel, self).__init__()
    
        layers = [ConvBlock(input_size,conv_seq[0])]
        for k in range(len(conv_seq)-1):
            layers.append(ConvBlock(conv_seq[k],conv_seq[k+1]))
        self.layers = torch.nn.Sequential(*layers)
        self.fc1 = torch.nn.Linear(in_features=conv_seq[-1],out_features=conv_seq[-1]//2)
        self.fc2 = torch.nn.Linear(in_features=conv_seq[-1]//2,out_features=conv_seq[-1]//2)
        self.fc3 = torch.nn.Linear(in_features=conv_seq[-1]//2,out_features=output_size)
        self.relu = torch.nn.ReLU()
        
    def forward(self,x):
        output = self.layers(x)
        output = self.fc3(self.relu(self.fc2(self.relu(self.fc1(output)))))
        return output

In [8]:
class ConvRecurrentModel(torch.nn.Module):
    def __init__(
        self,
        input_size=6,
        output_size=1,
        hidden_size_in=64,
        hidden_size_out=32,
        num_layers=2,
        model=torch.nn.GRU,
        dropout=0.2,
        bidirectional=True
    ):
        super(ConvRecurrentModel, self).__init__()
        self.rnn = model(
            input_size = hidden_size_in,
            hidden_size = hidden_size_out,
            num_layers = num_layers,
            dropout = dropout,
            bidirectional = bidirectional
        )
        self.before = torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=input_size,out_channels=hidden_size_in//2,kernel_size=5,padding=2),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_in//2,out_channels=hidden_size_in//2,kernel_size=5,padding=2),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_in//2,out_channels=hidden_size_in,kernel_size=5,padding=2)
        )
        self.after = torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=2*hidden_size_out,out_channels=hidden_size_out,kernel_size=5,padding=2),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_out,out_channels=hidden_size_out,kernel_size=5,padding=2),
            torch.nn.ReLU(),
            torch.nn.Conv1d(in_channels=hidden_size_out,out_channels=hidden_size_out//2,kernel_size=5,padding=2)
        )
        self.fc = torch.nn.Linear(in_features=hidden_size_out//2,out_features=output_size)
        
    def forward(self,x):
        x = torch.transpose(x,1,2)
        output = self.before(x)
        
        output = torch.transpose(output,2,1)
        output = torch.transpose(output,0,1)
        output, _ = self.rnn(output)
        output = torch.transpose(output,0,1)
        
        output = torch.transpose(output,2,1)
        output = self.after(output)
        
        output = torch.transpose(output,2,1)
        output = self.fc(output)
        return output

In [9]:
"""
The full model to be exported into ONNX. 
This model contains input normalization + sigmoid function at the end.
"""
class StandaloneModel(torch.nn.Module):
    def __init__(self, trained_model, mean, std):
        super(StandaloneModel,self).__init__()
        self.model = trained_model
        self.mean = torch.tensor(mean,dtype=torch.float32)
        self.std = torch.tensor(std,dtype=torch.float32)
        self.sigmoid = torch.nn.Sigmoid()
        
    def forward(self,x):
        output = torch.unsqueeze(x,0)
        output = (output-self.mean)/self.std
        output = self.model(output)
        return torch.squeeze(self.sigmoid(output))

In [10]:
model1 = ConvRecurrentModel(dataset.n).double()
sample = torch.tensor(train_set[0]['input']).unsqueeze(0)
print(sample.size())
print(model1(sample).size())

torch.Size([1, 418, 6])
torch.Size([1, 418, 1])


In [11]:
model2 = FullyConvModel(dataset.n).double()
sample = torch.tensor(train_set[0]['input']).unsqueeze(0)
print(sample.size())
print(model2(sample).size())

torch.Size([1, 418, 6])
torch.Size([1, 418, 1])


In [12]:
model3 = StandaloneModel(model1,dataset.mean,dataset.std)
sample = torch.tensor(train_set[0]['raw_input'])
print(sample.size())
print(model3(sample).size())

torch.Size([418, 6])
torch.Size([418])


# Training

In [13]:
def compute_metrics(pred,y):
    pred_np = pred.squeeze().detach().numpy()
    y_np = y.squeeze().detach().numpy()
    accuracy = accuracy_score(y_np,pred_np)
    precision = precision_score(y_np,pred_np,zero_division=0)
    recall = recall_score(y_np,pred_np,zero_division=0)
    f1 = f1_score(y_np,pred_np,zero_division=0)
    return accuracy, precision, recall, f1

def save_model(model,name,acc):
    acc = int(10000*acc)/100
    f_name = f"{int(name['MODEL_HIDDEN_SIZE_IN'])}_{int(name['MODEL_HIDDEN_SIZE_OUT'])}_{int(name['MODEL_NUM_LAYERS'])}_{acc}.pt"
    torch.save(model.state_dict(),"../../models/"+f_name)
    

def epoch(loader,optimizer,model,loss1,loss2,iteration_type='train',gradient_clipping=1.):
    if iteration_type == 'train':
        MODEL.train()
    if iteration_type == 'test':
        MODEL.eval()

    with ExitStack() as stack:
        if iteration_type == 'test':
            gs = stack.enter_context(torch.no_grad())
        
        metrics = {
            'loss': 0.,
            'accuracy': 0.,
            'precision': 0.,
            'recall': 0.,
            'f1': 0.
        }
        
        n = len(loader)
        for sample in loader:
            # make predictions
            x = sample['input']
            y = sample['output'].squeeze()
            dist = sample['dist'].squeeze()
            augment = np.random.uniform(0.5,1.5) if iteration_type=='train' else 1.
            pred = MODEL(augment*x).squeeze()
            # compute losses
            l = loss1(pred,y)
            # apply backprop
            if iteration_type == 'train':
                OPTIMIZER.zero_grad()
                l.backward()
                '''torch.nn.utils.clip_grad_norm_(
                    parameters = model.parameters(),
                    max_norm = gradient_clipping
                )'''
                OPTIMIZER.step()
                
            acc, prec, rec, f1 = compute_metrics(torch.sigmoid(pred)>0.5,y)
            metrics['loss'] += l.item()/n
            metrics['accuracy'] += acc/n
            metrics['precision'] += prec/n
            metrics['recall'] += rec/n
            metrics['f1'] += f1/n
    return metrics

class DiceLoss(torch.nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        
        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.nn.functional.sigmoid(inputs)       
        
        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)
        
        intersection = (inputs * targets).sum()                            
        dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)  
        
        return 1 - dice


In [14]:
config = {
    "EPOCHS" : 1000,
    "BATCH_SIZE" : 1,
    "LEARNING_RATE" : 3e-4,
    "NUM_WORKERS" : 2,
    "PIN_MEMORY" : True,
    "MODEL_HIDDEN_SIZE_IN" : 32,
    "MODEL_HIDDEN_SIZE_OUT" : 16,
    "MODEL_NUM_LAYERS" : 1,
    "WEIGHT_DECAY" : 0.,
    "SCHEDULER_GAMMA" : 0.1,
    "SEED" : 179428,
    "DROPOUT" : 0.2,
    "GRADIENT_CLIPPING" : None,
    "BIDIRECTIONAL" : True
}
log = True
if log:
    run = wandb.init(project="r-drawing",config=config)

wandb: Currently logged in as: lmagne (use `wandb login --relogin` to force relogin)


In [15]:
torch.manual_seed(config["SEED"])
np.random.seed(config["SEED"])

dataset = DrawingsDS()
n = len(dataset)
n_test = n//10
n_train = n - n_test
print(f'n_train : {n_train}, n_test : {n_test}')
train_set, test_set = torch.utils.data.random_split(dataset,(n_train,n_test))

MODEL = ConvRecurrentModel(
    input_size = dataset.n,
    hidden_size_in = config["MODEL_HIDDEN_SIZE_IN"],
    hidden_size_out = config["MODEL_HIDDEN_SIZE_OUT"],
    num_layers = config["MODEL_NUM_LAYERS"],
    dropout = config["DROPOUT"],
    model=torch.nn.LSTM
).double()

LOSS1 = torch.nn.BCEWithLogitsLoss()
LOSS2 = DiceLoss()

OPTIMIZER = torch.optim.Adam(
    MODEL.parameters(),
    lr = config["LEARNING_RATE"],
    weight_decay = config["WEIGHT_DECAY"]
)

"""
OPTIMIZER = torch.optim.SGD(
    MODEL.parameters(),
    lr = config["LEARNING_RATE"],
    momentum = 0.9,
    nesterov = True
)
"""

SCHEDULER = torch.optim.lr_scheduler.StepLR(
    OPTIMIZER,
    step_size = 500,
    gamma = config["SCHEDULER_GAMMA"]
)


train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size = config["BATCH_SIZE"],
    num_workers = config["NUM_WORKERS"],
    pin_memory = config["PIN_MEMORY"],
    shuffle = True
)

test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size = config["BATCH_SIZE"],
    num_workers = config["NUM_WORKERS"],
    pin_memory = config["PIN_MEMORY"],
    shuffle = True
)

if log:
    wandb.watch(MODEL)
    
best_so_far = 0.
for k in tqdm(range(config["EPOCHS"])):
    train_metrics = epoch(train_loader,OPTIMIZER,MODEL,LOSS1,LOSS2,'train',config["GRADIENT_CLIPPING"])
    test_metrics = epoch(test_loader,OPTIMIZER,MODEL,LOSS1,LOSS2,'test')
    if log:
        wandb.log({
            "loss_train" : train_metrics["loss"],
            "loss_test" : test_metrics["loss"],
            "accuracy_train" : train_metrics["accuracy"],
            "accuracy_test" : test_metrics["accuracy"],
            "precision_train" : train_metrics["precision"],
            "precision_test" : test_metrics["precision"],
            "recall_train" : train_metrics["recall"],
            "recall_test" : test_metrics["recall"],
            "f1_train" : train_metrics["f1"],
            "f1_test" : test_metrics["f1"]
        })
    if test_metrics['accuracy'] > best_so_far:
        save_model(MODEL,config,test_metrics['accuracy'])
        best_so_far = test_metrics['accuracy']
    SCHEDULER.step()

if log:
    run.finish()

  0%|          | 0/1000 [00:00<?, ?it/s]

n_train : 216, n_test : 24


  return torch._C._cuda_getDeviceCount() > 0
 40%|████      | 403/1000 [3:09:23<4:40:34, 28.20s/it]


KeyboardInterrupt: 

In [16]:
for k in range(20):
    sample = test_set[k]
    print(sample['name'])
    x = torch.tensor(sample['input']).unsqueeze(0)
    pred = (torch.sigmoid(MODEL(x)) > 0.5).detach().numpy()
    visualize(sample,pred)
    visualize(sample,sample['output'])

3_sample.csv
1_flower.csv
1_issouent.csv
4_airplane.csv
4_crayon.csv
3_cat.csv
1_lantern.csv
4_leagueoflegends.csv
3_magicien.csv
1_prenom.csv
4_uwuhearth.csv
3_lantern.csv
4_kayak.csv
3_kayak.csv
3_face.csv
1_guitar.csv
1_livre.csv
4_salut.csv
2_uwuhearth.csv
4_flower.csv


In [None]:
def safety_check():
    MODEL = ConvRecurrentModel(
        input_size = dataset.n,
        hidden_size_in = 32,
        hidden_size_out = 16,
        num_layers = 1,
        dropout = 0.,
        model=torch.nn.LSTM
    )
    MODEL.load_state_dict(torch.load(f"../../models/32_16_1_83.85.pt"))
    MODEL.eval()
    exportable_model = StandaloneModel(MODEL,dataset.mean,dataset.std)
    exportable_model.eval()
    sample = test_set[0]
    x1 = torch.tensor(sample['input'],dtype=torch.float32).unsqueeze(0)
    x2 = torch.tensor(sample['raw_input'],dtype=torch.float32)
    y1 = torch.sigmoid(MODEL(x1)).squeeze()
    y2 = exportable_model(x2)
    assert np.all(np.isclose(y1.detach().numpy(),y2.detach().numpy()))
    assert np.all(np.isclose(y2.detach().numpy(),y1.detach().numpy()))
safety_check()

In [17]:
def toONNX():
    MODEL = ConvRecurrentModel(
        input_size = dataset.n,
        hidden_size_in = 32,
        hidden_size_out = 16,
        num_layers = 1,
        dropout = 0.,
        model=torch.nn.LSTM
    )
    MODEL.load_state_dict(torch.load(f"../../models/32_16_1_91.48.pt"))
    MODEL.eval()
    exportable_model = StandaloneModel(MODEL,dataset.mean,dataset.std)
    exportable_model.eval()
    x = torch.randn(500, 6, requires_grad=True,dtype=torch.float)
    y = exportable_model(x)
    torch.onnx.export(exportable_model,               # model being run
                      x,                         # model input (or a tuple for multiple inputs)
                      f"../../models/lstm_X_91_48.onnx",   # where to save the model (can be a file or file-like object)
                      input_names = ['input'],   # the model's input names
                      output_names = ['output'], # the model's output names
                      dynamic_axes={'input' : {0 : 'seq_len'},    # variable length axes
                                    'output' : {0 : 'seq_len'}},
                      verbose=True)

In [18]:
toONNX()

graph(%input : Float(*, 6, strides=[6, 1], requires_grad=1, device=cpu),
      %model.before.0.weight : Float(16, 6, 5, strides=[30, 5, 1], requires_grad=1, device=cpu),
      %model.before.0.bias : Float(16, strides=[1], requires_grad=1, device=cpu),
      %model.before.2.weight : Float(16, 16, 5, strides=[80, 5, 1], requires_grad=1, device=cpu),
      %model.before.2.bias : Float(16, strides=[1], requires_grad=1, device=cpu),
      %model.before.4.weight : Float(32, 16, 5, strides=[80, 5, 1], requires_grad=1, device=cpu),
      %model.before.4.bias : Float(32, strides=[1], requires_grad=1, device=cpu),
      %model.after.0.weight : Float(16, 32, 5, strides=[160, 5, 1], requires_grad=1, device=cpu),
      %model.after.0.bias : Float(16, strides=[1], requires_grad=1, device=cpu),
      %model.after.2.weight : Float(16, 16, 5, strides=[80, 5, 1], requires_grad=1, device=cpu),
      %model.after.2.bias : Float(16, strides=[1], requires_grad=1, device=cpu),
      %model.after.4.weight : F

