Skip to content

Commit

Permalink
Fix deadlock (#651)
Browse files Browse the repository at this point in the history
* Fix deadlock and double update

Signed-off-by: The MathWorks, Inc. <jdicleme@mathworks.com>
  • Loading branch information
jeffdiclemente committed Aug 19, 2022
1 parent 0835cf0 commit be23105
Show file tree
Hide file tree
Showing 16 changed files with 551 additions and 59 deletions.
114 changes: 71 additions & 43 deletions compendium/ConfigurationAdmin/src/ConfigurationAdminImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ ConfigurationAdminImpl::~ConfigurationAdminImpl()
if (managedService) {
// can only send a Removed notification if the managedService has previously
// received an Updated notification.
const auto it = configurationsToInvalidate.find(managedService->pid);
const auto it = configurationsToInvalidate.find(managedService->getPid());
if (it != std::end(configurationsToInvalidate) &&
it->second->HasBeenUpdatedAtLeastOnce()) {
notifyServiceUpdated(managedService->pid,
*(managedService->trackedService),
notifyServiceUpdated(managedService->getPid(),
*(managedService->getTrackedService()),
emptyMap,
*logger);
}
Expand All @@ -233,7 +233,7 @@ ConfigurationAdminImpl::~ConfigurationAdminImpl()
if (!managedServiceFactory) {
continue;
}
auto it = factoryInstancesCopy.find(managedServiceFactory->pid);
auto it = factoryInstancesCopy.find(managedServiceFactory->getPid());
if (it == std::end(factoryInstancesCopy)) {
continue;
}
Expand All @@ -244,7 +244,7 @@ ConfigurationAdminImpl::~ConfigurationAdminImpl()
const auto it = configurationsToInvalidate.find(pid);
if (it != std::end(configurationsToInvalidate) && (it->second->HasBeenUpdatedAtLeastOnce())) {
notifyServiceRemoved(
pid, *(managedServiceFactory->trackedService), *logger);
pid, *(managedServiceFactory->getTrackedService()), *logger);
}
}
}
Expand Down Expand Up @@ -392,10 +392,11 @@ std::vector<ConfigurationAddedInfo> ConfigurationAdminImpl::AddConfigurations(
std::vector<ConfigurationAddedInfo> pidsAndChangeCountsAndIDs;
std::vector<bool> createdOrUpdated;
std::vector<std::shared_ptr<ConfigurationImpl>> configurationsToInvalidate;
unsigned long changeCount{ 0u };

{
std::lock_guard<std::mutex> lk{ configurationsMutex };
for (const auto& configMetadata : configurationMetadata) {
unsigned long changeCount{ 0ul };
auto& pid = configMetadata.pid;
auto it = configurations.find(pid);
if (it == std::end(configurations)) {
Expand Down Expand Up @@ -453,7 +454,7 @@ std::vector<ConfigurationAddedInfo> ConfigurationAdminImpl::AddConfigurations(
for (const auto& pidAndChangeCountAndID : pidsAndChangeCountsAndIDs) {
const auto& pid = pidAndChangeCountAndID.pid;
if (createdOrUpdated[idx]) {
NotifyConfigurationUpdated(pid);
NotifyConfigurationUpdated(pid, pidAndChangeCountAndID.changeCount);
logger->Log(cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"AddConfigurations: Created or Updated Configuration "
"instance with PID " +
Expand Down Expand Up @@ -524,7 +525,7 @@ void ConfigurationAdminImpl::RemoveConfigurations(
const auto& pid = pidAndChangeCountAndID.pid;
if (removedAndUpdated[idx].first) {
if (removedAndUpdated[idx].second) {
NotifyConfigurationUpdated(pid);
NotifyConfigurationUpdated(pid, pidAndChangeCountAndID.changeCount);
}
logger->Log(
cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
Expand All @@ -542,7 +543,7 @@ void ConfigurationAdminImpl::RemoveConfigurations(
}

std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationUpdated(
const std::string& pid)
const std::string& pid, const unsigned long changeCount)
{
// NotifyConfigurationUpdated will only send a notification to the service if
// the configuration object has been updated at least once. In order to determine whether or not
Expand All @@ -551,7 +552,7 @@ std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationUpdated(
// is not available and that method cannot be called. For this reason, NotifyConfigurationUpdated
// should not be called for Remove operations unless the caller has already confirmed
// the configuration object has been updated at least once.
return PerformAsync([this, pid] {
return PerformAsync([this, pid, changeCount] {
AnyMap properties{ AnyMap::UNORDERED_MAP_CASEINSENSITIVE_KEYS };
std::string fPid;
std::string nonFPid;
Expand Down Expand Up @@ -604,12 +605,13 @@ std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationUpdated(
[&](const auto& managedServiceWrapper) {
// The ServiceTracker will return a default constructed shared_ptr for each ManagedService
// that we aren't tracking. We must be careful not to dereference these!
if ((managedServiceWrapper) && (managedServiceWrapper->pid == pid)) {
if ((managedServiceWrapper) && (managedServiceWrapper->getPid() == pid) && (removed || (!removed && managedServiceWrapper->needsAnUpdateNotification(pid, changeCount)))) {
notifyServiceUpdated(
pid,
*(managedServiceWrapper->trackedService),
*(managedServiceWrapper->getTrackedService()),
properties,
*logger);
managedServiceWrapper->setLastUpdatedChangeCount(pid, changeCount);
}
});

Expand All @@ -625,18 +627,19 @@ std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationUpdated(
[&](const auto& managedServiceFactoryWrapper) {
// The ServiceTracker will return a default constructed shared_ptr for each ManagedServiceFactory
// that we aren't tracking. We must be careful not to dereference these!
if ((managedServiceFactoryWrapper) && (managedServiceFactoryWrapper->pid == factoryPid)) {
if ((managedServiceFactoryWrapper) && (managedServiceFactoryWrapper->getPid() == factoryPid)) {
if (removed) {
notifyServiceRemoved(
pid,
*(managedServiceFactoryWrapper->trackedService),
*(managedServiceFactoryWrapper->getTrackedService()),
*logger);
} else {
} else if (managedServiceFactoryWrapper->needsAnUpdateNotification(pid, changeCount)) {
notifyServiceUpdated(
pid,
*(managedServiceFactoryWrapper->trackedService),
*(managedServiceFactoryWrapper->getTrackedService()),
properties,
*logger);
managedServiceFactoryWrapper->setLastUpdatedChangeCount(pid, changeCount);
}
}
});
Expand Down Expand Up @@ -672,7 +675,7 @@ std::shared_future<void> ConfigurationAdminImpl::NotifyConfigurationRemoved(
RemoveFactoryInstanceIfRequired(pid);
}
if (configurationToInvalidate && hasBeenUpdated) {
auto removeFuture = NotifyConfigurationUpdated(pid);
auto removeFuture = NotifyConfigurationUpdated(pid, 0);
// This functor will run on another thread. Just being overly cautious to guarantee that the
// ConfigurationImpl which has called this method doesn't run its own destructor.
PerformAsync(
Expand All @@ -693,11 +696,20 @@ ConfigurationAdminImpl::AddingService(
const ServiceReference<cppmicroservices::service::cm::ManagedService>&
reference)
{
// GetService can cause entry into user code, so don't hold a lock.
auto managedService = cmContext.GetService(reference);
if (!managedService) {
logger->Log(SeverityLevel::LOG_WARNING,
"Ignoring ManagedService as a valid service could not be "
"obtained from the BundleContext");
return nullptr;
}

// Lock the configurations repository so no configuration objects can be
// added or removed while AddingService is processing the new service.
std::lock_guard<std::mutex> lk{ configurationsMutex };
const auto pid = getPidFromServiceReference(reference);

auto pid = getPidFromServiceReference(reference);
if (pid.empty()) {
const auto bundle = reference.GetBundle();
const auto serviceID = std::to_string(cppmicroservices::any_cast<long>(
Expand All @@ -709,20 +721,13 @@ ConfigurationAdminImpl::AddingService(
" as it does not have a service.pid or component.name property");
return nullptr;
}
auto managedService = cmContext.GetService(reference);
if (!managedService) {
logger->Log(SeverityLevel::LOG_DEBUG,
"Ignoring ManagedService as a valid service could not be "
"obtained from the BundleContext");
return nullptr;
}
// Ensure there's a Configuration for this PID if one doesn't exist already.

// Ensure there's a Configuration for this PID if one doesn't exist already.
const auto it = configurations.find(pid);
if (it == std::end(configurations)) {
auto factoryPid = getFactoryPid(pid);
AddFactoryInstanceIfRequired(pid, factoryPid);
configurations.emplace(
auto insertPair = configurations.emplace(
pid,
std::make_shared<ConfigurationImpl>(
this,
Expand All @@ -734,35 +739,50 @@ ConfigurationAdminImpl::AddingService(
// update method. Return here without sending notification.
logger->Log(cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"New ManagedService with PID " + pid);

std::unordered_map<std::string, unsigned long> initialChangeCountByPid = {{pid, insertPair.first->second->GetChangeCount()}};
return std::make_shared<
TrackedServiceWrapper<cppmicroservices::service::cm::ManagedService>>(
pid, std::move(managedService));
pid, std::move(initialChangeCountByPid), std::move(managedService));
}
// Send a notification in case a valid configuration object
// was created before the service was active. The service's properties
// need to be updated.
AnyMap properties{ AnyMap::UNORDERED_MAP_CASEINSENSITIVE_KEYS };
unsigned long initialChangeCount{0ul};

if (it != std::end(configurations)) {
try {
properties = it->second->GetProperties();
initialChangeCount = it->second->GetChangeCount();
} catch (const std::runtime_error&) {
// Configuration is being removed
logger->Log(SeverityLevel::LOG_WARNING,
"Attempted to update a configuration which has been removed.",
std::current_exception());
}
// Only send notifications for configuration objects that have been
// Updated.
if (it->second->HasBeenUpdatedAtLeastOnce()) {
PerformAsync([this, pid, managedService, properties] {
notifyServiceUpdated(pid, *managedService, properties, *logger);
});

logger->Log(
cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"Async configuration update queued for new ManagedService with PID " +
pid + ".");
}
}

logger->Log(cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"New ManagedService with PID " + pid +
" has been added, and async Update has been queued.");
" has been added.");

std::unordered_map<std::string, unsigned long> initialChangeCountByPid = {{pid, initialChangeCount}};
return std::make_shared<
TrackedServiceWrapper<cppmicroservices::service::cm::ManagedService>>(
pid, std::move(managedService));
pid, std::move(initialChangeCountByPid), std::move(managedService));
}

void ConfigurationAdminImpl::ModifiedService(
Expand All @@ -783,7 +803,7 @@ void ConfigurationAdminImpl::RemovedService(
{
// No need to do anything other than log; ManagedService just won't receive any more updates to its Configuration.
logger->Log(cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"ManagedService with PID " + service->pid + " has been removed.");
"ManagedService with PID " + service->getPid() + " has been removed.");
}

std::shared_ptr<
Expand All @@ -792,10 +812,18 @@ ConfigurationAdminImpl::AddingService(
const ServiceReference<cppmicroservices::service::cm::ManagedServiceFactory>&
reference)
{
// GetService can cause entry into user code, so don't hold a lock.
auto managedServiceFactory = cmContext.GetService(reference);
if (!managedServiceFactory) {
logger->Log(SeverityLevel::LOG_DEBUG,
"Ignoring ManagedServiceFactory as a valid service could not "
"be obtained from the BundleContext");
return nullptr;
}

std::lock_guard<std::mutex> lk{ configurationsMutex };

const auto pid = getPidFromServiceReference(reference);
auto pid = getPidFromServiceReference(reference);
if (pid.empty()) {
const auto bundle = reference.GetBundle();
const auto serviceID = std::to_string(cppmicroservices::any_cast<long>(
Expand All @@ -806,15 +834,10 @@ ConfigurationAdminImpl::AddingService(
(bundle ? bundle.GetSymbolicName() : "Unknown") +
" as it does not have a service.pid or component.name property");
return nullptr;
}
auto managedServiceFactory = cmContext.GetService(reference);
if (!managedServiceFactory) {
logger->Log(SeverityLevel::LOG_DEBUG,
"Ignoring ManagedServiceFactory as a valid service could not "
"be obtained from the BundleContext");
return nullptr;
}
}

std::vector<std::pair<std::string, AnyMap>> pidsAndProperties;
std::unordered_map<std::string, unsigned long> initialChangeCountPerPid;

const auto it = factoryInstances.find(pid);
if (it != std::end(factoryInstances)) {
Expand All @@ -824,6 +847,7 @@ ConfigurationAdminImpl::AddingService(
"Invalid Configuration iterator");
try {
auto properties = configurationIt->second->GetProperties();
initialChangeCountPerPid[configurationIt->second->GetPid()] = configurationIt->second->GetChangeCount();
// Notifications can only be sent for configuration objects that
// been Updated. Only add it to the notification list if it has
// been Updated.
Expand All @@ -832,6 +856,10 @@ ConfigurationAdminImpl::AddingService(
}
} catch (const std::runtime_error&) {
// Configuration is being removed
logger->Log(
SeverityLevel::LOG_WARNING,
"Attempted to update a configuration which has been removed.",
std::current_exception());
}
}
}
Expand All @@ -852,7 +880,7 @@ ConfigurationAdminImpl::AddingService(
" has been added, and async Update has been queued for all updated instances.");
return std::make_shared<TrackedServiceWrapper<
cppmicroservices::service::cm::ManagedServiceFactory>>(
pid, std::move(managedServiceFactory));
pid, std::move(initialChangeCountPerPid), std::move(managedServiceFactory));
}

void ConfigurationAdminImpl::ModifiedService(
Expand All @@ -872,7 +900,7 @@ void ConfigurationAdminImpl::RemovedService(
{
// No need to do anything other than log; ManagedServiceFactory just won't receive any more updates to any of its Configurations.
logger->Log(cppmicroservices::logservice::SeverityLevel::LOG_DEBUG,
"ManagedServiceFactory with PID " + service->pid +
"ManagedServiceFactory with PID " + service->getPid() +
" has been removed.");
}

Expand Down
32 changes: 27 additions & 5 deletions compendium/ConfigurationAdmin/src/ConfigurationAdminImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,17 @@ namespace cmimpl {
* A wrapper class used for storing the pid of a given ManagedService or ManagedServiceFactory
* with the service in the ServiceTracker.
*/
template<typename T>
template<typename TrackedServiceType>
class TrackedServiceWrapper
{
public:
using TrackedServiceType = T;

TrackedServiceWrapper(std::string trackedPid,
std::unordered_map<std::string, unsigned long> initialChangeCountPerPid,
std::shared_ptr<TrackedServiceType> service)
: pid(std::move(trackedPid))
, trackedService(std::move(service))
: pid(std::move(trackedPid))
, trackedService(std::move(service))
, lastUpdatedChangeCountPerPid(std::move(initialChangeCountPerPid))
{}

TrackedServiceWrapper(const TrackedServiceWrapper&) = delete;
Expand All @@ -68,8 +69,29 @@ class TrackedServiceWrapper

explicit operator bool() const { return static_cast<bool>(trackedService); }

std::string getPid() noexcept {
return pid;
}

std::shared_ptr<TrackedServiceType> getTrackedService() noexcept {
return trackedService;
}

void setLastUpdatedChangeCount(const std::string& pid, const unsigned long& changeCount) {
std::unique_lock<std::mutex> lock(updatedChangeCountMutex);
lastUpdatedChangeCountPerPid[pid] = changeCount;
}

bool needsAnUpdateNotification(const std::string& pid, const unsigned long& changeCount) {
std::unique_lock<std::mutex> lock(updatedChangeCountMutex);
return lastUpdatedChangeCountPerPid[pid] < changeCount;
}

private:
std::string pid;
std::shared_ptr<TrackedServiceType> trackedService;
std::unordered_map<std::string, unsigned long> lastUpdatedChangeCountPerPid; ///< the change count for each pid or factory pid instance
std::mutex updatedChangeCountMutex; ///< guard read/write access to lastUpdatedChangeCountPerPid
};

/**
Expand Down Expand Up @@ -160,7 +182,7 @@ class ConfigurationAdminImpl final
* See {@code ConfigurationAdminPrivate#NotifyConfigurationUpdated}
*/
std::shared_future<void> NotifyConfigurationUpdated(
const std::string& pid) override;
const std::string& pid, const unsigned long changeCount) override;

/**
* Internal method used by {@code ConfigurationImpl} to notify any {@code ManagedService} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ConfigurationAdminPrivate
* @param pid The PID of the {@code Configuration} which has been updated
*/
virtual std::shared_future<void> NotifyConfigurationUpdated(
const std::string& pid) = 0;
const std::string& pid, const unsigned long changeCount) = 0;

/**
* Internal method used by {@code ConfigurationImpl} to notify any {@code ManagedService} or
Expand Down

0 comments on commit be23105

Please sign in to comment.