
## Detailed Explanation of the VAIC_23_24 Program

This article provides a detailed explanation of the establishment, inference, object recognition data processing of the YOLOv3 visual neural network by the VAIC_23_24 program. It also delves into the working mechanism of the Intel D435 camera, communication between Jetson Nano and Vex Brian, and how neural network recognition of object positions and distances is used to control robot motion via Brian. Additionally, it covers mutual communication between dual machines to achieve coordinated movement, as well as remote real-time display technology.


The AI functionality of the VAIC_23_24 software is implemented by the Python programs in the JetsonExample directory.<br> The components of the Jetson Python programs have been briefly introduced earlier. Please refer to the "Introduction to VAIC Software".<br> 
As an AI project, we will first conduct a technical analysis of this part to help readers understand and grasp the application and implementation of AI functionality in the program as soon as possible.<br>

###   1.The establishment of VAIC visual neural network




The AI technology introduced by Vex AI uses a visual depth neural network to recognize objects and measure distances in real-time images captured by the camera on the field. The shape, color, and positional data of the objects are then sent to Vex Brian. The Brian robot uses this information to select targets, plan paths, and control movement and actions based on the competition strategy.

This section provides an in-depth discussion on the integration of the visual depth neural network YOLOv3, the reception of images from the Intel D435 camera, the use of TensorRT for high-speed inference to identify object positions and distances, and the process of transmitting the identified object data to Brian.




Open "overunder.py," and you can see a series of instances created for the program's execution within the__def init(self)__ function of the "MainApp" class:

Our discussion starts from the "Processing" class.

The AI process begins with the creation of an instance of "Processing." According to Python's execution mechanism, after allocating memory space for the instance, the__def init(self)__ function of "Processing" is executed.

In [None]:
self.processing = Processing(self.camera.depth_scale)

The__init__(self) code segment is as follows:

The parameter depth_scale is a scaling factor used in depth calculations by the camera. The initialization of the camera is configured to obtain settings from the camera, which will be explained in the Camera section.

In [None]:
    # Class to handle camera data processing, preparing for inference, and running inference
    # on camera image.
    def __init__(self, depth_scale):
        self.depth_scale = depth_scale
        self.align_to = rs.stream.color
        self.align = rs.align(self.align_to)  # Align depth frames to color stream
        self.model = Model()  # Initialize the object detection model

.self.depth_scale = depth_scale<br>
This line stores the depth calculation scaling factor from the external parameter into the internal parameter for use in calculating the distance to recognized objects.

.self.align_to = rs.stream.color<br>
.self.align = rs.align(self.align_to)<br>
Image alignment: In RealSense applications, depth and object color images are collected separately by infrared and visible light cameras. The depth image needs to be aligned with the RGB image so that both types of image data can be processed in the same coordinate system.

Next, an instance of the Model class is created:

.self.model = Model()<br>
The Model class is the core of VAIC, representing the neural network model for depth measurement and object recognition, and is a crucial part of VAIC's application of AI technology. <br>
It is important to note that the definition of class Model is in the model.py file. The overunder.py program imports this definition at the beginning with the statement:<br>
from model import Model<br>
Now, let's take a look at the__init__(self) function of the Model instance.<br>

In [None]:
def __init__(self):
        # Initialize the TensorRT engine and execution context.

        # Define file paths for ONNX and engine files
        current_folder_path = os.path.dirname(os.path.abspath(__file__))
        onnx_file_path = os.path.join(current_folder_path, "VEXOverUnder.onnx")
        # If you change the onnx file to your own model, adjust the file name here
        engine_file_path = os.path.join(current_folder_path, "VEXOverUnder.trt")
        # This should match the .onnx file name

        # Get the TensorRT engine
        self.engine = Model.get_engine(onnx_file_path, engine_file_path)

        # Create an execution context
        self.context = self.engine.create_execution_context()

        # Allocate buffers for input and output
        self.inputs, self.outputs, self.bindings, self.stream =
                                                common.allocate_buffers(self.engine)


Based on the previous introduction, to use NVIDIA TensorRT for high-speed inference, the neural network needs to be optimized and converted to a different format, changing the .onnx file format to .trt format.<br> 
The first three lines of the code check whether the .onnx and .trt files exist.<br>
 The .onnx file here is the YOLOv3 neural network fine-tuned using PyTorch by VEX AI, specifically for the recognition of tri-colored balls and their distances for the VAIC_23_24 season.<br>

The line self.engine = Model.get_engine(onnx_file_path, engine_file_path) is the format conversion function. The function is as follows:<br>

In [None]:
       def build_engine():
            print("Building engine file from onnx, this could take a while")
            # Builds and returns a TensorRT engine from an ONNX file.
            with trt.Builder(TRT_LOGGER) as builder, \
                    builder.create_network(common.EXPLICIT_BATCH) as network, \
                    builder.create_builder_config() as config, \
                    trt.OnnxParser(network, TRT_LOGGER) as parser, \
                    trt.Runtime(TRT_LOGGER) as runtime:

                config.max_workspace_size = 1 << 28  # Set maximum workspace size to 256MiB
                builder.max_batch_size = 1

                # Check if ONNX file exists
                if not os.path.exists(onnx_file_path):
                    print("ONNX file {} not found.".format(onnx_file_path))
                    exit(0)

                # Load and parse the ONNX file
                with open(onnx_file_path, "rb") as model:
                    if not parser.parse(model.read()):
                        print("ERROR: Failed to parse the ONNX file.")
                        for error in range(parser.num_errors):
                            print(parser.get_error(error))
                        return None

                # Set input shape for the network
                network.get_input(0).shape = [1, 3, 320, 320]

                # Build and serialize the network, then create and return the engine
                plan = builder.build_serialized_network(network, config)
                engine = runtime.deserialize_cuda_engine(plan)
                with open(engine_file_path, "wb") as f:
                    f.write(plan)
                return engine

        # Check if a serialized engine file exists and load it if so, otherwise build a new
        # one
        if os.path.exists(engine_file_path):
            with open(engine_file_path, "rb") as f, trt.Runtime(TRT_LOGGER) as runtime:
                return runtime.deserialize_cuda_engine(f.read())
        else:
            return build_engine()

Following Python's alignment rules, the program executes from the bottom up. If the .trt file exists, it opens the .trt file, imports TensorRT's runtime functionality, and uses deserialize_cuda_engine to load the CUDA engine from the serialized file.

In the get_engine() function, a custom build_engine() function is defined. As shown in the code segment, if the .trt file is not found in the directory, this function converts the .onnx file in the directory to the .trt format. Below is a brief explanation of the transformation function:

The with statement references several TensorRT functions, builder, network, config, parser, and runtime, in preparation for the format conversion.<br><br>
.config.max_workspace_size = 1 << 28 defines the network memory workspace size.<br><br>
.builder.max_batch_size = 1 sets the batch size for processing to 1.<br> During training, for efficiency, max_batch_size might be 64 or larger, but for inference, it processes only one image frame at a time.
Next, it checks if the .onnx file exists, reads the file, and verifies its correctness.<br><br>
.network.get_input(0).shape = [1, 3, 320, 320] specifies the network input format as [batch size=1, color=3 (RGB), image width=320, image height=320].<br><br>
.plan = builder.build_serialized_network(network, config) is the format conversion function, which uses TensorRT's API to define and configure the network model. It applies a series of optimization strategies, such as layer fusion and memory management optimization, to enhance the network's execution speed and reduce latency, and saves the optimized network structure and related information into serialized (usually binary) memory.<br><br>
.engine = runtime.deserialize_cuda_engine(plan) function restores (or deserializes) the CUDA engine from the serialized memory. The CUDA engine is the neural network used in TensorRT for executing deep learning inference, and it can be directly used for subsequent inference operations.<br><br>
Finally, the transformed network data is saved as a .trt file for future use, allowing it to be directly read in later.



In [None]:
self.context = self.engine.create_execution_context()

To create a new execution context for the inference engine, configure the input and output bindings for the neural network.

In [None]:
self.inputs, self.outputs, self.bindings, self.stream = common.allocate_buffers(self.engine)


Allocate memory for the neural network inputs and outputs.<br>
 The structure and function of each memory allocation will be introduced in the inference section.<br>
  Here, the memory allocation function common.allocate_buffers() is defined in common.py and imported at the beginning of the model.py file with the statement:<br>
import common<br>
With this, the import of the neural network, which is the core content of VAIC's AI, is now complete.


###   2. Configuration and Connection of the Depth Camera

Returning to the overunder.py file, within the__init__(self) function of the class MainApp, to emphasize the core role of the neural network, the establishment of the YOLOv3 network has already been explained. <br>
Now, let's introduce the configuration and connection of the external device, the Camera.

In [None]:
self.camera = Camera()
self.camera.start()

Execute the initialization function of the Class Camera().

Note that in the following code segment, rs indicates the use of the pyrealsense2 library, which is imported at the beginning of overunder.py with the statement:

import pyrealsense2 as rs

In [None]:
    def __init__(self):
        self.pipeline = rs.pipeline()  # Initialize RealSense pipeline
        self.config = rs.config()
        # Enable depth stream at 640x480 in z16 encoding at 30fps
        self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
        # Enable color stream at 640x480 in bgr8 encoding at 30fps
        self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

As indicated in the comments, create instances of rs.pipeline() and rs.config().

Configure the depth data stream with width=640 pixels, height=480 pixels, data format=z16, and frame rate=30 frames per second.

Configure the image data stream with width=640 pixels, height=480 pixels, data format=bgr8, and frame rate=30 frames per second.

In [None]:
self.camera.start()

In [None]:
    def start(self):
       self.profile = self.pipeline.start(self.config)   # Start the pipeline
        # Obtain depth sensor and calculate depth scale
        depth_sensor = self.profile.get_device().first_depth_sensor()
        self.depth_scale = depth_sensor.get_depth_scale()


.self.profile = self.pipeline.start(self.config)
According to the previously defined depth and image configuration config, set the camera hardware parameters and open the video stream according to the configured parameters.


.depth_sensor = self.profile.get_device().first_depth_sensor()
Get a depth sensor object from the currently configured RealSense device.


.self.depth_scale = depth_sensor.get_depth_scale()
Obtain the depth scale of the depth sensor. This scale is a floating-point number representing the conversion from pixel values in the depth image to actual physical distances.

