**Experimental implementation of a learning algorithm inspired by neural compressed search (Schmidhuber)**

In [1]:
import torch

In [259]:
def transpose2(x):
    """Return ``x`` with dimensions 0 and 1 swapped."""
    swapped = x.transpose(0, 1)
    return swapped


# "Hopfield"
class Hopfield(torch.nn.Module):
    """Softmax-attention style associative memory layer ("Hopfield").

    Holds three trainable matrices: key ``Wk`` (width x n), value ``Wv``
    (width x n) and query ``Wq`` (n x n), each initialised with small
    uniform random values.
    """

    def __init__(self, n, width, beta=0.1):
        """Create the layer.

        n     -- number of items
        width -- width of a memory item
        beta  -- factor the retrieval input is multiplied with before the
                 query projection
        """
        super().__init__()
        self.beta = beta
        self.n = n  # number of items
        self.width = width  # width of a memory item

        keyValueScale = 1.0 / (self.width * self.n)
        queryScale = 1.0 / (self.n * self.n)

        # The random draws happen in the order key, value, query.
        keyInit = torch.rand(self.width, self.n) * keyValueScale * 1.0
        self.Wk = torch.nn.Parameter(keyInit)  # key
        valueInit = torch.rand(self.width, self.n) * keyValueScale * 0.1
        self.Wv = torch.nn.Parameter(valueInit)  # value
        queryInit = torch.rand(self.n, self.n) * queryScale * 0.1
        self.Wq = torch.nn.Parameter(queryInit)  # query (is always a square matrix)

    def forward(self, R, Y):
        """Compute the attention-weighted read-out.

        R -- retrieval input; its last dimension has to be ``n`` because it
             is multiplied with the (n x n) query matrix
        Y -- stored patterns; its last dimension has to be ``width`` because
             the projected query is multiplied with the transpose of Y

        The result has ``n`` as its last dimension (product with ``Wv``).
        """
        query = (self.beta * R) @ self.Wq
        scores = query @ self.Wk.transpose(0, 1) @ Y.transpose(0, 1)
        attention = torch.nn.functional.softmax(scores, dim=-1)  # normalise over the patterns in Y

        return attention @ Y @ self.Wv




class LeakyReLUMlp(torch.nn.Module):
    """Two-layer perceptron: linear -> leaky ReLU -> linear.

    Both weight matrices are re-initialised with Xavier-uniform values; the
    biases keep the default initialisation of ``torch.nn.Linear``.
    """

    def __init__(self, input_size, hidden_size, output_size, negative_slope=0.05):
        """Build the layers.

        input_size     -- number of input features
        hidden_size    -- number of hidden units
        output_size    -- number of output features
        negative_slope -- slope of the leaky ReLU for negative inputs
        """
        super().__init__()
        self.fc1 = torch.nn.Linear(input_size, hidden_size)
        self.leaky_relu = torch.nn.LeakyReLU(negative_slope=negative_slope)
        self.fc2 = torch.nn.Linear(hidden_size, output_size)

        # First layer first, then second layer.
        for layer in (self.fc1, self.fc2):
            torch.nn.init.xavier_uniform_(layer.weight)

    def forward(self, x):
        """Map ``x`` through the hidden layer and return the output layer's activations."""
        hidden = self.leaky_relu(self.fc1(x))
        return self.fc2(hidden)

In [224]:
# Stand-alone smoke test of the Hopfield layer with 3 items of width 9.
hopfield = Hopfield(3, 9)

# Three hand-written retrieval rows of nine values each, transposed to 9 x 3.
R = transpose2(torch.tensor([
    [0.99, 0.01, 0.998,  0.11, 0.01, 0.00282,  0.00827, 0.0082629, 0.000986],
    [0.00882882, 0.992, 0.998,  0.11, 0.01, 0.00282,  0.00827, 0.0082629, 0.000986],
    [0.99, 0.01, 0.998,  0.999, 0.999, 0.00282,  0.00827, 0.0082629, 0.000986],
]))
Y = torch.rand(hopfield.n, hopfield.width)  # random stored patterns
hopfieldOutA = hopfield(R, Y)

