## Training MNIST Neural Network Model

In [5]:
!!pip install tensorflow-model-optimization

['Collecting tensorflow-model-optimization',
 '  Downloading tensorflow_model_optimization-0.7.5-py2.py3-none-any.whl (241 kB)',
 '\x1b[?25l     \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m0.0/241.2 kB\x1b[0m \x1b[31m?\x1b[0m eta \x1b[36m-:--:--\x1b[0m',
 '\x1b[2K     \x1b[91m━━━━━━\x1b[0m\x1b[91m╸\x1b[0m\x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m41.0/241.2 kB\x1b[0m \x1b[31m1.2 MB/s\x1b[0m eta \x1b[36m0:00:01\x1b[0m',
 '\x1b[2K     \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m241.2/241.2 kB\x1b[0m \x1b[31m3.7 MB/s\x1b[0m eta \x1b[36m0:00:00\x1b[0m',
 'Installing collected packages: tensorflow-model-optimization',
 'Successfully installed tensorflow-model-optimization-0.7.5']

In [1]:
from tensorflow import keras
from keras.datasets import mnist
from scipy.ndimage import zoom
import numpy as np

(x_train, y_train), (x_test, y_test) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [2]:
# Resizing function
def resize_images(images):
    return np.array([zoom(image, 0.5) for image in images])

# Resize
x_train = resize_images(x_train)
x_test = resize_images(x_test)

# Then reshape
x_train = x_train.reshape(60000, 14*14)
x_test = x_test.reshape(10000, 14*14)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')

# normalize to range [0, 1]
x_train /= 255
x_test /= 255

In [3]:
from tensorflow.keras import layers

num_classes = 10

model = keras.Sequential([
    keras.layers.InputLayer(input_shape=(14*14,)),
    keras.layers.Dense(10, activation='relu'),
    keras.layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


In [4]:
batch_size = 256
epochs = 10
history = model.fit(x_train, y_train,
                    epochs=epochs,
                    validation_split=0.2)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


### Quantize Model Paramaters

In [6]:
import tensorflow_model_optimization as tfmot

# Apply quantization to the layers
quantize_model = tfmot.quantization.keras.quantize_model

q_aware_model = quantize_model(model)

# 'quantize_model' requires a recompile
q_aware_model.compile(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])

q_aware_model.summary()


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 quantize_layer (QuantizeLa  (None, 196)               3         
 yer)                                                            
                                                                 
 quant_dense (QuantizeWrapp  (None, 10)                1975      
 erV2)                                                           
                                                                 
 quant_dense_1 (QuantizeWra  (None, 10)                115       
 pperV2)                                                         
                                                                 
Total params: 2093 (8.18 KB)
Trainable params: 2080 (8.12 KB)
Non-trainable params: 13 (52.00 Byte)
_________________________________________________________________


In [7]:
batch_size = 256
epochs = 10
history = q_aware_model.fit(x_train, y_train,
                            epochs=epochs,
                            validation_split=0.2)

scores, acc = q_aware_model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', scores)
print('Test accuracy:', acc)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test loss: 0.25907444953918457
Test accuracy: 0.929099977016449


In [8]:
import tensorflow as tf

# Create a converter
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)

converter_model = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter_model.convert()
open("model.tflite", "wb").write(tflite_model)

# Indicate that you want to perform default optimizations,
# which include quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Define a generator function that provides your test data's numpy arrays
def representative_data_gen():
  for i in range(500):
    yield [x_test[i:i+1]]

# Use the generator function to guide the quantization process
converter.representative_dataset = representative_data_gen

# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

# Set the input and output tensors to int8
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# Convert the model
tflite_model = converter.convert()

# Save the model to disk
open("q_aware_model.tflite", "wb").write(tflite_model)



4304

In [9]:
# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path="q_aware_model.tflite")
interpreter.allocate_tensors()

In [10]:
# Get input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Normalize the input value to int8
input_shape = input_details[0]['shape']
input_data = np.array(x_test[0:1], dtype=np.int8)
interpreter.set_tensor(input_details[0]['index'], input_data)

# Perform the inference
interpreter.invoke()

# Get the result
output_data = interpreter.get_tensor(output_details[0]['index'])
print(output_data)

[[-128 -128 -104  104 -128 -128 -128 -128 -128 -128]]


In [11]:
(_, _), (x_test_image, y_test_label) = mnist.load_data()

# Resize and Normalize x_test_image to int8
x_test_image = resize_images(x_test_image)
x_test_image_norm = (x_test_image / 255.0 * 255 - 128).astype(np.int8)

# Initialize an array to store the predictions
predictions = []

# Iterate over the test data and make predictions
for i in range(len(x_test_image_norm)):
    test_image = np.expand_dims(x_test_image_norm[i].flatten(), axis=0)

    # Set the value for the input tensor
    interpreter.set_tensor(input_details[0]['index'], test_image)

    # Run the inference
    interpreter.invoke()

    output = interpreter.get_tensor(output_details[0]['index'])
    predictions.append(output)

