Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ let rcl = {
node.init(nodeName, namespace);
debug('Finish initializing node, name = %s and namespace = %s.', nodeName, namespace);
node.handle = handle;
node.context = context;
this._nodes.push(node);
return node;
},
Expand Down
49 changes: 49 additions & 0 deletions lib/guard_condition.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const rclnodejs = require('bindings')('rclnodejs');
const Entity = require('./entity.js');
const Context = require('./context.js');

/**
* @class - Class representing a guard condition in ROS
* @hideconstructor
*/

class GuardCondition extends Entity {
constructor(handle, callback) {
super(handle, null, null);

this._callback = callback;
}

get callback() {
return this._callback;
}

static createGuardCondition(callback, context = Context.defaultContext()) {
let handle = rclnodejs.createGuardCondition(context.handle());
return new GuardCondition(handle, callback);
}

/**
* Triggers the guard condition.
* @returns {undefined}
*/
trigger() {
rclnodejs.triggerGuardCondition(this.handle);
}
}

module.exports = GuardCondition;
54 changes: 49 additions & 5 deletions lib/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const QoS = require('./qos.js');
const debug = require('debug')('rclnodejs:node');
const loader = require('./interface_loader.js');
const Context = require('./context.js');
const GuardCondition = require('./guard_condition.js');
/**
* @class - Class representing a Node in ROS
* @hideconstructor
Expand All @@ -36,6 +37,7 @@ class Node {
this._clients = [];
this._services = [];
this._timers = [];
this._guards = [];
this._name = name;

if (namespace.length === 0) {
Expand All @@ -47,15 +49,22 @@ class Node {
this.spinning = false;
}

execute() {
this._timers.forEach((timer) => {
execute(handles) {
let timersReady = this._timers.filter((timer) => handles.indexOf(timer.handle) !== -1);
let guardsReady = this._guards.filter((guard) => handles.indexOf(guard.handle) !== -1);
let subscriptionsReady = this._subscriptions.filter((subscription) =>
handles.indexOf(subscription.handle) !== -1);
let clientsReady = this._clients.filter((client) => handles.indexOf(client.handle) !== -1);
let servicesReady = this._services.filter((service) => handles.indexOf(service.handle) !== -1);

timersReady.forEach((timer) => {
if (timer.isReady()) {
rclnodejs.callTimer(timer.handle);
timer.callback();
}
});

this._subscriptions.forEach((subscription) => {
subscriptionsReady.forEach((subscription) => {
let Message = subscription.typeClass;
let msg = new Message();
let success = rclnodejs.rclTake(subscription.handle, msg.toRawROS());
Expand All @@ -65,7 +74,11 @@ class Node {
Message.destoryRawROS(msg);
});

this._clients.forEach((client) => {
guardsReady.forEach((guard) => {
guard.callback();
});

clientsReady.forEach((client) => {
let Response = client.typeClass.Response;
let response = new Response();
let success = rclnodejs.rclTakeResponse(client.handle, client.sequenceNumber, response.toRawROS());
Expand All @@ -75,7 +88,7 @@ class Node {
Response.destoryRawROS(response);
});

this._services.forEach((service) => {
servicesReady.forEach((service) => {
let Request = service.typeClass.Request;
let request = new Request();
let header = rclnodejs.rclTakeRequest(service.handle, this.handle, request.toRawROS());
Expand Down Expand Up @@ -313,6 +326,24 @@ class Node {
return service;
}

/**
* Create a guard condition.
* @param {Function} callback - The callback to be called when the guard condition is triggered.
* @return {GuardCondition} - An instance of GuardCondition.
*/
createGuardCondition(callback) {
if (typeof (callback) !== 'function') {
throw new TypeError('Invalid argument');
}

let guard = GuardCondition.createGuardCondition(callback, this.context);
debug('Finish creating guard condition');
this._guards.push(guard);
this.syncHandles();

return guard;
}

