In [None]:
from pynq import Overlay
from pynq.lib.video import *
# from pynq import Clocks
# Clocks.fclk3_mhz
# Clocks.fclk3_mhz = 30.0

# 1. Load the bitstream
overlay = Overlay("/home/xilinx/5eng_2000.bit")

# Access your pixel_generator instance (assuming it's named 'pixel_generator' in your block design)
# If your IP is named something like 'my_pixel_gen_0', you'd use that.
# Check your block design's address map in Vivado to confirm the instance name.
# For demonstration, let's assume it's overlay.pixel_generator_0
# You might need to adjust this based on your Vivado block design.
# A common pattern is overlay.<instance_name>
# Given your comment pixgen.register_map.gp0 = 0x1234, it seems you already have a pixgen object.
# Let's assume it's like this:
pixgen = overlay.pixel_generator_0 # Adjust this line to match your actual IP instance name


In [None]:
# 2. Define your parameters (example values - adjust as needed)
# Ensure these match the Q-format / data types expected by your HDL
# For example, ZOOM might be an integer, but REAL_CENTER/IMAG_CENTER are fixed-point.
# You mentioned FRAC=28, so these are Q28 numbers.
# Example values for Mandelbrot:
def float_to_q4_28(fval):
    qval = int(round(fval * (1 << 28)))
    if qval < 0:
        qval = (1 << 32) + qval
    return qval

max_iter_log = 8
zoom = 0
re_c_q = float_to_q4_28(-0.75)
im_c_q = float_to_q4_28(0.1)

print(re_c_q)
print(im_c_q)
print(max_iter_log)


4093640704
26843546
8


In [None]:
# 3. Write parameters to the registers
# Assuming regfile[0] is MAX_ITER, regfile[2] is ZOOM, regfile[3] is REAL_CENTER, regfile[4] is IMAG_CENTER
max_iter = 2 ** max_iter_log
pixgen.register_map.gp0 = max_iter
pixgen.register_map.gp1 = max_iter_log
pixgen.register_map.gp2 = zoom
pixgen.register_map.gp3 = re_c_q   
pixgen.register_map.gp4 = im_c_q
              

print(f"Registers written: MAX_ITER={pixgen.register_map.gp0}, ZOOM={pixgen.register_map.gp2}, REAL_CENTER={pixgen.register_map.gp3}, IMAG_CENTER={pixgen.register_map.gp4}")

Registers written: MAX_ITER=0x100, ZOOM=0x0, REAL_CENTER=0xf4000000, IMAG_CENTER=0x199999a


In [None]:
imgen_vdma = overlay.video.axi_vdma_0.readchannel

In [None]:
# 4. Initialize and start VDMA read channel
videoMode = common.VideoMode(2000, 2000, 24) # Assuming 24-bit output (RGB888)
imgen_vdma.mode = videoMode
imgen_vdma.start()

In [None]:
frame = imgen_vdma.readframe()



KeyboardInterrupt: 

In [None]:
import time
num_iterations = 100  # Number of frames to capture
iteration_times = []  # To store the time for each iteration
start_time = time.perf_counter()
print(f"Starting benchmark for {num_iterations} frames...")

# for i in range(num_iterations):
#     print(f"\n--- Iteration {i+1}/{num_iterations} ---")

    # --- Start of benchmarking for this iteration ---
#     

frame = imgen_vdma.readframe() # Your actual function call here

    
    # --- End of benchmarking for this iteration ---

    
#     iteration_times.append(elapsed_time)

#     print(f"Frame {i+1} read")
    # You can optionally print information about the frame if needed
    # print(f"Frame data (first 20 chars): {str(frame)[:20]}...")
#     time.sleep(0.8)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
print("\n--- Benchmark Complete ---")

# Calculate the average time
if num_iterations: # Ensure there were iterations to avoid division by zero
    average_time = elapsed_time / num_iterations
    print(f"\nAverage time per frame ({num_iterations} iterations): {average_time:.6f} seconds")
else:
    print("No iterations were performed.")

Starting benchmark for 100 frames...

--- Benchmark Complete ---

Average time per frame (100 iterations): 0.000408 seconds


In [None]:
if num_iterations: # Ensure there were iterations to avoid division by zero
    average_time = elapsed_time / num_iterations
    print(f"\nAverage time per frame ({num_iterations} iterations): {average_time:.6f} seconds")
else:
    print("No iterations were performed.")


Average time per frame (100 iterations): 1.006347 seconds


In [None]:
# Flush frames
import time
for i in range(3):
     imgen_vdma.readframe()
     time.sleep(1)
runs = 10
run_time = 0
 # Run benchmark
for i in range(runs):
    t_start = time.perf_counter_ns()
    frame = imgen_vdma.readframe()
    t_end = time.perf_counter_ns()
    run_time += t_end - t_start
avg_time = run_time / runs
avg_time # In nanoseconds

KeyboardInterrupt: 

In [None]:
# 7. Process and display the image
import PIL.Image

image = PIL.Image.fromarray(frame)
image # Display the image in Jupyter

In [None]:
imgen_vdma.stop()

In [None]:
from pynq import PL
PL.reset()

In [None]:
# FPGA side - corrected code
import PIL.Image
import numpy as np
import asyncio
import websockets 

frame = imgen_vdma.readframe()

async def send_frame(frame):
    try:
        async with websockets.connect("ws://192.168.137.1:8002") as websocket:
            frame_bytes = frame.tobytes()
            await websocket.send(frame_bytes)
            print("sent frame?")
    except Exception as e:
        print(e)

await send_frame(frame) 

In [None]:
# Cell for receiving UI parameters
import asyncio
import websockets
import json
import numpy as np
import PIL
import time

pixgen = overlay.pixel_generator_0

async def handle_ui_parameters(websocket, path):
    try:
        async for message in websocket:
            try:
                params = json.loads(message)
                print(params)
                
                # Update FPGA registers with new parameters
                def float_to_q4_28(fval):
                    qval = int(round(fval * (1 << 28)))
                    if qval < 0:
                        qval = (1 << 32) + qval
                    return qval

                # max_iter_log = 8
                # zoom = 0
                re_c_q = float_to_q4_28(params.get('re_c', -0.5))
                im_c_q = float_to_q4_28(params.get('ie_c', -0.0))

                max_iter_log = int(params.get('max_iter', 1))  # Need power of 2 for this
                max_iter = 2 ** max_iter_log
                pixgen.register_map.gp0 = max_iter
                pixgen.register_map.gp1 = max_iter_log
                pixgen.register_map.gp2 = int(params.get('zoom', 1))            
                pixgen.register_map.gp3 = re_c_q   
                pixgen.register_map.gp4 = im_c_q
              
                print(pixgen.register_map)
                
                time.sleep(0.8)
                frame = imgen_vdma.readframe()
                
                send_frame(frame)
                
            except Exception as e:
                print(f"error: {e}")
                
    except websockets.exceptions.ConnectionClosed:
        print("disconnected ui")

# Start parameter server
async def start_param_server():
    async with websockets.serve(handle_ui_parameters, "0.0.0.0", 8080):
        await asyncio.Future()

# Run this once to start parameter server
task = asyncio.create_task(start_param_server())
print("✅ Server task created and running in background")
print(f"Task: {task}")