In [1]:
import math
import inspect
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.nn import functional as F

class CognitiveClarifier(nn.Module):
    """Layer normalization over the last dimension with an optional bias term.

    Arguments:
        ndim (int): Size of the normalized (last) dimension.
        bias (bool): When truthy a learnable bias initialised to zeros is
            created; otherwise the module has no bias and ``self.bias`` is None.
    """

    def __init__(self, ndim, bias):
        super().__init__()
        # Scale starts at one so the freshly built layer is a plain normalization.
        self.weight = nn.Parameter(torch.ones(ndim))
        if bias:
            self.bias = nn.Parameter(torch.zeros(ndim))
        else:
            self.bias = None

    def forward(self, input):
        """Normalize ``input`` over its trailing ``ndim`` features (eps = 1e-5)."""
        return F.layer_norm(
            input,
            normalized_shape=self.weight.shape,
            weight=self.weight,
            bias=self.bias,
            eps=1e-5,
        )

In [2]:
class TemporalAttention(nn.Module):
    """Causal multi-head self-attention.

    Arguments:
        config (object): Needs the attributes n_embd, n_head, dropout, bias and
            block_size. n_embd must be divisible by n_head.
    """

    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.dropout = config.dropout

        # One fused projection produces queries, keys and values for every head.
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        # Projection applied to the concatenated head outputs.
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        # Dropout on the attention weights and on the block output respectively.
        self.attn_dropout = nn.Dropout(config.dropout)
        self.resid_dropout = nn.Dropout(config.dropout)

        # The fused kernel only exists in PyTorch >= 2.0; otherwise fall back to
        # the explicit implementation, which needs a precomputed causal mask.
        self.flash = hasattr(F, 'scaled_dot_product_attention')
        if not self.flash:
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            size = config.block_size
            lower_triangle = torch.ones(size, size).tril()
            self.register_buffer("bias", lower_triangle.view(1, 1, size, size))

    def forward(self, x):
        """Attend over ``x`` of shape (batch, time, n_embd); the output has the same shape."""
        batch, steps, width = x.size()
        head_dim = width // self.n_head

        def to_heads(tensor):
            # (batch, time, width) -> (batch, heads, time, head_dim)
            return tensor.view(batch, steps, self.n_head, head_dim).transpose(1, 2)

        q, k, v = (to_heads(part) for part in self.c_attn(x).split(self.n_embd, dim=2))

        if self.flash:
            # Attention dropout is only applied while training.
            drop_p = self.dropout if self.training else 0
            y = F.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p=drop_p, is_causal=True)
        else:
            scale = 1.0 / math.sqrt(k.size(-1))
            scores = torch.matmul(q, k.transpose(-2, -1)) * scale  # (batch, heads, time, time)
            visible = self.bias[:, :, :steps, :steps]
            # Positions to the right of the query get zero weight after softmax.
            scores = scores.masked_fill(visible == 0, float('-inf'))
            weights = self.attn_dropout(F.softmax(scores, dim=-1))
            y = torch.matmul(weights, v)  # (batch, heads, time, head_dim)

        # Put the heads back side by side before the output projection.
        merged = y.transpose(1, 2).contiguous().view(batch, steps, width)
        return self.resid_dropout(self.c_proj(merged))


