In [8]:
import tinygrad.nn
import numpy as np
from tinygrad import nn , Tensor



def layer_init(layer: nn.Linear, std=np.sqrt(2), bias_const=0.0):
    """CleanRL's default layer initialization.

    Replaces ``layer.weight`` with the result of ``tiny_orthogonal_`` (scaled
    by ``std``) and, when the layer has a bias, replaces ``layer.bias`` with
    the result of ``tiny_constant_`` (filled with ``bias_const``).

    Args:
        layer: the linear layer to re-initialize; it is modified in place.
        std: gain passed to the orthogonal initializer.
        bias_const: value every bias entry is set to.

    Returns:
        The same ``layer`` object, so the call can wrap a constructor.
    """
    layer.weight = tiny_orthogonal_(layer.weight, std)
    # A layer created without a bias has no tensor to fill; skip it instead of
    # failing inside tiny_constant_ on a missing `.shape`.
    if getattr(layer, "bias", None) is not None:
        layer.bias = tiny_constant_(layer.bias, bias_const)
    return layer

from tinygrad import nn 
def tiny_orthogonal_(tensor: Tensor, gain=1, generator=None):
    """
    Build a (semi-)orthogonal tensor with the same shape as ``tensor``.

    The input is viewed as a ``(shape[0], numel // shape[0])`` matrix, a random
    normal matrix of that size is QR-factorized, and the sign-corrected ``Q``
    factor is scaled by ``gain`` and reshaped back to ``tensor.shape``.

    NOTE: Since initialization occurs only once, we are being lazy and using numpy linear algebra to perform certain operations.

    Args:
        tensor: tensor whose shape the result should have; its values are not read.
        gain: multiplicative scale applied to the orthogonal matrix.
        generator: accepted for signature compatibility; currently unused.

    Returns:
        A new tensor of shape ``tensor.shape``; the input itself is returned
        unchanged when it has no elements.

    Raises:
        ValueError: if ``tensor`` has fewer than 2 dimensions.
    """
    if tensor.ndim < 2:
        raise ValueError("Only tensors with 2 or more dimensions are supported")

    if tensor.numel() == 0:
        return tensor # no-op for empty tensors

    rows, cols = tensor.shape[0], tensor.numel() // tensor.shape[0]
    flattened = Tensor.randn(rows, cols) # figure out if it has the same device configs as the input tensor

    # QR needs a tall (or square) matrix to yield orthonormal columns.
    if rows < cols:
        flattened = flattened.transpose()

    # for now, we use numpy to compute the qr factorization
    q, r = np.linalg.qr(flattened.numpy())

    # Flip column signs according to the diagonal of R.
    q *= np.sign(np.diag(r, 0))

    if rows < cols:
        # ndarray.transpose() returns a view and does not modify `q`, so the
        # result has to be rebound to undo the earlier transpose.
        q = q.T

    # Copy to a contiguous buffer (the transpose is a strided view) and restore
    # the caller's original shape, which matters for inputs with ndim > 2.
    return Tensor(np.ascontiguousarray(q)).reshape(tensor.shape).mul(gain)

def tiny_constant_(tensor: Tensor, val: float):
    """
    Return a new tensor shaped like ``tensor`` with every entry equal to ``val``.

    Only the shape of ``tensor`` is used; its values are not read.
    """
    filled = Tensor.ones(tensor.shape)
    return filled * val
    

from tinygrad import nn 


class TinyPolicy:
    """Wrap a policy callable that maps observations to a ``(logits, value)`` pair."""

    def __init__(self, policy):
        self.policy = policy
        # A policy without the attribute is treated as not continuous.
        self.is_continuous = getattr(policy, 'is_continuous', False)

    def __call__(self, x, action=None):
        """Shorthand for ``get_action_and_value``."""
        return self.get_action_and_value(x, action)

    def get_value(self, x, state=None):
        """Run the wrapped policy on ``x`` and return only its value output.

        ``state`` is accepted but not used.
        """
        _logits, critic_value = self.policy(x)
        return critic_value

    def get_action_and_value(self, x, action=None):
        """Run the wrapped policy and pass its logits to ``sample_logits``.

        Returns the tuple ``(action, logprob, entropy, value)``.
        """
        logits, value = self.policy(x)
        sampled = sample_logits(logits, action, self.is_continuous)
        action, logprob, entropy = sampled
        return action, logprob, entropy, value
    
class Critic:
    """Three-layer MLP: two tanh-activated hidden layers, then a single linear output unit."""

    def __init__(self, obs_size, hidden_size):
        make_linear = tinygrad.nn.Linear
        # Layers are built and initialized one at a time, in l1 -> l2 -> l3 order.
        self.l1 = layer_init(make_linear(obs_size, hidden_size))
        self.l2 = layer_init(make_linear(hidden_size, hidden_size))
        self.l3 = layer_init(make_linear(hidden_size, 1))

    def __call__(self, x: Tensor):
        """Apply l1 and l2 with tanh after each, then l3 with no activation."""
        hidden = x
        for layer in (self.l1, self.l2):
            hidden = layer(hidden).tanh()
        return self.l3(hidden)

