Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP_DDS: added support for UDP transport #23586

Merged
merged 3 commits into from May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Tools/scripts/run_astyle.py
Expand Up @@ -32,6 +32,7 @@ def check(self):
# for path in self.files_to_check:
# self.progress("Checking (%s)" % path)
astyle_command = ["astyle", "--dry-run"]
astyle_command.append("--options=Tools/CodeStyle/astylerc")
astyle_command.extend(self.files_to_check)
ret = subprocess.run(
astyle_command,
Expand Down
174 changes: 67 additions & 107 deletions libraries/AP_DDS/AP_DDS_Client.cpp
Expand Up @@ -5,7 +5,6 @@
#include <AP_GPS/AP_GPS.h>
#include <AP_HAL/AP_HAL.h>
#include <AP_RTC/AP_RTC.h>
#include <AP_SerialManager/AP_SerialManager.h>
#include <AP_Math/AP_Math.h>
#include <GCS_MAVLink/GCS.h>
#include <AP_BattMonitor/AP_BattMonitor.h>
Expand All @@ -21,11 +20,25 @@ static char WGS_84_FRAME_ID[] = "WGS-84";
// https://www.ros.org/reps/rep-0105.html#base-link
static char BASE_LINK_FRAME_ID[] = "base_link";

AP_HAL::UARTDriver *dds_port;


const AP_Param::GroupInfo AP_DDS_Client::var_info[]= {
//! @todo Params go here
const AP_Param::GroupInfo AP_DDS_Client::var_info[] {

// @Param: _ENABLE
// @DisplayName: DDS enable
// @Description: Enable DDS subsystem
// @Values: 0:Disabled,1:Enabled
// @RebootRequired: True
// @User: Advanced
AP_GROUPINFO_FLAGS("_ENABLE", 1, AP_DDS_Client, enabled, 0, AP_PARAM_FLAG_ENABLE),

#if AP_DDS_UDP_ENABLED
// @Param: _UDP_PORT
// @DisplayName: DDS UDP port
// @Description: UDP port number for DDS
// @Range: 1 65535
// @RebootRequired: True
// @User: Standard
AP_GROUPINFO("_PORT", 2, AP_DDS_Client, udp.port, 2019),
#endif

AP_GROUPEND
};
Expand Down Expand Up @@ -183,8 +196,7 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_
update_topic(msg.header.stamp);
auto &battery = AP::battery();

if (!battery.healthy(instance))
{
if (!battery.healthy(instance)) {
msg.power_supply_status = 3; //POWER_SUPPLY_HEALTH_DEAD
msg.present = false;
return;
Expand All @@ -203,45 +215,35 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_
msg.design_capacity = design_capacity;

uint8_t percentage;
if (battery.capacity_remaining_pct(percentage, instance))
{
if (battery.capacity_remaining_pct(percentage, instance)) {
msg.percentage = percentage/100.0;
msg.charge = (percentage * design_capacity)/100.0;
}
else
{
} else {
msg.percentage = NAN;
msg.charge = NAN;
}

msg.capacity = NAN;

if (battery.current_amps(current, instance))
{
if (battery.current_amps(current, instance)) {
if (percentage == 100) {
msg.power_supply_status = 4; //POWER_SUPPLY_STATUS_FULL
}
else if (current < 0.0) {
} else if (current < 0.0) {
msg.power_supply_status = 1; //POWER_SUPPLY_STATUS_CHARGING
}
else if (current > 0.0) {
} else if (current > 0.0) {
msg.power_supply_status = 2; //POWER_SUPPLY_STATUS_DISCHARGING
}
else {
} else {
msg.power_supply_status = 3; //POWER_SUPPLY_STATUS_NOT_CHARGING
}
}
else
{
} else {
msg.power_supply_status = 0; //POWER_SUPPLY_STATUS_UNKNOWN
}

msg.power_supply_health = (battery.overpower_detected(instance)) ? 4 : 1; //POWER_SUPPLY_HEALTH_OVERVOLTAGE or POWER_SUPPLY_HEALTH_GOOD

msg.power_supply_technology = 0; //POWER_SUPPLY_TECHNOLOGY_UNKNOWN

if (battery.has_cell_voltages(instance))
{
if (battery.has_cell_voltages(instance)) {
const uint16_t* cellVoltages = battery.get_cell_voltages(instance).cells;
std::copy(cellVoltages, cellVoltages + AP_BATT_MONITOR_CELLS_MAX, msg.cell_voltage);
}
Expand All @@ -268,8 +270,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg)
// as well as invert Z

Vector3f position;
if (ahrs.get_relative_position_NED_home(position))
{
if (ahrs.get_relative_position_NED_home(position)) {
msg.pose.position.x = position[1];
msg.pose.position.y = position[0];
msg.pose.position.z = -position[2];
Expand All @@ -284,8 +285,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg)
// as well as invert Z (NED to ENU convertion) as well as a 90 degree rotation in the Z axis
// for x to point forward
Quaternion orientation;
if (ahrs.get_quaternion(orientation))
{
if (ahrs.get_quaternion(orientation)) {
Quaternion aux(orientation[0], orientation[2], orientation[1], -orientation[3]); //NED to ENU transformation
Quaternion transformation (sqrt(2)/2,0,0,sqrt(2)/2); // Z axis 90 degree rotation
orientation = aux * transformation;
Expand Down Expand Up @@ -316,8 +316,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg)
// As a consequence, to follow ROS REP 103, it is necessary to switch X and Y,
// as well as invert Z
Vector3f velocity;
if (ahrs.get_velocity_NED(velocity))
{
if (ahrs.get_velocity_NED(velocity)) {
msg.twist.linear.x = velocity[1];
msg.twist.linear.y = velocity[0];
msg.twist.linear.z = -velocity[2];
Expand All @@ -340,15 +339,24 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg)
}

/*
class constructor
start the DDS thread
*/
AP_DDS_Client::AP_DDS_Client(void)
bool AP_DDS_Client::start(void)
{
AP_Param::setup_object_defaults(this, var_info);
AP_Param::load_object_from_eeprom(this, var_info);

if (enabled == 0) {
return true;
}

if (!hal.scheduler->thread_create(FUNCTOR_BIND_MEMBER(&AP_DDS_Client::main_loop, void),
"DDS",
8192, AP_HAL::Scheduler::PRIORITY_IO, 1)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: thread create failed");
return false;
}
return true;
}

/*
Expand All @@ -371,35 +379,42 @@ void AP_DDS_Client::main_loop(void)
}
}


bool AP_DDS_Client::init()
{
AP_SerialManager *serial_manager = AP_SerialManager::get_singleton();
dds_port = serial_manager->find_serial(AP_SerialManager::SerialProtocol_DDS_XRCE, 0);
if (dds_port == nullptr) {
return false;
}
// serial init will fail if the SERIALn_PROTOCOL is not setup
bool initTransportStatus = ddsSerialInit();
is_using_serial = initTransportStatus;

// ensure we own the UART
dds_port->begin(0);
#if AP_DDS_UDP_ENABLED
// fallback to UDP if available
if (!initTransportStatus) {
initTransportStatus = ddsUdpInit();
}
#endif

constexpr uint8_t fd = 0;
constexpr uint8_t relativeSerialAgentAddr = 0;
constexpr uint8_t relativeSerialClientAddr = 1;
if (!uxr_init_serial_transport(&serial_transport,fd,relativeSerialAgentAddr,relativeSerialClientAddr)) {
if (!initTransportStatus) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Transport Initialization failed");
return false;
}

constexpr uint32_t uniqueClientKey = 0xAAAABBBB;
//TODO does this need to be inside the loop to handle reconnect?
uxr_init_session(&session, &serial_transport.comm, uniqueClientKey);
while (!uxr_create_session(&session)) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Initialization waiting...");
hal.scheduler->delay(1000);
}

reliable_in = uxr_create_input_reliable_stream(&session,input_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY);
reliable_out = uxr_create_output_reliable_stream(&session,output_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY);
// setup reliable stream buffers
input_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE];
if (input_reliable_stream == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed");
return false;
}
output_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE];
if (output_reliable_stream == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed");
return false;
}

reliable_in = uxr_create_input_reliable_stream(&session, input_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY);
reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY);

GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Init Complete");

Expand Down Expand Up @@ -601,61 +616,6 @@ void AP_DDS_Client::update()
connected = uxr_run_session_time(&session, 1);
}

/*
implement C functions for serial transport
*/
extern "C" {
#include <uxr/client/profile/transport/serial/serial_transport_platform.h>
}

bool uxr_init_serial_platform(void* args, int fd, uint8_t remote_addr, uint8_t local_addr)
{
//! @todo Add error reporting
return true;
}

bool uxr_close_serial_platform(void* args)
{
//! @todo Add error reporting
return true;
}

size_t uxr_write_serial_data_platform(void* args, const uint8_t* buf, size_t len, uint8_t* errcode)
{
if (dds_port == nullptr) {
*errcode = 1;
return 0;
}
ssize_t bytes_written = dds_port->write(buf, len);
if (bytes_written <= 0) {
*errcode = 1;
return 0;
}
//! @todo Add populate the error code correctly
*errcode = 0;
return bytes_written;
}

size_t uxr_read_serial_data_platform(void* args, uint8_t* buf, size_t len, int timeout, uint8_t* errcode)
{
if (dds_port == nullptr) {
*errcode = 1;
return 0;
}
while (timeout > 0 && dds_port->available() < len) {
hal.scheduler->delay(1); // TODO select or poll this is limiting speed (1mS)
timeout--;
}
ssize_t bytes_read = dds_port->read(buf, len);
if (bytes_read <= 0) {
*errcode = 1;
return 0;
}
//! @todo Add error reporting
*errcode = 0;
return bytes_read;
}

#if CONFIG_HAL_BOARD != HAL_BOARD_SITL
extern "C" {
int clock_gettime(clockid_t clockid, struct timespec *ts);
Expand Down
59 changes: 48 additions & 11 deletions libraries/AP_DDS/AP_DDS_Client.h
Expand Up @@ -21,8 +21,16 @@
#include "fcntl.h"

#include <AP_Param/AP_Param.h>
#define STREAM_HISTORY 8
#define BUFFER_SIZE_SERIAL UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY

#define DDS_STREAM_HISTORY 8
#define DDS_BUFFER_SIZE 512

// UDP only on SITL for now
#define AP_DDS_UDP_ENABLED (CONFIG_HAL_BOARD == HAL_BOARD_SITL)

#if AP_DDS_UDP_ENABLED
#include <AP_HAL/utility/Socket.h>
#endif

extern const AP_HAL::HAL& hal;

Expand All @@ -31,16 +39,16 @@ class AP_DDS_Client

private:

AP_Int8 enabled;

// Serial Allocation
uxrSerialTransport serial_transport; // client uxr serial transport
uxrSession session; //Session
bool is_using_serial; // true when using serial transport

// Input Stream
uint8_t input_reliable_stream[BUFFER_SIZE_SERIAL];
// input and output stream
uint8_t *input_reliable_stream;
uint8_t *output_reliable_stream;
uxrStreamId reliable_in;

// Output Stream
uint8_t output_reliable_stream[BUFFER_SIZE_SERIAL];
uxrStreamId reliable_out;

// Topic
Expand Down Expand Up @@ -74,11 +82,40 @@ class AP_DDS_Client
// The last ms timestamp AP_DDS wrote a Local Velocity message
uint64_t last_local_velocity_time_ms;

// functions for serial transport
bool ddsSerialInit();
static bool serial_transport_open(uxrCustomTransport* args);
static bool serial_transport_close(uxrCustomTransport* transport);
static size_t serial_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error);
static size_t serial_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error);
struct {
AP_HAL::UARTDriver *port;
uxrCustomTransport transport;
} serial;

#if AP_DDS_UDP_ENABLED
// functions for udp transport
bool ddsUdpInit();
static bool udp_transport_open(uxrCustomTransport* args);
static bool udp_transport_close(uxrCustomTransport* transport);
static size_t udp_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error);
static size_t udp_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error);

struct {
AP_Int32 port;
// UDP endpoint
const char* ip = "127.0.0.1";
// UDP Allocation
uxrCustomTransport transport;
SocketAPM *socket;
} udp;
#endif

// client key we present
static constexpr uint32_t uniqueClientKey = 0xAAAABBBB;

public:
// Constructor
AP_DDS_Client();

bool start(void);
void main_loop(void);

//! @brief Initialize the client's transport, uxr session, and IO stream(s)
Expand Down