In [26]:
from tinygrad.tensor import Tensor
import tinygrad.nn as nn

In [27]:
class BasicBlock():
    outchannel_ratio = 4
    def __init__(self, in_planes, planes, stride=1, downsample=None):
        # https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html
        # eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
        # https://docs.tinygrad.org/nn/#tinygrad.nn.BatchNorm
        # eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
        self.bn1 = nn.BatchNorm(in_planes)
        # https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
        # dilation=1, groups=1
        # https://docs.tinygrad.org/nn/#tinygrad.nn.Conv2d
        # dilation=1, groups=1
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn3 = nn.BatchNorm(planes)
        self.relu = Tensor.relu
        self.downsample = downsample
        self.stride = stride

    def __call__(self, x):
        out = self.bn1(x)
        out = self.conv1(out)
        
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv2(out)
        
        out = self.bn3(out)

        if self.downsample is not None:
            shortcut = self.downsample(x)
            featuremap_size = shortcut.size()[2:4]
        else:
            shortcut = x
            featuremap_size = out.size()[2:4]

        batch_size = out.size()[0]
        residual_channel = out.size()[1]
        shortcut_channel = shortcut.size()[1]

        if residual_channel != shortcut_channel:
            padding = Tensor.zeros(batch_size, residual_channel - shortcut_channel, featuremap_size[0], featuremap_size[1])
            out += Tensor.cat((shortcut, padding), 1)
        else:
            out += shortcut

        return out


