In [1]:
from pynq import Overlay, allocate
import numpy as np
import cv2
import time

In [2]:

# -------------------------------------------------------------------
# Configuration (must match HLS constants)
# -------------------------------------------------------------------
IMG_H = 28
IMG_W = 28
IN_CH = 4           # MUST be multiple of TILE_IN_CH=4 (pad grayscale→4ch)
OUT_CH = 16         # total desired output channels
K = 3

TILE_IN_CH  = 4     # hardcoded in HLS
TILE_OUT_CH = 4     # hardcoded in HLS (MUST MATCH yolo_conv_core.cpp line 34!)

OUT_H = IMG_H - 2   # 26  (valid conv, no padding)
OUT_W = IMG_W - 2   # 26

# -------------------------------------------------------------------
# Load overlay
# -------------------------------------------------------------------
print("Loading overlay...")
t0 = time.time()
ol = Overlay("design_1_wrapper.bit")
ip = ol.yolo_conv_core_0
dma0 = ol.dma_img  # send → fm_in,  recv → fm_out
dma1 = ol.dma_wgt # send → wgt_in (no recv channel)
print(f"Overlay loaded in {time.time()-t0:.3f}s")

# -------------------------------------------------------------------

Loading overlay...


Overlay loaded in 2.730s


In [3]:

# Load weights  [shape: (OUT_CH, IN_CH, K, K)]
# -------------------------------------------------------------------
print("Loading weights...")
t0 = time.time()
W = np.load("weights_int8.npy").astype(np.int8)
B = np.load("bias_int8.npy").astype(np.int32)

# Auto-pad weights if needed
if W.shape[1] < IN_CH:
    pad_ch = IN_CH - W.shape[1]
    W = np.pad(W, ((0, 0), (0, pad_ch), (0, 0), (0, 0)), mode='constant')
    print(f"Auto-padded weights: {W.shape[1]-pad_ch}ch → {W.shape[1]}ch")

print(f"Weight shape: {W.shape}, Bias shape: {B.shape} (loaded in {time.time()-t0:.3f}s)")
assert W.shape[1] == IN_CH, f"Weight IN_CH={W.shape[1]} but expected {IN_CH}"
assert W.shape[0] >= TILE_OUT_CH, f"Need at least {TILE_OUT_CH} output channels"

# -------------------------------------------------------------------
# Load and preprocess image
# -------------------------------------------------------------------
print("Loading image...")
t0 = time.time()
img = cv2.imread("digits_0.png", cv2.IMREAD_GRAYSCALE)
if img is None:
    raise FileNotFoundError("digits_0.png not found in working directory")
img = cv2.resize(img, (IMG_W, IMG_H))
print(f"Image loaded in {time.time()-t0:.3f}s")


Loading weights...
Auto-padded weights: 1ch → 4ch
Weight shape: (16, 4, 3, 3), Bias shape: (16,) (loaded in 0.181s)
Loading image...
Image loaded in 0.482s


In [4]:

# -------------------------------------------------------------------
# OPTIMIZATION 1: Pre-pack ALL weights at once (not per-tile)
# -------------------------------------------------------------------
def pack_all_weights(W_full, B_full):
    """Pack weights for all output channel tiles at once."""
    all_wgt_packed = []
    
    for oc_base in range(0, OUT_CH, TILE_OUT_CH):
        words = []
        
        # Pack weight kernel values
        for ic in range(IN_CH):
            for ky in range(K):
                for kx in range(K):
                    val = np.uint32(0)
                    for oc_off in range(TILE_OUT_CH):
                        oc = oc_base + oc_off
                        w8 = np.uint8(W_full[oc, ic, ky, kx].view(np.uint8))
                        val |= np.uint32(w8) << (8 * oc_off)
                    words.append(val)
        
        # Pack biases
        for oc_off in range(TILE_OUT_CH):
            oc = oc_base + oc_off
            words.append(np.uint32(B_full[oc].view(np.uint32) & 0xFFFFFFFF))
        
        all_wgt_packed.append(np.array(words, dtype=np.uint32))
    
    return all_wgt_packed


# -------------------------------------------------------------------
# OPTIMIZATION 2: Pack feature map once and reuse buffer
# -------------------------------------------------------------------
def pack_feature_map(img_gray, in_ch):
    """Pack feature map efficiently."""
    assert in_ch % TILE_IN_CH == 0
    n_ic_tiles = in_ch // TILE_IN_CH
    
    # Pre-allocate output array for speed
    total_words = IMG_H * IMG_W * n_ic_tiles
    words = np.zeros(total_words, dtype=np.uint32)
    
    idx = 0
    for y in range(IMG_H):
        for x in range(IMG_W):
            for ic_tile in range(n_ic_tiles):
                val = np.uint32(0)
                for ii in range(TILE_IN_CH):
                    ch_idx = ic_tile * TILE_IN_CH + ii
                    if ch_idx == 0:
                        pix = np.uint32(img_gray[y, x])
                    else:
                        pix = np.uint32(0)  # zero-pad channels 1-3
                    val |= pix << (8 * ii)
                words[idx] = val
                idx += 1
    
    return words


