Skip to content

Commit

Permalink
Bug #271 - set/subscription data
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher Browne committed Jun 29, 2012
1 parent 34dedfc commit c95c664
Showing 1 changed file with 103 additions and 17 deletions.
120 changes: 103 additions & 17 deletions src/slon/cleanup_thread.c
Expand Up @@ -54,7 +54,7 @@ cleanupThread_main( /* @unused@ */ void *dummy)
SlonDString query_baseclean;
SlonDString query2;
SlonDString query_pertbl;
SlonDString query_nodeinfo;
SlonDString query_health;

PGconn *dbconn;
PGresult *res;
Expand All @@ -65,11 +65,10 @@ cleanupThread_main( /* @unused@ */ void *dummy)
int vac_count = 0;
int vac_enable = SLON_VACUUM_FREQUENCY;
char *vacuum_action;
int ntuples;
bool p_mem_eq_db;
SlonNode *node;
int node_id;
int tupno;
int tupno, ntuples;

slon_log(SLON_CONFIG, "cleanupThread: thread starts\n");

Expand Down Expand Up @@ -249,17 +248,17 @@ cleanupThread_main( /* @unused@ */ void *dummy)

/* 1. check that sl_node agrees with node list */
p_mem_eq_db = true;
dstring_init(&query_nodeinfo);
dstring_init(&query_health);

/* Walk thru node list on disk, simply looking for existence */
slon_mkquery(&query_nodeinfo,
slon_mkquery(&query_health,
"select no_id from %s.sl_node;",
rtcfg_namespace, node_id);
res = PQexec(dbconn, dstring_data(&query_nodeinfo));
rtcfg_namespace);
res = PQexec(dbconn, dstring_data(&query_health));
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
slon_log(SLON_ERROR,
"cleanupThread: \"%s\" - %s",
dstring_data(&query_nodeinfo), PQresultErrorMessage(res));
dstring_data(&query_health), PQresultErrorMessage(res));
}
for (tupno = 0; tupno < ntuples; tupno++)
{
Expand All @@ -275,14 +274,14 @@ cleanupThread_main( /* @unused@ */ void *dummy)
/* Walk thru node list in memory... */
for (node = rtcfg_node_list_head; node ; node = node->next) {
node_id = node->no_id;
slon_mkquery(&query_nodeinfo,
slon_mkquery(&query_health,
"select no_comment from %s.sl_node where no_id = %d;",
rtcfg_namespace, node_id);
res = PQexec(dbconn, dstring_data(&query_nodeinfo));
res = PQexec(dbconn, dstring_data(&query_health));
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
slon_log(SLON_ERROR,
"cleanupThread: \"%s\" - %s",
dstring_data(&query_nodeinfo), PQresultErrorMessage(res));
dstring_data(&query_health), PQresultErrorMessage(res));
}
if (PQntuples(res) != 1) {
slon_log(SLON_WARN, "cleanupThread: in-memory node %d not found in sl_node\n",
Expand All @@ -296,9 +295,9 @@ cleanupThread_main( /* @unused@ */ void *dummy)
}
}
PQclear(res);
dstring_reset(&query_nodeinfo);
dstring_reset(&query_health);
}
dstring_reset(&query_nodeinfo);
dstring_reset(&query_health);

/* Induce logging if you like by setting p_mem_eq_db to "false" */
/* p_mem_eq_db = false; */
Expand All @@ -308,14 +307,14 @@ cleanupThread_main( /* @unused@ */ void *dummy)
} else {
slon_log(SLON_WARN, "cleanupThread: differences between in-memory node list and sl_node\n");
slon_log(SLON_WARN, "cleanupThread: dumping sl_node:\n");
slon_mkquery(&query_nodeinfo,
slon_mkquery(&query_health,
"select no_id, no_active, no_comment, pa_conninfo, pa_connretry from %s.sl_node, %s.sl_path where pa_server = no_id and pa_client = %d;",
rtcfg_namespace, rtcfg_namespace, rtcfg_namespace, rtcfg_nodeid);
res = PQexec(dbconn, dstring_data(&query_nodeinfo));
res = PQexec(dbconn, dstring_data(&query_health));
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
slon_log(SLON_ERROR,
"cleanupThread: \"%s\" - %s",
dstring_data(&query_nodeinfo), PQresultErrorMessage(res));
dstring_data(&query_health), PQresultErrorMessage(res));
}
ntuples = PQntuples(res);
for (tupno = 0; tupno < ntuples; tupno++)
Expand All @@ -341,10 +340,97 @@ cleanupThread_main( /* @unused@ */ void *dummy)
}
}
}

