Skip to content

Commit

Permalink
AP_DDS: support UDP transport
Browse files Browse the repository at this point in the history
and switch serial transport to use custom transport
  • Loading branch information
arshPratap authored and tridge committed May 3, 2023
1 parent aa25461 commit fbc7a6d
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 126 deletions.
174 changes: 67 additions & 107 deletions libraries/AP_DDS/AP_DDS_Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <AP_GPS/AP_GPS.h>
#include <AP_HAL/AP_HAL.h>
#include <AP_RTC/AP_RTC.h>
#include <AP_SerialManager/AP_SerialManager.h>
#include <AP_Math/AP_Math.h>
#include <GCS_MAVLink/GCS.h>
#include <AP_BattMonitor/AP_BattMonitor.h>
Expand All @@ -21,11 +20,25 @@ static char WGS_84_FRAME_ID[] = "WGS-84";
// https://www.ros.org/reps/rep-0105.html#base-link
static char BASE_LINK_FRAME_ID[] = "base_link";

AP_HAL::UARTDriver *dds_port;


const AP_Param::GroupInfo AP_DDS_Client::var_info[]= {
//! @todo Params go here
const AP_Param::GroupInfo AP_DDS_Client::var_info[] {

// @Param: _ENABLE
// @DisplayName: DDS enable
// @Description: Enable DDS subsystem
// @Values: 0:Disabled,1:Enabled
// @RebootRequired: True
// @User: Advanced
AP_GROUPINFO_FLAGS("_ENABLE", 1, AP_DDS_Client, enabled, 0, AP_PARAM_FLAG_ENABLE),

#if AP_DDS_UDP_ENABLED
// @Param: _UDP_PORT
// @DisplayName: DDS UDP port
// @Description: UDP port number for DDS
// @Range: 1 65535
// @RebootRequired: True
// @User: Standard
AP_GROUPINFO("_PORT", 2, AP_DDS_Client, udp.port, 2019),
#endif

AP_GROUPEND
};
Expand Down Expand Up @@ -183,8 +196,7 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_
update_topic(msg.header.stamp);
auto &battery = AP::battery();

if (!battery.healthy(instance))
{
if (!battery.healthy(instance)) {
msg.power_supply_status = 3; //POWER_SUPPLY_HEALTH_DEAD
msg.present = false;
return;
Expand All @@ -203,45 +215,35 @@ void AP_DDS_Client::update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_
msg.design_capacity = design_capacity;

uint8_t percentage;
if (battery.capacity_remaining_pct(percentage, instance))
{
if (battery.capacity_remaining_pct(percentage, instance)) {
msg.percentage = percentage/100.0;
msg.charge = (percentage * design_capacity)/100.0;
}
else
{
} else {
msg.percentage = NAN;
msg.charge = NAN;
}

msg.capacity = NAN;

if (battery.current_amps(current, instance))
{
if (battery.current_amps(current, instance)) {
if (percentage == 100) {
msg.power_supply_status = 4; //POWER_SUPPLY_STATUS_FULL
}
else if (current < 0.0) {
} else if (current < 0.0) {
msg.power_supply_status = 1; //POWER_SUPPLY_STATUS_CHARGING
}
else if (current > 0.0) {
} else if (current > 0.0) {
msg.power_supply_status = 2; //POWER_SUPPLY_STATUS_DISCHARGING
}
else {
} else {
msg.power_supply_status = 3; //POWER_SUPPLY_STATUS_NOT_CHARGING
}
}
else
{
} else {
msg.power_supply_status = 0; //POWER_SUPPLY_STATUS_UNKNOWN
}

msg.power_supply_health = (battery.overpower_detected(instance)) ? 4 : 1; //POWER_SUPPLY_HEALTH_OVERVOLTAGE or POWER_SUPPLY_HEALTH_GOOD

msg.power_supply_technology = 0; //POWER_SUPPLY_TECHNOLOGY_UNKNOWN

if (battery.has_cell_voltages(instance))
{
if (battery.has_cell_voltages(instance)) {
const uint16_t* cellVoltages = battery.get_cell_voltages(instance).cells;
std::copy(cellVoltages, cellVoltages + AP_BATT_MONITOR_CELLS_MAX, msg.cell_voltage);
}
Expand All @@ -268,8 +270,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg)
// as well as invert Z

Vector3f position;
if (ahrs.get_relative_position_NED_home(position))
{
if (ahrs.get_relative_position_NED_home(position)) {
msg.pose.position.x = position[1];
msg.pose.position.y = position[0];
msg.pose.position.z = -position[2];
Expand All @@ -284,8 +285,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg)
// as well as invert Z (NED to ENU convertion) as well as a 90 degree rotation in the Z axis
// for x to point forward
Quaternion orientation;
if (ahrs.get_quaternion(orientation))
{
if (ahrs.get_quaternion(orientation)) {
Quaternion aux(orientation[0], orientation[2], orientation[1], -orientation[3]); //NED to ENU transformation
Quaternion transformation (sqrt(2)/2,0,0,sqrt(2)/2); // Z axis 90 degree rotation
orientation = aux * transformation;
Expand Down Expand Up @@ -316,8 +316,7 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg)
// As a consequence, to follow ROS REP 103, it is necessary to switch X and Y,
// as well as invert Z
Vector3f velocity;
if (ahrs.get_velocity_NED(velocity))
{
if (ahrs.get_velocity_NED(velocity)) {
msg.twist.linear.x = velocity[1];
msg.twist.linear.y = velocity[0];
msg.twist.linear.z = -velocity[2];
Expand All @@ -340,15 +339,24 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_TwistStamped& msg)
}

/*
class constructor
start the DDS thread
*/
AP_DDS_Client::AP_DDS_Client(void)
bool AP_DDS_Client::start(void)
{
AP_Param::setup_object_defaults(this, var_info);
AP_Param::load_object_from_eeprom(this, var_info);

if (enabled == 0) {
return true;
}

if (!hal.scheduler->thread_create(FUNCTOR_BIND_MEMBER(&AP_DDS_Client::main_loop, void),
"DDS",
8192, AP_HAL::Scheduler::PRIORITY_IO, 1)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: thread create failed");
return false;
}
return true;
}

/*
Expand All @@ -371,35 +379,42 @@ void AP_DDS_Client::main_loop(void)
}
}


bool AP_DDS_Client::init()
{
AP_SerialManager *serial_manager = AP_SerialManager::get_singleton();
dds_port = serial_manager->find_serial(AP_SerialManager::SerialProtocol_DDS_XRCE, 0);
if (dds_port == nullptr) {
return false;
}
// serial init will fail if the SERIALn_PROTOCOL is not setup
bool initTransportStatus = ddsSerialInit();
is_using_serial = initTransportStatus;

// ensure we own the UART
dds_port->begin(0);
#if AP_DDS_UDP_ENABLED
// fallback to UDP if available
if (!initTransportStatus) {
initTransportStatus = ddsUdpInit();
}
#endif

constexpr uint8_t fd = 0;
constexpr uint8_t relativeSerialAgentAddr = 0;
constexpr uint8_t relativeSerialClientAddr = 1;
if (!uxr_init_serial_transport(&serial_transport,fd,relativeSerialAgentAddr,relativeSerialClientAddr)) {
if (!initTransportStatus) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Transport Initialization failed");
return false;
}

constexpr uint32_t uniqueClientKey = 0xAAAABBBB;
//TODO does this need to be inside the loop to handle reconnect?
uxr_init_session(&session, &serial_transport.comm, uniqueClientKey);
while (!uxr_create_session(&session)) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Initialization waiting...");
hal.scheduler->delay(1000);
}

reliable_in = uxr_create_input_reliable_stream(&session,input_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY);
reliable_out = uxr_create_output_reliable_stream(&session,output_reliable_stream,BUFFER_SIZE_SERIAL,STREAM_HISTORY);
// setup reliable stream buffers
input_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE];
if (input_reliable_stream == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed");
return false;
}
output_reliable_stream = new uint8_t[DDS_STREAM_HISTORY*DDS_BUFFER_SIZE];
if (output_reliable_stream == nullptr) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"DDS Client: allocation failed");
return false;
}

reliable_in = uxr_create_input_reliable_stream(&session, input_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY);
reliable_out = uxr_create_output_reliable_stream(&session, output_reliable_stream, DDS_BUFFER_SIZE, DDS_STREAM_HISTORY);

GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Init Complete");

Expand Down Expand Up @@ -601,61 +616,6 @@ void AP_DDS_Client::update()
connected = uxr_run_session_time(&session, 1);
}

/*
implement C functions for serial transport
*/
extern "C" {
#include <uxr/client/profile/transport/serial/serial_transport_platform.h>
}

bool uxr_init_serial_platform(void* args, int fd, uint8_t remote_addr, uint8_t local_addr)
{
//! @todo Add error reporting
return true;
}

bool uxr_close_serial_platform(void* args)
{
//! @todo Add error reporting
return true;
}

size_t uxr_write_serial_data_platform(void* args, const uint8_t* buf, size_t len, uint8_t* errcode)
{
if (dds_port == nullptr) {
*errcode = 1;
return 0;
}
ssize_t bytes_written = dds_port->write(buf, len);
if (bytes_written <= 0) {
*errcode = 1;
return 0;
}
//! @todo Add populate the error code correctly
*errcode = 0;
return bytes_written;
}

size_t uxr_read_serial_data_platform(void* args, uint8_t* buf, size_t len, int timeout, uint8_t* errcode)
{
if (dds_port == nullptr) {
*errcode = 1;
return 0;
}
while (timeout > 0 && dds_port->available() < len) {
hal.scheduler->delay(1); // TODO select or poll this is limiting speed (1mS)
timeout--;
}
ssize_t bytes_read = dds_port->read(buf, len);
if (bytes_read <= 0) {
*errcode = 1;
return 0;
}
//! @todo Add error reporting
*errcode = 0;
return bytes_read;
}

#if CONFIG_HAL_BOARD != HAL_BOARD_SITL
extern "C" {
int clock_gettime(clockid_t clockid, struct timespec *ts);
Expand Down
59 changes: 48 additions & 11 deletions libraries/AP_DDS/AP_DDS_Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@
#include "fcntl.h"

#include <AP_Param/AP_Param.h>
#define STREAM_HISTORY 8
#define BUFFER_SIZE_SERIAL UXR_CONFIG_SERIAL_TRANSPORT_MTU * STREAM_HISTORY

#define DDS_STREAM_HISTORY 8
#define DDS_BUFFER_SIZE 512

// UDP only on SITL for now
#define AP_DDS_UDP_ENABLED (CONFIG_HAL_BOARD == HAL_BOARD_SITL)

#if AP_DDS_UDP_ENABLED
#include <AP_HAL/utility/Socket.h>
#endif

extern const AP_HAL::HAL& hal;

Expand All @@ -31,16 +39,16 @@ class AP_DDS_Client

private:

AP_Int8 enabled;

// Serial Allocation
uxrSerialTransport serial_transport; // client uxr serial transport
uxrSession session; //Session
bool is_using_serial; // true when using serial transport

// Input Stream
uint8_t input_reliable_stream[BUFFER_SIZE_SERIAL];
// input and output stream
uint8_t *input_reliable_stream;
uint8_t *output_reliable_stream;
uxrStreamId reliable_in;

// Output Stream
uint8_t output_reliable_stream[BUFFER_SIZE_SERIAL];
uxrStreamId reliable_out;

// Topic
Expand Down Expand Up @@ -74,11 +82,40 @@ class AP_DDS_Client
// The last ms timestamp AP_DDS wrote a Local Velocity message
uint64_t last_local_velocity_time_ms;

// functions for serial transport
bool ddsSerialInit();
static bool serial_transport_open(uxrCustomTransport* args);
static bool serial_transport_close(uxrCustomTransport* transport);
static size_t serial_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error);
static size_t serial_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error);
struct {
AP_HAL::UARTDriver *port;
uxrCustomTransport transport;
} serial;

#if AP_DDS_UDP_ENABLED
// functions for udp transport
bool ddsUdpInit();
static bool udp_transport_open(uxrCustomTransport* args);
static bool udp_transport_close(uxrCustomTransport* transport);
static size_t udp_transport_write(uxrCustomTransport* transport, const uint8_t* buf, size_t len, uint8_t* error);
static size_t udp_transport_read(uxrCustomTransport* transport, uint8_t* buf, size_t len, int timeout, uint8_t* error);

struct {
AP_Int32 port;
// UDP endpoint
const char* ip = "127.0.0.1";
// UDP Allocation
uxrCustomTransport transport;
SocketAPM *socket;
} udp;
#endif

// client key we present
static constexpr uint32_t uniqueClientKey = 0xAAAABBBB;

public:
// Constructor
AP_DDS_Client();

bool start(void);
void main_loop(void);

//! @brief Initialize the client's transport, uxr session, and IO stream(s)
Expand Down

0 comments on commit fbc7a6d

Please sign in to comment.