self.depth_scale is the parameter used to create self.processing = Processing(self.camera.depth_scale).

With this, the setup and startup of the Camera are complete. Through the pipeline, data is transmitted to the Jetson Nano with the depth data stream configured to width=640 pixels, height=480 pixels, data format=z16, and frame rate=30 frames per second, and the image data stream configured to width=640 pixels, height=480 pixels, data format=bgr8, and frame rate=30 frames per second.


###   3. &nbsp; &nbsp;Object Recognition Process of the VAIC Neural Network
To introduce the object recognition inference process of the YOLOv3 neural network, we need to return to the main program, which is the entry point for the entire AI control program.

In [None]:
if __name__ == "__main__":
    app = MainApp()  # Create the main application
    app.run()  # Run the application


In the initialization of MainApp(), we temporarily ignore:

.self.v5 = V5SerialComms()<br>
.self.v5Map = MapPosition()<br>
.self.v5Pos = V5GPS()<br>
.self.v5Web = V5WebData(self.v5Map, self.v5Pos)<br>
.self.stats = Statistics(0, 0, 0, 640, 480, 0, False)<br>
.self.rendering = Rendering(self.v5Web)<br>
app.run() is a loop function. In app.run(), we temporarily ignore:<br>
.self.v5.start()<br>
.self.v5Pos.start()<br>
.self.v5Web.start()<br>
This does not mean that these functions are not important.<br>
 If any of these functions do not execute properly, the system will not work. However, these functions are conventional and auxiliary, and they are not directly related to AI object recognition technology. They will all be explained in detail later.<br>

Enter the loop of app.run().

In [None]:
try:
    while True:
        start_time = time.time()  # start time of the loop
        frames = self.camera.get_frames()
        depth_image, color_image, depth_map = self.processing.process_frames(frames)
        invoke_time = time.time()
        output, detections = self.processing.detect_objects(color_image)
        invoke_time = time.time() - invoke_time
        aiRecord = self.processing.compute_detections(self, detections, depth_image)
        self.set_v5(aiRecord)
        self.rendering.set_images(output, depth_map)
        self.rendering.set_detection_data(aiRecord)
        self.rendering.set_stats(self.stats, self.v5Pos, start_time, invoke_time, run_time)
        # self.rendering.display_output(output)
finally:
    self.camera.stop()


.start_time = time.time()
Saves the start time of each loop.

.frames = self.camera.get_frames()
Extracts frame images from the pipeline:


def get_frames(self):
    return self.pipeline.wait_for_frames()
wait_for_frames() means waiting for the image frames to refresh and extracting image and distance data between the intervals of two frames.


.depth_image, color_image, depth_map = self.processing.process_frames(frames)
The function is as follows:

Frame alignment involves aligning the depth matrix from infrared ranging with the positions of the transformed color camera image, giving each point in the scene image a depth value. The scene image is in color_image, and the depth is saved in depth_image. Using depth_image, normalization is performed, and a color map (COLORMAP_JET) is applied to generate a colored depth map (depth_map).

The function's purpose is to extract the depth image and color image from the aligned frames, then normalize and apply a color map to the depth image, and return these three images.

In [None]:
    def process_frames(self, frames):
        # Align frames and extract color and depth images
        # Apply a color map to the depth image
        self.align_frames(frames)
        depth_image = np.asanyarray(self.depth_frame_aligned.get_data())
        color_image = np.asanyarray(self.color_frame_aligned.get_data())
        depthImage = cv2.normalize(depth_image, None, alpha=0.01, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)
        depth_map = cv2.applyColorMap(depthImage, cv2.COLORMAP_JET)

        return depth_image, color_image, depth_map

In [None]:
        invoke_time = time.time()
        output, detections = self.processing.detect_objects(color_image)
        invoke_time = time.time() - invoke_time


The difference between two invoke_time instances is the time taken for one object detection.
.output, detections = self.processing.detect_objects(color_image)<br>
This is the detection function where the input is the color image, and the output is the detection results.

In [None]:
    def detect_objects(self, color_image):
        # Perform object detection and return results using the Model class in model.py
        output, detections = self.model.inference(color_image)
        return output, detections


The function output, detections = self.model.inference(color_image) calls the following function.<br> Inference is the process of forward propagation in a trained neural network, which refers to the neural network's working state. It needs a relatively detailed explanation. <br>Here are all the statements of the function, analyzed one by one:

In [None]:
def inference(self, inputImage):
    # Perform inference on the given image and return the bounding boxes, scores, and
    # classes of detected objects.

    # Process the input image
    input_image = Model.image_processing(inputImage)

    # Define input resolution and create preprocessor
    input_resolution_yolov3_HW = (320, 320)
    preprocessor = PreprocessYOLO(input_resolution_yolov3_HW)

    # Process the image and get original shape
    image_raw, image = preprocessor.process(input_image)
    shape_orig_WH = image_raw.size

    # Define output shapes for post-processing
    output_shapes = [(1, 24, 10, 10), (1, 24, 20, 20)]

    # Set the input and perform inference
    self.inputs[0].host = image
    trt_outputs = common.do_inference_v2(self.context, bindings=self.bindings,
                inputs=self.inputs,outputs=self.outputs,stream=self.stream)



        # Reshape the outputs for post-processing
        trt_outputs = [output.reshape(shape) for output, shape in zip(trt_outputs,
                                                                      output_shapes)]

        #Define arguments for post-processing
        postprocessor_args = {
            "yolo_masks": [(3, 4, 5), (0, 1, 2)],
            "yolo_anchors": [(10, 14), (23, 27), (37, 58), (81, 82), (135, 169), (344, 319)],
            "obj_threshold": [0.4, 0.90, 0.90],  # Different thresholds for each class
            #label (Green, Red, Blue)
            "nms_threshold": 0.5,
            "yolo_input_resolution": input_resolution_yolov3_HW,
        }#

        # Perform post-processing
        postprocessor = PostprocessYOLO(**postprocessor_args)
        boxes, classes, scores = postprocessor.process(trt_outputs, (shape_orig_WH))

        Detections = []

        # Handle case with no detections
        if boxes is None or classes is None or scores is None:
            print("No objects were detected.")
            return input_image, Detections

        # Draw bounding boxes and return detected objects
        obj_detected_img = Model.draw_bboxes(image_raw, boxes, scores, classes,
                                             ALL_CATEGORIES, Detections)
        obj_detected_img = obj_detected_img.convert("RGB")
        return np.array(obj_detected_img), Detections


input_image = Model.image_processing(inputImage)

Its purpose is to enhance the image by adjusting its hue, saturation, and brightness.

.hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)<br>
Converts the image to the HSV color space. This uses OpenCV's cvtColor function to convert the input BGR image (the default format for images read by OpenCV) to the HSV color space. The HSV color space consists of three components: Hue, Saturation, and Value, which are more suitable for processing color information.

.hsv[..., 0] = hsv[..., 0] + 12<br>
Modifies the hue channel of the HSV image by increasing each pixel value by 12. This typically shifts the overall color of the image towards a particular hue.

.hsv[:, :, 1] = np.clip(hsv[:, :, 1] * 1.2, 0, 255)
First multiplies the saturation channel values by 1.2 to increase saturation, then uses np.clip to ensure these values remain within the range of 0 to 255. Increasing saturation usually makes the colors in the image more vivid.

hsv[:, :, 2] = np.clip(hsv[:, :, 2] * 1.1, 0, 255)
Similarly adjusts the brightness channel by multiplying by 1.1. This increases brightness, but again uses np.clip to ensure values stay within the valid range.

.return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)<br>
Converts the image back to the BGR color space. After the modifications, the hsv variable now contains the modified HSV image. To display it on standard monitors or applications, it needs to be converted back to BGR format. The cvtColor function is used to convert the HSV image back to BGR format, and the processed image is returned.

.input_resolution_yolov3_HW = (320, 320)<br>
.preprocessor = PreprocessYOLO(input_resolution_yolov3_HW)<br>

Creates an instance of `Class PreprocessYOLO` with a resolution of 320x320, according to the standard input image resolution for YOLOv3. For details, refer to `data_processing.py`.

.image_raw, image = preprocessor.process(input_image)<br>
.shape_orig_WH = image_raw.size<br>
The preprocessor.process() calls _load_and_resize(self, input_image) and _shuffle_and_normalize(self, image) functions, transforming the input image to 320x320 PIL format for inference. image_raw is converted to PIL format and retains its original resolution for dynamic display. For details, refer to data_processing.py.
.output_shapes = [(1, 24, 10, 10), (1, 24, 20, 20)]<br>
Defines the output format of YOLOv3. This includes the shapes of the YOLOv3 model's outputs. Typically, the YOLOv3 model has multiple output layers, each corresponding to different scales of feature maps.

Here, there are two output layers with shapes (1, 24, 10, 10) and (1, 24, 20, 20). These shapes usually mean:

The first dimension (1) is the batch size, indicating how many images are processed at once (in this case, 1 image).<br>
The second dimension (24) is the number of anchors per layer multiplied by the number of predicted object parameters (object center coordinates x, y, predicted bounding box width w, height h, confidence, and class probabilities for tri-colored balls, 3x(5+3)=24).<br> Anchors will be defined and explained later.
The remaining two dimensions (10x10 and 20x20) are the sizes of the feature maps, indicating the number of grids at different scales.<br>

In [None]:
input_resolution_yolov3_HW = (320, 320)
preprocessor = PreprocessYOLO(input_resolution_yolov3_HW)

In [None]:
 # Set the input and perform inference
        self.inputs[0].host = image
        trt_outputs = common.do_inference_v2(self.context, bindings=self.bindings,
                     inputs=self.inputs,outputs=self.outputs, stream=self.stream)

        # Reshape the outputs for post-processing
        trt_outputs = [output.reshape(shape) for output, shape in zip(trt_outputs,
                                                        output_shapes)]


Using the YOLOv3 visual neural network to perform object recognition on real-time acquired scene images:


.self.inputs[0].host = image
The image is delivered to the CPU memory space allocated for inference. In CUDA programming, host refers to the CPU, and driver refers to the GPU.


