In [6]:
!pip install ezdxf
!pip install tinygrad
!pip install matplotlib
!pip install scikit-learn
!pip install loguru
!pip install colorama
!pip install tqdm
!pip install plotly
!pip install ipywidgets
!pip install scipy


DEPRECATION: Loading egg at /usr/local/python/3.12.1/lib/python3.12/site-packages/my_project-0.1.0-py3.12.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation. Discussion can be found at https://github.com/pypa/pip/issues/12330
DEPRECATION: Loading egg at /usr/local/python/3.12.1/lib/python3.12/site-packages/tinygrad-0.9.2-py3.12.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation. Discussion can be found at https://github.com/pypa/pip/issues/12330
DEPRECATION: Loading egg at /usr/local/python/3.12.1/lib/python3.12/site-packages/my_project-0.1.0-py3.12.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation. Discussion can be found at https://github.com/pypa/pip/issues/12330
DEPRECATION: Loading egg at /usr/local/python/3.12.1/lib/python3.12/site-packages/tinygrad-0.9.2-py

## Util

In [7]:
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw

import ezdxf


# DXF export function
def export_loss_to_dxf(loss_values, filename="loss_curve.dxf"):
    doc = ezdxf.new(dxfversion='R2010')
    msp = doc.modelspace()
    scale = 10
    for i, loss in enumerate(loss_values[:-1]):
        msp.add_line((i * scale, loss_values[i] * scale), ((i + 1) * scale, loss_values[i + 1] * scale))
    doc.saveas(filename)
    print(f"Loss curve exported as {filename}")


## Attention Head

In [8]:
from tinygrad import Tensor
import numpy as np
import tinygrad as tngrd

class AttentionHead:
    def __init__(self, embed_dim, head_dim, requires_grad=True, dtype=tngrd.dtypes.float16):
        # Set dtype
        self.dtype = dtype

        # Initialize weights as Tensors with the specified dtype
        self.W_q = Tensor.randn(embed_dim, head_dim, dtype=self.dtype, requires_grad=requires_grad)
        self.W_k = Tensor.randn(embed_dim, head_dim, dtype=self.dtype, requires_grad=requires_grad)
        self.W_v = Tensor.randn(embed_dim, head_dim, dtype=self.dtype, requires_grad=requires_grad)
        self.W_o = Tensor.randn(head_dim, embed_dim, dtype=self.dtype, requires_grad=requires_grad)

        # Initialize biases
        self.b_q = Tensor.zeros(head_dim, dtype=self.dtype, requires_grad=requires_grad)
        self.b_k = Tensor.zeros(head_dim, dtype=self.dtype, requires_grad=requires_grad)
        self.b_v = Tensor.zeros(head_dim, dtype=self.dtype, requires_grad=requires_grad)
        self.b_o = Tensor.zeros(embed_dim, dtype=self.dtype, requires_grad=requires_grad)

        # Precompute the scale factor as a constant tensor
        self.scale_factor = Tensor((head_dim ** 0.5), dtype=self.dtype, requires_grad=False)

    def __call__(self, x):
        # Linear transformation for Q, K, V
        Q = x @ self.W_q + self.b_q
        K = x @ self.W_k + self.b_k
        V = x @ self.W_v + self.b_v

        # Dot product attention (QK^T)
        scores = Q @ K.transpose(2, 1)
        scaled_scores = scores / self.scale_factor
        attention_weights = scaled_scores.softmax(-1)

        # Attention output (Weighted sum of V)
        attention_output = attention_weights @ V

        # Final linear transformation
        output = attention_output @ self.W_o + self.b_o
        return output


### Tests

In [9]:
import unittest
from tinygrad.nn.optim import SGD
import tinygrad as tngrd

