diff --git a/index.js b/index.js index 2ddcc1c5..f28a28e4 100644 --- a/index.js +++ b/index.js @@ -118,6 +118,7 @@ let rcl = { node.init(nodeName, namespace); debug('Finish initializing node, name = %s and namespace = %s.', nodeName, namespace); node.handle = handle; + node.context = context; this._nodes.push(node); return node; }, diff --git a/lib/guard_condition.js b/lib/guard_condition.js new file mode 100644 index 00000000..1fff8241 --- /dev/null +++ b/lib/guard_condition.js @@ -0,0 +1,49 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const rclnodejs = require('bindings')('rclnodejs'); +const Entity = require('./entity.js'); +const Context = require('./context.js'); + +/** + * @class - Class representing a guard condition in ROS + * @hideconstructor + */ + +class GuardCondition extends Entity { + constructor(handle, callback) { + super(handle, null, null); + + this._callback = callback; + } + + get callback() { + return this._callback; + } + + static createGuardCondition(callback, context = Context.defaultContext()) { + let handle = rclnodejs.createGuardCondition(context.handle()); + return new GuardCondition(handle, callback); + } + + /** + * Triggers the guard condition. + * @returns {undefined} + */ + trigger() { + rclnodejs.triggerGuardCondition(this.handle); + } +} + +module.exports = GuardCondition; diff --git a/lib/node.js b/lib/node.js index b9aa39bf..b8f37e1c 100644 --- a/lib/node.js +++ b/lib/node.js @@ -24,6 +24,7 @@ const QoS = require('./qos.js'); const debug = require('debug')('rclnodejs:node'); const loader = require('./interface_loader.js'); const Context = require('./context.js'); +const GuardCondition = require('./guard_condition.js'); /** * @class - Class representing a Node in ROS * @hideconstructor @@ -36,6 +37,7 @@ class Node { this._clients = []; this._services = []; this._timers = []; + this._guards = []; this._name = name; if (namespace.length === 0) { @@ -47,15 +49,22 @@ class Node { this.spinning = false; } - execute() { - this._timers.forEach((timer) => { + execute(handles) { + let timersReady = this._timers.filter((timer) => handles.indexOf(timer.handle) !== -1); + let guardsReady = this._guards.filter((guard) => handles.indexOf(guard.handle) !== -1); + let subscriptionsReady = this._subscriptions.filter((subscription) => + handles.indexOf(subscription.handle) !== -1); + let clientsReady = this._clients.filter((client) => handles.indexOf(client.handle) !== -1); + let servicesReady = this._services.filter((service) => handles.indexOf(service.handle) !== -1); + + timersReady.forEach((timer) => { if (timer.isReady()) { rclnodejs.callTimer(timer.handle); timer.callback(); } }); - this._subscriptions.forEach((subscription) => { + subscriptionsReady.forEach((subscription) => { let Message = subscription.typeClass; let msg = new Message(); let success = rclnodejs.rclTake(subscription.handle, msg.toRawROS()); @@ -65,7 +74,11 @@ class Node { Message.destoryRawROS(msg); }); - this._clients.forEach((client) => { + guardsReady.forEach((guard) => { + guard.callback(); + }); + + clientsReady.forEach((client) => { let Response = client.typeClass.Response; let response = new Response(); let success = rclnodejs.rclTakeResponse(client.handle, client.sequenceNumber, response.toRawROS()); @@ -75,7 +88,7 @@ class Node { Response.destoryRawROS(response); }); - this._services.forEach((service) => { + servicesReady.forEach((service) => { let Request = service.typeClass.Request; let request = new Request(); let header = rclnodejs.rclTakeRequest(service.handle, this.handle, request.toRawROS()); @@ -313,6 +326,24 @@ class Node { return service; } + /** + * Create a guard condition. + * @param {Function} callback - The callback to be called when the guard condition is triggered. + * @return {GuardCondition} - An instance of GuardCondition. + */ + createGuardCondition(callback) { + if (typeof (callback) !== 'function') { + throw new TypeError('Invalid argument'); + } + + let guard = GuardCondition.createGuardCondition(callback, this.context); + debug('Finish creating guard condition'); + this._guards.push(guard); + this.syncHandles(); + + return guard; + } + /** * Destroy all resource allocated by this node, including * Timers/Publishers/Subscriptions @@ -330,6 +361,7 @@ class Node { this._subscriptions = []; this._clients = []; this._services = []; + this._guards = []; } /** @@ -392,6 +424,18 @@ class Node { this._destroyEntity(timer, this._timers); } + /** + * Destroy a guard condition. + * @param {GuardCondition} guard - The guard condition to be destroyed. + * @return {undefined} + */ + destroyGuardCondition(guard) { + if (!(guard instanceof GuardCondition)) { + throw new TypeError('Invalid argument'); + } + this._destroyEntity(guard, this._guards); + } + /* Get the name of the node. * @return {string} */ diff --git a/package.json b/package.json index cf15482e..52455630 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "dtslint": "^2.0.2", "eslint": "^5.14.1", "mocha": "^6.0.1", + "sinon": "^8.1.1", "tree-kill": "^1.2.1", "typescript": "^3.7.2" }, diff --git a/src/executor.cpp b/src/executor.cpp index 8091e50f..28dd4f39 100644 --- a/src/executor.cpp +++ b/src/executor.cpp @@ -88,7 +88,8 @@ void Executor::DoWork(uv_async_t* handle) { rcl_reset_error(); g_exception_ptr = nullptr; } - executor->delegate_->Execute(); + executor->delegate_->Execute( + executor->handle_manager_->get_ready_handles()); } } @@ -117,8 +118,8 @@ void Executor::Run(void* arg) { continue; if (rcl_wait_set_resize(&wait_set, handle_manager->subscription_count(), - // TODO(minggang): support guard conditions - 1u, handle_manager->timer_count(), + handle_manager->guard_condition_count() + 1u, + handle_manager->timer_count(), handle_manager->client_count(), handle_manager->service_count(), // TODO(minggang): support events. @@ -147,6 +148,9 @@ void Executor::Run(void* arg) { wait_set.guard_conditions[0]) { executor->running_.store(false); } + + handle_manager->CollectReadyHandles(&wait_set); + if (!uv_is_closing( reinterpret_cast(executor->async_))) { uv_async_send(executor->async_); diff --git a/src/executor.hpp b/src/executor.hpp index adf9ba3e..9d23123b 100644 --- a/src/executor.hpp +++ b/src/executor.hpp @@ -19,6 +19,9 @@ #include #include +#include + +#include "rcl_handle.hpp" struct rcl_context_t; @@ -30,7 +33,8 @@ class Executor { public: class Delegate { public: - virtual void Execute() = 0; + virtual void Execute( + const std::vector& handles) = 0; virtual void CatchException(std::exception_ptr e_ptr) = 0; }; diff --git a/src/handle_manager.cpp b/src/handle_manager.cpp index 568bd24a..da4966bc 100644 --- a/src/handle_manager.cpp +++ b/src/handle_manager.cpp @@ -16,7 +16,6 @@ #include -#include "rcl_handle.hpp" #include "spdlog/spdlog.h" namespace rclnodejs { @@ -47,12 +46,16 @@ void HandleManager::CollectHandles(const v8::Local node) { Nan::Get(node, Nan::New("_clients").ToLocalChecked()); Nan::MaybeLocal services = Nan::Get(node, Nan::New("_services").ToLocalChecked()); + Nan::MaybeLocal guard_conditions = + Nan::Get(node, Nan::New("_guards").ToLocalChecked()); CollectHandlesByType(timers.ToLocalChecked()->ToObject(), &timers_); CollectHandlesByType(subscriptions.ToLocalChecked()->ToObject(), &subscriptions_); CollectHandlesByType(clients.ToLocalChecked()->ToObject(), &clients_); CollectHandlesByType(services.ToLocalChecked()->ToObject(), &services_); + CollectHandlesByType(guard_conditions.ToLocalChecked()->ToObject(), + &guard_conditions_); } is_synchronizing_.store(false); @@ -60,32 +63,75 @@ void HandleManager::CollectHandles(const v8::Local node) { SPDLOG_DEBUG( spdlog::get("rclnodejs"), - "Add {0:d} timers, {1:d} subscriptions, {2:d} clients, {3:d} services.", - timers_.size(), subscriptions_.size(), clients_.size(), services_.size()); + "Add {0:d} timers, {1:d} subscriptions, {2:d} clients, " + + "{3:d} services, {4:d} guards.", + timers_.size(), + subscriptions_.size(), + clients_.size(), + services_.size(), + guard_conditions_.size()); } bool HandleManager::AddHandlesToWaitSet(rcl_wait_set_t* wait_set) { for (auto& timer : timers_) { - if (rcl_wait_set_add_timer(wait_set, timer, nullptr) != RCL_RET_OK) + rcl_timer_t* rcl_timer = reinterpret_cast(timer->ptr()); + if (rcl_wait_set_add_timer(wait_set, rcl_timer, nullptr) != RCL_RET_OK) return false; } for (auto& subscription : subscriptions_) { - if (rcl_wait_set_add_subscription(wait_set, subscription, nullptr) != + rcl_subscription_t* rcl_subscription = + reinterpret_cast(subscription->ptr()); + if (rcl_wait_set_add_subscription(wait_set, rcl_subscription, nullptr) != RCL_RET_OK) return false; } for (auto& client : clients_) { - if (rcl_wait_set_add_client(wait_set, client, nullptr) != RCL_RET_OK) + rcl_client_t* rcl_client = reinterpret_cast(client->ptr()); + if (rcl_wait_set_add_client(wait_set, rcl_client, nullptr) != RCL_RET_OK) return false; } for (auto& service : services_) { - if (rcl_wait_set_add_service(wait_set, service, nullptr) != RCL_RET_OK) + rcl_service_t* rcl_service = + reinterpret_cast(service->ptr()); + if (rcl_wait_set_add_service(wait_set, rcl_service, nullptr) != RCL_RET_OK) + return false; + } + for (auto& guard_condition : guard_conditions_) { + rcl_guard_condition_t* rcl_guard_condition = + reinterpret_cast(guard_condition->ptr()); + if (rcl_wait_set_add_guard_condition(wait_set, rcl_guard_condition, nullptr) + != RCL_RET_OK) return false; } return true; } +void HandleManager::CollectReadyHandles(rcl_wait_set_t* wait_set) { + ready_handles_.clear(); + + CollectReadyHandlesByType( + wait_set->subscriptions, + wait_set->size_of_subscriptions, + subscriptions_); + CollectReadyHandlesByType( + wait_set->clients, + wait_set->size_of_clients, + clients_); + CollectReadyHandlesByType( + wait_set->services, + wait_set->size_of_services, + services_); + CollectReadyHandlesByType( + wait_set->timers, + wait_set->size_of_timers, + timers_); + CollectReadyHandlesByType( + wait_set->guard_conditions, + wait_set->size_of_guard_conditions, + guard_conditions_); +} + void HandleManager::ClearHandles() { timers_.clear(); clients_.clear(); @@ -94,10 +140,9 @@ void HandleManager::ClearHandles() { guard_conditions_.clear(); } -template void HandleManager::CollectHandlesByType( const v8::Local& typeObject, - std::vector* vec) { + std::vector* vec) { Nan::HandleScope scope; if (typeObject->IsArray()) { @@ -112,7 +157,23 @@ void HandleManager::CollectHandlesByType( rclnodejs::RclHandle* rcl_handle = rclnodejs::RclHandle::Unwrap( handle.ToLocalChecked()->ToObject()); - vec->push_back(reinterpret_cast(rcl_handle->ptr())); + vec->push_back(rcl_handle); + } + } +} + +template +void HandleManager::CollectReadyHandlesByType( + const T** struct_ptr, + size_t size, + const std::vector& handles) { + for (size_t idx = 0; idx < size; ++idx) { + if (struct_ptr[idx]) { + for (auto& handle : handles) { + if (struct_ptr[idx] == handle->ptr()) { + ready_handles_.push_back(handle); + } + } } } } diff --git a/src/handle_manager.hpp b/src/handle_manager.hpp index 96f262d1..a6a26f75 100644 --- a/src/handle_manager.hpp +++ b/src/handle_manager.hpp @@ -21,6 +21,8 @@ #include #include +#include "rcl_handle.hpp" + namespace rclnodejs { class ScopedMutex { @@ -39,6 +41,7 @@ class HandleManager { void CollectHandles(const v8::Local node); bool AddHandlesToWaitSet(rcl_wait_set_t* wait_set); + void CollectReadyHandles(rcl_wait_set_t* wait_set); void ClearHandles(); void WaitForSynchronizing() { uv_sem_wait(&sem_); } @@ -46,23 +49,32 @@ class HandleManager { uint32_t service_count() const { return services_.size(); } uint32_t client_count() const { return clients_.size(); } uint32_t timer_count() const { return timers_.size(); } + uint32_t guard_condition_count() const { return guard_conditions_.size(); } + std::vector + get_ready_handles() const { return ready_handles_; } uv_mutex_t* mutex() { return &mutex_; } bool is_synchronizing() const { return is_synchronizing_.load(); } bool is_empty() const { return subscriptions_.size() == 0 && services_.size() == 0 && clients_.size() == 0 - && timers_.size() == 0; } + && timers_.size() == 0 + && guard_conditions_.size() == 0; } protected: - template void CollectHandlesByType( - const v8::Local& typeObject, std::vector* vec); + void CollectHandlesByType(const v8::Local& typeObject, + std::vector* vec); + template void CollectReadyHandlesByType( + const T** struct_ptr, + size_t size, + const std::vector& handles); private: - std::vector timers_; - std::vector clients_; - std::vector services_; - std::vector subscriptions_; - std::vector guard_conditions_; + std::vector timers_; + std::vector clients_; + std::vector services_; + std::vector subscriptions_; + std::vector guard_conditions_; + std::vector ready_handles_; uv_mutex_t mutex_; uv_sem_t sem_; diff --git a/src/rcl_bindings.cpp b/src/rcl_bindings.cpp index 98ed5751..df6e2013 100644 --- a/src/rcl_bindings.cpp +++ b/src/rcl_bindings.cpp @@ -104,6 +104,38 @@ NAN_METHOD(CreateNode) { info.GetReturnValue().Set(handle); } +NAN_METHOD(CreateGuardCondition) { + RclHandle* context_handle = RclHandle::Unwrap(info[0]->ToObject()); + rcl_context_t* context = + reinterpret_cast(context_handle->ptr()); + + rcl_guard_condition_t* gc = reinterpret_cast( + malloc(sizeof(rcl_guard_condition_t))); + + *gc = rcl_get_zero_initialized_guard_condition(); + rcl_guard_condition_options_t gc_options = + rcl_guard_condition_get_default_options(); + + THROW_ERROR_IF_NOT_EQUAL(RCL_RET_OK, + rcl_guard_condition_init(gc, context, gc_options), + rcl_get_error_string().str); + + auto handle = RclHandle::NewInstance(gc, nullptr, [gc] { + return rcl_guard_condition_fini(gc); + }); + info.GetReturnValue().Set(handle); +} + +NAN_METHOD(TriggerGuardCondition) { + RclHandle* gc_handle = RclHandle::Unwrap(info[0]->ToObject()); + rcl_guard_condition_t* gc = + reinterpret_cast(gc_handle->ptr()); + + rcl_ret_t ret = rcl_trigger_guard_condition(gc); + THROW_ERROR_IF_NOT_EQUAL(RCL_RET_OK, rcl_trigger_guard_condition(gc), + rcl_get_error_string().str); +} + NAN_METHOD(CreateTimer) { int64_t period_ms = info[0]->IntegerValue(); RclHandle* context_handle = RclHandle::Unwrap(info[1]->ToObject()); @@ -1289,6 +1321,8 @@ uint32_t GetBindingMethodsCount(BindingMethod* methods) { BindingMethod binding_methods[] = { {"init", Init}, {"createNode", CreateNode}, + {"createGuardCondition", CreateGuardCondition}, + {"triggerGuardCondition", TriggerGuardCondition}, {"createTimer", CreateTimer}, {"isTimerReady", IsTimerReady}, {"callTimer", CallTimer}, diff --git a/src/shadow_node.cpp b/src/shadow_node.cpp index a306eec6..882e1e49 100644 --- a/src/shadow_node.cpp +++ b/src/shadow_node.cpp @@ -15,6 +15,7 @@ #include "shadow_node.hpp" #include +#include #include "executor.hpp" #include "handle_manager.hpp" @@ -108,11 +109,18 @@ NAN_METHOD(ShadowNode::SyncHandles) { } } -void ShadowNode::Execute() { +void ShadowNode::Execute(const std::vector& handles) { Nan::HandleScope scope; - v8::Local argv[1]; Nan::AsyncResource res("shadow_node"); - res.runInAsyncScope(Nan::New(this->persistent()), "execute", 0, argv); + + v8::Local results = Nan::New(handles.size()); + for (size_t i = 0; i < handles.size(); ++i) { + Nan::Set(results, i, handles[i]->handle()); + } + + v8::Local argv[] = { results }; + + res.runInAsyncScope(Nan::New(this->persistent()), "execute", 1, argv); } void ShadowNode::CatchException(std::exception_ptr e_ptr) { diff --git a/src/shadow_node.hpp b/src/shadow_node.hpp index b25708fc..e51385f5 100644 --- a/src/shadow_node.hpp +++ b/src/shadow_node.hpp @@ -19,6 +19,7 @@ #include #include +#include #include "executor.hpp" @@ -38,7 +39,7 @@ class ShadowNode : public Nan::ObjectWrap, HandleManager* handle_manager() { return handle_manager_.get(); } // Executor::Delegate overrides: - void Execute() override; + void Execute(const std::vector& handles) override; void CatchException(std::exception_ptr e_ptr) override; private: diff --git a/test/test-guard-condition.js b/test/test-guard-condition.js new file mode 100644 index 00000000..54a61f7a --- /dev/null +++ b/test/test-guard-condition.js @@ -0,0 +1,81 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const assert = require('assert'); +const sinon = require('sinon'); +const rclnodejs = require('../index.js'); +const utils = require('./utils.js'); + +describe('rclnodejs guard condition test suite', function() { + var node; + var timeout = 10; + this.timeout(60 * 1000); + + before(function() { + return rclnodejs.init(); + }); + + after(function() { + rclnodejs.shutdown(); + }); + + beforeEach(function() { + node = rclnodejs.createNode('guard_node'); + rclnodejs.spin(node, timeout); + }); + + afterEach(function() { + node.destroy(); + }); + + it('Test trigger', async function() { + let callback = sinon.spy(); + + const gc = node.createGuardCondition(callback); + + await utils.delay(timeout + 1); + assert(callback.notCalled); + + gc.trigger(); + await utils.delay(timeout + 1); + assert(callback.calledOnce); + + node.destroyGuardCondition(gc); + }); + + it('Test double trigger', async function() { + let callback1 = sinon.spy(); + let callback2 = sinon.spy(); + + const gc1 = node.createGuardCondition(callback1); + const gc2 = node.createGuardCondition(callback2); + + await utils.delay(timeout + 1); + assert(callback1.notCalled); + assert(callback2.notCalled); + + gc1.trigger(); + gc2.trigger(); + await utils.delay(timeout + 1); + assert(callback1.calledOnce); + assert(callback2.calledOnce); + + await utils.delay(timeout + 1); + assert(callback1.calledOnce); + assert(callback2.calledOnce); + + node.destroyGuardCondition(gc1); + node.destroyGuardCondition(gc2); + }); +}); diff --git a/test/utils.js b/test/utils.js index 9f5661cd..de3f7adf 100644 --- a/test/utils.js +++ b/test/utils.js @@ -74,9 +74,14 @@ function getAvailablePath(amentPrefixPath, otherDirs) { return availablePath; } +function delay(ms) { + return new Promise((resolve) => { setTimeout(resolve, ms); }); +} + module.exports = { assertMember: assertMember, assertThrowsError: assertThrowsError, launchPythonProcess: launchPythonProcess, - getAvailablePath: getAvailablePath + getAvailablePath: getAvailablePath, + delay: delay }; diff --git a/types/base.d.ts b/types/base.d.ts index 40c8de27..bebd2556 100644 --- a/types/base.d.ts +++ b/types/base.d.ts @@ -5,6 +5,7 @@ /// /// /// +/// /// /// /// diff --git a/types/guard_condition.d.ts b/types/guard_condition.d.ts new file mode 100644 index 00000000..a44ae234 --- /dev/null +++ b/types/guard_condition.d.ts @@ -0,0 +1,17 @@ + +declare module 'rclnodejs' { + + /** + * A ROS guard condition containing a callback executed when the + * guard condition is triggered. + */ + class GuardCondition extends Entity { + + /** + * Triggers the guard condition. + */ + trigger(): void; + + } + +} diff --git a/types/node.d.ts b/types/node.d.ts index 4556cf67..4d00d5bc 100644 --- a/types/node.d.ts +++ b/types/node.d.ts @@ -214,7 +214,15 @@ declare module 'rclnodejs' { * @returns An instance of Service. */ createService(typeClass: TypeClass, serviceName: string, - options: Options, callback: ServiceRequestCallback): Service; + options: Options, callback: ServiceRequestCallback): Service; + + /** + * Create a guard condition. + * + * @param callback - The callback to be called when the guard condition is triggered. + * @return An instance of GuardCondition. + */ + createGuardCondition(callback: () => any): GuardCondition; /**