In [20]:
import itertools
import copy
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn
import pickle
import random
from sklearn.model_selection import train_test_split

import sys
import os
# Put the directory one level above the working directory on sys.path,
# presumably so that the local `functions` package imported right below can be
# found (verify against the repository layout).
# realpath() resolves a relative name against the current working directory and
# the file does not have to exist, so 'plotting.py' only serves as a dummy name:
# `current` ends up being the working directory.
current = os.path.dirname(os.path.realpath('plotting.py'))
parent = os.path.dirname(current)
# Guarded so that re-running this cell does not pile up duplicate entries.
if parent not in sys.path:
    sys.path.append(parent)
import functions.plotting as NNplt
from functions.rnn_cryptic import generate_sequences, convert_seq2inputs, pad_seqs

In [7]:
import torch
import math
# Report Apple-GPU (MPS) support, one flag per line:
#   line 1 - whether an MPS device is usable right now (per the PyTorch docs this
#            requires macOS 12.3+ and an MPS-capable machine);
#   line 2 - whether the installed PyTorch build was compiled with MPS enabled.
print(torch.backends.mps.is_available(), torch.backends.mps.is_built(), sep='\n')


True
True


# RNN

In [22]:
class OneStepRNN(nn.Module):
    """Hand-rolled recurrent cell that is advanced one input step per call.

    ``forward`` concatenates the current input step with the previous hidden
    state, maps the result through a linear layer followed by a ReLU to obtain
    the new hidden state, and reads the output off that hidden state with a
    second linear layer.

    Args:
        input_size: number of features in one input step.
        output_size: number of output units.
        hidden_size: number of hidden units.
        num_layers: layer count passed to the ``nn.RNN`` member (which
            ``forward`` never calls).
    """

    def __init__(self, input_size, output_size, hidden_size, num_layers):
        super().__init__()
        # NOTE: this built-in RNN is not used by forward(). It is kept so the
        # parameter set and the state_dict keys stay exactly as before.
        self.rnn = torch.nn.RNN(input_size=input_size,
                                hidden_size=hidden_size,
                                num_layers=num_layers,
                                batch_first=True)
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        # Layers that actually implement the recurrence.
        self.input2hidden = nn.Linear(input_size + self.hidden_size, self.hidden_size)
        self.fc1tooutput = nn.Linear(self.hidden_size, output_size)

    def forward(self, x, hidden):
        """Advance the network by one step.

        Args:
            x: input step; its last axis must hold ``input_size`` features.
            hidden: previous hidden state; its last axis must hold
                ``hidden_size`` features and any leading axes must match ``x``.

        Returns:
            Tuple ``(output, hidden)`` for this step. Both are also cached on
            the instance as ``self.output`` / ``self.hidden``.
        """
        # Concatenate along the feature (last) axis. For the 1-D tensors used so
        # far this is identical to dim=0, and it also works for batched
        # (batch, features) inputs, where dim=0 would be wrong.
        combined = torch.cat((x, hidden), dim=-1)
        self.hidden = nn.functional.relu(self.input2hidden(combined))
        self.output = self.fc1tooutput(self.hidden)
        return self.output, self.hidden

    def get_activations(self, x, hidden):
        """Run one step and return ``(hidden, output)``.

        Note the order is the reverse of what ``forward`` returns.
        """
        output, new_hidden = self.forward(x, hidden)
        return new_hidden, output

    def get_noise(self):
        """Return ``self.hidden_noise`` if it has been set, otherwise ``None``.

        Nothing in this class assigns ``hidden_noise``, so an unguarded
        attribute read would raise ``AttributeError``.
        """
        return getattr(self, 'hidden_noise', None)

    def initHidden(self):
        """Return an all-zero 1-D hidden state of length ``hidden_size``."""
        return torch.zeros(self.hidden_size)