class TestAttentionHead(unittest.TestCase):

    def setUp(self):
        self.embed_dim = 16
        self.head_dim = 8
        self.batch_size = 2
        self.seq_length = 4
        self.attention_head = AttentionHead(self.embed_dim, self.head_dim, requires_grad=True, dtype=tngrd.dtypes.float16)

    def test_linear_projection_shapes(self):
        x = Tensor(np.random.randn(self.batch_size, self.seq_length, self.embed_dim).astype(np.float32))
        output = self.attention_head(x)
        self.assertEqual(output.shape, (self.batch_size, self.seq_length, self.embed_dim))

    def test_forward_pass(self):
        x = Tensor(np.random.randn(self.batch_size, self.seq_length, self.embed_dim).astype(np.float32))
        output = self.attention_head(x)
        self.assertIsNotNone(output)
        self.assertEqual(output.shape, (self.batch_size, self.seq_length, self.embed_dim))

    def test_gradient_check(self):
        x = Tensor(np.random.randn(self.batch_size, self.seq_length, self.embed_dim).astype(np.float32), requires_grad=True)
        output = self.attention_head(x)
        target = Tensor(np.random.randn(self.batch_size, self.seq_length, self.embed_dim).astype(np.float32))
        loss = ((output - target) ** 2).mean()
        loss.backward()

        self.assertIsNotNone(x.grad)
        self.assertEqual(self.attention_head.W_q.grad.shape, self.attention_head.W_q.shape)

    def test_training(self):
        # Enable training mode
        Tensor.training = True  # Make sure Tensor.training is enabled for the optimizer

        # Test training over multiple iterations with gradient descent
        optimizer = SGD([self.attention_head.W_q, self.attention_head.W_k, self.attention_head.W_v, self.attention_head.W_o,
                        self.attention_head.b_q, self.attention_head.b_k, self.attention_head.b_v, self.attention_head.b_o], lr=0.01)

        loss_values = []  # Track loss values for plotting

        for _ in range(200):
            x = Tensor(np.random.randn(self.batch_size, self.seq_length, self.embed_dim).astype(np.float32), requires_grad=True)
            target = Tensor(np.random.randn(self.batch_size, self.seq_length, self.embed_dim).astype(np.float32))
            output = self.attention_head(x)
            loss = ((output - target) ** 2).mean()
            loss_values.append(loss.numpy())  # Store the loss value

            print("Loss:", repr(loss.numpy()))

            # Backward pass to compute gradients
            loss.backward()

            # Perform optimization step
            optimizer.step()

            # Zero the gradients for the next step
            optimizer.zero_grad()

            # Assert that loss is a scalar
            self.assertEqual(loss.shape, ())

        # After training, export the loss values to DXF
        export_loss_to_dxf(loss_values, filename="loss_curve.dxf")

        # Disable training mode after test
        Tensor.training = False  # Reset training mode after test

# Run the tests
unittest.main(argv=[''], verbosity=2, exit=False)

test_forward_pass (__main__.TestAttentionHead.test_forward_pass) ... ok
test_gradient_check (__main__.TestAttentionHead.test_gradient_check) ... ok
test_linear_projection_shapes (__main__.TestAttentionHead.test_linear_projection_shapes) ... ok
test_training (__main__.TestAttentionHead.test_training) ... 

