In [1]:
#SCRN: self control reservoir network
#pre-alpha version, may contain bugs and errors
#author:Bocchese Giacomo, giacomobocchese.business@gmail.com
#date of ultimation: 29/01/2023
#read the license before use: private agreement needed

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import math
import random
import time

In [2]:
class SCRN:
    def __init__(self,inp_dim,res_dim,out_dim,control_dim=2):
        self.N_input=inp_dim #input dimension
        self.N_res=res_dim #reservoir dimension
        self.N_output=out_dim #output dimension 
        self.N_control=control_dim #control dimension
        if(self.N_res<self.N_control):
            raise ValueError('reservoir dimension must be larger than control dimension')
        if(self.N_control!=2):
            raise ValueError('control dimension must be 2, if you want to use more than 2 control signals, please modify the code')
        
        #nonlinear activation function
        self.f=lambda x: np.tanh(x)

        #looping iteration limit
        self.iteration_limit=10

        #indexes: modify the code if you want to use more or less than 2 control signals
        self.vanilla_res_idx=np.arange(self.N_res-self.N_control) #vanilla reservoir index
        self.looping_control_idx=self.N_res-2 #looping control index
        self.input_control_idx=self.N_res-1 #input control index
    
        #initialize states
        self.x=np.zeros((self.N_res,1)) #reservoir state
        self.looping_control=self.x[self.looping_control_idx]<0.5 #looping control
        self.input_control=self.x[self.input_control_idx]<0.5 #input control

        #initialize parameters
        self.L=np.random.normal(0,1,(self.N_res,self.N_res)) #reservoir linear weights
        self.b=np.random.normal(0,1,(self.N_res,1)) #reservoir linear bias
        self.bn=0*np.random.normal(0,1,(self.N_res,1)) #reservoir nonlinear bias
        self.w_in=np.random.normal(0,1,(self.N_input,self.N_res)) #input weights
        self.w_out=np.random.normal(0,1,(self.N_res,self.N_output)) #output weights
        self.b_in=0*np.random.normal(0,1,(self.N_res,1)) #input bias
        self.b_out=0*np.random.normal(0,1,(self.N_res,1)) #output bias
        self.W=0*np.random.normal(0,1,(self.N_res,self.N_res)) #reservoir nonlinear weights
        
        #total number of parameters
        self.N_params=self.N_res**2+self.N_res*self.N_input+self.N_output*self.N_res+self.N_res+self.N_res+self.N_res+self.N_output+self.N_res+self.N_output

    def summary(self):
        print('network structure:\n')
        print('input dimension:',self.N_input)
        print('output dimension:',self.N_output)
        print()
        print('reservoir dimension:',self.N_res)
        print('control dimension:',self.N_control)
        print()
        print('total number of parameters:',self.N_params)

    def reset(self):
        #modify the code if you want to use more or less than 2 control signals
        self.x=np.zeros((self.N_res,1)) #reservoir state
        self.looping_control=self.x[self.looping_control_idx]<0.5 #looping control
        self.input_control=self.x[self.input_control_idx]<0.5 #input control
    
    def reset_control(self):
        #modify the code if you want to use more or less than 2 control signals
        self.looping_control=self.x[self.looping_control_idx]<0.5 #looping control
        self.input_control=self.x[self.input_control_idx]<0.5 #input control

    def update_state(self,x,weighted_input,mode):
        #mode: 'inference' or 'training'
        if mode=='inference':
            noise=0
        elif mode=='training':
            noise=1e-4
        r_noise=np.random.normal(0,noise,(self.N_res,1))
        x_new=self.f(self.W.T@x+self.bn)+self.L.T@x+self.b+weighted_input+r_noise
        looping_control=x_new[self.looping_control_idx]<0.5
        input_control=x_new[self.input_control_idx]<0.5
        return x_new,looping_control,input_control

    def update(self, inp, mode='inference'):
        self.reset_control()
        #throw exception if the state contains nan
        if np.isnan(self.x).any():
            raise ValueError('nan state')
        n_iter=0
        while self.looping_control and n_iter<self.iteration_limit:
            weighted_input=self.w_in.T@inp+self.b_in if self.input_control else np.zeros((self.N_res,1))                          
            self.x,self.looping_control,self.input_control=self.update_state(self.x,weighted_input,mode=mode)
            #throw exception if the state contains nan
            if np.isnan(self.x).any():
                raise ValueError('nan state')
            n_iter+=1
        
        #output calculation
        out=self.w_out.T@(self.x+self.b_out)
        if(np.isnan(out).any()):
            raise ValueError('nan output')
        return out



In [12]:
net=SCRN(2,4,2)
net.summary()

network structure:

input dimension: 2
output dimension: 2

reservoir dimension: 4
control dimension: 2

total number of parameters: 52


In [13]:
print('INITIAL STATE:\n')
net.reset()
print(net.x)

print('\n\nINPUT:\n')
inp=np.array([[1],[2]])
print(inp)
print('\n\nUPDATED STATE:\n')
out=net.update(inp)
print(net.x)

print('\n\nOUTPUT:\n')
print(out)

INITIAL STATE:

[[0.]
 [0.]
 [0.]
 [0.]]


INPUT:

[[1]
 [2]]


UPDATED STATE:

[[ 9.57963799]
 [12.27293351]
 [ 6.65988686]
 [20.31557593]]


OUTPUT:

[[27.42160658]
 [-8.35521866]]
