In [None]:
# Import necessary libraries
from pynq import Overlay, allocate, MMIO
import time
import os
import numpy as np
import struct
import json
from PIL import Image
import shutil
import requests
from flask import Flask, request, abort

# Load the overlay
# 'dpu.bit' is given without a directory, so where it is looked up depends on
# how PYNQ resolves the name — NOTE(review): confirm the bitstream sits next to
# this notebook on the board.
overlay = Overlay('dpu.bit')
# Handle to the overlay's `axi_dma_0` block; init(), predict() and
# predict_testing() all stream their buffers through this object.
dma = overlay.axi_dma_0

In [2]:
# Debug aid: uncomment to inspect the DMA registers.
# dma.register_map
# 784 = 28 * 28, the length of the flattened image returned by transform().
input_size = 784
# The predict functions read a single uint8 result and use it as a key into
# label_mapping.
output_size = 1

# Allocate image and predict buffers
# Both are allocated once here and reused by every call to predict() /
# predict_testing().
input_buffer = allocate(shape=(input_size,), dtype=np.uint8)
output_buffer = allocate(shape=(output_size,), dtype=np.uint8)

# Initialize latch to track if initialization is complete
# init() sets this to True; re-running this cell resets it to False, which
# makes the initialization cell stream the parameters again.
init_latch = False

In [3]:
def read_json(model_info_path):
    """Read the quantized model description and pack it into per-layer int8 streams.

    The JSON file is expected to contain the keys read below: ``conv1``,
    ``conv2``, ``fc1`` and ``fc2`` (each with ``weights`` and ``biases``),
    ``Final_Scales``, ``quant`` (``scale``, ``zero_point``),
    ``fc2['layer_zero_point']`` and ``label_mapping``.

    Packing layout (each bias is expanded by ``split_32bit_to_signed_8bit``
    into four signed bytes):

    * conv1: per output channel, the 5x5 kernel of input channel 0 in row-major
      order, followed by the bias bytes.
    * conv2: per output channel, the 5x5 kernels of every input channel
      (row-major), followed by the bias bytes.
    * fc1 / fc2: per output neuron, its weight row followed by the bias bytes.
    * scales: the bytes of every ``Final_Scales`` entry in file order, then the
      input zero point and the fc2 output zero point as one byte each.

    Args:
        model_info_path: Path of the JSON file to read.

    Returns:
        A 4-tuple ``(layers, input_scale, input_zero_point, labels_mapping)``
        where ``layers`` is a list of five ``np.int8`` arrays in the order
        conv1, conv2, fc1, fc2, scales; ``input_scale`` and
        ``input_zero_point`` are the raw values from ``quant``; and
        ``labels_mapping`` is the ``label_mapping`` entry of the file.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If one of the expected keys is missing.
    """
    kernel_size = 5  # the packing below reads exactly 5x5 taps per kernel

    with open(model_info_path, 'r') as file:
        data = json.load(file)

    def flatten_kernel(kernel):
        # Row-major flattening of one 5x5 kernel.
        return [kernel[i][j] for i in range(kernel_size) for j in range(kernel_size)]

    def bias_bytes(layer, channel):
        # Four signed bytes of the layer's bias for `channel`.
        # The tuple is returned as-is (no `list(...)` call): the notebook has
        # historically rebound the global name `list`, which made such calls
        # fail when this function was run a second time.
        return split_32bit_to_signed_8bit(data[layer]['biases'][channel])

    # CONV1: only input channel 0 of each output channel is read.
    conv1_list = []
    for channel in range(len(data['conv1']['weights'])):
        conv1_list.extend(flatten_kernel(data['conv1']['weights'][channel][0]))
        conv1_list.extend(bias_bytes('conv1', channel))

    # CONV2: the number of input channels is taken from output channel 0.
    conv2_list = []
    for channel in range(len(data['conv2']['weights'])):
        for channel_input in range(len(data['conv2']['weights'][0])):
            conv2_list.extend(flatten_kernel(data['conv2']['weights'][channel][channel_input]))
        conv2_list.extend(bias_bytes('conv2', channel))

    def pack_fc(layer):
        # Weight row followed by bias bytes, per output neuron. The rows are
        # copied into `packed`, so the parsed JSON is left untouched.
        packed = []
        for channel in range(len(data[layer]['weights'])):
            packed.extend(data[layer]['weights'][channel])
            packed.extend(bias_bytes(layer, channel))
        return packed

    fc1_list = pack_fc('fc1')
    fc2_list = pack_fc('fc2')

    # effective scale / output scale
    scales = []
    for scale in data['Final_Scales']:
        scales.extend(split_32bit_to_signed_8bit(data['Final_Scales'][scale]))

    # Input scale and input zero point
    input_scale = data['quant']['scale']
    input_zero_point = data['quant']['zero_point']
    output_zero_point = data['fc2']['layer_zero_point']

    scales.append(input_zero_point)
    scales.append(output_zero_point)

    layers = [
        np.array(conv1_list, dtype=np.int8),
        np.array(conv2_list, dtype=np.int8),
        np.array(fc1_list, dtype=np.int8),
        np.array(fc2_list, dtype=np.int8),
        # The zero points are appended unsplit and may exceed 127 (the input is
        # quantized to 0..255). Building an int8 array directly from such
        # Python ints raises OverflowError on recent NumPy, so go through int64
        # and let astype() wrap to the same byte pattern older NumPy produced.
        np.array(scales, dtype=np.int64).astype(np.int8),
    ]

    return layers, input_scale, input_zero_point, data['label_mapping']

