Skip to content

Commit

Permalink
Fix race that results in a missed config updated event (#727)
Browse files Browse the repository at this point in the history
* Fix race that results in a missed config updated event

Fixed races that can cause a ManagedService or ManagedServiceFactory to miss being called when a configuraiton object is updated.

Signed-off-by: The MathWorks, Inc. <jdicleme@mathworks.com>
  • Loading branch information
jeffdiclemente committed Sep 27, 2022
1 parent 9909b14 commit 4f18659
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 72 deletions.
192 changes: 123 additions & 69 deletions compendium/ConfigurationAdmin/src/ConfigurationAdminImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,12 @@ std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationUpdated(
std::string nonFPid;
auto removed = false;
auto hasBeenUpdated = false;
std::vector<std::shared_ptr<
TrackedServiceWrapper<cppmicroservices::service::cm::ManagedService>>>
managedServiceWrappers;
std::vector<std::shared_ptr<TrackedServiceWrapper<
cppmicroservices::service::cm::ManagedServiceFactory>>>
managedServiceFactoryWrappers;
{
std::lock_guard<std::mutex> lk{ configurationsMutex };
const auto it = configurations.find(pid);
Expand All @@ -573,78 +579,89 @@ std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationUpdated(
removed = true;
}
}

// We can only send update notifications for configuration objects that have
// been updated. Just return without sending the notification for objects
// that have not yet been updated.
if (!hasBeenUpdated) {
return;
}

if (pid.find('~') != std::string::npos) {
//this is a factory pid
fPid = pid;
} else {
nonFPid = pid;
}
managedServiceWrappers = trackedManagedServices_;
managedServiceFactoryWrappers = trackedManagedServiceFactories_;
}
// We can only send update notifications for configuration objects that have
// been updated. Just return without sending the notification for objects
// that have not yet been updated.
if (!hasBeenUpdated) {
return;
}
if (pid.find('~') != std::string::npos) {
//this is a factory pid
fPid = pid;
} else {
nonFPid = pid;
}
const auto configurationListeners = configListenerTracker.GetServices();

auto type =
removed
? cppmicroservices::service::cm::ConfigurationEventType::CM_DELETED
: cppmicroservices::service::cm::ConfigurationEventType::CM_UPDATED;

auto configurationListeners = configListenerTracker.GetServices();
auto configAdminRef = cmContext.GetServiceReference<ConfigurationAdmin>();
for (const auto& it : configurationListeners) {
auto configEvent = cppmicroservices::service::cm::ConfigurationEvent(
configAdminRef, type, fPid, nonFPid);
it->configurationEvent((configEvent));
}

const auto managedServiceWrappers = managedServiceTracker.GetServices();
std::for_each(managedServiceWrappers.begin(),
managedServiceWrappers.end(),
[&](const auto& managedServiceWrapper) {
// The ServiceTracker will return a default constructed shared_ptr for each ManagedService
// that we aren't tracking. We must be careful not to dereference these!
if ((managedServiceWrapper) && (managedServiceWrapper->getPid() == pid) && (removed || (!removed && managedServiceWrapper->needsAnUpdateNotification(pid, changeCount)))) {
notifyServiceUpdated(
pid,
*(managedServiceWrapper->getTrackedService()),
properties,
*logger);
managedServiceWrapper->setLastUpdatedChangeCount(pid, changeCount);
}
});
std::for_each(
managedServiceWrappers.begin(),
managedServiceWrappers.end(),
[&](const auto& managedServiceWrapper) {
// The ServiceTracker will return a default constructed shared_ptr for each ManagedService
// that we aren't tracking. We must be careful not to dereference these!
if ((managedServiceWrapper) &&
(managedServiceWrapper->getPid() == pid) &&
(removed ||
(!removed && managedServiceWrapper->needsAnUpdateNotification(
pid, changeCount)))) {
notifyServiceUpdated(pid,
*(managedServiceWrapper->getTrackedService()),
properties,
*logger);
managedServiceWrapper->setLastUpdatedChangeCount(pid, changeCount);
}
});

const auto factoryPid = getFactoryPid(pid);
if (factoryPid.empty()) {
return;
}

const auto managedServiceFactoryWrappers =
managedServiceFactoryTracker.GetServices();
std::for_each(managedServiceFactoryWrappers.begin(),
managedServiceFactoryWrappers.end(),
[&](const auto& managedServiceFactoryWrapper) {
// The ServiceTracker will return a default constructed shared_ptr for each ManagedServiceFactory
// that we aren't tracking. We must be careful not to dereference these!
if ((managedServiceFactoryWrapper) && (managedServiceFactoryWrapper->getPid() == factoryPid)) {
if (removed) {
notifyServiceRemoved(
pid,
*(managedServiceFactoryWrapper->getTrackedService()),
*logger);
} else if (managedServiceFactoryWrapper->needsAnUpdateNotification(pid, changeCount)) {
notifyServiceUpdated(
pid,
*(managedServiceFactoryWrapper->getTrackedService()),
properties,
*logger);
managedServiceFactoryWrapper->setLastUpdatedChangeCount(pid, changeCount);
}
}
});
std::for_each(
managedServiceFactoryWrappers.begin(),
managedServiceFactoryWrappers.end(),
[&](const auto& managedServiceFactoryWrapper) {
// The ServiceTracker will return a default constructed shared_ptr for each ManagedServiceFactory
// that we aren't tracking. We must be careful not to dereference these!
if ((managedServiceFactoryWrapper) &&
(managedServiceFactoryWrapper->getPid() == factoryPid)) {
if (removed) {
notifyServiceRemoved(
pid,
*(managedServiceFactoryWrapper->getTrackedService()),
*logger);
} else if (managedServiceFactoryWrapper->needsAnUpdateNotification(
pid, changeCount)) {
notifyServiceUpdated(
pid,
*(managedServiceFactoryWrapper->getTrackedService()),
properties,
*logger);
managedServiceFactoryWrapper->setLastUpdatedChangeCount(
pid, changeCount);
}
}
});
});
}