/**
* Destroy all resource allocated by this node, including
* <code>Timer</code>s/<code>Publisher</code>s/<code>Subscription</code>s
Expand All @@ -330,6 +361,7 @@ class Node {
this._subscriptions = [];
this._clients = [];
this._services = [];
this._guards = [];
}

/**
Expand Down Expand Up @@ -392,6 +424,18 @@ class Node {
this._destroyEntity(timer, this._timers);
}

/**
* Destroy a guard condition.
* @param {GuardCondition} guard - The guard condition to be destroyed.
* @return {undefined}
*/
destroyGuardCondition(guard) {
if (!(guard instanceof GuardCondition)) {
throw new TypeError('Invalid argument');
}
this._destroyEntity(guard, this._guards);
}

/* Get the name of the node.
* @return {string}
*/
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"dtslint": "^2.0.2",
"eslint": "^5.14.1",
"mocha": "^6.0.1",
"sinon": "^8.1.1",
"tree-kill": "^1.2.1",
"typescript": "^3.7.2"
},
Expand Down
10 changes: 7 additions & 3 deletions src/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void Executor::DoWork(uv_async_t* handle) {
rcl_reset_error();
g_exception_ptr = nullptr;
}
executor->delegate_->Execute();
executor->delegate_->Execute(
executor->handle_manager_->get_ready_handles());
}
}

