<center> 
    <h1>Mini-project1</h1>
    <h2>Predict the laterality of upcoming finger movements</h2>
</center>

#### Imports

In [None]:
import math
import numpy as np
import matplotlib.pylab as plt
import dlc_bci as bci
from types import SimpleNamespace

from sklearn.model_selection import cross_validate
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# baselines
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

import torch
from torch import nn
from torch.nn import Conv1d, Conv2d, Linear, Module, CrossEntropyLoss
from torch import Tensor
from torch.autograd import Variable
from torch.optim import Adam
import torch.nn.functional as F

from helpers import *
from modelWrapper import *

%matplotlib inline
%load_ext autoreload
%autoreload 2

## Download/load the dataset

In [None]:
# Flag forwarded to bci.load for both splits (presumably selects the 1 kHz
# version of the recordings -- confirm against dlc_bci).
one_khz = False

# Training split: samples and labels are kept together on one namespace object.
train = SimpleNamespace()
train.X, train.y = bci.load(root='./data_bci', one_khz=one_khz)
for loaded in (train.X, train.y):
    print(str(type(loaded)), loaded.size())

# Test split: same loader call with train=False.
test = SimpleNamespace()
test.X, test.y = bci.load(root='./data_bci', train=False, one_khz=one_khz)
for loaded in (test.X, test.y):
    print(str(type(loaded)), loaded.size())

In [None]:
# Display one training trial as an image.
# `train.X` is indexed directly: `X_tr` does not exist yet when the notebook is
# run top to bottom, and the baseline cells below define it as a flattened array.
i = 1  # index of the trial to display
plt.imshow(train.X[i].numpy())
plt.show()

## Baselines:

### Random Forest

- Preprocess data

In [None]:
# Flatten every trial into a single feature vector and convert to numpy
# (no standardization for random forest).
X_tr = train.X.view(train.X.size(0), -1).clone().numpy()
y_tr = train.y.numpy()
X_te = test.X.view(test.X.size(0), -1).clone().numpy()
y_te = test.y.numpy()
tuple(arr.shape for arr in (X_tr, y_tr, X_te, y_te))

- Tune and compute test accuracy.

In [None]:
depths = np.arange(5, 100, 10)  # candidate max_depth values for the grid search

# mean 5-fold train / validation scores, one entry per candidate depth
randForest = {
    "tr_scores": [],
    "va_scores": []
}

for depth in depths:
    result = cross_validate(
        RandomForestClassifier(n_estimators=100, max_depth=depth, n_jobs=-1, random_state=1),
        X_tr, y_tr, cv=5, return_train_score=True)

    randForest["tr_scores"].append(np.mean(result["train_score"]))
    randForest["va_scores"].append(np.mean(result["test_score"]))

plot_scores(depths, "depth", randForest["tr_scores"], randForest["va_scores"], ylog_scale=False)

# Refit on the whole training set with the depth selected by cross-validation.
# `best_depth` must be used here: `depth` is only the last value visited by the loop.
best_depth = depths[np.argmax(randForest["va_scores"])]
print('Best depth:', best_depth)
print('Test score:',
      RandomForestClassifier(n_estimators=100, max_depth=best_depth, n_jobs=-1, random_state=1)
      .fit(X_tr, y_tr)
      .score(X_te, y_te))

### Logistic regression

- Preprocess data

In [None]:
# Flatten every trial into a single feature vector and convert to numpy.
X_tr = train.X.view(train.X.size(0), -1).clone().numpy()
y_tr = train.y.numpy()
X_te = test.X.view(test.X.size(0), -1).clone().numpy()
y_te = test.y.numpy()

# Standardize the features; the scaler is fitted on the training split only
# and then applied to both splits.
scaler = StandardScaler().fit(X_tr)
X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)
tuple(arr.shape for arr in (X_tr, y_tr, X_te, y_te))

- Tune and compute test accuracy.

In [None]:
# Grid of candidate values handed to LogisticRegression as C
# (plotted below on an axis labelled "1/lambda").
lambdas = np.logspace(-6, 6, 10)

# mean 5-fold train / validation scores, one entry per candidate value
logreg = {"tr_scores": [], "va_scores": []}

# cross-validate every candidate on the training set
for lambda_ in lambdas:
    result = cross_validate(
        LogisticRegression(C=lambda_),
        X_tr, y_tr,
        cv=5, return_train_score=True)
    logreg["tr_scores"].append(result["train_score"].mean())
    logreg["va_scores"].append(result["test_score"].mean())

plot_scores(lambdas, "1/lambda", logreg["tr_scores"], logreg["va_scores"], ylog_scale=True)

# keep the candidate with the highest mean validation score, refit on the
# whole training set and report the score on the test set
best_lambda = lambdas[np.argmax(logreg["va_scores"])]
print('Best lambda:', best_lambda)
print(
    'Test score:',
    LogisticRegression(C=best_lambda).fit(X_tr, y_tr).score(X_te, y_te))

### Support vector machine

- Preprocess data

In [None]:
# Flatten every trial into a single feature vector and convert to numpy.
X_tr = train.X.view(train.X.size(0), -1).clone().numpy()
y_tr = train.y.numpy()
X_te = test.X.view(test.X.size(0), -1).clone().numpy()
y_te = test.y.numpy()

# Standardize the features; the scaler is fitted on the training split only
# and then applied to both splits.
scaler = StandardScaler().fit(X_tr)
X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)
tuple(arr.shape for arr in (X_tr, y_tr, X_te, y_te))

- Tune and compute test accuracy.

In [None]:
# grid of candidate values handed to SVC as C (plotted below as "1/lambda")
lambdas = np.logspace(-6, 6, 10)

