In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np

# ============================================================
# PART 1: PHYSICS ENGINE (Shared Atom)
# ============================================================
class Nucleus(nn.Module):
    """
    Sinusoidal projection followed by a per-feature monotone spline.

    For an input whose last dimension has ``num_coefficients`` entries, each of
    the ``num_features`` outputs is

        spline_f( sigmoid( sin( omega_scale * (x @ perm_freqs[f] + phase[f]) ) ) )

    where ``spline_f(u) = sum_j softplus(h[f, j]) * relu(u - g_j) + bias[f]`` and
    ``g_j`` is a uniform grid of ``num_spline_bins`` knots on [0, 1]. Because the
    hinge weights are positive, the spline is non-decreasing in ``u``.

    In training mode, Gaussian noise with a learned per-feature standard
    deviation ``exp(log_sigma)`` is added to the output.

    Args:
        num_coefficients: size of the last input dimension.
        num_features: size of the last output dimension.
        num_spline_bins: number of hinge knots placed on [0, 1].
        init_mode: ``'identity'`` (tiny frequencies, equal hinge heights) or
            ``'random'`` (wider frequencies, small random hinge heights). Any
            other value keeps the default ``randn``/``rand`` initialisation.

    Shapes:
        input  ``(..., num_coefficients)`` -> output ``(..., num_features)``.
        Any number of leading dimensions is accepted.
    """
    def __init__(self, num_coefficients, num_features, num_spline_bins=16, init_mode='random'):
        super().__init__()
        self.in_dim = num_coefficients
        self.out_dim = num_features
        self.num_bins = num_spline_bins

        # Spectral projection: one frequency vector and one phase per feature.
        self.perm_freqs = nn.Parameter(torch.randn(num_features, num_coefficients))
        self.perm_phase_shifts = nn.Parameter(torch.rand(num_features, 1))
        self.omega_scale = 1.0

        # Monotone spline: raw hinge heights (made positive via softplus) and an offset.
        self.spline_heights = nn.Parameter(torch.rand(num_features, num_spline_bins))
        self.spline_bias = nn.Parameter(torch.zeros(num_features))

        # Log standard deviation of the training-time noise (exp(-5) ~ 0.0067).
        self.log_sigma = nn.Parameter(torch.ones(num_features) * -5.0)

        # Overwrite the default draws according to the requested init mode.
        # In-place ops on parameters are legal under no_grad, so `.data` is not needed.
        with torch.no_grad():
            if init_mode == 'identity':
                self.perm_freqs.uniform_(-0.01, 0.01)
                self.spline_heights.fill_(1.0 / num_spline_bins)
            elif init_mode == 'random':
                self.perm_freqs.uniform_(-0.5, 0.5)
                self.spline_heights.uniform_(0.0, 0.1)

    def forward(self, x):
        """Map ``(..., in_dim)`` to ``(..., out_dim)``; adds noise only when ``self.training``."""
        # Linear projection plus per-feature phase, squashed by sin and then sigmoid.
        phase = self.perm_phase_shifts.squeeze(-1)                  # (F,)
        basis_raw = F.linear(x, self.perm_freqs) + phase            # (..., F)
        u = torch.sigmoid(torch.sin(self.omega_scale * basis_raw))  # (..., F)

        # Positive hinge weights and the uniform knot grid on [0, 1].
        w = F.softplus(self.spline_heights)                         # (F, bins)
        bin_grid = torch.linspace(0, 1, self.num_bins, device=x.device)  # (bins,)

        # Broadcasting over the trailing (F, bins) axes keeps this rank-agnostic;
        # the previous explicit view(1, 1, ...) calls only worked for 3-D input
        # and silently added a leading axis for 2-D input.
        relu_basis = F.relu(u.unsqueeze(-1) - bin_grid)             # (..., F, bins)
        val = (relu_basis * w).sum(dim=-1) + self.spline_bias       # (..., F)

        if self.training:
            val = val + torch.randn_like(val) * torch.exp(self.log_sigma)
        return val

