Skip to content

Commit

Permalink
look up trigger hist during extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 16, 2015
1 parent 82421cd commit 3602151
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 33 deletions.
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/service/DataService.h
Expand Up @@ -27,6 +27,7 @@
#include "db/platform/DatabasePlatform.h"
#include "service/TriggerRouterService.h"
#include "model/Data.h"
#include "model/TriggerHistory.h"
#include "io/data/Batch.h"
#include "util/List.h"
#include "util/StringBuilder.h"
Expand Down
22 changes: 19 additions & 3 deletions symmetric-client-clib/inc/service/TriggerRouterService.h
Expand Up @@ -40,6 +40,7 @@
#include "common/Log.h"
#include "model/TriggerRouter.h"
#include "util/StringUtils.h"
#include "util/Map.h"
#include "service/SequenceService.h"
#include "model/Router.h"
#include "io/data/DataEventType.h"
Expand All @@ -51,8 +52,13 @@ typedef struct SymTriggerRouterService {
SymParameterService *parameterService;
SymDatabasePlatform *platform;
SymDialect *symmetricDialect;
SymMap *historyMap;

void (*syncTriggers)(struct SymTriggerRouterService *this, unsigned short force);
SymTriggerHistory * (*getTriggerHistory)(struct SymTriggerRouterService *this, int histId);
SymList * (*getActiveTriggerHistories)(struct SymTriggerRouterService *this);
SymList * (*getActiveTriggerHistoriesByTrigger)(struct SymTriggerRouterService *this, SymTrigger *trigger);
SymList * (*getActiveTriggerHistoriesByTableName)(struct SymTriggerRouterService *this, char *tableName);
void (*destroy)(struct SymTriggerRouterService *this);
} SymTriggerRouterService;

Expand Down Expand Up @@ -83,8 +89,18 @@ r.router_type,r.router_id,r.create_time as r_create_time,r.last_update_time as r
r.use_source_catalog_schema \
from sym_router r order by r.router_id "

#define SYM_SQL_SELECT_TRIGGER_HISTORY "select trigger_hist_id,trigger_id,source_table_name,table_hash,create_time,pk_column_names,column_names,last_trigger_build_reason,name_for_delete_trigger,name_for_insert_trigger,name_for_update_trigger,source_schema_name,source_catalog_name,trigger_row_hash,trigger_template_hash,error_message \
from sym_trigger_hist \
where inactive_time is null "
#define SYM_SQL_TRIGGER_HIST "select trigger_hist_id,trigger_id,source_table_name,table_hash,create_time,pk_column_names,column_names,\
last_trigger_build_reason,name_for_delete_trigger,name_for_insert_trigger,name_for_update_trigger,source_schema_name,source_catalog_name,\
trigger_row_hash,trigger_template_hash,error_message \
from sym_trigger_hist where trigger_hist_id = ?"

#define SYM_ALL_TRIGGER_HIST "select trigger_hist_id,trigger_id,source_table_name,table_hash,create_time,pk_column_names,column_names,\
last_trigger_build_reason,name_for_delete_trigger,name_for_insert_trigger,name_for_update_trigger,source_schema_name,source_catalog_name,\
trigger_row_hash,trigger_template_hash,error_message \
from sym_trigger_hist "

#define SYM_ACTIVE_TRIGGER_HIST "where inactive_time is null"

#define SYM_TRIGGER_HIST_BY_SOURCE_TABLE_WHERE "where source_table_name = ? and inactive_time is null"

#endif
1 change: 1 addition & 0 deletions symmetric-client-clib/src/io/reader/ExtractDataReader.c
Expand Up @@ -68,6 +68,7 @@ void SymExtractDataReader_close(SymExtractDataReader *this) {
}

void SymExtractDataReader_destroy(SymExtractDataReader *this) {
this->batch->destroy(this->batch);
free(this);
}

Expand Down
15 changes: 7 additions & 8 deletions symmetric-client-clib/src/service/DataService.c
Expand Up @@ -20,7 +20,7 @@
*/
#include "service/DataService.h"

