Skip to content
Permalink
Browse files
Eliminate unnecessary blocking and mutext from useServiceWithOptions.
  • Loading branch information
PengZheng committed Nov 6, 2021
1 parent bb9179a commit c070b8a47945489a9ad57eb77878d02679a66a96
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 41 deletions.
@@ -769,6 +769,42 @@ TEST_F(CelixBundleContextServicesTests, serviceTrackerWithRaceConditionTest) {
celix_bundleContext_stopTracker(ctx, trackerId);
};

TEST_F(CelixBundleContextServicesTests, useServiceDoesNotBlockInEventLoop) {
void *svc1 = (void*)0x100;

auto set = [](void *handle, void */*svc*/) {
celix_bundle_context_t *ctx = static_cast<celix_bundle_context_t *>(handle);
celix_service_use_options_t use_opts{};
use_opts.filter.serviceName = "NotAvailable";
use_opts.waitTimeoutInSeconds = 3600; // unacceptable long blocking
use_opts.callbackHandle = nullptr;
use_opts.use = [](void *handle, void *svc) {
FAIL() << "We shouldn't get here: (" << handle << "," << svc << ")";
};

bool called = celix_bundleContext_useServiceWithOptions(ctx, &use_opts);
ASSERT_FALSE(called);
};

long svcId1 = celix_bundleContext_registerService(ctx, svc1, "NA", nullptr);
ASSERT_GE(svcId1, 0);

celix_service_tracking_options_t opts{};
opts.callbackHandle = (void*)ctx;
opts.filter.serviceName = "NA";
opts.set = set;
long trackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
ASSERT_TRUE(trackerId >= 0);

void *svc2 = (void*)0x200; //no ranking
long svcId2 = celix_bundleContext_registerService(ctx, svc2, "NA", nullptr);
ASSERT_GE(svcId2, 0);

celix_bundleContext_unregisterService(ctx, svcId2);
celix_bundleContext_stopTracker(ctx, trackerId);
celix_bundleContext_unregisterService(ctx, svcId1);
}

