Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zenoh Pico Examples #107

Open
wants to merge 1 commit into
base: examples
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions examples/zenoh-pico/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
cmake_minimum_required(VERSION 3.8)
project(zenoh_pico_example)

set(CMAKE_BUILD_TYPE DEBUG)

include(FetchContent)
FetchContent_declare(
c_backend
GIT_REPOSITORY "https://github.com/eclipse-zenoh/zenoh-pico.git"
GIT_TAG main)
set(BUILD_SHARED_LIBS OFF)
FetchContent_MakeAvailable(c_backend)

include(FetchContent)
FetchContent_declare(
cyclonedds
GIT_REPOSITORY "https://github.com/eclipse-cyclonedds/cyclonedds.git"
GIT_TAG origin/master
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/cyclonedds"
SOURCE_SUBDIR src/core/cdr
)
set(BUILD_SHARED_LIBS OFF)
set(CMAKE_BUILD_TYPE DEBUG)

FetchContent_MakeAvailable(cyclonedds)

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/idlc)

add_executable(add_two_ints_client
add_two_ints_client.c
idlc/example_interfaces/srv/AddTwoInts.c
)
target_link_libraries(add_two_ints_client
zenohpico
cdr
)

add_executable(listener
listener.c
idlc/rcl_interfaces/msg/Log.c
idlc/builtin_interfaces/msg/Time.c
)

target_link_libraries(listener
zenohpico
cdr
)

add_executable(talker
talker.c
idlc/rcl_interfaces/msg/Log.c
idlc/builtin_interfaces/msg/Time.c
)

target_link_libraries(talker
zenohpico
cdr
)

install(TARGETS add_two_ints_client listener talker
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION bin
)


106 changes: 106 additions & 0 deletions examples/zenoh-pico/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Examples of Zenoh Pico applications communicating with ROS 2 Nodes


## Messages Publication: [talker.c](talker.c)

This code mimics the ROS 2 [Topics "talker" demo](https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/topics/talker.cpp). It's compatible with the ROS 2 [Topics "listener" demo](https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/topics/listener.cpp) running those commands:
- Terminal 1
```
source /opt/ros/iron/local_setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
ros2 topic echo /rosout
```

- Terminal 2
```
source /opt/ros/iron/local_setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
zenho-bridge-ros2dds -l tcp/0.0.0.0:7447
```

- Terminal 3
```
talker
```
| Here we publish to the `rosout` topic because there is a conflict when using `idlc` to compile `std_msgs/msg/String.idl`


## Messages Subscription: [listener.c](listener.c)

This code mimics the ROS 2 [Topics "listener" demo](https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/topics/listener.cpp). It's compatible with the ROS 2 [Topics "talker" demo](https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/topics/talker.cpp) running those commands:

- Terminal 1
```
source /opt/ros/iron/local_setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
ros2 run demo_nodes_cpp talker
```

- Terminal 2
```
source /opt/ros/iron/local_setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
zenho-bridge-ros2dds -l tcp/0.0.0.0:7447
```

- Terminal 3
```
listener
```
| Here we subscribe to the `rosout` topic because there is a conflict when using `idlc` to compile `std_msgs/msg/String.idl`

## Services Client: [add_two_ints_client.c](add_two_ints_client.c)

This code mimics the ROS 2 [Services "add_two_ints_client" demo](https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/services/add_two_ints_client.cpp). It's compatible with the ROS 2 [Services "add_two_ints_server" demo](https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/services/add_two_ints_server.cpp) running those commands:


- Terminal 1

```
source /opt/ros/iron/local_setup.bash
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
ros2 run demo_nodes_cpp add_two_ints_server
```

- Terminal 2
```
source /opt/ros/iron/local_setup.bash
zenho-bridge-ros2dds -l tcp/0.0.0.0:7447
```

- Terminal 3
```
add_two_ints_client -a 1000 -b 2000
```



## How to use idlc to compile *.idl

> rmw_cyclonedds_cpp doesn't expect XCDR encoding, but CDR encoding. Thus, we shall use -x final with idlc command.

> ROS2 env must be set up first. ( ROS Iron recommended )

```
find_package(CycloneDDS REQUIRED)
find_package(rcl_interfaces REQUIRED)
find_package(builtin_interfaces REQUIRED)

foreach(_idl ${rcl_interfaces_IDL_FILES})
configure_file(${rcl_interfaces_DIR}/../${_idl} ${IDL_OUT_PATH}/rcl_interfaces/${_idl} COPYONLY)
list(APPEND IDL_FILES "${IDL_OUT_PATH}/rcl_interfaces/${_idl}")
endforeach()

foreach(_idl ${builtin_interfaces_IDL_FILES})
configure_file(${builtin_interfaces_DIR}/../${_idl} ${IDL_OUT_PATH}/builtin_interfaces/${_idl} COPYONLY)
list(APPEND IDL_FILES "${IDL_OUT_PATH}/builtin_interfaces/${_idl}")
endforeach()

foreach(_idl ${IDL_FILES})
execute_process(
COMMAND mkdir -p ${CMAKE_BINARY_DIR}/src
COMMAND idlc -I ${CMAKE_BINARY_DIR}/idl -o ${CMAKE_BINARY_DIR}/src -b ${CMAKE_BINARY_DIR}/idl -x final ${_idl}
OUTPUT_VARIABLE cmd_output
)
endforeach()
```
214 changes: 214 additions & 0 deletions examples/zenoh-pico/add_two_ints_client.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h>

#include <zenoh-pico.h>
#include "example_interfaces/srv/AddTwoInts.h"
// CycloneDDS CDR Deserializer
#include <dds/cdr/dds_cdrstream.h>