SymData * SymDataService_dataMapper(SymRow *row) {
SymData * SymDataService_dataMapper(SymRow *row, SymDataService *this) {
SymData *data = SymData_new(NULL);
data->dataId = row->getLong(row, "data_id");
data->rowData = row->getString(row, "row_data");
Expand All @@ -37,23 +37,22 @@ SymData * SymDataService_dataMapper(SymRow *row) {
data->routerId = row->getString(row, "router_id");
data->triggerHistId = row->getInt(row, "trigger_hist_id");

// TODO: add triggerHistory
/*
SymTriggerHistory *triggerHistory = this->triggerRouterService->getTriggerHistory(this->triggerRouterService, triggerHistId);
SymTriggerHistory *triggerHistory = this->triggerRouterService->getTriggerHistory(this->triggerRouterService, data->triggerHistId);
if (triggerHistory == NULL) {
triggerHistory = SymTriggerHistory_newWithId(triggerHistId);
triggerHistory = SymTriggerHistory_newWithId(NULL, data->triggerHistId);
} else {
if (strcmp(triggerHistory->sourceTableName, data->tableName) != 0) {
SymLog_warn("There was a mismatch between the data table name {} and the trigger_hist table name %s for data_id {}. Attempting to look up a valid trigger_hist row by table name",
data->tableName, triggerHistory->sourceTableName, data->dataId);
SymList *list = this->triggerRouterService->getActiveTriggerHistories(this->triggerRouterService, data->tableName);
SymList *list = this->triggerRouterService->getActiveTriggerHistoriesByTableName(this->triggerRouterService, data->tableName);
triggerHistory = list->get(list, 0);
list->destroy(list);
}
}
data->triggerHistory = triggerHistory;
*/
return data;
}

SymList * SymDataService_selectDataFor(SymDataService *this, SymBatch *batch) {
SymStringBuilder *sb = SymStringBuilder_new(SYM_SQL_SELECT_EVENT_DATA_TO_EXTRACT);
sb->append(sb, " order by d.data_id asc");
Expand All @@ -63,7 +62,7 @@ SymList * SymDataService_selectDataFor(SymDataService *this, SymBatch *batch) {

int error;
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
SymList *list = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymDataService_dataMapper);
SymList *list = sqlTemplate->queryWithUserData(sqlTemplate, sb->str, args, NULL, &error, (void *) SymDataService_dataMapper, this);

args->destroy(args);
sb->destroy(sb);
Expand Down
99 changes: 77 additions & 22 deletions symmetric-client-clib/src/service/TriggerRouterService.c
Expand Up @@ -140,33 +140,91 @@ static SymRouter * SymTriggerRouterService_routerMapper(SymRow *row) {
return router;
}

static SymRouter * SymTriggerRouterService_triggerHistoryMapper(SymRow *row) {
static SymTriggerHistory * SymTriggerRouterService_triggerHistoryMapper(SymRow *row) {

SymTriggerHistory *hist = SymTriggerHistory_new(NULL);

hist->triggerHistoryId = row->getInt(row, "trigger_hist_id");
hist->triggerId = row->getString(row, "trigger_id");
hist->sourceTableName = row->getString(row, "source_table_name");
hist->triggerId = row->getStringNew(row, "trigger_id");
hist->sourceTableName = row->getStringNew(row, "source_table_name");
hist->tableHash = row->getInt(row, "table_hash");
hist->createTime = row->getDate(row, "create_time");
hist->pkColumnNames = row->getString(row, "pk_column_names");
hist->columnNames = row->getString(row, "column_names");
hist->lastTriggerBuildReason = row->getString(row, "last_trigger_build_reason");
hist->nameForDeleteTrigger = row->getString(row, "name_for_delete_trigger");
hist->nameForInsertTrigger = row->getString(row, "name_for_insert_trigger");
hist->nameForUpdateTrigger = row->getString(row, "name_for_update_trigger");
hist->sourceSchemaName = row->getString(row, "source_schema_name");
hist->sourceCatalogName = row->getString(row, "source_catalog_name");
hist->pkColumnNames = row->getStringNew(row, "pk_column_names");
hist->columnNames = row->getStringNew(row, "column_names");
hist->lastTriggerBuildReason = row->getStringNew(row, "last_trigger_build_reason");
hist->nameForDeleteTrigger = row->getStringNew(row, "name_for_delete_trigger");
hist->nameForInsertTrigger = row->getStringNew(row, "name_for_insert_trigger");
hist->nameForUpdateTrigger = row->getStringNew(row, "name_for_update_trigger");
hist->sourceSchemaName = row->getStringNew(row, "source_schema_name");
hist->sourceCatalogName = row->getStringNew(row, "source_catalog_name");
hist->triggerRowHash = row->getLong(row, "trigger_row_hash");
hist->triggerTemplateHash = row->getLong(row, "trigger_template_hash");
hist->errorMessage = row->getString(row, "error_message");
hist->errorMessage = row->getStringNew(row, "error_message");
// if (this.retMap != null) {
// this.retMap.put((long) hist.getTriggerHistoryId(), hist);
// }

return hist;
}

SymTriggerHistory * SymTriggerRouterService_getTriggerHistory(SymTriggerRouterService *this, int histId) {
SymTriggerHistory *history = this->historyMap->getByInt(this->historyMap, histId);
if (history == NULL && histId >= 0) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
SymStringArray *args = SymStringArray_new(NULL);
args->addInt(args, histId);
int error;
history = sqlTemplate->queryForObject(sqlTemplate, SYM_SQL_TRIGGER_HIST, args, NULL, &error, (void *) SymTriggerRouterService_triggerHistoryMapper);
this->historyMap->putByInt(this->historyMap, histId, history, sizeof(SymTriggerHistory));
args->destroy(args);
}
return history;
}

SymList * SymTriggerRouterService_getActiveTriggerHistories(SymTriggerRouterService *this) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
SymStringBuilder *sb = SymStringBuilder_newWithString(SYM_ALL_TRIGGER_HIST);
sb->append(sb, SYM_ACTIVE_TRIGGER_HIST);
int error;
SymList *histories = sqlTemplate->query(sqlTemplate, sb->str, NULL, NULL, &error, (void *) SymTriggerRouterService_triggerHistoryMapper);
sb->destroy(sb);

SymIterator *iter = histories->iterator(histories);
while (iter->hasNext(iter)) {
SymTriggerHistory *triggerHistory = (SymTriggerHistory *) iter->next(iter);
this->historyMap->putByInt(this->historyMap, triggerHistory->triggerHistoryId, triggerHistory, sizeof(SymTriggerHistory));
}
iter->destroy(iter);
return histories;
}

SymList * SymTriggerRouterService_getActiveTriggerHistoriesByTrigger(SymTriggerRouterService *this, SymTrigger *trigger) {
SymList *active = SymTriggerRouterService_getActiveTriggerHistories(this);
SymList *list = SymList_new(NULL);
SymIterator *iter = active->iterator(active);
while (iter->hasNext(iter)) {
SymTriggerHistory *triggerHistory = (SymTriggerHistory *) iter->next(iter);
if (strcmp(triggerHistory->triggerId, trigger->triggerId) == 0) {
list->add(list, triggerHistory);
}
}
iter->destroy(iter);
active->destroy(active);
return list;
}

SymList * SymTriggerRouterService_getActiveTriggerHistoriesByTableName(SymTriggerRouterService *this, char *tableName) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
SymStringArray *args = SymStringArray_new(NULL);
args->add(args, tableName);
SymStringBuilder *sb = SymStringBuilder_newWithString(SYM_ALL_TRIGGER_HIST);
sb->append(sb, SYM_TRIGGER_HIST_BY_SOURCE_TABLE_WHERE);
int error;
SymList *histories = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymTriggerRouterService_triggerHistoryMapper);
args->destroy(args);
sb->destroy(sb);
return histories;
}