In [3]:
class Neuron(nn.Module):
    """A single dense layer: linear map, then GELU, then dropout.

    Arguments:
        in_features (int): Size of each input sample.
        out_features (int): Size of each output sample.
        bias (bool): Whether the linear map has an additive bias.
        dropout (float): Dropout probability applied after the activation.
    """

    def __init__(self, in_features, out_features, bias=True, dropout=0.0):
        super().__init__()
        # Plays the role of c_fc or c_proj in the original MLP.
        self.linear = nn.Linear(in_features=in_features, out_features=out_features, bias=bias)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Return dropout(gelu(linear(x)))."""
        return self.dropout(self.gelu(self.linear(x)))

In [4]:
class NeuralNetwork(nn.Module):
    """A stack of Neuron layers applied one after another.

    Arguments:
        numberOfNeurons (sequence of int): Layer widths; every consecutive
            pair (width[i], width[i + 1]) becomes one Neuron.
        bias (bool): Forwarded to every Neuron.
        dropout (float): Forwarded to every Neuron.
    """

    def __init__(self, numberOfNeurons, bias=True, dropout=0.0):
        super().__init__()
        self.neurons = nn.ModuleList([
            Neuron(numberOfNeurons[position - 1], numberOfNeurons[position], bias=bias, dropout=dropout)
            for position in range(1, len(numberOfNeurons))
        ])

    def forward(self, x):
        """Feed ``x`` through the layers in construction order."""
        signal = x
        for layer in self.neurons:
            signal = layer(signal)
        return signal

In [5]:
## Tests:
# Smoke test for NeuralNetwork: build a small stack and push one random batch through it.

# Define a neural network with 3 layers: input layer with 5 neurons, hidden layer with 10 neurons, output layer with 2 neurons
# The widths [5, 10, 2] yield two Neuron layers (5 -> 10 and 10 -> 2), each ending in GELU + dropout.
layers = [5, 10, 2]
neural_network = NeuralNetwork(layers, bias=True, dropout=0.1)

# Example tensor with shape (batch_size, input_features)
# No seed is set and the module is never switched to eval(), so dropout stays
# active and the printed values differ from run to run.
x = torch.randn(8, 5)
output = neural_network(x)
print(output)

tensor([[ 0.3800,  0.2443],
        [-0.0059, -0.0246],
        [ 0.0790, -0.1704],
        [ 0.1027,  0.0710],
        [ 0.0275, -0.0944],
        [ 0.1436,  0.1779],
        [ 0.2156, -0.1365],
        [ 0.2178, -0.0000]], grad_fn=<MulBackward0>)


In [6]:
class ThoughtProcessor(nn.Module):
    """
    One transformer block: layer norm -> causal self-attention and
    layer norm -> feed-forward network, each wrapped in a residual connection.

    Arguments:
        config (object): Configuration object with attributes n_embd, n_head, dropout, block_size, bias.
    """

    def __init__(self, config):
        super().__init__()
        width = config.n_embd
        self.ln1 = CognitiveClarifier(width, bias=config.bias)
        self.ln2 = CognitiveClarifier(width, bias=config.bias)
        self.attn = TemporalAttention(config)
        # The feed-forward part widens to four times the embedding size and back.
        self.mlp = NeuralNetwork([width, 4 * width, width], bias=config.bias, dropout=config.dropout)

    def forward(self, x):
        """Apply both sub-layers; the output has the same shape as ``x``."""
        # Normalization happens before each sub-layer, the skip path is untouched.
        attended = x + self.attn(self.ln1(x))
        return attended + self.mlp(self.ln2(attended))

In [7]:
@dataclass
class NeuralCircuitSettings:
    """Hyper-parameters shared by every module of the model.

    Every attribute carries a type annotation so that ``@dataclass`` turns it
    into a real field; without annotations the generated ``__init__`` accepts
    no arguments and ``NeuralCircuitSettings(n_embd=...)`` raises TypeError.
    """
    n_embd: int = 64        # embedding width; must be divisible by n_head
    n_head: int = 8         # number of attention heads
    dropout: float = 0.1    # dropout probability used throughout the model
    block_size: int = 128   # maximum sequence length (number of position embeddings)
    bias: bool = True       # whether linear layers and layer norms get a bias
    vocabSize: int = 50257  # number of token ids (rows of the token embedding)
    n_layer: int = 12       # number of stacked ThoughtProcessor blocks

# Smoke test for ThoughtProcessor using the default settings.
config = NeuralCircuitSettings()
block = ThoughtProcessor(config)

# Example tensor with shape (batch_size, sequence_length, embedding_dim)
# The last dimension matches the default n_embd (64); the sequence length equals the default block_size (128).
x = torch.randn(32, 128, 64)
output = block(x)
# The block only adds residual updates, so the shape must come back unchanged.
print(output.size())  # Expected output size: (32, 128, 64)

torch.Size([32, 128, 64])


In [8]:
class Cortex(nn.Module):
    """
    Cortex (GPT) is like the whole brain, composed of multiple ThoughtProcessors,
    which together process input data sequentially to generate output, mimicking the flow of thought.

    Layout: token embedding + positional embedding -> dropout -> n_layer
    ThoughtProcessor blocks -> final layer norm -> linear output layer whose
    weight is shared with the token embedding.
    """

    # Token id that ends generation (the GPT-2 "<|endoftext|>" token).
    END_OF_TEXT_TOKEN = 50256

    def __init__(self, neuroConfig: NeuralCircuitSettings = None):
        """
        Arguments:
            neuroConfig: Settings object providing vocabSize, block_size, n_embd,
                n_layer, n_head, dropout and bias. When None, a default
                NeuralCircuitSettings() is used.
        """
        super().__init__()

        if neuroConfig is None:
            print("No configuration provided, using default settings.")
            neuroConfig = NeuralCircuitSettings()

        assert neuroConfig.vocabSize is not None
        assert neuroConfig.block_size is not None

        self.neuroConfig = neuroConfig

        self.tokenEmbedding = nn.Embedding(neuroConfig.vocabSize, neuroConfig.n_embd)
        self.positionalEmbedding = nn.Embedding(neuroConfig.block_size, neuroConfig.n_embd)
        self.dropout = nn.Dropout(neuroConfig.dropout)
        self.thoughtProcessors = nn.ModuleList([ThoughtProcessor(neuroConfig) for _ in range(neuroConfig.n_layer)])
        self.outputNormalizer = CognitiveClarifier(neuroConfig.n_embd, bias=neuroConfig.bias)
        self.outputLayer = nn.Linear(neuroConfig.n_embd, neuroConfig.vocabSize, bias=False)
        self.tokenEmbedding.weight = self.outputLayer.weight  # Weight tying

        self.apply(self._init_lessonScalingFactors)

        # Apply special scaled init to the residual projections, per GPT-2 paper.
        # The projections are reached through the module tree rather than by
        # parameter-name suffix: no parameter here is called 'outputProjection',
        # and the tied output weight is listed by named_parameters() under
        # 'tokenEmbedding.weight', so a suffix match on those names selects nothing.
        for aThoughtProcessor in self.thoughtProcessors:
            residualStd = 0.02 / math.sqrt(2 * neuroConfig.n_layer)
            torch.nn.init.normal_(aThoughtProcessor.attn.c_proj.weight, mean=0.0, std=residualStd)
            torch.nn.init.normal_(aThoughtProcessor.mlp.neurons[-1].linear.weight, mean=0.0, std=residualStd)

    def getNumberOfSynapses(self, non_embedding=True):
        """
        Return the number of parameters in the model.
        For non-embedding count (default), the position embeddings get subtracted.
        The token embeddings would too, except due to the parameter sharing these
        params are actually used as weights in the final layer, so we include them.
        """
        synapsesCount_n_params = sum(p.numel() for p in self.parameters())
        if non_embedding:
            synapsesCount_n_params -= self.positionalEmbedding.weight.numel()
        return synapsesCount_n_params

    def _init_lessonScalingFactors(self, module):
        """Default init: N(0, 0.02) for Linear/Embedding weights, zeros for Linear biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, dendrites_input, targets=None, external_context=None):
        """
        Run the model on a batch of token ids.

        Arguments:
            dendrites_input: Integer tensor of shape (batch, time) with time <= block_size.
            targets: Optional integer tensor of the same shape; entries equal to -1
                are ignored by the loss.
            external_context: Accepted for interface compatibility; not used here.

        Returns:
            (logits, loss). With targets, logits cover every position and loss is
            the cross entropy. Without targets, logits cover only the last
            position (shape (batch, 1, vocabSize)) and loss is None.
        """
        device = dendrites_input.device
        cognitiveBatchSize, cognitiveSequenceLength = dendrites_input.size()
        assert cognitiveSequenceLength <= self.neuroConfig.block_size, f"Cannot forward sequence of length {cognitiveSequenceLength}, block size is only {self.neuroConfig.block_size}"
        temporalPositions = torch.arange(0, cognitiveSequenceLength, dtype=torch.long, device=device)

        tokenEmbeddings = self.tokenEmbedding(dendrites_input)
        # Positional embeddings have shape (time, n_embd) and broadcast over the batch.
        positionalEmbeddings = self.positionalEmbedding(temporalPositions)
        sensoryInput = self.dropout(tokenEmbeddings + positionalEmbeddings)

        for aThoughtProcessor in self.thoughtProcessors:
            sensoryInput = aThoughtProcessor(sensoryInput)

        behaviouralResponse = self.outputNormalizer(sensoryInput)

        if targets is not None:
            logits = self.outputLayer(behaviouralResponse)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        else:
            # Inference only needs the final position; the list index keeps the time dimension.
            logits = self.outputLayer(behaviouralResponse[:, [-1], :])
            loss = None

        return logits, loss

    def reduceThoughtProcessorSize_crop_block_size(self, block_size):
        """
        Shrink the maximum sequence length to ``block_size``.

        Truncates the positional embedding and, where a block carries a causal-mask
        buffer, the mask as well. Note that the config object stored on the model
        is modified in place.
        """
        assert block_size <= self.neuroConfig.block_size
        self.neuroConfig.block_size = block_size
        self.positionalEmbedding.weight = nn.Parameter(self.positionalEmbedding.weight[:block_size])
        for aThoughtProcessor in self.thoughtProcessors:
            if hasattr(aThoughtProcessor.attn, 'bias'):
                aThoughtProcessor.attn.bias = aThoughtProcessor.attn.bias[:,:,:block_size,:block_size]

    @staticmethod
    def _translateHuggingFaceKey(key):
        """Map a Hugging Face GPT-2 state-dict key onto this model's parameter name."""
        renames = (
            ('transformer.wte.', 'tokenEmbedding.'),
            ('transformer.wpe.', 'positionalEmbedding.'),
            ('transformer.ln_f.', 'outputNormalizer.'),
            ('lm_head.', 'outputLayer.'),
            ('transformer.h.', 'thoughtProcessors.'),
            ('.ln_1.', '.ln1.'),
            ('.ln_2.', '.ln2.'),
            ('.mlp.c_fc.', '.mlp.neurons.0.linear.'),
            ('.mlp.c_proj.', '.mlp.neurons.1.linear.'),
        )
        for theirs, ours in renames:
            key = key.replace(theirs, ours)
        return key

    @classmethod
    def loadMemories(cls, modelType, overrideArgs=None):
        """
        Build a Cortex sized like a pretrained GPT-2 and copy the checkpoint weights in.

        Arguments:
            modelType: One of 'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'.
            overrideArgs: Optional dict; the only supported key is 'dropoutRate'.

        Returns:
            The initialised model.

        NOTE(review): Neuron applies GELU and dropout after every linear layer,
        including the MLP output projection. If the reference GPT-2 MLP has no
        activation after its output projection, the loaded weights will not
        reproduce the reference outputs exactly - confirm before relying on this.
        """
        assert modelType in {'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'}
        overrideArgs = overrideArgs or {}

        assert all(k == 'dropoutRate' for k in overrideArgs)
        from transformers import GPT2LMHeadModel
        print(f"Loading weights from pretrained GPT: {modelType}")

        configArgs = {
            'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),
            'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024),
            'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280),
            'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600),
        }[modelType]
        configArgs.update({'vocabSize': 50257, 'block_size': 1024, 'bias': True})

        if 'dropoutRate' in overrideArgs:
            configArgs['dropout'] = overrideArgs['dropoutRate']

        # Set the values attribute by attribute instead of passing keyword
        # arguments: this works whether or not the settings class exposes them
        # as constructor parameters.
        neuroConfig = NeuralCircuitSettings()
        for settingName, settingValue in configArgs.items():
            setattr(neuroConfig, settingName, settingValue)

        model = cls(neuroConfig)
        memories = model.state_dict()
        # Causal-mask buffers are not learned weights, so they are left out on both sides.
        memoryKeys_sd_keys = [key for key in memories.keys() if not key.endswith('.attn.bias')]

        model_hf = GPT2LMHeadModel.from_pretrained(modelType)
        memories_hf = model_hf.state_dict()
        memories_keys_hf = [key for key in memories_hf.keys() if not key.endswith('attn.masked_bias') and not key.endswith('attn.bias')]

        # These checkpoint weights are stored transposed relative to nn.Linear.
        transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']

        assert len(memories_keys_hf) == len(memoryKeys_sd_keys), f"Mismatched keys: {len(memories_keys_hf)} != {len(memoryKeys_sd_keys)}"
        for key in memories_keys_hf:
            # The checkpoint uses its own parameter names; translate before indexing.
            localKey = cls._translateHuggingFaceKey(key)
            assert localKey in memories, f"No local parameter for checkpoint key {key} (expected {localKey})"
            pretrained = memories_hf[key]
            if any(key.endswith(w) for w in transposed):
                pretrained = pretrained.t()
            assert pretrained.shape == memories[localKey].shape, f"Shape mismatch for {key}: {tuple(pretrained.shape)} != {tuple(memories[localKey].shape)}"
            with torch.no_grad():
                memories[localKey].copy_(pretrained)

        return model

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None, gradualOutput=False, external_context=None):
        """
        Sample up to ``max_new_tokens`` tokens after the prompt ``idx``.

        Arguments:
            idx: Integer tensor of shape (batch, time) holding the prompt token ids.
            max_new_tokens: Upper bound on the number of sampled tokens.
            temperature: Logits are divided by this value before the softmax.
            top_k: When given, only the k most likely tokens can be sampled.
            gradualOutput: When True, print each new token of the first sequence as it is sampled.
            external_context: Passed through to forward().

        Returns:
            The decoded text of the first sequence in the batch (prompt included).
            Sampling stops early once that sequence produces the end-of-text token.
        """
        _, decode = getCognitiveInterpreters()

        for _ in range(max_new_tokens):
            # Keep only the most recent block_size tokens as context.
            idx_cond = idx if idx.size(1) <= self.neuroConfig.block_size else idx[:, -self.neuroConfig.block_size:]
            logits, _ = self(idx_cond, external_context=external_context)
            logits = logits[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf')
            probabilities = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probabilities, num_samples=1)

            # Only the first sequence is decoded and returned, so it alone decides
            # when to stop; indexing [0, 0] also works for batches larger than one.
            firstSequenceToken = int(idx_next[0, 0])
            if firstSequenceToken == self.END_OF_TEXT_TOKEN:
                print("\nNatural Stop\n")
                break

            if (idx_next < 0).any():
                raise ValueError(f"Negative token index encountered: {idx_next}")

            idx = torch.cat((idx, idx_next), dim=1)

            if gradualOutput:
                next_token_text = decode([firstSequenceToken])
                print(next_token_text, end='', flush=True)

        return decode(idx[0].tolist())
    
