# Import block

In [None]:
import numpy as np
import sklearn.model_selection as sk
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow.keras.backend as kb
import matplotlib as mpl
import matplotlib.pyplot as plt
import scipy.io as sio
%matplotlib widget
# Check if GPU is available
print(tf.config.list_physical_devices('GPU'))
print(tf.test.gpu_device_name())

# Generate input and output signals
Generates the dataset used for training the network
The last bit will split it in training and test blocks

In [None]:
# Generate very simple dataset
curIt = 21;
# Trial temporal profile
frameRate = 10
prestimulusTime = 1
stimulusDuration = 1.5

Ntrials = 64*500 # Per difficulty

# Psychometric parameters
lapseRate = 0.1
alpha = (3/90, 10/90) # For the (low, high) attention states

# Difficulties
angleList = np.linspace(-90, 90, 13)
angleValue = np.linspace(-1, 1, len(angleList)) # 1-1 mapping with angleList
angleDifficulty = np.unique(abs(angleList)) # 0 to 90

totalFrames = int((prestimulusTime+stimulusDuration)*frameRate)
timeVec = np.arange(totalFrames)/frameRate
stimulusFrame = np.argmin(abs(timeVec-prestimulusTime))

# Generate the psychometric curves (prob choosing left) angle variable is |thetaR|-|thetaL|
ScurveLeftLow = 1/(1+np.exp(-alpha[0]*angleList))*(1-lapseRate)+lapseRate/2
ScurveLeftHigh = 1/(1+np.exp(-alpha[1]*angleList))*(1-lapseRate)+lapseRate/2
def Scurve(angleDiff, alpha, lapse): 
    p = 1/(1+np.exp(-alpha*angleDiff))*(1-lapse)+lapse/2
    return p

# Balanced dataset for each difficulty and attention level

# Let's generate the angle list for the dataset
NtrialsFull = Ntrials*len(angleDifficulty) # Ntrials per difficulty

angleData = np.zeros((NtrialsFull, 2)) # Stores the 2 stimuli angles of each trial
angleDataIdx = np.zeros((NtrialsFull, 2)) # The idx in the angle vector of the 2 angles
angleDataDifficulty = np.zeros(NtrialsFull) # The trial difficulty (angle difference) ||theta_R|-|theta_L||
idealChoice = np.zeros(NtrialsFull) # The correct choice for that trial |theta_R|>|theta_L| ?
trainedChoice = np.zeros(NtrialsFull) # The choice used to train (from the psych curve)
attentionLevel = np.zeros(NtrialsFull, dtype=int) # Attention level of that trial