.trt_outputs = common.do_inference_v2(self.context, bindings=self.bindings, inputs=self.inputs, outputs=self.outputs, stream=self.stream)<br>
This calls the common.do_inference_v2() function provided by NVIDIA. trt_outputs are the raw outputs obtained from the YOLOv3 model, typically containing multiple lists or tuples with the model's detection results for the input image. <br>
self.context is the YOLOv3 network context introduced earlier. bindings are arrays that bind the data between the host memory (CPU) and the device memory (GPU), ensuring that during inference, input data is correctly transferred to the device and output data is correctly returned to the host.<br>

Refer to the common.py file for the do_inference_v2() function, where you'll see a series of computational operations involving the CUDA platform to implement TensorRT. Here is a brief explanation of the calculations:


.[cuda.memcpy_htod_async(inp.device, inp.host, stream) for inp in inputs]<br>
Copies the detection image from the CPU to the GPU for executing YOLOv3 network TensorRT forward propagation in CUDA platform programming.

.context.execute_async(batch_size=batch_size, bindings=bindings, stream_handle=stream.handle)<br>
Executes the inference computation.<br>

.[cuda.memcpy_dtoh_async(out.host, out.device, stream) for out in outputs]<br>
Copies the recognition results from the GPU's out.device to the CPU's out.host.<br>

.stream.synchronize()<br>
Synchronizes data, waiting and checking for all operations on the GPU to complete.

It's important to note that the functions executed previously all have _async, meaning they are asynchronous functions. Asynchronous programming is a technique that allows a program to continue performing other tasks while waiting for an operation (such as an I/O operation) to complete, rather than blocking and waiting for the operation to finish. This can significantly improve the responsiveness and throughput of a program, especially when handling operations that involve waiting, such as CPU and GPU coordination, network requests, and file I/O.<br>

Calling stream.synchronize() waits for all queued operations on the GPU to complete. This is necessary because although the previous memory copies and inference operations are asynchronous, it is important to ensure they have been properly completed before finishing the inference process.<br>

.trt_outputs = [output.reshape(shape) for output, shape in zip(trt_outputs, output_shapes)]<br>
Reshapes each element in the trt_outputs list (assuming they are NumPy arrays or similar iterable objects) according to the corresponding shape in the output_shapes list. The reshaped data is collected into a new list and assigned to trt_outputs. Note that this effectively modifies the trt_outputs list in place according to the shapes in output_shapes.<br>


First, we need to explain the post-processing parameters postprocessor_args, as shown below:

.yolo_masks: YOLOv3 uses feature maps of different scales to detect objects of different sizes. Each feature map is responsible for detecting objects of a specific size. yolo_masks defines the indices of the anchors for each feature map. In this example, there are two feature maps.<br>
 The first feature map uses anchor indices (3, 4, 5), and the second feature map uses anchor indices (0, 1, 2).<br>

.yolo_anchors: Anchors are a predefined set of width-height ratios used as the initial shapes for predicting object bounding boxes. YOLOv3 uses these anchors to predict the offsets of the actual bounding boxes relative to these initial shapes. Here, six anchors are defined, each being a tuple of width and height.<br>

In simple terms: Anchors are rectangles that approximate the shape of the object to be recognized, distinguishing it from other objects with different width-height ratios. For the tri-colored balls, which have a width-height ratio of 1:1, the camera image is 640x480. Since YOLOv3's input image is 320x320, the width is compressed by half, and the height is compressed to 320/480, making the bounding box of the recognized object rectangular.<br>

.obj_threshold: [0.4, 0.90, 0.90] - Threshold for object color judgment; green=0.4, red=0.9, blue=0.9.<br>

.nms_threshold: Non-Maximum Suppression (NMS) threshold; NMS is a technique to reduce the number of overlapping bounding boxes. When the overlapping area (Intersection Over Union, IOU) of two bounding boxes is greater than the NMS threshold, the bounding box with the higher confidence is retained, and the one with the lower confidence is suppressed (i.e., deleted). This ensures that a single object is not detected multiple times.<br>

.yolo_input_resolution: input_resolution_yolov3_HW, 320x320, the input image size for YOLOv3.

.postprocessor = PostprocessYOLO(**postprocessor_args)
Creates an instance of Class PostprocessYOLO with the above arrays as parameters.

.boxes, classes, scores = postprocessor.process(trt_outputs, (shape_orig_WH))<br>
Processes the YOLOv3 outputs generated from TensorRT forward propagation. This post-processes these outputs and returns lists of bounding boxes for the detected objects along with their respective classes and confidence scores, placing them in different lists. The function's boxes, classes, and scores are the three variables unpacked from the return values of the postprocessor.process method.<br>

That is, the bounding boxes, classes, and confidence scores extracted from the output of the object detection model. shape_orig_WH contains the original input image's width and height (in pixels). This information is crucial when converting the bounding box coordinates in the model's output from normalized coordinates or relative coordinates back to the original image's coordinates.<br>

In [None]:
postprocessor_args = {
    "yolo_masks": [(3, 4, 5), (0, 1, 2)],
    "yolo_anchors": [(10, 14), (23, 27), (37, 58), (81, 82), (135, 169), (344, 319)],
    "obj_threshold": [0.4, 0.90, 0.90],  # Different thresholds for each class label
    #(BlueGreen, Red, )
    "nms_threshold": 0.5,
    "yolo_input_resolution": input_resolution_yolov3_HW,
}
postprocessor = PostprocessYOLO(**postprocessor_args)
boxes, classes, scores = postprocessor.process(trt_outputs, (shape_orig_WH))

Detections = []

# Handle case with no detections
if boxes is None or classes is None or scores is None:
    print("No objects were detected.")
    return input_image, Detections

# Draw bounding boxes and return detected objects
obj_detected_img = Model.draw_bboxes(image_raw, boxes, scores, classes, ALL_CATEGORIES,
                                     Detections)
obj_detected_img = obj_detected_img.convert("RGB")
return np.array(obj_detected_img), Detections

<img src="./image/sigmoid-1.png" width="400" height="300" >


    图-1 sigmoid函数


Refer to the data_processing.py file for the PostprocessYOLO class and its def process(self, outputs, resolution_raw) function. It sequentially calls the _reshape_output, _process_feats, and _process_feats functions, which transform the recognition tensors, including transposition and reshaping. These functions arrange the recognized objects' information according to the bounding boxes, classes, and confidence scores.<br>

A special mention is needed for the calculation of the sigmoid activation function. The sigmoid function is a commonly used activation function in machine learning and deep learning.<br>

$$\\sigmoid(x) = 1/{(1 + e^{-x})}$$ 

Especially in logistic regression and neural networks, it can map any real number to a range between 0 and 1. Therefore, it is often used as the activation function of the output layer to interpret the output as a probability. As mentioned earlier, each tri-colored ball has three classifications, and after processing by the sigmoid function, it becomes the probability of each classification. The function graph is shown in Figure-1.<br>

Finally, the Model's own draw_bboxes function is called to perform the final processing of the recognized objects.<br>
.obj_detected_img = Model.draw_bboxes(image_raw, boxes, scores, classes, ALL_CATEGORIES, Detections)<br>
The function is defined as follows. It uses Python's PIL (Python Imaging Library) image processing function library, which is imported at the beginning of the model.py file with the statement: <br>
from PIL import ImageDraw<br>

It extracts the center and diagonal coordinates of the bounding box for each recognized object and organizes the recognition data of each object into the Detections array. The max and min functions control the bounding box to stay within the image boundary when only part of the object is on the image.

After completing draw_bboxes, it calls obj_detected_img = obj_detected_img.convert("RGB") to convert the detected image to RGB format for use in subsequent programs. This completes the recognition process.

In [None]:
def draw_bboxes(image_raw, bboxes, confidences, categories, all_categories, Detections,
                bbox_color="white"):
        # Draw bounding boxes on the original image and return it.

        # Create drawing context
        draw = ImageDraw.Draw(image_raw)

        # Draw each bounding box
        for box, score, category in zip(bboxes, confidences, categories):
            x_coord, y_coord, width, height = box
            left = max(0, np.floor(x_coord + 0.5).astype(int))
            top = max(0, np.floor(y_coord + 0.5).astype(int))
            right = min(image_raw.width, np.floor(x_coord + width + 0.5).astype(int))
            bottom = min(image_raw.height, np.floor(y_coord + height + 0.5).astype(int))

            # Draw the rectangle and text
            # draw.rectangle(((left, top), (right, bottom)), outline=bbox_color)
            # draw.text((left, top - 12), "{0} {1:.2f}".format(all_categories[category], score),
            # fill=bbox_color)

            # Create and store the raw detection object
            raw_detection = rawDetection(int(left), int(top), [x_coord, y_coord], int(width),
                                         int(height), score,category)

            Detections.append(raw_detection)

        return image_raw


This completes the entire execution process of output, detections = self.processing.detect_objects(color_image), which is the main content of the AI technology introduced in VAIC_23_24.

It should be noted that each recognized object is organized into the Detections array in the following data format: <br>
left position of the bounding box, top position of the bounding box, [center x coordinate, center y coordinate], bounding box width, bounding box height, confidence score, Recognition class.<br>

The next section will introduce how to use the AI-recognized object data combined with the real-time field positioning provided by VEX GPS from the Vex Brain, convert it to world coordinate system coordinates, and feedback to the Brain to control the Vex 5 robot.

###   4.&nbsp; &nbsp;Transmission of Object Recognition Data<br>

Returning to the main program in overrunder.py, we first provide additional explanations for the other functions that were previously set aside in the initialization of class MainApp:

.self.v5 = V5SerialComms()<br>
Establishes the USB channel for transmitting recognized object data from Jetson to Brain and for receiving VexGPS field positioning data from Brain to Jetson.<br>
.self.v5Map = MapPosition()<br>
Provides the Tait-Bryan transformation matrix for converting recognition data from camera coordinates to physical field coordinates, as well as the calculation parameters and methods for the camera installation position.<br>
.self.v5Pos = V5GPS()<br>
Creates a V5GPS instance on the Jetson side and creates a thread for receiving GPS data sent from Brain, which is used to run GPS data reading and processing, including establishing connections, data reading, coordinate conversion, and status processing. It also sets local variables for GPS offsets.<br>
.self.v5Web = V5WebData(self.v5Map, self.v5Pos)<br>
Establishes a web server to transmit updated data in real-time for remote terminals to display the coordinates and categories of recognized objects on the field and the actual game scene in real-time.<br>
.self.stats = Statistics(0, 0, 0, 640, 480, 0, False)<br>
Initializes statistical attributes such as frame rate (FPS), CPU temperature, video dimensions, runtime, and GPS connection status.<br>
self.rendering = Rendering(self.v5Web)<br>
Through a web link, sends real-time images of the field and object recognition data from the Jetson side to be synchronously displayed on a remote terminal. As shown in Figure 2.<br>

