Skip to content

Commit

Permalink
Fix priority ordering for IDO queries
Browse files Browse the repository at this point in the history
fixes #10829
refs #8714
  • Loading branch information
Michael Friedrich authored and gunnarbeutner committed Feb 23, 2016
1 parent 02184ad commit 2bc1d32
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 81 deletions.
2 changes: 1 addition & 1 deletion lib/base/workqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void WorkQueue::Enqueue(const boost::function<void (void)>& function, WorkQueueP
m_CVFull.wait(lock);
}

m_Tasks.push(Task(function, priority));
m_Tasks.push(Task(function, priority, ++m_NextTaskID));

m_CVEmpty.notify_one();
}
Expand Down
20 changes: 16 additions & 4 deletions lib/base/workqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,31 @@ enum WorkQueuePriority
struct Task
{
Task(void)
: Priority(PriorityNormal)
: Priority(PriorityNormal), ID(-1)
{ }

Task(const boost::function<void (void)>& function, WorkQueuePriority priority)
: Function(function), Priority(priority)
Task(const boost::function<void (void)>& function, WorkQueuePriority priority, int id)
: Function(function), Priority(priority), ID(id)
{ }

boost::function<void (void)> Function;
WorkQueuePriority Priority;
int ID;
};

inline bool operator<(const Task& a, const Task& b)
{
return a.Priority < b.Priority;
if (a.Priority < b.Priority)
return true;

if (a.Priority == b.Priority) {
if (a.ID > b.ID)
return true;
else
return false;
}

return false;
}

/**
Expand Down Expand Up @@ -101,6 +112,7 @@ class I2_BASE_API WorkQueue
bool m_Stopped;
int m_Processing;
std::priority_queue<Task, std::deque<Task> > m_Tasks;
int m_NextTaskID;
ExceptionCallback m_ExceptionCallback;
std::vector<boost::exception_ptr> m_Exceptions;
Timer::Ptr m_StatusTimer;
Expand Down
14 changes: 7 additions & 7 deletions lib/db_ido/dbconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void DbConnection::Start(bool runtimeCreated)
ObjectImpl<DbConnection>::Start(runtimeCreated);

DbObject::OnQuery.connect(boost::bind(&DbConnection::ExecuteQuery, this, _1));
DbObject::OnMultipleQueries.connect(boost::bind(&DbConnection::ExecuteMultipleQueries, this, _1));
ConfigObject::OnActiveChanged.connect(boost::bind(&DbConnection::UpdateObject, this, _1));
}

Expand Down Expand Up @@ -131,14 +132,16 @@ void DbConnection::ProgramStatusHandler(void)
Log(LogNotice, "DbConnection")
<< "Updating programstatus table.";

std::vector<DbQuery> queries;

DbQuery query1;
query1.Table = "programstatus";
query1.Type = DbQueryDelete;
query1.Category = DbCatProgramStatus;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
query1.Priority = PriorityHigh;
DbObject::OnQuery(query1);
queries.push_back(query1);

DbQuery query2;
query2.Table = "programstatus";
Expand All @@ -165,7 +168,9 @@ void DbConnection::ProgramStatusHandler(void)
query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0));
query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0));
query2.Priority = PriorityHigh;
DbObject::OnQuery(query2);
queries.push_back(query2);

DbObject::OnMultipleQueries(queries);

DbQuery query3;
query3.Table = "runtimevariables";
Expand Down Expand Up @@ -351,11 +356,6 @@ bool DbConnection::GetStatusUpdate(const DbObject::Ptr& dbobj) const
return (m_StatusUpdates.find(dbobj) != m_StatusUpdates.end());
}

void DbConnection::ExecuteQuery(const DbQuery&)
{
/* Default handler does nothing. */
}