# Print the raw layer output with extra digits, then restore the default print options.
torch.set_printoptions(precision=10)
print(hopfieldOutA)
torch.set_printoptions(profile='default')

# Random projection of the flattened layer output down to 4 values.
m0 = torch.rand(hopfield.n * hopfield.width, 4)
m1 = hopfieldOutA.flatten()

print(m1.shape)
print(m0.shape)

modelOut = z0 = m1 @ m0

print(z0)



tensor([[0.0081950873, 0.0066402457, 0.0053359563],
        [0.0081950650, 0.0066402261, 0.0053359480],
        [0.0081951059, 0.0066402676, 0.0053359703],
        [0.0081950705, 0.0066402294, 0.0053359489],
        [0.0081950678, 0.0066402252, 0.0053359456],
        [0.0081950482, 0.0066402042, 0.0053359340],
        [0.0081950482, 0.0066402047, 0.0053359340],
        [0.0081950482, 0.0066402047, 0.0053359340],
        [0.0081950482, 0.0066402042, 0.0053359340]], grad_fn=<MmBackward0>)
torch.Size([27])
torch.Size([27, 4])
tensor([0.1008, 0.0937, 0.0817, 0.0995], grad_fn=<SqueezeBackward4>)


In [225]:




print('\n' * 3)

# Dump every parameter of the stand-alone Hopfield module: first the shape of
# its tensor, then its name followed by the parameter itself.
for iParamName, iParam in hopfield.named_parameters():
    paramTensor = iParam.data  # access actual tensor of parameter
    print(paramTensor.shape)

    print(iParamName, str(iParam), sep='   ')

    # 'Wq' is meant to be treated in a special way; no parameter gets any
    # extra handling in this cell yet.







torch.Size([9, 3])
Wk   Parameter containing:
tensor([[0.0204, 0.0349, 0.0202],
        [0.0302, 0.0190, 0.0328],
        [0.0261, 0.0050, 0.0228],
        [0.0041, 0.0306, 0.0221],
        [0.0282, 0.0172, 0.0218],
        [0.0296, 0.0333, 0.0114],
        [0.0259, 0.0132, 0.0360],
        [0.0300, 0.0281, 0.0205],
        [0.0247, 0.0164, 0.0328]], requires_grad=True)
torch.Size([9, 3])
Wv   Parameter containing:
tensor([[0.0020, 0.0001, 0.0005],
        [0.0027, 0.0011, 0.0003],
        [0.0036, 0.0021, 0.0027],
        [0.0033, 0.0019, 0.0028],
        [0.0026, 0.0007, 0.0008],
        [0.0015, 0.0022, 0.0036],
        [0.0028, 0.0008, 0.0003],
        [0.0008, 0.0037, 0.0015],
        [0.0008, 0.0037, 0.0011]], requires_grad=True)
torch.Size([3, 3])
Wq   Parameter containing:
tensor([[0.0087, 0.0036, 0.0054],
        [0.0011, 0.0110, 0.0031],
        [0.0055, 0.0048, 0.0064]], requires_grad=True)