def split_32bit_to_signed_8bit(number):
    """Split the low 32 bits of an integer into four signed bytes.

    Only the least-significant 32 bits of ``number`` are used, so negative
    values are taken in their two's-complement form and larger values are
    truncated.

    Args:
        number: Integer to split.

    Returns:
        A tuple ``(byte1, byte2, byte3, byte4)``, most-significant byte first,
        each reinterpreted as a signed 8-bit value in the range -128..127.
    """
    low_word = int(number & 0xFFFFFFFF)

    signed_bytes = []
    for shift in (24, 16, 8, 0):
        octet = (low_word >> shift) & 0xFF
        # Values with the top bit set represent negative numbers.
        if octet > 127:
            octet -= 256
        signed_bytes.append(octet)

    return tuple(signed_bytes)

def init(lists):
    """Stream the packed model parameters to the overlay, one DMA transfer per layer.

    Writes 1 to offset 0 of the MMIO window at 0x41200000 before the transfers
    and 0 afterwards (described by the original author as the custom IP's
    control register that starts hardware initialization), prints the total
    send time and finally sets the global ``init_latch`` to True.

    Args:
        lists: Sequence of 1-D parameter arrays, sent in the given order. Each
            one is copied into a freshly allocated int8 DMA buffer.
    """
    global init_latch

    control = MMIO(0x41200000, 0x1000)
    control.write(0x00, 0x1)  # raise the start signal

    started_at = time.time()
    for layer_values in lists:
        staging = allocate(shape=(len(layer_values),), dtype=np.int8)
        staging[:] = layer_values
        staging.flush()  # make the data visible in physical memory before the DMA reads it

        send = dma.sendchannel
        send.transfer(staging)
        send.wait()  # block until this layer has been sent

    send_duration = time.time() - started_at
    print(f"Send duration: {send_duration:.6f} seconds")

    control.write(0x00, 0x0)  # clear the start signal
    init_latch = True

In [None]:
# Load the model description and stream its parameters to the overlay, once
# per kernel session (init() sets init_latch when it finishes).
# The packed layers are bound to `layer_params` rather than `list`: rebinding
# the builtin name would break every later `list(...)` call in this notebook,
# including those inside read_json() if this cell is run again.
if not init_latch:
    model_info_path = "model_info.json"
    layer_params, input_scale, input_zero_point, label_mapping = read_json(model_info_path)
    init(layer_params)

