In [2]:
import numpy as np

In [4]:
np.linalg.norm(np.ones((1,1)))

1.0

In [None]:
def generate_seq(batch_size, vec_len, R, T):
    batch_input = []
    batch_target = []
    
    for batch in range (batch_size):
        input = []
        target = []
        for i in range(R):
            input.append(torch.bernoulli(torch.empty(vec_len).uniform_(0, 1)))
            target.append(torch.zeros(1))
        
        for i in range(R, 2*R):
            if np.random.rand() > 0.5:
                input.append(input[i-R])
                target.append(torch.ones(1))
            else:
                input.append(torch.bernoulli(torch.empty(vec_len).uniform_(0, 1)))
                target.append(torch.zeros(1))

        for i in range(2*R,T):
            if np.random.rand() > 0.5 and (not torch.equal(input[i-R],input[i-2*R])):
                input.append(input[i-R])
                target.append(torch.ones(1))
            else:
                input.append(torch.bernoulli(torch.empty(vec_len).uniform_(0, 1)))
                target.append(torch.zeros(1))
        
                
        batch_input.append(torch.stack(input, dim=0))
        batch_target.append(torch.stack(target, dim=0))
    
    input_seq = torch.stack(batch_input, dim=0)
    target_seq = torch.stack(batch_target, dim=0)
    return(input_seq,target_seq)

In [None]:
def generate_mix_seq(batch_size, vec_len, R, T):
    mix_input_seq = []
    mix_target_seq = []
    for r in R:
        input_seq,target_seq = generate_seq(int(batch_size/len(R)), vec_len, r, T) 
        mix_input_seq.append(input_seq)
        mix_target_seq.append(target_seq)
    
    mix_input_seq = torch.cat(mix_input_seq, dim=0)
    mix_target_seq = torch.cat(mix_target_seq, dim=0)
    shuf = np.random.permutation(len(mix_input_seq))

    return mix_input_seq[shuf],mix_target_seq[shuf]

In [None]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_dim, output_size):
        super(RNN, self).__init__()

        self.hidden_dim = hidden_dim
        self.rnn = nn.RNN(input_size, hidden_dim, batch_first=True)   
        self.fc = nn.Linear(hidden_dim, output_size)
        self.activation = torch.nn.Sigmoid()

    
    def forward(self, x):
        
        batch_size = x.size(0)
        hidden = self.init_hidden(batch_size)
        out, hidden = self.rnn(x, hidden)
        out = out.contiguous().view(batch_size, -1, self.hidden_dim)
        out = self.fc(out)
        out = self.activation(out)
        
        return out, hidden
    
    def init_hidden(self, batch_size):
        hidden = torch.zeros(1, batch_size, self.hidden_dim)
        return hidden

## Training over the same input

In [None]:
training_Loss = []
valid_Loss = []
training_accuracy = []
valid_accuracy = []


vec_len = 100
R = 3
T = 500
cfd = RNN(input_size=vec_len, output_size=1, hidden_dim=100)
lr=0.0001
criterion = nn.BCELoss()
#criterion = nn.MSELoss()
optimizer = torch.optim.Adam(cfd.parameters(), lr=lr)
batch_size = 1


n_iterations = 1000
input_seq,target_seq = generate_seq(batch_size = batch_size, vec_len = vec_len, R = R, T = T)
for iterations in range(1, n_iterations + 1):
    # training
    optimizer.zero_grad() 

    output, hidden = cfd(input_seq)

    loss = criterion(output, target_seq.view(batch_size,-1,1)) 
    training_Loss.append(loss.detach().numpy())
    predicted = (output>0.5).float()
    accuracy = (predicted == target_seq).sum() / (batch_size * T)
    training_accuracy.append(accuracy.detach().numpy())

    
    loss.backward() 
    optimizer.step() 
    
    
    
    # validation
    validation_batch_size = 10
    validation_seq,validation_target_seq = generate_seq(batch_size = validation_batch_size, vec_len = vec_len, R = R, T=T)
    validation_out, hidden = cfd(validation_seq)
    validation_loss = criterion(validation_out, validation_target_seq.view(validation_batch_size,-1,1)) 
    validation_predicted = (validation_out>0.5).float()
    validation_accuracy = (validation_predicted == validation_target_seq).sum() / (validation_batch_size * T)
    valid_Loss.append(validation_loss.detach().numpy())
    valid_accuracy.append(validation_accuracy.detach().numpy())

    # print loss over epochs
    if iterations%100 == 0:
        print('Iterations: {}/{}.............'.format(iterations, n_iterations), end=' ')
        print("Loss: {:.4f}".format(validation_loss.item()), end=' | ')
        print("Accuracy: {:.4f}".format(validation_accuracy.item()))

