Skip to content
Permalink
Browse files
Service tracker performance optimization.
Previously celix_bundleContext_useService(s)WithOptions invoke service in the main event loop, which means any blocking service is blocking the whole world. Also, usleep(10) polling is expensive if high-resolution timer is enabled on Linux. This commit fixes both of them.
  • Loading branch information
PengZheng committed Feb 22, 2022
1 parent bf7d85e commit 11c1ca1eda59c78e5bd3499973c91793fbeabcab
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 65 deletions.
@@ -742,7 +742,7 @@ typedef struct celix_service_use_options {
/**
* @brief An optional timeout (in seconds), if > 0 the use service call will block until the timeout is expired or
* when at least one service is found. Note that it will be ignored when use service on the event loop.
* Default (0)
* Default (0) Valid Range [0, INT_MAX]
*/
double waitTimeoutInSeconds OPTS_INIT;

@@ -113,13 +113,13 @@ void celix_serviceTracker_destroy(celix_service_tracker_t *tracker);
bool celix_serviceTracker_useHighestRankingService(
celix_service_tracker_t *tracker,
const char *serviceName /*sanity*/,
double waitTimeoutInSeconds /*0 -> do not wait */,
void *callbackHandle,
void (*use)(void *handle, void *svc),
void (*useWithProperties)(void *handle, void *svc, const celix_properties_t *props),
void (*useWithOwner)(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner)
);


/**
* Calls the use callback for every services found by this tracker.
* Returns the number of called services
@@ -1180,9 +1180,11 @@ static void celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker(voi

static void celix_bundleContext_useServiceWithOptions_2_UseServiceTracker(void *data) {
celix_bundle_context_use_service_data_t* d = data;
assert(celix_framework_isCurrentThreadTheEventLoop(d->ctx->framework));

d->called = celix_serviceTracker_useHighestRankingService(d->svcTracker, d->opts->filter.serviceName, d->opts->callbackHandle, d->opts->use, d->opts->useWithProperties, d->opts->useWithOwner);
double waitTimeoutInSeconds = d->opts->waitTimeoutInSeconds;
if (celix_framework_isCurrentThreadTheEventLoop(d->ctx->framework) || waitTimeoutInSeconds < 0) {
waitTimeoutInSeconds = 0;
}
d->called = celix_serviceTracker_useHighestRankingService(d->svcTracker, NULL, waitTimeoutInSeconds, d->opts->callbackHandle, d->opts->use, d->opts->useWithProperties, d->opts->useWithOwner);
}

static void celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker(void *data) {
@@ -1207,45 +1209,30 @@ bool celix_bundleContext_useServiceWithOptions(
data.opts = opts;

if (celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
// Ignore timeout: blocking the event loop prevents any progress to be made
celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker(&data);
celix_bundleContext_useServiceWithOptions_2_UseServiceTracker(&data);
celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker(&data);
return data.called;
} else {
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "create service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);
}

long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "create service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);
if (!celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
celix_framework_waitForEmptyEventQueue(ctx->framework); //ensure that a useService wait if a listener hooks concept, which triggers an async service registration
}

celix_framework_waitForEmptyEventQueue(ctx->framework); //ensure that a useService wait if a listener hooks concept, which triggers an async service registration
celix_bundleContext_useServiceWithOptions_2_UseServiceTracker(&data);

struct timespec startTime = celix_gettime(CLOCK_MONOTONIC);
bool useServiceIsDone = false;
bool called = false;
do {
eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "use service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_2_UseServiceTracker, NULL, NULL);
if (celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker(&data);
} else {
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "close service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);
}

bool timeoutNotUsed = opts->waitTimeoutInSeconds == 0;
bool timeoutExpired = celix_elapsedtime(CLOCK_MONOTONIC, startTime) > opts->waitTimeoutInSeconds;

called = data.called;

useServiceIsDone = timeoutNotUsed || timeoutExpired || called;
if (!useServiceIsDone) {
usleep(10);
}
} while (!useServiceIsDone);

eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "close service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);

return called;
return data.called;
}

static void celix_bundleContext_useServicesWithOptions_2_UseServiceTracker(void *data) {
celix_bundle_context_use_service_data_t* d = data;
assert(celix_framework_isCurrentThreadTheEventLoop(d->ctx->framework));

d->count = celix_serviceTracker_useServices(d->svcTracker, d->opts->filter.serviceName, d->opts->callbackHandle, d->opts->use, d->opts->useWithProperties, d->opts->useWithOwner);
}
@@ -1272,11 +1259,11 @@ size_t celix_bundleContext_useServicesWithOptions(
celix_framework_waitForEmptyEventQueue(ctx->framework); //ensure that a useService wait if a listener hooks concept, which triggers an async service registration
}

celix_bundleContext_useServicesWithOptions_2_UseServiceTracker(&data);

if (celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
celix_bundleContext_useServicesWithOptions_2_UseServiceTracker(&data);
celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker(&data);
} else {
celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "use service tracker for celix_bundleContext_useServicesWithOptions", &data, celix_bundleContext_useServicesWithOptions_2_UseServiceTracker, NULL, NULL);
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "close service tracker for celix_bundleContext_useServicesWithOptions", &data, celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);
}
@@ -25,6 +25,7 @@
#include <assert.h>
#include <unistd.h>
#include <celix_api.h>
#include <limits.h>

#include "service_tracker_private.h"
#include "bundle_context.h"
@@ -118,7 +119,8 @@ celix_status_t serviceTracker_createWithFilter(bundle_context_pt context, const
celixThreadCondition_init(&tracker->closeSync.cond, NULL);

celixThreadMutex_create(&tracker->mutex, NULL);
celixThreadCondition_init(&tracker->cond, NULL);
celixThreadCondition_init(&tracker->condTracked, NULL);
celixThreadCondition_init(&tracker->condUntracking, NULL);
tracker->trackedServices = celix_arrayList_create();
tracker->untrackingServices = celix_arrayList_create();

@@ -136,7 +138,8 @@ celix_status_t serviceTracker_destroy(service_tracker_pt tracker) {
celixThreadMutex_destroy(&tracker->closeSync.mutex);
celixThreadCondition_destroy(&tracker->closeSync.cond);
celixThreadMutex_destroy(&tracker->mutex);
celixThreadCondition_destroy(&tracker->cond);
celixThreadCondition_destroy(&tracker->condTracked);
celixThreadCondition_destroy(&tracker->condUntracking);
celix_arrayList_destroy(tracker->trackedServices);
celix_arrayList_destroy(tracker->untrackingServices);
free(tracker);
@@ -229,7 +232,7 @@ celix_status_t serviceTracker_close(service_tracker_t* tracker) {
serviceTracker_untrackTracked(tracker, tracked, currentSize, currentSize == 0);
celixThreadMutex_lock(&tracker->mutex);
celix_arrayList_remove(tracker->untrackingServices, tracked);
celixThreadCondition_broadcast(&tracker->cond);
celixThreadCondition_broadcast(&tracker->condUntracking);
celixThreadMutex_unlock(&tracker->mutex);
}

@@ -411,10 +414,11 @@ static celix_status_t serviceTracker_track(service_tracker_t* tracker, service_r

celixThreadMutex_lock(&tracker->mutex);
arrayList_add(tracker->trackedServices, tracked);
celixThreadCondition_broadcast(&tracker->condTracked);
celixThreadMutex_unlock(&tracker->mutex);

if (tracker->set != NULL || tracker->setWithProperties != NULL || tracker->setWithOwner != NULL) {
celix_serviceTracker_useHighestRankingService(tracker, NULL, tracker, NULL, NULL,
celix_serviceTracker_useHighestRankingService(tracker, NULL, 0, tracker, NULL, NULL,
serviceTracker_checkAndInvokeSetService);
}
serviceTracker_invokeAddService(tracker, tracked);
@@ -541,7 +545,7 @@ static celix_status_t serviceTracker_untrack(service_tracker_t* tracker, service
//ensure no untrack is still happening (to ensure it safe to unregister service)
celixThreadMutex_lock(&tracker->mutex);
while (celix_arrayList_size(tracker->untrackingServices) > 0) {
celixThreadCondition_wait(&tracker->cond, &tracker->mutex);
celixThreadCondition_wait(&tracker->condUntracking, &tracker->mutex);
}
celixThreadMutex_unlock(&tracker->mutex);
}
@@ -559,7 +563,7 @@ static void serviceTracker_untrackTracked(service_tracker_t *tracker, celix_trac
if (trackedSize == 0) {
serviceTracker_checkAndInvokeSetService(tracker, NULL, NULL, NULL);
} else {
celix_serviceTracker_useHighestRankingService(tracker, NULL, tracker, NULL, NULL,
celix_serviceTracker_useHighestRankingService(tracker, NULL, 0, tracker, NULL, NULL,
serviceTracker_checkAndInvokeSetService);
}
}
@@ -673,7 +677,8 @@ celix_service_tracker_t* celix_serviceTracker_createWithOptions(
celixThreadCondition_init(&tracker->closeSync.cond, NULL);

celixThreadMutex_create(&tracker->mutex, NULL);
celixThreadCondition_init(&tracker->cond, NULL);
celixThreadCondition_init(&tracker->condTracked, NULL);
celixThreadCondition_init(&tracker->condUntracking, NULL);
tracker->trackedServices = celix_arrayList_create();
tracker->untrackingServices = celix_arrayList_create();
tracker->currentHighestServiceId = -1;
@@ -693,35 +698,54 @@ void celix_serviceTracker_destroy(celix_service_tracker_t *tracker) {
}

bool celix_serviceTracker_useHighestRankingService(service_tracker_t *tracker,
const char *serviceName /*sanity*/,
void *callbackHandle,
void (*use)(void *handle, void *svc),
void (*useWithProperties)(void *handle, void *svc, const celix_properties_t *props),
void (*useWithOwner)(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner)) {
const char *serviceName /*sanity*/,
double waitTimeoutInSeconds /*0 -> do not wait */,
void *callbackHandle,
void (*use)(void *handle, void *svc),
void (*useWithProperties)(void *handle, void *svc, const celix_properties_t *props),
void (*useWithOwner)(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner)) {
bool called = false;
celix_tracked_entry_t *tracked = NULL;
celix_tracked_entry_t *highest = NULL;
unsigned int i;
struct timespec begin = celix_gettime(CLOCK_MONOTONIC);
double remaining = waitTimeoutInSeconds > INT_MAX ? INT_MAX : waitTimeoutInSeconds;
double elapsed = 0;
long seconds = remaining;
long nanoseconds = (remaining - seconds) * CELIX_NS_IN_SEC;

//first lock tracker and get highest tracked entry
celixThreadMutex_lock(&tracker->mutex);
unsigned int size = arrayList_size(tracker->trackedServices);

for (i = 0; i < size; i++) {
tracked = (celix_tracked_entry_t *) arrayList_get(tracker->trackedServices, i);
if (serviceName == NULL || (serviceName != NULL && tracked->serviceName != NULL && celix_utils_stringEquals(tracked->serviceName, serviceName))) {
if (highest == NULL) {
highest = tracked;
} else {
int compare = celix_utils_compareServiceIdsAndRanking(
tracked->serviceId, tracked->serviceRanking,
highest->serviceId, highest->serviceRanking
);
if (compare < 0) {
while (highest == NULL) {
unsigned int size = arrayList_size(tracker->trackedServices);

for (i = 0; i < size; i++) {
tracked = (celix_tracked_entry_t *) arrayList_get(tracker->trackedServices, i);
if (serviceName == NULL || (serviceName != NULL && tracked->serviceName != NULL &&
celix_utils_stringEquals(tracked->serviceName, serviceName))) {
if (highest == NULL) {
highest = tracked;
} else {
int compare = celix_utils_compareServiceIdsAndRanking(
tracked->serviceId, tracked->serviceRanking,
highest->serviceId, highest->serviceRanking
);
if (compare < 0) {
highest = tracked;
}
}
}
}
if(highest == NULL && (seconds > 0 || nanoseconds > 0)) {
celixThreadCondition_timedwaitRelative(&tracker->condTracked, &tracker->mutex, seconds, nanoseconds);
elapsed = celix_elapsedtime(CLOCK_MONOTONIC, begin);
remaining = remaining > elapsed ? (remaining - elapsed) : 0;
seconds = remaining;
nanoseconds = (remaining - seconds) * CELIX_NS_IN_SEC;
} else {
// highest found or timeout
break;
}
}
if (highest != NULL) {
//highest found lock tracked entry and increase use count
@@ -64,7 +64,8 @@ struct celix_serviceTracker {
} closeSync;

celix_thread_mutex_t mutex; //projects below
celix_thread_cond_t cond;
celix_thread_cond_t condTracked;
celix_thread_cond_t condUntracking;
celix_array_list_t *trackedServices;
celix_array_list_t *untrackingServices;
enum celix_service_tracker_state state;
@@ -28,6 +28,9 @@ extern "C" {
#include <stdbool.h>

#define CELIX_UTILS_MAX_STRLEN 1024*1024*1024
#define CELIX_US_IN_SEC (1000000)
#define CELIX_NS_IN_SEC ((CELIX_US_IN_SEC)*1000)


/**
* Creates a copy of a provided string.
@@ -29,6 +29,7 @@
#include <time.h>
#include "signal.h"
#include "celix_threads.h"
#include "celix_utils.h"


celix_status_t celixThread_create(celix_thread_t *new_thread, celix_thread_attr_t *attr, celix_thread_start_t func, void *data) {
@@ -182,9 +183,16 @@ celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond,
#else
celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
struct timespec time;
clock_gettime(CLOCK_MONOTONIC, &time);
seconds = seconds >= 0 ? seconds : 0;
time = celix_gettime(CLOCK_MONOTONIC);
time.tv_sec += seconds;
time.tv_nsec += nanoseconds;
if(nanoseconds > 0) {
time.tv_nsec += nanoseconds;
while (time.tv_nsec > CELIX_NS_IN_SEC) {
time.tv_sec++;
time.tv_nsec -= CELIX_NS_IN_SEC;
}
}
return pthread_cond_timedwait(cond, mutex, &time);
}
#endif
@@ -169,13 +169,13 @@ double celix_difftime(const struct timespec *tBegin, const struct timespec *tEnd
struct timespec diff;
if ((tEnd->tv_nsec - tBegin->tv_nsec) < 0) {
diff.tv_sec = tEnd->tv_sec - tBegin->tv_sec - 1;
diff.tv_nsec = tEnd->tv_nsec - tBegin->tv_nsec + 1000000000;
diff.tv_nsec = tEnd->tv_nsec - tBegin->tv_nsec + CELIX_NS_IN_SEC;
} else {
diff.tv_sec = tEnd->tv_sec - tBegin->tv_sec;
diff.tv_nsec = tEnd->tv_nsec - tBegin->tv_nsec;
}

return ((double)diff.tv_sec) + diff.tv_nsec / 1000000000.0;
return ((double)diff.tv_sec) + diff.tv_nsec * 1.0 / CELIX_NS_IN_SEC;
}

struct timespec celix_gettime(clockid_t clockId) {

0 comments on commit 11c1ca1

Please sign in to comment.