# Mean 5-fold train / validation scores, one entry per candidate value.
# Stored in a dict of its own so that the logistic-regression results kept in
# `logreg` are not overwritten by this section.
svc = {
    "tr_scores": [],
    "va_scores": []
}

# tune the model on the train set
for lambda_ in lambdas:
    result = cross_validate(SVC(C=lambda_), X_tr, y_tr, cv=5, return_train_score=True)

    svc["tr_scores"].append(np.mean(result["train_score"]))
    svc["va_scores"].append(np.mean(result["test_score"]))

plot_scores(lambdas, "1/lambda", svc["tr_scores"], svc["va_scores"], ylog_scale=True)

# select the best candidate and estimate the accuracy on the test set
best_lambda = lambdas[np.argmax(svc["va_scores"])]
print('Best lambda:', best_lambda)
print('Test score:',
      SVC(C=best_lambda)
      .fit(X_tr, y_tr)
      .score(X_te, y_te))

### LinearDiscriminantAnalysis

- Preprocess data

In [None]:
# Flatten every trial into a single feature vector and convert to numpy.
X_tr = train.X.view(train.X.size(0), -1).clone().numpy()
y_tr = train.y.numpy()
X_te = test.X.view(test.X.size(0), -1).clone().numpy()
y_te = test.y.numpy()

# Standardize the features; the scaler is fitted on the training split only
# and then applied to both splits.
scaler = StandardScaler().fit(X_tr)
X_tr, X_te = scaler.transform(X_tr), scaler.transform(X_te)
tuple(arr.shape for arr in (X_tr, y_tr, X_te, y_te))

- Tune and compute test accuracy.

In [None]:
tols = np.logspace(-6, 1, 10)  # candidate `tol` values for the grid search

# Mean 5-fold train / validation scores, one entry per candidate tolerance.
# Stored in a dict of its own so that the logistic-regression results kept in
# `logreg` are not overwritten by this section.
lda = {
    "tr_scores": [],
    "va_scores": []
}

# tune the model on the train set
for tol in tols:
    result = cross_validate(LinearDiscriminantAnalysis(tol=tol), X_tr, y_tr, cv=5, return_train_score=True)

    lda["tr_scores"].append(np.mean(result["train_score"]))
    lda["va_scores"].append(np.mean(result["test_score"]))

plot_scores(tols, "tol", lda["tr_scores"], lda["va_scores"], ylog_scale=True)

# select the best tolerance and estimate the accuracy on the test set
best_tol = tols[np.argmax(lda["va_scores"])]
print('Best tol:', best_tol)
print('Test score:',
      LinearDiscriminantAnalysis(tol=best_tol)
      .fit(X_tr, y_tr)
      .score(X_te, y_te))

### K-Nearest Neighbors

- Preprocess data

In [None]:
# flatten, normalize the X and apply PCA to reduce dimensions (reduce curse of dimensionality effect)
X_tr, y_tr = train.X.view(train.X.shape[0], -1).clone().numpy(), train.y.numpy()
X_te, y_te = test.X.view(test.X.shape[0], -1).clone().numpy(), test.y.numpy()

# standardization statistics come from the training split only
scaler = StandardScaler()
scaler.fit(X_tr)
X_tr_scaled = scaler.transform(X_tr)
X_te_scaled = scaler.transform(X_te)

# keep as many principal components as needed to explain 95% of the variance
pca = PCA(n_components=0.95)
pca.fit(X_tr_scaled)
# The projection must be applied to the *scaled* data the PCA was fitted on;
# projecting the raw features would mix two different feature scalings.
X_tr = pca.transform(X_tr_scaled)
X_te = pca.transform(X_te_scaled)

X_tr.shape, y_tr.shape, X_te.shape, y_te.shape

- Tune and compute test accuracy.

In [None]:
Ks = np.arange(1, 10, 1)  # candidate numbers of neighbors for the grid search

# mean 5-fold train / validation scores, one entry per candidate k
nearestNeig = {
    "tr_scores": [],
    "va_scores": []
}

for k in Ks:
    result = cross_validate(
        KNeighborsClassifier(n_neighbors=k),
        X_tr, y_tr, cv=5, return_train_score=True)

    nearestNeig["tr_scores"].append(np.mean(result["train_score"]))
    nearestNeig["va_scores"].append(np.mean(result["test_score"]))

plot_scores(Ks, "# Neighbors", nearestNeig["tr_scores"], nearestNeig["va_scores"], ylog_scale=False)

# Refit on the whole training set with the k selected by cross-validation.
# `best_k` must be used here: `k` is only the last value visited by the loop.
best_k = Ks[np.argmax(nearestNeig["va_scores"])]
print('Best k:', best_k)
print('Test score:',
      KNeighborsClassifier(n_neighbors=best_k)
      .fit(X_tr, y_tr)
      .score(X_te, y_te))

## Deep networks

### CNN: 2D convolutional layers

In [None]:
# Insert a singleton channel axis at position 1 so that the samples can be
# fed to 2D convolutional layers; the data is cloned so `train`/`test` stay untouched.
X_tr = Variable(train.X.clone().unsqueeze(1))
y_tr = Variable(train.y)
X_te = Variable(test.X.clone().unsqueeze(1))
y_te = Variable(test.y)

tuple(t.shape for t in (X_tr, y_tr, X_te, y_te))

