In [1]:
import sys
import copy
import random
import os
from typing import Tuple
from collections import OrderedDict

import numpy as np
import PIL 
import matplotlib.pyplot as plt


import torch
from torch import nn, optim
from torch.utils import data
from torchvision import datasets, transforms

In [2]:
# sys.path.append("/home/matthias/Documents/EmbeddedAI/deep-microcompression/")
sys.path.append("../../")

from development import (
    Sequential,
    AvgPool2d,
    BatchNorm2d,
    Conv2d,
    Flatten,
    Linear,
    ReLU,
    ReLU6,
    MaxPool2d,
)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
mobilenetv1_file = f"mobilenetv1_state_dict_from_tf.pth"

LUCKY_NUMBER = 25
torch.manual_seed(LUCKY_NUMBER)
torch.random.manual_seed(LUCKY_NUMBER)
torch.cuda.manual_seed(LUCKY_NUMBER)

DEVICE

'cpu'

In [4]:
# class ImageNet_Validation_DataSet(data.Dataset):

#     def __init__(self, image_dir, combined_label_file, transformer=None):
        
#         if not os.path.exists(image_dir):
#             raise RuntimeError(f"image_dir {image_dir} doesn't exist!")
        
#         if not os.path.exists(combined_label_file):
#             raise RuntimeError(f"combined_label_file {combined_label_file} doesn't exist!")

#         self.image_dir = image_dir
#         self.images =  os.listdir(image_dir)
#         self.images.sort()

#         with open(combined_label_file, "r") as file:
#             self.labels, self.class_names = list(), list()

#             for line in file.readlines()[1:]:
#                 _, _, tf_label, class_name = line.strip().split(", ")
#                 self.labels.append(tf_label)
#                 self.class_names.append(class_name)

#         assert len(self.images) == len(self.labels), "Images not of the same length as targets"

#         self.transformer = transformer

        
#     def __len__(self):
#         return len(self.images)

#     def __getitem__(self, idx):
#         image = PIL.Image.open(os.path.join(self.image_dir, self.images[idx])).convert("RGB")
#         if self.transformer:
#             image = self.transformer(image)
#         return  image, int(self.labels[idx])
        
        


# imagenet_transformer = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.5, 0.5, 0.50], std=[0.5, 0.5, 0.5])
# ])

# imagenet_val_dir = "../../../Datasets/ImageNet_2012/validation/ILSVRC2012_img_val/"
# imagenet_val_combined_label_file = "../../../Datasets/ImageNet_2012/validation/ILSVRC2012_validation_combined_ground_truth.txt"
# imagenet_val_dataset = ImageNet_Validation_DataSet(image_dir=imagenet_val_dir, combined_label_file=imagenet_val_combined_label_file, transformer=imagenet_transformer)
# imagenet_val_dataloader = data.DataLoader(imagenet_val_dataset, shuffle=False, batch_size=32, num_workers=os.cpu_count())

In [28]:
data_transform = transforms.Compose([
    # transforms.RandomCrop((24, 24)),
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])




cifar10_train_dataset = datasets.CIFAR10("../../../Datassets/", train=True, download=True, transform=data_transform)
cifar10_test_dataset = datasets.CIFAR10(".../../../Datassets/", train=False, download=True, transform=data_transform)

cifar10_train_loader = data.DataLoader(cifar10_train_dataset, batch_size=32, shuffle=True)
cifar10_test_loader = data.DataLoader(cifar10_test_dataset, batch_size=32)

# cifar100_train_dataset = datasets.CIFAR100("./datasets", train=True, download=True, transform=data_transform)
# cifar100_test_dataset = datasets.CIFAR100("./datasets", train=False, download=True, transform=data_transform)

# cifar100_train_loader = data.DataLoader(cifar100_train_dataset, batch_size=32, shuffle=True, num_workers=os.cpu_count())
# cifar100_test_loader = data.DataLoader(cifar100_test_dataset, batch_size=32, num_workers=os.cpu_count())


