# max_len per stroke
training set (0+1) | validation set (2) | test set (3)
--|--|--
289|233|218

trim data at 288 because 289 is only divisible by 17 which is a primary number so it's not convenient in regards to pooling

In [None]:
#smart manuscript
from smartmanuscript.corpus_iam import _import_set

#visualization
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
sns.set_style("darkgrid", {'axes.grid' : False})#for prettier plots
#math tools
import numpy as np
#preprocessing
from sklearn.preprocessing import scale
#machine learning
import torch
from sklearn.model_selection import StratifiedKFold
USE_CUDA = torch.cuda.is_available()
device = torch.device("cuda" if USE_CUDA else "cpu")
print("using",device,"device")
#io
from os.path import join
from os.path import exists
from os import makedirs
from os import listdir
import pickle
import sys
sys.path.append("..")
#utils
from time import time
import warnings
#custom
from parkinson_detection.modules.utils import *

# Data loading

In [None]:
set_numbers=[0,1,2,3,4]
set_number=set_numbers[2]
assert set_number in set_numbers
iamondo_path=join("data","IAMonDo-db-1.0")
max_len=288
measure2index={"x-coordinate":0,"y-coordinate":1}
index2measure=list(measure2index.keys())

In [None]:
if False:    
    if False:
        words, lines = _import_set(iamondo_path, "{}.set".format(set_number), max_files=None)

        data=np.asarray([stroke for word in words for stroke in  word[1]])
    else:
        data=np.load(join("data","3.set.npy"))
        print(data.shape)
    tmp=[]
    for stroke in data:
        stroke=scale(stroke,axis=0)
        if len(stroke) > max_len:
            stroke=stroke[:max_len]
        else:
            stroke=np.concatenate((stroke,np.zeros(shape=(max_len-len(stroke),stroke.shape[1]))))#zero-padding
        tmp.append(stroke)
    data=np.asarray(tmp)
    print(data.shape)
    np.save("data/padded_3.set.npy",data)
else:
    train_set=np.load(join("data","padded_0_1.set.npy"))
    test_set=np.load(join("data","padded_3.set.npy"))
    train_set=train_set[:,:288]
    test_set=test_set[:,:288]
    print(train_set.shape)
    print(test_set.shape)

#  Model

https://pytorch.org/docs/stable/nn.html#convtranspose1d

In [None]:
import torch

class CNNAutoencoder(torch.nn.Module):
    def __init__(self,input_size,hidden_size,conv_kernel,pool_kernel ,padding,
                 stride=1,dilation=1, dropout=0.0):
        super(CNNAutoencoder, self).__init__()

        self.num_layers=len(hidden_size) 
        
        #encoder
        self.conv1=torch.nn.utils.weight_norm(
            torch.nn.Conv1d(input_size,hidden_size[0],conv_kernel[0],stride=1,padding=padding[0],dilation=dilation[0]))
        self.relu1=torch.nn.ReLU()
        self.pool1=torch.nn.MaxPool1d(pool_kernel[0],pool_kernel[0],padding=0,dilation=1)
        if self.num_layers > 1:
            self.drop1=torch.nn.Dropout(dropout)
            self.conv2=torch.nn.utils.weight_norm(
                torch.nn.Conv1d(hidden_size[0],hidden_size[1],conv_kernel[1],stride=1,padding=padding[1],dilation=dilation[1]))
            self.relu2=torch.nn.ReLU()
            self.pool2=torch.nn.MaxPool1d(pool_kernel[1],pool_kernel[1],padding=0,dilation=1)
        self.drop2=torch.nn.Dropout(dropout)
        
        #decoder        
        if self.num_layers > 1:
            self.d_drop2=torch.nn.Dropout(dropout)
            self.d_conv2=torch.nn.utils.weight_norm(
                torch.nn.ConvTranspose1d(hidden_size[1],hidden_size[0],conv_kernel[1],stride=pool_kernel[1],
                                         padding=padding[1],dilation=dilation[1],output_padding=pool_kernel[1]-1))
            self.d_relu2=torch.nn.ReLU()
        self.d_drop1=torch.nn.Dropout(dropout)
        self.d_conv1=torch.nn.utils.weight_norm(
            torch.nn.ConvTranspose1d(hidden_size[0],input_size,conv_kernel[0],stride=pool_kernel[0],
                                     padding=padding[0],dilation=dilation[0],output_padding=pool_kernel[0]-1))
        #self.sigmoid=torch.nn.Sigmoid()
        
    def forward(self,subject):
        c1=self.conv1(subject)
        r1=self.relu1(c1)
        p1=self.pool1(r1)

        if self.num_layers > 1:
            drop1=self.drop1(p1)
            c2=self.conv2(drop1)
            r2=self.relu2(c2)
            p2=self.pool2(r2)
            drop2=self.drop2(p2)
            d_c2=self.d_conv2(drop2)
            d_r2=self.d_relu2(d_c2)
            d_drop2=self.d_drop2(d_r2)
        else:
            d_drop2=self.drop2(p1)
            
        d_c1=self.d_conv1(d_drop2)
        
        return d_c1#self.sigmoid(d_c1)