In [None]:
# network with three 2D convolutional layers followed by two fully connected layers
class CNN1(Module, modelWrapper):
    """2D CNN classifier: three conv + max-pool stages, then two linear layers.

    The layer sizes assume single-channel inputs of spatial size 28 x 50 (see
    the shape comments in `forward`). The loss and the optimizer are stored on
    the instance; presumably `modelWrapper` (defined elsewhere) uses them for
    training -- confirm there.
    """

    def __init__(self, nb_hidden):
        """nb_hidden: number of units of the hidden fully connected layer."""
        # remembered so that `reset` can rebuild an identical network
        self.nb_hidden = nb_hidden
        super(CNN1, self).__init__()

        self.conv1 = Conv2d(1, 32, kernel_size=(3, 7), padding=(1, 3))
        self.conv2 = Conv2d(32, 64, kernel_size=5, padding=2)
        self.conv3 = Conv2d(64, 32, kernel_size=5, padding=2)

        # 32 channels x 4 x 3 values are left after the last pooling stage
        self.fc1 = Linear(32 * 4 * 3, nb_hidden)
        self.fc2 = Linear(nb_hidden, 2)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.001)

    def forward(self, x):
        """Return the two class scores for every sample of the batch."""
        # (1, 28, 50) -> (32, 14, 25)
        h = self.conv1(x)
        h = F.leaky_relu(F.max_pool2d(h, 2))

        # (32, 14, 25) -> (64, 7, 5)
        h = self.conv2(h)
        h = F.leaky_relu(F.max_pool2d(h, (2, 5)))

        # (64, 7, 5) -> (32, 4, 3); the padding makes the odd sizes poolable
        h = self.conv3(h)
        h = F.leaky_relu(F.max_pool2d(h, 2, padding=(1, 1)))

        # flatten -> (nb_hidden) -> (2)
        flat = h.view(-1, 32 * 4 * 3)
        hidden = F.leaky_relu(self.fc1(flat))
        return self.fc2(hidden)

    def reset(self):
        """Re-run the constructor to re-create all layers and the optimizer."""
        self.__init__(self.nb_hidden)

In [None]:
# build the 2D CNN with a hidden fully connected layer of 100 units
model = CNN1(100)

In [None]:
# `fit` is not defined in this notebook (presumably provided by modelWrapper -- confirm);
# the test split is handed over next to the training data and 10 epochs are requested
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=10)

In [None]:
# model.predict(train.X.variable).shape, train.X.variable.shape
# model.score(train.X.variable, train.y.variable)

In [None]:
class CNN2(Module, modelWrapper):
    """Deeper 2D CNN: three stages of three 3x3 convolutions, then two linear layers.

    Each stage ends with a max-pooling step. The layer sizes assume
    single-channel inputs of spatial size 28 x 50 (see the shape comments in
    `forward`). The loss and the optimizer are stored on the instance;
    presumably `modelWrapper` (defined elsewhere) uses them for training --
    confirm there.
    """

    def __init__(self):
        super(CNN2, self).__init__()

        # stage 1: 1 -> 32 channels
        self.conv1_1 = Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv1_2 = Conv2d(32, 32, kernel_size=3, padding=1)
        self.conv1_3 = Conv2d(32, 32, kernel_size=3, padding=1)

        # stage 2: 32 -> 64 channels
        self.conv2_1 = Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv2_2 = Conv2d(64, 64, kernel_size=3, padding=1)
        self.conv2_3 = Conv2d(64, 64, kernel_size=3, padding=1)

        # stage 3: 64 -> 128 channels
        self.conv3_1 = Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3_2 = Conv2d(128, 128, kernel_size=3, padding=1)
        self.conv3_3 = Conv2d(128, 128, kernel_size=3, padding=1)

        # 128 channels x 4 x 3 = 1536 values are left after the last pooling stage
        self.fc1 = Linear(1536, 200)
        self.fc2 = Linear(200, 2)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.001)

    def forward(self, x):
        """Return the two class scores for every sample of the batch."""
        # (1, 28, 50)
        x = F.relu(self.conv1_1(x))
        x = F.relu(self.conv1_2(x))
        x = F.relu(F.max_pool2d(self.conv1_3(x), 2))
        # (32, 14, 25)
        x = F.relu(self.conv2_1(x))
        x = F.relu(self.conv2_2(x))
        x = F.relu(F.max_pool2d(self.conv2_3(x), (2, 5)))
        # (64, 7, 5)
        x = F.relu(self.conv3_1(x))
        x = F.relu(self.conv3_2(x))
        x = F.relu(F.max_pool2d(self.conv3_3(x), 2, padding=(1, 1)))
        # (128, 4, 3)

        x = F.relu(self.fc1(x.view(-1, 1536)))
        # (200)
        x = self.fc2(x)
        return x

    def reset(self):
        """Re-run the constructor to re-create all layers and the optimizer."""
        # the constructor takes no argument and this class stores no `nb_hidden`
        self.__init__()

In [None]:
# build the deeper 2D CNN and train it for 5 epochs; `fit` is not defined in this
# notebook (presumably provided by modelWrapper -- confirm)
model = CNN2()
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=5)

### Residual network with 2D convolutional layers + batch normalization 

In [None]:
# Insert a singleton channel axis at position 1 so that the samples can be
# fed to 2D convolutional layers; the data is cloned so `train`/`test` stay untouched.
X_tr = Variable(train.X.clone().unsqueeze(1))
y_tr = Variable(train.y)
X_te = Variable(test.X.clone().unsqueeze(1))
y_te = Variable(test.y)

tuple(t.shape for t in (X_tr, y_tr, X_te, y_te))