In [29]:
def ConvBatchReLU(
        in_channels:int,
        out_channels:int,
        kernel_size:int,
        stride:int = 1,
        groups:int = 1,
        pad:Tuple[int, int, int, int] = (0, 0, 0, 0),
        bias=False,
        eps=0.001, 
        momentum=0.01,
):
    return (Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, groups=groups, pad=pad, bias=bias),
            BatchNorm2d(num_features=out_channels, eps=eps, momentum=momentum, affine=True, track_running_stats=True,),
            ReLU6(inplace=True))

In [30]:
def DepthwiseSeperableConv2LUBatchReLU(
        in_channels:int,
        out_channels:int,
        kernel_size:int,
        stride:int,
        pad:Tuple[int, int, int, int] = (0, 0, 0, 0),
        eps=0.001, 
        momentum=0.01
):
    return (
        *ConvBatchReLU(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size, stride=stride, pad=pad, groups=in_channels, eps=eps, momentum=momentum),
        *ConvBatchReLU(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1, pad=(0,0,0,0), groups=1, eps=eps, momentum=momentum)
    )

In [31]:
def MobileNetV1(just_backbone=False, classifer=None, pretrained_backbone_weight_file=None):
    mobilenetv1_backbone = {
        "conv2d_0": [3, 32, 3, 2],

        "depthwiseseparable_0": [32, 64, 3, 1],
        "depthwiseseparable_1": [64, 128, 3, 2],
        "depthwiseseparable_2": [128, 128, 3, 1],
        "depthwiseseparable_3": [128, 256, 3, 2],
        "depthwiseseparable_4": [256, 256, 3, 1],
        "depthwiseseparable_5": [256, 512, 3, 2],

        "depthwiseseparable_6": [512, 512, 3, 1],
        "depthwiseseparable_7": [512, 512, 3, 1],
        "depthwiseseparable_8": [512, 512, 3, 1],
        "depthwiseseparable_9": [512, 512, 3, 1],
        "depthwiseseparable_10": [512, 512, 3, 1],

        "depthwiseseparable_11": [512, 1024, 3, 2],
        "depthwiseseparable_12": [1024, 1024, 3, 1],

    }

    batchnorm_eps = 0.001
    batchnorm_momentum = 1 - 0.99

    layers = []

    for name, parameters in mobilenetv1_backbone.items():
        if "conv2d" in name:
            in_channels, out_channels, kernel_size, stride = parameters
            if stride == 2:
                pad = (0, 1, 0, 1)
            else:
                raise RuntimeError(f"Unexpected type Conv layer")
            layers.extend(ConvBatchReLU(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, pad=pad, eps=batchnorm_eps, momentum=batchnorm_momentum))
        elif "depthwiseseparable" in name:
            in_channels, out_channels, kernel_size, stride = parameters
            if stride == 2:
                pad = (0, 1, 0, 1)
            else:
                pad = tuple([1]*4)
            layers.extend(DepthwiseSeperableConv2LUBatchReLU(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, pad=pad, eps=batchnorm_eps, momentum=batchnorm_momentum))
        else:
            raise ValueError(f"Recieved unexpected layer of {name}")
    
    model_backbone = Sequential(*layers)


    if pretrained_backbone_weight_file is not None:
        state_dict = torch.load(pretrained_backbone_weight_file)
        print(model_backbone.load_state_dict(torch.load(pretrained_backbone_weight_file), strict=False)
)
    if not just_backbone:
        layers = []

        if classifer is None:
            classifer = {
                "avgpool_0": [7],
                "conv2d_1": [1024, 1000, 1, 1],
                "flatten_0": [],
            }

        for name, parameters in classifer.items():
            if "conv2d" in name:
                in_channels, out_channels, kernel_size, stride = parameters
                layers.extend(ConvBatchReLU(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, eps=batchnorm_eps, momentum=batchnorm_momentum))
            elif "avgpool" in name:
                kernel_size = parameters[0]
                stride = kernel_size
                layers.append(AvgPool2d(kernel_size=kernel_size))
            elif "flatten" in name:
                layers.append(Flatten())
            elif "linear" in name:
                in_features, out_features = parameters
                layers.append(Linear(in_features=in_features, out_features=out_features))
            else:
                raise ValueError(f"Recieved unexpected layer of {name}")

    return model_backbone.extend(layers)


