Skip to content

Commit

Permalink
Merge branch 'feature/ido-reuse-ids-5565' into next
Browse files Browse the repository at this point in the history
Fixes #5565
  • Loading branch information
gunnarbeutner committed Feb 4, 2014
2 parents 6c724d2 + 856f011 commit 6549a6d
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 99 deletions.
89 changes: 44 additions & 45 deletions components/db_ido_mysql/idomysqlconnection.cpp
Expand Up @@ -159,7 +159,7 @@ void IdoMysqlConnection::Reconnect(void)
if (!mysql_init(&m_Connection))
BOOST_THROW_EXCEPTION(std::bad_alloc());

if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, 0))
if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, CLIENT_FOUND_ROWS))
BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));

m_Connected = true;
Expand Down Expand Up @@ -210,7 +210,7 @@ void IdoMysqlConnection::Reconnect(void)
+ "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");

/* clear config tables for the initial config dump */
ClearConfigTables();
PrepareDatabase();

std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
Expand All @@ -233,40 +233,6 @@ void IdoMysqlConnection::Reconnect(void)
UpdateAllObjects();
}

void IdoMysqlConnection::ClearConfigTables(void)
{
/* TODO make hardcoded table names modular */
ClearConfigTable("commands");
ClearConfigTable("comments");
ClearConfigTable("contact_addresses");
ClearConfigTable("contact_notificationcommands");
ClearConfigTable("contactgroup_members");
ClearConfigTable("contactgroups");
ClearConfigTable("contacts");
ClearConfigTable("contactstatus");
ClearConfigTable("customvariables");
ClearConfigTable("customvariablestatus");
ClearConfigTable("host_contactgroups");
ClearConfigTable("host_contacts");
ClearConfigTable("host_parenthosts");
ClearConfigTable("hostdependencies");
ClearConfigTable("hostgroup_members");
ClearConfigTable("hostgroups");
ClearConfigTable("hosts");
ClearConfigTable("hoststatus");
ClearConfigTable("programstatus");
ClearConfigTable("scheduleddowntime");
ClearConfigTable("service_contactgroups");
ClearConfigTable("service_contacts");
ClearConfigTable("servicedependencies");
ClearConfigTable("servicegroup_members");
ClearConfigTable("servicegroups");
ClearConfigTable("services");
ClearConfigTable("servicestatus");
ClearConfigTable("timeperiod_timeranges");
ClearConfigTable("timeperiods");
}

void IdoMysqlConnection::ClearConfigTable(const String& table)
{
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID)));
Expand All @@ -285,6 +251,8 @@ IdoMysqlResult IdoMysqlConnection::Query(const String& query)
<< errinfo_database_query(query)
);

m_AffectedRows = mysql_affected_rows(&m_Connection);

MYSQL_RES *result = mysql_use_result(&m_Connection);

if (!result) {
Expand All @@ -308,6 +276,13 @@ DbReference IdoMysqlConnection::GetLastInsertID(void)
return DbReference(mysql_insert_id(&m_Connection));
}

int IdoMysqlConnection::GetAffectedRows(void)
{
AssertOnWorkQueue();

return m_AffectedRows;
}

String IdoMysqlConnection::Escape(const String& s)
{
AssertOnWorkQueue();
Expand Down Expand Up @@ -465,10 +440,10 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
{
ASSERT(query.Category != DbCatInvalid);

m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
}

void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);

Expand Down Expand Up @@ -502,7 +477,11 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
}
}

if ((query.Type & DbQueryInsert) && (query.Type & DbQueryUpdate)) {
type = typeOverride ? *typeOverride : query.Type;

bool upsert = false;

if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
bool hasid = false;

ASSERT(query.Object);
Expand All @@ -514,12 +493,11 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
else
ASSERT(!"Invalid query flags.");

if (hasid)
type = DbQueryUpdate;
else
type = DbQueryInsert;
} else
type = query.Type;
if (!hasid)
upsert = true;

type = DbQueryUpdate;
}

