Skip to content
Permalink
Browse files
Performance improvements for celix_bundleContext_useService(s)WithOpt…
…ions.

Remove unnecessary celix_framework_waitForEmptyEventQueue from the indirect call path, while replace it with more efficient celix_framework_waitUntilNoPendingRegistration  for the direct call path. Added framework->dispatcher.stats can also be used for diagnostic purpose.
  • Loading branch information
PengZheng committed Feb 24, 2022
1 parent dac050e commit c407c4057540b0cdbb98f3091be0cb4d91b25348
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 3 deletions.
@@ -1411,6 +1411,77 @@ TEST_F(CelixBundleContextServicesTests, serviceOnDemandWithAsyncRegisterTest) {
celix_bundleContext_stopTracker(ctx, trkId);
}

TEST_F(CelixBundleContextServicesTests, UseServiceOnDemandDirectlyWithAsyncRegisterTest) {
//NOTE that even though service are registered async, they should be found by a useService call.

bool called = celix_bundleContext_useService(ctx, "test", nullptr, [](void*, void*){/*nop*/});
EXPECT_FALSE(called); //service not available

struct test_service {
void* handle;
};

struct callback_data {
celix_bundle_context_t* ctx;
long svcId;
test_service ts;
};
callback_data cbData{ctx, -1L, {nullptr}};

long trkId = celix_bundleContext_trackServiceTrackers(ctx, "test", &cbData, [](void *voidData, const celix_service_tracker_info_t*) {
auto* data = static_cast<callback_data*>(voidData);
data->svcId = celix_bundleContext_registerServiceAsync(data->ctx, &data->ts, "test", nullptr);
}, nullptr);
celix_service_use_options_t opts{};
opts.filter.serviceName = "test";
opts.direct = true;
called = celix_bundleContext_useServiceWithOptions(ctx, &opts);
EXPECT_TRUE(called); //service created on demand.

celix_bundleContext_unregisterService(ctx, cbData.svcId);
celix_bundleContext_stopTracker(ctx, trkId);
}

TEST_F(CelixBundleContextServicesTests, UseServicesOnDemandDirectlyWithAsyncRegisterTest) {
//NOTE that even though service are registered async, they should be found by a useService call.

bool called = celix_bundleContext_useService(ctx, "test", nullptr, [](void*, void*){/*nop*/});
EXPECT_FALSE(called); //service not available

struct test_service {
void* handle;
};

struct callback_data {
celix_bundle_context_t* ctx;
long svcId;
test_service ts;
};
callback_data cbData{ctx, -1L, {nullptr}};

long trkId = celix_bundleContext_trackServiceTrackers(ctx, "test", &cbData, [](void *voidData, const celix_service_tracker_info_t*) {
auto* data = static_cast<callback_data*>(voidData);
data->svcId = celix_bundleContext_registerServiceAsync(data->ctx, &data->ts, "test", nullptr);
}, nullptr);

callback_data cbData1{ctx, -1L, {nullptr}};
long trkId1 = celix_bundleContext_trackServiceTrackers(ctx, "test", &cbData1, [](void *voidData, const celix_service_tracker_info_t*) {
auto* data = static_cast<callback_data*>(voidData);
data->svcId = celix_bundleContext_registerServiceAsync(data->ctx, &data->ts, "test", nullptr);
}, nullptr);

celix_service_use_options_t opts{};
opts.filter.serviceName = "test";
opts.direct = true;
size_t count = celix_bundleContext_useServicesWithOptions(ctx, &opts);
EXPECT_EQ(2, count);

celix_bundleContext_unregisterService(ctx, cbData.svcId);
celix_bundleContext_unregisterService(ctx, cbData1.svcId);
celix_bundleContext_stopTracker(ctx, trkId);
celix_bundleContext_stopTracker(ctx, trkId1);
}