In [299]:
# context to keep parameters etc of "neural compressed search" learning algorithm from Schmidhuber
class NeuralCompressedSearchCtx(object):
    """Settings and weight generator of the "neural compressed search" scheme.

    A parameter vector is described by a short list of
    (frequency, phase, amplitude) triples and generated as a sum of cosine
    waves sampled at the integer positions 0 .. n-1.
    """

    def __init__(self):
        # number of (frequency, phase, amplitude) triples; read by callers to
        # size the hyperparameter slices they hand to this class
        self.nWaves = 2

    # compute the superposition of weights for the parameterization of the NN by a given hyperparameter vector
    def calcSuperpositionByHyperparameters(self, n, hyperParameters):
        """Return a length-``n`` tensor: the sum of the described cosine waves.

        n               -- number of parameters to generate
        hyperParameters -- flat sequence (list or 1-D tensor) laid out as
                           [freq0, phase0, amp0, freq1, phase1, amp1, ...]

        Element ``i`` of the result is ``sum_k amp_k * cos(i*freq_k + phase_k)``.
        An empty sequence yields all zeros.

        Raises ValueError when the length is not a multiple of 3.
        """
        if len(hyperParameters) % 3 != 0:
            raise ValueError('invalid length of hyperparameters!')

        params = torch.zeros(n)

        # The sample positions are the same for every wave, so build them once.
        positions = torch.arange(0, n, dtype=torch.float32)

        for curIdx in range(0, len(hyperParameters), 3):
            freq = hyperParameters[curIdx]
            phase = hyperParameters[curIdx + 1]
            amplitude = hyperParameters[curIdx + 2]

            # add wave to get a superposition of the waves
            params += torch.cos(positions * freq + phase) * amplitude

        return params





# context for readout of parameters from hyperparameters
class ReadoutCtx(object):
    """Cursor over a flat hyperparameter vector.

    Every read-out call consumes a contiguous slice starting at ``hypersIdx``
    and advances the cursor. With ``noOutMode`` set, nothing is read or
    returned and only the cursor moves, which lets a caller count how many
    hyperparameters a sequence of read-outs needs.
    """

    def __init__(self):
        self.hypersIdx = 0 # current index into hyperparameters for read out to actual parameters

        # defines if a output is returned (this is useful to compute number of hyperparameters based on the actual indices)
        self.noOutMode = False

    # read-out
    def readoutMatrixFromCompressed(self, hypers, nUnits, matrixWidth, ncsCtx):
        """Generate an (nUnits x matrixWidth) matrix from compressed hyperparameters.

        hypers      -- flat hyperparameter sequence (not touched in noOutMode)
        nUnits      -- number of rows to generate
        matrixWidth -- length of every generated row
        ncsCtx      -- object providing ``nWaves`` and
                       ``calcSuperpositionByHyperparameters(n, slice)``

        Returns the stacked rows, or None in noOutMode.
        """
        # NOTE(review): every row consumes nUnits*nWaves*3 values, so one call
        # consumes nUnits**2 * nWaves * 3 hyperparameters in total. Presumably
        # nWaves*3 per row was intended - confirm before changing, because the
        # size of the hyperparameter vector depends on it.
        rowSliceLen = nUnits * ncsCtx.nWaves * 3

        if self.noOutMode:
            # only account for the consumed hyperparameters
            self.hypersIdx += nUnits * rowSliceLen
            return None

        # translate hyper-parameters to actual parameters as a matrix
        paramStack = []
        for _ in range(nUnits):
            rowHypers = hypers[self.hypersIdx:self.hypersIdx + rowSliceLen]
            self.hypersIdx += rowSliceLen
            paramStack.append(ncsCtx.calcSuperpositionByHyperparameters(matrixWidth, rowHypers))

        return torch.stack(paramStack)

    def readoutVectorFromHyperparameters(self, hypers, size, ncsCtx):
        """Copy the next ``size`` hyperparameters verbatim into a new tensor.

        hypers -- flat hyperparameter sequence (tensor or list; not touched
                  in noOutMode)
        size   -- number of values to consume
        ncsCtx -- unused; present for a signature parallel to
                  readoutMatrixFromCompressed

        Returns the copied values, or None in noOutMode.
        """
        params2 = None
        if not self.noOutMode:
            chunk = hypers[self.hypersIdx:self.hypersIdx + size]
            if isinstance(chunk, torch.Tensor):
                # torch.tensor(<tensor>) emits a copy-construct UserWarning;
                # detach().clone() is the equivalent, warning-free copy.
                params2 = chunk.detach().clone()
            else:
                params2 = torch.tensor(chunk)
        self.hypersIdx += size
        return params2

    # return hyperparameters for misc
    def retNumberOfHyperparamsMisc(self):
        """Return the number of hyperparameters consumed so far (the cursor position)."""
        return self.hypersIdx