for it in range(angleData.shape[0]):
    curDifficulty = angleDifficulty[it // Ntrials]
    # Allowed angles for each difficulty - we start with positive angles for simplicity
    validAngles = angleDifficulty[np.argwhere(angleDifficulty >= curDifficulty)]
    firstAngle = validAngles[np.random.choice(len(validAngles))]
    secondAngle = firstAngle - curDifficulty
    # Now randomize sign (180 deg rotation)
    anglePair = (int(np.random.choice((-1,1))*firstAngle), int(np.random.choice((-1,1))*secondAngle))
    # Now randomize order (left vs right)
    anglePair = np.random.permutation(anglePair)
    angleData[it, :] = anglePair
    angleDataIdx[it, :] = [np.argwhere(anglePair[0] == angleList), np.argwhere(anglePair[1] == angleList)]
    angleDataDifficulty[it] = (np.diff(abs(anglePair)))
    # Choose attention level (at random)
    attentionLevel[it] = np.random.choice((0, 1))
    # Find the correct choice |theta_R|>|theta_L| ? - if equal, choose at random
    if(np.diff(abs(anglePair)) > 0):
        idealChoice[it] = 0
    elif(np.diff(abs(anglePair)) < 0):
        idealChoice[it] = 1
    else:
        idealChoice[it] = np.random.choice((0, 1))
    # Compute the psych probability
    probLeft = Scurve(angleDataDifficulty[it], alpha[attentionLevel[it]], lapseRate)
    
    # Choose the outcome of the trial
    if(np.random.random(1) < probLeft):
        trainedChoice[it] = 0
    else:
        trainedChoice[it] = 1    

# Now that we have defined each trial create the time series
trialsData = np.zeros((NtrialsFull, totalFrames, 3)).astype(np.float32)
responseData  = np.full((NtrialsFull, totalFrames), np.nan)


for it in range(NtrialsFull):
    #if trainedChoice[it] == 0:
    trialsData[it, stimulusFrame:, 0] = angleValue[int(angleDataIdx[it, 0])]
    #else:
    trialsData[it, stimulusFrame:, 1] = angleValue[int(angleDataIdx[it, 1])]
    trialsData[it, 1:, 2] = attentionLevel[it]
    responseData[it, stimulusFrame-1] = 0 # Frame before the stimulus - stay QUIET!
    responseData[it, -1] = trainedChoice[it]+1 # 1/2 left right

trialsData += np.random.randn(*trialsData.shape)/10

xTrain, xTest, yTrain, yTest, angleDataDifficultyTrain , angleDataDifficultyTest, trainedChoiceTrain, trainedChoiceTest, angleDataTrain, angleDataTest, idealChoiceTrain, idealChoiceTest, attentionLevelTrain, attentionLevelTest, = sk.train_test_split(trialsData,
                          responseData,
                          angleDataDifficulty,
                          trainedChoice,
                          angleData,
                          idealChoice,
                          attentionLevel,
                          test_size=0.25,
                          random_state=12)


units = 256 # Number of neurons in the dense layer
output_size = 3 # Number of output states

# Only use 2 frames: 1. Before the stimulus (so both left and right prob are 0). 2. The last frame

def custom_loss(y_actual,y_pred): 
    l1 = kb.sparse_categorical_crossentropy(y_actual[:, stimulusFrame-1], y_pred[:, stimulusFrame-1])
    l2 = kb.sparse_categorical_crossentropy(y_actual[:, -1], y_pred[:, -1])
    custom_loss = l1 + l2
    return custom_loss

# Only use the last frame
def custom_cat(y_actual,y_pred): 
    l2 = tf.keras.metrics.sparse_categorical_accuracy(y_actual[:, -1], y_pred[:, -1])
    custom_cat = l2

    return custom_cat

model = keras.models.Sequential()
model.add(layers.SimpleRNN(units, activation = 'relu', input_shape = (xTrain.shape[1], xTrain.shape[2]),
    return_sequences = True, name = 'rnn'))
model.add(layers.BatchNormalization(name = 'batch_norm'))
model.add(layers.Dense(output_size, activation = 'softmax', name = 'outputprobs'))

model.summary()

batch_size = 640 # Trials per batch
MAX_EPOCHS = 250 # Total epochs

model.compile(
    loss=custom_loss,
    optimizer="adam",
    metrics=custom_cat,
)
history = model.fit(xTrain, yTrain, epochs = MAX_EPOCHS, batch_size = batch_size, verbose = True,
   validation_data = (xTest, yTest))
yTestPredict = model.predict(xTest)

# Save everything to MATLAB
feature_extractor = keras.Model(
    inputs=model.inputs,
    outputs=(model.get_layer(name="rnn").output, model.get_layer(name="outputprobs").output, model.outputs),
)
yTestPredict = model.predict(xTest)
# Call feature extractor on test input.
y = tf.convert_to_tensor(xTest,  dtype='float32')
fet = feature_extractor(y)
tracesN = fet[0].numpy()
tracesP = fet[1].numpy()
sio.savemat('trainedModel' + str(curIt) + '.mat', {'trialsData':xTest, 'responseData': yTest, 
                                 'responseDataPrediction': yTestPredict, 'angleDataDifficulty': angleDataDifficultyTest, 
                                  'trainedChoice': trainedChoiceTest, 'idealChoice': idealChoiceTest, 'attentionLevel': attentionLevelTest, 
                                  'alpha': alpha, 'lapseRate': lapseRate, 'Ntrials': Ntrials, 'frameRate': frameRate, 
                                  'prestimulusTime': prestimulusTime, 'stimulusDuration': stimulusDuration, 'angleList': angleList,
                                  'angleDifficulty': angleDifficulty, 'angleValue': angleValue, 'tracesN': tracesN, 'tracesP': tracesP})