In [5]:
def transform(image, scale=input_scale, zero_point=input_zero_point):
    """Convert a PIL image into the quantized uint8 vector sent to the overlay.

    Steps: grayscale conversion, bilinear resize to 28x28, scaling to [0, 1],
    normalization with mean 0.5 / std 0.5 (the original author notes this
    mirrors ToTensor + Normalize((0.5,), (0.5,))), flattening, and affine
    quantization ``round(x / scale + zero_point)`` clipped to 0..255.

    Args:
        image: PIL image to preprocess.
        scale: Quantization scale; defaults to the value loaded from the model
            file when this function was defined.
        zero_point: Quantization zero point; same default behaviour as ``scale``.

    Returns:
        1-D ``np.uint8`` array of 784 quantized pixel values.
    """
    gray_28x28 = image.convert("L").resize((28, 28), resample=Image.BILINEAR)

    # Pixel values as float32 in [0, 1], then shifted/scaled to [-1, 1].
    unit_range = np.array(gray_28x28, dtype=np.float32) / 255.0
    normalized_flat = ((unit_range - 0.5) / 0.5).flatten()

    quant_levels = np.round(normalized_flat / scale + zero_point)
    return np.clip(quant_levels, 0, 255).astype(np.uint8)

def predict_testing(image, label):
    """Run one inference on the overlay and compare the result with ``label``.

    The quantized image is copied into the shared ``input_buffer``, sent over
    the DMA send channel, and the single result byte is received into
    ``output_buffer`` and translated through ``label_mapping``.

    Args:
        image: Quantized pixel values to copy into ``input_buffer`` (the
            output of ``transform``).
        label: Expected label, compared with the mapped prediction.

    Returns:
        ``(is_correct, run_time)`` where ``is_correct`` is True when the
        prediction equals ``label`` and ``run_time`` is the elapsed time in
        seconds from launching the DMA transfers until both have completed.

    Raises:
        KeyError: If the value returned by the overlay is not a key of
            ``label_mapping``.
    """
    input_buffer[:] = image
    input_buffer.flush()

    # Start the clock before the transfers are launched: the hardware begins
    # working as soon as transfer() is called, so starting afterwards would
    # under-report the latency. perf_counter() is used because it is monotonic
    # and meant for measuring short intervals.
    start_time = time.perf_counter()
    dma.sendchannel.transfer(input_buffer)
    dma.recvchannel.transfer(output_buffer)
    dma.sendchannel.wait()
    dma.recvchannel.wait()
    run_time = time.perf_counter() - start_time

    # Invalidate the output buffer to get the latest data
    output_buffer.invalidate()

    final_predict = label_mapping[str(output_buffer[0])]
    return bool(final_predict == label), run_time

def predict(image):
    """Run one inference on the overlay and return the predicted label.

    The quantized image is copied into the shared ``input_buffer``, sent over
    the DMA send channel, and the single result byte is received into
    ``output_buffer`` and translated through ``label_mapping``.

    Args:
        image: Quantized pixel values to copy into ``input_buffer`` (the
            output of ``transform``).

    Returns:
        The ``label_mapping`` entry for the value returned by the overlay.

    Raises:
        KeyError: If the value returned by the overlay is not a key of
            ``label_mapping``.
    """
    input_buffer[:] = image
    input_buffer.flush()

    # Launch both directions before waiting so the receive channel is ready
    # when the result is produced.
    dma.sendchannel.transfer(input_buffer)
    dma.recvchannel.transfer(output_buffer)
    dma.sendchannel.wait()
    dma.recvchannel.wait()

    # Invalidate the output buffer to get the latest data
    output_buffer.invalidate()

    # label_mapping is keyed by the string form of the returned value.
    return label_mapping[str(output_buffer[0])]

In [6]:
def load_png_images(folder_path):
    """Load every ``*.png`` file directly inside ``folder_path`` and delete it.

    Files are processed in sorted filename order; the extension check is
    case-insensitive and sub-folders are not searched.

    Args:
        folder_path: Directory to scan.

    Returns:
        List of fully loaded PIL images, in sorted filename order. The source
        files have been removed from disk when the function returns.
    """
    images = []

    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.lower().endswith(".png"):
            continue

        img_path = os.path.join(folder_path, file_name)
        img = Image.open(img_path)
        # Image.open() only reads the header and keeps the file open; force
        # the pixel data to be read now, before the file is deleted, so the
        # returned image does not depend on a file that no longer exists.
        img.load()
        images.append(img)
        os.remove(img_path)

    return images