# Mobilenetv1 for CIFAR10

In [32]:
cifar10_classifier = {
    "conv2d_1": [1024, 10, 1, 1],
    "flatten_0": [],
}
mobilenetv1_cifar10_model = MobileNetV1(
    just_backbone=False, 
    classifer=cifar10_classifier, 
    pretrained_backbone_weight_file=mobilenetv1_file
).to(DEVICE)


# mobilenetv1_cifar10_model.load_state_dict(torch.load(mobilenetv1_file), strict=False)

_IncompatibleKeys(missing_keys=[], unexpected_keys=['conv2d_27.weight', 'conv2d_27.bias', 'conv2d_0.bias', 'conv2d_1.bias', 'conv2d_2.bias', 'conv2d_3.bias', 'conv2d_4.bias', 'conv2d_5.bias', 'conv2d_6.bias', 'conv2d_7.bias', 'conv2d_8.bias', 'conv2d_9.bias', 'conv2d_10.bias', 'conv2d_11.bias', 'conv2d_12.bias', 'conv2d_13.bias', 'conv2d_14.bias', 'conv2d_15.bias', 'conv2d_16.bias', 'conv2d_17.bias', 'conv2d_18.bias', 'conv2d_19.bias', 'conv2d_20.bias', 'conv2d_21.bias', 'conv2d_22.bias', 'conv2d_23.bias', 'conv2d_24.bias', 'conv2d_25.bias', 'conv2d_26.bias'])


In [33]:
mobilenetv1_cifar10_model