In the V5Comm.py file, the related data structures and methods for the above functionalities are defined for reference when implementing these functions. When explaining the following functions, further details will be provided as necessary.<br>

It is necessary to briefly explain the mechanism for receiving GPS data sent from Brain to Jetson and transmitting the aiRecord position data from Jetson to Brain. Both processes use threads.<br>
A thread is a real-time running background program that, once started, runs automatically.<br>

Apart from sharing the power supply and being mounted on the same robot cart, the transmission and reception of GPS data, and the transmission and reception of AI-recognized object position data, are the only links between the Brain and Jetson systems. <br>
The sender of GPS data is Brain, and the receiver is Jetson, while the sender of aiRecord recognized object position data is Jetson, and the receiver is Brain. <br>
In the V5Comm.py and V5Position.py files, <br>
.self.__thread = threading.Thread(target=self.__run, args=()) <br>
is called to create their respective threads that call the thread target function __run(), each with different functionalities.

Ignoring the explanations of other auxiliary functions within the threads, the __run() function of V5GPS contains data = self.__ser.read_until(b'\xCC\x33'), which indicates receiving data until the termination symbol b'\xCC\x33' appears.<br>

To briefly explain the data transmission process in the __run() function of V5SerialComms, the program segment for data transmission is as follows:

First, the thread reads whether there is a data transmission request from the Brain side.<br>

.data = self.__ser.readline().decode("utf-8").rstrip()<br>
Reads the command sent from the Brain side from the port. When the command "AA55CC3301" appears, it indicates that the Brain side requests data transmission.
Ignoring the requests for thread time slices, unlocking, and other routine operations,
.self.__ser.write(data)<br>
Indicates sending all the data to Brain.
The technical details of when and how the Brain side sends the data request command will be explained during the discussion of the Brain-side C++ program.<br>

In [None]:
    while self.__started:  # Continue reading while thread is started
        # Read data from the serial port
        data = self.__ser.readline().decode("utf-8").rstrip()
        if(data == "AA55CC3301"):
            self.__detectionLock.acquire()
            myPacket = V5SerialPacket(self.__MAP_PACKET_TYPE, self.__detections)
            self.__detectionLock.release()
            data = myPacket.to_Serial()
            self.__ser.write(data)  # Write serialized data to the serial port


The sending of GPS data is completed by the C++ program running on the Brain, while the thread on the Jetson side receives it. Similarly, ignoring the explanations of other auxiliary functions within the thread, the receiving program segment is as follows:<br>
.data = self.__ser.read_until(b'\xCC\x33')<br>
The thread program reads the port until the start symbol '\xCC\x33' appears.<br>
.x, y, z, az, el, rot = struct.unpack(b'hhhhhh', data[2:14])<br>
Unpacks the position and angle data from the received data packet. <br>  

In [None]:
    data = self.__ser.read_until(b'\xCC\x33')
    if(len(data) == 16):
        self.__frameCount = self.__frameCount + 1
        status = data[1]
        x, y, z, az, el, rot = struct.unpack('<hhhhhh', data[2:14])


Now, discussing the subsequent processing of the recognized object data, the main loop program app.run() executes the following after completing object recognition with processing.detect_objects(color_image).

In [None]:
aiRecord = self.processing.compute_detections(self, detections, depth_image)


As noted in the comments of the def compute_detections(self, v5, detections, depth_image) function in overunder.py, an AIRecord is created, and the detection results are computed using depth and image data.

The aiRecord is a data structure that contains control information sent from Jetson to Brain. aiRecord.detections includes the ClassID (category ID), Probability, and depth information for each detected target within the camera's field of view. Additionally, it contains the camera image detection and the target's location information on the field. The program list is as follows:

In [None]:
def compute_detections(self, v5, detections, depth_image):
        # Create AIRecord and compute detections with depth and image data.
        # Each AIRecord contains the ClassID, Probablity, and depth information for each detection
        # In addition to the detection's camera image and map position information.
        aiRecord = V5Comm.AIRecord(v5.get_v5Pos(), [])
        for detection in detections:
            depth = self.get_depth(detection, depth_image)
            imageDet = V5Comm.ImageDetection(
                int(detection.x),
                int(detection.y),
                int(detection.Width),
                int(detection.Height),
            )
            mapPos = v5.v5Map.computeMapLocation(detection, depth, aiRecord.position)
            mapDet = V5Comm.MapDetection(mapPos[0], mapPos[1], mapPos[2])
            detect = V5Comm.Detection(
                int(detection.ClassID),
                float(detection.Prob),
                float(depth),
                imageDet,
                mapDet,
            )
            aiRecord.detections.append(detect)
        return aiRecord


The aiRecord is created with the following line in overunder.py:<br>
aiRecord = V5Comm.AIRecord(v5.get_v5Pos(), [])<br>
Refer to the definition and initialization of AIRecord in V5Comm.py.

    

In [None]:
def __init__(self, position: Position, detections: "list[Detection]")


The first parameter is the Position obtained by v5.get_v5Pos(), and the second parameter is a list[Detection]. When creating the aiRecord instance, two variables are introduced: the position data from VexGPS and the object recognition data from YOLOv3.

As explained in the previous section, the detections array contains multiple objects (tri-colored balls) recognized within the camera's field of view. The following loop calculates the distance for each recognized object.

The data obtained from VEXGPS includes:

1.frameCount: Data frame count.<br>
2.status: Communication status; a normal status indicates reliable data.<br>
3.x: X-coordinate of the robot's rotation center on the field.<br>
4.y: Y-coordinate of the robot's rotation center on the field.<br>
5.z: Z-coordinate of the robot's rotation center on the field.<br>
6.azimuth: Azimuth angle, which is the angle measured clockwise along the horizontal plane from a reference direction (usually north or south) to a point. In navigation, the azimuth is used to determine the direction of a point relative to the observer.<br>
7.elevation: Elevation angle, which is the angle from the horizon up to the target point. Combined with the azimuth, it can precisely locate celestial bodies or other objects. In aviation and satellite communication, the elevation angle is used to describe the inclination of an aircraft or satellite relative to the horizon.<br>
8.rotation: Rotation angle, which is the rotation about the observation axis, indicating the rotation of the axis pointing from the ground plane to the zenith around itself. In photogrammetry, machine vision, and robotics, it is used to correct and adjust the orientation of images and sensors.<br>
The use of these data will be detailed during the coordinate transformation calculations.

        

In [None]:
self.frameCount = frameCount
self.status = status
self.x = x
self.y = y
self.z = z
self.azimuth = azimuth
self.elevation = elevation
self.rotation = rotation


