# PYNQ UDP Server Demo

本Notebook演示如何在PYNQ平台下，控制雷达、滑台、UART等硬件，并实现UDP数据采集流程。

In [None]:
# 导入Python库
import pynq
from pynq import Overlay, MMIO
import socket
import time
import numpy as np
from pynq.lib import AxiGPIO
# 假设用户自定义驱动如下
# from your_ip import RadarController, MotorController, UartController, DMAController

## 1. 加载比特流和初始化硬件

In [None]:
# 加载比特流
overlay = Overlay('/home/xilinx/overlays/your_bitstream.bit')
# 初始化控制器（假设用户已封装好Python类）
# radar = RadarController(overlay)
# motor = MotorController(overlay)
# uart = UartController(overlay)
# dma = DMAController(overlay)

# 示例：GPIO控制
# gpio = AxiGPIO(overlay.ip_dict['axi_gpio_0']['phys_addr'])
# leds = gpio.channel1
# leds.write(0x01)

## 2. 设置UDP服务器参数（IP, 端口等）

In [None]:
UDP_IP = "192.168.2.111"  # PYNQ板卡IP
UDP_PORT = 8888            # 监听端口
BUFFER_SIZE = 4096         # 缓冲区大小
udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_server.bind((UDP_IP, UDP_PORT))
print(f"Listening on {UDP_IP}:{UDP_PORT}")

## 3. UART控制雷达（发送启动/停止命令）

In [None]:
# 假设uart.write/send已实现
def radar_uart_start(uart):
    uart.write(b'sensorStart 0\r\n')
    time.sleep(0.1)

def radar_uart_stop(uart):
    uart.write(b'sensorStop\r\n')
    time.sleep(0.1)

# 示例
# radar_uart_start(uart)
# radar_uart_stop(uart)

## 4. 控制滑台运动

In [None]:
# 假设motor.move(dir, steps)已实现
def slip2d_H_move(motor, direction, steps):
    motor.move('H', direction, steps)

def slip2d_V_move(motor, direction, steps):
    motor.move('V', direction, steps)

# 示例
# slip2d_H_move(motor, dir=1, steps=100)
# slip2d_V_move(motor, dir=0, steps=100)

## 5. 数据采集与DMA传输

In [None]:
# 假设dma.read(buf, size)已实现
def collect_radar_data(dma, length=2048):
    buf = np.zeros(length, dtype=np.uint8)
    dma.read(buf, length)
    return buf
# 示例
# buf = collect_radar_data(dma)

## 6. UDP循环服务与数据上传

In [None]:
try:
    while True:
        data, addr = udp_server.recvfrom(BUFFER_SIZE)
        print(f"Received command: {data} from {addr}")
        if data == b'start_collect':
            # radar_uart_start(uart)  # 启动雷达
            # slip2d_H_move(motor, 1, 24) # 控制滑台
            # buf = collect_radar_data(dma)
            # udp_server.sendto(buf.tobytes(), addr)  # 发送数据
            print("[模拟] 启动雷达并发送采集数据")
            udp_server.sendto(b'data: ...', addr)
        elif data == b'stop_collect':
            # radar_uart_stop(uart)
            print("[模拟] 停止采集")
            udp_server.sendto(b'stopped', addr)
        else:
            udp_server.sendto(b'Unknown command', addr)
except KeyboardInterrupt:
    print("Server stopped.")
finally:
    udp_server.close()