class ActorEncoder:
    """Two-layer MLP with a tanh after each linear layer."""

    def __init__(self, obs_size, hidden_size):
        first = tinygrad.nn.Linear(obs_size, hidden_size)
        self.l1 = layer_init(first)
        second = tinygrad.nn.Linear(hidden_size, hidden_size)
        self.l2 = layer_init(second)

    def __call__(self, x: Tensor):
        """Return ``tanh(l2(tanh(l1(x))))``."""
        hidden = self.l1(x).tanh()
        encoded = self.l2(hidden).tanh()
        return encoded

class TinyCleanRLPolicy(TinyPolicy):
    """Continuous-action actor/critic container built from tinygrad layers.

    ``envs`` is currently not read: observation and action sizes are hard-coded
    to 1 until the space-derived sizes (commented out below) are wired in.

    NOTE(review): the base class is initialized with ``policy=None`` and this
    class defines no methods of its own, so the inherited ``get_value`` /
    ``get_action_and_value`` would call ``None`` — confirm overrides are planned.
    """

    def __init__(self, envs, hidden_size=64):
        super().__init__(policy=None)  # Just to get the right init
        self.is_continuous = True

        # self.obs_size = np.array(envs.single_observation_space.shape).prod()
        # action_size = np.prod(envs.single_action_space.shape)

        ## figuring out how to normalize observations will be an important step, but leaving it out for now
        n_actions = 1
        self.obs_size = 1

        # Construction order is kept as critic -> encoder -> mean head.
        self.critic = Critic(self.obs_size, hidden_size)
        self.actor_encoder = ActorEncoder(self.obs_size, hidden_size)
        mean_head = tinygrad.nn.Linear(hidden_size, n_actions)
        self.actor_decoder_mean = layer_init(mean_head, std=0.01)
        self.actor_decoder_logstd = Tensor.zeros(1, n_actions)



# Smoke test: build the policy. The constructor does not read `envs`, so a placeholder string is enough.
policy = TinyCleanRLPolicy("hypothetical env", hidden_size=64)



In [20]:
from torch.distributions import Normal
import torch
# Normal distribution parameterized by two 1-D tensors of length 2: loc = [1, 2], scale = [1, 1].
a = Normal(torch.tensor([1.0, 2.0]), torch.tensor([1.0, 1.0]))



In [23]:
# Demonstrates a failure: the log-probabilities here are a 1-D tensor, so dim=1 does not exist
# and .sum(1) raises IndexError (valid dims are -1 and 0). Summing over the last dim, .sum(-1),
# or giving the distribution a leading batch dimension would avoid the error.
a.log_prob(torch.tensor([1.0, 2.0])).sum(1)

IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1)

In [26]:
from tinygrad import TinyJit, Tensor
# Module-level tensor that `forward` reads as a global rather than receiving as an argument.
weight = Tensor.randn(10, 3)
@TinyJit
def forward(x: Tensor, indices: list[int]):
  """Index `x` with `indices`, multiply by the global `weight`, and realize the sum over axis 0.

  Prints the shape of the intermediate product. Nothing is returned.
  """
  c = (x[indices] * weight).contiguous()
  print(f"shape of c {c.shape}")
  c.sum(0).realize()

x = Tensor.randn(10)
# Picking 3 entries of `x` and multiplying by the (10, 3) weight yields the (10, 3) shape printed below.
forward(x, [0, 1, 2])

shape of c (10, 3)


In [None]:
from tinygrad impor

In [17]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, TensorDataset

# Reports CUDA availability only; nothing in this cell moves the model or the data to a device.
print("Using device:", "cuda" if torch.cuda.is_available() else "cpu")

# Get MNIST data
# The training split is downloaded into the current directory if it is missing.
transform = transforms.ToTensor()
mnist_train = datasets.MNIST('./', train=True, download=True, transform=transform)
mnist_test = datasets.MNIST('./', train=False, transform=transform)

# Read the raw image/label tensors and divide the pixel values by 255 by hand.
# NOTE(review): accessing `.data` directly appears to bypass `transform` — confirm.
X_train, Y_train = mnist_train.data.float()/255, mnist_train.targets
X_test, Y_test = mnist_test.data.float()/255, mnist_test.targets
X_train = X_train.unsqueeze(1) # Add channel dimension
X_test = X_test.unsqueeze(1)