# Shared generator context; it is passed to every hyperparameter read-out below.
ncsCtx = NeuralCompressedSearchCtx()





# NOTE(review): this module-level value is not read by any code visible in
# this file; the read-out uses ncsCtx.nWaves instead - confirm before relying on it.
nWaves = 2
#nUnits = 3 # number of neurons

#matrixWidth = 10


# Number of candidate hyperparameter vectors that are sampled and evaluated
# in every training step of the search loop.
nAttemptsPerTrainingStep = 2










class ModelA(torch.nn.Module):
    """Small test model: two MLP encoders feed a Hopfield layer; its output is
    merged with the input and mapped to a 3-element output vector.

    The input ``x`` is a single 1-D vector with 4 elements (no batch
    dimension): the encoder outputs are reshaped to fixed 2-D sizes and the
    input is concatenated with the flattened Hopfield output.
    """

    def __init__(self):
        super(ModelA, self).__init__()

        xWidth = 4 # width of the input vector "x"

        self.nUnitsHopfield = 3
        self.widthHopfield = 9
        self.hopfieldA = Hopfield(self.nUnitsHopfield, self.widthHopfield)

        # number of elements of R, of Y and of the flattened Hopfield output
        hopfieldSize = self.nUnitsHopfield * self.widthHopfield

        # one encoder per Hopfield input
        self.encoderForHopfieldAForR = LeakyReLUMlp(xWidth, 30, hopfieldSize)
        self.encoderForHopfieldAForY = LeakyReLUMlp(xWidth, 30, hopfieldSize)

        # Sizes come from this model's own Hopfield layer, not from a
        # module-level object.
        mergedWidth = xWidth + hopfieldSize
        self.outLinearTransformWeights = torch.nn.Parameter(torch.rand(mergedWidth, xWidth)*(1.0 / (mergedWidth*xWidth))*0.1)

        nOutputs = 3 # is the size of the output vector
        self.logitHeadWeights = torch.nn.Parameter(torch.rand(xWidth, nOutputs)*(1.0 / (xWidth*nOutputs))*0.1)

    def forward(self, x):
        """Return the 3-element output for the 4-element input vector ``x``."""
        R = self.encoderForHopfieldAForR(x)
        R = R.reshape(self.widthHopfield, self.nUnitsHopfield)

        # Y has its own encoder; it must not reuse the encoder of R.
        Y = self.encoderForHopfieldAForY(x)
        Y = Y.reshape(self.nUnitsHopfield, self.widthHopfield)

        hopfieldResA = self.hopfieldA(R, Y)
        hopfieldResAFlattened = torch.flatten(hopfieldResA)

        # merge crossbar with output from NN of this layer
        z0 = torch.cat([x, hopfieldResAFlattened])

        # transform so that output of this module has dimensions like "x"
        z0 = z0 @ self.outLinearTransformWeights

        # logit head: map to the output vector
        return z0 @ self.logitHeadWeights





# The model whose parameters are generated from the hyperparameter vector.
modelA = ModelA()






# Read-out cursor used for the dry run that counts the hyperparameters;
# readoutParamsOfModel reads this module-level name.
readoutCtx = ReadoutCtx()
readoutCtx.noOutMode = True # we don't want to have the actual parameters, we only want to compute the exact number of hyperparameters we need




