In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
import struct

# Hardware constraints per specification.
# These module-level constants describe the target network/accelerator and are
# read by later cells of the notebook.
INPUT_SIZE = 784    # Length of the flattened input vector fed to the first layer
HIDDEN_SIZE = 128   # Number of outputs of the first (hidden) layer
OUTPUT_SIZE = 10    # Number of outputs of the second layer (one per class)
NUM_BANKS = 64      # NOTE(review): not referenced in the cells shown here — presumably a memory-bank count; confirm
# NOTE(review): 2**47 is one past the largest positive value of a *signed*
# 48-bit integer (2**47 - 1) — confirm which bound the spec intends.
MAX_DSP_OUTPUT = 2**47  # 48-bit accumulator
# Integer limits of the quantized data types. The cells shown here use the
# literals 127 / 255 directly rather than reading these names.
QTZ_INT8_MIN = -128
QTZ_INT8_MAX = 127
QTZ_UINT8_MIN = 0
QTZ_UINT8_MAX = 255

# Seed both PyTorch's and NumPy's global RNGs.
torch.manual_seed(42)
np.random.seed(42)

print("Libraries loaded and hardware constants defined.")

Libraries loaded and hardware constants defined.


In [2]:
# Data pipeline.
# Training works on float pixels in [0.0, 1.0]; the mapping to 0-255 integers
# for the FPGA happens later, at inference-simulation time.
transform = transforms.Compose([transforms.ToTensor()])  # 0-255 image -> 0.0-1.0 float