class OrbitalShells(nn.Module):
    """
    Rotary position encoding driven by a two-level position clock.

    A sequence position ``p`` is split into ``hour = p % day_length`` and
    ``day = p // day_length``. Each consecutive feature pair ``(2i, 2i+1)`` is
    rotated by the angle ``hour * inv_freq_h[i] + day * inv_freq_d[i]``.

    Args:
        dim: feature dimension of the tensors to rotate.
        day_length: number of positions per "day".
    """
    def __init__(self, dim, day_length=64):
        super().__init__()
        self.dim = dim
        self.day_length = day_length
        # One inverse frequency per feature pair, for each of the two clocks.
        pair_exponent = torch.arange(0, dim, 2).float() / dim
        self.register_buffer('inv_freq_h', 1.0 / (10000 ** pair_exponent))
        self.register_buffer('inv_freq_d', 1.0 / (100000 ** pair_exponent))

    def forward(self, x, start_index=0):
        """Rotate ``x`` of shape (B, Seq, Dim); positions start at ``start_index``."""
        seq_len = x.shape[1]
        positions = torch.arange(start_index, start_index + seq_len, device=x.device).float()
        day_idx = positions // self.day_length
        hour_idx = positions % self.day_length

        # Rotation angle per (position, feature pair): (1, Seq, Dim/2).
        phase = torch.outer(hour_idx, self.inv_freq_h) + torch.outer(day_idx, self.inv_freq_d)
        phase = phase.unsqueeze(0)
        cos_p = torch.cos(phase)
        sin_p = torch.sin(phase)

        # Treat (even, odd) feature pairs as (real, imaginary) parts and rotate.
        real = x[..., 0::2]
        imag = x[..., 1::2]
        rotated = torch.zeros_like(x)
        rotated[..., 0::2] = real * cos_p - imag * sin_p
        rotated[..., 1::2] = real * sin_p + imag * cos_p
        return rotated

class Atom(nn.Module):
    """
    A Nucleus feature extractor followed by an OrbitalShells rotation.

    Args:
        dim: output feature dimension (also the rotation dimension).
        patch_dim: input feature dimension; defaults to ``dim`` when ``None``.
        init_mode: forwarded to the Nucleus initialiser.
    """
    def __init__(self, dim, patch_dim=None, init_mode='identity'):
        super().__init__()
        if patch_dim is None:
            patch_dim = dim
        self.nucleus = Nucleus(num_coefficients=patch_dim, num_features=dim, init_mode=init_mode)
        self.shells = OrbitalShells(dim)

    def forward(self, x, offset=0):
        """Encode every token of ``x`` with the same nucleus, then rotate by position.

        The token at sequence index ``s`` is given position ``offset + s``.
        """
        features = self.nucleus(x)
        return self.shells(features, start_index=offset)

# ============================================================
# PART 2: GRAMMAR
# ============================================================
class MixingNode(nn.Module):
    """
    Single-head dot-product mixing with a residual connection and LayerNorm.

    Logits are ``(q @ k^T) * temperature + bias`` with a learned scalar
    temperature and a learned scalar bias; no ``1/sqrt(dim)`` scaling is
    applied. The bias is added uniformly to every logit, so it does not alter
    the softmax weights.
    """
    def __init__(self, dim):
        super().__init__()
        self.temperature = nn.Parameter(torch.tensor(1.0))
        self.bias = nn.Parameter(torch.zeros(1))
        self.layer_norm = nn.LayerNorm(dim)

    def forward(self, q, k, v):
        """Return ``LayerNorm(softmax(logits) @ v + q)``; output has the shape of ``q``."""
        logits = (q @ k.transpose(-1, -2)) * self.temperature + self.bias
        attn = F.softmax(logits, dim=-1)
        mixed = attn @ v
        return self.layer_norm(mixed + q)

# ============================================================
# PART 3: VISION MODULE (Channel Stacked)
# ============================================================
class DeepCore(nn.Module):
    """
    A stack of ``depth`` self-mixing layers.

    Every layer owns three Atoms (query, key, value) and one MixingNode. The
    query Atom uses position offset 0; the key and value Atoms use offset 1.
    """
    def __init__(self, dim, depth=2):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.ModuleDict({
                'q': Atom(dim, patch_dim=dim, init_mode='identity'),
                'k': Atom(dim, patch_dim=dim, init_mode='identity'),
                'v': Atom(dim, patch_dim=dim, init_mode='random'),
                'mixer': MixingNode(dim)
            })
            for _ in range(depth)
        ])

    def forward(self, x):
        """Feed ``x`` through each layer in order and return the final state."""
        state = x
        for block in self.layers:
            query = block['q'](state, offset=0)
            key = block['k'](state, offset=1)
            value = block['v'](state, offset=1)
            state = block['mixer'](query, key, value)
        return state