SymList * SymTriggerRouterService_getTriggers(SymTriggerRouterService *this, unsigned short replaceTokens) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);

Expand All @@ -189,14 +247,6 @@ SymTriggerHistory * SymTriggerRouterService_getNewestTriggerHistoryForTrigger(Sy
return 0;
}

SymList * SymTriggerRouterService_getActiveTriggerHistories(SymTriggerRouterService *this) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);

int error;
SymList* triggerHistories = sqlTemplate->query(sqlTemplate, SYM_SQL_SELECT_TRIGGER_HISTORY, NULL, NULL, &error, (void *) SymTriggerRouterService_triggerHistoryMapper);
return triggerHistories;
}

SymList * SymTriggerRouterService_getRouters(SymTriggerRouterService *this, unsigned short replaceVariables) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);

Expand Down Expand Up @@ -458,14 +508,19 @@ SymTriggerRouterService * SymTriggerRouterService_new(SymTriggerRouterService *t
this = (SymTriggerRouterService*) calloc(1, sizeof(SymTriggerRouterService));
}

this->historyMap = SymMap_new(NULL, 100);

this->configurationService = configurationService;
this->sequenceService = sequenceService;
this->parameterService = parameterService;
this->symmetricDialect = symmetricDialect;
this->platform = platform;

this->syncTriggers = (void *) &SymTriggerRouterService_syncTriggers;

this->getTriggerHistory = (void *) &SymTriggerRouterService_getTriggerHistory;
this->getActiveTriggerHistories = (void *) &SymTriggerRouterService_getActiveTriggerHistories;
this->getActiveTriggerHistoriesByTrigger = (void *) &SymTriggerRouterService_getActiveTriggerHistoriesByTrigger;
this->getActiveTriggerHistoriesByTableName = (void *) &SymTriggerRouterService_getActiveTriggerHistoriesByTableName;
this->destroy = (void *) &SymTriggerRouterService_destroy;
return this;
}

0 comments on commit 3602151

Please sign in to comment.