## DNN model for MNIST dataset

In [14]:
import numpy as np
import torch
import tensorflow as tf

# Load TensorFlow MNIST data
mnist = tf.keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Normalize and flatten the images
train_images_tf = train_images.reshape((-1, 28*28)) / 255.0
test_images_tf = test_images.reshape((-1, 28*28)) / 255.0

# Convert to PyTorch format [batch_size, total pixels]
# Since images are already normalized and flattened for TensorFlow, we can use the same arrays
train_images_pt = torch.tensor(train_images_tf).float()
test_images_pt = torch.tensor(test_images_tf).float()
train_labels_pt = torch.tensor(train_labels)
test_labels_pt = torch.tensor(test_labels)

In [26]:
from tensorflow import keras
num_classes = 10

model_tf = keras.Sequential([
    keras.layers.InputLayer(input_shape=(28*28,)),  # Adjusted for 28x28 images
#    keras.layers.Dense(128, activation='relu'),     # Increased number of neurons
    keras.layers.Dense(56, activation='relu'),      # Additional hidden layer
    keras.layers.Dense(num_classes, activation='softmax')  # Output layer for 10 classes
])

model_tf.compile(optimizer='adam', 
              loss='sparse_categorical_crossentropy', 
              metrics=['accuracy'])

In [27]:
# Train the model
history = model_tf.fit(train_images_tf, train_labels, epochs=3, batch_size=32, validation_split=0.1)

# Evaluate the model
test_loss, test_acc = model_tf.evaluate(test_images_tf, test_labels, verbose=2)
print('\nTest accuracy:', test_acc)

Epoch 1/3
Epoch 2/3
Epoch 3/3
313/313 - 0s - loss: 0.1174 - accuracy: 0.9647 - 208ms/epoch - 665us/step

Test accuracy: 0.9646999835968018


### Convert to Pytorch DNN model

In [31]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self, num_classes=10):
        super(Net, self).__init__()
        # Fully connected layers / Dense block
        # First dense layer
        self.fc1 = nn.Linear(28*28, 56)  # Flatten 28*28 and feed into 56 neurons
        
        # Second dense layer (output layer)
        self.fc2 = nn.Linear(56, num_classes)  # 56 inputs, 10 outputs (number of classes)

    def forward(self, x):
        # Flatten the tensor
        #x = x.view(-1, 28*28)
        
        # Fully connected layers with ReLU activation for the first layer
        x = F.relu(self.fc1(x))

        # Output layer with no activation
        # Softmax will be applied externally during training and evaluation
        x = self.fc2(x)
        return F.log_softmax(x, dim = 1)
    
model_pt = Net()

In [32]:
# Transfer weights for the first dense layer (fc1) from model_tf to model_pt
weights, biases = model_tf.layers[0].get_weights()
model_pt.fc1.weight = nn.Parameter(torch.from_numpy(np.transpose(weights, (1, 0))))
model_pt.fc1.bias = nn.Parameter(torch.from_numpy(biases))

# Transfer weights for the second dense layer (fc2) from model_tf to model_pt
weights, biases = model_tf.layers[1].get_weights()
model_pt.fc2.weight = nn.Parameter(torch.from_numpy(np.transpose(weights, (1, 0))))
model_pt.fc2.bias = nn.Parameter(torch.from_numpy(biases))

In [34]:
# Select the image for TensorFlow and PyTorch
controlled_input_tf = test_images[36].reshape(1, 28*28)  # Reshape to (1, 784) for DNN
controlled_input_pt = torch.from_numpy(controlled_input_tf).float()

# Test TensorFlow Model
output_tf = model_tf.predict(controlled_input_tf) 
print("TensorFlow Basic Model Output:", output_tf)

# Test PyTorch Model
model_pt.eval()  # Set PyTorch model to evaluation mode
with torch.no_grad():
    output_pt = model_pt(controlled_input_pt)
print("PyTorch Basic Model Output:", torch.exp(output_pt).numpy())

TensorFlow Basic Model Output: [[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]]
PyTorch Basic Model Output: [[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]]


In [36]:
from torch.utils.data import DataLoader, TensorDataset

# Create TensorDataset for test data
test_dataset = TensorDataset(test_images_pt, test_labels_pt)

# Create a DataLoader for the test dataset
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