# readout parameters from hyperparameters for hopfield NN module
# /param isWrite write from parameters to actual model?
def readoutParamsOfModel(hypers, isWrite, model, ncsCtx):
    """Read out one compressed matrix per parameter of ``model``.

    hypers  -- flat hyperparameter vector handed to the read-out
    isWrite -- when true, the generated values replace the ``.data`` of each
               parameter
    model   -- module whose ``named_parameters()`` are visited in order
    ncsCtx  -- generator context forwarded to the read-out

    The module-level ``readoutCtx`` is used as the read-out cursor and is
    advanced by every parameter, also when nothing is written.
    """
    for _paramName, param in model.named_parameters():
        targetShape = param.data.size()

        # size()[0] is the generated row length; size()[1] (or 1 for a
        # one-dimensional parameter) is the number of generated rows.
        rowLength = targetShape[0]
        rowCount = targetShape[1] if len(targetShape) >= 2 else 1

        # TODO< treat 'Wq' in a actually special way as a uncompressed matrix! >
        # Right now every parameter, 'Wq' included, is read out as a compressed matrix.
        generated = readoutCtx.readoutMatrixFromCompressed(hypers, rowCount, rowLength, ncsCtx)

        if not isWrite:
            continue

        # Generated rows run along dimension 1 of the parameter, hence the
        # transpose; the reshape makes sure it has the right shape!
        param.data = torch.transpose(generated, 0, 1).reshape(targetShape)

# Dry run: the current read-out cursor only counts, so afterwards it holds the
# exact number of hyperparameters the model needs.
readoutParamsOfModel(None, False, modelA, ncsCtx)

nHyperParams = readoutCtx.retNumberOfHyperparamsMisc()
print(f'nHyperParams={nHyperParams}')


# HACKY<   Y should be compute by MLP from input "x"   >
# NOTE: Y is not read by the search loop below.
Y = torch.rand(hopfield.n, hopfield.width)


# all hyperparameters of the best candidate
hypersBest = torch.rand(nHyperParams)*0.01
hypersBestLoss = 1.0e20

# The single training example is constant, so it is built once instead of
# once per attempt.
x = torch.tensor([0.9, 0.8, 1.0, 0.5])
yTarget = torch.tensor([0.5, 0.9, 0.2])

# Random search: every step samples a few noisy copies of the best
# hyperparameter vector and keeps the best one if it lowers the loss.
# NOTE: after the loop modelA holds the parameters of the last evaluated
# candidate, which are not necessarily those generated from hypersBest.
for itOuter in range(1000):

    hypersWithBestLossThisStep = hypersBest.clone()
    bestLossThisStep = 1.0e20

    for itInner in range(nAttemptsPerTrainingStep):
        # candidate = best vector so far + gaussian noise (creates a new tensor)
        hypers = hypersBest + torch.normal(0.0, 0.01, size=hypersBest.size()) # normal distribution

        #####################
        # readout of parameters for NN from hyperparameters

        # fresh cursor, because every candidate is read from index 0
        readoutCtx = ReadoutCtx()

        # write parameters from hyperparameters to actual model
        readoutParamsOfModel(hypers, True, modelA, ncsCtx)

        ####################
        # INFERENCE

        # The search is gradient free: skip building the autograd graph and
        # keep the loss as a plain float so no graph is held alive.
        with torch.no_grad():
            y = modelA(x)
            loss = torch.nn.functional.mse_loss(y, yTarget).item()

        if loss < bestLossThisStep:
            bestLossThisStep = loss
            hypersWithBestLossThisStep = hypers

    # did we find a better solution?
    if bestLossThisStep < hypersBestLoss:
        hypersBest = hypersWithBestLossThisStep
        hypersBestLoss = bestLossThisStep

    if (itOuter % 50) == 0: # print loss?
        print(f'hypersBestLoss={hypersBestLoss}')



# HALFDONE< implement simple learning algorithm by generating a new candidate hyperparameter vector and then evaluate  the parameterized network on a (small) batch >



nHyperParams=11328
hypersBestLoss=0.32218170166015625


KeyboardInterrupt: 