class Model(nn.Module):
    """Small convnet: two conv/ReLU/max-pool stages, dropout, then a 10-way linear layer."""

    def __init__(self):
        super().__init__()
        # Attribute names and creation order are kept so parameter names and
        # seeded initialization stay the same.
        self.l1 = nn.Conv2d(1, 32, kernel_size=3)
        self.l2 = nn.Conv2d(32, 64, kernel_size=3)
        self.l3 = nn.Linear(1600, 10)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return the 10 output scores for each input in the batch."""
        features = x
        for conv in (self.l1, self.l2):
            activated = torch.relu(conv(features))
            features = torch.max_pool2d(activated, 2)
        flat = features.flatten(1)
        return self.l3(self.dropout(flat))

model = Model()
# Adam over all model parameters, with the optimizer's default hyperparameters.
optim = torch.optim.Adam(model.parameters())

batch_size = 128  # number of samples step() draws per call
def step():
    """Run one optimizer update on a random mini-batch and return the loss tensor.

    Reads the module-level ``model``, ``optim``, ``batch_size``, ``X_train``
    and ``Y_train``.
    """
    model.train()
    # Sample indices uniformly with replacement.
    batch_idx = torch.randint(len(X_train), (batch_size,))
    inputs = X_train[batch_idx]
    targets = Y_train[batch_idx]

    optim.zero_grad()
    predictions = model(inputs)
    loss = nn.functional.cross_entropy(predictions, targets)
    loss.backward()
    optim.step()
    return loss

# Train model
# Every 100 steps, score the whole test set in a single forward pass and print loss/accuracy.
for step_num in range(7000):
    loss = step()
    if step_num%100 == 0:
        model.eval()  # evaluation mode; step() switches back to train mode on its next call
        with torch.no_grad():
            acc = (model(X_test).argmax(dim=1) == Y_test).float().mean().item()
        print(f"step {step_num:4d}, loss {loss.item():.2f}, acc {acc*100.:.2f}%")


Using device: cpu
step    0, loss 2.31, acc 21.34%
step  100, loss 0.26, acc 93.31%


KeyboardInterrupt: 

In [13]:
# Tinygrad MNIST implementation
from tinygrad import Tensor, nn, TinyJit, Device
from tinygrad.nn.datasets import mnist

# Show tinygrad's default device.
print("Using device:", Device.DEFAULT)

# Get MNIST data
# NOTE(review): this rebinds X_train/Y_train/X_test/Y_test, which the PyTorch cell also defines.
X_train, Y_train, X_test, Y_test = mnist()

class Model:
    """Small convnet: two conv/ReLU/max-pool stages, dropout, then a 10-way linear layer."""

    def __init__(self):
        self.l1 = nn.Conv2d(1, 32, kernel_size=(3,3))
        self.l2 = nn.Conv2d(32, 64, kernel_size=(3,3))
        self.l3 = nn.Linear(1600, 10)

    def __call__(self, x:Tensor) -> Tensor:
        """Return the 10 output scores for each input in the batch."""
        features = x
        for conv in (self.l1, self.l2):
            features = conv(features).relu().max_pool2d((2,2))
        dropped = features.flatten(1).dropout(0.5)
        return self.l3(dropped)

model = Model()
# Gather the model's parameters via nn.state.get_parameters and hand them to Adam.
optim = nn.optim.Adam(nn.state.get_parameters(model))

batch_size = 128  # number of samples step() draws per call
def step():
    """Run one optimizer update on a random mini-batch and return the loss.

    Reads the module-level ``model``, ``optim``, ``batch_size``, ``X_train``
    and ``Y_train``. The value returned is whatever ``backward()`` returns
    when called on the loss.
    """
    Tensor.training = True  # makes dropout work
    batch_idx = Tensor.randint(batch_size, high=X_train.shape[0])
    inputs = X_train[batch_idx]
    targets = Y_train[batch_idx]
    optim.zero_grad()
    logits = model(inputs)
    loss = logits.sparse_categorical_crossentropy(targets).backward()
    optim.step()
    return loss

# JIT compile the training step
jit_step = TinyJit(step)

# Train model
# Every 100 steps, score the whole test set in a single forward pass and print loss/accuracy.
for step_num in range(7000):
    loss = jit_step()
    if step_num%100 == 0:
        Tensor.training = False  # evaluate with dropout disabled
        acc = (model(X_test).argmax(axis=1) == Y_test).mean().item()
        print(f"step {step_num:4d}, loss {loss.item():.2f}, acc {acc*100.:.2f}%")

import timeit

# timeit.timeit() has no `repeat` keyword (the old call raised TypeError) and
# defaults to 1,000,000 executions. timeit.repeat() returns a list of timings:
# here two measurements, each covering a single call to step().
timeit.repeat(step, repeat=2, number=1)


Using device: METAL
step    0, loss 43.87, acc 15.73%


KeyboardInterrupt: 