TEST_F(CelixBundleContextServicesTests, startStopServiceTrackerAsync) {
std::atomic<int> count{0};

@@ -226,6 +226,11 @@ void celix_framework_setLogCallback(celix_framework_t* fw, void* logHandle, void
*/
void celix_framework_waitUntilNoEventsForBnd(celix_framework_t* fw, long bndId);

/**
* @brief wait until all pending service registration are processed.
*/
void celix_framework_waitUntilNoPendingRegistration(celix_framework_t* fw);

/**
* @brief Returns whether the current thread is the Celix framework event loop thread.
*/
@@ -1208,9 +1208,9 @@ bool celix_bundleContext_useServiceWithOptions(
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "create service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);

celix_framework_waitForEmptyEventQueue(ctx->framework); //ensure that a useService wait if a listener hooks concept, which triggers an async service registration

if(opts->direct) {
celix_framework_waitUntilNoPendingRegistration(ctx->framework); // TL;DR make "service on demand" pattern work. Try comment it out and run CelixBundleContextServicesTests.UseServiceOnDemandDirectlyWithAsyncRegisterTest
called = celix_serviceTracker_useHighestRankingService(data.svcTracker, NULL, opts->waitTimeoutInSeconds, opts->callbackHandle, opts->use, opts->useWithProperties, opts->useWithOwner);
} else {
struct timespec startTime = celix_gettime(CLOCK_MONOTONIC);
@@ -1263,9 +1263,8 @@ size_t celix_bundleContext_useServicesWithOptions(
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "create service tracker for celix_bundleContext_useServicesWithOptions", &data, celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);

celix_framework_waitForEmptyEventQueue(ctx->framework); //ensure that a useService wait if a listener hooks concept, which triggers an async service registration

if (opts->direct) {
celix_framework_waitUntilNoPendingRegistration(ctx->framework); // TL;DR make "service on demand" pattern work. Try comment it out and run CelixBundleContextServicesTests.UseServicesOnDemandDirectlyWithAsyncRegisterTest
celix_bundleContext_useServicesWithOptions_2_UseServiceTracker(&data);
} else {
eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "use service tracker for celix_bundleContext_useServicesWithOptions", &data, celix_bundleContext_useServicesWithOptions_2_UseServiceTracker, NULL, NULL);
@@ -1446,6 +1446,7 @@ void fw_fireBundleEvent(framework_pt framework, bundle_event_type_e eventType, c
event.type = CELIX_BUNDLE_EVENT_TYPE;
event.bndEntry = entry;
event.bundleEvent = eventType;
__atomic_add_fetch(&framework->dispatcher.stats.nbBundle, 1, __ATOMIC_RELAXED);
celix_framework_addToEventQueue(framework, &event);
}

@@ -1460,6 +1461,7 @@ void fw_fireFrameworkEvent(framework_pt framework, framework_event_type_e eventT
event.error = celix_strerror(errorCode);
}

__atomic_add_fetch(&framework->dispatcher.stats.nbFramework, 1, __ATOMIC_RELAXED);
celix_framework_addToEventQueue(framework, &event);
}

@@ -1512,6 +1514,7 @@ static void fw_handleEventRequest(celix_framework_t *framework, celix_framework_
fw_bundleListener_decreaseUseCount(listener);
}
celix_arrayList_destroy(localListeners);
__atomic_sub_fetch(&framework->dispatcher.stats.nbBundle, 1, __ATOMIC_RELAXED);
} else if (event->type == CELIX_FRAMEWORK_EVENT_TYPE) {
celixThreadMutex_lock(&framework->frameworkListenersLock);
for (int i = 0; i < celix_arrayList_size(framework->frameworkListeners); ++i) {
@@ -1525,6 +1528,7 @@ static void fw_handleEventRequest(celix_framework_t *framework, celix_framework_
fw_invokeFrameworkListener(framework, listener->listener, &fEvent, listener->bundle);
}
celixThreadMutex_unlock(&framework->frameworkListenersLock);
__atomic_sub_fetch(&framework->dispatcher.stats.nbFramework, 1, __ATOMIC_RELAXED);
} else if (event->type == CELIX_REGISTER_SERVICE_EVENT) {
service_registration_t* reg = NULL;
celix_status_t status = CELIX_SUCCESS;
@@ -1541,12 +1545,15 @@ static void fw_handleEventRequest(celix_framework_t *framework, celix_framework_
} else if (!event->cancelled && event->registerCallback != NULL) {
event->registerCallback(event->registerData, serviceRegistration_getServiceId(reg));
}
__atomic_sub_fetch(&framework->dispatcher.stats.nbRegister, 1, __ATOMIC_RELAXED);
} else if (event->type == CELIX_UNREGISTER_SERVICE_EVENT) {
celix_serviceRegistry_unregisterService(framework->registry, event->bndEntry->bnd, event->unregisterServiceId);
__atomic_sub_fetch(&framework->dispatcher.stats.nbUnregister, 1, __ATOMIC_RELAXED);
} else if (event->type == CELIX_GENERIC_EVENT) {
if (event->genericProcess != NULL) {
event->genericProcess(event->genericProcessData);
}
__atomic_sub_fetch(&framework->dispatcher.stats.nbEvent, 1, __ATOMIC_RELAXED);
}

if (event->doneCallback != NULL && !event->cancelled) {
@@ -1958,6 +1965,7 @@ long celix_framework_registerServiceAsync(
event.registerCallback = registerDoneCallback;
event.doneData = eventDoneData;
event.doneCallback = eventDoneCallback;
__atomic_add_fetch(&fw->dispatcher.stats.nbRegister, 1, __ATOMIC_RELAXED);
celix_framework_addToEventQueue(fw, &event);

return svcId;
@@ -1975,6 +1983,7 @@ void celix_framework_unregisterAsync(celix_framework_t* fw, celix_bundle_t* bnd,
event.doneData = doneData;
event.doneCallback = doneCallback;

__atomic_add_fetch(&fw->dispatcher.stats.nbUnregister, 1, __ATOMIC_RELAXED);
celix_framework_addToEventQueue(fw, &event);
}

@@ -2665,6 +2674,15 @@ void celix_framework_waitUntilNoEventsForBnd(celix_framework_t* fw, long bndId)
celixThreadMutex_unlock(&fw->dispatcher.mutex);
}

void celix_framework_waitUntilNoPendingRegistration(celix_framework_t* fw)
{
assert(!celix_framework_isCurrentThreadTheEventLoop(fw));
celixThreadMutex_lock(&fw->dispatcher.mutex);
while (__atomic_load_n(&fw->dispatcher.stats.nbRegister, __ATOMIC_RELAXED) > 0) {
celixThreadCondition_wait(&fw->dispatcher.cond, &fw->dispatcher.mutex);
}
celixThreadMutex_unlock(&fw->dispatcher.mutex);
}

void celix_framework_setLogCallback(celix_framework_t* fw, void* logHandle, void (*logFunction)(void* handle, celix_log_level_e level, const char* file, const char *function, int line, const char *format, va_list formatArgs)) {
celix_frameworkLogger_setLogCallback(fw->logger, logHandle, logFunction);
@@ -2694,6 +2712,7 @@ long celix_framework_fireGenericEvent(framework_t* fw, long eventId, long bndId,
event.genericProcess = processCallback;
event.doneData = doneData;
event.doneCallback = doneCallback;
__atomic_add_fetch(&fw->dispatcher.stats.nbEvent, 1, __ATOMIC_RELAXED);
celix_framework_addToEventQueue(fw, &event);

return eventId;
@@ -162,6 +162,13 @@ struct celix_framework {
int eventQueueSize;
int eventQueueFirstEntry;
celix_array_list_t *dynamicEventQueue; //entry = celix_framework_event_t*. Used when the eventQueue is full
struct {
int nbFramework; // number of pending framework events
int nbBundle; // number of pending bundle events
int nbRegister; // number of pending registration
int nbUnregister; // number of pending async de-registration
int nbEvent; // number of pending generic events
} stats;
} dispatcher;

celix_framework_logger_t* logger;

0 comments on commit c407c40

Please sign in to comment.