### Generate Leo Neural Network

In [42]:
# Create an object with all tensors
#(an input + all weights and biases)
tensors = {
    "input": x_test_image[8].flatten(),
    "fc1_weights": interpreter.get_tensor(1),
    "fc1_bias": interpreter.get_tensor(2),
    "fc2_weights": interpreter.get_tensor(4),
    "fc2_bias": interpreter.get_tensor(5)
}

In [54]:

import math

def nn2leo_converter(tensors):
    neurons_per_layer = [196, 10, 10] # specifies NN architecture
    scaling_factor = 0 # specifies scaling factor for fixed point numbers
    integer_type = "u32" # specifies used integer type

    inputs = tensors['input']

    w1 = np.ones(tensors['input'].shape,dtype=int).reshape(1,-1)
    b1 = np.zeros(tensors['input'].shape, dtype=int)
    w2 = tensors['fc1_weights']
    b2 = tensors['fc1_bias']
    w3 = tensors['fc2_weights']
    b3 = tensors['fc2_bias']

    if(len(neurons_per_layer) < 2 or min(neurons_per_layer) < 1):
        print("error, invalid input")

    str_list_main = []
    str_list_inputs = []

    str_main="program nn.aleo {\ntransition main("

    str_inputs = ""

    str_list_inputs.append("[main]\n")

    # for i in range(neurons_per_layer[0]):
    #     str_main += "w0" + str(i)+": " + integer_type + ", b0" + str(i) + ": " + integer_type + ", "
    #     str_inputs += "w0" + str(i) + ": " + integer_type + " = 0"+integer_type+";\n"
    #     str_inputs += "b0" + str(i) + ": " + integer_type + " = 0"+integer_type+";\n"

    # str_list_inputs.append(str_inputs)
    # str_inputs = ""

    # for i in range(1, len(neurons_per_layer)): # current layer
    #     for j in range(neurons_per_layer[i-1]): # neuron of previous layer
    #         for k in range(neurons_per_layer[i]): # neuron of current layer
    #             str_main += "w" + str(i) + str(j) + str(k) + ": " + integer_type + ", "
    #             str_inputs += "w" + str(i) + str(j) + str(k) + ": " + integer_type + " = 0"+integer_type+";\n"
    #         str_main += "b" + str(i) + str(j) + ": " + integer_type + ", "
    #         str_inputs += "b" + str(i) + str(j) + ": " + integer_type + " = 0"+integer_type+";\n"

    for i in range(math.ceil(neurons_per_layer[0]/32)):
        if i + 1 != math.ceil(neurons_per_layer[0]/32):
            str_main += "inputs"+str(i+1)+": [" + integer_type + "; 32], "
            str_inputs += "inputs"+str(i+1)+": " + integer_type + " = 0"+integer_type+";\n"
        else:
            str_main += "inputs"+str(i+1)+": [" + integer_type + "; "+str(neurons_per_layer[0]%32)+"], "

    str_main = str_main[:-2]
    str_list_inputs.append(str_inputs)

    str_inputs = "[registers]\n"

    str_main += ") -> [" + integer_type + "; " + str(neurons_per_layer[-1]) + "] {\n"

    str_list_main.append(str_main)

    line = ""

    # for i in range(neurons_per_layer[0]): # input layer
    #     line += "let neuron0"+str(i) + ": " + integer_type + " = w0_" + str(i) + " * input" + str(i) + " / " + str(2**scaling_factor)+ integer_type + " + b0_" + str(i) + ";\n"

    for i in range(neurons_per_layer[0]): # input layer
        line += "let neuron0"+str(i) + ": " + integer_type + " = inputs" + str((i)//32+1) + f"[{i%32}{integer_type}];\n"



    for layer in range(1, len(neurons_per_layer)): # other layers
        for i in range(neurons_per_layer[layer]):
            line_start = "let neuron" + str(layer) + str(i) + ": " + integer_type + " = rectified_linear_activation("
            for j in range(neurons_per_layer[layer-1]):
                line_start += "neuron" + str(layer-1) + str(j) + " * w" + str(layer) +"_" + str(j) +"_" + str(i) + " / " + str(2**scaling_factor)+ integer_type + " + "

            line_start += "b" + str(layer) +"_" + str(i) + ");\n"
            line += line_start

    str_list_main.append(line)

    line = "return ["
    str_inputs += "r0: [" + integer_type + "; " + str(neurons_per_layer[-1]) + "] = ["
    for i in range(neurons_per_layer[-1]):
        line += "neuron" + str(len(neurons_per_layer)-1) + str(i) + ", "
    str_inputs += "0, "
    str_inputs = str_inputs[:-2] + "];\n"

    line = line[:-2]
    line += "];}\n\n"
    str_list_main.append(line)
    str_list_inputs.append(str_inputs)

    str_list_main.append("function rectified_linear_activation(x: "+str(integer_type)+") -> "+str(integer_type)+" {\n")
    str_list_main.append("let result: "+str(integer_type)+" = 0" + integer_type + ";\n")
    str_list_main.append("if x > 0" + integer_type + " {\n")
    str_list_main.append("result = x;\n")
    str_list_main.append("}\n")
    str_list_main.append("return result;\n")
    str_list_main.append("}\n")
    str_list_main.append("}")

    with open("main.leo", "w+") as file:
        file.writelines(str_list_main)

    with open("project.in", "w+") as file:
        file.writelines(str_list_inputs)

    with open("main.leo", "r") as file:
        aa = file.read()

    temp = "\n".join(aa.split("\n")[2:])

    weights = [w1, w2, w3]
    biases = [b1, b2, b3]

    for i in range(len(inputs)):
        temp = temp.replace(f"input{i}", str(inputs[i])+str(integer_type),1)

    for i in range(w1.shape[1]):
        temp = temp.replace(f"w0_{i}", str(w1[0][i])+str(integer_type), 1)


    for i in range(len(weights)):
        for j in range(weights[i].shape[0]):
            for k in range(weights[i].shape[1]):
                temp = temp.replace(f"w{i}_{k}_{j}", str(weights[i][j][k])+str(integer_type), 1)
                temp = temp.replace(f"w{i}_{k}_0", str(weights[i][0][k])+str(integer_type), 1)

    for i in range(len(biases)):
        for j in range(len(biases[i])):
            temp = temp.replace(f"b{i}_{j}", str(biases[i][j])+str(integer_type), 1)

    first_part = aa.split("\n")[:2]
    last_part = temp.split("\n")
    str_main_list = first_part + last_part
    str_main = "\n".join(str_main_list)
    ## Write to file
    with open("main-.leo", "w+") as file:
        file.write(str_main)
    create_input_file(integer_type, list(inputs))
    a = 5

# Define a function to chunk the input array in size of 32
def chunked_inputs(inputs, chunk_size=32):
    return [inputs[i:i + chunk_size] for i in range(0, len(inputs), chunk_size)]
def create_input_file(integer_type, inputs):
    # Define the input array
    # inputs = [1, 2]


    str_input_file = ""
    str_input_file += f"[main]\n"
    for idx, chunk in enumerate(chunked_inputs(inputs), start=1):
        inputs_str = ','.join(str(i) + str(integer_type) for i in map(str, chunk))  # Convert chunk to string
        str_input_file += f'inputs{idx}: [{integer_type}; {len(chunk)}] = [{inputs_str}];\n'


    # Create or open the file to write the inputs
    with open('nn.in', 'w') as file:
        # Write the [main] section header
        file.write(f"[main]\n")
        str_input_file + f"[main]\n"

        for idx, chunk in enumerate(chunked_inputs(inputs), start=1):
            inputs_str = ','.join(str(i) + str(integer_type) for i in map(str, chunk))  # Convert chunk to string
            file.write(f'inputs{idx}: [{integer_type}; {len(chunk)}] = [{inputs_str}];\n')
            str_input_file + f'inputs{idx}: [{integer_type}; {len(chunk)}] = [{inputs_str}];\n'

        # # Write the inputs array in the specified format
        # inputs_formatted = ','.join(str(i) + str(integer_type) for i in inputs)  # Format array as a comma-separated string
        # file.write(f'= [{inputs_formatted}];\n')
    print("The file with inputs has been created successfully.")
    return str_input_file

In [44]:
nn2leo_converter(tensors)

The file with inputs has been created successfully.


### Download MNIST Dataset

In [15]:
from keras.datasets import mnist
import os
from PIL import Image

# Function to save dataset images as jpeg files
def save_mnist_as_jpeg():
    # Load the dataset
    (train_images, train_labels), (test_images, test_labels) = mnist.load_data()

    # Create directories for training and test sets
    os.makedirs('mnist_data/train', exist_ok=True)
    os.makedirs('mnist_data/test', exist_ok=True)

    # Save training images
    for idx, image in enumerate(train_images):
        file_path = os.path.join('mnist_data/train', f'train_image_{idx}_{train_labels[idx]}.jpg')
        img = Image.fromarray(image)
        img.save(file_path, 'JPEG')

    # Save test images
    for idx, image in enumerate(test_images):
        file_path = os.path.join('mnist_data/test', f'test_image_{idx}_{test_labels[idx]}.jpg')
        img = Image.fromarray(image)
        img.save(file_path, 'JPEG')

    print("MNIST dataset has been saved as JPEG files.")

# Call the function
save_mnist_as_jpeg()

MNIST dataset has been saved as JPEG files.