void DbConnection::UpdateObject(const ConfigObject::Ptr& object)
{
if (!GetConnected())
Expand Down
1 change: 1 addition & 0 deletions lib/db_ido/dbconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class I2_DB_IDO_API DbConnection : public ObjectImpl<DbConnection>
virtual void Pause(void) override;

virtual void ExecuteQuery(const DbQuery& query) = 0;
virtual void ExecuteMultipleQueries(const std::vector<DbQuery>&) = 0;
virtual void ActivateObject(const DbObject::Ptr& dbobj) = 0;
virtual void DeactivateObject(const DbObject::Ptr& dbobj) = 0;

Expand Down
129 changes: 69 additions & 60 deletions lib/db_ido/dbevents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void DbEvents::StaticInitialize(void)
/* Status */
Comment::OnCommentAdded.connect(boost::bind(&DbEvents::AddComment, _1));
Comment::OnCommentRemoved.connect(boost::bind(&DbEvents::RemoveComment, _1));
Downtime::OnDowntimeAdded.connect(boost::bind(&DbEvents::AddDowntime, _1, true));
Downtime::OnDowntimeAdded.connect(boost::bind(&DbEvents::AddDowntime, _1));
Downtime::OnDowntimeRemoved.connect(boost::bind(&DbEvents::RemoveDowntime, _1));
Downtime::OnDowntimeTriggered.connect(boost::bind(&DbEvents::TriggerDowntime, _1));
Checkable::OnAcknowledgementSet.connect(boost::bind(&DbEvents::AddAcknowledgement, _1, _4));
Expand Down Expand Up @@ -303,30 +303,42 @@ void DbEvents::AddComments(const Checkable::Ptr& checkable)
{
std::set<Comment::Ptr> comments = checkable->GetComments();

if (!comments.empty())
RemoveComments(checkable);
if (comments.empty())
return;

std::vector<DbQuery> queries;

DbQuery query1;
query1.Table = "comments";
query1.Type = DbQueryDelete;
query1.Category = DbCatComment;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
queries.push_back(query1);

BOOST_FOREACH(const Comment::Ptr& comment, comments) {
AddComment(comment);
AddCommentInternal(queries, comment, false);
}

DbObject::OnMultipleQueries(queries);
}

void DbEvents::AddComment(const Comment::Ptr& comment)
{
AddCommentInternal(comment, false);
std::vector<DbQuery> queries;
RemoveCommentInternal(queries, comment);
AddCommentInternal(queries, comment, false);
DbObject::OnMultipleQueries(queries);
}

void DbEvents::AddCommentHistory(const Comment::Ptr& comment)
{
AddCommentInternal(comment, true);
std::vector<DbQuery> queries;
AddCommentInternal(queries, comment, true);
DbObject::OnMultipleQueries(queries);
}

void DbEvents::AddCommentInternal(const Comment::Ptr& comment, bool historical)
{
AddCommentByType(comment, historical);
}

void DbEvents::AddCommentByType(const Comment::Ptr& comment, bool historical)
void DbEvents::AddCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment, bool historical)
{
Checkable::Ptr checkable = comment->GetCheckable();

Expand Down Expand Up @@ -376,24 +388,18 @@ void DbEvents::AddCommentByType(const Comment::Ptr& comment, bool historical)
query1.Type = DbQueryInsert;
query1.Category = DbCatComment;
query1.Fields = fields1;
DbObject::OnQuery(query1);

queries.push_back(query1);
}

void DbEvents::RemoveComments(const Checkable::Ptr& checkable)
void DbEvents::RemoveComment(const Comment::Ptr& comment)
{
Log(LogDebug, "DbEvents")
<< "removing service comments for '" << checkable->GetName() << "'";

DbQuery query1;
query1.Table = "comments";
query1.Type = DbQueryDelete;
query1.Category = DbCatComment;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
DbObject::OnQuery(query1);
std::vector<DbQuery> queries;
RemoveCommentInternal(queries, comment);
DbObject::OnMultipleQueries(queries);
}

void DbEvents::RemoveComment(const Comment::Ptr& comment)
void DbEvents::RemoveCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment)
{
Checkable::Ptr checkable = comment->GetCheckable();

Expand Down Expand Up @@ -425,6 +431,7 @@ void DbEvents::RemoveComment(const Comment::Ptr& comment)

query2.WhereCriteria = new Dictionary();
query2.WhereCriteria->Set("internal_comment_id", comment->GetLegacyId());
query2.WhereCriteria->Set("object_id", checkable);
query2.WhereCriteria->Set("comment_time", DbValue::FromTimestamp(entry_time));
query2.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */

Expand All @@ -436,37 +443,42 @@ void DbEvents::AddDowntimes(const Checkable::Ptr& checkable)
{
std::set<Downtime::Ptr> downtimes = checkable->GetDowntimes();

if (!downtimes.empty())
RemoveDowntimes(checkable);
if (downtimes.empty())
return;

std::vector<DbQuery> queries;

DbQuery query1;
query1.Table = "scheduleddowntime";
query1.Type = DbQueryDelete;
query1.Category = DbCatDowntime;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
queries.push_back(query1);

BOOST_FOREACH(const Downtime::Ptr& downtime, downtimes) {
AddDowntime(downtime, false);
AddDowntimeInternal(queries, downtime, false);
}
}

void DbEvents::AddDowntime(const Downtime::Ptr& downtime, bool remove_existing)
{
/*
* make sure to delete any old downtime to avoid multiple inserts from
* configured ScheduledDowntime dumps and CreateNextDowntime() calls
*/
if (remove_existing)
RemoveDowntime(downtime);

AddDowntimeInternal(downtime, false);
DbObject::OnMultipleQueries(queries);
}

void DbEvents::AddDowntimeHistory(const Downtime::Ptr& downtime)
void DbEvents::AddDowntime(const Downtime::Ptr& downtime)
{
AddDowntimeInternal(downtime, true);
std::vector<DbQuery> queries;
RemoveDowntimeInternal(queries, downtime);
AddDowntimeInternal(queries, downtime, false);
DbObject::OnMultipleQueries(queries);
}

void DbEvents::AddDowntimeInternal(const Downtime::Ptr& downtime, bool historical)
void DbEvents::AddDowntimeHistory(const Downtime::Ptr& downtime)
{
AddDowntimeByType(downtime, historical);
std::vector<DbQuery> queries;
AddDowntimeInternal(queries, downtime, false);
DbObject::OnMultipleQueries(queries);
}

void DbEvents::AddDowntimeByType(const Downtime::Ptr& downtime, bool historical)
void DbEvents::AddDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime, bool historical)
{
Checkable::Ptr checkable = downtime->GetCheckable();

Expand Down Expand Up @@ -514,21 +526,18 @@ void DbEvents::AddDowntimeByType(const Downtime::Ptr& downtime, bool historical)
query1.Type = DbQueryInsert;
query1.Category = DbCatDowntime;
query1.Fields = fields1;
DbObject::OnQuery(query1);

queries.push_back(query1);
}