In [7]:
# Scratch lines kept from manual testing; they are commented out and do not
# run. NOTE(review): they reference an `images` list that is not defined in
# the visible notebook.
# import matplotlib.pyplot as plt

# Relative folder name. NOTE(review): this global does not appear to be read
# by the visible code — predict_sequence() takes its own `folder_path`
# argument and upload() passes UPLOAD_DIR — confirm before relying on it.
folder_path = "received_images"

# plt.imshow(images[0], cmap='gray')  # Use the 'gray' colormap
# plt.axis('off')
# plt.show()

# img_quant = transform(images[0])
# print(predict(img_quant))


def predict_sequence(folder_path):
    """Predict every PNG in ``folder_path`` and join the results into one string.

    The images are obtained from ``load_png_images`` (which also deletes the
    files), each one is preprocessed with ``transform`` and classified with
    ``predict``.

    Args:
        folder_path: Directory containing the PNG files to classify.

    Returns:
        The predictions converted to ``str`` and separated by single spaces,
        in the order the images were loaded; an empty string when no image
        was found.
    """
    loaded_images = load_png_images(folder_path)
    return " ".join(str(predict(transform(picture))) for picture in loaded_images)

In [None]:
# Staging directory for uploaded images. upload() deletes and recreates this
# directory on every request, so it must not contain anything else.
# NOTE(review): absolute, board-specific path — adjust if the notebook moves.
UPLOAD_DIR = "/home/xilinx/jupyter_notebooks/project/FinalProject/received_images"
# Endpoint that upload() posts the predicted expression to as JSON {"text": ...}.
# NOTE(review): "<IP>" is a placeholder; until it is replaced with a real host
# the forward in upload() presumably fails and the client receives
# {"error": ...} instead of the remote reply.
TARGET_URL = "http://<IP>:5000/receive"
# Make sure the staging directory exists before the first request arrives.
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Flask application that exposes the /upload route defined below.
app = Flask(__name__)

def _save_one(file_storage, idx):
    filename = file_storage.filename or f"unnamed_{idx}.png"
    save_path = os.path.join(UPLOAD_DIR, filename)
    file_storage.save(save_path)
    print(f"[SERVER] âœ… received '{filename}'")
    return save_path

@app.route("/upload", methods=["POST"])
def upload():
    """Handle ``POST /upload``: classify the uploaded images and forward the result.

    Expects one or more multipart parts named ``file``. The upload directory
    is emptied, the files are saved into it, ``predict_sequence`` turns them
    into a space-separated prediction string, and that string is posted as
    ``{"text": ...}`` to ``TARGET_URL``.

    Returns:
        ``(reply, 200)`` where ``reply`` is the JSON body returned by the
        remote endpoint, or ``{"error": <message>}`` when forwarding failed.
        Aborts with HTTP 400 when the request has no ``file`` part.
    """
    uploads = request.files.getlist("file")
    if not uploads:
        abort(400, "No file part called 'file'")

    # Start every request from an empty staging directory.
    shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
    os.makedirs(UPLOAD_DIR, exist_ok=True)

    position = 0
    for storage in uploads:
        position += 1  # fallback names are numbered from 1
        _save_one(storage, position)

    expr_str = predict_sequence(UPLOAD_DIR)
    print("[SERVER] predicted:", expr_str)

    # Forwarding is best-effort: any failure is reported back to the client
    # in the response body instead of failing the request.
    try:
        response = requests.post(TARGET_URL, json={"text": expr_str}, timeout=5)
        response.raise_for_status()
        remote_reply = response.json()
    except Exception as exc:
        remote_reply = {"error": str(exc)}

    return remote_reply, 200


# Start the Flask server on all interfaces, port 5000. app.run() blocks, so
# when this is executed as a notebook cell the cell keeps running until the
# server is stopped.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

