In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import os
import torch.optim.lr_scheduler as lr_scheduler


# Define AlexNet Model
class AlexNet(nn.Module):
    def __init__(self, q=False):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=2),  # Conv1
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),  # Conv2
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1),  # Conv3
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 384, kernel_size=3, stride=1, padding=1),  # Conv4
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),  # Conv5
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),  # FC6
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),  # FC7
            nn.ReLU(inplace=True),
            nn.Linear(4096, 10),  # FC8 (output for CIFAR-10)
        )
        self.q = q
        if q:
            self.quant = torch.quantization.QuantStub()
            self.dequant = torch.quantization.DeQuantStub()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        if self.q:
            x = self.quant(x)
        x = self.features(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.classifier(x)
        if self.q:
            x = self.dequant(x)
        return x

    import torch.optim.lr_scheduler as lr_scheduler

# Enhanced data augmentation
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.Resize((224, 224)),  # Resize for AlexNet input
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Updated dataset
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
trainloader = DataLoader(trainset, batch_size=512, shuffle=True, num_workers=4, pin_memory=True)

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
testloader = DataLoader(testset, batch_size=512, shuffle=False, num_workers=4, pin_memory=True)

# Fine-tuned training function
def train(model, dataloader, epochs=50, cuda=False):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)  # Using Adam with weight decay
    scheduler = lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.5)  # Reduce LR every 15 epochs

    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        for i, data in enumerate(dataloader):
            inputs, labels = data
            if cuda:
                inputs, labels = inputs.cuda(), labels.cuda()
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        scheduler.step()  # Adjust learning rate
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {running_loss / len(dataloader):.4f}, Accuracy: {100 * correct / total:.2f}%')

# Testing function
def test(model, dataloader, cuda=False):
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for data in dataloader:
            inputs, labels = data
            if cuda:
                inputs, labels = inputs.cuda(), labels.cuda()
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f'Accuracy: {accuracy}%')
    return accuracy

# Train FP32 model
alexnet_fp32 = AlexNet(q=False).cuda()
train(alexnet_fp32, trainloader, epochs=50, cuda=True)
torch.save(alexnet_fp32.state_dict(), "alexnet_fp32_CIFAR.pth")
print("FP32 model saved as alexnet_fp3_CIFAR.pth")

# Test FP32 model
fp32_accuracy = test(alexnet_fp32, testloader, cuda=True)
print(f"FP32 Model Accuracy: {fp32_accuracy}%")




Files already downloaded and verified
Files already downloaded and verified
Epoch [1/50], Loss: 2.0544, Accuracy: 22.85%
Epoch [2/50], Loss: 1.6821, Accuracy: 37.21%
Epoch [3/50], Loss: 1.5008, Accuracy: 44.91%
Epoch [4/50], Loss: 1.3697, Accuracy: 50.18%
Epoch [5/50], Loss: 1.2640, Accuracy: 54.28%
Epoch [6/50], Loss: 1.1840, Accuracy: 57.44%
Epoch [7/50], Loss: 1.1102, Accuracy: 60.22%
Epoch [8/50], Loss: 1.0598, Accuracy: 62.25%
Epoch [9/50], Loss: 0.9923, Accuracy: 64.80%
Epoch [10/50], Loss: 0.9548, Accuracy: 66.30%
Epoch [11/50], Loss: 0.9277, Accuracy: 67.48%
Epoch [12/50], Loss: 0.9016, Accuracy: 68.24%
Epoch [13/50], Loss: 0.8681, Accuracy: 69.70%
Epoch [14/50], Loss: 0.8521, Accuracy: 70.26%
Epoch [15/50], Loss: 0.8356, Accuracy: 70.47%
Epoch [16/50], Loss: 0.7432, Accuracy: 73.89%
Epoch [17/50], Loss: 0.7149, Accuracy: 74.88%
Epoch [18/50], Loss: 0.7142, Accuracy: 75.09%
Epoch [19/50], Loss: 0.6960, Accuracy: 75.60%
Epoch [20/50], Loss: 0.6817, Accuracy: 76.21%
Epoch [21/50]

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Per-Layer Quantization
def per_layer_quantize(tensor):
    max_val = tensor.abs().amax()
    scale = 127 / max_val
    quantized_tensor = (tensor * scale).clamp(-127, 127).round().char()
    return quantized_tensor, scale