class Bottleneck():
    outchannel_ratio = 4
    def __init__(self, in_planes, planes, stride=1, downsample=None):
        self.bn1 = nn.BatchNorm(in_planes)
        # https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html
        # stride=1, padding=0
        # https://docs.tinygrad.org/nn/#tinygrad.nn.BatchNorm
        # stride=1, padding=0
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn3 = nn.BatchNorm(planes)
        self.conv3 = nn.Conv2d(planes, planes * Bottleneck.outchannel_ratio, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm(planes * Bottleneck.outchannel_ratio)
        self.relu = Tensor.relu
        self.downsample = downsample
        self.stride = stride

    def __call__(self, x):
        out = self.bn1(x)
        out = self.conv1(out)
        
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv2(out)

        out = self.bn3(out)
        out = self.relu(out)
        out = self.conv3(out)
        
        out = self.bn4(out)

        if self.downsample is not None:
            shortcut = self.downsample(x)
            featuremap_size = shortcut.size()[2:4]
        else:
            shortcut = x
            featuremap_size = out.size()[2:4]

        batch_size = out.size()[0]
        residual_channel = out.size()[1]
        shortcut_channel = shortcut.size()[1]

        if residual_channel != shortcut_channel:
            padding = Tensor.zeros(batch_size, residual_channel - shortcut_channel, featuremap_size[0], featuremap_size[1])
            out += Tensor.cat((shortcut, padding), 1)
        else:
            out += shortcut

        return out


class PyramidNet:
    def __init__(self, in_planes, num_classes, depth, alpha, bottleneck=False):
        if depth not in [18, 34, 50, 101, 152, 200]:
            if bottleneck:
                block = Bottleneck
                temp_cfg = (depth - 2) // 12
            else:
                block = BasicBlock
                temp_cfg = (depth - 2) // 8
            layers = [temp_cfg, temp_cfg, temp_cfg, temp_cfg]
            print('=> the layer configuration for each stage is set to', layers[depth])
        else:
            block = BasicBlock if depth <= 34 and not bottleneck else Bottleneck
            if depth == 18:
                layers = [2, 2, 2, 2]
            elif depth in [34, 50]:
                layers = [3, 4, 6, 3]
            elif depth == 101:
                layers = [3, 4, 23, 3]
            elif depth == 152:
                layers = [3, 8, 36, 3]
            else:
                layers = [3, 24, 36, 3]

        self.in_planes = in_planes            
        self.addrate = alpha / sum(layers)

        self.input_featuremap_dim = self.in_planes
        self.conv1 = nn.Conv2d(3, self.input_featuremap_dim, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.input_featuremap_dim)
        self.relu = Tensor.relu

        self.featuremap_dim = self.input_featuremap_dim 
        self.layer1 = self.pyramidal_make_layer(block, layers[0])
        self.layer2 = self.pyramidal_make_layer(block, layers[1], stride=2)
        self.layer3 = self.pyramidal_make_layer(block, layers[2], stride=2)
        self.layer4 = self.pyramidal_make_layer(block, layers[3], stride=2)

        self.final_featuremap_dim = self.input_featuremap_dim
        self.bn_final = nn.BatchNorm2d(self.final_featuremap_dim)
        self.relu_final = Tensor.relu
        self.avgpool = lambda x: x.avg_pool2d(7)
        self.fc = lambda x: x.linear(self.final_featuremap_dim, num_classes)

    def pyramidal_make_layer(self, block, block_depth, stride=1):
        downsample = None
        if stride != 1:
            downsample = lambda x: x.avg_pool2d((2, 2), stride=(2, 2)) # ceil_mode?

        layers = []
        self.featuremap_dim += self.addrate
        layers.append(block(self.input_featuremap_dim, int(round(self.featuremap_dim)), stride, downsample))
        for i in range(1, block_depth):
            temp_featuremap_dim = self.featuremap_dim + self.addrate
            layers.append(block(int(round(self.featuremap_dim)) * block.outchannel_ratio, int(round(temp_featuremap_dim)), 1))
            self.featuremap_dim  = temp_featuremap_dim
        self.input_featuremap_dim = int(round(self.featuremap_dim)) * block.outchannel_ratio
        return layers

    def __call__(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = x.max_pool2d(kernel_size=3, stride=2, padding=1)

        for layer in self.layer1:
            x = layer(x)
        for layer in self.layer2:
            x = layer(x)
        for layer in self.layer3:
            x = layer(x)
        for layer in self.layer4:
            x = layer(x)

        x = self.bn_final(x)
        x = self.relu_final(x)
        x = self.avgpool(x)
        x = x.flatten()
        x = self.fc(x)
        return x
    
        


In [28]:
import numpy as np
from tinygrad.tensor import Tensor
from tinygrad.nn.optim import SGD
from torchvision import datasets, transforms

# Hyperparameters
num_classes = 10
depth = 18
alpha = 48
batch_size = 64
epochs = 5
learning_rate = 0.001

# Load CIFAR-10 Dataset
def load_cifar10():
    train_dataset = datasets.CIFAR10(root="./data", train=True, download=True, transform=transforms.ToTensor())
    test_dataset = datasets.CIFAR10(root="./data", train=False, download=True, transform=transforms.ToTensor())
    
    # Convert to numpy arrays
    train_data = [(np.array(img).transpose(2, 0, 1), label) for img, label in train_dataset]
    test_data = [(np.array(img).transpose(2, 0, 1), label) for img, label in test_dataset]

    return train_data, test_data

train_data, test_data = load_cifar10()

# Batch Generator
def get_batches(data, batch_size):
    np.random.shuffle(data)
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        x = np.array([item[0] for item in batch], dtype=np.float32) / 255.0  # Normalize
        y = np.array([item[1] for item in batch], dtype=np.int32)
        yield Tensor(x), Tensor(y)

# Model
model = PyramidNet(in_planes=16, num_classes=num_classes, depth=depth, alpha=alpha)

# Optimizer
print(model.__dict__.values())
optimizer = SGD([param for param in model.__dict__.values() if isinstance(param, Tensor)], lr=learning_rate)

# Loss function
def cross_entropy_loss(logits, labels):
    logits = logits - logits.max(axis=1).reshape((-1, 1))  # Stability trick
    log_probs = logits - Tensor.log(Tensor.exp(logits).sum(axis=1)).reshape((-1, 1))
    return -log_probs[np.arange(len(labels.data)), labels.data].mean()

# Training loop
for epoch in range(epochs):
    total_loss = 0.0
    for inputs, labels in get_batches(train_data, batch_size):
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Loss computation
        loss = cross_entropy_loss(outputs, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        total_loss += loss.numpy()

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss:.4f}")

# Testing loop
correct = 0
total = 0
for inputs, labels in get_batches(test_data, batch_size):
    outputs = model(inputs)
    predictions = np.argmax(outputs.numpy(), axis=1)
    correct += (predictions == labels.numpy()).sum()
    total += len(labels)

print(f"Test Accuracy: {100 * correct / total:.2f}%")


Files already downloaded and verified
Files already downloaded and verified
dict_values([16, 6.0, 256, <tinygrad.nn.Conv2d object at 0x000001A7FFC300E0>, <tinygrad.nn.BatchNorm object at 0x000001A7A3279D00>, <function Tensor.relu at 0x000001A7D21AA840>, 64.0, [<__main__.BasicBlock object at 0x000001A7A32783B0>, <__main__.BasicBlock object at 0x000001A7A327BA10>], [<__main__.BasicBlock object at 0x000001A7A3CA2E40>, <__main__.BasicBlock object at 0x000001A7A3BF6270>], [<__main__.BasicBlock object at 0x000001A7A3C516A0>, <__main__.BasicBlock object at 0x000001A7A3C00A10>], [<__main__.BasicBlock object at 0x000001A7A3C03E00>, <__main__.BasicBlock object at 0x000001A7A3C63230>], 256, <tinygrad.nn.BatchNorm object at 0x000001A7A3C6A660>, <function Tensor.relu at 0x000001A7D21AA840>, <function PyramidNet.__init__.<locals>.<lambda> at 0x000001A7A4F8BEC0>, <function PyramidNet.__init__.<locals>.<lambda> at 0x000001A7A325C220>])


AssertionError: optimizer must have at least one param