def evaluate_pytorch_model(model, test_loader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    return accuracy

# Evaluate the PyTorch model
accuracy = evaluate_pytorch_model(model_pt, test_loader)
print(f'Accuracy of the PyTorch model on the test images: {accuracy:.8f}%')


Accuracy of the PyTorch model on the test images: 96.47000000%


In [43]:
def get_predictions_tf(model, test_images, batch_size=32):
    predictions = []
    for i in range(0, len(test_images), batch_size):
        batch = test_images[i:i+batch_size]
        pred = model.predict(batch)
        predictions.extend(np.argmax(pred, axis=1))
    return predictions

def get_predictions_pt(model, test_images, batch_size=32):
    model.eval()
    predictions = []
    with torch.no_grad():
        for i in range(0, len(test_images), batch_size):
            batch = test_images[i:i+batch_size]
            pred = model(batch)
            predictions.extend(torch.argmax(pred, axis=1).tolist())
    return predictions

In [44]:
# Generate predictions
predictions_tf = get_predictions_tf(model_tf, test_images_tf)
predictions_pt = get_predictions_pt(model_pt, test_images_pt)

# Compare predictions
mismatches = sum(p1 != p2 for p1, p2 in zip(predictions_tf, predictions_pt))
print(f"Number of mismatches: {mismatches} out of {len(test_images_tf)} samples")


Number of mismatches: 0 out of 10000 samples


### Test on Orion

Apply Quantization Aware Training (QAT), which requires retraining.. 

> Concretely QAT is a method where the quantization error is emulated during the training phase itself. In this process, the weights and activations of the model are quantized, and this information is used during both the forward and backward passes of training. This allows the model to learn and adapt to the quantization error. It ensures that once the model is fully quantized post-training, it has already accounted for the effects of quantization, resulting in improved accuracy.

If new tutorial of Post-Training Quantization is released sometime... I will update the benchmark based on that

In [13]:
import tensorflow_model_optimization as tfmot

# Apply quantization to the layers
quantize_model = tfmot.quantization.keras.quantize_model

q_aware_model = quantize_model(model)

# 'quantize_model' requires a recompile
q_aware_model.compile(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])

q_aware_model.summary()


Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 quantize_layer_2 (Quantize  (None, 784)               3         
 Layer)                                                          
                                                                 
 quant_dense_6 (QuantizeWra  (None, 56)                43965     
 pperV2)                                                         
                                                                 
 quant_dense_7 (QuantizeWra  (None, 10)                575       
 pperV2)                                                         
                                                                 
Total params: 44543 (174.00 KB)
Trainable params: 44530 (173.95 KB)
Non-trainable params: 13 (52.00 Byte)
_________________________________________________________________


In [41]:
batch_size = 256
epochs = 3
history = q_aware_model.fit(train_images, train_labels,
                            epochs=epochs,
                            validation_split=0.2)

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [42]:
scores, acc = q_aware_model.evaluate(test_images, test_labels, verbose=0)
print('Test loss:', scores)
print('Test accuracy:', acc)

Test loss: 0.9716224670410156
Test accuracy: 0.9459999799728394


### Prepare quantized test images

In [36]:
# Resize and Normalize x_test_image from float64 to int8
x_test_image_norm = (test_images / 255.0 * 255 - 128).astype(np.int8)
x_train_image_norm = (train_images / 255.0 * 255 - 128).astype(np.int8)

### Convert to TFLite Format

In [45]:
import tensorflow as tf

# Create a converter
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)

# Indicate that you want to perform default optimizations,
# which include quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Define a generator function that provides your test data's numpy arrays
def representative_data_gen():
  for i in range(500):
    yield [np.array(train_images[i:i+1], dtype=np.float32)]

# Use the generator function to guide the quantization process
converter.representative_dataset = representative_data_gen

# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

# Set the input and output tensors to int8
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# Convert the model
tflite_model = converter.convert()

# Save the model to disk
open("q_aware_model.tflite", "wb").write(tflite_model)

INFO:tensorflow:Assets written to: /tmp/tmpxps2qz41/assets