// CDR Xtypes header {0x00, 0x01} indicates it's Little Endian (CDR_LE representation)
const uint8_t ros2_header[4] = {0x00, 0x01, 0x00, 0x00};

static dds_cdrstream_allocator_t allocator = {.malloc = malloc, .realloc = realloc, .free = free};

z_condvar_t cond;
z_mutex_t mutex;

void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
z_condvar_signal(&cond);
z_condvar_free(&cond);
}

void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);

struct dds_cdrstream_desc desc_rd;
dds_cdrstream_desc_init(&desc_rd, &allocator,
example_interfaces_srv_AddTwoInts_Response_desc.m_size,
example_interfaces_srv_AddTwoInts_Response_desc.m_align,
example_interfaces_srv_AddTwoInts_Response_desc.m_flagset,
example_interfaces_srv_AddTwoInts_Response_desc.m_ops,
example_interfaces_srv_AddTwoInts_Response_desc.m_keys,
example_interfaces_srv_AddTwoInts_Response_desc.m_nkeys);

uint32_t actual_size;
const bool byteswap = (sample.payload.start[1] != DDSRT_LITTLE_ENDIAN);
const bool norm_ok = dds_stream_normalize(
sample.payload.start + 4,
sample.payload.len,
byteswap,
DDSI_RTPS_CDR_ENC_VERSION_2, &desc_rd, false, &actual_size);

if (!norm_ok) {
printf("dds_stream_normalize failed\n");
return;
}

dds_istream_t is;
is.m_buffer = sample.payload.start + 4;
is.m_index = 0;
is.m_size = sample.payload.len;
is.m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2;

example_interfaces_srv_AddTwoInts_Response *data = calloc(1, desc_rd.size);
dds_stream_read_sample(&is, data, &allocator, &desc_rd);

printf(">> Received ('%s': %ld)\n", z_str_loan(&keystr), data->sum);

z_str_drop(z_str_move(&keystr));

dds_cdrstream_desc_fini(&desc_rd, &allocator);
} else {
printf(">> Received an error\n");
}
}

int main(int argc, char **argv) {
const char *keyexpr = "add_two_ints";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;
const char *req_a = "1000";
const char *req_b = "2000";

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:a:b:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'a':
req_a = optarg;
break;
case 'b':
req_b = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

allocator.malloc = malloc;
allocator.free = free;
allocator.realloc = realloc;

z_mutex_init(&mutex);
z_condvar_init(&cond);

z_owned_config_t config = z_config_default();
zp_config_insert(z_config_loan(&config), Z_CONFIG_MODE_KEY, z_string_make(mode));
if (clocator != NULL) {
zp_config_insert(z_config_loan(&config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator));
}
if (llocator != NULL) {
zp_config_insert(z_config_loan(&config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator));
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_config_move(&config));
if (!z_session_check(&s)) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_session_loan(&s), NULL) < 0 || zp_start_lease_task(z_session_loan(&s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
return -1;
}

z_keyexpr_t ke = z_keyexpr(keyexpr);
if (!z_keyexpr_is_initialized(&ke)) {
printf("%s is not a valid key expression", keyexpr);
return -1;
}

z_mutex_lock(&mutex);

// Setup ostream for serializer
dds_ostream_t os;
os.m_buffer = NULL;
os.m_index = 0;
os.m_size = 0;
os.m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2;

example_interfaces_srv_AddTwoInts_Request req;
req.a = (int64_t)(atoi(req_a));
req.b = (int64_t)(atoi(req_b));

printf("Sending Query '%s'... with a = %ld, b = %ld\n", keyexpr, req.a, req.b);
// Allocate buffer for serialized message
uint8_t *buf = malloc(sizeof(ros2_header));

// Add ROS2 header
memcpy(buf, ros2_header, sizeof(ros2_header));
os.m_buffer = buf;
os.m_index = sizeof(ros2_header); // Offset for CDR Xtypes header
os.m_size = sizeof(ros2_header);
os.m_xcdr_version = DDSI_RTPS_CDR_ENC_VERSION_2;

struct dds_cdrstream_desc desc_wr;
dds_cdrstream_desc_init(&desc_wr, &allocator,
example_interfaces_srv_AddTwoInts_Request_desc.m_size,
example_interfaces_srv_AddTwoInts_Request_desc.m_align,
example_interfaces_srv_AddTwoInts_Request_desc.m_flagset,
example_interfaces_srv_AddTwoInts_Request_desc.m_ops,
example_interfaces_srv_AddTwoInts_Request_desc.m_keys,
example_interfaces_srv_AddTwoInts_Request_desc.m_nkeys);

// Do serialization
bool ret = dds_stream_write_sample(&os, &allocator, (void *)&req, &desc_wr);
if (!ret) {
printf("dds_stream_write_sampleLE failed\n");
exit(-1);
}

z_get_options_t opts = z_get_options_default();
opts.value.payload.start = os.m_buffer;
opts.value.payload.len = os.m_index;
opts.value.payload._is_alloc = false;

z_owned_closure_reply_t callback = z_closure_reply(reply_handler, reply_dropper, NULL);
if (z_get(z_session_loan(&s), ke, "", z_closure_reply_move(&callback), &opts) < 0) {
printf("Unable to send query.\n");
return -1;
}

z_condvar_wait(&cond, &mutex);
z_mutex_unlock(&mutex);

dds_cdrstream_desc_fini(&desc_wr, &allocator);

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_session_loan(&s));
zp_stop_lease_task(z_session_loan(&s));

z_close(z_session_move(&s));

return 0;
}
Loading