# -------------------------------------------------------------------
# OPTIMIZATION 3: Allocate DMA buffers ONCE, reuse for all tiles
# -------------------------------------------------------------------
print("Pre-packing data...")
t0 = time.time()

# Pack feature map once
fm_packed = pack_feature_map(img, IN_CH)
print(f"Feature map stream: {fm_packed.shape[0]} words ({fm_packed.nbytes} bytes)")

# Pre-pack all weight tiles
all_wgt_packed = pack_all_weights(W, B)
print(f"Weight tiles packed: {len(all_wgt_packed)} tiles")
print(f"Data packing completed in {time.time()-t0:.3f}s")

# Allocate DMA buffers ONCE (reuse for all tiles)
print("Allocating DMA buffers...")
t0 = time.time()
max_wgt_size = max(w.shape[0] for w in all_wgt_packed)
wgt_buf = allocate(shape=(max_wgt_size,), dtype=np.uint32)
fm_buf  = allocate(shape=fm_packed.shape, dtype=np.uint32)
out_buf = allocate(shape=(OUT_H * OUT_W,), dtype=np.uint32)

# Copy feature map once (it's constant across tiles)
fm_buf[:] = fm_packed
print(f"DMA buffers allocated in {time.time()-t0:.3f}s")


#---------------GPT#
def run_tile_optimized(oc_base, wgt_packed):
    """Run HLS accelerator with proper DMA state management."""
    
    # Copy weights into reusable buffer
    wgt_size = wgt_packed.shape[0]
    wgt_buf[:wgt_size] = wgt_packed
    out_buf[:] = 0
    
    # Configure IP registers
    ip.write(0x10, IMG_W)
    ip.write(0x18, IMG_H)
    ip.write(0x20, IN_CH)
    ip.write(0x28, 0)  # pool disabled
    ip.write(0x30, 1)  # leaky enabled
    
    # --- DIAGNOSTIC BLOCK ---
    print("\n" + "="*50)
    print(f"TILE DEBUG: oc_base={oc_base}")
    print("="*50)
    
    print("1. THE BUCKET (What dma0.recvchannel expects to receive):")
    print(f"   out_buf capacity : {out_buf.size} words ({out_buf.nbytes} bytes)")
    
    print("\n2. THE HOSE (What dma1.sendchannel pushes for weights):")
    print(f"   wgt_buf sending  : {wgt_size} words ({wgt_size * 4} bytes)")
    
    print("\n3. THE HOSE (What dma0.sendchannel pushes for feature maps):")
    print(f"   fm_buf sending   : {fm_buf.size} words ({fm_buf.nbytes} bytes)")
    print("="*50 + "\n")
    
    # --- COMMENTED OUT TO PREVENT HANGING ---
    # dma0.recvchannel.transfer(out_buf)
    # ip.write(0x00, 1)  # AP_START
    # dma1.sendchannel.transfer(wgt_buf[:wgt_size])
    # dma0.sendchannel.transfer(fm_buf)
    
    # dma1.sendchannel.wait()
    # dma0.sendchannel.wait()
    # dma0.recvchannel.wait()
    
    # Return dummy data just so the loop doesn't crash during the dry run
    return np.zeros((TILE_OUT_CH, OUT_H, OUT_W), dtype=np.uint8)

Pre-packing data...
Feature map stream: 784 words (3136 bytes)
Weight tiles packed: 4 tiles
Data packing completed in 0.389s
Allocating DMA buffers...
DMA buffers allocated in 0.008s


In [5]:

# -------------------------------------------------------------------
# OPTIMIZATION 4: Streamlined run_tile function
# -------------------------------------------------------------------
def run_tile_optimized(oc_base, wgt_packed):
    """Run HLS accelerator using pre-allocated buffers."""
    
    # Copy weights into reusable buffer
    wgt_size = wgt_packed.shape[0]
    wgt_buf[:wgt_size] = wgt_packed
    out_buf[:] = 0
    
    # Configure IP registers
    ip.write(0x10, IMG_W)       # img_w
    ip.write(0x18, IMG_H)       # img_h
    ip.write(0x20, IN_CH)       # in_ch
    ip.write(0x28, 0)           # pool (disabled)
    ip.write(0x30, 1)           # leaky (enabled)
    
    # DMA Transfer Sequence
    dma0.recvchannel.transfer(out_buf)
    ip.write(0x00, 1)  # AP_START
    
    # Use slicing to transfer only needed weight data
    dma1.sendchannel.transfer(wgt_buf[:wgt_size])
    dma0.sendchannel.transfer(fm_buf)
    
    # Wait for completion
    dma1.sendchannel.wait_async()
    dma0.sendchannel.wait_async()
    dma0.recvchannel.wait_async()
    
    # Unpack output efficiently: each uint32 → 4 × uint8 channels
    raw = np.array(out_buf, copy=False)  # No copy, just view
    ch0 = (raw & 0xFF).astype(np.uint8)
    ch1 = ((raw >> 8) & 0xFF).astype(np.uint8)
    ch2 = ((raw >> 16) & 0xFF).astype(np.uint8)
    ch3 = ((raw >> 24) & 0xFF).astype(np.uint8)
    
    return np.stack([ch0, ch1, ch2, ch3]).reshape(TILE_OUT_CH, OUT_H, OUT_W)