INFO:tensorflow:Assets written to: /tmp/tmpxps2qz41/assets
2024-01-27 23:30:26.946295: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:378] Ignored output_format.
2024-01-27 23:30:26.946316: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:381] Ignored drop_control_dependency.
2024-01-27 23:30:26.946456: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /tmp/tmpxps2qz41
2024-01-27 23:30:26.947399: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2024-01-27 23:30:26.947409: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /tmp/tmpxps2qz41
2024-01-27 23:30:26.950084: I tensorflow/cc/saved_model/loader.cc:233] Restoring SavedModel bundle.
2024-01-27 23:30:26.988799: I tensorflow/cc/saved_model/loader.cc:217] Running initialization op on SavedModel bundle at path: /tmp/tmpxps2qz41
2024-01-27 23:30:26.998885: I tensorflow/cc/saved_model/loader.cc:316] SavedModel

46944

### Load and Test

In [60]:
# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path="q_aware_model.tflite")
interpreter.allocate_tensors()

# Get input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# The input needs to be quantized, so we retrieve the quantization parameters
input_scale, input_zero_point = input_details[0]['quantization']
output_scale, output_zero_point = output_details[0]['quantization']

# Normalize and quantize the test images
test_images_quant = (test_images / input_scale + input_zero_point).astype(np.int8)



In [61]:
# Evaluate the quantized TFLite model
correct_predictions = 0
for i in range(len(test_images)):
    test_image = np.expand_dims(test_images_quant[i], axis=0)
    
    # Set the value for the input tensor
    interpreter.set_tensor(input_details[0]['index'], test_image)
    
    # Run the inference
    interpreter.invoke()

    # Retrieve the output and dequantize
    output = interpreter.get_tensor(output_details[0]['index'])
    output = np.argmax(output, axis=1)
    predicted_class = output[0]
    if predicted_class == test_labels[i]:
        correct_predictions += 1

# Calculate the accuracy
accuracy = correct_predictions / len(test_images) * 100
print(f'Quantized model accuracy: {accuracy:.2f}%')

Quantized model accuracy: 93.89%


In [62]:
scores, acc = q_aware_model.evaluate(test_images, test_labels, verbose=0)
print('Test loss:', scores)
print('Test accuracy:', acc)

Test loss: 0.9716224670410156
Test accuracy: 0.9459999799728394


In [63]:
# Evaluate the model
test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2)
print('\nTest accuracy:', test_acc)

313/313 - 0s - loss: 0.1168 - accuracy: 0.9650 - 216ms/epoch - 689us/step

Test accuracy: 0.9649999737739563


### Convert to Cairo

In [65]:
# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path="./mnist_nn/q_aware_model.tflite")
interpreter.allocate_tensors()

In [67]:
# Create an object with all tensors 
#(an input + all weights and biases)
tensors = {
    "input": test_image[0].flatten(),
    "fc1_weights": interpreter.get_tensor(1), 
    "fc1_bias": interpreter.get_tensor(2), 
    "fc2_weights": interpreter.get_tensor(4), 
    "fc2_bias": interpreter.get_tensor(5)
}

In [68]:
import os
# Create the directory if it doesn't exist
os.makedirs('./mnist_nn/src/generated', exist_ok=True)

for tensor_name, tensor in tensors.items():
    with open(os.path.join('./mnist_nn/src', 'generated', f"{tensor_name}.cairo"), "w") as f:
        f.write(
            "use core::array::ArrayTrait;\n" +
            "use orion::operators::tensor::{TensorTrait, Tensor, I32Tensor};\n" +
            "use orion::numbers::i32;\n\n" +
            "\nfn {0}() -> Tensor<i32> ".format(tensor_name) + "{\n" +
            "    let mut shape = ArrayTrait::<usize>::new();\n"
        )
        for dim in tensor.shape:
            f.write("    shape.append({0});\n".format(dim))
        f.write(
            "    let mut data = ArrayTrait::<i32>::new();\n"
        )
        for val in np.nditer(tensor.flatten()):
            f.write("    data.append(i32 {{ mag: {0}, sign: {1} }});\n".format(abs(int(val)), str(val < 0).lower()))
        f.write(
            "    TensorTrait::new(shape.span(), data.span())\n" +
            "}\n"
        )
      
with open(os.path.join('./mnist_nn/src', 'generated.cairo'), 'w') as f:
    for param_name in tensors.keys():
        f.write(f"mod {param_name};\n")


### Test on OPML