def train(sequence,label,model,optimizer,criterion):
    """Run one optimisation step on a single sequence.

    The steps in ``sequence[0]`` are fed to ``model`` one at a time while the
    hidden state is carried forward. Only the output produced after the last
    step is compared with ``label``.

    Args:
        sequence: indexable whose first element holds the steps of the sequence
            (assumes a leading axis of size 1, e.g. a batch of one - TODO confirm).
        label: target handed to ``criterion`` together with the final output.
        model: module providing ``initHidden()`` and returning
            ``(output, hidden)`` when called as ``model(step, hidden)``.
        optimizer: optimiser whose gradients are cleared and which is stepped once.
        criterion: loss callable used as ``criterion(output, label)``.

    Returns:
        Tuple ``(output, loss_value)``: the final-step output and the loss as a
        Python float.

    Raises:
        ValueError: if ``sequence[0]`` contains no steps. Without this check
            ``output`` would be unbound when the loss is computed.
    """
    steps = sequence[0]
    if len(steps) == 0:
        raise ValueError('train() received an empty sequence: there is no output to score')

    optimizer.zero_grad()
    # Read each cue in turn and keep the hidden state for the next cue.
    hidden = model.initHidden()
    for step in steps:
        # Call the module itself rather than .forward so registered hooks run.
        output, hidden = model(step, hidden)
    # Compare the final output with the target, then back-propagate.
    loss = criterion(output, label)
    loss.backward()
    optimizer.step()

    return output, loss.item()

def run(model, train_data, epochs, optimizer=None, criterion=None):
    """Train ``model`` for ``epochs`` passes over ``train_data``.

    Args:
        model: network to train; put into training mode before the first epoch.
        train_data: iterable of ``(x, y)`` pairs, each handed to ``train``.
        epochs: number of passes over ``train_data``.
        optimizer: optimiser forwarded to ``train``. When omitted, the
            notebook-level ``optimizer`` variable is used (the previous behaviour).
        criterion: loss forwarded to ``train``. When omitted, the notebook-level
            ``criterion`` variable is used (the previous behaviour).

    Returns:
        List holding, for every epoch, the sum of the per-sequence losses.

    Raises:
        NameError: if no optimizer/criterion is passed and none is defined at
            notebook level.
    """
    # Prefer explicit arguments and fall back to the notebook globals the
    # function used to rely on, so existing call sites keep working.
    if optimizer is None:
        optimizer = globals().get('optimizer')
    if criterion is None:
        criterion = globals().get('criterion')
    if optimizer is None or criterion is None:
        raise NameError('run() needs an optimizer and a criterion: pass them in '
                        'or define `optimizer` and `criterion` at notebook level')

    model.train()
    loss_history = []
    lossTotal = 0  # stays defined for the final print even when epochs == 0
    for epoch in range(epochs):
        lossTotal = 0
        for x, y in train_data:
            _, loss = train(x, y, model, optimizer, criterion)
            lossTotal += loss  # accumulate the per-sequence loss over the epoch
        loss_history.append(lossTotal)

    print(f'loss: {round(lossTotal,1)} ')
    return loss_history

def run_acc(model, train_data, test_data, epochs, verbose = False, optimizer=None, criterion=None):
    """Train ``model`` and record loss and test accuracy after every epoch.

    Args:
        model: network to train.
        train_data: iterable of ``(x, y)`` pairs, each handed to ``train``.
        test_data: data handed to ``test_acc`` at the end of every epoch.
        epochs: number of passes over ``train_data``.
        verbose: when true, print the current output, label and test accuracy
            for every 50th item of the epochs where ``epoch % 100 == 2``.
        optimizer: optimiser forwarded to ``train``. When omitted, the
            notebook-level ``optimizer`` variable is used (the previous behaviour).
        criterion: loss forwarded to ``train``. When omitted, the notebook-level
            ``criterion`` variable is used (the previous behaviour).

    Returns:
        Tuple ``(loss_history, acc_history)`` with one entry per epoch: the
        summed per-sequence loss and the accuracy returned by ``test_acc``.

    Raises:
        NameError: if an optimizer, criterion or hidden size can be found
            neither in the arguments/model nor at notebook level.
    """
    # Prefer explicit arguments and fall back to the notebook globals the
    # function used to rely on, so existing call sites keep working.
    if optimizer is None:
        optimizer = globals().get('optimizer')
    if criterion is None:
        criterion = globals().get('criterion')
    if optimizer is None or criterion is None:
        raise NameError('run_acc() needs an optimizer and a criterion: pass them in '
                        'or define `optimizer` and `criterion` at notebook level')

    # The hidden size must match the model being evaluated, so read it from the
    # model and use the notebook-level value only as a fallback.
    n_hidden = getattr(model, 'hidden_size', None)
    if n_hidden is None:
        n_hidden = globals().get('hidden_size')
    if n_hidden is None:
        raise NameError('run_acc() could not determine the hidden size: the model has no '
                        '`hidden_size` attribute and none is defined at notebook level')

    model.train()
    loss_history = []
    acc_history = []
    lossTotal = 0          # both stay defined for the final prints
    acc = float('nan')     # even when epochs == 0
    for epoch in range(epochs):
        # test_acc() may leave the model in eval mode, so re-enter training
        # mode at the start of every epoch.
        model.train()
        lossTotal = 0
        for i, (x, y) in enumerate(train_data):
            output, loss = train(x, y, model, optimizer, criterion)
            lossTotal += loss  # accumulate the per-sequence loss over the epoch
            if verbose and epoch % 100 == 2 and i % 50 == 0:
                print('\n################\nepoch: ',epoch, '\n################\n')
                # .cpu() keeps the conversion working for tensors on other devices.
                print('output =. ' , output.detach().cpu().numpy())
                print('label =. ' , y.detach().cpu().numpy())
                test_acc(model, test_data, n_hidden, verbose = True)
                model.train()  # undo the eval switch before training continues
        loss_history.append(lossTotal)
        acc = test_acc(model, test_data, n_hidden)
        acc_history.append(acc)

    print(f'loss: {round(lossTotal,1)} ')
    print(f'accuracy: {round(acc,2)} ')
    return loss_history, acc_history

