# In [None]:
# cpu_fpga_pipeline.py
import numpy as np
from pynq import Overlay, allocate
from time import perf_counter, sleep
from PIL import Image
import tensorflow as tf

# Dimensions
# Input image fed to Conv1: height, width, channels.
IMG_H = 28
IMG_W = 28
IMG_C = 1
# Conv1 output feature map read back from the accelerator: height, width, channels.
OUT_H = 13
OUT_W = 13
OUT_C = 32
# Number of values in one flattened Conv1 feature map.
FEAT_SIZE = OUT_H * OUT_W * OUT_C

# 1) Load overlay and IP
# Load the bitstream onto the device and list the IP blocks it exposes.
ol = Overlay(
    'asl_system.bit',
    download=True,
)
print("Loaded overlay with IPs:", ol.ip_dict.keys())

# Handle to the IP instance that conv1_fpga() drives through its register map.
asl_conv1 = ol.asl_conv1_0

# 2) Load TFLite tail model (Conv2 + FC layers)
# Tensors must be allocated before the input/output details are queried
# and before any set_tensor()/invoke() call.
interpreter = tf.lite.Interpreter(model_path='asl_tail_conv2_fc.tflite')
interpreter.allocate_tensors()

# Input side: the tensor that receives the Conv1 feature map.
in_details = interpreter.get_input_details()
print("TFLite tail input shape:", in_details[0]['shape'])

# Output side: the tensor the per-class scores are read from.
out_details = interpreter.get_output_details()
print("TFLite tail output shape:", out_details[0]['shape'])

# 3) Load test set images and labels (28x28 grayscale, normalized)
# NOTE(review): dividing by 255 assumes the .npy file stores raw 0-255 pixel
# values; if the file is already scaled to [0, 1] this would scale twice — confirm.
x_test = np.load('asl_test_images.npy').astype(np.float32) / 255.0
# Normalise the layout to (N, 28, 28, 1) so files saved as (N, 28, 28) or
# (N, 784) work too; a file whose size does not fit raises ValueError here
# instead of failing later inside the FPGA helper.
x_test = x_test.reshape(-1, IMG_H, IMG_W, IMG_C)
# Flatten labels so both (N,) and (N, 1) files index to a plain scalar.
y_test = np.load('asl_test_labels.npy').astype(np.int32).reshape(-1)
num_test = x_test.shape[0]
# Fail early and clearly if the two files do not describe the same samples.
if y_test.shape[0] != num_test:
    raise ValueError(
        f"Image/label count mismatch: {num_test} images vs "
        f"{y_test.shape[0]} labels"
    )

# 4) Helper: run Conv1 on FPGA for a single image (28x28x1)
def conv1_fpga(img_28x28, timeout_s=5.0):
    """Run the Conv1 layer on the FPGA for a single image.

    Args:
        img_28x28: Array-like image of shape (28, 28, 1). A (28, 28) image
            is also accepted and gets a trailing channel axis. Values are
            converted to float32; callers pass data scaled to [0, 1].
        timeout_s: Maximum number of seconds to wait for the accelerator to
            raise AP_DONE. Pass ``None`` to wait indefinitely.

    Returns:
        A plain numpy array of shape (13, 13, 32), float32, copied out of
        the output buffer so it stays valid after the buffer is released.

    Raises:
        TimeoutError: If AP_DONE is not seen within ``timeout_s`` seconds.
    """
    img = np.asarray(img_28x28, dtype=np.float32)
    if img.shape == (IMG_H, IMG_W):
        # Tolerate a missing channel axis.
        img = img[..., np.newaxis]

    inp = allocate(shape=(IMG_H, IMG_W, IMG_C), dtype=np.float32)
    try:
        outp = allocate(shape=(OUT_H, OUT_W, OUT_C), dtype=np.float32)
        try:
            inp[:] = img
            # Push the CPU-side writes out so the IP reads the new image.
            inp.flush()

            reg = asl_conv1.register_map
            reg.in_r_1 = inp.device_address
            reg.out_r_1 = outp.device_address
            reg.CTRL.AP_START = 1

            # Busy-poll to keep latency low, but bound the wait so a stuck
            # IP cannot hang the interpreter forever.
            deadline = None if timeout_s is None else perf_counter() + timeout_s
            while reg.CTRL.AP_DONE == 0:
                if deadline is not None and perf_counter() > deadline:
                    # NOTE: the buffers are released below even though the IP
                    # may still be running; reload the overlay before reuse.
                    raise TimeoutError(
                        f"Conv1 accelerator did not signal AP_DONE within "
                        f"{timeout_s} s"
                    )

            # Drop stale cache lines before reading what the IP wrote.
            outp.invalidate()
            # np.array() makes an independent copy, shape (13, 13, 32).
            return np.array(outp)
        finally:
            outp.freebuffer()
    finally:
        # Release explicitly rather than relying on garbage collection, so a
        # failed or interrupted call does not keep the buffers alive.
        inp.freebuffer()

# 5) Loop over test set: accuracy and latency
if num_test == 0:
    # Accuracy and latency statistics are undefined without samples.
    raise ValueError("Test set is empty; nothing to evaluate")

# Loop-invariant lookups, hoisted out of the timed region.
in_index = in_details[0]['index']
out_index = out_details[0]['index']
# Feed the feature map in the shape the tail model declares, so the same
# loop works whether the model takes a flat vector or a 4-D feature map.
tail_in_shape = tuple(int(d) for d in in_details[0]['shape'])

correct = 0
latencies = []

for i in range(num_test):
    img = x_test[i]                         # (28,28,1)
    label = int(y_test[i])

    # Timed region: the FPGA Conv1 call plus CPU tail inference and argmax.
    t0 = perf_counter()
    # Conv1 on FPGA
    feat_hw = conv1_fpga(img)
    # Shape the feature map for the tail model (no copy if already float32)
    feat_vec = feat_hw.reshape(tail_in_shape).astype(np.float32, copy=False)

    # Tail inference on CPU using TFLite
    interpreter.set_tensor(in_index, feat_vec)
    interpreter.invoke()
    logits = interpreter.get_tensor(out_index)[0]  # scores for this sample
    pred = int(np.argmax(logits))
    t1 = perf_counter()

    latencies.append((t1 - t0) * 1000.0)  # ms
    if pred == label:
        correct += 1

    if (i+1) % 100 == 0:
        print(f"Processed {i+1}/{num_test} samples")

latencies = np.array(latencies, dtype=np.float32)
acc = (correct / num_test) * 100.0
mean_ms = float(latencies.mean())
# Sample standard deviation needs at least two measurements; with a single
# sample ddof=1 would yield NaN and a RuntimeWarning.
std_ms  = float(latencies.std(ddof=1)) if latencies.size > 1 else 0.0
fps     = 1000.0 / mean_ms

print(f"CPU+FPGA accuracy: {acc:.2f}%")
print(f"CPU+FPGA latency: mean={mean_ms:.3f} ms, std={std_ms:.3f} ms, FPS={fps:.2f}")

np.save('fpga_latency_ms.npy', latencies)