# -------------------------------------------------------------------
# Main: iterate over all output channel tiles
# -------------------------------------------------------------------
print(f"\nInput: {IMG_H}×{IMG_W}×{IN_CH}  →  Output: {OUT_H}×{OUT_W}×{OUT_CH}")
print("="*60)

# Pre-allocate output array
all_outputs = np.zeros((OUT_CH, OUT_H, OUT_W), dtype=np.uint8)

total_time = 0
for tile_idx, oc_base in enumerate(range(0, OUT_CH, TILE_OUT_CH)):
    print(f"Tile {tile_idx+1}/{OUT_CH//TILE_OUT_CH}: oc=[{oc_base}:{oc_base + TILE_OUT_CH}]...", end=" ")
    
    t0 = time.time()
    wgt_packed = all_wgt_packed[tile_idx]
    result = run_tile_optimized(oc_base, wgt_packed)
    elapsed = time.time() - t0
    total_time += elapsed
    
    # Store result directly in preallocated array
    all_outputs[oc_base:oc_base + TILE_OUT_CH] = result
    
    print(f"✓ {elapsed*1000:.1f}ms (mean={result.mean():.2f})")

print("="*60)
print(f"Total inference time: {total_time*1000:.1f}ms")
print(f"Average per tile: {total_time/len(all_wgt_packed)*1000:.1f}ms")
print(f"\nFinal output shape: {all_outputs.shape}")
print(f"Output range: [{all_outputs.min()}, {all_outputs.max()}]")
print(f"Output mean: {all_outputs.mean():.4f}")

# Cleanup
del wgt_buf, fm_buf, out_buf
print("\nDone!")


Input: 28×28×4  →  Output: 26×26×16
Tile 1/4: oc=[0:4]... ✓ 18.7ms (mean=0.00)
Tile 2/4: oc=[4:8]... 

  dma1.sendchannel.wait_async()
  dma0.sendchannel.wait_async()
  dma0.recvchannel.wait_async()


RuntimeError: DMA channel not idle

In [15]:
# After the deadlock/error, run this:
print("=== DMA0 Register Map ===")
print(dma0.register_map)

print("\n=== DMA1 Register Map ===")
print(dma1.register_map)