def test_acc(model, testdata, hidden_size, verbose = False):
    model.eval()
    correct = 0
    for x,y in testdata:
        hidden = torch.zeros(1, hidden_size)[0]
        for step in x[0]:
            hidden, y_hat = model.get_activations(step,hidden)
        correct += sum(torch.round(y) == torch.round(y_hat)).item()/len(y)
    acc = correct/len(testdata)
    
    if verbose:
        print('test accuracy: %f ' % (acc))

    return acc

def shuffle_weights(model):
    """Return a new ``OneStepRNN`` holding shuffled copies of ``model``'s weights.

    Every tensor in ``model.state_dict()`` is permuted element-wise by
    ``shuffle_tensor`` (independently of the other tensors) and loaded into a
    freshly constructed network. ``model`` itself is not modified.

    Args:
        model: a ``OneStepRNN`` instance.

    Returns:
        A new ``OneStepRNN`` with the same layer sizes as ``model``.
    """
    # Read the layer sizes from the model itself rather than from notebook-level
    # variables, which may have been re-assigned since the model was built.
    n_hidden = model.hidden_size
    n_input = model.input2hidden.in_features - n_hidden
    n_output = model.fc1tooutput.out_features
    model2 = OneStepRNN(n_input, n_output, n_hidden, model.num_layers)

    shuffled_dict = {layer: shuffle_tensor(val) for layer, val in model.state_dict().items()}
    model2.load_state_dict(shuffled_dict)
    return model2

def shuffle_tensor(t):
    """Return a copy of ``t`` with its elements randomly permuted.

    The permutation is drawn over all elements regardless of axis, and the
    result has the same shape as ``t``. The input tensor is left untouched.

    Args:
        t: tensor of any shape; it does not need to be contiguous.

    Returns:
        New tensor with the shape of ``t`` and the same multiset of values.
    """
    idx = torch.randperm(t.nelement())
    # reshape() rather than view(): view(-1) raises on non-contiguous tensors
    # (e.g. a transposed weight matrix).
    return t.reshape(-1)[idx].reshape(t.size())
    

In [None]:
# Training data

In [24]:
# MC/DC
# NOTE(review): no random seed is set in the visible cells, so the shuffled
# pairings below differ from run to run.
ops = ['+', '*', '-']
inputs = ['A', 'B', 'C', 'D']
nsteps = 1

# Every (operator, cue) combination, grouped by operator.
inp_op_pairs = [(op, inp) for op in ops for inp in inputs]

# MC set: the i-th shuffled start cue is paired with the i-th (operator, cue) pair.
inits = inputs*3
random.shuffle(inits)
setMC = [[start, pair] for start, pair in zip(inits, inp_op_pairs)]

# DC set: same construction over two passes through the (operator, cue) pairs.
inits = inputs*6
random.shuffle(inits)
inp_op_pairs = inp_op_pairs*2
setDC = [[start, pair] for start, pair in zip(inits, inp_op_pairs)]

# Hand-picked trials.
setM = [
    ['A', ('+', 'B')],
    ['C', ('*', 'D')],
    ['D', ('-', 'A')],
]


