Skip to content

Commit

Permalink
Merge pull request #9980 from Icinga/config-sync-conflicts
Browse files Browse the repository at this point in the history
Process `config::update/delete` cluster events gracefully
  • Loading branch information
julianbrost committed Feb 19, 2024
2 parents 7d1c887 + 456144c commit 04ef105
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 11 deletions.
53 changes: 53 additions & 0 deletions lib/remote/apilistener-configsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "base/configtype.hpp"
#include "base/json.hpp"
#include "base/convert.hpp"
#include "base/defer.hpp"
#include "config/vmops.hpp"
#include <fstream>

Expand Down Expand Up @@ -104,6 +105,15 @@ Value ApiListener::ConfigUpdateObjectAPIHandler(const MessageOrigin::Ptr& origin
return Empty;
}

// Wait for the object name to become available for processing and block it immediately.
// Doing so guarantees that only one cluster event (create/update/delete) of a given
// object is being processed at any given time.
listener->m_ObjectConfigChangeLock.Lock(ptype, objName);

Defer unlockAndNotify([&listener, &ptype, &objName]{
listener->m_ObjectConfigChangeLock.Unlock(ptype, objName);
});

ConfigObject::Ptr object = ctype->GetObject(objName);

String config = params->Get("config");
Expand Down Expand Up @@ -258,6 +268,15 @@ Value ApiListener::ConfigDeleteObjectAPIHandler(const MessageOrigin::Ptr& origin
return Empty;
}

// Wait for the object name to become available for processing and block it immediately.
// Doing so guarantees that only one cluster event (create/update/delete) of a given
// object is being processed at any given time.
listener->m_ObjectConfigChangeLock.Lock(ptype, objName);

Defer unlockAndNotify([&listener, &ptype, &objName]{
listener->m_ObjectConfigChangeLock.Unlock(ptype, objName);
});

ConfigObject::Ptr object = ctype->GetObject(objName);

if (!object) {
Expand Down Expand Up @@ -462,3 +481,37 @@ void ApiListener::SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient
Log(LogInformation, "ApiListener")
<< "Finished syncing runtime objects to endpoint '" << endpoint->GetName() << "'.";
}

/**
* Locks the specified object name of the given type. If it is already locked, the call blocks until the lock is released.
*
* @param Type::Ptr ptype The type of the object you want to lock
* @param String objName The object name you want to lock
*/
void ObjectNameMutex::Lock(const Type::Ptr& ptype, const String& objName)
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_CV.wait(lock, [this, &ptype, &objName]{
auto& locked = m_LockedObjectNames[ptype.get()];
return locked.find(objName) == locked.end();
});

// Add object name to the locked list again to block all other threads that try
// to process a message affecting the same object.
m_LockedObjectNames[ptype.get()].emplace(objName);
}

/**
* Unlocks the specified object name of the given type.
*
* @param Type::Ptr ptype The type of the object you want to unlock
* @param String objName The name of the object you want to unlock
*/
void ObjectNameMutex::Unlock(const Type::Ptr& ptype, const String& objName)
{
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_LockedObjectNames[ptype.get()].erase(objName);
}
m_CV.notify_all();
}
23 changes: 23 additions & 0 deletions lib/remote/apilistener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,26 @@ enum class ApiCapabilities : uint_fast64_t
IfwApiCheckCommand = 1u << 1u,
};

/**
* Allows you to easily lock/unlock a specific object of a given type by its name.
*
* That way, locking an object "this" of type Host does not affect an object "this" of
* type "Service" nor an object "other" of type "Host".
*
* @ingroup remote
*/
class ObjectNameMutex
{
public:
void Lock(const Type::Ptr& ptype, const String& objName);
void Unlock(const Type::Ptr& ptype, const String& objName);

private:
std::mutex m_Mutex;
std::condition_variable m_CV;
std::map<Type*, std::set<String>> m_LockedObjectNames;
};

/**
* @ingroup remote
*/
Expand Down Expand Up @@ -257,6 +277,9 @@ class ApiListener final : public ObjectImpl<ApiListener>
mutable std::mutex m_ActivePackageStagesLock;
std::map<String, String> m_ActivePackageStages;

/* ensures that at most one create/update/delete is being processed per object at each time */
mutable ObjectNameMutex m_ObjectConfigChangeLock;

void UpdateActivePackageStagesCache();
};

Expand Down
28 changes: 17 additions & 11 deletions lib/remote/configobjectutility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "remote/apilistener.hpp"
#include "config/configcompiler.hpp"
#include "config/configitem.hpp"
#include "base/atomic-file.hpp"
#include "base/configwriter.hpp"
#include "base/exception.hpp"
#include "base/dependencygraph.hpp"
Expand Down Expand Up @@ -198,13 +199,21 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
return false;
}

// AtomicFile doesn't create not yet existing directories, so we have to do it by ourselves.
Utility::MkDirP(Utility::DirName(path), 0700);

std::ofstream fp(path.CStr(), std::ofstream::out | std::ostream::trunc);
// Using AtomicFile guarantees that two different threads simultaneously creating and loading the same
// configuration file do not interfere with each other, as the configuration is stored in a unique temp file.
// When one thread fails to pass object validation, it only deletes its temporary file and does not affect
// the other thread in any way.
AtomicFile fp(path, 0644);
fp << config;
fp.close();
// Flush the output buffer to catch any errors ASAP and handle them accordingly!
// Note: AtomicFile places these configs in a temp file and will be automatically
// discarded when it is not committed before going out of scope.
fp.flush();

std::unique_ptr<Expression> expr = ConfigCompiler::CompileFile(path, String(), "_api");
std::unique_ptr<Expression> expr = ConfigCompiler::CompileText(path, config, String(), "_api");

try {
ActivationScope ascope;
Expand All @@ -225,9 +234,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
if (!ConfigItem::CommitItems(ascope.GetContext(), upq, newItems, true)) {
if (errors) {
Log(LogNotice, "ConfigObjectUtility")
<< "Failed to commit config item '" << fullName << "'. Aborting and removing config path '" << path << "'.";

Utility::Remove(path);
<< "Failed to commit config item '" << fullName << "'.";

for (const boost::exception_ptr& ex : upq.GetExceptions()) {
errors->Add(DiagnosticInformation(ex, false));
Expand All @@ -248,9 +255,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
if (!ConfigItem::ActivateItems(newItems, true, false, false, cookie)) {
if (errors) {
Log(LogNotice, "ConfigObjectUtility")
<< "Failed to activate config object '" << fullName << "'. Aborting and removing config path '" << path << "'.";

Utility::Remove(path);
<< "Failed to activate config object '" << fullName << "'.";

for (const boost::exception_ptr& ex : upq.GetExceptions()) {
errors->Add(DiagnosticInformation(ex, false));
Expand All @@ -275,6 +280,9 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
ConfigObject::Ptr obj = ctype->GetObject(fullName);

if (obj) {
// Object has surpassed the compiling/validation processes, we can safely commit the file!
fp.Commit();

Log(LogInformation, "ConfigObjectUtility")
<< "Created and activated object '" << fullName << "' of type '" << type->GetName() << "'.";
} else {
Expand All @@ -283,8 +291,6 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full
}

} catch (const std::exception& ex) {
Utility::Remove(path);

if (errors)
errors->Add(DiagnosticInformation(ex, false));

Expand Down

0 comments on commit 04ef105

Please sign in to comment.