def per_layer_dequantize(quantized_tensor, scale):
    return quantized_tensor.float() / scale

# Quantized Forward Pass
def quantized_forward_per_layer(model, x, quantize_fn, dequantize_fn):
    with torch.no_grad():
        weights_q = {}
        scales = {}

        # Quantize weights
        for name, param in model.named_parameters():
            weights_q[name], scales[name] = quantize_fn(param.data)

        # Forward pass through features
        for i, layer in enumerate(model.features):
            if isinstance(layer, nn.Conv2d):
                key = f"features.{i}.weight"
                activation_scale = 127 / (x.abs().amax() + 1e-8)
                x = F.conv2d(
                    x / activation_scale,
                    dequantize_fn(weights_q[key], scales[key]),
                    stride=layer.stride,
                    padding=layer.padding
                )
                x = (x * activation_scale).clamp(-127, 127).round().char()
            elif isinstance(layer, nn.MaxPool2d):
                x = x.float()  # Convert back to Float for pooling
                x = layer(x)
                activation_scale = 127 / (x.abs().amax() + 1e-8)  # Recompute scale
                x = (x * activation_scale).clamp(-127, 127).round().char()  # Requantize
            elif isinstance(layer, nn.ReLU):
                x = x.float()  # Convert back to Float for ReLU
                x = layer(x)
                activation_scale = 127 / (x.abs().amax() + 1e-8)  # Recompute scale
                x = (x * activation_scale).clamp(-127, 127).round().char()  # Requantize

        x = x.view(x.size(0), -1)

        # Forward pass through classifier
        for i, layer in enumerate(model.classifier):
            if isinstance(layer, nn.Linear):
                key = f"classifier.{i}.weight"
                activation_scale = 127 / (x.abs().amax() + 1e-8)
                x = F.linear(
                    x / activation_scale,
                    dequantize_fn(weights_q[key], scales[key])
                )
                x = (x * activation_scale).clamp(-127, 127).round().char()
            elif isinstance(layer, nn.ReLU) or isinstance(layer, nn.Dropout):
                x = x.float()  # Convert back to Float for ReLU or Dropout
                x = layer(x)
                activation_scale = 127 / (x.abs().amax() + 1e-8)  # Recompute scale
                x = (x * activation_scale).clamp(-127, 127).round().char()  # Requantize

        return x


# Quantized Model Testing
def test_quantized_per_layer(model, dataloader, device, quantize_fn, dequantize_fn):
    model.to(device)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = quantized_forward_per_layer(model, inputs, quantize_fn, dequantize_fn)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f"Quantized Model Accuracy: {accuracy}%")
    return accuracy


# Quantized Testing for CIFAR-10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
alexnet_fp32.load_state_dict(torch.load("alexnet_fp32_CIFAR.pth"))
print("Testing Per-Layer INT8 Quantized Model...")
int8_accuracy = test_quantized_per_layer(alexnet_fp32, testloader, device, per_layer_quantize, per_layer_dequantize)
print(f"INT8 Quantized Model Accuracy: {int8_accuracy}%")


Testing Per-Layer INT8 Quantized Model...
Quantized Model Accuracy: 49.83%
INT8 Quantized Model Accuracy: 49.83%


In [12]:
# INT16 Per-Layer Quantization
def per_layer_quantize_int16(tensor):
    max_val = tensor.abs().amax()
    scale = 32767 / max_val
    quantized_tensor = (tensor * scale).clamp(-32767, 32767).round().short()
    return quantized_tensor, scale

def per_layer_dequantize_int16(quantized_tensor, scale):
    return quantized_tensor.float() / scale