In the frame alignment function, the depth (distance from the camera's imaging screen) for each point in the color scene image is already stored in depth_image.

The get_depth(detection, depth_image) function extracts the depth data from the depth_image for the center 10% area of the detection's bounding box and calculates the arithmetic mean to use as the distance data for that object (refer to the def get_depth(self, detection, depth_img) function in overunder.py).

imageDet represents the image area of the detected object, with the center coordinates (x, y) and the bounding box (Width, Height), which is part of the data transmitted to Brain.

In [None]:
mapPos = v5.v5Map.computeMapLocation(detection, depth, aiRecord.position)


Calculate the data of each detected object's world coordinates on the field, which will be transmitted to Brain. Please refer to the def computeMapLocation(self, detection, depth, position) function in V5MapPosition.py.

By multiplying the relative position of the detected object on the screen with the transformation matrix of the Brian GPS field position, the result of the matrix multiplication gives the physical position of the detected object in the 3D space of the field relative to the robot.

Here is a brief explanation of this function:

The computeMapLocation() function starts with setting some constants. The length dimensions and position data for the camera and GPS on the Vex robot are read from camera_offsets.json and gps_offsets.json. Please refer to the__init__(self, v5Map, v5Pos, port=None) function definition in class V5WebData in V5Web.py and the main program self.v5Web = V5WebData(self.v5Map, self.v5Pos). <br>
These data are determined by the installation positions of the camera and GPS devices on the robot cart and are pre-measured by the designer and recorded in the above files.<br>

Next, a rotation transformation matrix is created. The rotation calculation method composed of the azimuth (Azimuth), elevation (Elevation), and twist (Twist) angles is called the Tait-Bryan rotation composite matrix.<br>


The Tait-Bryan rotation composite matrix (also known as the Euler rotation matrix) has applications in multiple fields, especially when describing the rotation of objects or coordinate systems in three-dimensional space. Here are some specific application scenarios:

Aerospace and Aeronautical Engineering: In aerospace and aeronautical fields, the attitude of an aircraft is typically described using Tait-Bryan angles (yaw, pitch, and roll). These three angles correspond to the rotation of the aircraft around its vertical, lateral, and longitudinal axes, respectively. Using the Tait-Bryan rotation composite matrix, one can conveniently calculate the direction of the aircraft in different attitudes and perform attitude control and navigation.

Robotics: In robotics, the end-effector (such as an arm or gripper) needs to be precisely positioned and rotated in three-dimensional space. The Tait-Bryan rotation composite matrix can calculate the attitude of the end-effector relative to the robot base, enabling precise pose control.

Computer Graphics: In computer graphics, the rotation and transformation of objects in a three-dimensional scene is a critical topic. The Tait-Bryan rotation composite matrix can be used to calculate the rotation matrix of objects in three-dimensional space, thereby achieving object rotation and transformation. This has widespread applications in 3D games, virtual reality, and animation.

Physical Simulation: In physical simulations, such as molecular dynamics simulations and rigid body dynamics simulations, it is necessary to simulate the movement and rotation of objects in three-dimensional space. The Tait-Bryan rotation composite matrix can calculate the attitude of objects at different times, thereby simulating the movement and rotation process of objects.

In summary, the Tait-Bryan rotation composite matrix has extensive applications in describing the rotation of objects or coordinate systems in three-dimensional space. It provides a convenient and efficient way to calculate rotation matrices, enabling precise positioning and rotation control of objects in three-dimensional space.



$
R_{az,el,tw}= <br><br><br>
$
<br><br>
$
\begin{bmatrix}
\cos(az)\cos(tw) + \sin(az)\sin(el)\sin(tw) & \sin(az)\cos(el) & \cos(az)\sin(tw) - \sin(az)\sin(el)\cos(tw) \\
-\sin(az)\cos(tw) + \cos(az)\sin(el)\sin(tw) &
\cos(az)\cos(el) &
 -\sin(az)\sin(tw) - \cos(az)\sin(el)\cos(tw) \\
-\cos(el)\sin(tw) &
\sin(el) &
\cos(el)\cos(tw)
\end{bmatrix} 
$

Calculating the position vector of the target object in the camera coordinate system.

In [None]:
vector = np.zeros(shape=(3, 1))
vector[0] = depth * (detection.Center[0] - MAXSCREENX) / REALDIST
vector[1] = depth
vector[2] = depth * (MAXSCREENY - detection.Center[1]) / REALDIST

The product of the distance vector and the transformation matrix transforms the vector into world space. By multiplying the relative position of the object on the screen with the robot's perspective information, the result of the matrix multiplication gives the physical position of the recognized object in the 3D space relative to the robot.<br>

In [None]:
mapLocation = np.matmul(rot, vector)

Overlay the current robot position coordinates on the field and convert them to the world coordinate system. The position coordinates are relative to the camera's center point.<br>

In [None]:
mapLocation[0] += position.x
mapLocation[1] += position.y
mapLocation[2] += position.z

Transform the position coordinates to the robot's pivot center using the camera installation position's x, y, z distances to the robot's pivot center ([CAMERAOFFSETX], [CAMERAOFFSETY], [CAMERAOFFSETZ]).

In [None]:
 cameraOffset = np.array([[CAMERAOFFSETX], [CAMERAOFFSETY], [CAMERAOFFSETZ]])
rotatedCameraOffset = np.matmul(rot, cameraOffset)

        # Add the adjusted camera offset
mapLocation[0] += rotatedCameraOffset[0]
mapLocation[1] += rotatedCameraOffset[1]
mapLocation[2] -= rotatedCameraOffset[2]  # Subtract Z offse

After the calculation, return the position of the robot car's pivot center in the field coordinate system. Finally, return mapPos=(mapLocation[0], mapLocation[1], mapLocation[2]).<br>

In [None]:
 return mapLocation

Return to the main program's compute_detections loop:<br>
Combine the classification, confidence, distance depth, recognized object's image center coordinates, and bounding box width and height (camera field coordinates), and the recognized object's world (competition field) coordinates into a single detect.<br>

In [None]:
mapDet = V5Comm.MapDetection(mapPos[0], mapPos[1], mapPos[2])
detect = V5Comm.Detection(
    int(detection.ClassID),
    float(detection.Prob),
    float(depth),
    imageDet,
    mapDet,
                           )
    aiRecord.detections.append(detect)
    return aiRecord



Back to the main loop of the compute_detections function:
.mapDet = V5Comm.MapDetection(mapPos[0], mapPos[1], mapPos[2])  # Array mapDet contains the object's location coordinates (X, Y, Z) on the field.

.detect = V5Comm.Detection(<br>
&nbsp; &nbsp;int(detection.ClassID),<br>
&nbsp; &nbsp;float(detection.Prob),<br>
&nbsp; &nbsp;float(depth),<br>
&nbsp; &nbsp;imageDet,<br>
&nbsp; &nbsp;mapDet,<br>
           )<br>
Combine the classification, confidence, distance depth, recognized object's image center coordinates and bounding box width and height (camera field coordinates), and the recognized object's world (competition field) coordinates into a single detect.

.aiRecord.detections.append(detect)<br>
Append the detect to aiRecord.detections.<br>
After the loop completes, the classification recognition and location coordinates of the objects recognized in the current camera image field are stored in aiRecord.detections, ready to be sent to Brian via USB connection.


In the main program's app.run(), the method self.set_v5(aiRecord) is responsible for sending the data. Since this involves the reception program on Brian's end written in C++, some preliminary explanation is required.

In [None]:
   def set_v5(self, aiRecord):
        # Set detection data to the Brain if it is connected but does not set any data if None
        if self.v5 is not None:
            self.v5.setDetectionData(aiRecord)


The set_v5(self, aiRecord) method calls the setDetectionData function in V5Comm.py. As previously mentioned, a thread for sending data to Brian is established through an instance of V5SerialComms.

The method self.v5.setDetectionData(self, data: AIRecord) is a function in V5SerialComms that can directly operate on its own thread, with the parameter data being AIRecord.

The thread calls the acquire method to attempt to obtain a lock. If the lock is not currently held by another thread (i.e., the lock is free), the thread acquires the lock and safely sends the data.

If the lock is already held by another thread, the acquire method will keep the calling thread in a loop until the lock is released, and then it sends the data. Once the data transmission is complete, the lock is immediately released.

In [None]:
    def setDetectionData(self, data: AIRecord):
        # Aquire lock and set detection data
        self.__detectionLock.acquire()
        self.__detections = data
        self.__detectionLock.release()


###    5.&nbsp; &nbsp;Jetson Remote Display and V5Web.py Program
To allow participants to dynamically monitor the competition field during the match, VAIC_23_24 has set up a real-time remote display module. The overrunder.py class Rendering continuously provides real-time data for remote display in the main program competition loop, with network transmission functionality defined in the V5Web.py file. VAIC_23_24 uses the WebSocket method for remote communication (refer to the import statement from websocket_server import WebsocketServer at the beginning of the file).

WebSocket communication is a standard protocol for computers and the internet, and the program is designed according to the WebSocket standard.<br>
 Here is a brief introduction to the basic methods for implementing network transmission:

.self.__server.run_forever(True) starts the WebSocket server's event loop, allowing it to begin listening for connections and processing incoming messages.<br>
In the WebSocket operating mechanism, a common pattern is for the server to set up callback functions to handle messages received from clients. These callback functions process the logic based on the commands or messages sent by the client and may send the requested data back to the client as a response.<br>
__new_client: Called when a new client connects.<br>
__client_left: Called when a client disconnects.<br>
__message_received: Called when the server receives a message from a client.<br>
During the initialization of the class MainApp in the main program, an instance of V5WebData is created with the parameters v5Map and v5Pos to set the CameraOffset for v5Map and the GPSOffset for v5Pos.<br>

In [None]:
self.v5Web = V5WebData(self.v5Map, self.v5Pos)


Then, create a WebSocketServer server and define three callback functions:

In [None]:
        self.__server = WebsocketServer(host = '0.0.0.0', port = self.__serverPort, loglevel = logging.INFO)
        self.__server.set_fn_new_client(self.__new_client)
        self.__server.set_fn_client_left(self.__client_left)
        self.__server.set_fn_message_received(self.__message_received)


self.__server.set_fn_new_client(self.__new_client): Creates a remote display client, sends data requests to the server, receives data from the server, and displays real-time dynamic images of the competition field.<br>
self.__server.set_fn_client_left(self.__client_left): Monitors the status of the transmission channel and issues an alert when disconnected.<br>
__message_received(self.__message_received): Upon receiving commands from the client, it calls the corresponding functions to send the display content to the remote server according to the command.<br>
Below is a snippet of the message_received part of the program for a simple explanation:<br>
Commands such as g_pos, g_detect, g_stats, g_depth, g_color, get_camera_offset, get_gps_offset, etc., are data request commands sent by the client to the server (VAIC_23_24 program does not provide any explanation for this). As shown below, the corresponding functions are called to return the respective types of data.<br>

In [None]:

def __message_received(self, client, server, message):
    .......
    cmds = message.split(",")
    for cmd in cmds:
        if(cmd == "g_pos"):
            outData['Position'] = self.__getPositionElement()
        elif(cmd == "g_detect"):
            outData['Detections'] = self.__getDetectionElement()
        elif(cmd == "g_stats"):
            outData['Stats'] = self.__getStatsElement()
        elif(cmd == "g_depth"):
            outData['Depth'] = self.__getDepthElement()
        elif(cmd == "g_color"):
            outData['Color'] = self.__getColorElement()
        elif(cmd == "get_camera_offset"):
            outData['CameraOffset'] = self.__getCameraOffset().__dict__
        elif(cmd == "get_gps_offset"):
            outData['GpsOffset'] = self.__getGpsOffset().__dict__
    outData = self.convert_numpy_to_list(outData)
    server.send_message(client, json.dumps(outData))



From the program's operation process, it can be seen that the Jetson end recognizes the coordinate positions and distances of objects on the field through a neural network. The data transmission thread sends this data to Brian in real time. Real-time data needs to be placed in a shared space. To prevent conflicts between simultaneous writing and reading of data, the main program starts by creating interlocked variables with self.v5Web.start():

self.__detections: Position and distance data of the recognized objects.<br>
self.__colorImage: Color image.<br>
self.__depthImage: Depth image.<br>
self.__stats: Statistical data.<br>
self.__dataLock: Interlock variable.<br>

In [None]:
    self.__detections = AIRecord(Position(0, 0, 0, 0, 0, 0, 0, 0) [])
    self.__colorImage = None
    self.__depthImage = None
    self.__stats = Statistics(0, 0, 0, 0, 0, 0, False)
    self.__dataLock = Lock()
    self.__server.run_forever(True)

In the main program's run method, these variables are refreshed in each loop iteration.

In [None]:
    self.set_v5(aiRecord)
    self.rendering.set_images(output, depth_map)
    self.rendering.set_detection_data(aiRecord)
    self.rendering.set_stats(self.stats, self.v5Pos, start_time, invoke_time, run_time)

For example, in the run() method, the current color and depth images of the field of view are updated:

In [None]:
self.rendering.set_images(output, depth_map)


Using the methods from the Rendering class:

In [None]:
def set_images(self, output, depth_image):
    # Update web data with color and depth images
    self.web_data.setColorImage(output)
    self.web_data.setDepthImage(depth_image)

Calling the functions from V5Web, note that the defined interlock __dataLock is used:

In [None]:
def setColorImage(self, image):
    # Updates the color image data
    self.__dataLock.acquire()
    self.__colorImage = image
    self.__dataLock.release()

def setDepthImage(self, image):
    # Updates the depth image data
    self.__dataLock.acquire()
    self.__depthImage = image
    self.__dataLock.release()

This is because __message_received() also retrieves data from these spaces when sending image data.

In [None]:
    elif(cmd == "g_depth"):
        outData['Depth'] = self.__getDepthElement()
    elif(cmd == "g_color"):
        outData['Color'] = self.__getColorElement()

To avoid conflicts between simultaneous data refresh and retrieval, both data storage and retrieval use an interlock mechanism.

In [None]:
def __getColorElement(self):
        # Returns the color image data encoded in base64
        outData = {}
        imageData = {}

        self.__dataLock.acquire()
        pixelData = self.__colorImage
        self.__dataLock.release()

        if(len(pixelData) > 0):
            imageData['Valid'] = True
            imageData['Width'] = pixelData.shape[1]
            imageData['Height'] = pixelData.shape[0]
            imageData['Data'] = base64.b64encode(np.array(cv2.imencode(".jpeg", pixelData)[1]).tobytes()).decode('utf-8')
        else:
            imageData['Valid'] = False
            imageData['Error'] = "Image Unavailable"

        outData['Image'] = imageData

        return outData

The V5Web.py file also defines the statistical data that needs to be monitored on-site, such as CPU temperature, video resolution, working time, GPS connection status, etc. This is shown in the class Statistics:

In [None]:
def __init__(self, fps: float, invokeTime: float, cpuTemp: float, videoWidth: int,
                 videoHeight: int, runTime: int, gpsConnected: bool):
    # Initialize statistics attributes such as FPS, CPU temperature, video dimensions, runtime, and GPS connection status
    self.fps = fps
    self.invokeTime = invokeTime
    self.cpuTemp = cpuTemp
    self.videoWidth = videoWidth
    self.videoHeight = videoHeight
    self.runTime = runTime
    self.gpsConnected = gpsConnected

When the client makes a data request, the server sends this statistical data to the client.<br>

In [None]:
elif(cmd == "g_stats"):
    outData['Stats'] = self.__getStatsElement()

When preparing the data transmission packet, the interlock mechanism is also used:<br>

In [None]:
    def __getStatsElement(self):
        # Returns the current statistics elements
        outData = {}

        self.__dataLock.acquire()
        nowStats = self.__stats
        self.__dataLock.release()

        outData['FPS'] = nowStats.fps
        outData['InferTime'] = nowStats.invokeTime
        outData['VideoWidth'] = nowStats.videoWidth
        outData['VideoHeight'] = nowStats.videoHeight
        outData['RunTime'] = nowStats.runTime
        outData['GPSConnected'] = nowStats.gpsConnected
        outData['CPUTempurature'] = nowStats.cpuTemp

        return outData

The VAIC_23_24 software project defines various commands for client data requests but does not provide any technical documentation on the remote client operating mechanism. Participants need to program the client-side application themselves.

The remote display screen of the client is shown in Figure-2.

<img src="./image/实况.png" >


Figure-2: Real-time field view and object recognition position and distance data displayed on the remote terminal.

###    6.&nbsp; &nbsp;The Jetson receives GPS data from the Brian side with the following program:

The class V5GPS in V5Position.py is the functional module for Jetson to receive Vex GPS data sent from Brian.

After creating an instance of V5GPS, execute the start(self) function to start the receiving thread:

In [None]:
    def start(self):
        # Starts the GPS thread
        self.__started = True
        self.__thread = threading.Thread(target=self.__run, args=())
        self.__thread.daemon = True
        self.__thread.start()


The target function def __run(self) of the thread is used for reading and processing GPS data, including establishing a connection, reading data, coordinate transformation, and status processing. It also sets local variables for GPS offsets. Here, we will only explain the data reception part. The GPS data format transmitted from Brian is as follows:<br>

frameCount = frameCount<br>
status = status<br>
x = x<br>
y = y<br>
z = z<br>
azimuth = azimuth<br>
elevation = elevation<br>
rotation = rotation<br>
From the following fragment of the thread's target program, it can be seen that the data length is 16 bytes, and the data header is "xCC, x33". In the C++ program on Brian for sending Vex GPS data, which we will introduce below, we will find the same data header and the corresponding data structure mentioned above.

After reading the header, the thread increments the received frame count by one, then reads two bytes for the status and the corresponding three two-byte position coordinates and three two-byte angular values.<br>

In [None]:
    while self.__started:
        # Read data from the serial port
        data = self.__ser.read_until(b'\xCC\x33')
        if(len(data) == 16):
            self.__frameCount = self.__frameCount + 1
            status = data[1]
            x, y, z, az, el, rot = struct.unpack('<hhhhhh', data[2:14])


After dimensional transformation of the coordinates and conversion of the angles to radians, the azimuth angle is used to calculate the new offset. Then, the received data is stored in the shared space self.__position.

In [None]:
    if(status == 20):
        x, y = self.__filter.update(x, y)
        self.__positionLock.acquire()
        self.__position.x = x
        self.__position.y = y
        self.__position.z = z
        self.__position.azimuth = az
        self.__position.elevation = el
        self.__position.rotation = rot
        self.__position.status = localStatus
        self.__position.frameCount = self.__frameCount
        self.__positionLock.release()


What needs to be mentioned here is that the program performs smoothing on the input x and y coordinates:<br>
x, y = self.__filter.update(x, y)<br>
V5Position.py imports the definition of class LiveFilter from filter.py at the beginning of the file:<br>
from filter import LiveFilter<br>
During the initialization of the class MainApp in the main program, an instance self.v5Pos = V5GPS() is created.

In the__init__(self, port=None) method of V5GPS, the filter is created with a window size of 10:<br>

self.__filter = LiveFilter(10)<br>
from collections import deque<br>
self.x_buffer = deque(maxlen=window_size)<br>
self.y_buffer = deque(maxlen=window_size)<br>
In Python, deque is a double-ended queue class from the collections module.<br> 
It supports fast additions and deletions from both ends. When the number of elements in the deque reaches the window_size, adding a new element will automatically remove the oldest element in the queue.<br>

This way, the x_buffer and y_buffer hold the most recent window_size coordinate values. The update(self, x, y) method calculates their average and then sends it to the shared space for the x and y coordinates.<br>

In other words, the so-called current plane position coordinates are the average of the last ten GPS measurement coordinates. This smoothing of data effectively avoids GPS measurement errors and eliminates drastic data changes, allowing the robot controlled by this data to move smoothly and fluidly.<br>

In [None]:
from collections import deque
import numpy as np

class LiveFilter:
    def __init__(self, window_size=5, output_file='filtered_data_simple.txt'):
        self.window_size = window_size
        self.x_buffer = deque(maxlen=window_size)
        self.y_buffer = deque(maxlen=window_size)
        self.output_file = output_file

    def update(self, x, y):
        # Update x and y buffers
        self.x_buffer.append(x)
        self.y_buffer.append(y)

        # Compute the average of the current buffer
        filtered_x = np.mean(self.x_buffer)
        filtered_y = np.mean(self.y_buffer)

        # Write the filtered data to an output file
        with open(self.output_file, 'a') as f:
            f.write(f"{filtered_x}, {filtered_y}\n")

        return filtered_x, filtered_y

self.__position is the shared space where the receiving thread writes data and the main program reads it using get_v5Pos(). To avoid data read/write conflicts caused by simultaneous writing and reading, an interlock mechanism is used when the thread writes data.

In [None]:
    self.__positionLock.acquire()
    self.__position.x = x
    self.__position.y = y
    self.__position.z = z
    self.__position.azimuth = az
    self.__position.elevation = el
    self.__position.rotation = rot
    self.__position.status = localStatus
    self.__position.frameCount = self.__frameCount
    self.__positionLock.release()

In [None]:
    def getPosition(self):
        # Retrieves the current position
        self.__positionLock.acquire()
        nowPosition = self.__position
        self.__positionLock.release()

###    7.&nbsp; &nbsp;Detailed Explanation of the Brian-side C++ Control Program

The Brian-side C++ control program for VAIC_23_24 consists of several key components. This explanation builds upon the earlier brief introduction and refers to the "Introduction to VAIC Software".<br>

#####   7-1.  Comparison of Jetson and Brian-side Communication Data Formats



To facilitate communication with the Jetson, the Brian-side has added several data structures in ai_jetson.h and a namespace called ai. This namespace defines all the properties and methods needed for data reception, including constants, variables, and functional functions. These will be explained as needed in the following discussion.

Before explaining the Brian-side C++ program, let's first look at the data structure used for communication between both sides. Clearly, to ensure smooth communication, the communication data structures on both sides should be as similar as possible.

On the Jetson side, the GPS data received from Brian is represented as Position. See V5Comm.py's class Position, which has the following attribute format:

In [None]:
        self.frameCount = frameCount
        self.status = status
        self.x = x
        self.y = y
        self.z = z
        self.azimuth = azimuth
        self.elevation = elevation
        self.rotation = rotation

The data format for sending GPS data from Brian is defined in the file ai_jetson.h in the V5Example directory as follows:

In [None]:
typedef struct {
	int32_t		framecnt;
	int32_t		status;
	float	    x, y, z;
	float	    az;
    float       el;
    float       rot;
} POS_RECORD;


Since the Python language does not require explicit data type specifications for variables, the essence of the data structure is essentially the same.


Similarly, the recognition data sent from Jetson to Brian is defined on the Python side as follows:

In [None]:
detect = V5Comm.Detection(
            int(detection.ClassID),
            float(detection.Prob),
            float(depth),
            imageDet,
            mapDet,
  )

The definition of imageDet is:

In [None]:
            imageDet = V5Comm.ImageDetection(
                int(detection.x),
                int(detection.y),
                int(detection.Width),
                int(detection.Height),
            )


Additionally,

In [None]:
mapDet = V5Comm.MapDetection(mapPos[0], mapPos[1], mapPos[2])

On the Brian side, the structures for receiving data are defined as follows:

In [None]:
typedef struct {
  int32_t		      classID;	        // The class ID of the object.
  float           probability;      // The probability that this object is in this class (1.0 == 100%)
  float           depth;            // The depth of this object in meters from the camera
  IMAGE_DETECTION   screenLocation;   // The screen coordinates of this object
  MAP_DETECTION     mapLocation;      // The field coordinates of this object
} DETECTION_OBJECT;

Image Detection and Map Detection Data Structures:

In [None]:
typedef struct {
	int32_t		x, y;
	int32_t		width, height;
} IMAGE_DETECTION;
typedef struct {
  float     x;
  float     y;
  float     z;
} MAP_DETECTION;

Similarly, the data format is identical.


The final format for data exchange between both sides is AI_RECORD.

On the Jetson side, when creating an AIRecord instance, it is defined as a combination of class Position and class Detection. list[Detection] indicates that it is a list of Detection objects.

In Python, lists can be expanded at any time, so there is no need to limit the dimensions of list[Detection].

In [None]:
    def __init__(self, position: Position, detections: "list[Detection]"):
        self.position = position
        self.detections = detections

On the Brian side, AI_RECORD is defined as:

In [None]:
typedef struct {<br>
    int32_t		    detectionCount;
    POS_RECORD	        pos;
    DETECTION_OBJECT	detections[MAX_DETECTIONS];

} AI_RECORD;


Here, DETECTION_OBJECT detections[MAX_DETECTIONS] is defined as an array. Since the size of an array cannot be expanded dynamically in C++, it is limited by MAX_DETECTIONS.

Comparing the two, the Brian-side structure has an additional detectionCount to count the recognized objects. This is because, in Python, you can directly use len(detections) to get the number of recognized objects, so there is no need to set this count. However, in C++, you need to know the number of recognized objects beforehand to establish the extraction loop.

To accommodate this feature of C++, the Jetson side adds this count when packaging the data before sending it.

See the V5Comm.py class AIRecord method:

In [None]:
    def to_Serial(self):
        # Convert AIRecord properties to serialized binary format
        data = struct.pack('<i', len(self.detections))
        data += self.position.to_Serial()
        for det in self.detections:
            data += det.to_Serial()
        return data


.data = struct.pack('<i', len(self.detections)), packs the number of detections in detections into data as an int.

.data += self.position.to_Serial(), packs the position data into data.

for det in self.detections:<br>
&nbsp; &nbsp;data += det.to_Serial()<br>
Iterates through detections and packs all detect objects into data.

#####   7-2.  Brian-side Data Reception Process from Jetson sent<br>


The main program executes ai::jetson, jetson_comms; to create an instance of Jetson, and the instance initializes and starts a communication thread called Brian for receiving the target position information from Jetson. <br>
The thread function is receive_task and has a high priority.<br>

In [None]:
jetson::jetson() {
    state = jetson_state::kStateSyncWait1;
    thread t1 = thread( receive_task, static_cast<void *>(this) );
    t1.setPriority(thread::threadPriorityHigh);
}
jetson::~jetson() {
}


When Brian needs new position data, it calls the jetson_comms.request_map() function. Brian sends a request to Jetson for new position data with the request code "AA55CC3301". <br>
As mentioned earlier, when Jetson receives the request, it can then send the latest position data. The Brian-side thread receive_task remains in a running state, waiting to receive the transmitted data packets.

In [None]:
int
jetson::receive_task( void *arg ) {
    if( arg == NULL)
      return(0);
    jetson *instance = static_cast<jetson *>(arg);
    while(1) {
      int rxchar = getchar();
      if( rxchar >= 0 ) {
        instance->total_data_received++;
        while( instance->parse( (uint8_t)rxchar ) )
          this_thread::yield();
      }
    }
}

In the loop while( instance->parse( (uint8_t)rxchar ) ), the program reads the incoming information transmitted by Jetson.


The parse() function reads the position data sent from Jetson. Referring to jetson::parse(uint8_t data) in ai_jetson.cpp, the function initially keeps reading and checking for the sequence "0xAA, 0x55, 0xCC, 0x33", which serves as a header or start of packet identifier.

It also performs a CRC32 (Cyclic Redundancy Check 32) algorithm to check the integrity of the data. Once the CRC check is successful, it confirms that the transmitted data is correct and proceeds to the kStateGoodPacket state.

Here, we have extracted only the segment of the parse() function within the receive_task thread that receives AI_RECORD data. Apparently:

.memcpy(&newMap, &payload.bytes[0], MAP_POS_SIZE); extracts the Position data from the start of the received data packet.<br>
.memcpy(&newMap.detections, &payload.bytes[MAP_POS_SIZE], sizeof(DETECTION_OBJECT) * newMap.detectionCount) extracts the individual detection data from the position after the Position data in the received packet.<br>
.if(newMap.detectionCount > MAX_DETECTIONS) newMap.detectionCount = MAX_DETECTIONS; If the amount of incoming data exceeds the reserved space for receiving data on the Cpp side, the subsequent data will be discarded.<br>
.memcpy(&last_map, &newMap, sizeof(AI_RECORD)); copies the contents of the newMap variable within the thread to the global variable last_map in the main program.<br>

In [None]:
     kStateGoodPacketcase jetson_state:::
        if( payload_type == MAP_PACKET_TYPE ) {
          AI_RECORD newMap;
          // Parse the payload packet into a AI_RECORD
          memset(&newMap, 0, sizeof(newMap));
          memcpy(&newMap, &payload.bytes[0], MAP_POS_SIZE);
          if(newMap.detectionCount > MAX_DETECTIONS)
            newMap.detectionCount = MAX_DETECTIONS;
          memcpy(&newMap.detections, &payload.bytes[MAP_POS_SIZE],
                       sizeof(DETECTION_OBJECT) * newMap.detectionCount);
          // lock access to last_map and copy data
          maplock.lock();
          memcpy( &last_map, &newMap, sizeof(AI_RECORD));
          maplock.unlock();
        }
        // timestamp this packet
        last_packet_time = timer.system();
        packets++;
        state = jetson_state::kStateSyncWait1;
        break;


The Brian side of the VAIC_23_24 project provides a programming framework for the VEX AI competition, where the specific competition strategy and action planning are determined by the participants through their own programming.

Looking at main.cpp, the autonomous program is divided into two parts: the isolated auto_Isolation() and the interactive auto_Interaction().

.Competition.autonomous(autonomousMain); initiates the autonomous program. Since firstAutoFlag is set to true, the program will first execute auto_Isolation.

In [None]:
bool firstAutoFlag = true;
void autonomousMain(void) {
  if(firstAutoFlag)
    auto_Isolation();
  else
    auto_Interaction();
  firstAutoFlag = false;
}


auto_Isolation(void) demonstrates a simple competition action sequence:

.GPS.calibrate(); initiates the calibration of the GPS sensor.<br>
.getObject(), which performs a series of function calls getObject()->findTarget()->get_data(), retrieves the position data received by the receive_task thread and copied into last_map.<br>
The get_data(AI_RECORD *map) function is as follows:
(Note: The actual implementation of get_data(AI_RECORD *map) is not provided in the snippet, but based on the context, it likely extracts data from the AI_RECORD structure pointed to by map.)

In [None]:
int32_t
jetson::get_data( AI_RECORD *map ) {
    int32_t length = 0;
    if( map != NULL ) {
        maplock.lock();
        memcpy( map, &last_map, sizeof(AI_RECORD));
        length = last_payload_length;
        maplock.unlock();
    }


After that, there are simple actions such as opening the arm, picking up the ball, and pushing it into the goal, as indicated by the comments. <br>
Once auto_Isolation(void) completes, it proceeds to auto_Interaction(). auto_Interaction() is an empty function intended to be designed and programmed by the competitors.<br>

In [None]:
void auto_Interaction(void) {
  // Functions needed: evaluate which ball detected is target, go to target (x,y), intake ball, dump ball,
}


In the main() function of the Brian side, the loop represents the actual competition process, where competitors design their competition strategies and implement them through programming. This simple demonstration provides three essential functions required during the competition:

1.Obtain the target's position data from the Jetson side, and based on the competition strategy, select a target, move towards it, and complete the competition actions for the current phase. jetson_comms.get_data( &local_map );<br>
2.If necessary, send essential information to other friendly robots. In this case, the chosen information to send is the local robot's position coordinates and heading angle. However, the format can be adjusted to send other information that facilitates implementing the competition strategy. link.set_remote_location( local_map.pos.x, local_map.pos.y, local_map.pos.az, local_map.pos.status );<br>
3.Send a request to the Jetson side for the location coordinates of a new target, to continue with the next phase of the competition. jetson_comms.request_map();<br>

In [None]:
  while(1) {

      jetson_comms.get_data( &local_map );
      link.set_remote_location( local_map.pos.x, local_map.pos.y, local_map.pos.az, local_map.pos.status );
      jetson_comms.request_map();
      this_thread::sleep_for(loop_time);
  }


Here is a brief comparison and explanation of how the request_map() function interacts with the Jetson's running thread in terms of data requirements. The C++ source code for request_map() is as follows:

Brian communicates through writing to a file, while the Jetson's communication port is "/dev/serial1", and the data request command is "AA55CC3301".

On the Brian side, to send the request command, it uses fwrite(msg, 1, strlen(msg), fp);.

In [None]:
void
jetson::request_map() {
    if( state != jetson_state::kStateSyncWait1 && timer.time() > 250 ) {
      state = jetson_state::kStateSyncWait1;
    }
    if( state == jetson_state::kStateSyncWait1 ) {
      FILE *fp = fopen("/dev/serial1", "w");
      static char msg[] = "AA55CC3301\r\n";
      fwrite( msg, 1, strlen(msg), fp );
      fclose(fp);
    }
}

When the Jetson thread receives a data request, it sends the location data.

In [None]:
while self.__started:  # Continue reading while thread is started
  # Read data from the serial port
  data = self.__ser.readline().decode("utf-8").rstrip()
    if(data == "AA55CC3301"):
      self.__detectionLock.acquire()
      myPacket = V5SerialPacket(self.__MAP_PACKET_TYPE, self.__detections)
      self.__detectionLock.release()
      data = myPacket.to_Serial()
      self.__ser.write(data)  # Write serialized data to the serial port



###   8.&nbsp; &nbsp;Explanation of the ai_robot_link.cpp Program for the Brian Side of the Vex5 Robot<br>


In the C++ program on the Brian side, VAIC_23_24 introduces a real-time information exchange mechanism called ai_link between two competing robots. The communication between the two robots differs from the previously mentioned Jetson-to-Brian communication as the dual-robot system uses a wireless mode, while the Jetson-to-Brian communication utilizes a wired USB interface.

To enable this wireless communication, the robot_link class has been defined in the ai namespace, inheriting from vex::serial_link. The vex::serial_link class encapsulates the wireless communication functionality provided by Vex. More details about this class can be found in the ai_robot_link.h header file.

The VAIC_23_24 competition requires participants to enter the auto_Interaction mode, but does not provide any reference examples for dual-robot interaction, allowing competitors to unleash their creativity in designing the competition setup. However, it does provide sufficient basic functional programming examples for programmers to refer to.

Here, as a prerequisite for introducing the main operational mechanism of ai_robot_link, a brief explanation of some defined variables and function functionalities is provided.

#### 1. Data Structure

The data packet header, as indicated in the comments, contains: synchronization code, packet length, packet type, and cyclic redundancy check (CRC).

In [None]:
typedef struct __attribute__((__packed__)) _packet_header {
    uint8_t    sync[2];    // synchronizing bytes, find the start of a valid packet
    uint8_t    length;     // length of map record payload, does not include header
    uint8_t    type;       // type of packet,
    uint16_t   crc;        // crc32 of payload, this may need to be removed to allow more payload
} packet_header;

Synchronization code: A two-byte constant used as an identifier to recognize the starting position of the transmitted information packet.<br>

In [None]:
 enum class sync_byte {
    kSync1 = 0xA5,
    kSync2 = 0x5A
 };

Payload: The valid data carried in the data packet. In this case, it is simply set to the robot's position (X, Y) and its orientation (angle).

In [None]:
typedef struct __attribute__((__packed__)) _packet_1_payload {
    float      loc_x;
    float      loc_y;
    float      heading;
} packet_1_payload;

The structure of the data packet used during communication transmission is packet_1_t = packet_header + packet_1_payload.

In [None]:
typedef struct __attribute__((__packed__)) _packet_1_t {
    packet_header     header;
    packet_1_payload  payload;
} packet_1_t;

#### 2.Methods


Threading: In ai_robot_link, two communication threads are defined, one for data transmission and the other for data reception.<br>
.rx_task() is the target function of the receiving thread, responsible for receiving the current position information sent by the friendly robot.
.tx_task() is the target function of the sending thread, responsible for sending the current position information of this robot to the friendly robot.

Detailed functionality will be explained in further analysis during program runtime.

In [None]:

static int    rx_task( void *arg );
static int    tx_task( void *arg );


Transmission Functions: Several demonstration functions are provided in ai_robot_link, which simply transmit or receive undefined (x, y, heading) values to show the method of data transmission in a dual-robot interaction state. These functions are intended to be modified based on the actual interaction data types during competition planning and design. A common feature of these three functions is that they access and modify the data structures used by the threads:
packet_1_t packet_tx_1;  
packet_1_t packet_rx_1;
To ensure thread safety and avoid data races or other concurrency issues, the use of mutexes (short for "mutual exclusion") is implemented:

vex::mutex rxlock;  
vex::mutex txlock;
A mutex is a commonly used synchronization method in programming to protect access to shared resources in a multithreaded or multiprocess environment. When a thread or process acquires a mutex, other threads or processes attempting to acquire the same mutex will be blocked until the mutex is released.

In [None]:
void set_remote_location( float x, float y, float heading, int32_t status );
void get_local_location( float &x, float &y, float &heading, int32_t &status );
void get_remote_location( float &x, float &y, float &heading );


The information parsing function: Parses the transmitted position data from the received data packet.

The .robot_link::process(uint8_t data) function, within the rx_task thread, sequentially reads the header, type, and checksum; performs a checksum calculation, and if it is correct, proceeds to read the transmitted data.

Similar to the parse() function in the thread that reads Jetson information, the program first acquires an exclusive mutex lock, sequentially reads the synchronization code and checksum, and upon successful checksum verification, recognizes the transmitted data as correct, copies the transmitted data, and releases the mutex lock.<br>
The following code snippet demonstrates the process of copying the valid data from the wireless port to the program variable packet_rx_1.payload.

In [None]:
case comms_state::kStateGoodPacket:
    if( payload_type == RL_LOCATION_PACKET ) {
    // lock access and copy data
        rxlock.lock();
        memcpy( &packet_rx_1.payload, &payload.pak_1, sizeof(packet_1_payload));
        rxlock.unlock();
    }

###   9.&nbsp; &nbsp;Vex5 Robot Brian Side ai_functions.cpp Program Description<br>



The ai_functions are the control programs for the robot's movement and gripper actions. Instead of using gamepad controls or preset movement parameters, the implementation of movement relies on real-time calculations based on the target position coordinates transmitted by the Jetson. This is the AI-driven servo-following control module.<br>

Within the ai_functions section, the core function is DETECTION_OBJECT findTarget(int type), where the parameter type represents the classification of the target. The JetsonExample/labels.txt file defines the classification of three-colored balls as follows:<br>

type(GreenTriball) = 0<br>
type(RedTriball) = 1<br>
type(BlueTriball) = 2<br>
The main program, main(), first calls the getObject() function. The getObject() function initially directly uses findTarget(int type). If no target is found, it drives the robot to rotate and change the field of view of the robot's vision camera to re-extract the target data. Once a target is detected, it locates the gripper and drives the robot to the target's grasping position.

In [None]:
void getObject(){
    DETECTION_OBJECT target = findTarget(0);
    // If no target found, turn and try to find again
    if (target.mapLocation.x == 0 && target.mapLocation.y == 0){
        Drivetrain.turnFor(45, rotationUnits::deg, 50, velocityUnits::pct);
        target = findTarget(0);
    }
    double rot = Arm.position(rotationUnits::deg);
    intake(rot-1200, 1205);
    // Move to the detected target's position
    moveToPosition(target.mapLocation.x*100, target.mapLocation.y*100);
}


The findTarget(int type) function block is outlined as follows. It creates a space to store the data received from the Jetson and the location of the target object. It utilizes jetson_comms.get_data(&local_map) to copy the data received by the thread receive_task, which has been explained in the introduction to threading. Then, based on the type of the target object (the color of the three-colored ball), it iterates through all recognized objects to find the closest target that meets the given criteria.

In [None]:
DETECTION_OBJECT findTarget(int type){
    DETECTION_OBJECT target;
    static AI_RECORD local_map;
    jetson_comms.get_data(&local_map);
    double lowestDist = 1000000;
    // Iterate through detected objects to find the closest target of the specified type
    for(int i = 0; i < local_map.detectionCount; i++) {
        double distance = distanceTo(local_map.detections[i].mapLocation.x, local_map.detections[i].mapLocation.y);
        if (distance < lowestDist && local_map.detections[i].classID == type) {
            target = local_map.detections[i];
            lowestDist = distance;
        }
    }
    return target;
}

The other contents within ai_functions.cpp, including the motion control functions that utilize the Vex GPS sensor, follow the VEX 5 environment and do not require detailed introduction here.

###   10.&nbsp; &nbsp;Vex5 robot Brian end doshboard.cpp program description<br>
The "doshboard.cpp" program on the Brian end of the Vex5 robot displays the execution status of Jetson and root_link on Brian's screen. This program is designed as a single-threaded function called at the beginning of the main program.


In [None]:
  // start the status update display
  thread t1(dashboardTask);

The program starts two threads which divide Brian's screen into two sections, each displaying the execution status of Jetson and root_link respectively.

In [None]:
int
dashboardTask() {
  while(true) {
    // status
    dashboardJetson(    0, 0, 280, 240 );
    dashboardVexlink( 279, 0, 201, 240 );
    // draw, at 30Hz
    Brain.Screen.render();
    this_thread::sleep_for(16);
  }
  return 0;
}


Clearly, "dashboardJetson(0, 0, 280, 240)" displays the communication status with Jetson, the number of received data packets, as well as the positions, classifications, and confidence levels of recognized objects (up to a maximum of 8).

In [None]:
Brain.Screen.printAt( ox + 10, oy += 15, "Packets   %d", jetson_comms.get_packets() );
Brain.Screen.printAt( ox + 10, oy += 15, "Errors    %d", jetson_comms.get_errors() );
Brain.Screen.printAt( ox + 10, oy += 15, "Timeouts  %d", jetson_comms.get_timeouts() );
Brain.Screen.printAt( ox + 10, oy += 15, "data/sec  %d             ", total_data );
Brain.Screen.printAt( ox + 10, oy += 15, "pkts/sec  %d             ", total_packets );
Brain.Screen.printAt( ox + 10, oy += 15, "count     %d", local_map.detectionCount );
oy += 15; // Skip a line

  // once per second, update data rate stats
if( Brain.Timer.system() > update_time ) {
    update_time = Brain.Timer.system() + 1000;
    total_data = jetson_comms.get_total() - last_data;
    total_packets = jetson_comms.get_packets() - last_packets;
    last_data = jetson_comms.get_total();
    last_packets = jetson_comms.get_packets();
  }

  Brain.Screen.setFont( mono12 );
  for(int i=0;i<8;i++ ) {
    if( i < local_map.detectionCount ) {
      Brain.Screen.printAt( ox + 10, oy += 12, "map %d: p:%.2f c:%4d X:%.2f Y:%.2f Z:%.1f",i,
                           local_map.detections[i].probability,
                           local_map.detections[i].classID,
                           (local_map.detections[i].mapLocation.x /*/ -25.4*/),  // mm -> inches
                           (local_map.detections[i].mapLocation.y /*/ -25.4*/),  // mm -> inches
                           (local_map.detections[i].mapLocation.z /*/ 25.4*/)); // mm -> inches
    }

Similarly, "dashboardVexlink(279, 0, 201, 240)" displays the communication status of the link and the coordinates, positions, and orientations (yaw angles) of the two robots on their respective fields. It follows a similar structure as the previous program described, so I won't elaborate further here.。