In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Magic(nn.Module):
    def __init__(self, d_model, n_heads, d_ff):
        super(Magic, self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.d_ff = d_ff

        # Multi-Head Attention components
        self.Q_linear = nn.Linear(d_model, d_model)
        self.K_linear = nn.Linear(d_model, d_model)
        self.V_linear = nn.Linear(d_model, d_model)

        # Output projection
        self.out_linear = nn.Linear(d_model, d_model)

        # Feedforward Network (FFN)
        self.ffn1 = nn.Linear(d_model, d_ff)
        self.ffn2 = nn.Linear(d_ff, d_model)

        # Layer Normalization
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        batch_size, seq_len, _ = x.size()

        # Compute Q, K, V
        Q = self.Q_linear(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        K = self.K_linear(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)
        V = self.V_linear(x).view(batch_size, seq_len, self.n_heads, self.d_k).transpose(1, 2)

        # Compute attention scores: Q @ K.T
        attn = torch.matmul(Q, K.transpose(-2, -1)) / self.d_k ** 0.5  # Scale attention scores
        
        mask = torch.triu(torch.ones_like(attn), diagonal=1).bool()
        attn = attn.masked_fill(mask, float('-inf'))
        attn = F.softmax(attn, dim=-1)

        # Attention output
        output = torch.matmul(attn, V).transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        output = self.out_linear(output)

        # Add & Normalize
        x = self.norm1(x + output)

        # Feed-Forward Network (FFN)
        ffn_out = F.relu(self.ffn1(x))
        ffn_out = self.ffn2(ffn_out)

        # Add & Normalize
        x = self.norm2(x + ffn_out)

        return x


class multiBERT(nn.Module):
    def __init__(self, vocab_size, d_model=80, n_heads=2, d_ff=2048, max_length=512):
        super(multiBERT, self).__init__()
        self.emb = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = self.create_positional_encoding(max_length, d_model)
        self.magics = nn.ModuleList([Magic(d_model, n_heads, d_ff) for _ in range(3)])
        self.linear = nn.Linear(d_model, vocab_size)

    def create_positional_encoding(self, max_length, embed_dim):
        position = torch.arange(max_length, dtype=torch.float).unsqueeze(1)
        dim = torch.arange(embed_dim, dtype=torch.float).unsqueeze(0)
        angles = position / (10000 ** (dim / embed_dim))
        pos_encoding = torch.zeros(max_length, embed_dim)
        pos_encoding[:, 0::2] = torch.sin(angles[:, 0::2])
        pos_encoding[:, 1::2] = torch.cos(angles[:, 1::2])
        return pos_encoding

    def forward(self, inputs):
        embs = self.emb(inputs)

        # Add positional encoding (broadcast to match batch size)
        embs += self.positional_encoding[:embs.size(1), :].to(embs.device)

        # Pass through Magic layers
        for magic in self.magics:
            embs = magic(embs)

        return embs


In [3]:
data = ['A', 'A', 'B', 'B', 'C', 'C', 'A', 'A', 'B', 'B', 'C', 'C', 'A']

In [15]:

vocab = {char: idx for idx, char in enumerate(set(data))}  # Create a vocab dictionary
encoded_data = [vocab[char] for char in data]  # Convert data to indices
vocab_size = len(vocab)  # Total number of unique tokens

print(vocab)

{'C': 0, 'A': 1, 'B': 2}


In [14]:
sequence_length = 3  # Length of each input sequence
inputs = []
outputs = []
for i in range(len(encoded_data) - sequence_length):
    inputs.append(encoded_data[i:i+sequence_length])
    outputs.append(encoded_data[i+sequence_length])


print(inputs)
print(outputs)

[[1, 1, 2], [1, 2, 2], [2, 2, 0], [2, 0, 0], [0, 0, 1], [0, 1, 1], [1, 1, 2], [1, 2, 2], [2, 2, 0], [2, 0, 0]]
[2, 0, 0, 1, 1, 2, 2, 0, 0, 1]


In [8]:
import torch

#convert to tensors
inputs = torch.tensor(inputs, dtype=torch.long)  # Input sequences
outputs = torch.tensor(outputs, dtype=torch.long)  # Target next characters

In [9]:
d_model = 80  # Dimension of embeddings, can be adjusted
n_heads = 2  # Number of attention heads
d_ff = 160  # Dimension of feed-forward network

model = multiBERT(vocab_size=vocab_size, d_model=d_model, n_heads=n_heads, d_ff=d_ff)

In [10]:
#train
import torch.optim as optim
import torch.nn.functional as F
import wandb

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Initialize W&B
wandb.init(project="transformers", name='transforming bert')
epochs = 100  # Number of training epochs

for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    # Forward pass
    predictions = model(inputs)  # Shape: (batch_size, seq_len, vocab_size)
    predictions = predictions[:, -1, :]  # Only take the last output for each input sequence

    # Compute the loss
    loss = criterion(predictions, outputs)
    loss.backward()
    optimizer.step()

    # Log the loss value
    wandb.log({'average_loss': loss.item()})

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

wandb.finish()

[34m[1mwandb[0m: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33mliquid-candidate[0m ([33mliquid-candidate-personal[0m). Use [1m`wandb login --relogin`[0m to force relogin


Epoch 0, Loss: 5.1093
Epoch 10, Loss: 0.3003
Epoch 20, Loss: 0.0607
Epoch 30, Loss: 0.0261
Epoch 40, Loss: 0.0163
Epoch 50, Loss: 0.0126
Epoch 60, Loss: 0.0108
Epoch 70, Loss: 0.0098
Epoch 80, Loss: 0.0091
Epoch 90, Loss: 0.0086


0,1
average_loss,█▆▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁

0,1
average_loss,0.00823


In [None]:
def predict_next(model, input_seq, vocab):
    model.eval()
    input_tensor = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0)  # Add batch dimension
    with torch.no_grad():
        output = model(input_tensor)
        last_output = output[:, -1, :]  # Get the last time step output
        predicted_idx = torch.argmax(last_output, dim=-1).item()
        for char, idx in vocab.items():
            if idx == predicted_idx:
                return char

# # Example usage
# input_seq = [vocab['C'], vocab['C'], vocab['B']]  # Sequence to predict next character
# predicted_char = predict_next(model, input_seq, vocab)
# print(f"Predicted next character: {predicted_char}")

Predicted next character: A


In [22]:
# Predict the next 20 characters
predicted = []
start_seq = ['A', 'A', 'B']

while len(predicted) < 20:
    input_seq_tokens = [vocab.get(token) for token in start_seq]
    predicted_char = predict_next(model, input_seq_tokens, vocab)
    predicted.append(predicted_char)
    start_seq.append(predicted_char)  # Append character itself, not its vocab ID
    start_seq = start_seq[1:]  # Keep the length of start_seq fixed

print("Predicted sequence:", predicted)


Predicted sequence: ['B', 'C', 'C', 'A', 'A', 'B', 'B', 'C', 'C', 'A', 'A', 'B', 'B', 'C', 'C', 'A', 'A', 'B', 'B', 'C']


In [None]:
import torch.optim as optim
import torch.nn.functional as F

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

epochs = 100  # Number of training epochs

for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    # Forward pass
    predictions = model(inputs)  # Shape: (batch_size, seq_len, vocab_size)
    predictions = predictions[:, -1, :]  # Only take the last output for each input sequence

    # Compute the loss
    loss = criterion(predictions, outputs)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

In [None]:
import torch
import collections

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Read the file
with open('sentence_test_data.txt', "r") as f:
    sentences = f.read().splitlines()  # Each line is a separate sentence

def create_lookup_tables(sentences: list[str]) -> tuple[dict[str, int], dict[int, str]]:
    # Split sentences into individual words
    words = []
    for sentence in sentences:
        words.extend(sentence.split())  # Split each sentence and add words to the list

    # Count unique words and sort by frequency
    word_counts = collections.Counter(words)
    vocab = sorted(word_counts, key=lambda k: word_counts[k], reverse=True)
    
    # Create int-to-word and word-to-int mappings with special tokens
    int_to_vocab = {ii + 1: word for ii, word in enumerate(vocab)}
    int_to_vocab[0] = '<PAD>'
    int_to_vocab[len(int_to_vocab)] = '<UNK>'  # Add '<UNK>' token at the end
    vocab_to_int = {word: ii for ii, word in int_to_vocab.items()}
    
    return vocab_to_int, int_to_vocab

# Creating dictionary
words_to_ids, ids_to_words = create_lookup_tables(sentences)

# Tokenize sentences
tokens = []
for sentence in sentences:
    # Split the sentence into words and convert to IDs
    sentence_tokens = [words_to_ids.get(word, words_to_ids['<UNK>']) for word in sentence.split()]
    tokens.append(sentence_tokens)
