In [1]:
from pynq import Overlay
from pynq import allocate
import time
import matplotlib.pyplot as plt
import numpy as np
import shutil
import os
import socket
import threading

# -----------------------------------------------------------------------------------------------------------------------------#
# Overlay and Parameter Setup
# -----------------------------------------------------------------------------------------------------------------------------#

ol = Overlay("/home/xilinx/dma_tests/working.bit")

In [3]:
#------------------------------------------------------------------------------------------------------------------------------#
# DMA Setup
#------------------------------------------------------------------------------------------------------------------------------#

dma_o = ol.axi_dma
dma = dma_o.recvchannel

print(f"DMA data: running: {dma.running}, idle: {dma.idle}, error: {dma.error}")

# -----------------------------------------------------------------------------------------------------------------------------#
# TCP Setup
# -----------------------------------------------------------------------------------------------------------------------------#

HOST = '192.168.2.99'
UNITY_PORT = 12345

unity_welcome_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
unity_welcome_socket.bind((HOST, UNITY_PORT))
unity_welcome_socket.listen()

DRAWING_PORT = 9005
EXPECTED_BYTES = 313
cells = 2500

drawing_welcome_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
drawing_welcome_socket.bind((HOST, DRAWING_PORT))
drawing_welcome_socket.listen()

DMA data: running: True, idle: False, error: False


In [None]:
# -----------------------------------------------------------------------------------------------------------------------------#
# Overlay and Parameter Setup
# -----------------------------------------------------------------------------------------------------------------------------#
size=50
cellCount = size * size

ostep_countn = ol.step_countn.channel1
ireset_lbm_step = ol.rst_step.channel1
iparam_set = ol.params.channel1
iimg = ol.img_thing.channel1

# -----------------------------------------------------------------------------------------------------------------------------#
# TCP Functions
# -----------------------------------------------------------------------------------------------------------------------------#

unity_connection_socket, unity_address = unity_welcome_socket.accept()
unity_connection_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)


drawing_connection_socket, unity_address = drawing_welcome_socket.accept()
drawing_connection_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)


def unity_tcp_handling():
    frame_buffer = allocate(shape=(cellCount + 80,), dtype=np.int64)
    toggle = False
    
    for step in range(1, int(2e30), 20):
        
        write_step(step)
        while ostep_countn.read() != step:
            pass
#         time.sleep(0.1)
        print(ostep_countn.read())
        dma.transfer(frame_buffer)
        dma.wait()

#         data = frame_buffer[0:2500]
        data = np.concatenate((frame_buffer[1:2500], frame_buffer[0:1]))
#         print(data)
#         print("")

        unity_connection_socket.sendall(data.tobytes())
        frame_buffer.invalidate()
        toggle = not toggle
        
    unity_connection_socket.close()
    print("done")
    return
    

def drawing_tcp_handling():
    while True:
        time.sleep(1)
        is_packet = True
        data_buff = bytearray()
        while len(data_buff) < EXPECTED_BYTES:
            packet = drawing_connection_socket.recv(EXPECTED_BYTES - len(data_buff))
            if not packet:
                is_packet = False
                break
            data_buff.extend(packet)

            print(f"Received {len(data_buff)} bytes")
        if is_packet:
            # Unpack bits (only first 2500 bits matter)
            bit_array = np.unpackbits(np.frombuffer(data_buff, dtype=np.uint8))[:cells]

#             reversed_array = np.flip(bit_array)
            rows, cols = 50, 50
            img = bit_array.reshape((rows, cols))
            flipped = np.flipud(img)
            reversed_array = flipped.flatten()

            # Send to AXI in 16-bit chunks
            for i in range(0, cells, 16):
                word_bits = np.flip(reversed_array[i:i+16])
                if len(word_bits) < 16:
                    word_bits = np.pad(word_bits, (0, 16 - len(word_bits)))  # pad last word
                word = int("".join(map(str, word_bits)), 2)

                address = i  // 16 # word address
                full_word = (1 << 31) | (address << 16) | word
                iimg.write(full_word, 0xFFFFFFFF)
    return

