In [2]:
class UnitMathOps:
    """Base class for a scalar quantity that carries an SI-prefixed unit.

    The magnitude is stored in base units in ``self.value``; ``self.units``
    holds only the prefix ('', 'm', 'k', ...) and ``self.type`` the unit
    symbol ('V', 'A', ...).  Arithmetic and comparisons are only allowed
    between quantities of the same ``type``; a mismatch raises
    ``AssertionError``.  Arithmetic results are built through the factory
    ``T`` and keep the prefix of the left-hand operand.

    Parameters
    ----------
    value : number
        Magnitude expressed in ``units``.
    units : str
        SI prefix, optionally followed by the unit symbol ('m' or 'mV').
    utype : str
        Unit symbol of the quantity.
    T : callable
        ``T(value, units)`` returning a new quantity of the same kind.

    Raises
    ------
    KeyError
        If the prefix is not a key of ``convTable``.
    """

    def __init__(self, value, units, utype, T):
        self.T = T
        self.value = 0.00
        self.type  = utype
        self.convTable = {'μ': 10**-6,
                          'n': 10**-9,      # nano (was 10**-6)
                          'm': 10**-3,
                          '' : 1,
                          'k': 10** 3,
                          'M': 10** 6 }
        # Accept both 'm' and 'mV': drop a trailing unit symbol so that the
        # same bare prefix is stored and used for the table lookup.
        if utype and units.endswith(utype):
            units = units[:-len(utype)]
        self.units = units
        self.setValue(value, units)

    def _wrap(self, val):
        """Build a new quantity from base-unit ``val`` using own prefix."""
        return self.T(self.getValue(val, self.units), self.units)

    def __add__(self, other):
        self.assertType('add', other.type)
        return self._wrap(self.value + other.value)

    def __sub__(self, other):
        self.assertType('sub', other.type)
        return self._wrap(self.value - other.value)

    def __mul__(self, other):
        self.assertType('mul', other.type)
        return self._wrap(self.value * other.value)

    def __truediv__(self, other):
        self.assertType('div', other.type)
        return self._wrap(self.value / other.value)

    # Python 2 spelling kept so explicit callers of __div__ still work.
    __div__ = __truediv__

    def __floordiv__(self, other):
        self.assertType('floordiv', other.type)
        return self._wrap(self.value // other.value)

    def __mod__(self, other):
        self.assertType('mod', other.type)
        return self._wrap(self.value % other.value)

    def __pow__(self, other):
        self.assertType('pow', other.type)
        return self._wrap(self.value ** other.value)

    def __abs__(self):
        return self._wrap(abs(self.value))

    def __lt__(self, other):
        self.assertType('compare', other.type)
        return self.value < other.value

    def __le__(self, other):
        self.assertType('compare', other.type)
        return self.value <= other.value

    def __eq__(self, other):
        # Objects that are not quantities are simply "not equal".
        if not (hasattr(other, 'type') and hasattr(other, 'value')):
            return NotImplemented
        self.assertType('compare', other.type)
        return self.value == other.value

    def __ne__(self, other):
        if not (hasattr(other, 'type') and hasattr(other, 'value')):
            return NotImplemented
        self.assertType('compare', other.type)
        return self.value != other.value

    def __ge__(self, other):
        self.assertType('compare', other.type)
        return self.value >= other.value

    def __gt__(self, other):
        self.assertType('compare', other.type)
        return self.value > other.value

    def __str__(self):
        return '{} {}{}'.format(self.getValue(self.value, self.units),
                                self.units, self.type)

    def assertType(self, func, t):
        """Raise AssertionError unless ``t`` equals this quantity's type.

        Raised explicitly (not via ``assert``) so the check also runs
        when Python is started with -O.
        """
        if self.type != t:
            raise AssertionError('ERROR: Cannot {} type {} and {}'
                                 .format(func, self.type, t))

    def setValue(self, value, units=''):
        """Store ``value`` (given in prefix ``units``) in base units."""
        self.value = value * self.convTable[units]

    def getValue(self, value, units=''):
        """Return base-unit ``value`` expressed in prefix ``units``."""
        return value / self.convTable[units]

In [3]:
import math

class Power(UnitMathOps):
    """Electrical power (type 'W').

    Each conversion method takes one other quantity and returns the derived
    quantity in base units, picking the formula from ``other.type``.
    """

    def __init__(self, watts, units=''):
        def rebuild(w, u):
            # Factory handed to the base class for arithmetic results.
            return Power(w, u)
        UnitMathOps.__init__(self, watts, units, 'W', rebuild)

    def volts(self, other):
        """Voltage from a current (V = P / I) or a resistance (V = sqrt(P R))."""
        kind = other.type
        assert kind in ['A', 'Ω'], 'ERROR: Must be type Amps or Ohms'
        if kind != 'A':
            return Voltage(math.sqrt(self.value * other.value))
        return Voltage(self.value / other.value)

    def amps(self, other):
        """Current from a voltage (I = P / V) or a resistance (I = sqrt(P / R))."""
        kind = other.type
        assert kind in ['V', 'Ω'], 'ERROR: Must be type Volts or Ohms'
        ratio = self.value / other.value
        if kind != 'V':
            ratio = math.sqrt(ratio)
        return Current(ratio)

    def ohms(self, other):
        """Resistance from a current (R = P / I^2) or a voltage (R = V^2 / P)."""
        kind = other.type
        assert kind in ['A', 'V'], 'ERROR: Must be type Amps or Volts'
        squared = other.value**2
        if kind != 'A':
            return Resistance(squared / self.value)
        return Resistance(self.value / squared)
        
        
class Voltage(UnitMathOps):
    """Electric potential (type 'V').

    Each conversion method takes one other quantity and returns the derived
    quantity in base units, picking the formula from ``other.type``.  An
    unsupported ``other.type`` raises ``AssertionError`` (raised explicitly
    so the check also runs under ``python -O``).
    """

    def __init__(self, volts, units=''):
        V = lambda w, u: Voltage(w, u)
        UnitMathOps.__init__(self, volts, units, 'V', V) # V

    def watts(self, other):
        """Power from a current (P = V I) or a resistance (P = V^2 / R)."""
        if other.type not in ('A', 'Ω'):
            raise AssertionError('ERROR: Must be type Amps or Ohms')
        if other.type == 'A':
            return Power(self.value * other.value)
        return Power((self.value**2) / other.value)

    def amps(self, other):
        """Current from a power (I = P / V) or a resistance (I = V / R)."""
        if other.type not in ('W', 'Ω'):
            raise AssertionError('ERROR: Must be type Watts or Ohms')
        if other.type == 'W':
            return Current(other.value / self.value)
        # Ohm's law: I = V / R (the previous sqrt(V / R) was wrong).
        return Current(self.value / other.value)

    def ohms(self, other):
        """Resistance from a current (R = V / I) or a power (R = V^2 / P)."""
        if other.type not in ('A', 'W'):
            raise AssertionError('ERROR: Must be type Amps, or Watts')
        if other.type == 'A':
            return Resistance(self.value / other.value)
        return Resistance((self.value**2) / other.value)

class Current(UnitMathOps):
    """Electric current (type 'A').

    Each conversion method takes one other quantity and returns the derived
    quantity in base units, picking the formula from ``other.type``.  An
    unsupported ``other.type`` raises ``AssertionError`` (raised explicitly
    so the check also runs under ``python -O``).
    """

    def __init__(self, amps, units=''):
        C = lambda w, u: Current(w, u)
        UnitMathOps.__init__(self, amps, units, 'A', C) # I

    def watts(self, other):
        """Power from a voltage (P = V I) or a resistance (P = I^2 R)."""
        if other.type not in ('V', 'Ω'):
            raise AssertionError('ERROR: Must be type Volts or Ohms')
        if other.type == 'V':
            return Power(self.value * other.value)
        return Power((self.value**2) * other.value)

    def volts(self, other):
        """Voltage from a power (V = P / I) or a resistance (V = I R)."""
        if other.type not in ('W', 'Ω'):
            raise AssertionError('ERROR: Must be type Watts or Ohms')
        if other.type == 'W':
            return Voltage(other.value / self.value)
        return Voltage(self.value * other.value)

    def ohms(self, other):
        """Resistance from a power (R = P / I^2) or a voltage (R = V / I)."""
        if other.type not in ('W', 'V'):
            raise AssertionError('ERROR: Must be type Watts or Volts')
        if other.type == 'W':
            # Was `ohter.value`, a NameError at call time.
            return Resistance(other.value / (self.value**2))
        # Ohm's law: R = V / I (the previous sqrt(V / I) was wrong).
        return Resistance(other.value / self.value)

class Resistance(UnitMathOps):
    """Electrical resistance (type 'Ω').

    Each conversion method takes one other quantity and returns the derived
    quantity in base units, picking the formula from ``other.type``.
    """

    def __init__(self, ohms, units=''):
        def rebuild(w, u):
            # Factory handed to the base class for arithmetic results.
            return Resistance(w, u)
        UnitMathOps.__init__(self, ohms, units, 'Ω', rebuild)

    def watts(self, other):
        """Power from a current (P = I^2 R) or a voltage (P = V^2 / R)."""
        kind = other.type
        assert kind in ['A', 'V'], 'ERROR: Must be type Amps or Volts'
        squared = other.value**2
        if kind != 'A':
            return Power(squared / self.value)
        return Power(self.value * squared)

    def volts(self, other):
        """Voltage from a current (V = I R) or a power (V = sqrt(P R))."""
        kind = other.type
        assert kind in ['A', 'W'], 'ERROR: Must be type Amps or Watts'
        product = self.value * other.value
        if kind != 'A':
            product = math.sqrt(product)
        return Voltage(product)

    def amps(self, other):
        """Current from a power (I = sqrt(P / R)) or a voltage (I = V / R)."""
        kind = other.type
        assert kind in ['W', 'V'], 'ERROR: Must be type Watts or Volts'
        ratio = other.value / self.value
        if kind == 'W':
            ratio = math.sqrt(ratio)
        return Current(ratio)



In [16]:
from copy import deepcopy
from collections import deque
from time import time
import random
'''
PoNS Page 33.    ::   Amplitude(mV)  | Duration                | Summation
Receptor potentials   Small (0.1-10) | Brief (5-100ms)         | Graded
Synaptic potentials   Small (0.1-10) | Brief->Long (5ms-20min) | Graded
Action potentials     Large (70-110) | Brief (1-10ms)          | All-or-none

TODO: Add localized gradient descent function to neuron.
'''

class Neuron:
    """Spiking neuron with a simple STDP-style weight update.

    Parameters
    ----------
    indexXYZ : sequence of three ints
        Grid position; also turned into the string key ``self.address``.
    learningRate : float
        Scale of the synaptic weight changes made by ``STDP``.
    ntype : str, optional
        Role of the neuron; only 'output' is treated specially (it prints a
        message when it fires).  Defaults to 'input'.

    Notes
    -----
    * Synaptic weights in ``preSynaptic`` are plain numbers, read as
      millivolts when added to the membrane potential.
    * The history-recording method is named ``recordHistory``; the name
      ``historyStack`` is the deque attribute, which previously shadowed a
      method of the same name and made it uncallable on instances.
    * The constructor no longer reseeds the global ``random`` generator,
      which silently discarded any seed chosen by the caller.
    """

    windowMs = 20                               # History / hyperpolarization window

    def __init__(self, indexXYZ, learningRate, ntype='input'):
        self.hashAddress(indexXYZ)
        self.learningRate = learningRate
        self.type = ntype
        self.uRest = Voltage(-65., 'm')         # Constant membrane potential
        self.u = deepcopy(self.uRest)           # Membrane potential
        self.𝜗 = Voltage(20, 'm')               # Axon hillock threshold
        self.PSP = self.u - self.uRest          # Postsynaptic potential (0 at rest)
        self.preSynaptic = dict()               # {Address: Weight} Dendrites
        self.postSynaptic = dict()              # {Address: Neuron} Axon
        self.historyStack = deque()             # [Address, Time]
        self.hpt = 0                            # Hyperpolarization end time
        self.t = 0                              # Time step (milliseconds)

    def hashAddress(self, indexXYZ):            # Up to 3 dim addressing for
        self.iaddress = indexXYZ                # more advanced topologies.
        # Dot-separated hex string, used as a key in dict().
        self.address = '.'.join(hex(i)[2:] for i in indexXYZ[:3])

    def dendrite(self, address, t):
        """Receive a spike from presynaptic ``address`` at time ``t``.

        Raises KeyError if ``address`` has no entry in ``preSynaptic``.
        """
        self.t = t                              # Update current time
        # Look the weight up first so an unknown address leaves no history.
        synapticPotential = self.preSynaptic[address]
        self.recordHistory(address)             # Record presynaptic spike
        self.u += Voltage(synapticPotential, 'm') # Update membrane potential
        # PSP is the deviation of u from rest, as in Gerstner's
        # u - u_rest; the sum used before could never reach the threshold.
        # NOTE(review): confirm this is the intended model.
        self.PSP = self.u - self.uRest
        self.axonHillock()                      # Move to axonHillock()

    def recordHistory(self, address):
        """Drop spikes older than ``windowMs`` and append the new one."""
        history = self.historyStack
        while history and self.t - history[0][1] > self.windowMs:
            history.popleft()
        history.append([address, self.t])

    def STDP(self, depolarization):
        """Adjust synaptic weights after a dendritic input.

        On depolarization every synapse in the history window is
        strengthened by ``learningRate / dt`` (dt = ms since that spike,
        at least 1) and a hyperpolarization window is opened.  Otherwise,
        while that window is open, the most recent synapse is weakened.
        """
        if (depolarization):
            self.hpt = self.t + self.windowMs   # Hyperpolarization window end
            for address, t in self.historyStack:
                dt = self.t - t if (self.t - t > 0) else 1
                self.preSynaptic[address] *= 1 + (self.learningRate / dt)
        else:
            dhpt = self.hpt - self.t            # Time left in the window
            if (dhpt > 0 and self.historyStack):
                address = self.historyStack[-1][0] # Most recent address
                # Scale by the remaining fraction of the window.
                # NOTE(review): the original used an undefined `dt` here;
                # dhpt is presumably what was meant -- confirm.
                self.preSynaptic[address] *= \
                    1 - (self.learningRate * (dhpt / self.windowMs))

    def axonHillock(self):
        if (self.PSP >= self.𝜗):                # If PSP >= threshold : FIRE!!!
            self.STDP(True)                     # Activate STDP learning
            self.actionPotential()              # Move to actionPotential()
        else:                                   # Else
            self.STDP(False)                    # Check hyperpolarization

    def actionPotential(self):
        if (self.type == 'output'):
            print('Output Potential')           # TODO: LOG!!*~*~*~*~*~*~*~*~*~*
        for neuron in self.postSynaptic.values(): # Each neuron linked to axon
            neuron.dendrite(self.address, self.t+1) # Postsynaptic potential
            print('{} Potential'.format(self.address)) # TODO: LOG!! *~*~*~*~*~*

In [2]:
from datetime import date
import numpy as np
import os.path
import sqlite3

class Brain:
    """A three-layer spiking network plus the Memory that persists it.

    Parameters
    ----------
    dbFilename : str, optional
        SQLite file to load the brain's initialisation variables from.
        When given, the remaining arguments are ignored in favour of the
        most recent stored row.
    learningRate : float
        Learning rate handed to every Neuron.
    topology : str
        'ff'/'feedforward' or 'rs'/'resevoir'/'reservoir'.
    inputNeurons, hiddenNeurons, outputNeurons : int
        Layer sizes.

    Raises
    ------
    ValueError
        If ``dbFilename`` holds no stored brain, or the topology is unknown.
    """

    def __init__(self, dbFilename=None, learningRate=0.001, topology='ff',
                 inputNeurons=1, hiddenNeurons=1, outputNeurons=1):
        self.memory = Memory()
        self.ineurons = list()                  # Input neurons
        self.hneurons = list()                  # Hidden neurons
        self.oneurons = list()                  # Output neurons
        if (dbFilename):
            self.memory.load(dbFilename)
            # Memory.load stores the newest row as `brainInitVars`; columns
            # follow the table: id, date, rate, topology, in, hidden, out.
            scheme = getattr(self.memory, 'brainInitVars', None)
            if scheme is None:
                raise ValueError('No stored brain found in {}'
                                 .format(dbFilename))
            self.filename = dbFilename
            self.brainID = scheme[0]
            self.learningRate = scheme[2]
            self.topology = scheme[3]
            self.network  = {'input':  scheme[4],
                             'hidden': scheme[5],
                             'output': scheme[6]}
        else:
            self.memory.init()
            self.filename = self.memory.getAFilename()
            # A brand-new brain has no stored row, hence no ID yet.
            scheme = getattr(self.memory, 'brainInitVars', None)
            self.brainID = scheme[0] if scheme else None
            self.learningRate = learningRate
            self.topology = topology            # {feedforward, reservoir}
            self.network  = {'input':  inputNeurons,
                             'hidden': hiddenNeurons,
                             'output': outputNeurons}
        # Build the neurons for loaded and new brains alike; only the layer
        # sizes are stored, so weights are drawn afresh.
        if (self.topology in ['ff', 'feedforward']):
            self.buildFeedForward()
        elif (self.topology in ['rs', 'resevoir', 'reservoir']):
            self.buildReservoir()
        else:
            raise ValueError('Unknown topology: {!r}'.format(self.topology))

    def buildFeedForward(self):
        """Create the layers and fully connect input->hidden->output.

        Layers are built output-first so that each new neuron can be wired
        to the already existing layer behind it.
        """
        odict = dict()
        for o in range(self.network['output']):
            neuron = Neuron([o, 2, 0], self.learningRate, 'output')
            self.oneurons.append(neuron)
            odict[neuron.address] = neuron
        hdict = dict()
        for h in range(self.network['hidden']):
            neuron = Neuron([h, 1, 0], self.learningRate, 'hidden')
            self.hneurons.append(neuron)
            hdict[neuron.address] = neuron
            for out in self.oneurons:           # oneuron preSynaptic
                # Weight is keyed by the address of the *sending* neuron.
                out.preSynaptic[neuron.address] = self.bttrRndDist()
            neuron.postSynaptic = odict         # hneuron postSynaptic
        for i in range(self.network['input']):
            neuron = Neuron([i, 0, 0], self.learningRate, 'input')
            self.ineurons.append(neuron)
            for hid in self.hneurons:           # hneuron preSynaptic
                hid.preSynaptic[neuron.address] = self.bttrRndDist()
            neuron.postSynaptic = hdict         # ineuron postSynaptic

    def buildReservoir(self):
        pass

    def bttrRndDist(self):
        """Draw a weight from N(0.5, 0.1) or N(-0.5, 0.1), chosen at random."""
        pv = np.random.normal(0.5, 0.1)
        nv = np.random.normal(-0.5, 0.1)
        return np.random.choice([nv, pv])

    def simulate(self, spikeTrainMatrix):
        # Run the simulation, given input trains, and log neural activities
        pass

    def show(self):
        # Graph and show network and logged activities in animation. Plotly?
        pass

class Memory:
    """SQLite-backed store for a brain's initialisation variables.

    Attributes
    ----------
    memoryDB : sqlite3.Connection or None
        Open connection (None until ``load`` or ``init`` is called).
    memory : sqlite3.Cursor or None
        Cursor on ``memoryDB``.
    dbFilename : str or None
        File backing the connection; None for an in-memory database.
    brainInitVars, brainScheme : tuple or None
        Most recent row of table ``brainInitVars`` (same object under both
        names), or None when no row is known.
    """

    def __init__(self):
        self.memoryDB = None
        self.memory = None
        self.dbFilename = None
        self.brainInitVars = None
        self.brainScheme = None

    def _setScheme(self, row):
        # Keep both attribute names pointing at the same row.
        self.brainInitVars = row
        self.brainScheme = row

    def load(self, dbFilename):
        """Open ``dbFilename`` and read the newest ``brainInitVars`` row.

        Raises sqlite3.OperationalError if the table does not exist.
        """
        self.memoryDB = sqlite3.connect(dbFilename)
        self.memory = self.memoryDB.cursor()
        self.dbFilename = dbFilename
        self.memory.execute('''
            select *
            from brainInitVars
            order by brainID desc
            limit 1;
        ''')
        # execute() returns a cursor, not a list; fetch the row explicitly.
        self._setScheme(self.memory.fetchone())

    def init(self):
        """Create a fresh in-memory database with an empty schema."""
        self.memoryDB = sqlite3.connect(':memory:')
        self.memory = self.memoryDB.cursor()
        self.dbFilename = None
        self._setScheme(None)
        self.initSchemes()

    def getAFilename(self):
        """Return an unused name 'brain_<today>' or 'brain_<today>(n)'.

        The name is returned without the '.db' extension that is used for
        the existence check.
        """
        base = 'brain_'+str(date.today())
        filename = base
        count = 1
        # Keep counting while the candidate is already taken.
        while (os.path.isfile(filename+'.db')):
            filename = base+'('+str(count)+')'
            count += 1
        return filename

    def initSchemes(self):
        self.memory.execute('''
            create table if not exists brainInitVars (
                brainID integer primary key autoincrement,
                tdate text,
                learningRate real,
                topology text,
                inputNeurons integer,
                hiddenNeurons integer,
                outputNeurons integer,
                inputNeuronFilename text,
                hiddenNeuronFilename text,
                outputNeuronFilename text
            )
        ''')

    def saveMemory(self, close=False, learningRate=None, topology=None,
                   network=None, dbFilename=None):
        """Insert a new ``brainInitVars`` row and persist the database.

        Parameters
        ----------
        close : bool
            Close the connection afterwards.
        learningRate, topology, network : optional
            Values to store.  Each falls back to an attribute of the same
            name on this object when omitted.  ``network`` is either a dict
            with keys 'input', 'hidden', 'output' or a 3-item sequence.
        dbFilename : str, optional
            If given and different from the file backing the connection,
            a copy of the whole database is written there.

        Raises
        ------
        RuntimeError
            If neither ``load`` nor ``init`` has been called.
        ValueError
            If a value to store is neither passed nor set as an attribute.
        """
        if self.memoryDB is None:
            raise RuntimeError('Memory has no open database')
        if learningRate is None:
            learningRate = getattr(self, 'learningRate', None)
        if topology is None:
            topology = getattr(self, 'topology', None)
        if network is None:
            network = getattr(self, 'network', None)
        if learningRate is None or topology is None or network is None:
            raise ValueError('learningRate, topology and network are '
                             'required to save a brain')
        if isinstance(network, dict):
            counts = (network['input'], network['hidden'], network['output'])
        else:
            counts = tuple(network)[:3]
        # Parameterized insert naming its columns: brainID autoincrements
        # and the three filename columns are left NULL.
        self.memory.execute('''
            insert into brainInitVars
                (tdate, learningRate, topology,
                 inputNeurons, hiddenNeurons, outputNeurons)
            values (date('now'), ?, ?, ?, ?, ?)
        ''', (learningRate, topology) + counts)
        self.memoryDB.commit()
        self.memory.execute('select * from brainInitVars where brainID = ?',
                            (self.memory.lastrowid,))
        self._setScheme(self.memory.fetchone())
        if dbFilename is not None and dbFilename != self.dbFilename:
            # Write a real SQLite file so that load() can open it again.
            target = sqlite3.connect(dbFilename)
            try:
                self.memoryDB.backup(target)
            finally:
                target.close()
        if (close):
            self.memoryDB.close()

In [37]:
# Scratch check: membrane potentials add, and the address is hex-encoded.
n1 = Neuron([55, 3, 17], 0.001, 'input')    # Neuron type passed explicitly
n2 = deepcopy(n1)
n1.u += n2.u                                # Sum of two Voltage quantities
print(n1.u)
print(n1.address)

"Ó|XG2'¾\x0b¢\x9e±\x8cp|\x9cè"