# Build the train split first, then the test split, both under ./data.
train_dataset, test_dataset = (
    datasets.MNIST(root='./data', train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Small shuffled batches for training; large fixed-order batches for evaluation.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=1000,
    shuffle=False,
)

print("MNIST Data downloaded and loaded.")

100%|██████████| 9.91M/9.91M [00:01<00:00, 6.09MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 160kB/s]
100%|██████████| 1.65M/1.65M [00:01<00:00, 1.52MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 1.26MB/s]

MNIST Data downloaded and loaded.





In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import OneCycleLR

class FPGA_MLP(nn.Module):
    """Bias-free two-layer MLP: Linear -> ReLU -> Linear.

    Both linear layers are created with ``bias=False`` to match the DSP
    accumulator logic of the target hardware.

    Args:
        input_size: Length of the flattened input vector. ``None`` (default)
            uses the notebook-level ``INPUT_SIZE``.
        hidden_size: Width of the hidden layer. ``None`` (default) uses the
            notebook-level ``HIDDEN_SIZE``.
        output_size: Number of output logits. ``None`` (default) uses the
            notebook-level ``OUTPUT_SIZE``.
    """

    def __init__(self, input_size=None, hidden_size=None, output_size=None):
        super().__init__()
        # Resolve the notebook constants lazily so that ``FPGA_MLP()`` keeps
        # its original meaning while explicit sizes remain possible.
        if input_size is None:
            input_size = INPUT_SIZE
        if hidden_size is None:
            hidden_size = HIDDEN_SIZE
        if output_size is None:
            output_size = OUTPUT_SIZE

        # Layer 1: input -> hidden. Bias=False to match DSP accumulator logic
        self.fc1 = nn.Linear(input_size, hidden_size, bias=False)
        self.relu = nn.ReLU()
        # Layer 2: hidden -> output. Bias=False
        self.fc2 = nn.Linear(hidden_size, output_size, bias=False)

    def forward(self, x):
        """Flatten ``x`` to ``(-1, in_features)`` and return the raw logits."""
        # Take the width from the layer instead of a literal 784 so the two
        # cannot drift apart. reshape() also accepts non-contiguous tensors,
        # on which view() raises.
        x = x.reshape(-1, self.fc1.in_features)
        x = self.fc1(x)
        x = self.relu(x)
        return self.fc2(x)

# Instantiate the network (weights are freshly initialised here, so re-running
# this cell discards any previous training) and print its layer summary.
model = FPGA_MLP()
print(model)

FPGA_MLP(
  (fc1): Linear(in_features=784, out_features=128, bias=False)
  (relu): ReLU()
  (fc2): Linear(in_features=128, out_features=10, bias=False)
)


In [22]:
# Use the GPU when one is available and move the model's parameters onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Adam optimizer with weight decay for regularization.
# NOTE: the `lr` given here is overridden once OneCycleLR is attached below;
# per the PyTorch docs the schedule starts at max_lr / div_factor (default 25).
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()

# OneCycleLR scheduler: ramps the LR up to max_lr, then anneals it back down.
# The scheduler's step budget is epochs * steps_per_epoch, and train() steps it
# once per batch. NOTE(review): stepping past that budget is expected to raise,
# so this cell should be re-run before re-running the training loop — confirm.
EPOCHS = 5
steps_per_epoch = len(train_loader)
scheduler = OneCycleLR(
    optimizer,
    max_lr=0.001,  # Peak learning rate
    epochs=EPOCHS,
    steps_per_epoch=steps_per_epoch,
    pct_start=0.3,  # Spend 30% of training in warmup phase
    anneal_strategy='cos'  # Cosine annealing
)

def train(epoch):
    """Run one optimisation pass over ``train_loader``.

    For every batch this performs a forward/backward pass, an optimizer step
    and a scheduler step, and logs loss and learning rate every 100 batches.

    Args:
        epoch: Epoch number; used only to label the progress log lines.
    """
    model.train()
    num_batches = len(train_loader)
    num_samples = len(train_loader.dataset)

    for step, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        optimizer.step()
        scheduler.step()  # the LR schedule advances per batch, not per epoch

        # Only every 100th batch is logged.
        if step % 100 != 0:
            continue

        seen = step * len(inputs)
        percent = 100. * step / num_batches
        print(f'Train Epoch: {epoch} [{seen}/{num_samples} '
              f'({percent:.0f}%)]\t'
              f'Loss: {loss.item():.6f}\tLR: {scheduler.get_last_lr()[0]:.6f}')

def test():
    """Evaluate ``model`` on ``test_loader`` and report top-1 accuracy.

    Puts the model in eval mode, runs every test batch with gradients
    disabled, prints the number of correct predictions and the accuracy, and
    returns the accuracy.

    Returns:
        Accuracy over ``test_loader.dataset`` as a percentage (float).
    """
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            # The predicted class is the index of the largest logit.
            pred = model(data).argmax(dim=1)
            correct += (pred == target).sum().item()

    # The loss used to be summed here per batch but was never printed or
    # returned; that dead computation has been removed.
    acc = 100. * correct / len(test_loader.dataset)
    print(f'\nTest set: Accuracy: {correct}/{len(test_loader.dataset)} ({acc:.2f}%)\n')
    return acc

# Train for EPOCHS epochs (1-based epoch numbers), evaluating on the test set
# after each one. The accuracy returned by test() is not stored.
for epoch in range(1, EPOCHS + 1):
    train(epoch)
    test()



Test set: Accuracy: 9214/10000 (92.14%)


Test set: Accuracy: 9511/10000 (95.11%)


Test set: Accuracy: 9647/10000 (96.47%)


Test set: Accuracy: 9688/10000 (96.88%)


Test set: Accuracy: 9703/10000 (97.03%)



In [23]:
# Observers to store min/max values seen during calibration.
stats = {
    'input_min': 0, 'input_max': 0,
    'l1_out_min': 0, 'l1_out_max': 0,
    # Note: Layer 2 output is logits; its range is not measured here.
}

model.eval()
with torch.no_grad():
    # Calibrate on a single batch drawn from the (shuffled) training loader.
    # NOTE(review): one batch may under-estimate the true activation range;
    # consider aggregating min/max over several batches.
    data_iter = iter(train_loader)
    images, _ = next(data_iter)
    images = images.to(device)

    # Input stats (should be 0.0 to 1.0). The flattened width is taken from
    # the layer itself rather than a literal 784.
    flat_img = images.view(-1, model.fc1.in_features)
    # 'input_min' used to be left at its placeholder value; measure it too.
    stats['input_min'] = flat_img.min().item()
    stats['input_max'] = flat_img.max().item()

    # Layer 1 output stats (before ReLU)
    l1_out = model.fc1(flat_img)
    stats['l1_out_max'] = l1_out.max().item()
    stats['l1_out_min'] = l1_out.min().item()

    print("Calibration Stats:", stats)

Calibration Stats: {'input_min': 0, 'input_max': 1.0, 'l1_out_min': -9.771818161010742, 'l1_out_max': 7.377934455871582}


In [24]:
# ===================== Compute =====================

# Input scale: pixels are floats in [0, 1]; one step of the 0..255 integer
# representation is worth 1/255.
S_input = 1.0 / 255.0

# Layer 1 output scale (after fc1, before ReLU). The values are signed, so the
# larger of |min| and |max| is mapped symmetrically onto 127.
S_l1_out = max(abs(stats['l1_out_max']), abs(stats['l1_out_min'])) / 127.0

# Float weights pulled out of the trained model as NumPy arrays.
W1_fp32 = model.fc1.weight.detach().cpu().numpy()  # Shape: [128, 784]
W2_fp32 = model.fc2.weight.detach().cpu().numpy()  # Shape: [10, 128]

# Symmetric per-tensor scales: the largest |weight| maps onto 127.
S_w1 = np.abs(W1_fp32).max() / 127.0
S_w2 = np.abs(W2_fp32).max() / 127.0

# Round to the nearest integer step and store as int8.
W1_q = np.rint(W1_fp32 / S_w1).astype(np.int8)
W2_q = np.rint(W2_fp32 / S_w2).astype(np.int8)

# Mean absolute error between the float weights and their dequantized form.
w1_error = np.abs(W1_fp32 - (W1_q * S_w1)).mean()
w2_error = np.abs(W2_fp32 - (W2_q * S_w2)).mean()

# ===================== Report ======================

print(f"S_input: {S_input:.6f}")
print(f"S_l1_out: {S_l1_out:.6f}")

print(f"S_w1: {S_w1:.6f}")
print(f"S_w2: {S_w2:.6f}")

print(f"W1_q shape: {W1_q.shape}, dtype: {W1_q.dtype}")
print(f"W2_q shape: {W2_q.shape}, dtype: {W2_q.dtype}")

print(f"W1 quantization error (MAE): {w1_error:.6f}")
print(f"W2 quantization error (MAE): {w2_error:.6f}")


S_input: 0.003922
S_l1_out: 0.076943
S_w1: 0.004329
S_w2: 0.006170
W1_q shape: (128, 784), dtype: int8
W2_q shape: (10, 128), dtype: int8
W1 quantization error (MAE): 0.000919
W2 quantization error (MAE): 0.001512


In [25]:
import math

def get_shift_only_param(real_multiplier):
    """Pick the right-shift amount whose power of two best matches a multiplier.

    The multiplier is approximated as ``2 ** -shift``; the shift is chosen by
    rounding ``-log2(real_multiplier)`` and clamping the result to ``[0, 31]``.
    A one-line summary (target, shift, achieved scale, relative error) is
    printed as a side effect.

    Args:
        real_multiplier: Floating-point scale factor to approximate.

    Returns:
        The shift as an ``int`` in ``[0, 31]``; ``0`` when the multiplier is
        zero or negative.
    """
    # A non-positive multiplier has no logarithm; report "no shift".
    if real_multiplier <= 0:
        return 0

    # 2**-shift ~= multiplier  <=>  shift ~= -log2(multiplier)
    ideal_shift = -math.log2(real_multiplier)
    shift = int(round(ideal_shift))

    # Keep the shift inside the supported 0..31 window.
    shift = min(max(shift, 0), 31)

    # Scale actually realised by the chosen shift, and how far off it is.
    actual_scale = 2.0 ** -shift
    error = abs(actual_scale - real_multiplier) / real_multiplier

    print(f"Target: {real_multiplier:.5f} | Shift: {shift} | Actual: {actual_scale:.5f} | Error: {error*100:.1f}%")
    return shift

# --- Recalculate Scales for Shift-Only ---
# Each layer's requantization multiplier is
#   (input scale * weight scale) / output scale
# and is then approximated by a pure right shift via get_shift_only_param().

# 1. Effective Scale for Layer 1
M_effective_l1 = (S_input * S_w1) / S_l1_out
print("Layer 1 Param:")
shift_l1 = get_shift_only_param(M_effective_l1)

# 2. Effective Scale for Layer 2
# NOTE(review): no earlier S_logits calculation is visible in this notebook;
# 10.0 looks like an assumed maximum logit magnitude rather than a calibrated
# value — confirm.
S_logits = 10.0 / 127.0
M_effective_l2 = (S_l1_out * S_w2) / S_logits
print("Layer 2 Param:")
shift_l2 = get_shift_only_param(M_effective_l2)

Layer 1 Param:
Target: 0.00022 | Shift: 12 | Actual: 0.00024 | Error: 10.7%
Layer 2 Param:
Target: 0.00603 | Shift: 7 | Actual: 0.00781 | Error: 29.6%


In [26]:
def fpga_layer_sim_shift_only(input_vec, weights, shift, activation='relu'):
    """Simulate one integer layer: multiply-accumulate, optional ReLU, shift, clip.

    Args:
        input_vec: Integer input vector (or array whose leading axis matches
            the last axis of ``weights``).
        weights: Integer weight matrix; the product is ``weights @ input_vec``.
        shift: Number of bits for the arithmetic right shift applied to the
            accumulator.
        activation: ``'relu'`` clamps negative accumulators to zero before the
            shift; any other value skips that step.

    Returns:
        ``np.uint8`` array of the shifted accumulators saturated to 0..255.
        Note that the final clip applies regardless of ``activation``, so
        negative results are reported as 0 even when ReLU is skipped.
    """
    # 1. Matrix-vector multiply. Accumulate in int64: NumPy integer matmul
    #    wraps silently on overflow, and int32 is narrower than the 48-bit
    #    accumulator this models. For sums that fit in int32 the result is
    #    unchanged.
    acc = np.matmul(weights.astype(np.int64), input_vec.astype(np.int64))

    # 2. ReLU
    if activation == 'relu':
        acc = np.maximum(acc, 0)

    # 3. Pure-shift requantization (arithmetic right shift, rounds toward -inf)
    output_val = acc >> shift

    # 4. Saturation / clipping to 0-255 (UInt8)
    output = np.clip(output_val, 0, 255).astype(np.uint8)

    return output

def run_fpga_inference_shift_only(image_tensor):
    """Classify one image with the two-layer shift-only integer pipeline.

    Args:
        image_tensor: Image tensor; it is flattened and its values are scaled
            by 255 before being cast to ``uint8``.

    Returns:
        Index of the largest second-layer output (``np.argmax`` result).
    """
    # Float pixels -> 0..255 integers, as a flat vector.
    pixels = image_tensor.view(-1).numpy()
    img_uint8 = (pixels * 255).astype(np.uint8)

    # Hidden layer: ReLU, then requantize with the layer-1 shift.
    hidden = fpga_layer_sim_shift_only(img_uint8, W1_q, shift_l1, activation='relu')

    # Output layer: no ReLU, requantize with the layer-2 shift.
    scores = fpga_layer_sim_shift_only(hidden, W2_q, shift_l2, activation='none')

    return np.argmax(scores)

# Verification: run the integer pipeline on the first `limit` test images and
# count how often its prediction matches the label.
limit = 1000
correct = 0
total = 0

print("Running Shift-Only Simulation...")
for idx in range(limit):
    img, target = test_dataset[idx]
    pred = run_fpga_inference_shift_only(img)
    correct += int(pred == target)
    total += 1

print(f"FPGA (Shift-Only) Accuracy: {100.0 * correct / total:.2f}%")

Running Shift-Only Simulation...
FPGA (Shift-Only) Accuracy: 97.00%


In [27]:
print("--- MODEL EXPORT SUMMARY ---")

# 1. Quantized Weights
# Ensure they are explicitly int8
W1_final = W1_q.astype(np.int8)
W2_final = W2_q.astype(np.int8)

print(f"Layer 1 Weights: {W1_final.shape} | Min: {W1_final.min()} | Max: {W1_final.max()} | dtype: {W1_final.dtype}")
print(f"Layer 2 Weights: {W2_final.shape}  | Min: {W2_final.min()} | Max: {W2_final.max()} | dtype: {W2_final.dtype}")

# 2. Quantization Parameters (Shift Only)
# Ensure they are standard Python integers
shift_l1_final = int(shift_l1)
shift_l2_final = int(shift_l2)

print(f"\nLayer 1 Shift: {shift_l1_final}")
print(f"Layer 2 Shift: {shift_l2_final}")

# 3. Validation
# Derive the expected shapes from the hardware constants instead of repeating
# the literals, so this check cannot go stale if the constants change.
expected_w1_shape = (HIDDEN_SIZE, INPUT_SIZE)
expected_w2_shape = (OUTPUT_SIZE, HIDDEN_SIZE)

if W1_final.shape != expected_w1_shape:
    print(f"WARNING: W1 shape mismatch! Expected {expected_w1_shape}")
if W2_final.shape != expected_w2_shape:
    print(f"WARNING: W2 shape mismatch! Expected {expected_w2_shape}")

--- MODEL EXPORT SUMMARY ---
Layer 1 Weights: (128, 784) | Min: -127 | Max: 68 | dtype: int8
Layer 2 Weights: (10, 128)  | Min: -127 | Max: 84 | dtype: int8

Layer 1 Shift: 12
Layer 2 Shift: 7


In [28]:
import numpy as np

# Save to a .npz archive. Note that np.savez writes the arrays *uncompressed*;
# np.savez_compressed is the compressed variant.
# The archive holds the two int8 weight matrices and the two per-layer shifts.
outfile = 'mnist_model.npz'

np.savez(
    outfile,
    # Weights (The Matrices)
    w1=W1_final,
    w2=W2_final,

    # Shifts (The Scalars)
    # Stored as 0-d numpy arrays so they round-trip through save/load.
    shift_l1=np.array(shift_l1_final),
    shift_l2=np.array(shift_l2_final)
)

print(f"Successfully saved model to: {outfile}")
# The key list below is a hard-coded string; keep it in sync with the keyword
# names passed to np.savez above.
print("Keys in file: ['w1', 'w2', 'shift_l1', 'shift_l2']")

Successfully saved model to: mnist_model.npz
Keys in file: ['w1', 'w2', 'shift_l1', 'shift_l2']
