<a href="https://colab.research.google.com/github/PiyushTewari2/squeezenet-pytorch/blob/main/training_v3_oct.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.nn.init as init
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import zipfile
import time
import os
import copy

use_gpu = torch.cuda.is_available()

# Unzipping the dataset
zip_file = '/content/drive/MyDrive/squeeze-net/Data2.zip'
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    zip_ref.extractall('/content/data')

# Data paths
train_dir = '/content/data/Data/Train_Data'
test_dir = '/content/data/Data/Test_Data'

# Data transformations
data_transforms = {
    'Train_Data': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor()
        #transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'Test_Data': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor()
        #transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Datasets and Dataloaders
image_datasets = {
    'Train_Data': datasets.ImageFolder(root=train_dir, transform=data_transforms['Train_Data']),
    'Test_Data': datasets.ImageFolder(root=test_dir, transform=data_transforms['Test_Data'])
}

dataloaders = {
    'Train_Data': DataLoader(image_datasets['Train_Data'], batch_size=4, shuffle=True),
    'Test_Data': DataLoader(image_datasets['Test_Data'], batch_size=4, shuffle=False)
}

dataset_sizes = {x: len(image_datasets[x]) for x in ['Train_Data', 'Test_Data']}
class_names = image_datasets['Train_Data'].classes

# Define the SqueezeNet v1.1 model explicitly
class Fire(nn.Module):
    def __init__(self, inplanes, squeeze_planes, expand1x1_planes, expand3x3_planes):
        super(Fire, self).__init__()
        self.inplanes = inplanes
        self.squeeze = nn.Conv2d(inplanes, squeeze_planes, kernel_size=1)
        self.squeeze_activation = nn.ReLU(inplace=True)
        self.expand1x1 = nn.Conv2d(squeeze_planes, expand1x1_planes, kernel_size=1)
        self.expand1x1_activation = nn.ReLU(inplace=True)
        self.expand3x3 = nn.Conv2d(squeeze_planes, expand3x3_planes, kernel_size=3, padding=1)
        self.expand3x3_activation = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.squeeze_activation(self.squeeze(x))
        return torch.cat([
            self.expand1x1_activation(self.expand1x1(x)),
            self.expand3x3_activation(self.expand3x3(x))
        ], 1)

class SqueezeNet(nn.Module):
    def __init__(self, version='1_1', num_classes=1000):
        super(SqueezeNet, self).__init__()
        self.num_classes = num_classes
        if version == '1_1':
            self.features = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, stride=2),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(64, 16, 64, 64),
                #Fire(128, 16, 64, 64),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(128, 32, 128, 128),
                #Fire(256, 32, 128, 128),
                nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True),
                Fire(256, 48, 192, 192),
                #Fire(384, 48, 192, 192),
                #Fire(384, 64, 256, 256),
                #Fire(512, 64, 256, 256),
            )
        else:
            raise ValueError("Unsupported SqueezeNet version {version}: 1_0 or 1_1 expected".format(version=version))

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(output_size=(1, 1)),
            nn.Flatten(),
            nn.Linear(384, num_classes),
            nn.Softmax(dim=1)
        )

        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
              init.kaiming_uniform_(m.weight)
              if m.bias is not None:
                  init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

# Create the SqueezeNet model for binary classification
squeezenet1_1 = SqueezeNet(version='1_1', num_classes=2)
print(squeezenet1_1)

SqueezeNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=True)
    (3): Fire(
      (squeeze): Conv2d(64, 16, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (expand3x3_activation): ReLU(inplace=True)
    )
    (4): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=True)
    (5): Fire(
      (squeeze): Conv2d(128, 32, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(32, 128, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(32, 128, kernel_size=(3, 3), stride=(1, 1), padd

In [12]:


# Move the model to GPU if available
if use_gpu:
    squeezenet1_1 = squeezenet1_1.cuda()

# Define loss criterion, optimizer, and learning rate scheduler
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(squeezenet1_1.parameters(), lr=0.0005)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

# Training loop
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['Train_Data', 'Test_Data']:
            if phase == 'Train_Data':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data
            for inputs, labels in dataloaders[phase]:
                if use_gpu:
                    inputs = inputs.cuda()
                    labels = labels.cuda()

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward
                with torch.set_grad_enabled(phase == 'Train_Data'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if phase == 'Train_Data':
                        loss.backward()
                        optimizer.step()

                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'Train_Data':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Deep copy the model
            if phase == 'Test_Data' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model

# Train the model
squeezenet1_1 = train_model(squeezenet1_1, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=20)
torch.save(squeezenet1_1.state_dict(), '/content/drive/MyDrive/squeezenet_v4_nov.pt')

Epoch 1/20
----------
Train_Data Loss: 0.4647 Acc: 0.8594
Test_Data Loss: 0.4541 Acc: 0.8629

Epoch 2/20
----------
Train_Data Loss: 0.4418 Acc: 0.8751
Test_Data Loss: 0.4372 Acc: 0.8705

Epoch 3/20
----------
Train_Data Loss: 0.4265 Acc: 0.8853
Test_Data Loss: 0.4241 Acc: 0.8803

Epoch 4/20
----------
Train_Data Loss: 0.4143 Acc: 0.8981
Test_Data Loss: 0.4088 Acc: 0.9021

Epoch 5/20
----------
Train_Data Loss: 0.4023 Acc: 0.9128
Test_Data Loss: 0.4052 Acc: 0.9032

Epoch 6/20
----------
Train_Data Loss: 0.3961 Acc: 0.9185
Test_Data Loss: 0.3952 Acc: 0.9140

Epoch 7/20
----------
Train_Data Loss: 0.3921 Acc: 0.9232
Test_Data Loss: 0.3892 Acc: 0.9391

Epoch 8/20
----------
Train_Data Loss: 0.3866 Acc: 0.9289
Test_Data Loss: 0.3844 Acc: 0.9260

Epoch 9/20
----------
Train_Data Loss: 0.3858 Acc: 0.9294
Test_Data Loss: 0.3827 Acc: 0.9304

Epoch 10/20
----------
Train_Data Loss: 0.3856 Acc: 0.9327
Test_Data Loss: 0.3824 Acc: 0.9282

Epoch 11/20
----------
Train_Data Loss: 0.3853 Acc: 0.9307


In [None]:
import torch
import torch.nn as nn
import numpy as np
import os
import zipfile

def quantize_to_9bit_signed(weight):
    # Quantize to 9-bit signed integers
    scale = 2 ** 8
    quantized_weight = np.round(weight * scale).astype(np.int16)
    quantized_weight = np.clip(quantized_weight, -256, 255)  # Clip to fit in 9 bits
    return quantized_weight

def format_weights_for_verilog(weights, num_channels, num_filters):
    formatted_weights = []
    for c in range(num_channels - 1, -1, -1):  # Channels in reverse order
        channel_weights = []
        for f in range(num_filters - 1, -1, -1):  # Filters in reverse order
            filter_weights = weights[f, c].flatten().tolist()
            filter_weights_formatted = ','.join([f"{'-' if w < 0 else ''}9'd{abs(w)}" for w in filter_weights])
            channel_weights.append(f"{{{filter_weights_formatted}}}")
        formatted_weights.append(f"{{{','.join(channel_weights)}}}")

    return f"input_weight = {{{','.join(formatted_weights)}}};"

def format_biases_for_verilog(bias):
    bias_formatted = ','.join([f"{'-' if b < 0 else ''}9'd{abs(b)}" for b in bias])
    return f"input_bias = {{{bias_formatted}}};"

def save_quantized_weights_and_biases(model, output_dir, zip_filename):
    os.makedirs(output_dir, exist_ok=True)

    # Process each layer's weights and biases
    for name, param in model.named_parameters():
        param_np = param.detach().cpu().numpy()
        quantized_param = quantize_to_9bit_signed(param_np)

        if 'weight' in name:
            if len(quantized_param.shape) == 4:
                # For 4D convolutional weights
                num_filters, num_channels, _, _ = quantized_param.shape
                formatted_weights = format_weights_for_verilog(quantized_param, num_channels, num_filters)
            elif len(quantized_param.shape) == 2:
                # For 2D linear weights
                num_filters, num_channels = quantized_param.shape
                formatted_weights = format_weights_for_verilog(quantized_param.reshape(num_filters, num_channels, 1, 1), num_channels, num_filters)

            output_file = os.path.join(output_dir, f"{name.replace('.', '_')}.txt")
            with open(output_file, 'w') as f:
                f.write(formatted_weights)

        elif 'bias' in name:
            formatted_bias = format_biases_for_verilog(quantized_param)
            output_file = os.path.join(output_dir, f"{name.replace('.', '_')}.txt")
            with open(output_file, 'w') as f:
                f.write(formatted_bias)

    # Zip the output directory
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(output_dir):
            for file in files:
                zipf.write(os.path.join(root, file), arcname=os.path.relpath(os.path.join(root, file), output_dir))

# Load the model and the weights
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SqueezeNet(version='1_1', num_classes=2).to(device)
model.load_state_dict(torch.load('/content/drive/MyDrive/squeezenet_v3_oct.pt', map_location=device))

# Save quantized weights and biases in Verilog format
save_quantized_weights_and_biases(model, '/content/drive/MyDrive/quantized_weights_v3', '/content/drive/MyDrive/quantized_weights_v3.zip')


  model.load_state_dict(torch.load('/content/drive/MyDrive/squeezenet_v3_oct.pt', map_location=device))