void DbEvents::RemoveDowntimes(const Checkable::Ptr& checkable)
void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
{
DbQuery query1;
query1.Table = "scheduleddowntime";
query1.Type = DbQueryDelete;
query1.Category = DbCatDowntime;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
DbObject::OnQuery(query1);
std::vector<DbQuery> queries;
RemoveDowntimeInternal(queries, downtime);
DbObject::OnMultipleQueries(queries);
}

void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
void DbEvents::RemoveDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime)
{
Checkable::Ptr checkable = downtime->GetCheckable();

Expand All @@ -541,7 +550,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
query1.WhereCriteria->Set("object_id", checkable);
query1.WhereCriteria->Set("internal_downtime_id", downtime->GetLegacyId());
query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
DbObject::OnQuery(query1);
queries.push_back(query1);

/* History - update actual_end_time, was_cancelled for service (and host in case) */
double now = Utility::GetTime();
Expand All @@ -564,7 +573,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
query3.WhereCriteria->Set("entry_time", DbValue::FromTimestamp(downtime->GetEntryTime()));
query3.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */

DbObject::OnQuery(query3);
queries.push_back(query3);

/* host/service status */
Host::Ptr host;
Expand All @@ -577,7 +586,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
else
query4.Table = "hoststatus";

query4.Type = DbQueryInsert | DbQueryUpdate;
query4.Type = DbQueryUpdate;
query4.Category = DbCatState;
query4.StatusUpdate = true;
query4.Object = DbObject::GetOrCreateByObject(checkable);
Expand All @@ -595,7 +604,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)

query4.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */

DbObject::OnQuery(query4);
queries.push_back(query4);
}

void DbEvents::TriggerDowntime(const Downtime::Ptr& downtime)
Expand Down Expand Up @@ -660,7 +669,7 @@ void DbEvents::TriggerDowntime(const Downtime::Ptr& downtime)
else
query4.Table = "hoststatus";

query4.Type = DbQueryInsert | DbQueryUpdate;
query4.Type = DbQueryUpdate;
query4.Category = DbCatState;
query4.StatusUpdate = true;
query4.Object = DbObject::GetOrCreateByObject(checkable);
Expand Down Expand Up @@ -754,7 +763,7 @@ void DbEvents::AddAcknowledgementInternal(const Checkable::Ptr& checkable, Ackno
else
query1.Table = "hoststatus";

query1.Type = DbQueryInsert | DbQueryUpdate;
query1.Type = DbQueryUpdate;
query1.Category = DbCatState;
query1.StatusUpdate = true;
query1.Object = DbObject::GetOrCreateByObject(checkable);
Expand Down
11 changes: 5 additions & 6 deletions lib/db_ido/dbevents.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ class DbEvents
public:
static void StaticInitialize(void);

static void AddCommentByType(const Comment::Ptr& comment, bool historical);
static void AddComments(const Checkable::Ptr& checkable);
static void RemoveComments(const Checkable::Ptr& checkable);

static void AddDowntimeByType(const Downtime::Ptr& downtime, bool historical);
static void AddDowntimes(const Checkable::Ptr& checkable);
static void RemoveDowntimes(const Checkable::Ptr& checkable);

Expand All @@ -85,7 +82,7 @@ class DbEvents
static void AddComment(const Comment::Ptr& comment);
static void RemoveComment(const Comment::Ptr& comment);

static void AddDowntime(const Downtime::Ptr& downtime, bool remove_existing);
static void AddDowntime(const Downtime::Ptr& downtime);
static void RemoveDowntime(const Downtime::Ptr& downtime);
static void TriggerDowntime(const Downtime::Ptr& downtime);

Expand Down Expand Up @@ -130,8 +127,10 @@ class DbEvents
private:
DbEvents(void);

static void AddCommentInternal(const Comment::Ptr& comment, bool historical);
static void AddDowntimeInternal(const Downtime::Ptr& downtime, bool historical);
static void AddCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment, bool historical);
static void RemoveCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment);
static void AddDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime, bool historical);
static void RemoveDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime);
static void EnableChangedHandlerInternal(const Checkable::Ptr& checkable, const String& fieldName, bool enabled);
};

Expand Down
1 change: 1 addition & 0 deletions lib/db_ido/dbobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
using namespace icinga;

boost::signals2::signal<void (const DbQuery&)> DbObject::OnQuery;
boost::signals2::signal<void (const std::vector<DbQuery>&)> DbObject::OnMultipleQueries;

INITIALIZE_ONCE(&DbObject::StaticInitialize);

Expand Down
Loading

0 comments on commit 2bc1d32

Please sign in to comment.