std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationRemoved(
const std::string& pid,
std::uintptr_t configurationId)
Expand Down Expand Up @@ -729,18 +746,25 @@ ConfigurationAdminImpl::AddingService(

if (const auto it = configurations.find(pid); it != std::end(configurations)) {
AnyMap properties{ AnyMap::UNORDERED_MAP_CASEINSENSITIVE_KEYS };
try {
properties = it->second->GetProperties();
initialChangeCount = it->second->GetChangeCount();
} catch (const std::runtime_error&) {
// Configuration is being removed
logger->Log(SeverityLevel::LOG_WARNING,
"Attempted to update a configuration which has been removed.",
std::current_exception());
}
// Only send notifications for configuration objects that have been
// Updated.
// Updated.
if (it->second->HasBeenUpdatedAtLeastOnce()) {
try {
properties = it->second->GetProperties();
// specifically set the initial change count here because there is
// a race between the call to GetChangeCount() and HasBeenUpdatedAtLeastOnce().
// The logic is that, if HasBeenUpdatedAtLeastOnce() returns true, the change
// count will always be > 1 and should at this point be captured for the
// tracked object to eliminate redundant config updates.
initialChangeCount = it->second->GetChangeCount();
} catch (const std::runtime_error&) {
// Configuration is being removed
logger->Log(SeverityLevel::LOG_WARNING,
"Attempted to retrieve properties from a configuration "
"which has been removed.",
std::current_exception());
}

PerformAsync([this, pid, managedService, properties] {
notifyServiceUpdated(pid, *managedService, properties, *logger);
});
Expand All @@ -757,9 +781,11 @@ ConfigurationAdminImpl::AddingService(
" has been added.");

std::unordered_map<std::string, unsigned long> initialChangeCountByPid = {{pid, initialChangeCount}};
return std::make_shared<
auto trackedManagedService = std::make_shared<
TrackedServiceWrapper<cppmicroservices::service::cm::ManagedService>>(
pid, std::move(initialChangeCountByPid), std::move(managedService));
trackedManagedServices_.emplace_back(trackedManagedService);
return trackedManagedService;
}

void ConfigurationAdminImpl::ModifiedService(
Expand All @@ -778,7 +804,19 @@ void ConfigurationAdminImpl::RemovedService(
TrackedServiceWrapper<cppmicroservices::service::cm::ManagedService>>&
service)
{
// No need to do anything other than log; ManagedService just won't receive any more updates to its Configuration.
// Lock because we are modifying the container of tracked managed services.
std::lock_guard<std::mutex> lk{ configurationsMutex };

auto elemIter = std::find_if(
std::begin(trackedManagedServices_),
std::end(trackedManagedServices_),
[&service](const auto& trackedServiceWrapper) {
return (service->getTrackedService() == trackedServiceWrapper->getTrackedService());
});
if (elemIter != trackedManagedServices_.end()) {
trackedManagedServices_.erase(elemIter);
}
// ManagedService won't receive any more updates to its Configuration.
logger->Log(cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"ManagedService with PID " + service->getPid() + " has been removed.");
}
Expand Down Expand Up @@ -823,19 +861,19 @@ ConfigurationAdminImpl::AddingService(
assert(configurationIt != std::end(configurations) &&
"Invalid Configuration iterator");
try {
auto properties = configurationIt->second->GetProperties();
initialChangeCountPerPid[configurationIt->second->GetPid()] = configurationIt->second->GetChangeCount();
// Notifications can only be sent for configuration objects that
// been Updated. Only add it to the notification list if it has
// been Updated.
if (configurationIt->second->HasBeenUpdatedAtLeastOnce()) {
auto properties = configurationIt->second->GetProperties();
initialChangeCountPerPid[configurationIt->second->GetPid()] = configurationIt->second->GetChangeCount();
pidsAndProperties.emplace_back(instance, std::move(properties));
}
} catch (const std::runtime_error&) {
// Configuration is being removed
logger->Log(
SeverityLevel::LOG_WARNING,
"Attempted to update a configuration which has been removed.",
"Attempted to retrieve properties for a configuration which has been removed.",
std::current_exception());
}
}
Expand All @@ -851,13 +889,16 @@ ConfigurationAdminImpl::AddingService(
}
});
}