switch (type) {
case DbQueryInsert:
Expand Down Expand Up @@ -575,6 +553,15 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)

Query(qbuf.str());

if (upsert && GetAffectedRows() == 0) {
lock.unlock();

DbQueryType to = DbQueryInsert;
InternalExecuteQuery(query, &to);

return;
}

if (query.Object) {
if (query.ConfigUpdate)
SetConfigUpdate(query.Object, true);
Expand Down Expand Up @@ -607,3 +594,15 @@ void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
" < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
}

void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
{
String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id FROM " + GetTablePrefix() + type->GetTable() + "s";
IdoMysqlResult result = Query(query);

Dictionary::Ptr row;

while ((row = FetchRow(result))) {
SetInsertID(type, DbReference(row->Get("object_id")), DbReference(row->Get(type->GetTable() + "_id")));
}
}
8 changes: 5 additions & 3 deletions components/db_ido_mysql/idomysqlconnection.h
Expand Up @@ -49,6 +49,7 @@ class IdoMysqlConnection : public ObjectImpl<IdoMysqlConnection>
virtual void DeactivateObject(const DbObject::Ptr& dbobj);
virtual void ExecuteQuery(const DbQuery& query);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void FillIDCache(const DbType::Ptr& type);

private:
DbReference m_InstanceID;
Expand All @@ -58,12 +59,14 @@ class IdoMysqlConnection : public ObjectImpl<IdoMysqlConnection>
boost::mutex m_ConnectionMutex;
bool m_Connected;
MYSQL m_Connection;
int m_AffectedRows;

Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer;

IdoMysqlResult Query(const String& query);
DbReference GetLastInsertID(void);
int GetAffectedRows(void);
String Escape(const String& s);
Dictionary::Ptr FetchRow(const IdoMysqlResult& result);
void DiscardRows(const IdoMysqlResult& result);
Expand All @@ -80,11 +83,10 @@ class IdoMysqlConnection : public ObjectImpl<IdoMysqlConnection>
void TxTimerHandler(void);
void ReconnectTimerHandler(void);

void InternalExecuteQuery(const DbQuery& query);
void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL);
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);

void ClearConfigTables(void);
void ClearConfigTable(const String& table);
virtual void ClearConfigTable(const String& table);

void ExceptionHandler(boost::exception_ptr exp);
};
Expand Down
88 changes: 45 additions & 43 deletions components/db_ido_pgsql/idopgsqlconnection.cpp
Expand Up @@ -210,7 +210,7 @@ void IdoPgsqlConnection::Reconnect(void)
+ "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");

/* clear config tables for the initial config dump */
ClearConfigTables();
PrepareDatabase();

std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
Expand All @@ -236,40 +236,6 @@ void IdoPgsqlConnection::Reconnect(void)
UpdateAllObjects();
}

void IdoPgsqlConnection::ClearConfigTables(void)
{
/* TODO make hardcoded table names modular */
ClearConfigTable("commands");
ClearConfigTable("comments");
ClearConfigTable("contact_addresses");
ClearConfigTable("contact_notificationcommands");
ClearConfigTable("contactgroup_members");
ClearConfigTable("contactgroups");
ClearConfigTable("contacts");
ClearConfigTable("contactstatus");
ClearConfigTable("customvariables");
ClearConfigTable("customvariablestatus");
ClearConfigTable("host_contactgroups");
ClearConfigTable("host_contacts");
ClearConfigTable("host_parenthosts");
ClearConfigTable("hostdependencies");
ClearConfigTable("hostgroup_members");
ClearConfigTable("hostgroups");
ClearConfigTable("hosts");
ClearConfigTable("hoststatus");
ClearConfigTable("programstatus");
ClearConfigTable("scheduleddowntime");
ClearConfigTable("service_contactgroups");
ClearConfigTable("service_contacts");
ClearConfigTable("servicedependencies");
ClearConfigTable("servicegroup_members");
ClearConfigTable("servicegroups");
ClearConfigTable("services");
ClearConfigTable("servicestatus");
ClearConfigTable("timeperiod_timeranges");
ClearConfigTable("timeperiods");
}