#------------------------------------------------------------------------------------------------------------------------------#
# Load Image
#------------------------------------------------------------------------------------------------------------------------------#

param_map = {
    "omega": 0,
    "c0": 1,
    "cn": 2,
    "cne": 3,
    "ce": 4,
    "cse": 5,
    "cs": 6,
    "csw": 7,
    "cw": 8,
    "cnw": 9,
}

def write_param(param, val):
    val_16bit = val & 0xFFFF  
    param_index = param_map[param]

    gpio_value = (param_index << 16) | val_16bit
    iparam_set.write(gpio_value,-1)

def write_reset(i):
    ireset_lbm_step.write(i<<16,0x10000)

def write_step(i):
    ireset_lbm_step.write(i,0xFFFF)
    
write_param("omega", 0x3000)
write_param("c0"  , 0x0E02 )
write_param("cn"  , 0x0381 )
write_param("cs"  , 0x0381 )
write_param("ce"  , 0x04BB )
write_param("cw"  , 0x0298 )
write_param("cne" , 0x012F )
write_param("cse" , 0x012F )
write_param("cnw" , 0x00A6 )
write_param("csw" , 0x00A6 )

#------------------------------------------------------------------------------------------------------------------------------#
# LBM Reset
#------------------------------------------------------------------------------------------------------------------------------#

for i in range(2):
    # reset all
    write_reset(0)
    write_reset(1) #reset
    time.sleep(0.1) # wait

    write_step(0)
    time.sleep(0.1)
    # check step
# print(f"current step (expect 0): {ostep_countn.read()}")

#------------------------------------------------------------------------------------------------------------------------------#
# Run TCP
#------------------------------------------------------------------------------------------------------------------------------#

# unity_tcp_handling()

def main():
    unity_thread = threading.Thread(target=unity_tcp_handling)
    drawing_thread = threading.Thread(target=drawing_tcp_handling)
    
    unity_thread.start()
    drawing_thread.start()
    
main()

1
21
41
61
81
101
121
141
161
181
201
221
241


Exception in thread Thread-6 (drawing_tcp_handling):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_1266/416944665.py", line 58, in drawing_tcp_handling
NameError: name 'drawing_connection_socket' is not defined. Did you mean: 'unity_connection_socket'?


261
281
301
321
341
361
381
401
421
441
461
481
501
521
541
561
581
601
621
641
661
681
701
721
741
761
781
801
821
841
861
881
901
921
941
961
981
1001
1021
1041
1061
1081
1101
1121
1141
1161
1181
1201
1221
1241
1261
1281
1301
1321
1341
1361
1381
1401
1421
1441
1461
1481
1501
1521
1541
1561
1581
1601
1621
1641
1661
1681
1701
1721
1741
1761
1781
1801
1821
1841
1861
1881
1901
1921
1941
1961
1981
2001
2021
2041
2061
2081
2101
2121
2141
2161
2181
2201
2221
2241
2261
2281
2301
2321
2341
2361
2381
2401
2421
2441
2461
2481
2501
2521
2541
2561
2581
2601
2621
2641
2661
2681
2701
2721
2741
2761
2781
2801
2821
2841
2861
2881
2901
2921
2941
2961
2981
3001
3021
3041
3061
3081
3101
3121
3141
3161
3181
3201
3221
3241
3261
3281
3301
3321
3341
3361
3381
3401
3421
3441


In [2]:
unity_welcome_socket.close()
unity_connection_socket.close()
drawing_welcome_socket.close()
drawing_connection_socket.close()

NameError: name 'unity_welcome_socket' is not defined

In [None]:
# dma_o.register_map

# manual reset of DMA for debug
# dma.stop()
dma_o.register_map.S2MM_DMACR.Reset = 1
dma.start()