# Training def

In [None]:
def step(input, target, model, optimizer, loss_fn, batch_size,validation = False, device="cuda"):
    if not validation:
        # Zero gradients
        optimizer.zero_grad()

    # Set device options
    target=target.to(device)
    #forward pass
    output=model(input)

    # Compute loss
    loss = loss_fn(output, target)
    if not validation:
        # Perform backpropagation
        loss.backward()
        # Adjust model weights
        optimizer.step()
    return loss.item(), output.squeeze().cpu().detach().numpy()

#  Hyperparameters

In [None]:
dropout=0.0
hidden_size=[16,32]  
conv_kernel= [4,4]
pool_kernel=[2,2]
dilation= [2,2]
stride='redef as kernel_size'
output_size="foo"

padding=[]
for d,k in list(zip(dilation,conv_kernel)):
    padding.append(d*(k-1)//2)

input_size=2
batch_size=128
loss_fn=torch.nn.MSELoss()

learning_rate=1e-2

In [None]:
model=CNNAutoencoder(input_size,hidden_size,conv_kernel,pool_kernel ,padding,stride,dilation, dropout)

model=model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
count_params(model)

In [None]:
subject=torch.Tensor(train_set[0].copy()).unsqueeze(0).transpose(1,2).to(device)

In [None]:
subject.shape

In [None]:
padding

In [None]:
model

In [None]:
loss,output=step(subject, subject, model, optimizer, loss_fn, 1)
print(loss)

In [None]:
def plot_measures(task,style="-",index2measure=index2measure):
    plt.figure(figsize=(16,12))
    for i,measure in enumerate(index2measure):
        plt.subplot(3,3,i+1)
        plt.plot(task[:,i],style)
        plt.xlabel("timesteps")
        plt.ylabel(measure)

In [None]:
plot_measures(train_set[0][:50],".",index2measure)

In [None]:
plot_measures(output.T[:50]
              ,".",index2measure)

In [None]:
plot_task(train_set[0],".",measure2index)

In [None]:
plot_task(output.T,".",measure2index)

In [None]:
model.conv1

In [None]:
c1=model.conv1(subject)
r1=model.relu1(c1)
p1=model.pool1(r1)

if model.num_layers > 1:
    drop1=model.drop1(p1)
    c2=model.conv2(drop1)
    r2=model.relu2(c2)
    p2=model.pool2(r2)
    drop2=model.drop2(p2)
    d_c2=model.d_conv2(drop2)
    d_r2=model.d_relu2(d_c2)
    d_drop2=model.d_drop2(d_r2)
else:
    d_drop2=model.drop2(p1)

d_c1=model.d_conv1(d_drop2)

In [None]:
class autoencoder(nn.Module):
    def __init__(self):
        super(autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, 3, stride=3, padding=1),  # b, 16, 10, 10
            nn.ReLU(True),
            nn.MaxPool2d(2, stride=2),  # b, 16, 5, 5
            nn.Conv2d(16, 8, 3, stride=2, padding=1),  # b, 8, 3, 3
            nn.ReLU(True),
            nn.MaxPool2d(2, stride=1)  # b, 8, 2, 2
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(8, 16, 3, stride=2),  # b, 16, 5, 5
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 8, 5, stride=3, padding=1),  # b, 8, 15, 15
            nn.ReLU(True),
            nn.ConvTranspose2d(8, 1, 2, stride=2, padding=1),  # b, 1, 28, 28
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
return x

In [None]:
c1.shape

In [None]:
p1.shape

In [None]:
subject.shape

In [None]:
pool_kernel

In [None]:
p2.shape

In [None]:
d_c2.shape

In [None]:
d_c1.shape

In [None]:
p1.shape

In [None]:
289/17

In [None]:
model

In [None]:
plot_task(train_set[0],measure2index)

In [None]:
289/72