# INT16 Quantized Forward Pass
def quantized_forward_per_layer_int16(model, x, quantize_fn, dequantize_fn):
    with torch.no_grad():
        weights_q = {}
        scales = {}

        # Quantize weights
        for name, param in model.named_parameters():
            weights_q[name], scales[name] = quantize_fn(param.data)

        # Forward pass through features
        for i, layer in enumerate(model.features):
            if isinstance(layer, nn.Conv2d):
                key = f"features.{i}.weight"
                activation_scale = 32767 / (x.abs().amax() + 1e-8)
                x = F.conv2d(
                    x / activation_scale,
                    dequantize_fn(weights_q[key], scales[key]),
                    stride=layer.stride,
                    padding=layer.padding
                )
                x = (x * activation_scale).clamp(-32767, 32767).round().short()
            elif isinstance(layer, nn.MaxPool2d):
                x = x.float()  # Convert back to Float for pooling
                x = layer(x)
                activation_scale = 32767 / (x.abs().amax() + 1e-8)  # Recompute scale
                x = (x * activation_scale).clamp(-32767, 32767).round().short()  # Requantize
            elif isinstance(layer, nn.ReLU):
                x = x.float()  # Convert back to Float for ReLU
                x = layer(x)
                activation_scale = 32767 / (x.abs().amax() + 1e-8)  # Recompute scale
                x = (x * activation_scale).clamp(-32767, 32767).round().short()  # Requantize

        x = x.view(x.size(0), -1)

        # Forward pass through classifier
        for i, layer in enumerate(model.classifier):
            if isinstance(layer, nn.Linear):
                key = f"classifier.{i}.weight"
                activation_scale = 32767 / (x.abs().amax() + 1e-8)
                x = F.linear(
                    x / activation_scale,
                    dequantize_fn(weights_q[key], scales[key])
                )
                x = (x * activation_scale).clamp(-32767, 32767).round().short()
            elif isinstance(layer, nn.ReLU) or isinstance(layer, nn.Dropout):
                x = x.float()  # Convert back to Float for ReLU or Dropout
                x = layer(x)
                activation_scale = 32767 / (x.abs().amax() + 1e-8)  # Recompute scale
                x = (x * activation_scale).clamp(-32767, 32767).round().short()  # Requantize

        return x

# Quantized Model Testing for INT16
def test_quantized_per_layer_int16(model, dataloader, device, quantize_fn, dequantize_fn):
    model.to(device)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = quantized_forward_per_layer_int16(model, inputs, quantize_fn, dequantize_fn)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f"Quantized Model Accuracy: {accuracy}%")
    return accuracy

# Test INT16 Quantized Model
print("Testing INT16 Quantized Model...")
int16_accuracy = test_quantized_per_layer_int16(
    alexnet_fp32, testloader, device, per_layer_quantize_int16, per_layer_dequantize_int16
)
print(f"INT16 Quantized Model Accuracy: {int16_accuracy}%")


Testing INT16 Quantized Model...
Quantized Model Accuracy: 50.0%
INT16 Quantized Model Accuracy: 50.0%


In [14]:
from torch.quantization import quantize_dynamic
def quantize_and_test(model, test_loader, quant_type):

    # Move the model to CPU for quantization
    model.cpu()

    if quant_type == 'int8':
        quantized_model = quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)  # INT8 quantization
    elif quant_type == 'int16':
        quantized_model = quantize_dynamic(model, {nn.Linear}, dtype=torch.float16)  # INT16 simulation
    else:
        raise ValueError(f"Unsupported quantization type: {quant_type}")

    # Test the quantized model
    quantized_model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            # Move data to CPU for testing
            images, labels = images.cpu(), labels.cpu()
            outputs = quantized_model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"{quant_type.upper()} Quantized Model Accuracy: {accuracy:.2f}%")
    return accuracy

# Quantized model evaluations
quant_types = ['int8', 'int16']
quantized_accuracies = {}
for qt in quant_types:
    quantized_accuracies[qt] = quantize_and_test(alexnet_fp32, testloader, qt)

# Display results
print("\nQuantization Results:")
for qt, acc in quantized_accuracies.items():
    print(f"{qt.upper()} Accuracy: {acc:.2f}%")
     

INT8 Quantized Model Accuracy: 83.74%
INT16 Quantized Model Accuracy: 83.73%

Quantization Results:
INT8 Accuracy: 83.74%
INT16 Accuracy: 83.73%


In [18]:
import torch.onnx

# Convert FP32 Model to ONNX
def convert_fp32_to_onnx(model, onnx_filename, input_size=(1, 3, 224, 224)):  # Changed to 3 channels
    model.eval()
    dummy_input = torch.randn(*input_size).to(next(model.parameters()).device)
    torch.onnx.export(
        model,
        dummy_input,
        onnx_filename,
        export_params=True,
        opset_version=11,
        input_names=['input'],
        output_names=['output']
    )
    print(f"FP32 Model exported to {onnx_filename}")

# Example Usage
convert_fp32_to_onnx(alexnet_fp32, "alexnet_fp32_CIFAR.onnx")


FP32 Model exported to alexnet_fp32_CIFAR.onnx