Sequential(
  (conv2d_0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
  (batchnorm2d_0): BatchNorm2d(32, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
  (relu6_0): ReLU6(inplace=True)
  (conv2d_1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), groups=32, bias=False)
  (batchnorm2d_1): BatchNorm2d(32, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
  (relu6_1): ReLU6(inplace=True)
  (conv2d_2): Conv2d(32, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (batchnorm2d_2): BatchNorm2d(64, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
  (relu6_2): ReLU6(inplace=True)
  (conv2d_3): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), groups=64, bias=False)
  (batchnorm2d_3): BatchNorm2d(64, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
  (relu6_3): ReLU6(inplace=True)
  (conv2d_4): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bi

In [38]:
next(iter(cifar10_test_dataset))[1], next(iter(cifar10_test_dataset))[1]

(3, 3)

In [35]:
top1_acc_fun = lambda y_pred, y_true: (y_pred.argmax(dim=1) == y_true).sum().item()    
mobilenetv1_cifar10_model.evaluate(cifar10_test_dataset, top1_acc_fun, device=DEVICE)

  0%|          | 0/10000 [00:00<?, ?it/s]


AttributeError: 'int' object has no attribute 'to'

In [None]:
mobilenetv1_model.cpu()

mobilenetv1_mcu_model = copy.deepcopy(mobilenetv1_model)

original_acc = mobilenetv1_mcu_model.evaluate(imagenet_val_dataloader, top1_acc_fun)
original_size = mobilenetv1_mcu_model.get_size_in_bits()//8
print(f"The original model accuracy is {original_acc*100:.2f}% with size {original_size} bytes.")
test_output = mobilenetv1_mcu_model.test(); print(test_output)
# mobilenetv1_mcu_model.convert_to_c(var_name="mobilenetv1_mcu_model", src_dir="./Arduino Nano 33 BLE/src/", include_dir="./Arduino Nano 33 BLE/include/")
mobilenetv1_mcu_model.convert_to_c(var_name="mobilenetv1_mcu_model", src_dir="./HP HP Pavilion Laptop 15-cs3xxx/src/", include_dir="./HP HP Pavilion Laptop 15-cs3xxx/include/")


  0%|          | 0/1563 [00:04<?, ?it/s]


The original model accuracy is 68.75% with size 17059232 bytes.
tensor([[ 3.2222e+00,  9.0384e+00,  1.6922e+00,  2.9519e+00,  5.3741e+00,
          3.3085e-01,  4.1090e+00,  2.5040e+00,  3.1401e+00, -8.7384e-02,
         -1.1370e+00, -2.6074e-01, -1.9652e-01, -3.4057e+00, -2.7831e+00,
          1.2588e+00, -2.9276e+00, -3.2650e+00, -9.5138e-01, -2.4112e+00,
         -8.9809e+00,  9.1203e-01, -1.9521e+00, -2.0736e+00, -3.3935e+00,
          3.4715e+00,  6.2112e+00,  7.1531e+00,  1.6990e+00,  1.0570e+01,
         -1.5256e+00, -1.4328e+00,  1.4381e+00,  4.5991e+00,  3.1130e+00,
          2.0042e+00, -1.5593e+00,  1.9062e+00,  2.0727e+00,  5.6052e+00,
          4.3260e+00,  3.1655e-01,  6.4658e+00,  5.5718e+00,  1.6820e+00,
          4.5489e+00,  3.1417e+00,  5.6063e+00,  6.7165e+00,  3.1532e+00,
          2.3107e+00,  2.3647e+00, -6.0248e-01, -5.0344e+00, -2.5723e+00,
         -1.7455e+00,  1.8690e-02, -3.9020e+00, -3.9278e+00, -2.1961e+00,
         -8.1626e-01, -5.3579e-01,  2.3169e-01, 

In [None]:
i = 0

test_output[0, i:i+20]

tensor([ 3.2222,  9.0384,  1.6922,  2.9519,  5.3741,  0.3308,  4.1090,  2.5040,
         3.1401, -0.0874, -1.1370, -0.2607, -0.1965, -3.4057, -2.7831,  1.2588,
        -2.9276, -3.2650, -0.9514, -2.4112])

In [None]:
sparsity = .34

mobilenetv1_model.cpu()
mobilenetv1_mcu_model = mobilenetv1_model.prune_channel(sparsity)

mobilenetv1_mcu_model.to(DEVICE)
print(f"Pruned with {sparsity}, acc = {mobilenetv1_mcu_model.evaluate(cifar10_test_loader, top1_acc_fun, device=DEVICE)}")

mobilenetv1_mcu_model.cpu()
mobilenetv1_mcu_model.convert_to_c(var_name="mobilenetv1_mcu_model", src_dir="./HP HP Pavilion Laptop 15-cs3xxx/src/", include_dir="./HP HP Pavilion Laptop 15-cs3xxx/include/")

mobilenetv1_mcu_model.to(DEVICE)
mobilenetv1_mcu_model.test(DEVICE)

NameError: name 'cifar10_test_loader' is not defined

# MobileNetV1 Tensorflow to Torch Converter 

In [None]:
import torch
from torch import nn

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import tensorflow as tf

import numpy as np


@torch.no_grad()
def copy_tensor(tensor_source, tensor_destination):
    tensor_destination.copy_(tensor_source)


@torch.no_grad()
def convert_mobilenetv1_tf_to_torch():
    mobilenetv1_tf_model = tf.keras.applications.mobilenet.MobileNet(weights="imagenet")
    
    torch_layers = []
    for layer in mobilenetv1_tf_model.layers:


        if isinstance(layer, tf.keras.layers.InputLayer):
            pass

        elif isinstance(layer, tf.keras.layers.Conv2D):

            weight = np.transpose(layer.weights[0], (3, 2, 0, 1))
            out_channels, in_channels, kernel_size, _ = weight.shape
            stride =layer.strides[0]

            if kernel_size == 3 and stride == 2:
                pad = (0, 1, 0, 1)
                padding = 0
            elif kernel_size == 1 and stride == 1:
                pad = ()
                padding = 0
            else:
                raise RuntimeError(f"Unexpected type Conv layer")
            
            if len(layer.weights) == 1:
                conv_layer = Conv2d(
                    in_channels=in_channels, 
                    out_channels=out_channels, 
                    kernel_size=kernel_size, 
                    stride=stride, 
                    padding=padding, 
                    pad=pad, 
                    bias=True
                )
            else:
                conv_layer = Conv2d(
                    in_channels=in_channels, 
                    out_channels=out_channels, 
                    kernel_size=kernel_size, 
                    stride=stride, 
                    padding=padding, 
                    pad=pad, 
                    bias=False
                )
                copy_tensor(torch.from_numpy(layer.weights[1].numpy()), conv_layer.bias)

            copy_tensor(torch.from_numpy(weight), conv_layer.weight)
            torch_layers.append(conv_layer)         

        elif isinstance(layer, tf.keras.layers.DepthwiseConv2D):

            # Convert to PyTorch format: [in_channels, 1, H, W]
            weight = np.transpose(layer.weights[0], (2, 3, 0, 1))
            in_channels, depth_multiplier, kernel_size, _ = weight.shape
            stride =layer.strides[0]

            assert depth_multiplier == 1, "Depth multiplier must be 1 for depthwise convolutions"

            if kernel_size == 3 and stride == 2:
                padding = 0
                pad = (0, 1, 0, 1)
            elif kernel_size == 3 and stride == 1:
                padding = 1
                pad = ()
            else:
                raise RuntimeError(f"Unexpected type Conv layer")

            if len(layer.weights) == 1:
                deepseperableconv_layer = Conv2d(in_channels=in_channels, 
                    out_channels=in_channels, 
                    kernel_size=kernel_size, 
                    stride=stride, 
                    padding=padding, pad=pad, 
                    groups=in_channels, 
                    bias=False
                )
            else: 
                deepseperableconv_layer = Conv2d(in_channels=in_channels, 
                    out_channels=in_channels, 
                    kernel_size=kernel_size, 
                    stride=stride, 
                    padding=padding, pad=pad, 
                    groups=in_channels, 
                    bias=True
                )
                copy_tensor(torch.from_numpy(layer.weights[1].numpy()), deepseperableconv_layer.bias)

            copy_tensor(torch.from_numpy(weight), deepseperableconv_layer.weight)

            torch_layers.append(deepseperableconv_layer)

        elif isinstance(layer, tf.keras.layers.ReLU):
            # torch_layers.append(ReLU())
            torch_layers.append(ReLU6())

        elif isinstance(layer, tf.keras.layers.BatchNormalization):
            gamma, beta, mean, var = layer.weights
            out_channels = gamma.shape[0]

            torch_layers.append(BatchNorm2d(out_channels, eps=layer.epsilon, momentum=1 - layer.momentum, affine=layer.center and layer.scale,  track_running_stats=True,))
            print(torch_layers[-1].momentum)  
            print(layer.epsilon, layer.momentum, layer.center, layer.scale)
            copy_tensor(torch.from_numpy(gamma.numpy()), torch_layers[-1].weight)
            copy_tensor(torch.from_numpy(beta.numpy()), torch_layers[-1].bias)
            copy_tensor(torch.from_numpy(mean.numpy()), torch_layers[-1].running_mean)
            copy_tensor(torch.from_numpy(var.numpy()), torch_layers[-1].running_var)

        elif isinstance(layer, tf.keras.layers.MaxPooling2D):
            pass

        elif isinstance(layer, tf.keras.layers.GlobalAveragePooling2D):
            torch_layers.append(AvgPool2d(kernel_size=7))
            pass

        elif isinstance(layer, tf.keras.layers.ZeroPadding2D):
            print("Not Needed")

        elif isinstance(layer, tf.keras.layers.Dropout):
            print("Not Needed")

        elif isinstance(layer, tf.keras.layers.Reshape):
            torch_layers.append(Flatten())

        elif isinstance(layer, tf.keras.layers.Activation):
            print(layer.activation.__name__)
            torch_layers.append(nn.Softmax(dim=1))

        else: 
            raise RuntimeError(f"Unknown layer type: {type(layer)}")
        
    return Sequential(*torch_layers)
    
mobilenetv1_torch_model = convert_mobilenetv1_tf_to_torch()
type(mobilenetv1_torch_model)