### RNN inputs
# Encode the hand-picked trials in setM and wrap them in a shuffling DataLoader.
# NOTE(review): the output saved with this cell is
# "RuntimeError: stack expects a non-empty TensorList" - presumably raised while
# padding/converting setM; verify what pad_seqs / convert_seq2inputs expect as
# input before relying on trainM.
# NOTE(review): a later cell re-assigns num_classes = 14; confirm which value the
# encoding is supposed to use.
num_classes = 13
batchsize = 1
# scale and len_seq are only referenced by the commented-out code further down.
scale = 1
len_seq = 1

padM = pad_seqs(setM)
# seq_len=8 is passed through to the helper as-is; its meaning is defined there.
train_inputs = convert_seq2inputs(padM, num_classes=num_classes, seq_len=8)
trainM = DataLoader(train_inputs, batch_size=batchsize, shuffle=True)
# seqsM = generate_sequences(operators, input_ids, len_seq, init_values = init_values, rand = False)
# seqs = generate_sequences(operators, input_ids, len_seq, init_values = init_values, rand = False)
# padseqs1 = pad_select(seqs, [0])
# train_inputs1 = convert_seq2inputs(padseqs1*scale, num_classes=13, seq_len=8)
# train_data1 = DataLoader(train_inputs1, batch_size=batchsize, shuffle=True)

# seqs = generate_sequences(operators, input_ids, len_seq, init_values = init_values, rand = False)
# padseqs2 = pad_select(seqs, [1])
# train_inputs2 = convert_seq2inputs(padseqs2*scale, num_classes=13, seq_len=8)
# train_data2 = DataLoader(train_inputs2, batch_size=batchsize, shuffle=True)

# seqs = generate_sequences(operators, input_ids, len_seq, init_values = init_values, rand = False)
# padseqs3 = pad_select(seqs, [2])
# train_inputs3 = convert_seq2inputs(padseqs3*scale, num_classes=13, seq_len=8)
# train_data3 = DataLoader(train_inputs3, batch_size=batchsize, shuffle=True)

# ## train on two positions
# train_inputs123 = convert_seq2inputs(padseqs1*4 + padseqs2*4 + padseqs3*4, num_classes=13, seq_len=8)
# train_data123 = DataLoader(train_inputs123, batch_size=batchsize, shuffle=True)

# ## train on two positions
# len_seq=2
# seqs = generate_sequences(operators, input_ids, len_seq, init_values = init_values, rand = False)
# padseqs_2step = pad_seqs(seqs)
# train_inputs_2step = convert_seq2inputs(padseqs_2step, num_classes=13, seq_len=8)
# train_2step = DataLoader(train_inputs_2step, batch_size=batchsize, shuffle=True)



# print('train 123: ', len(train_data123))
# print('train 2 step: ', len(train_2step))


RuntimeError: stack expects a non-empty TensorList

In [26]:
setM

[['A', ('+', 'B')], ['C', ('*', 'D')], ['D', ('-', 'A')]]

In [None]:
# Two-phase training, repeated for num_sims freshly initialised models:
# phase 1 trains on train_data123, phase 2 continues on train_2step; accuracy is
# evaluated on train_2step in both phases.
# NOTE(review): train_data123 and train_2step are only assigned in commented-out
# code in the data cell, so this cell raises NameError on a fresh kernel.
num_classes = 14
input_size = num_classes
# NOTE(review): the output width is tied to the batch size here; this yields 1
# only because batchsize is 1 - confirm that a single output unit is intended.
output_size = batchsize
hidden_size = 20
num_layers = 1
learningRate = 0.0005
epochs = 400

num_sims = 10

# One entry per repetition; each entry is the phase-1 history followed by the
# phase-2 history (run_acc returns lists, so `+` concatenates them).
losses=[]
accs = []

for j in range(num_sims):
    print('### rep', j, ' ###')
    model = OneStepRNN(input_size, output_size, hidden_size, num_layers)
    # criterion and optimizer are assigned at notebook level because run_acc()
    # picks them up from the notebook namespace when they are not passed in.
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learningRate)
    loss1, acc1 = run_acc(model, train_data123, train_2step, epochs)
    loss2, acc2 = run_acc(model, train_2step, train_2step, epochs)
    losses.append(loss1+loss2)
    accs.append(acc1+acc2)