=== DMA0 Register Map ===
RegisterMap {
  MM2S_DMACR = Register(RS=1, Reset=0, Keyhole=0, Cyclic_BD_Enable=0, IOC_IrqEn=0, Dly_IrqEn=0, Err_IrqEn=0, IRQThreshold=1, IRQDelay=0),
  MM2S_DMASR = Register(Halted=0, Idle=0, SGIncld=0, DMAIntErr=0, DMASlvErr=0, DMADecErr=0, SGIntErr=0, SGSlvErr=0, SGDecErr=0, IOC_Irq=0, Dly_Irq=0, Err_Irq=0, IRQThresholdSts=0, IRQDelaySts=0),
  MM2S_CURDESC = Register(Current_Descriptor_Pointer=0),
  MM2S_CURDESC_MSB = Register(Current_Descriptor_Pointer=0),
  MM2S_TAILDESC = Register(Tail_Descriptor_Pointer=0),
  MM2S_TAILDESC_MSB = Register(Tail_Descriptor_Pointer=0),
  MM2S_SA = Register(Source_Address=377790464),
  MM2S_SA_MSB = Register(Source_Address=0),
  MM2S_LENGTH = Register(Length=3136),
  SG_CTL = Register(SG_CACHE=0, SG_USER=0),
  S2MM_DMACR = Register(RS=1, Reset=0, Keyhole=0, Cyclic_BD_Enable=0, IOC_IrqEn=0, Dly_IrqEn=0, Err_IrqEn=0, IRQThreshold=1, IRQDelay=0),
  S2MM_DMASR = Register(Halted=0, Idle=0, SGIncld=0, DMAIntErr=0, DMASlvErr=0, DM

In [2]:
"""
PYNQ-Z2 CNN Accelerator - FINAL WORKING VERSION
Includes proper DMA state management to prevent "channel not idle" errors
"""

from pynq import Overlay, allocate
import numpy as np
import cv2
import time

# Configuration
IMG_H = 28
IMG_W = 28
IN_CH = 4
OUT_CH = 16
K = 3
TILE_IN_CH = 4
TILE_OUT_CH = 4  
OUT_H = IMG_H - 2
OUT_W = IMG_W - 2

print("="*70)
print("PYNQ CNN Accelerator - TILE_OUT_CH=%d Configuration" % TILE_OUT_CH)
print("="*70)

# Load overlay
print("\nLoading overlay...")
t0 = time.time()
ol = Overlay("design_1_wrapper.bit")
ip = ol.yolo_conv_core_0
dma0 = ol.axi_dma_0
dma1 = ol.axi_dma_1
print(f"✓ Overlay loaded in {time.time()-t0:.3f}s")

# Load weights
print("\nLoading weights...")
t0 = time.time()
W = np.load("weights_int8.npy").astype(np.int8)
B = np.load("bias_int8.npy").astype(np.int32)

if W.shape[1] < IN_CH:
    pad_ch = IN_CH - W.shape[1]
    W = np.pad(W, ((0, 0), (0, pad_ch), (0, 0), (0, 0)), mode='constant')
    print(f"  Padded weights: {W.shape[1]-pad_ch}ch → {W.shape[1]}ch")

print(f"✓ W: {W.shape}, B: {B.shape} ({time.time()-t0:.3f}s)")

# Load image
img = cv2.imread("digits_0.png", cv2.IMREAD_GRAYSCALE)
if img is None:
    raise FileNotFoundError("digits_0.png not found")
img = cv2.resize(img, (IMG_W, IMG_H))

# Pack weights for all tiles
def pack_all_weights(W_full, B_full):
    all_wgt_packed = []
    for oc_base in range(0, OUT_CH, TILE_OUT_CH):
        words = []
        for ic in range(IN_CH):
            for ky in range(K):
                for kx in range(K):
                    val = np.uint32(0)
                    for oc_off in range(TILE_OUT_CH):
                        oc = oc_base + oc_off
                        w8 = np.uint8(W_full[oc, ic, ky, kx].view(np.uint8))
                        val |= np.uint32(w8) << (8 * oc_off)
                    words.append(val)
        
        for oc_off in range(TILE_OUT_CH):
            oc = oc_base + oc_off
            words.append(np.uint32(B_full[oc].view(np.uint32) & 0xFFFFFFFF))
        
        all_wgt_packed.append(np.array(words, dtype=np.uint32))
    return all_wgt_packed

# Pack feature map
def pack_feature_map(img_gray, in_ch):
    n_ic_tiles = in_ch // TILE_IN_CH
    total_words = IMG_H * IMG_W * n_ic_tiles
    words = np.zeros(total_words, dtype=np.uint32)
    
    idx = 0
    for y in range(IMG_H):
        for x in range(IMG_W):
            for ic_tile in range(n_ic_tiles):
                val = np.uint32(0)
                for ii in range(TILE_IN_CH):
                    ch_idx = ic_tile * TILE_IN_CH + ii
                    pix = np.uint32(img_gray[y, x]) if ch_idx == 0 else np.uint32(0)
                    val |= pix << (8 * ii)
                words[idx] = val
                idx += 1
    return words

# Pre-pack all data
print("\nPre-packing data...")
t0 = time.time()
fm_packed = pack_feature_map(img, IN_CH)
all_wgt_packed = pack_all_weights(W, B)
print(f"✓ Packing done in {time.time()-t0:.3f}s")

# Allocate DMA buffers once
max_wgt_size = max(w.shape[0] for w in all_wgt_packed)
wgt_buf = allocate(shape=(max_wgt_size,), dtype=np.uint32)
fm_buf = allocate(shape=fm_packed.shape, dtype=np.uint32)
out_buf = allocate(shape=(OUT_H * OUT_W,), dtype=np.uint32)
fm_buf[:] = fm_packed

# Run tile function with PROPER STATE MANAGEMENT
def run_tile_optimized(oc_base, wgt_packed):
    """Run HLS accelerator with proper DMA state management."""
    
    # CRITICAL: Wait for DMAs to be fully idle with timeout
    timeout = 1.0
    start_time = time.time()
    
    while not (dma0.recvchannel.idle and dma0.sendchannel.idle and dma1.sendchannel.idle):
        if time.time() - start_time > timeout:
            raise TimeoutError("DMAs did not return to idle state - power cycle board!")
        time.sleep(0.001)  # Small delay to prevent busy-wait
    
    # Copy weights into reusable buffer
    wgt_size = wgt_packed.shape[0]
    wgt_buf[:wgt_size] = wgt_packed
    out_buf[:] = 0
    
    # Configure IP registers
    ip.write(0x10, IMG_W)
    ip.write(0x18, IMG_H)
    ip.write(0x20, IN_CH)
    ip.write(0x28, 0)  # pool disabled
    ip.write(0x30, 1)  # leaky enabled
    
    # DMA Transfer Sequence
    dma0.recvchannel.transfer(out_buf)
    ip.write(0x00, 1)  # AP_START
    dma1.sendchannel.transfer(wgt_buf[:wgt_size])
    dma0.sendchannel.transfer(fm_buf)
    
    # Wait for ALL transfers to complete
    dma1.sendchannel.wait()
    dma0.sendchannel.wait()
    dma0.recvchannel.wait()
    
    # Small delay to let hardware update idle flags
    time.sleep(0.001)
    
    # Unpack output based on TILE_OUT_CH
    raw = np.array(out_buf, copy=False)
    
    if TILE_OUT_CH == 2:
        ch0 = (raw & 0xFF).astype(np.uint8)
        ch1 = ((raw >> 8) & 0xFF).astype(np.uint8)
        return np.stack([ch0, ch1]).reshape(TILE_OUT_CH, OUT_H, OUT_W)
    elif TILE_OUT_CH == 4:
        ch0 = (raw & 0xFF).astype(np.uint8)
        ch1 = ((raw >> 8) & 0xFF).astype(np.uint8)
        ch2 = ((raw >> 16) & 0xFF).astype(np.uint8)
        ch3 = ((raw >> 24) & 0xFF).astype(np.uint8)
        return np.stack([ch0, ch1, ch2, ch3]).reshape(TILE_OUT_CH, OUT_H, OUT_W)
    else:
        raise ValueError(f"TILE_OUT_CH={TILE_OUT_CH} not supported (must be 2 or 4)")

# Run inference
print(f"\n{'='*70}")
print(f"Running: {IMG_H}×{IMG_W}×{IN_CH} → {OUT_H}×{OUT_W}×{OUT_CH}")
print(f"{'='*70}")

all_outputs = np.zeros((OUT_CH, OUT_H, OUT_W), dtype=np.uint8)
total_time = 0

for tile_idx, oc_base in enumerate(range(0, OUT_CH, TILE_OUT_CH)):
    print(f"Tile {tile_idx+1}/{OUT_CH//TILE_OUT_CH}: ch[{oc_base:2d}:{oc_base+TILE_OUT_CH:2d}]...", 
          end=" ", flush=True)
    
    try:
        t0 = time.time()
        wgt_packed = all_wgt_packed[tile_idx]
        result = run_tile_optimized(oc_base, wgt_packed)
        elapsed = time.time() - t0
        total_time += elapsed
        
        all_outputs[oc_base:oc_base + TILE_OUT_CH] = result
        print(f"✓ {elapsed*1000:5.1f}ms (mean={result.mean():5.2f})")
        
    except Exception as e:
        print(f"\n✗ ERROR on tile {tile_idx+1}: {e}")
        print("   Attempting to continue...")
        # Reload overlay to reset hardware
        ol = Overlay("design_1_wrapper.bit")
        ip = ol.yolo_conv_core_0
        dma0 = ol.axi_dma_0
        dma1 = ol.axi_dma_1
        # Re-copy feature map buffer
        fm_buf = allocate(shape=fm_packed.shape, dtype=np.uint32)
        fm_buf[:] = fm_packed

print(f"{'='*70}")
print(f"Total time:   {total_time*1000:7.1f} ms")
print(f"Per tile avg: {total_time/(OUT_CH//TILE_OUT_CH)*1000:7.1f} ms")
print(f"Output shape: {all_outputs.shape}")
print(f"Output range: [{all_outputs.min()}, {all_outputs.max()}]")
print(f"Output mean:  {all_outputs.mean():.4f}")
print(f"{'='*70}")

# Cleanup
del wgt_buf, fm_buf, out_buf
print("\n✓ Done!")


PYNQ CNN Accelerator - TILE_OUT_CH=4 Configuration

Loading overlay...


AttributeError: Could not find IP or hierarchy axi_dma_0 in overlay

In [3]:
"""
PYNQ-Z2 CNN Accelerator - FINAL WORKING VERSION
Includes proper DMA state management to prevent "channel not idle" errors
"""

from pynq import Overlay, allocate
import numpy as np
import cv2
import time

# Configuration
IMG_H = 28
IMG_W = 28
IN_CH = 4
OUT_CH = 16
K = 3
TILE_IN_CH = 4
TILE_OUT_CH = 4  
OUT_H = IMG_H - 2
OUT_W = IMG_W - 2

print("="*70)
print("PYNQ CNN Accelerator - TILE_OUT_CH=%d Configuration" % TILE_OUT_CH)
print("="*70)

# Load overlay
print("\nLoading overlay...")
t0 = time.time()
ol = Overlay("design_1_wrapper.bit")
ip = ol.yolo_conv_core_0
dma0 = ol.dma_img
dma1 = ol.dma_wgt
print(f"✓ Overlay loaded in {time.time()-t0:.3f}s")

# Load weights
print("\nLoading weights...")
t0 = time.time()
W = np.load("weights_int8.npy").astype(np.int8)
B = np.load("bias_int8.npy").astype(np.int32)

if W.shape[1] < IN_CH:
    pad_ch = IN_CH - W.shape[1]
    W = np.pad(W, ((0, 0), (0, pad_ch), (0, 0), (0, 0)), mode='constant')
    print(f"  Padded weights: {W.shape[1]-pad_ch}ch → {W.shape[1]}ch")

print(f"✓ W: {W.shape}, B: {B.shape} ({time.time()-t0:.3f}s)")

# Load image
img = cv2.imread("digits_0.png", cv2.IMREAD_GRAYSCALE)
if img is None:
    raise FileNotFoundError("digits_0.png not found")
img = cv2.resize(img, (IMG_W, IMG_H))

# Pack weights for all tiles
def pack_all_weights(W_full, B_full):
    all_wgt_packed = []
    for oc_base in range(0, OUT_CH, TILE_OUT_CH):
        words = []
        for ic in range(IN_CH):
            for ky in range(K):
                for kx in range(K):
                    val = np.uint32(0)
                    for oc_off in range(TILE_OUT_CH):
                        oc = oc_base + oc_off
                        w8 = np.uint8(W_full[oc, ic, ky, kx].view(np.uint8))
                        val |= np.uint32(w8) << (8 * oc_off)
                    words.append(val)
        
        for oc_off in range(TILE_OUT_CH):
            oc = oc_base + oc_off
            words.append(np.uint32(B_full[oc].view(np.uint32) & 0xFFFFFFFF))
        
        all_wgt_packed.append(np.array(words, dtype=np.uint32))
    return all_wgt_packed

# Pack feature map
def pack_feature_map(img_gray, in_ch):
    n_ic_tiles = in_ch // TILE_IN_CH
    total_words = IMG_H * IMG_W * n_ic_tiles
    words = np.zeros(total_words, dtype=np.uint32)
    
    idx = 0
    for y in range(IMG_H):
        for x in range(IMG_W):
            for ic_tile in range(n_ic_tiles):
                val = np.uint32(0)
                for ii in range(TILE_IN_CH):
                    ch_idx = ic_tile * TILE_IN_CH + ii
                    pix = np.uint32(img_gray[y, x]) if ch_idx == 0 else np.uint32(0)
                    val |= pix << (8 * ii)
                words[idx] = val
                idx += 1
    return words

# Pre-pack all data
print("\nPre-packing data...")
t0 = time.time()
fm_packed = pack_feature_map(img, IN_CH)
all_wgt_packed = pack_all_weights(W, B)
print(f"✓ Packing done in {time.time()-t0:.3f}s")

# Allocate DMA buffers once
max_wgt_size = max(w.shape[0] for w in all_wgt_packed)
wgt_buf = allocate(shape=(max_wgt_size,), dtype=np.uint32)
fm_buf = allocate(shape=fm_packed.shape, dtype=np.uint32)
out_buf = allocate(shape=(OUT_H * OUT_W,), dtype=np.uint32)
fm_buf[:] = fm_packed

# Run tile function with PROPER STATE MANAGEMENT
def run_tile_optimized(oc_base, wgt_packed):
    """Run HLS accelerator with proper DMA state management."""
    
    # CRITICAL: Wait for DMAs to be fully idle with timeout
    timeout = 1.0
    start_time = time.time()
    
    while not (dma0.recvchannel.idle and dma0.sendchannel.idle and dma1.sendchannel.idle):
        if time.time() - start_time > timeout:
            raise TimeoutError("DMAs did not return to idle state - power cycle board!")
        time.sleep(0.001)  # Small delay to prevent busy-wait
    
    # Copy weights into reusable buffer
    wgt_size = wgt_packed.shape[0]
    wgt_buf[:wgt_size] = wgt_packed
    out_buf[:] = 0
    
    # Configure IP registers
    ip.write(0x10, IMG_W)
    ip.write(0x18, IMG_H)
    ip.write(0x20, IN_CH)
    ip.write(0x28, 0)  # pool disabled
    ip.write(0x30, 1)  # leaky enabled
    
    # DMA Transfer Sequence
    dma0.recvchannel.transfer(out_buf)
    ip.write(0x00, 1)  # AP_START
    dma1.sendchannel.transfer(wgt_buf[:wgt_size])
    dma0.sendchannel.transfer(fm_buf)
    
    # Wait for ALL transfers to complete
    dma1.sendchannel.wait()
    dma0.sendchannel.wait()
    dma0.recvchannel.wait()
    
    # Small delay to let hardware update idle flags
    time.sleep(0.001)
    
    # Unpack output based on TILE_OUT_CH
    raw = np.array(out_buf, copy=False)
    
    if TILE_OUT_CH == 2:
        ch0 = (raw & 0xFF).astype(np.uint8)
        ch1 = ((raw >> 8) & 0xFF).astype(np.uint8)
        return np.stack([ch0, ch1]).reshape(TILE_OUT_CH, OUT_H, OUT_W)
    elif TILE_OUT_CH == 4:
        ch0 = (raw & 0xFF).astype(np.uint8)
        ch1 = ((raw >> 8) & 0xFF).astype(np.uint8)
        ch2 = ((raw >> 16) & 0xFF).astype(np.uint8)
        ch3 = ((raw >> 24) & 0xFF).astype(np.uint8)
        return np.stack([ch0, ch1, ch2, ch3]).reshape(TILE_OUT_CH, OUT_H, OUT_W)
    else:
        raise ValueError(f"TILE_OUT_CH={TILE_OUT_CH} not supported (must be 2 or 4)")

# Run inference
print(f"\n{'='*70}")
print(f"Running: {IMG_H}×{IMG_W}×{IN_CH} → {OUT_H}×{OUT_W}×{OUT_CH}")
print(f"{'='*70}")

all_outputs = np.zeros((OUT_CH, OUT_H, OUT_W), dtype=np.uint8)
total_time = 0

for tile_idx, oc_base in enumerate(range(0, OUT_CH, TILE_OUT_CH)):
    print(f"Tile {tile_idx+1}/{OUT_CH//TILE_OUT_CH}: ch[{oc_base:2d}:{oc_base+TILE_OUT_CH:2d}]...", 
          end=" ", flush=True)
    
    try:
        t0 = time.time()
        wgt_packed = all_wgt_packed[tile_idx]
        result = run_tile_optimized(oc_base, wgt_packed)
        elapsed = time.time() - t0
        total_time += elapsed
        
        all_outputs[oc_base:oc_base + TILE_OUT_CH] = result
        print(f"✓ {elapsed*1000:5.1f}ms (mean={result.mean():5.2f})")
        
    except Exception as e:
        print(f"\n✗ ERROR on tile {tile_idx+1}: {e}")
        print("   Attempting to continue...")
        # Reload overlay to reset hardware
        ol = Overlay("design_1_wrapper.bit")
        ip = ol.yolo_conv_core_0
        dma0 = ol.dma_img
        dma1 = ol.dma_wgt        # Re-copy feature map buffer
        fm_buf = allocate(shape=fm_packed.shape, dtype=np.uint32)
        fm_buf[:] = fm_packed

print(f"{'='*70}")
print(f"Total time:   {total_time*1000:7.1f} ms")
print(f"Per tile avg: {total_time/(OUT_CH//TILE_OUT_CH)*1000:7.1f} ms")
print(f"Output shape: {all_outputs.shape}")
print(f"Output range: [{all_outputs.min()}, {all_outputs.max()}]")
print(f"Output mean:  {all_outputs.mean():.4f}")
print(f"{'='*70}")

# Cleanup
del wgt_buf, fm_buf, out_buf
print("\n✓ Done!")


PYNQ CNN Accelerator - TILE_OUT_CH=4 Configuration

Loading overlay...
✓ Overlay loaded in 1.086s

Loading weights...
  Padded weights: 1ch → 4ch
✓ W: (16, 4, 3, 3), B: (16,) (0.030s)

Pre-packing data...
✓ Packing done in 0.396s

Running: 28×28×4 → 26×26×16
Tile 1/4: ch[ 0: 4]... 
✗ ERROR on tile 1: DMAs did not return to idle state - power cycle board!
   Attempting to continue...
Tile 2/4: ch[ 4: 8]... 
✗ ERROR on tile 2: DMAs did not return to idle state - power cycle board!
   Attempting to continue...
Tile 3/4: ch[ 8:12]... 
✗ ERROR on tile 3: DMAs did not return to idle state - power cycle board!
   Attempting to continue...
Tile 4/4: ch[12:16]... 
✗ ERROR on tile 4: DMAs did not return to idle state - power cycle board!
   Attempting to continue...
Total time:       0.0 ms
Per tile avg:     0.0 ms
Output shape: (16, 26, 26)
Output range: [0, 0]
Output mean:  0.0000

✓ Done!


In [21]:
dma0.register_map.MM2S_DMACR.Reset = 1
dma1.register_map.S2MM_DMACR.Reset = 1
# Add a short delay to ensure reset completes
import time
time.sleep(1)
# Restart the channels
dma0.register_map.MM2S_DMACR.RS = 1
dma1.register_map.S2MM_DMACR.RS = 1


In [6]:

import numpy as np
from pynq import Overlay, allocate
import time

# --- CONFIGURATION MATCHING YOUR HARDWARE ---
TILE_OUT_CH = 4  # MUST match HLS code
TILE_IN_CH  = 4  # MUST match HLS code

# Load Overlay
ol = Overlay("design_1_wrapper.bit") # Ensure this matches your filename
dma_fm = ol.dma_img
dma_wgt = ol.dma_wgt
accel = ol.yolo_conv_core_0

def pack_weights_bias(w_chunk, b_chunk, in_ch):
    """
    Packs 4 filters + 4 biases into a buffer for the hardware.
    w_chunk: (4, in_ch, 3, 3) -> 4 filters for this tile
    b_chunk: (4,)             -> 4 biases
    """
    # Calculate buffer size:
    # Weights: (in_ch * 3 * 3) words. Each word holds 4 weights (8-bit each).
    # Biases:  4 words. Each word holds 1 bias (32-bit).
    num_weight_words = in_ch * 9
    num_bias_words   = 4
    total_words      = num_weight_words + num_bias_words
    
    buf = allocate(shape=(total_words,), dtype=np.int32)
    
    idx = 0
    # 1. Pack Weights
    # We pack 4 output filters (oc) into one 32-bit word: [W3 | W2 | W1 | W0]
    for ic in range(in_ch):
        for ky in range(3):
            for kx in range(3):
                w0 = int(w_chunk[0][ic][ky][kx]) & 0xFF
                w1 = int(w_chunk[1][ic][ky][kx]) & 0xFF
                w2 = int(w_chunk[2][ic][ky][kx]) & 0xFF
                w3 = int(w_chunk[3][ic][ky][kx]) & 0xFF
                
                # Little Endian Packing: W0 is LSB
                packed = (w3 << 24) | (w2 << 16) | (w1 << 8) | w0
                buf[idx] = packed
                idx += 1
                
    # 2. Pack Biases (One per word)
    buf[idx] = int(b_chunk[0]); idx += 1
    buf[idx] = int(b_chunk[1]); idx += 1
    buf[idx] = int(b_chunk[2]); idx += 1
    buf[idx] = int(b_chunk[3]); idx += 1
    
    return buf

def run_layer(input_vol, weights, bias, pool=False):
    h, w, in_ch = input_vol.shape
    out_ch = weights.shape[0]
    
    # Validation
    if in_ch % TILE_IN_CH != 0:
        print(f"Error: Input channels {in_ch} must be multiple of {TILE_IN_CH}")
        return None
        
    out_h = h // 2 if pool else h
    out_w = w // 2 if pool else w
    
    # Result buffer
    output_vol = np.zeros((out_h, out_w, out_ch), dtype=np.uint8)
    
    # Prepare Input Buffer
    # Input is 4 channels. We pack them into one 32-bit integer per pixel.
    flat_input = input_vol.flatten()
    in_buf_size = flat_input.shape[0] // 4
    in_buf = allocate(shape=(in_buf_size,), dtype=np.int32)
    
    # Fast packing using numpy view (Assumes input is already uint8 [H,W,4])
    # This treats 4 consecutive bytes as one 32-bit int
    np.copyto(in_buf, flat_input.view(np.int32))
    
    # Explicit flush to ensure data is in RAM
    in_buf.flush()

    print(f"Running: {w}x{h}x{in_ch} -> {out_w}x{out_h}x{out_ch}")
    start_total = time.time()

    # TILE LOOP (4 Output Channels at a time)
    for oc in range(0, out_ch, TILE_OUT_CH):
        # 1. Slice Weights/Bias for this tile
        w_tile = weights[oc : oc + TILE_OUT_CH]
        b_tile = bias[oc : oc + TILE_OUT_CH]
        
        # 2. Pack and Flush Weights
        wb_buf = pack_weights_bias(w_tile, b_tile, in_ch)
        wb_buf.flush()
        
        # 3. Configure IP Registers
        accel.write(0x10, w)         # img_w
        accel.write(0x18, h)         # img_h
        accel.write(0x20, in_ch)     # in_ch
        accel.write(0x28, int(pool)) # pool enable
        accel.write(0x30, 1)         # leaky enable
        accel.write(0x00, 1)         # AP_START
        
        # 4. DMA Transfer - Weights (Send Only)
        dma_wgt.sendchannel.transfer(wb_buf)
        dma_wgt.sendchannel.wait()
        
        # 5. DMA Transfer - Image
        # Allocate output buffer for ONE tile
        out_tile_size = out_h * out_w
        out_buf = allocate(shape=(out_tile_size,), dtype=np.int32)
        
        dma_fm.sendchannel.transfer(in_buf)
        dma_fm.recvchannel.transfer(out_buf)
        
        dma_fm.sendchannel.wait()
        dma_fm.recvchannel.wait()
        
        # 6. Unpack Results
        # Each 32-bit word contains 4 output pixels (one for each filter in the tile)
        res = np.array(out_buf)
        ch0 = (res & 0xFF).astype(np.uint8).reshape(out_h, out_w)
        ch1 = ((res >> 8) & 0xFF).astype(np.uint8).reshape(out_h, out_w)
        ch2 = ((res >> 16) & 0xFF).astype(np.uint8).reshape(out_h, out_w)
        ch3 = ((res >> 24) & 0xFF).astype(np.uint8).reshape(out_h, out_w)
        
        output_vol[:,:,oc]   = ch0
        output_vol[:,:,oc+1] = ch1
        output_vol[:,:,oc+2] = ch2
        output_vol[:,:,oc+3] = ch3
        
        wb_buf.free()
        out_buf.free()
        
    in_buf.free()
    print(f"Total time: {(time.time() - start_total)*1000:.1f} ms")
    return output_vol