class VisionModule(nn.Module):
    """
    Patch-based image classifier that treats every colour channel of every
    patch as its own token.

    Pipeline (see ``forward``):
      1. Four global statistics of the image (relative area, std, mean, a
         constant 1) are embedded into a ``dim``-sized "lens" vector.
      2. The image is cut into non-overlapping ``patch_size`` x ``patch_size``
         patches per channel, patches are reordered along a Z-order (Morton)
         curve, and the channels are concatenated along the sequence axis.
      3. Retina Atoms turn the tokens into q/k/v, which are mixed once by
         ``input_mixer`` and then refined by ``deep_core``.
      4. The mean over tokens is classified by a linear head.

    Args:
        dim: token feature dimension.
        patch_size: side length of the square patches.
        num_classes: number of output logits.
        depth: number of DeepCore layers.

    NOTE(review): ``lens_q`` is constructed but never used in ``forward``; the
    query stream is produced by ``retina_k``. It is kept so the parameter set
    (and any saved state_dict) stays unchanged - confirm whether that is intended.
    """
    def __init__(self, dim=128, patch_size=24, num_classes=10, depth=2):
        super().__init__()
        self.dim = dim
        self.patch_size = patch_size

        # Interface: embeds the 4 global image statistics into the lens vector.
        self.lens_embed = nn.Linear(4, dim)
        self.context_core = nn.Sequential(nn.Linear(dim, dim), nn.Tanh(), nn.Linear(dim, dim))

        # Retina: each token is one single-channel patch of patch_size**2 values.
        patch_dim_1ch = patch_size * patch_size * 1

        # The same Atoms are applied to the tokens of every channel.
        self.retina_k = Atom(dim, patch_dim=patch_dim_1ch, init_mode='identity')
        self.retina_v = Atom(dim, patch_dim=patch_dim_1ch, init_mode='random')
        self.lens_q = Atom(dim, patch_dim=dim, init_mode='identity')

        self.input_mixer = MixingNode(dim)
        self.deep_core = DeepCore(dim, depth=depth)
        self.head = nn.Linear(dim, num_classes)

        # Z-order permutation cached for the reference 96x96 input; other grid
        # shapes are handled on the fly in forward().
        self._base_grid = (96 // patch_size, 96 // patch_size)
        self.register_buffer('z_indices', self._precompute_z_order(96, 96, patch_size))

    def _precompute_z_order(self, H, W, P):
        """Return row-major patch indices of an (H//P) x (W//P) grid sorted by Morton code."""
        n_h, n_w = H // P, W // P
        coords = sorted([(x, y) for y in range(n_h) for x in range(n_w)],
                        key=lambda p: self._morton_code(p[0], p[1]))
        return torch.tensor([y * n_w + x for (x, y) in coords], dtype=torch.long)

    def _morton_code(self, x, y):
        """Interleave the low 16 bits of x (even bit positions) and y (odd bit positions)."""
        code = 0
        for i in range(16):
            code |= ((x & (1 << i)) << i) | ((y & (1 << i)) << (i + 1))
        return code

    def forward(self, img):
        """Classify a batch of images of shape (B, C, H, W); returns (B, num_classes) logits."""
        B, C, H, W = img.shape
        P = self.patch_size

        # 1. Interface: global statistics, each of shape (B, 1).
        # Reducing without keepdim and then unsqueezing keeps the batch axis
        # even when B == 1 (a bare .squeeze() collapsed it and crashed).
        scale = (H * W) / (96 * 96)
        meta = torch.cat([img.new_full((B, 1), scale),
                          img.std(dim=(1, 2, 3)).unsqueeze(1),
                          img.mean(dim=(1, 2, 3)).unsqueeze(1),
                          img.new_ones((B, 1))], dim=1)

        # 2. Patchify & stack.
        # unfold yields (B, C*P*P, N); split channels and move N forward: (B, C, N, P*P).
        patches_raw = F.unfold(img, kernel_size=P, stride=P)
        patches_transposed = patches_raw.view(B, C, P * P, -1).permute(0, 1, 3, 2)

        # Z-order along the patch axis. The cached permutation only fits the
        # 96x96 reference grid, so build a matching one for any other grid.
        if (H // P, W // P) == self._base_grid:
            z_indices = self.z_indices
        else:
            z_indices = self._precompute_z_order(H, W, P).to(img.device)
        patches_z = patches_transposed[:, :, z_indices, :]

        # Concatenate channels along the sequence axis: (B, C*N, P*P),
        # i.e. all patches of channel 0, then channel 1, ...
        stream_stacked = patches_z.reshape(B, -1, P * P)

        # 3. Layer 0 (retina). The lens (B, 1, dim) is broadcast over all tokens.
        lens = self.context_core(self.lens_embed(meta).unsqueeze(1))

        # Tokens get positions 1..C*N (offset=1). q0 and k0 come from the same
        # Atom but separate calls, so training noise is drawn independently.
        q0 = self.retina_k(stream_stacked, offset=1) + lens
        k0 = self.retina_k(stream_stacked, offset=1)
        v0 = self.retina_v(stream_stacked, offset=1)

        # 4. Mixing, deep refinement, and classification of the token mean.
        state = self.input_mixer(q0, k0, v0)
        final = self.deep_core(state)

        return self.head(final.mean(dim=1))

# ============================================================
# PART 4: TRAINING (STL-10)
# ============================================================
def train(epochs=5, batch_size=32, lr=0.001, data_root='./data', log_every=50):
    """
    Train a VisionModule on the STL-10 'train' split and return it.

    With no arguments this behaves as before (5 epochs, batch size 32, Adam at
    lr 1e-3, data under ./data, a log line every 50 steps). The hyper-parameters
    are exposed so shorter or longer runs do not require copying the loop.

    Args:
        epochs: number of passes over the training split.
        batch_size: mini-batch size for the DataLoader.
        lr: Adam learning rate.
        data_root: directory where STL-10 is downloaded/stored.
        log_every: print running statistics every this many steps.

    Returns:
        The trained model (left in training mode), so callers can evaluate it
        or keep training it instead of losing it when the function exits.
    """
    print("--- V9.6 Vision System (STL-10 | Channel Stacking | Late Fusion) ---")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")

    # Resize is a no-op for native 96x96 STL-10 images; Normalize maps [0, 1]
    # to [-1, 1] (the single mean/std value is broadcast over the channels).
    transform = transforms.Compose([
        transforms.Resize((96, 96)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])

    print("Downloading STL-10...")
    train_set = datasets.STL10(root=data_root, split='train', download=True, transform=transform)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)

    model = VisionModule(dim=128, patch_size=24, num_classes=10, depth=2).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        correct = 0
        total = 0
        for i, (imgs, labels) in enumerate(train_loader):
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()

            outputs = model(imgs)
            loss = criterion(outputs, labels)
            loss.backward()
            # Clip the global gradient norm to 1.0 before the update.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            # Running statistics; accuracy is measured on the training-mode
            # forward pass of the current batch.
            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            if i % log_every == 0:
                print(f"Epoch [{epoch+1}], Step [{i}], Loss: {loss.item():.4f}, Acc: {100.*correct/total:.2f}%")

        print(f"=== Epoch {epoch+1} Avg Loss: {total_loss/len(train_loader):.4f} ===")

    return model

# Entry point: start training when this code is executed directly. In a
# notebook kernel __name__ is "__main__" as well, so running the cell trains.
if __name__ == "__main__":
    train()

--- V9.6 Vision System (STL-10 | Channel Stacking | Late Fusion) ---
Device: cpu
Downloading STL-10...
Epoch [1], Step [0], Loss: 2.4152, Acc: 9.38%
Epoch [1], Step [50], Loss: 2.3173, Acc: 9.62%
Epoch [1], Step [100], Loss: 2.3374, Acc: 9.68%
Epoch [1], Step [150], Loss: 2.3705, Acc: 10.35%
=== Epoch 1 Avg Loss: 2.3088 ===
Epoch [2], Step [0], Loss: 2.2731, Acc: 12.50%
Epoch [2], Step [50], Loss: 2.3030, Acc: 14.71%
Epoch [2], Step [100], Loss: 2.2531, Acc: 15.84%
Epoch [2], Step [150], Loss: 2.1316, Acc: 17.14%
=== Epoch 2 Avg Loss: 2.2269 ===
Epoch [3], Step [0], Loss: 2.1895, Acc: 9.38%
Epoch [3], Step [50], Loss: 2.0959, Acc: 19.12%
Epoch [3], Step [100], Loss: 2.1719, Acc: 19.31%
Epoch [3], Step [150], Loss: 2.0306, Acc: 20.94%
=== Epoch 3 Avg Loss: 2.1128 ===
Epoch [4], Step [0], Loss: 2.0259, Acc: 15.62%
Epoch [4], Step [50], Loss: 2.1694, Acc: 25.37%
Epoch [4], Step [100], Loss: 2.1228, Acc: 26.02%
Epoch [4], Step [150], Loss: 1.9882, Acc: 26.18%
=== Epoch 4 Avg Loss: 1.9546 =

In [8]:
# Notebook-level setup: builds the device, data pipeline, model, optimizer and
# loss as kernel globals so the following cells can keep training the same model.
print("--- V9.6 Vision System (STL-10 | Channel Stacking | Late Fusion) ---")
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Device: {device}")

# Hyper-parameters shared by the training cells below.
BATCH_SIZE, EPOCHS, LR = 32, 5, 0.001

# Resize to 96x96, convert to a tensor, then normalise with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize((96, 96)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

print("Downloading STL-10...")
train_set = datasets.STL10(
    root='./data',
    split='train',
    download=True,
    transform=transform,
)
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)

# Fresh model, Adam optimizer and cross-entropy loss.
model = VisionModule(dim=128, patch_size=24, num_classes=10, depth=2)
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss()


--- V9.6 Vision System (STL-10 | Channel Stacking | Late Fusion) ---
Device: cpu
Downloading STL-10...


In [9]:

# Train for EPOCHS passes. Relies on `model`, `train_loader`, `optimizer`,
# `criterion`, `device` and `EPOCHS` already existing in the kernel.
model.train()
for epoch in range(EPOCHS):
    # Per-epoch running statistics.
    total_loss, correct, total = 0, 0, 0
    for i, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.to(device)
        labels = labels.to(device)

        # Forward/backward pass with the global gradient norm clipped to 1.0.
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        # Accumulate loss and the number of correct top-1 predictions.
        total_loss += loss.item()
        predicted = outputs.max(1).indices
        total += labels.shape[0]
        correct += (predicted == labels).sum().item()

        if not i % 50:
            print(f"Epoch [{epoch+1}], Step [{i}], Loss: {loss.item():.4f}, Acc: {100.*correct/total:.2f}%")

    print(f"=== Epoch {epoch+1} Avg Loss: {total_loss/len(train_loader):.4f} ===")


Epoch [1], Step [0], Loss: 2.3484, Acc: 6.25%
Epoch [1], Step [50], Loss: 2.2733, Acc: 10.29%
Epoch [1], Step [100], Loss: 2.2789, Acc: 10.46%
Epoch [1], Step [150], Loss: 2.3342, Acc: 10.99%
=== Epoch 1 Avg Loss: 2.3093 ===
Epoch [2], Step [0], Loss: 2.2941, Acc: 12.50%
Epoch [2], Step [50], Loss: 2.2926, Acc: 11.89%
Epoch [2], Step [100], Loss: 2.2240, Acc: 13.34%
Epoch [2], Step [150], Loss: 2.1288, Acc: 14.90%
=== Epoch 2 Avg Loss: 2.2635 ===
Epoch [3], Step [0], Loss: 2.2254, Acc: 9.38%
Epoch [3], Step [50], Loss: 2.0098, Acc: 19.30%
Epoch [3], Step [100], Loss: 2.1475, Acc: 20.05%
Epoch [3], Step [150], Loss: 2.0832, Acc: 20.80%
=== Epoch 3 Avg Loss: 2.1000 ===
Epoch [4], Step [0], Loss: 1.8989, Acc: 34.38%
Epoch [4], Step [50], Loss: 1.7918, Acc: 27.02%
Epoch [4], Step [100], Loss: 2.1128, Acc: 25.74%
Epoch [4], Step [150], Loss: 1.9111, Acc: 26.18%
=== Epoch 4 Avg Loss: 1.9354 ===
Epoch [5], Step [0], Loss: 1.8339, Acc: 25.00%
Epoch [5], Step [50], Loss: 1.9965, Acc: 26.90%
Epo

In [10]:

# Continue training the existing `model` for another EPOCHS passes. The epoch
# counter printed below restarts at 1 on every execution of this cell.
model.train()
for epoch in range(EPOCHS):
    # Per-epoch running statistics.
    total_loss, correct, total = 0, 0, 0
    for i, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.to(device)
        labels = labels.to(device)

        # Forward/backward pass with the global gradient norm clipped to 1.0.
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        # Accumulate loss and the number of correct top-1 predictions.
        total_loss += loss.item()
        predicted = outputs.max(1).indices
        total += labels.shape[0]
        correct += (predicted == labels).sum().item()

        if not i % 50:
            print(f"Epoch [{epoch+1}], Step [{i}], Loss: {loss.item():.4f}, Acc: {100.*correct/total:.2f}%")

    print(f"=== Epoch {epoch+1} Avg Loss: {total_loss/len(train_loader):.4f} ===")


Epoch [1], Step [0], Loss: 1.9506, Acc: 18.75%
Epoch [1], Step [50], Loss: 1.8081, Acc: 30.02%
Epoch [1], Step [100], Loss: 1.7015, Acc: 30.01%
Epoch [1], Step [150], Loss: 1.8772, Acc: 30.03%
=== Epoch 1 Avg Loss: 1.8513 ===
Epoch [2], Step [0], Loss: 1.9908, Acc: 18.75%
Epoch [2], Step [50], Loss: 1.9483, Acc: 28.43%
Epoch [2], Step [100], Loss: 1.9730, Acc: 29.02%
Epoch [2], Step [150], Loss: 1.6854, Acc: 29.12%
=== Epoch 2 Avg Loss: 1.8421 ===
Epoch [3], Step [0], Loss: 1.7571, Acc: 25.00%
Epoch [3], Step [50], Loss: 2.0518, Acc: 31.92%
Epoch [3], Step [100], Loss: 1.8594, Acc: 30.88%
Epoch [3], Step [150], Loss: 1.4683, Acc: 31.02%
=== Epoch 3 Avg Loss: 1.7935 ===
Epoch [4], Step [0], Loss: 2.0556, Acc: 34.38%
Epoch [4], Step [50], Loss: 1.8121, Acc: 32.17%
Epoch [4], Step [100], Loss: 1.6895, Acc: 32.18%
Epoch [4], Step [150], Loss: 1.8267, Acc: 31.79%
=== Epoch 4 Avg Loss: 1.7595 ===
Epoch [5], Step [0], Loss: 2.1563, Acc: 12.50%
Epoch [5], Step [50], Loss: 1.8843, Acc: 35.66%
E

In [11]:

# Continue training the existing `model` for another EPOCHS passes. The epoch
# counter printed below restarts at 1 on every execution of this cell.
model.train()
for epoch in range(EPOCHS):
    # Per-epoch running statistics.
    total_loss, correct, total = 0, 0, 0
    for i, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.to(device)
        labels = labels.to(device)

        # Forward/backward pass with the global gradient norm clipped to 1.0.
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        # Accumulate loss and the number of correct top-1 predictions.
        total_loss += loss.item()
        predicted = outputs.max(1).indices
        total += labels.shape[0]
        correct += (predicted == labels).sum().item()

        if not i % 50:
            print(f"Epoch [{epoch+1}], Step [{i}], Loss: {loss.item():.4f}, Acc: {100.*correct/total:.2f}%")

    print(f"=== Epoch {epoch+1} Avg Loss: {total_loss/len(train_loader):.4f} ===")


Epoch [1], Step [0], Loss: 1.6886, Acc: 40.62%
Epoch [1], Step [50], Loss: 1.4757, Acc: 35.91%
Epoch [1], Step [100], Loss: 1.8904, Acc: 33.35%
Epoch [1], Step [150], Loss: 1.6880, Acc: 33.88%
=== Epoch 1 Avg Loss: 1.6947 ===
Epoch [2], Step [0], Loss: 1.5929, Acc: 31.25%
Epoch [2], Step [50], Loss: 1.7762, Acc: 36.52%
Epoch [2], Step [100], Loss: 1.6511, Acc: 35.89%
Epoch [2], Step [150], Loss: 1.5621, Acc: 35.97%
=== Epoch 2 Avg Loss: 1.6627 ===
Epoch [3], Step [0], Loss: 1.7669, Acc: 34.38%
Epoch [3], Step [50], Loss: 1.7638, Acc: 38.79%
Epoch [3], Step [100], Loss: 1.7682, Acc: 38.00%
Epoch [3], Step [150], Loss: 1.7388, Acc: 37.19%
=== Epoch 3 Avg Loss: 1.6404 ===
Epoch [4], Step [0], Loss: 1.3900, Acc: 43.75%
Epoch [4], Step [50], Loss: 1.6163, Acc: 37.93%
Epoch [4], Step [100], Loss: 1.5590, Acc: 37.96%
Epoch [4], Step [150], Loss: 1.2372, Acc: 38.06%
=== Epoch 4 Avg Loss: 1.6132 ===
Epoch [5], Step [0], Loss: 1.4749, Acc: 59.38%
Epoch [5], Step [50], Loss: 1.4335, Acc: 42.71%
E

In [12]:

# Continue training the existing `model` for another EPOCHS passes. The epoch
# counter printed below restarts at 1 on every execution of this cell.
model.train()
for epoch in range(EPOCHS):
    # Per-epoch running statistics.
    total_loss, correct, total = 0, 0, 0
    for i, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.to(device)
        labels = labels.to(device)

        # Forward/backward pass with the global gradient norm clipped to 1.0.
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        # Accumulate loss and the number of correct top-1 predictions.
        total_loss += loss.item()
        predicted = outputs.max(1).indices
        total += labels.shape[0]
        correct += (predicted == labels).sum().item()

        if not i % 50:
            print(f"Epoch [{epoch+1}], Step [{i}], Loss: {loss.item():.4f}, Acc: {100.*correct/total:.2f}%")

    print(f"=== Epoch {epoch+1} Avg Loss: {total_loss/len(train_loader):.4f} ===")


Epoch [1], Step [0], Loss: 1.4944, Acc: 43.75%
Epoch [1], Step [50], Loss: 1.3289, Acc: 43.50%
Epoch [1], Step [100], Loss: 1.3987, Acc: 42.17%
Epoch [1], Step [150], Loss: 1.4780, Acc: 41.25%
=== Epoch 1 Avg Loss: 1.5395 ===
Epoch [2], Step [0], Loss: 1.3046, Acc: 43.75%
Epoch [2], Step [50], Loss: 1.3881, Acc: 44.42%
Epoch [2], Step [100], Loss: 1.5920, Acc: 44.86%
Epoch [2], Step [150], Loss: 1.2887, Acc: 44.14%
=== Epoch 2 Avg Loss: 1.4858 ===
Epoch [3], Step [0], Loss: 1.6957, Acc: 40.62%
Epoch [3], Step [50], Loss: 1.1756, Acc: 47.37%
Epoch [3], Step [100], Loss: 1.5331, Acc: 46.44%
Epoch [3], Step [150], Loss: 1.5685, Acc: 45.80%
=== Epoch 3 Avg Loss: 1.4390 ===
Epoch [4], Step [0], Loss: 1.0830, Acc: 71.88%
Epoch [4], Step [50], Loss: 1.3766, Acc: 49.94%
Epoch [4], Step [100], Loss: 1.5160, Acc: 48.24%
Epoch [4], Step [150], Loss: 1.2676, Acc: 47.83%
=== Epoch 4 Avg Loss: 1.3840 ===
Epoch [5], Step [0], Loss: 1.3026, Acc: 50.00%
Epoch [5], Step [50], Loss: 1.1956, Acc: 50.67%
E