In [None]:
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
plt.plot(training_Loss, label = 'training loss', markersize = 3)
plt.plot(valid_Loss, label = 'validation loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
plt.plot(training_accuracy, label = 'training accuracy', markersize = 3)
plt.plot(valid_accuracy, label = 'validation accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()

## Training over infinite input R3

In [None]:
training_Loss = []
valid_Loss = []
training_accuracy = []
valid_accuracy = []


vec_len = 100
R = 3
T = 500
cfd_R3 = RNN(input_size=vec_len, output_size=1, hidden_dim=100)
lr=0.001
criterion = nn.BCELoss()
#criterion = nn.MSELoss()
optimizer = torch.optim.Adam(cfd_R3.parameters(), lr=lr)


n_iterations = 1000
for iterations in range(1, n_iterations + 1):
    # training
    batch_size = 100
    input_seq,target_seq = generate_seq(batch_size = batch_size, vec_len = vec_len, R = R, T = T)
    optimizer.zero_grad() 

    output, hidden = cfd_R3(input_seq)

    loss = criterion(output, target_seq.view(batch_size,-1,1)) 
    training_Loss.append(loss.detach().numpy())
    predicted = (output>0.5).float()
    accuracy = (predicted == target_seq).sum() / (batch_size * T)
    training_accuracy.append(accuracy.detach().numpy())

    loss.backward() 
    optimizer.step() 
    
    # validation
    validation_batch_size = 10
    validation_seq,validation_target_seq = generate_seq(batch_size = validation_batch_size, vec_len = vec_len, R = R, T=T)
    validation_out, hidden = cfd_R3(validation_seq)
    validation_loss = criterion(validation_out, validation_target_seq.view(validation_batch_size,-1,1)) 
    validation_predicted = (validation_out>0.5).float()
    validation_accuracy = (validation_predicted == validation_target_seq).sum() / (validation_batch_size * T)
    valid_Loss.append(validation_loss.detach().numpy())
    valid_accuracy.append(validation_accuracy.detach().numpy())

    # print loss over epochs
    if iterations%100 == 0:
        print('Iterations: {}/{}.............'.format(iterations, n_iterations), end=' ')
        print("Loss: {:.4f}".format(validation_loss.item()), end=' | ')
        print("Accuracy: {:.4f}".format(validation_accuracy.item()))

In [None]:
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
plt.plot(training_Loss, label = 'training loss', markersize = 3)
plt.plot(valid_Loss, label = 'validation loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
plt.plot(training_accuracy, label = 'training accuracy', markersize = 3)
plt.plot(valid_accuracy, label = 'validation accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()

### Testing 

In [None]:
test_Loss = []
test_accuracy = []
for test_R in range (1,10):
    test_batch_size = 1
    test_seq,test_target_seq = generate_seq(batch_size = test_batch_size, vec_len = vec_len, R = test_R, T=T)
    test_out, hidden = cfd_R3(test_seq)
    test_loss = criterion(test_out, test_target_seq.view(test_batch_size,-1,1)) 
    test_predicted = (test_out>0.5).float()
    accuracy = (test_predicted == test_target_seq).sum() / (test_batch_size * T)
    test_Loss.append(test_loss.detach().numpy())
    test_accuracy.append(accuracy.detach().numpy())

    
    
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
r = np.arange(1,10)
plt.plot(r,test_Loss, label = 'test loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
r = np.arange(1,10)
plt.plot(r,test_accuracy, label = 'test accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()

gradient clipping
reduce lr



## R = 7

In [None]:
training_Loss = []
valid_Loss = []
training_accuracy = []
valid_accuracy = []


vec_len = 100
R = 7
T = 500
cfd_R7 = RNN(input_size=vec_len, output_size=1, hidden_dim=100)
lr=0.0001
criterion = nn.BCELoss()
#criterion = nn.MSELoss()
optimizer = torch.optim.Adam(cfd_R7.parameters(), lr=lr)


n_iterations = 10000
for iterations in range(1, n_iterations + 1):
    # training
    batch_size = 100
    input_seq,target_seq = generate_seq(batch_size = batch_size, vec_len = vec_len, R = R, T = T)
    optimizer.zero_grad() 

    output, hidden = cfd_R7(input_seq)

    loss = criterion(output, target_seq.view(batch_size,-1,1)) 
    training_Loss.append(loss.detach().numpy())
    predicted = (output>0.5).float()
    accuracy = (predicted == target_seq).sum() / (batch_size * T)
    training_accuracy.append(accuracy.detach().numpy())

    loss.backward() 
    optimizer.step() 
    
    # validation
    validation_batch_size = 10
    validation_seq,validation_target_seq = generate_seq(batch_size = validation_batch_size, vec_len = vec_len, R = R, T=T)
    validation_out, hidden = cfd_R7(validation_seq)
    validation_loss = criterion(validation_out, validation_target_seq.view(validation_batch_size,-1,1)) 
    validation_predicted = (validation_out>0.5).float()
    validation_accuracy = (validation_predicted == validation_target_seq).sum() / (validation_batch_size * T)
    valid_Loss.append(validation_loss.detach().numpy())
    valid_accuracy.append(validation_accuracy.detach().numpy())

    # print loss over epochs
    if iterations%100 == 0:
        print('Iterations: {}/{}....'.format(iterations, n_iterations), end=' ')
        print("Valid Loss: {:.4f}".format(validation_loss.item()), end=' | ')
        print("Accuracy: {:.4f}".format(validation_accuracy.item()))

In [None]:
n_iterations = 15000
for iterations in range(10001, n_iterations + 1):
    # training
    batch_size = 100
    input_seq,target_seq = generate_seq(batch_size = batch_size, vec_len = vec_len, R = R, T = T)
    optimizer.zero_grad() 

    output, hidden = cfd_R7(input_seq)

    loss = criterion(output, target_seq.view(batch_size,-1,1)) 
    training_Loss.append(loss.detach().numpy())
    predicted = (output>0.5).float()
    accuracy = (predicted == target_seq).sum() / (batch_size * T)
    training_accuracy.append(accuracy.detach().numpy())

    loss.backward() 
    optimizer.step() 
    
    # validation
    validation_batch_size = 10
    validation_seq,validation_target_seq = generate_seq(batch_size = validation_batch_size, vec_len = vec_len, R = R, T=T)
    validation_out, hidden = cfd_R7(validation_seq)
    validation_loss = criterion(validation_out, validation_target_seq.view(validation_batch_size,-1,1)) 
    validation_predicted = (validation_out>0.5).float()
    validation_accuracy = (validation_predicted == validation_target_seq).sum() / (validation_batch_size * T)
    valid_Loss.append(validation_loss.detach().numpy())
    valid_accuracy.append(validation_accuracy.detach().numpy())

    # print loss over epochs
    if iterations%100 == 0:
        print('Iterations: {}/{}....'.format(iterations, n_iterations), end=' ')
        print("Valid Loss: {:.4f}".format(validation_loss.item()), end=' | ')
        print("Accuracy: {:.4f}".format(validation_accuracy.item()))

In [None]:
n_iterations = 20000
for iterations in range(15001, n_iterations + 1):
    # training
    batch_size = 100
    input_seq,target_seq = generate_seq(batch_size = batch_size, vec_len = vec_len, R = R, T = T)
    optimizer.zero_grad() 

    output, hidden = cfd_R7(input_seq)

    loss = criterion(output, target_seq.view(batch_size,-1,1)) 
    training_Loss.append(loss.detach().numpy())
    predicted = (output>0.5).float()
    accuracy = (predicted == target_seq).sum() / (batch_size * T)
    training_accuracy.append(accuracy.detach().numpy())

    loss.backward() 
    optimizer.step() 
    
    # validation
    validation_batch_size = 10
    validation_seq,validation_target_seq = generate_seq(batch_size = validation_batch_size, vec_len = vec_len, R = R, T=T)
    validation_out, hidden = cfd_R7(validation_seq)
    validation_loss = criterion(validation_out, validation_target_seq.view(validation_batch_size,-1,1)) 
    validation_predicted = (validation_out>0.5).float()
    validation_accuracy = (validation_predicted == validation_target_seq).sum() / (validation_batch_size * T)
    valid_Loss.append(validation_loss.detach().numpy())
    valid_accuracy.append(validation_accuracy.detach().numpy())

    # print loss over epochs
    if iterations%100 == 0:
        print('Iterations: {}/{}....'.format(iterations, n_iterations), end=' ')
        print("Valid Loss: {:.4f}".format(validation_loss.item()), end=' | ')
        print("Accuracy: {:.4f}".format(validation_accuracy.item()))

In [None]:
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
plt.plot(training_Loss, label = 'training loss', markersize = 3)
plt.plot(valid_Loss, label = 'validation loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
plt.plot(training_accuracy, label = 'training accuracy', markersize = 3)
plt.plot(valid_accuracy, label = 'validation accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()

In [None]:
test_Loss = []
test_accuracy = []
for test_R in range (1,10):
    test_batch_size = 1
    test_seq,test_target_seq = generate_seq(batch_size = test_batch_size, vec_len = vec_len, R = test_R, T=T)
    test_out, hidden = cfd_R7(test_seq)
    test_loss = criterion(test_out, test_target_seq.view(test_batch_size,-1,1)) 
    test_predicted = (test_out>0.5).float()
    accuracy = (test_predicted == test_target_seq).sum() / (test_batch_size * T)
    test_Loss.append(test_loss.detach().numpy())
    test_accuracy.append(accuracy.detach().numpy())

    
    
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
r = np.arange(1,10)
plt.plot(r,test_Loss, label = 'test loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
r = np.arange(1,10)
plt.plot(r,test_accuracy, label = 'test accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()

## Mix training with R = [3,7]

In [None]:
training_Loss = []
valid_Loss = []
training_accuracy = []
valid_accuracy = []


vec_len = 100
R = [3,7]
T = 500
cfd_R37 = RNN(input_size=vec_len, output_size=1, hidden_dim=100)
lr=0.0001
criterion = nn.BCELoss()
#criterion = nn.MSELoss()
optimizer = torch.optim.Adam(cfd_R37.parameters(), lr=lr)


n_iterations = 10000
for iterations in range(1, n_iterations + 1):
    # training
    batch_size = 100
    input_seq,target_seq = generate_mix_seq(batch_size = batch_size, vec_len = vec_len, R = R, T = T)
    optimizer.zero_grad() 

    output, hidden = cfd_R37(input_seq)

    loss = criterion(output, target_seq.view(batch_size,-1,1)) 
    training_Loss.append(loss.detach().numpy())
    predicted = (output>0.5).float()
    accuracy = (predicted == target_seq).sum() / (batch_size * T)
    training_accuracy.append(accuracy.detach().numpy())

    loss.backward() 
    optimizer.step() 
      
    # validation
    validation_batch_size = 10
    validation_seq,validation_target_seq = generate_mix_seq(batch_size = validation_batch_size, vec_len = vec_len, R = R, T=T)
    validation_out, hidden = cfd_R37(validation_seq)
    validation_loss = criterion(validation_out, validation_target_seq.view(validation_batch_size,-1,1)) 
    validation_predicted = (validation_out>0.5).float()
    validation_accuracy = (validation_predicted == validation_target_seq).sum() / (validation_batch_size * T)
    valid_Loss.append(validation_loss.detach().numpy())
    valid_accuracy.append(validation_accuracy.detach().numpy())

    # print loss over epochs
    if iterations%100 == 0:
        print('Iterations: {}/{}....'.format(iterations, n_iterations), end=' ')
        print("Valid Loss: {:.4f}".format(validation_loss.item()), end=' | ')
        print("Accuracy: {:.4f}".format(validation_accuracy.item()))

In [None]:
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
plt.plot(training_Loss, label = 'training loss', markersize = 3)
plt.plot(valid_Loss, label = 'validation loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
plt.plot(training_accuracy, label = 'training accuracy', markersize = 3)
plt.plot(valid_accuracy, label = 'validation accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()

In [None]:
test_Loss = []
test_accuracy = []
for test_R in range (1,10):
    test_batch_size = 1
    test_seq,test_target_seq = generate_seq(batch_size = test_batch_size, vec_len = vec_len, R = test_R, T=T)
    test_out, hidden = cfd_R37(test_seq)
    test_loss = criterion(test_out, test_target_seq.view(test_batch_size,-1,1)) 
    test_predicted = (test_out>0.5).float()
    accuracy = (test_predicted == test_target_seq).sum() / (test_batch_size * T)
    test_Loss.append(test_loss.detach().numpy())
    test_accuracy.append(accuracy.detach().numpy())

    
    
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
r = np.arange(1,10)
plt.plot(r,test_Loss, label = 'test loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
r = np.arange(1,10)
plt.plot(r,test_accuracy, label = 'test accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()

In [None]:
n_iterations = 20000
for iterations in range(10001, n_iterations + 1):
    # training
    batch_size = 100
    input_seq,target_seq = generate_mix_seq(batch_size = batch_size, vec_len = vec_len, R = R, T = T)
    optimizer.zero_grad() 

    output, hidden = cfd_R37(input_seq)

    loss = criterion(output, target_seq.view(batch_size,-1,1)) 
    training_Loss.append(loss.detach().numpy())
    predicted = (output>0.5).float()
    accuracy = (predicted == target_seq).sum() / (batch_size * T)
    training_accuracy.append(accuracy.detach().numpy())

    loss.backward() 
    optimizer.step() 
      
    # validation
    '''
    validation_batch_size = 10
    validation_seq,validation_target_seq = generate_mix_seq(batch_size = validation_batch_size, vec_len = vec_len, R = R, T=T)
    validation_out, hidden = cfd_R37(validation_seq)
    validation_loss = criterion(validation_out, validation_target_seq.view(validation_batch_size,-1,1)) 
    validation_predicted = (validation_out>0.5).float()
    validation_accuracy = (validation_predicted == validation_target_seq).sum() / (validation_batch_size * T)
    valid_Loss.append(validation_loss.detach().numpy())
    valid_accuracy.append(validation_accuracy.detach().numpy())
    '''
    
    
    # print loss over epochs
    if iterations%100 == 0:
        print('Iterations: {}/{}....'.format(iterations, n_iterations), end=' ')
        print("Valid Loss: {:.4f}".format(loss.item()), end=' | ')
        print("Accuracy: {:.4f}".format(accuracy.item()))

In [None]:
test_Loss = []
test_accuracy = []
for test_R in range (1,10):
    test_batch_size = 1
    test_seq,test_target_seq = generate_seq(batch_size = test_batch_size, vec_len = vec_len, R = test_R, T=T)
    test_out, hidden = cfd_R37(test_seq)
    test_loss = criterion(test_out, test_target_seq.view(test_batch_size,-1,1)) 
    test_predicted = (test_out>0.5).float()
    accuracy = (test_predicted == test_target_seq).sum() / (test_batch_size * T)
    test_Loss.append(test_loss.detach().numpy())
    test_accuracy.append(accuracy.detach().numpy())

    
    
plt.figure(figsize = (16, 4))
plt.subplot(1,2,1)
r = np.arange(1,10)
plt.plot(r,test_Loss, label = 'test loss', markersize = 3)
plt.title("loss")

plt.subplot(1,2,2)
r = np.arange(1,10)
plt.plot(r,test_accuracy, label = 'test accuracy', markersize = 3)
plt.title("accuracy")
plt.legend()
plt.show()