void IdoPgsqlConnection::ClearConfigTable(const String& table)
{
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID)));
Expand Down Expand Up @@ -303,6 +269,9 @@ IdoPgsqlResult IdoPgsqlConnection::Query(const String& query)
);
}

char *rowCount = PQcmdTuples(result);
m_AffectedRows = atoi(rowCount);

return IdoPgsqlResult(result, std::ptr_fun(PQclear));
}

Expand All @@ -323,6 +292,13 @@ DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const Stri
return DbReference(Convert::ToLong(row->Get("id")));
}

int IdoPgsqlConnection::GetAffectedRows(void)
{
AssertOnWorkQueue();

return m_AffectedRows;
}

String IdoPgsqlConnection::Escape(const String& s)
{
AssertOnWorkQueue();
Expand Down Expand Up @@ -468,10 +444,10 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
{
ASSERT(query.Category != DbCatInvalid);

m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true);
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
}

void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);

Expand Down Expand Up @@ -505,6 +481,10 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
}
}

type = typeOverride ? *typeOverride : query.Type;

bool upsert = false;

if ((query.Type & DbQueryInsert) && (query.Type & DbQueryUpdate)) {
bool hasid = false;

Expand All @@ -517,12 +497,11 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
else
ASSERT(!"Invalid query flags.");

if (hasid)
type = DbQueryUpdate;
else
type = DbQueryInsert;
} else
type = query.Type;
if (!hasid)
upsert = true;

type = DbQueryUpdate;
}

switch (type) {
case DbQueryInsert:
Expand Down Expand Up @@ -577,6 +556,15 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)

Query(qbuf.str());

if (upsert && GetAffectedRows() == 0) {
lock.unlock();

DbQueryType to = DbQueryInsert;
InternalExecuteQuery(query, &to);

return;
}

if (query.Object) {
if (query.ConfigUpdate)
SetConfigUpdate(query.Object, true);
Expand Down Expand Up @@ -616,3 +604,17 @@ void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
" < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ")");
}

void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
{
String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id FROM " + GetTablePrefix() + type->GetTable() + "s";
IdoPgsqlResult result = Query(query);

Dictionary::Ptr row;

int index = 0;
while ((row = FetchRow(result, index))) {
index++;
SetInsertID(type, DbReference(row->Get("object_id")), DbReference(row->Get(type->GetTable() + "_id")));
}
}
8 changes: 5 additions & 3 deletions components/db_ido_pgsql/idopgsqlconnection.h
Expand Up @@ -49,6 +49,7 @@ class IdoPgsqlConnection : public ObjectImpl<IdoPgsqlConnection>
virtual void DeactivateObject(const DbObject::Ptr& dbobj);
virtual void ExecuteQuery(const DbQuery& query);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void FillIDCache(const DbType::Ptr& type);

private:
DbReference m_InstanceID;
Expand All @@ -57,12 +58,14 @@ class IdoPgsqlConnection : public ObjectImpl<IdoPgsqlConnection>

boost::mutex m_ConnectionMutex;
PGconn *m_Connection;
int m_AffectedRows;

Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer;

IdoPgsqlResult Query(const String& query);
DbReference GetSequenceValue(const String& table, const String& column);
int GetAffectedRows(void);
String Escape(const String& s);
Dictionary::Ptr FetchRow(const IdoPgsqlResult& result, int row);

Expand All @@ -78,11 +81,10 @@ class IdoPgsqlConnection : public ObjectImpl<IdoPgsqlConnection>
void TxTimerHandler(void);
void ReconnectTimerHandler(void);

void InternalExecuteQuery(const DbQuery& query);
void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL);
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);

void ClearConfigTables(void);
void ClearConfigTable(const String& table);
virtual void ClearConfigTable(const String& table);

void ExceptionHandler(boost::exception_ptr exp);
};
Expand Down

0 comments on commit 6549a6d

Please sign in to comment.