Loss: array(207.10237, dtype=float32)
Loss: array(63.100628, dtype=float32)
Loss: array(156.17815, dtype=float32)
Loss: array(115.389, dtype=float32)
Loss: array(90.95379, dtype=float32)
Loss: array(110.74843, dtype=float32)
Loss: array(46.058445, dtype=float32)
Loss: array(59.310402, dtype=float32)
Loss: array(84.15816, dtype=float32)
Loss: array(58.05771, dtype=float32)
Loss: array(80.72236, dtype=float32)
Loss: array(35.15234, dtype=float32)
Loss: array(42.367126, dtype=float32)
Loss: array(44.041824, dtype=float32)
Loss: array(51.60333, dtype=float32)
Loss: array(39.01849, dtype=float32)
Loss: array(28.87851, dtype=float32)
Loss: array(49.33441, dtype=float32)
Loss: array(42.728138, dtype=float32)
Loss: array(38.990295, dtype=float32)
Loss: array(42.628036, dtype=float32)
Loss: array(19.084894, dtype=float32)
Loss: array(37.17585, dtype=float32)
Loss: array(13.000665, dtype=float32)
Loss: array(14.316606, dtype=float32)
Loss: array(15.760514, dtype=float32)
Loss: array(20.19385, dt

ok

----------------------------------------------------------------------
Ran 4 tests in 4.857s

OK


<unittest.main.TestProgram at 0x7230d2eb8fe0>

### Analysis

In [10]:
import os  # For clearing the terminal
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from tinygrad.nn.optim import SGD
from tqdm import tqdm  # For progress bars
from loguru import logger  # For logging
from colorama import Fore, Style  # For colored terminal output
from IPython.display import clear_output

# Initialize Colorama for colored text
from colorama import init
init(autoreset=True)

# Customize logger with colored output using colorama
logger.add(lambda msg: print(f"{Fore.GREEN}{msg}{Style.RESET_ALL}"))

# Training function for attention head
def train_attention_head(attention_head, num_iterations=200, lr=0.01):
    # Enable training mode
    Tensor.training = True  # Make sure Tensor.training is enabled for the optimizer

    optimizer = SGD([attention_head.W_q, attention_head.W_k, attention_head.W_v, attention_head.W_o,
                     attention_head.b_q, attention_head.b_k, attention_head.b_v, attention_head.b_o], lr=lr)

    loss_values = []
    for _ in tqdm(range(num_iterations), desc="Training Steps", leave=False):
        x = Tensor(np.random.randn(batch_size, seq_length, embed_dim).astype(np.float32), requires_grad=True)
        target = Tensor(np.random.randn(batch_size, seq_length, embed_dim).astype(np.float32))
        output = attention_head(x)
        loss = ((output - target) ** 2).mean()
        loss_values.append(loss.numpy())

        # Backward pass and optimizer step
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    # Return final loss and concatenation of all weights
    final_loss = loss.numpy()
    weights_vector = np.concatenate([attention_head.W_q.numpy().flatten(),
                                     attention_head.W_k.numpy().flatten(),
                                     attention_head.W_v.numpy().flatten(),
                                     attention_head.W_o.numpy().flatten(),
                                     attention_head.b_q.numpy(),
                                     attention_head.b_k.numpy(),
                                     attention_head.b_v.numpy(),
                                     attention_head.b_o.numpy()])

    # Disable training mode
    Tensor.training = False  # Reset training mode

    return final_loss, weights_vector

# Hyperparameters
num_models = 50
embed_dim = 16
head_dim = 8
batch_size = 2
seq_length = 4

# Train multiple AttentionHead models and collect weight vectors and losses
weight_vectors = []
losses = []

logger.info(f"Starting training of {num_models} models")

# Outer tqdm loop for training multiple models
for i in tqdm(range(num_models), desc="Training Models"):
    clear_output(wait=True)
    # Clear the terminal before logging each model's details
    os.system('cls' if os.name == 'nt' else 'clear')

    logger.info(f"{Fore.CYAN}Training model {i+1}/{num_models}{Style.RESET_ALL}")
    attention_head = AttentionHead(embed_dim, head_dim, requires_grad=True, dtype=tngrd.dtypes.float16)

    final_loss, weights_vector = train_attention_head(attention_head)
    weight_vectors.append(weights_vector)
    losses.append(final_loss)

    logger.info(f"Model {i+1} finished with final loss: {Fore.YELLOW}{final_loss}{Style.RESET_ALL}")

# Convert to numpy arrays for dimensionality reduction
weight_vectors = np.array(weight_vectors)
losses = np.array(losses)

# Standardize the weight vectors
scaler = StandardScaler()
weight_vectors_std = scaler.fit_transform(weight_vectors)

# Apply PCA (or TSNE or UMAP) for dimensionality reduction
pca = PCA(n_components=2)
reduced_weights = pca.fit_transform(weight_vectors_std)

# Create a 2D heatmap (or 3D plot with loss values)
plt.figure(figsize=(8, 6))
plt.scatter(reduced_weights[:, 0], reduced_weights[:, 1], c=losses, cmap='coolwarm')
plt.colorbar(label='Final Loss')
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.title('Solution Space of AttentionHead Models')
plt.show()

logger.info(f"{Fore.GREEN}Training completed and plot generated!{Style.RESET_ALL}")


[32m2024-10-03 18:30:35.530[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m137[0m - [1m[36mTraining model 50/50[0m[0m

Training Steps:   0%|          | 0/200 [00:00<?, ?it/s]
Training Steps:   3%|▎         | 6/200 [00:00<00:03, 57.19it/s]
Training Steps:   6%|▌         | 12/200 [00:00<00:03, 56.49it/s]
Training Steps:   9%|▉         | 18/200 [00:00<00:03, 57.98it/s]
Training Steps:  12%|█▏        | 24/200 [00:00<00:03, 53.84it/s]
Training Steps:  15%|█▌        | 30/200 [00:00<00:03, 54.53it/s]
Training Steps:  18%|█▊        | 36/200 [00:00<00:02, 56.18it/s]
Training Steps:  22%|██▏       | 43/200 [00:00<00:02, 57.60it/s]
Training Steps:  25%|██▌       | 50/200 [00:00<00:02, 58.58it/s]
Training Steps:  28%|██▊       | 57/200 [00:00<00:02, 59.19it/s]
Training Steps:  32%|███▏      | 64/200 [00:01<00:02, 59.64it/s]
Training Steps:  36%|███▌      | 71/200 [00:01<00:02, 60.18it/s]
Training Steps:  39%|███▉      | 78/200 [00:01<00:02, 60.70it/s]
Training Steps:  42%|