/* Check subscriptions in memory to sl_subscribe */
p_mem_eq_db = true;

slon_log(SLON_WARN, "cleanupThread: comparing subscriptions to sl_subscribe\n");
slon_mkquery(&query_health,
"select sub_set, sub_provider, sub_forward, sub_active, set_origin, set_comment "
"from %s.sl_subscribe, %s.sl_set "
"where sub_receiver = %d and sub_set = set_id;",
rtcfg_namespace, rtcfg_namespace, rtcfg_nodeid);
res = PQexec(dbconn, dstring_data(&query_health));
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
slon_log(SLON_ERROR, "cleanupThread: Cannot get subscription config - %s\n",
PQresultErrorMessage(res));
PQclear(res);
dstring_free(&query_health);
slon_retry();
}
slon_log(SLON_WARN, "cleanupThread: query against sl_subscribe\n");
for (tupno = 0, ntuples = PQntuples(res); tupno < ntuples; tupno++)
{
int sub_set = (int) strtol(PQgetvalue(res, tupno, 0), NULL, 10);
int sub_provider = (int) strtol(PQgetvalue(res, tupno, 1), NULL, 10);
char *sub_forward = PQgetvalue(res, tupno, 2);
char *sub_active = PQgetvalue(res, tupno, 3);
bool found = false;
SlonSet *set;

/* Now, compare to rtcfg_set_list_head structure */
for (set = rtcfg_set_list_head; set; set = set->next)
{
if (set->set_id == sub_set)
{
found = true;
/* Compare entry to tuple */
if (set->sub_provider != sub_provider) {
slon_log(SLON_WARN, "cleanupThread: diff for provider for subscription %d - sl_log:[%d] memory:[%d]\n",
sub_set, sub_provider, set->sub_provider);
p_mem_eq_db = false;
}
if (strcmp(sub_forward, (set->sub_forward)? "t" : "f")) {
slon_log(SLON_WARN, "cleanupThread: diff for forwarding for subscription %d - sl_log:[%s] memory:[%s]\n",
sub_set, sub_forward, set->sub_forward);
p_mem_eq_db = false;
}
if (strcmp(sub_forward, (set->sub_active)? "t" : "f")) {
slon_log(SLON_WARN, "cleanupThread: diff for active for subscription %d - sl_log:[%s] memory:[%s]\n",
sub_set, sub_active, set->sub_active);
p_mem_eq_db = false;
}
}
}
if (found == false) {
slon_log(SLON_WARN, "cleanupThread: memory node for provider for subscription %d not found\n",
sub_set);
p_mem_eq_db = false;
}
}

/* Induce dump of data */
/* p_mem_eq_db = false; */
if (p_mem_eq_db == false) {
SlonSet *set;
/* Note no need to requery the database, as we already have a
* result set with exactly the data that we want in 'res' */
for (tupno = 0, ntuples = PQntuples(res); tupno < ntuples; tupno++)
{
int sub_set = (int) strtol(PQgetvalue(res, tupno, 0), NULL, 10);
int sub_provider = (int) strtol(PQgetvalue(res, tupno, 1), NULL, 10);
char *sub_forward = PQgetvalue(res, tupno, 2);
char *sub_active = PQgetvalue(res, tupno, 3);
int set_origin = (int) strtol(PQgetvalue(res, tupno, 4), NULL, 10);
char *set_comment = PQgetvalue(res, tupno, 5);

slon_log(SLON_WARN, "cleanupThread: sl_subscribe set:[%d] provider:[%d] origin:[%d] forward?:[%s] active?:[%s] comment:[%s]\n",
sub_set, sub_provider, set_origin, sub_forward, sub_active, set_comment);
}
for (set = rtcfg_set_list_head; set; set = set->next)
{
slon_log(SLON_WARN, "cleanupThread: in-memory set id:[%d] origin:[%d] provider:[%d] forwarder?:[%d] active?:[%d] comment:[%s]\n",
set->set_id, set->set_origin, set->sub_provider,
set->sub_forward, set->sub_active, ((set->set_comment) !=NULL)?(set->set_comment):"n/a");
}
}
PQclear(res);

/*
* Free Resources
*/
dstring_free(&query_nodeinfo);
dstring_free(&query_health);
dstring_free(&query_pertbl);
monitor_state("local_cleanup", 0, conn->conn_pid, "thread main loop", 0, "n/a");
}
Expand Down

0 comments on commit c95c664

Please sign in to comment.