In [None]:
# residual network
class BasicBlock(nn.Module):
    """Residual block: (BatchNorm -> ReLU -> 3x3 conv) twice, plus a shortcut.

    When the number of input and output planes is the same the shortcut is the
    identity; otherwise it is a 1x1 convolution (with the block's stride)
    applied to the activated input.
    """

    def __init__(self, in_planes, out_planes, stride, dropRate=0.0):
        """
        in_planes / out_planes: channels entering / leaving the block.
        stride: stride of the first 3x3 convolution (and of the 1x1 shortcut).
        dropRate: dropout probability applied between the two convolutions.
        """
        super(BasicBlock, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(
            in_planes, out_planes,
            kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_planes, out_planes,
            kernel_size=3, stride=1, padding=1, bias=False)
        self.droprate = dropRate
        self.equalInOut = (in_planes == out_planes)
        if self.equalInOut:
            # identity shortcut: no extra layer needed
            self.convShortcut = None
        else:
            # 1x1 projection so the shortcut matches the new number of planes
            self.convShortcut = nn.Conv2d(
                in_planes, out_planes,
                kernel_size=1, stride=stride, padding=0, bias=False)

    def forward(self, x):
        """Apply the residual branch and add the (identity or projected) shortcut."""
        activated = self.relu1(self.bn1(x))

        out = self.conv1(activated)
        out = self.relu2(self.bn2(out))
        if self.droprate > 0:
            out = F.dropout(out, p=self.droprate, training=self.training)
        out = self.conv2(out)

        if self.equalInOut:
            shortcut = x
        else:
            # the projection works on the activated input, not on the raw one
            shortcut = self.convShortcut(activated)
        return torch.add(shortcut, out)
    
class NetworkBlock(nn.Module):
    """A stage made of `nb_layers` blocks applied one after the other.

    Only the first block receives `in_planes` and `stride`; every following
    block maps `out_planes` to `out_planes` with stride 1.
    """

    def __init__(self, nb_layers, in_planes, out_planes, block, stride, dropRate=0.0):
        """
        nb_layers: number of blocks in the stage.
        in_planes / out_planes: channels entering / leaving the stage.
        block: callable `block(in_planes, out_planes, stride, dropRate)` returning a module.
        stride: stride handed to the first block.
        dropRate: dropout probability handed to every block.
        """
        super(NetworkBlock, self).__init__()
        self.layer = self._make_layer(block, in_planes, out_planes, nb_layers, stride, dropRate)

    def _make_layer(self, block, in_planes, out_planes, nb_layers, stride, dropRate):
        """Build the blocks and chain them in an nn.Sequential."""
        layers = []
        for i in range(nb_layers):
            is_first = (i == 0)
            # Conditional expressions instead of `cond and a or b`: the latter
            # silently falls back to `b` whenever `a` is falsy (e.g. 0).
            layers.append(block(in_planes if is_first else out_planes,
                                out_planes,
                                stride if is_first else 1,
                                dropRate))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the input through all blocks of the stage."""
        return self.layer(x)

class WideResNet(nn.Module, modelWrapper):
    """Wide residual network with three stages of BasicBlocks.

    The classifier size assumes single-channel inputs of spatial size 28 x 50:
    the two stride-2 stages and the final 2x2 average pooling leave a 3 x 6
    map per output channel. The loss and the optimizer are stored on the
    instance; presumably `modelWrapper` (defined elsewhere) uses them for
    training -- confirm there.
    """

    def __init__(self, depth, num_classes, widen_factor=1, dropRate=0.0):
        """
        depth: total depth; (depth - 4) must be a multiple of 6, which gives
            (depth - 4) / 6 blocks per stage.
        num_classes: number of output scores.
        widen_factor: multiplier applied to the channel counts of the three stages.
        dropRate: dropout probability handed to every block.
        """
        super(WideResNet, self).__init__()
        nChannels = [16, 16*widen_factor, 32*widen_factor, 64*widen_factor]
        assert((depth - 4) % 6 == 0)
        n = (depth - 4) // 6
        block = BasicBlock
        # 1st conv before any network block
        self.conv1 = nn.Conv2d(1, nChannels[0], kernel_size=3, stride=1,
                               padding=1, bias=False)
        # 1st block
        self.block1 = NetworkBlock(n, nChannels[0], nChannels[1], block, 1, dropRate)
        # 2nd block
        self.block2 = NetworkBlock(n, nChannels[1], nChannels[2], block, 2, dropRate)
        # 3rd block
        self.block3 = NetworkBlock(n, nChannels[2], nChannels[3], block, 2, dropRate)
        # average pooling and classifier
        self.bn1 = nn.BatchNorm2d(nChannels[3])
        self.relu = nn.ReLU(inplace=True)
        # Flattened feature size: channels x 3 x 6 (1152 for widen_factor=1).
        # Derived from the channel count so that other widen factors work too.
        self.nChannels = nChannels[3] * 3 * 6
        self.fc = nn.Linear(self.nChannels, num_classes)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.01)

    def forward(self, x):
        """Return `num_classes` scores for every sample of the batch."""
        out = self.conv1(x)
        out = self.block1(out)
        out = self.block2(out)
        out = self.block3(out)
        out = self.relu(self.bn1(out))
        out = F.avg_pool2d(out, 2)
        out = self.fc(out.view(-1, self.nChannels))
        return out

In [None]:
# depth=16 gives (16 - 4) / 6 = 2 BasicBlocks per stage; two output classes
model = WideResNet(depth=16, num_classes=2)

In [None]:
# `fit` is not defined in this notebook (presumably provided by modelWrapper -- confirm);
# the test split is handed over next to the training data and 5 epochs are requested
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=5)

### 1D convolution +  dropout

In [None]:
# 1D convolutions consume the samples as they are (no extra channel axis);
# the data is cloned so `train`/`test` stay untouched.
X_tr = Variable(train.X.clone())
y_tr = Variable(train.y)
X_te = Variable(test.X.clone())
y_te = Variable(test.y)

tuple(t.shape for t in (X_tr, y_tr, X_te, y_te))

In [None]:
# network with three 1D convolutional layers, dropout and two fully connected layers
class CNN_1D_Dropout(Module, modelWrapper):
    """1D CNN classifier with dropout after every convolutional stage.

    The first dimension of a sample (28) is used as the channel axis of the
    convolutions. The layer sizes assume samples of shape 28 x 50 (see the
    shape comments in `forward`). The loss and the optimizer are stored on the
    instance; presumably `modelWrapper` (defined elsewhere) uses them for
    training -- confirm there.
    """

    def __init__(self, nb_hidden=50):
        """nb_hidden: number of units of the hidden fully connected layer."""
        # remembered so that `reset` can rebuild an identical network
        self.nb_hidden = nb_hidden
        super(CNN_1D_Dropout, self).__init__()

        self.conv1 = Conv1d(28, 64, kernel_size=5, padding=2)
        self.conv2 = Conv1d(64, 64, kernel_size=5, padding=2)
        self.conv3 = Conv1d(64, 32, kernel_size=5, padding=2)

        # 32 channels x 14 positions are left after the last pooling stage
        self.fc1 = Linear(32 * 14, nb_hidden)
        self.fc2 = Linear(nb_hidden, 2)

        # one dropout module shared by all three stages
        self.dropout = nn.Dropout(p=0.1)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.001)

    def forward(self, x):
        """Return the two class scores for every sample of the batch."""
        # (28, 50) -> (64, 50)
        h = self.dropout(F.relu(self.conv1(x)))

        # (64, 50) -> (64, 26)
        h = F.max_pool1d(self.conv2(h), 2, padding=1)
        h = self.dropout(F.relu(h))

        # (64, 26) -> (32, 14)
        h = F.max_pool1d(self.conv3(h), 2, padding=1)
        h = self.dropout(F.relu(h))

        # flatten -> (nb_hidden) -> (2)
        hidden = F.relu(self.fc1(h.view(-1, 32 * 14)))
        return self.fc2(hidden)

    def reset(self):
        """Re-run the constructor to re-create all layers and the optimizer."""
        self.__init__(self.nb_hidden)

In [None]:
# build the 1D CNN with dropout, using the default hidden layer size (nb_hidden=50)
model = CNN_1D_Dropout()

In [None]:
# `fit` is not defined in this notebook (presumably provided by modelWrapper -- confirm);
# the test split is handed over next to the training data and 25 epochs are requested
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=25)

### 1D convolution + Batch norm

In [None]:
# 1D convolutions consume the samples as they are (no extra channel axis);
# the data is cloned so `train`/`test` stay untouched.
X_tr = Variable(train.X.clone())
y_tr = Variable(train.y)
X_te = Variable(test.X.clone())
y_te = Variable(test.y)

tuple(t.shape for t in (X_tr, y_tr, X_te, y_te))

In [None]:
class CNN_1D_BatchNorm(Module, modelWrapper):
    """1D CNN classifier with batch normalization in front of every convolution.

    The first dimension of a sample (28) is used as the channel axis of the
    convolutions. The layer sizes assume samples of shape 28 x 50 (see the
    shape comments in `feature_extraction`). The loss and the optimizer are
    stored on the instance; presumably `modelWrapper` (defined elsewhere) uses
    them for training -- confirm there.
    """

    def __init__(self, nb_hidden=50, activation=F.relu):
        """
        nb_hidden: number of units of the hidden fully connected layer.
        activation: callable applied after every convolution and after fc1.
        """
        # Seed torch's global RNG before the layers are created, so that every
        # construction (and every `reset`) starts from the same initial weights.
        torch.manual_seed(0)
        super(CNN_1D_BatchNorm, self).__init__()

        self.activation = activation
        self.nb_hidden = nb_hidden

        self.bn1 = nn.BatchNorm1d(28)
        self.conv1 = Conv1d(28, 64, kernel_size=5, padding=2)

        self.bn2 = nn.BatchNorm1d(64)
        self.conv2 = Conv1d(64, 64, kernel_size=5, padding=2)

        self.bn3 = nn.BatchNorm1d(64)
        self.conv3 = Conv1d(64, 32, kernel_size=5, padding=2)

        # registered, but currently not applied in the forward pass
        self.bn4 = nn.BatchNorm1d(32)
        self.fc1 = Linear(448, nb_hidden)
        self.fc2 = Linear(nb_hidden, 2)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.001)

    def feature_extraction(self, x):
        """Run the convolutional part only and return the flattened features.

        x: batch of samples, each of shape (28, 50).
        Returns a tensor with 448 (= 32 x 14) values per sample.
        """
        # (28, 50) -> (64, 50)
        x = self.bn1(x)
        x = self.activation(self.conv1(x))

        # (64, 50) -> (64, 26)
        x = self.bn2(x)
        x = self.activation(F.max_pool1d(self.conv2(x), 2, padding=1))

        # (64, 26) -> (32, 14)
        x = self.bn3(x)
        x = self.activation(F.max_pool1d(self.conv3(x), 2, padding=1))

        # (32, 14) -> (448)
        return x.view(-1, 448)

    def forward(self, x):
        """Return the two class scores for every sample of the batch."""
        x = self.feature_extraction(x)

        # (448) -> (nb_hidden) -> (2)
        x = self.activation(self.fc1(x))
        x = self.fc2(x)
        return x

    def reset(self):
        """Re-run the constructor to re-create all layers and the optimizer."""
        self.__init__(self.nb_hidden, self.activation)

In [None]:
# build the batch-normalized 1D CNN: 50 hidden units, ReLU activations
model = CNN_1D_BatchNorm(nb_hidden=50, activation=F.relu)

In [None]:
# `fit` is not defined in this notebook (presumably provided by modelWrapper -- confirm);
# 25 epochs are requested and, unlike the other runs, an explicit batch_size of 20 is passed
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=25, batch_size=20)

### 1D convolution + Batch norm (bigger)

In [None]:
# 1D convolutions consume the samples as they are (no extra channel axis);
# the data is cloned so `train`/`test` stay untouched.
X_tr = Variable(train.X.clone())
y_tr = Variable(train.y)
X_te = Variable(test.X.clone())
y_te = Variable(test.y)

tuple(t.shape for t in (X_tr, y_tr, X_te, y_te))

In [None]:
class CNN_1D_BatchNorm_Big(Module, modelWrapper):
    """Larger batch-normalized 1D CNN: six convolutions, then two linear layers.

    Every convolution is preceded by batch normalization and followed by a
    ReLU; the third and fourth convolutions are also max-pooled. The layer
    sizes assume samples of shape 28 x 50 (see the shape comments in
    `forward`). The loss and the optimizer are stored on the instance;
    presumably `modelWrapper` (defined elsewhere) uses them for training --
    confirm there.
    """

    def __init__(self, nb_hidden=50):
        """nb_hidden: number of units of the hidden fully connected layer."""
        # remembered so that `reset` can rebuild an identical network
        self.nb_hidden = nb_hidden
        super(CNN_1D_BatchNorm_Big, self).__init__()

        # channels: 28 -> 64 -> 64 -> 128 -> 64 -> 32 -> 16
        self.bn1 = nn.BatchNorm1d(28)
        self.conv1 = Conv1d(28, 64, kernel_size=5, padding=2)

        self.bn2 = nn.BatchNorm1d(64)
        self.conv2 = Conv1d(64, 64, kernel_size=5, padding=2)

        self.bn3 = nn.BatchNorm1d(64)
        self.conv3 = Conv1d(64, 128, kernel_size=5, padding=2)

        self.bn4 = nn.BatchNorm1d(128)
        self.conv4 = Conv1d(128, 64, kernel_size=5, padding=2)

        self.bn5 = nn.BatchNorm1d(64)
        self.conv5 = Conv1d(64, 32, kernel_size=5, padding=2)

        self.bn6 = nn.BatchNorm1d(32)
        self.conv6 = Conv1d(32, 16, kernel_size=5, padding=2)

        # 16 channels x 14 positions are left after the last convolution
        self.fc1 = Linear(16 * 14, nb_hidden)
        self.fc2 = Linear(nb_hidden, 2)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.001)

    def forward(self, x):
        """Return the two class scores for every sample of the batch."""
        # (28, 50) -> (64, 50)
        h = F.relu(self.conv1(self.bn1(x)))

        # (64, 50) -> (64, 50)
        h = F.relu(self.conv2(self.bn2(h)))

        # (64, 50) -> (128, 26)
        h = self.conv3(self.bn3(h))
        h = F.relu(F.max_pool1d(h, 2, padding=1))

        # (128, 26) -> (64, 14)
        h = self.conv4(self.bn4(h))
        h = F.relu(F.max_pool1d(h, 2, padding=1))

        # (64, 14) -> (32, 14)
        h = F.relu(self.conv5(self.bn5(h)))

        # (32, 14) -> (16, 14)
        h = F.relu(self.conv6(self.bn6(h)))

        # flatten -> (nb_hidden) -> (2)
        hidden = F.relu(self.fc1(h.view(-1, 16 * 14)))
        return self.fc2(hidden)

    def reset(self):
        """Re-run the constructor to re-create all layers and the optimizer."""
        self.__init__(self.nb_hidden)

In [None]:
# build the larger batch-normalized 1D CNN with the default hidden layer size (nb_hidden=50)
model = CNN_1D_BatchNorm_Big()

In [None]:
# `fit` is not defined in this notebook (presumably provided by modelWrapper -- confirm);
# the test split is handed over next to the training data and 25 epochs are requested
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=25)

### 1D convolution + Batch norm + Residual

In [None]:
# 1D convolutions consume the samples as they are (no extra channel axis);
# the data is cloned so `train`/`test` stay untouched.
X_tr = Variable(train.X.clone())
y_tr = Variable(train.y)
X_te = Variable(test.X.clone())
y_te = Variable(test.y)

tuple(t.shape for t in (X_tr, y_tr, X_te, y_te))

In [None]:
class residual_block(Module):
    """1D residual block on 32-channel inputs.

    Three (BatchNorm -> conv -> activation) steps map 32 -> 64 -> 64 -> 32
    channels without changing the length; the input is added to the result.
    """

    def __init__(self, activation=F.relu):
        """activation: callable applied after every convolution."""
        super(residual_block, self).__init__()
        self.activation = activation

        # 32 -> 64 channels
        self.bn1 = nn.BatchNorm1d(32)
        self.conv1 = Conv1d(32, 64, kernel_size=3, padding=1)

        # 64 -> 64 channels
        self.bn2 = nn.BatchNorm1d(64)
        self.conv2 = Conv1d(64, 64, kernel_size=3, padding=1)

        # 64 -> 32 channels, back to the input width for the skip connection
        self.bn3 = nn.BatchNorm1d(64)
        self.conv3 = Conv1d(64, 32, kernel_size=3, padding=1)

    def forward(self, x):
        """Return the transformed input plus the input itself."""
        stages = (
            (self.bn1, self.conv1),  # (32, L) -> (64, L)
            (self.bn2, self.conv2),  # (64, L) -> (64, L)
            (self.bn3, self.conv3),  # (64, L) -> (32, L)
        )
        out = x
        for norm, conv in stages:
            out = self.activation(conv(norm(out)))
        return out + x

class aggregated_residual_blocks(Module):
    """Several residual blocks applied in parallel to the same input.

    The outputs of all blocks are summed and the input is added once more.
    """

    def __init__(self, n_residual_blocks=2, activation=F.relu):
        """
        n_residual_blocks: number of parallel residual blocks.
        activation: callable used inside every residual block.
        """
        super(aggregated_residual_blocks, self).__init__()
        self.activation = activation

        self.residual_blocks = nn.ModuleList()
        for i in range(n_residual_blocks):
            # forward the activation, otherwise a custom one would silently be
            # replaced by the residual block's default
            self.residual_blocks.append(residual_block(activation))

    def forward(self, x):
        """Return the sum of all block outputs plus the input."""
        out = []

        for block in self.residual_blocks:
            out.append(block(x))

        return sum(out)+x
    
class CNN_1D_BatchNorm_Residual(Module, modelWrapper):
    """Batch-normalized 1D CNN with aggregated residual blocks in the middle.

    A first convolution maps the 28 input channels to 32, a chain of
    aggregated residual blocks keeps that shape, and two pooled convolutions
    followed by two linear layers produce the class scores. The layer sizes
    assume samples of shape 28 x 50 (see the shape comments in `forward`).
    The loss and the optimizer are stored on the instance; presumably
    `modelWrapper` (defined elsewhere) uses them for training -- confirm there.
    """

    def __init__(self, nb_hidden=50, n_aggregated_residual_blocks=2, n_residual_blocks=2, activation=F.relu):
        """
        nb_hidden: number of units of the hidden fully connected layer.
        n_aggregated_residual_blocks: number of aggregated residual blocks (aggregated_residual_blocks).
        n_residual_blocks: number of residual blocks per aggregated residual block.
        activation: callable applied after every convolution and after fc1.
        """
        super(CNN_1D_BatchNorm_Residual, self).__init__()
        # all constructor arguments are remembered so that `reset` can rebuild
        # an identical network
        self.nb_hidden = nb_hidden
        self.n_aggregated_residual_blocks = n_aggregated_residual_blocks
        self.n_residual_blocks = n_residual_blocks
        self.activation = activation

        self.bn1 = nn.BatchNorm1d(28)
        self.conv1 = Conv1d(28, 32, kernel_size=5, padding=2)

        self.agg_residual_blocks = nn.ModuleList()
        for i in range(n_aggregated_residual_blocks):
            self.agg_residual_blocks.append(aggregated_residual_blocks(n_residual_blocks, activation))

        self.bn2 = nn.BatchNorm1d(32)
        self.conv2 = Conv1d(32, 32, kernel_size=3, padding=1)

        self.bn3 = nn.BatchNorm1d(32)
        self.conv3 = Conv1d(32, 16, kernel_size=3, padding=1)

        # 16 channels x 13 positions = 208 values after the last pooling
        self.fc1 = Linear(208, nb_hidden)
        self.fc2 = Linear(nb_hidden, 2)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.001)

    def forward(self, x):
        """Return the two class scores for every sample of the batch."""
        # (28, 50) -> (32, 50)
        x = self.bn1(x)
        x = self.activation(self.conv1(x))

        # (32, 50) -> (32, 50)
        for block in self.agg_residual_blocks:
            x = block(x)

        # (32, 50) -> (32, 26)
        x = self.bn2(x)
        x = self.activation(F.max_pool1d(self.conv2(x), 2, padding=1))

        # (32, 26) -> (16, 13)
        x = self.bn3(x)
        x = self.activation(F.max_pool1d(self.conv3(x), 2))

        # (16, 13) -> (208) -> (nb_hidden)
        x = self.activation(self.fc1(x.view(-1, 208)))

        # (nb_hidden) -> (2)
        x = self.fc2(x)
        return x

    def reset(self):
        """Re-run the constructor with the stored arguments."""
        # keyword arguments, so that no value can land in the wrong parameter
        self.__init__(nb_hidden=self.nb_hidden,
                      n_aggregated_residual_blocks=self.n_aggregated_residual_blocks,
                      n_residual_blocks=self.n_residual_blocks,
                      activation=self.activation)

In [None]:
# alternative configuration with 5 residual blocks per aggregated block (disabled):
# model = CNN_1D_BatchNorm_Residual(n_aggregated_residual_blocks=2, n_residual_blocks=5, activation=F.relu)
# active configuration: 2 aggregated blocks of 2 residual blocks each, ReLU activations
model = CNN_1D_BatchNorm_Residual(n_aggregated_residual_blocks=2, n_residual_blocks=2, activation=F.relu)

In [None]:
# `fit` is not defined in this notebook (presumably provided by modelWrapper -- confirm);
# the test split is handed over next to the training data and 25 epochs are requested
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=25)

### 1D conv horizontal and 1D conv vertical

In [None]:
# The samples are used as they are (no extra channel axis is added here);
# the data is cloned so `train`/`test` stay untouched.
X_tr = Variable(train.X.clone())
y_tr = Variable(train.y)
X_te = Variable(test.X.clone())
y_te = Variable(test.y)

tuple(t.shape for t in (X_tr, y_tr, X_te, y_te))

In [None]:
class CNN_1D(Module):
    """Shape-preserving stack of three batch-normalized 1D convolutions.

    The channels go in_channels -> 64 -> 64 -> in_channels and the paddings
    keep the length, so the output has the same shape as the input.
    """

    def __init__(self, in_channels, activation=F.relu):
        """
        in_channels: size of the channel axis of the input (and of the output).
        activation: callable applied after every convolution.
        """
        super(CNN_1D, self).__init__()
        self.activation = activation

        # in_channels -> 64, kernel 7
        self.bn1 = nn.BatchNorm1d(in_channels)
        self.conv1 = Conv1d(in_channels, 64, kernel_size=7, padding=3)

        # 64 -> 64, kernel 5
        self.bn2 = nn.BatchNorm1d(64)
        self.conv2 = Conv1d(64, 64, kernel_size=5, padding=2)

        # 64 -> in_channels, kernel 3
        self.bn3 = nn.BatchNorm1d(64)
        self.conv3 = Conv1d(64, in_channels, kernel_size=3, padding=1)

    def forward(self, x):
        """Return a tensor with the same shape as `x`."""
        stages = (
            (self.bn1, self.conv1),  # (in_channels, L) -> (64, L)
            (self.bn2, self.conv2),  # (64, L) -> (64, L)
            (self.bn3, self.conv3),  # (64, L) -> (in_channels, L)
        )
        for norm, conv in stages:
            x = self.activation(conv(norm(x)))
        return x
    
class conv2D(Module):
    """Three batch-normalized, max-pooled 2D convolutions.

    Maps a (1, 28, 50) input to a (16, 4, 7) feature map (see the shape
    comments in `forward`).
    """

    def __init__(self, activation=F.relu):
        """activation: callable applied after every pooling step."""
        super(conv2D, self).__init__()
        self.activation = activation

        # The inputs of these layers are 4D (batch, channels, height, width),
        # so the 2D variant of batch normalization is required; BatchNorm1d
        # rejects 4D inputs.
        self.bn1 = nn.BatchNorm2d(1)
        self.conv1 = Conv2d(1, 32, kernel_size=(3, 7), padding=(1, 3))

        self.bn2 = nn.BatchNorm2d(32)
        self.conv2 = Conv2d(32, 32, kernel_size=5, padding=2)

        self.bn3 = nn.BatchNorm2d(32)
        self.conv3 = Conv2d(32, 16, kernel_size=5, padding=2)

    def forward(self, x):
        """Return the (16, 4, 7) feature map of every sample of the batch."""
        # (1, 28, 50) -> (32, 14, 26)
        x = self.activation(F.max_pool2d(self.conv1(self.bn1(x)), 2, padding=(0, 1)))

        # (32, 14, 26) -> (32, 8, 14)
        x = self.activation(F.max_pool2d(self.conv2(self.bn2(x)), (2, 2), padding=(1, 1)))

        # (32, 8, 14) -> (16, 4, 7)
        x = self.activation(F.max_pool2d(self.conv3(self.bn3(x)), 2))

        return x
        
class horiz_vert_1D(Module, modelWrapper):
    """Three-branch network mixing 1D and 2D convolutions.

    Branch 1 runs a 1D CNN over the last axis of the input (28 channels),
    branch 2 runs a 1D CNN over the transposed input (50 channels) and
    branch 3 uses the raw input. Each branch ends with its own `conv2D`
    stack; the three feature maps are summed and classified by two linear
    layers. The layer sizes assume samples of shape 28 x 50. The loss and the
    optimizer are stored on the instance; presumably `modelWrapper` (defined
    elsewhere) uses them for training -- confirm there.
    """

    def __init__(self, activation=F.relu, nb_hidden=50):
        """
        activation: callable used in all branches and after fc1.
        nb_hidden: number of units of the hidden fully connected layer.
        """
        super(horiz_vert_1D, self).__init__()
        # both constructor arguments are remembered so that `reset` can rebuild
        # an identical network
        self.activation = activation
        self.nb_hidden = nb_hidden

        self.horiz = CNN_1D(28, activation=activation)
        self.vert = CNN_1D(50, activation=activation)

        # the activation is forwarded so that all branches use the same one
        self.conv2D = conv2D(activation=activation)
        self.conv2D_horiz = conv2D(activation=activation)
        self.conv2D_vert = conv2D(activation=activation)

        # every conv2D branch yields 16 x 4 x 7 = 448 values per sample
        self.fc1 = Linear(448, nb_hidden)
        self.fc2 = Linear(nb_hidden, 2)

        self.criterion = CrossEntropyLoss()
        self.optimizer = Adam(self.parameters(), lr=0.001)

    def forward(self, x):
        """Return the two class scores for every sample of the batch."""
        # 1D CNN along the last axis, then 2D CNN on the result
        out1 = self.conv2D_horiz(self.horiz(x).unsqueeze(1))
        # 1D CNN on the transposed sample; transposed back before the 2D CNN
        out2 = self.conv2D_vert(self.vert(x.transpose(1, 2)).transpose(1, 2).unsqueeze(1))
        # 2D CNN on the raw sample
        out3 = self.conv2D(x.unsqueeze(1))

        out = out1 + out2 + out3

        out = self.activation(self.fc1(out.view(-1, 448)))
        out = self.fc2(out)

        return out

    def reset(self):
        """Re-run the constructor with the stored arguments."""
        # keyword arguments, so that no value can land in the wrong parameter
        self.__init__(activation=self.activation, nb_hidden=self.nb_hidden)

In [None]:
# build the three-branch network with 50 hidden units (activation left at its default, F.relu)
model = horiz_vert_1D(nb_hidden=50)

In [None]:
# `fit` is not defined in this notebook (presumably provided by modelWrapper -- confirm);
# the test split is handed over next to the training data and 25 epochs are requested
model.fit(X_tr, y_tr, X_test=X_te, y_test=y_te, epochs=25)