TEST_F(CelixBundleContextServicesTests, servicesTrackerSetTest) {
int count = 0;

@@ -1359,4 +1395,4 @@ TEST_F(CelixBundleContextServicesTests, setServicesWithTrackerWhenMultipleRegist
celix_bundleContext_unregisterService(ctx, svcId1);
celix_bundleContext_unregisterService(ctx, svcId2);
celix_bundleContext_unregisterService(ctx, svcId3);
}
}
@@ -708,7 +708,7 @@ typedef struct celix_service_use_options {

/**
* An optional timeout (in seconds), if > 0 the use service call will block until the timeout is expired or
* when at least one service is found.
* when at least one service is found. Note that it will be ignored when use service on the event loop.
* Default (0)
*/
double waitTimeoutInSeconds OPTS_INIT;
@@ -1163,7 +1163,6 @@ typedef struct celix_bundle_context_use_service_data {
celix_bundle_context_t* ctx;
const celix_service_use_options_t* opts;

celix_thread_mutex_t mutex; //protects below;
bool called; //for use service
size_t count; //for use services
celix_service_tracker_t * svcTracker;
@@ -1176,30 +1175,24 @@ static void celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker(voi
celix_service_tracking_options_t trkOpts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
trkOpts.filter = d->opts->filter;

celixThreadMutex_lock(&d->mutex);
d->called = false;
d->count = 0;
d->svcTracker = celix_serviceTracker_createWithOptions(d->ctx, &trkOpts);
celixThreadMutex_unlock(&d->mutex);
}

static void celix_bundleContext_useServiceWithOptions_2_UseServiceTracker(void *data) {
celix_bundle_context_use_service_data_t* d = data;
assert(celix_framework_isCurrentThreadTheEventLoop(d->ctx->framework));

celixThreadMutex_lock(&d->mutex);
d->called = celix_serviceTracker_useHighestRankingService(d->svcTracker, d->opts->filter.serviceName, d->opts->callbackHandle, d->opts->use, d->opts->useWithProperties, d->opts->useWithOwner);
celixThreadMutex_unlock(&d->mutex);
}

static void celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker(void *data) {
celix_bundle_context_use_service_data_t* d = data;
assert(celix_framework_isCurrentThreadTheEventLoop(d->ctx->framework));

celixThreadMutex_lock(&d->mutex);
celix_service_tracker_t *tracker = d->svcTracker;
d->svcTracker = NULL;
celixThreadMutex_unlock(&d->mutex);

celix_serviceTracker_destroy(tracker);
}
@@ -1214,61 +1207,49 @@ bool celix_bundleContext_useServiceWithOptions(
celix_bundle_context_use_service_data_t data = {0};
data.ctx = ctx;
data.opts = opts;
celixThreadMutex_create(&data.mutex, NULL);

if (celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
// Ignore timeout: blocking the event loop prevents any progress to be made
celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker(&data);
} else {
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "create service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);
celix_bundleContext_useServiceWithOptions_2_UseServiceTracker(&data);
celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker(&data);
return data.called;
}

if (!celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
celix_framework_waitForEmptyEventQueue(ctx->framework); //ensure that a useService wait if a listener hooks concept, which triggers an async service registration
}
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "create service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);

celix_framework_waitForEmptyEventQueue(ctx->framework); //ensure that a useService wait if a listener hooks concept, which triggers an async service registration

struct timespec startTime = celix_gettime(CLOCK_MONOTONIC);
bool useServiceIsDone = false;
bool called = false;
do {
if (celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
celix_bundleContext_useServiceWithOptions_2_UseServiceTracker(&data);
} else {
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "use service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_2_UseServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);
}
eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "use service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_2_UseServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);

bool timeoutNotUsed = opts->waitTimeoutInSeconds == 0;
bool timeoutExpired = celix_elapsedtime(CLOCK_MONOTONIC, startTime) > opts->waitTimeoutInSeconds;

celixThreadMutex_lock(&data.mutex);
called = data.called;
celixThreadMutex_unlock(&data.mutex);

useServiceIsDone = timeoutNotUsed || timeoutExpired || called;
if (!useServiceIsDone) {
usleep(10);
}
} while (!useServiceIsDone);

if (celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker(&data);
} else {
long eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "close service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);
}
eventId = celix_framework_fireGenericEvent(ctx->framework, -1, celix_bundle_getId(ctx->bundle), "close service tracker for celix_bundleContext_useServiceWithOptions", &data, celix_bundleContext_useServiceWithOptions_3_CloseServiceTracker, NULL, NULL);
celix_framework_waitForGenericEvent(ctx->framework, eventId);

celixThreadMutex_destroy(&data.mutex);
return called;
}

static void celix_bundleContext_useServicesWithOptions_2_UseServiceTracker(void *data) {
celix_bundle_context_use_service_data_t* d = data;
assert(celix_framework_isCurrentThreadTheEventLoop(d->ctx->framework));

celixThreadMutex_lock(&d->mutex);
d->count = celix_serviceTracker_useServices(d->svcTracker, d->opts->filter.serviceName, d->opts->callbackHandle, d->opts->use, d->opts->useWithProperties, d->opts->useWithOwner);
celixThreadMutex_unlock(&d->mutex);
}

size_t celix_bundleContext_useServicesWithOptions(
@@ -1281,7 +1262,6 @@ size_t celix_bundleContext_useServicesWithOptions(
celix_bundle_context_use_service_data_t data = {0};
data.ctx = ctx;
data.opts = opts;
celixThreadMutex_create(&data.mutex, NULL);

if (celix_framework_isCurrentThreadTheEventLoop(ctx->framework)) {
celix_bundleContext_useServiceWithOptions_1_CreateServiceTracker(&data);
@@ -1303,12 +1283,7 @@ size_t celix_bundleContext_useServicesWithOptions(
celix_framework_waitForGenericEvent(ctx->framework, eventId);
}

celixThreadMutex_lock(&data.mutex);
size_t count = data.count;
celixThreadMutex_unlock(&data.mutex);
celixThreadMutex_destroy(&data.mutex);

return count;
return data.count;
}


@@ -1723,4 +1698,4 @@ void celix_bundleContext_log(const celix_bundle_context_t *ctx, celix_log_level_

void celix_bundleContext_vlog(const celix_bundle_context_t *ctx, celix_log_level_e level, const char *format, va_list formatArgs) {
celix_framework_vlog(ctx->framework->logger, level, NULL, NULL, 0, format, formatArgs);
}
}

0 comments on commit c070b8a

Please sign in to comment.