def getCognitiveInterpreters(metaPath=None):
    """
    Build the (encode, decode) pair that converts between text and token ids.

    Arguments:
        metaPath: Optional path to a pickled dict with the keys 'stringToIndex'
            (character -> id) and 'indexToString' (id -> character). When None,
            the tiktoken "gpt2" encoding is used.

    Returns:
        (encode, decode): ``encode(text)`` gives a list of token ids and
        ``decode(ids)`` gives the text back.

    Raises:
        FileNotFoundError: If ``metaPath`` is given but does not exist.
    """
    if metaPath is None:
        # Imported here so the function does not depend on an import made in a later cell.
        import tiktoken
        encoder = tiktoken.get_encoding("gpt2")

        def encode(s):
            # Let the end-of-text marker through as a special token.
            return encoder.encode(s, allowed_special={"<|endoftext|>"})

        def decode(l):
            return encoder.decode(l)

        return encode, decode

    import os
    import pickle

    if not os.path.exists(metaPath):
        # Fail with a clear error instead of returning undefined names.
        raise FileNotFoundError(f"Meta file not found: {metaPath}")

    print(f"Loading meta from {metaPath}...")
    with open(metaPath, 'rb') as file:
        # NOTE: pickle.load can run arbitrary code - only open meta files you trust.
        meta = pickle.load(file)
    stringToIndex, indexToString = meta['stringToIndex'], meta['indexToString']

    def encode(s):
        return [stringToIndex[c] for c in s]

    def decode(l):
        return ''.join([indexToString[I] for I in l])

    return encode, decode

In [1]:
import tiktoken
# Smoke test for Cortex with the default settings.
# NOTE(review): this cell needs NeuralCircuitSettings and Cortex from the earlier
# cells to be defined in the same kernel; the recorded NameError and the reset
# execution counter suggest it was run on a fresh kernel - re-run the cells above first.
config = NeuralCircuitSettings()
model = Cortex(config)
# Example tensor with shape (batch_size, sequence_length)
x = torch.randint(0, config.vocabSize, (2, 128))
# No targets are passed, so the returned loss is None and the logits cover only the last position.
logits, loss = model(x)
print(f"Logits shape: {logits.shape}")
if loss is not None:
    print(f"Loss: {loss.item()}")

# Test the generate method
# eval() switches dropout off; sampling is still random because no seed is set.
model.eval()
# NOTE(review): the prompt keeps both batch rows, while generate() decodes and
# returns only the first sequence - confirm batches larger than one are intended here.
generated_sequence = model.generate(x[:, :10], max_new_tokens=50, temperature=1.0, top_k=10, gradualOutput=True)
print(f"Generated sequence: {generated_sequence}")

NameError: name 'NeuralCircuitSettings' is not defined