Expand Down Expand Up @@ -117,8 +118,8 @@ void Executor::Run(void* arg) {
continue;

if (rcl_wait_set_resize(&wait_set, handle_manager->subscription_count(),
// TODO(minggang): support guard conditions
1u, handle_manager->timer_count(),
handle_manager->guard_condition_count() + 1u,
handle_manager->timer_count(),
handle_manager->client_count(),
handle_manager->service_count(),
// TODO(minggang): support events.
Expand Down Expand Up @@ -147,6 +148,9 @@ void Executor::Run(void* arg) {
wait_set.guard_conditions[0]) {
executor->running_.store(false);
}

handle_manager->CollectReadyHandles(&wait_set);

if (!uv_is_closing(
reinterpret_cast<uv_handle_t*>(executor->async_))) {
uv_async_send(executor->async_);
Expand Down
6 changes: 5 additions & 1 deletion src/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include <atomic>
#include <exception>
#include <vector>

#include "rcl_handle.hpp"

struct rcl_context_t;

Expand All @@ -30,7 +33,8 @@ class Executor {
public:
class Delegate {
public:
virtual void Execute() = 0;
virtual void Execute(
const std::vector<rclnodejs::RclHandle *>& handles) = 0;
virtual void CatchException(std::exception_ptr e_ptr) = 0;
};

Expand Down
81 changes: 71 additions & 10 deletions src/handle_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <vector>

#include "rcl_handle.hpp"
#include "spdlog/spdlog.h"

namespace rclnodejs {
Expand Down Expand Up @@ -47,45 +46,92 @@ void HandleManager::CollectHandles(const v8::Local<v8::Object> node) {
Nan::Get(node, Nan::New("_clients").ToLocalChecked());
Nan::MaybeLocal<v8::Value> services =
Nan::Get(node, Nan::New("_services").ToLocalChecked());
Nan::MaybeLocal<v8::Value> guard_conditions =
Nan::Get(node, Nan::New("_guards").ToLocalChecked());

CollectHandlesByType(timers.ToLocalChecked()->ToObject(), &timers_);
CollectHandlesByType(subscriptions.ToLocalChecked()->ToObject(),
&subscriptions_);
CollectHandlesByType(clients.ToLocalChecked()->ToObject(), &clients_);
CollectHandlesByType(services.ToLocalChecked()->ToObject(), &services_);
CollectHandlesByType(guard_conditions.ToLocalChecked()->ToObject(),
&guard_conditions_);
}

is_synchronizing_.store(false);
uv_sem_post(&sem_);

SPDLOG_DEBUG(
spdlog::get("rclnodejs"),
"Add {0:d} timers, {1:d} subscriptions, {2:d} clients, {3:d} services.",
timers_.size(), subscriptions_.size(), clients_.size(), services_.size());
"Add {0:d} timers, {1:d} subscriptions, {2:d} clients, " +
"{3:d} services, {4:d} guards.",
timers_.size(),
subscriptions_.size(),
clients_.size(),
services_.size(),
guard_conditions_.size());
}

bool HandleManager::AddHandlesToWaitSet(rcl_wait_set_t* wait_set) {
for (auto& timer : timers_) {
if (rcl_wait_set_add_timer(wait_set, timer, nullptr) != RCL_RET_OK)
rcl_timer_t* rcl_timer = reinterpret_cast<rcl_timer_t*>(timer->ptr());
if (rcl_wait_set_add_timer(wait_set, rcl_timer, nullptr) != RCL_RET_OK)
return false;
}
for (auto& subscription : subscriptions_) {
if (rcl_wait_set_add_subscription(wait_set, subscription, nullptr) !=
rcl_subscription_t* rcl_subscription =
reinterpret_cast<rcl_subscription_t*>(subscription->ptr());
if (rcl_wait_set_add_subscription(wait_set, rcl_subscription, nullptr) !=
RCL_RET_OK)
return false;
}
for (auto& client : clients_) {
if (rcl_wait_set_add_client(wait_set, client, nullptr) != RCL_RET_OK)
rcl_client_t* rcl_client = reinterpret_cast<rcl_client_t*>(client->ptr());
if (rcl_wait_set_add_client(wait_set, rcl_client, nullptr) != RCL_RET_OK)
return false;
}
for (auto& service : services_) {
if (rcl_wait_set_add_service(wait_set, service, nullptr) != RCL_RET_OK)
rcl_service_t* rcl_service =
reinterpret_cast<rcl_service_t*>(service->ptr());
if (rcl_wait_set_add_service(wait_set, rcl_service, nullptr) != RCL_RET_OK)
return false;
}
for (auto& guard_condition : guard_conditions_) {
rcl_guard_condition_t* rcl_guard_condition =
reinterpret_cast<rcl_guard_condition_t*>(guard_condition->ptr());
if (rcl_wait_set_add_guard_condition(wait_set, rcl_guard_condition, nullptr)
!= RCL_RET_OK)
return false;
}

return true;
}

void HandleManager::CollectReadyHandles(rcl_wait_set_t* wait_set) {
ready_handles_.clear();

CollectReadyHandlesByType(
wait_set->subscriptions,
wait_set->size_of_subscriptions,
subscriptions_);
CollectReadyHandlesByType(
wait_set->clients,
wait_set->size_of_clients,
clients_);
CollectReadyHandlesByType(
wait_set->services,
wait_set->size_of_services,
services_);
CollectReadyHandlesByType(
wait_set->timers,
wait_set->size_of_timers,
timers_);
CollectReadyHandlesByType(
wait_set->guard_conditions,
wait_set->size_of_guard_conditions,
guard_conditions_);
}

void HandleManager::ClearHandles() {
timers_.clear();
clients_.clear();
Expand All @@ -94,10 +140,9 @@ void HandleManager::ClearHandles() {
guard_conditions_.clear();
}

template <typename T>
void HandleManager::CollectHandlesByType(
const v8::Local<v8::Object>& typeObject,
std::vector<const T*>* vec) {
std::vector<rclnodejs::RclHandle*>* vec) {
Nan::HandleScope scope;

if (typeObject->IsArray()) {
Expand All @@ -112,7 +157,23 @@ void HandleManager::CollectHandlesByType(
rclnodejs::RclHandle* rcl_handle =
rclnodejs::RclHandle::Unwrap<rclnodejs::RclHandle>(
handle.ToLocalChecked()->ToObject());
vec->push_back(reinterpret_cast<T*>(rcl_handle->ptr()));
vec->push_back(rcl_handle);
}
}
}

template<typename T>
void HandleManager::CollectReadyHandlesByType(
const T** struct_ptr,
size_t size,
const std::vector<rclnodejs::RclHandle*>& handles) {
for (size_t idx = 0; idx < size; ++idx) {
if (struct_ptr[idx]) {
for (auto& handle : handles) {
if (struct_ptr[idx] == handle->ptr()) {
ready_handles_.push_back(handle);
}
}
}
}
}
Expand Down
Loading