logger->Log(
cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"New ManagedServiceFactory with PID " + pid +
" has been added, and async Update has been queued for all updated instances.");
return std::make_shared<TrackedServiceWrapper<
auto trackedManagedServiceFactory = std::make_shared<TrackedServiceWrapper<
cppmicroservices::service::cm::ManagedServiceFactory>>(
pid, std::move(initialChangeCountPerPid), std::move(managedServiceFactory));
trackedManagedServiceFactories_.emplace_back(trackedManagedServiceFactory);
return trackedManagedServiceFactory;
}

void ConfigurationAdminImpl::ModifiedService(
Expand All @@ -875,7 +916,20 @@ void ConfigurationAdminImpl::RemovedService(
const std::shared_ptr<TrackedServiceWrapper<
cppmicroservices::service::cm::ManagedServiceFactory>>& service)
{
// No need to do anything other than log; ManagedServiceFactory just won't receive any more updates to any of its Configurations.
// Lock because we are modifying the container of tracked managed services.
std::lock_guard<std::mutex> lk{ configurationsMutex };

auto elemIter = std::find_if(
std::begin(trackedManagedServiceFactories_),
std::end(trackedManagedServiceFactories_),
[&service](const auto& trackedServiceWrapper) {
return (service->getTrackedService() == trackedServiceWrapper->getTrackedService());
});
if (elemIter != trackedManagedServiceFactories_.end()) {
trackedManagedServiceFactories_.erase(elemIter);
}

// ManagedServiceFactory won't receive any more updates to any of its Configurations.
logger->Log(cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"ManagedServiceFactory with PID " + service->getPid() +
" has been removed.");
Expand All @@ -892,7 +946,7 @@ std::shared_future<void> ConfigurationAdminImpl::PerformAsync(Functor&& f)
[this, func = std::forward<Functor>(f), id]() mutable {
// func() can throw, make sure that the futures
// are correctly cleaned up if an exception occurs.
detail::ScopeGuard cleanupFutures([this, &id]() {
detail::ScopeGuard cleanupFutures([this, id]() {
std::lock_guard<std::mutex> lk{ futuresMutex };
auto it = incompleteFutures.find(id);
assert(it != std::end(incompleteFutures) && "Invalid future iterator");
Expand Down
11 changes: 11 additions & 0 deletions compendium/ConfigurationAdmin/src/ConfigurationAdminImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <cstdint>
#include <future>
#include <memory>
#include <mutex>
#include <random>
#include <unordered_map>
Expand Down Expand Up @@ -272,6 +273,16 @@ class ConfigurationAdminImpl final
cppmicroservices::ServiceTracker<
cppmicroservices::service::cm::ConfigurationListener>
configListenerTracker;

// used instead of querying the service trackers since a race exists between when the service tracker
// adds the tracked service to the internal map and when a client asks the service tracker for the
// list of tracked objects.
std::vector<std::shared_ptr<
TrackedServiceWrapper<cppmicroservices::service::cm::ManagedService>>>
trackedManagedServices_;
std::vector<std::shared_ptr<TrackedServiceWrapper<
cppmicroservices::service::cm::ManagedServiceFactory>>>
trackedManagedServiceFactories_;
};
} // cmimpl
} // cppmicroservices
Expand Down
25 changes: 22 additions & 3 deletions compendium/ConfigurationAdmin/test/TestConfigAdmin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,14 @@ namespace cppmicroservices { namespace test {
TestManagedService() : updatedCount_{0} {}
virtual ~TestManagedService() noexcept = default;

void Updated(const cppmicroservices::AnyMap&) override {
void Updated(const cppmicroservices::AnyMap& props) override {
std::unique_lock<std::mutex> lock(updatedCountMutex_);
updatedCount_++;
// empty properties can be sent when stopping the configadmin
// service. For the purpose of this test we only want to
// update the count when non-empty properties have been sent.
if(!props.empty()) {
updatedCount_++;
}
}

unsigned long getUpdatedMethodCallCount() noexcept override {
Expand All @@ -606,7 +611,7 @@ namespace cppmicroservices { namespace test {
// This test simulates sending a config update when two threads are
// racing to register a ManagedService and update the configuration
// object.
// Thgis test is meant to be run in a loop to detect race conditions.
// This test is meant to be run in a loop to detect race conditions.
TEST_F(ConfigAdminTests, testConcurrentDuplicateManagedServiceUpdated)
{
auto f = GetFramework();
Expand Down Expand Up @@ -662,6 +667,13 @@ TEST_F(ConfigAdminTests, testConcurrentDuplicateManagedServiceUpdated)
ctx.GetService<cppmicroservices::test::TestManagedServiceInterface>(sr);
ASSERT_NE(managedService, nullptr);

// wait for config admin to finish processing all config events
// by stopping the configadmin bundle. This is necessary to guarantee
// that when we check for the # of Updated method calls, a config
// admin thread isn't still processing one.
auto configAdminBundle = GetConfigAdminBundle();
configAdminBundle.Stop();
m_configAdmin.reset();
EXPECT_EQ(managedService->getUpdatedMethodCallCount(), 1);
}

Expand Down Expand Up @@ -717,6 +729,13 @@ TEST_F(ConfigAdminTests, testConcurrentDuplicateManagedServiceFactoryUpdated)
auto const serviceFactory = getManagedServiceFactory(ctx);
ASSERT_NE(serviceFactory, nullptr);

// wait for config admin to finish processing all config events
// by stopping the configadmin bundle. This is necessary to guarantee
// that when we check for the # of Updated method calls, a config
// admin thread isn't still processing one.
auto configAdminBundle = GetConfigAdminBundle();
configAdminBundle.Stop();
m_configAdmin.reset();
EXPECT_EQ(serviceFactory->getUpdatedCounter("cm.testfactory~0"), 1);
}

Expand Down

0 comments on commit 4f18659

Please sign in to comment.