In [None]:
# # import matplotlib.pyplot as plt

# def load_images_and_labels(root_directory):
#     """
#     Loads all images from each subfolder inside `root_directory`.
#     Returns a list of tuples: (PIL.Image object, label).
#     """
#     data = []
#     # Loop over every subfolder (each subfolder is a label name)
#     for label_name in os.listdir(root_directory):
#         label_folder_path = os.path.join(root_directory, label_name)

#         # Only proceed if it's actually a directory
#         if os.path.isdir(label_folder_path):
#             # For each file in the label folder
#             for file_name in os.listdir(label_folder_path):
#                 # Check if it's an image by extension
#                 if file_name.lower().endswith(('.jpg', '.jpeg', '.png')):
#                     img_path = os.path.join(label_folder_path, file_name)
                    
#                     # Load the image (using Pillow)
#                     image = Image.open(img_path)
                    
#                     # Store it along with its label
#                     data.append((image, label_name))
                    
#     return data

# root_directory = "dataset/image_testing"
# images_and_labels = []
# if images_and_labels == []:
#     images_and_labels = load_images_and_labels(root_directory)
# # # img, label = images_and_labels[736]

# # # plt.imshow(img, cmap='gray')  # Use the 'gray' colormap
# # # plt.title(f"Label: {label}")
# # # plt.axis('off')
# # # plt.show()

# N = len(images_and_labels)
# correct_count  = 0
# total_time = 0.0

# for idx, (img, label) in enumerate(images_and_labels):
#     img_quant = transform(img)
# #     print(f'label: {label}')
#     last_predict, run_time = predict_testing(img_quant, label)
    
#     if last_predict == True:
#         correct_count  += 1
        
#     total_time += run_time
    
# avg_time_per_inference_s = total_time / N
# print(f"Average time per inference: {avg_time_per_inference_s * 1000:.3f} ms")
# print(f"Accuracy: {correct_count/N * 100:.2f}%")

# # # img_quant = transform(img)
# # # last_predict, _ = predict_testing(img_quant, label)
# # # print(f"Predict: {last_predict}")

In [None]:
# def predict_debugging(image):
    
#     input_buffer[:] = image
    
#     input_buffer.flush()
    
#     # Start DMA transfer and time the send operation
#     dma.sendchannel.transfer(input_buffer)  # Send data
#     # Time the receive operation
#     dma.recvchannel.transfer(output_buffer)  # Receive data
    
#     # Wait for DMA transfers to complete and record end times
#     dma.sendchannel.wait()  # Wait for send to complete
#     dma.recvchannel.wait()  # Wait for receive to complete
    
#     # Invalidate the output buffer to get the latest data
#     output_buffer.invalidate()

#     return output_buffer

# # image_path = "Untitled.png"
# image_path = "test.png"
# img = Image.open(image_path)
# img_quant = transform(img)
# label = "X"

# relu_output_fpga = predict_debugging(img_quant)

# relu_output_pytorch = np.array([[0, 34, 65, 46, 18, 18, 30, 49, 31, 0],
# [0, 24, 44, 44, 38, 29, 25, 22, 17, 0],
# [0, 0, 0, 1, 31, 16, 2, 3, 15, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
# relu_output_pytorch = relu_output_pytorch.flatten()

# # Calculate the difference
# difference = relu_output_pytorch - relu_output_fpga
# # # Check if matrices are equal
# if np.array_equal(relu_output_pytorch, relu_output_fpga):
#     print("The input and output matrices are identical.")
# else:
#     print("The input and output matrices are not identical.")
#     for i in range(len(relu_output_fpga)):  # Iterate over all elements
#         real = relu_output_pytorch[i]
#         fpga = relu_output_fpga[i]
#         diff = abs(difference[i])  # Absolute difference
#         if real != 0:
#             percent_error = (diff / abs(real)) * 100
#         else:
#             percent_error = 0 if diff == 0 else float('inf')  # Define behavior for zero real value
#         print(f'fpga output: {fpga}, real output: {real}, diff: {percent_error:.2f}% : index {i}')