Skip to content

Commit

Permalink
merging
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 15, 2015
2 parents 2318811 + 0196aa3 commit e1ae5bb
Show file tree
Hide file tree
Showing 17 changed files with 375 additions and 760 deletions.
2 changes: 1 addition & 1 deletion symmetric-client-clib-test/src/core/SymEngineTest.c
Expand Up @@ -38,7 +38,7 @@ void SymEngineTest_test1() {

SymEngine *engine = SymEngine_new(NULL, prop);
CU_ASSERT(engine->start(engine) == 0);
//CU_ASSERT(engine->syncTriggers(engine) == 0);
engine->syncTriggers(engine);

engine->pullService->pullData(engine->pullService);

Expand Down
10 changes: 9 additions & 1 deletion symmetric-client-clib-test/src/util/StringUtilsTest.c
Expand Up @@ -53,6 +53,12 @@ void SymStringUtilsTest_test_toLowerCase() {
CU_ASSERT(strcmp(Sym_toLowerCase(""), "") == 0);
}

void SymStringUtilsTest_test_isBlank() {
CU_ASSERT(Sym_isBlank("t") == 0);
CU_ASSERT(Sym_isBlank(" t") == 0);
CU_ASSERT(Sym_isBlank("some string") == 0);
}

int SymStringUtilsTest_CUnit() {
CU_pSuite suite = CU_add_suite("SymStringUtilsTest", NULL, NULL);
if (suite == NULL) {
Expand All @@ -61,7 +67,9 @@ int SymStringUtilsTest_CUnit() {

if (CU_add_test(suite, "SymStringUtilsTest_testTrim", SymStringUtilsTest_testTrim) == NULL ||
CU_add_test(suite, "SymStringUtilsTest_test_toUpperCase", SymStringUtilsTest_test_toUpperCase) == NULL ||
CU_add_test(suite, "SymStringUtilsTest_test_toLowerCase", SymStringUtilsTest_test_toLowerCase) == NULL) {
CU_add_test(suite, "SymStringUtilsTest_test_toLowerCase", SymStringUtilsTest_test_toLowerCase) == NULL ||
CU_add_test(suite, "SymStringUtilsTest_test_isBlank", SymStringUtilsTest_test_isBlank) == NULL ||
1==0) {
return CUE_NOTEST;
}
return CUE_SUCCESS;
Expand Down
3 changes: 2 additions & 1 deletion symmetric-client-clib/inc/core/SymEngine.h
Expand Up @@ -62,11 +62,12 @@ typedef struct SymEngine {
SymIncomingBatchService *incomingBatchService;
SymOutgoingBatchService *outgoingBatchService;
SymConfigurationService *configurationService;
SymSequenceService *sequenceService;

unsigned short (*start)(struct SymEngine *this);
unsigned short (*stop)(struct SymEngine *this);
unsigned short (*uninstall)(struct SymEngine *this);
unsigned short (*syncTriggers)(struct SymEngine *this);
void (*syncTriggers)(struct SymEngine *this);
void (*destroy)(struct SymEngine *this);
} SymEngine;

Expand Down
8 changes: 5 additions & 3 deletions symmetric-client-clib/inc/model/Trigger.h
Expand Up @@ -33,12 +33,13 @@ typedef struct SymTrigger {
char *channelId;
char *reloadChannelId;
unsigned short syncOnUpdate;
unsigned short syncOnInsert;
unsigned short syncOnDelete;
unsigned short syncOnIncomingBatch;
unsigned short useStreamLogs;
unsigned short useCaptureLogs;
unsigned short useStreamLobs;
unsigned short useCaptureLobs;
unsigned short useCaptureOldData;
unsigned short *useHandleKeyUpdates;
unsigned short useHandleKeyUpdates;
char *nameForInsertTrigger;
char *nameForUpdateTrigger;
char *nameForDeleteTrigger;
Expand All @@ -50,6 +51,7 @@ typedef struct SymTrigger {
char *customOnInsertText;
char *customOnDeleteText;
char *excludedColumnNames;
char *syncKeyNames;
/**
* This is a SQL expression that creates a unique id which the sync process
* can use to 'group' events together and commit together.
Expand Down
4 changes: 4 additions & 0 deletions symmetric-client-clib/inc/model/TriggerHistory.h
Expand Up @@ -50,6 +50,10 @@ typedef struct SymTriggerHistory {
long triggerRowHash;
long triggerTemplateHash;
char *lastTriggerBuildReason;

void (*destroy)(struct SymTriggerHistory *this);
} SymTriggerHistory;

SymTriggerHistory * SymTriggerHistory_new(SymTriggerHistory *this);

#endif
3 changes: 3 additions & 0 deletions symmetric-client-clib/inc/service/SequenceService.h
Expand Up @@ -27,6 +27,9 @@ typedef struct SymSequenceService {
void (*init)(struct SymSequenceService *this);
long (*nextVal)(struct SymSequenceService *this, char *name);
long (*currVal)(struct SymSequenceService *this, char *name);
void (*destroy)(struct SymSequenceService *this);
} SymSequenceService;

SymSequenceService * SymSequenceService_new(SymSequenceService *this);

#endif
163 changes: 29 additions & 134 deletions symmetric-client-clib/inc/service/TriggerRouterService.h
Expand Up @@ -36,146 +36,41 @@
#include "model/Router.h"
#include "io/data/DataEventType.h"
#include "db/sql/SqlTemplate.h"


typedef struct SymTriggerSelector {
SymList* triggers;
struct SymList * (*select)(struct SymTriggerSelector *this);
void (*destroy)(struct SymTriggerSelector * this);
} SymTriggerSelector;

SymTriggerSelector * SymTriggerSelector_new(SymTriggerSelector *this, SymList *triggers);


typedef struct SymTriggerRoutersCache {
SymMap* triggerRoutersByTriggerId;
SymMap* routersByRouterId;
void (*destroy)(struct SymTriggerSelector * this);
} SymTriggerRoutersCache;

SymTriggerRoutersCache * SymTriggerRoutersCache_new(SymTriggerRoutersCache *this);

#include "common/ParameterConstants.h"
#include "common/Log.h"
#include "model/TriggerRouter.h"
#include <time.h>
#include "util/StringUtils.h"
#include "service/SequenceService.h"

typedef struct SymTriggerRouterService {
long routersCacheTime;
SymMap* routersCache;

SymList* triggerRoutersCache;
long triggerRouterPerNodeCacheTime;

SymMap* triggersCache;
long triggersCacheTime;
long triggerRoutersCacheTime;

SymMap* triggerRouterCacheByNodeGroupId;

long triggerRouterPerChannelCacheTime;

SymParameterService *parameterService;
SymConfigurationService *configurationService;
SymSequenceService *sequenceService;
SymParameterService *parameterService;
SymDatabasePlatform *platform;

unsigned short (*refreshFromDatabase)(struct SymTriggerRouterService *this);
SymList* (*getTriggers)(struct SymTriggerRouterService *this, unsigned short replaceTokens);
unsigned short (*isTriggerBeingUsed)(struct SymTriggerRouterService *this, char *triggerId);
unsigned short (*doesTriggerExist)(struct SymTriggerRouterService *this, char *triggerId);
unsigned short (*doesTriggerExistForTable)(struct SymTriggerRouterService *this, char *tableName);
void (*deleteTrigger)(struct SymTriggerRouterService *this, SymTrigger *trigger);
void (*dropTriggers)(struct SymTriggerRouterService *this);
void (*dropTriggersForTables)(struct SymTriggerRouterService *this, SymList *tables);
void (*deleteTriggerHistory)(struct SymTriggerRouterService *this, SymTriggerHistory *history);
void (*createTriggersOnChannelForTables)(struct SymTriggerRouterService *this, char *channelId, char *catalogName,
char *schemaName, SymList *tables, char *lastUpdateBy);
SymList* (*createTriggersOnChannelForTablesWithReturn)(struct SymTriggerRouterService *this,
char *channelId, char *catalogName, char *schemaName, SymList *tables, char *lastUpdateBy);
SymList* (*findMatchingTriggers)(struct SymTriggerRouterService *this, SymList *triggers,
char *catalog, char *schema, char *table);
void (*inactivateTriggerHistory)(struct SymTriggerRouterService *this, SymTriggerHistory *history);
SymMap* (*getHistoryRecords)(struct SymTriggerRouterService *this);
unsigned short (*isTriggerNameInUse)(struct SymTriggerRouterService *this, SymList *activeTriggerHistories,
char *triggerId, char *triggerName);
SymTriggerHistory* (*findTriggerHistory)(struct SymTriggerRouterService *this,
char *catalogName, char *schemaName, char *tableName);
SymList* (*findTriggerHistories)(struct SymTriggerRouterService *this,
char *catalogName, char *schemaName, char *tableName);
SymTriggerHistory* (*getTriggerHistory)(struct SymTriggerRouterService *this, int histId);
SymList* (*getActiveTriggerHistoriesForTrigger)(struct SymTriggerRouterService *this, SymTrigger *trigger);
SymTriggerHistory* (*getNewestTriggerHistoryForTrigger)(struct SymTriggerRouterService *this,
char *triggerId, char *catalogName, char *schemaName, char *tableName);
SymList* (*getActiveTriggerHistoriesFromCache)(struct SymTriggerRouterService *this);
SymList* (*getActiveTriggerHistories)(struct SymTriggerRouterService *this);
SymList * (*getActiveTriggerHistoriesForTable)(struct SymTriggerRouterService *this, char *tableName);
SymList* (*buildTriggersForSymmetricTables)(struct SymTriggerRouterService *this, SymStringArray *tablesToExclude);
SymTrigger* (*buildTriggerForSymmetricTable)(struct SymTriggerRouterService *this, char *tableName);
SymList* (*buildTriggerRoutersForSymmetricTables)(struct SymTriggerRouterService *this,
char *version, SymNodeGroupLink *nodeGroupLink, SymStringArray *tablesToExclude);
char* (*buildSymmetricTableRouterId)(struct SymTriggerRouterService *this,
char *triggerId, char *sourceNodeGroupdId, char *targetNodeGroupId);
SymTriggerRouter * (*buildTriggerRoutersForSymmetricTablesWithNodeGroupLink)(struct SymTriggerRouterService *this,
char *version, SymTrigger *trigger, SymNodeGroupLink *nodeGroupLink);
SymList * (*getTriggerRouterForTableForCurrentNode)(struct SymTriggerRouterService *this, SymNodeGroupLink *link, char *catalogName, char *schemaName, char *tableName, unsigned short refreshCache);
unsigned short (*isMatch)(struct SymTriggerRouterService *this, SymNodeGroupLink *link, SymTriggerRouter *router);
unsigned short (*isMatchTableName)(struct SymTriggerRouterService *this, char *catalogName, char *schemaName, char *tableName, SymTrigger *trigger);
SymList * (*getConfigurationTablesTriggerRoutersForCurrentNode)(struct SymTriggerRouterService *this, char *sourceNodeGroupId);
void (*mergeInConfigurationTablesTriggerRoutersForCurrentNode)(struct SymTriggerRouterService *this, char *sourceNodeGroupId, SymList *configuredInDatabase);
unsigned short (*doesTriggerRouterExistInList)(struct SymTriggerRouterService *this, SymList *triggerRouters, SymTriggerRouter *triggerRouter);
SymTriggerRouter * (*getTriggerRouterForCurrentNode)(struct SymTriggerRouterService *this, char *triggerId, char *routerId, unsigned short refreshCache);
SymMap * (*getTriggerRoutersForCurrentNode)(struct SymTriggerRouterService *this, unsigned short refreshCache);
SymList * (*getTriggersForCurrentNode)(struct SymTriggerRouterService *this, unsigned short refreshCache);
SymTriggerRoutersCache * (*getTriggerRoutersCacheForCurrentNode)(struct SymTriggerRouterService *this, unsigned short refreshCache);
SymRouter * (*getActiveRouterByIdForCurrentNode)(struct SymTriggerRouterService *this, char *routerId, unsigned short refreshCache);
SymList * (*getRoutersByGroupLink)(struct SymTriggerRouterService *this, SymNodeGroupLink *link);
SymTrigger * (*getTriggerForCurrentNodeById)(struct SymTriggerRouterService *this, char *triggerId);
SymTrigger * (*getTriggerById)(struct SymTriggerRouterService *this, char *triggerId, unsigned short refreshCache);
SymRouter * (*getRouterById)(struct SymTriggerRouterService *this, char *routerId, unsigned short refreshCache);
SymList * (*getRouters)(struct SymTriggerRouterService *this, unsigned short replaceVariables);
char * (*getTriggerRouterSql)(struct SymTriggerRouterService *this, char *sql);
SymList * (*getTriggerRouters)(struct SymTriggerRouterService *this, unsigned short refreshCache);
SymList * (*getAllTriggerRoutersForCurrentNode)(struct SymTriggerRouterService *this, char *sourceNodeGroupId);
SymList * (*getAllTriggerRoutersForReloadForCurrentNode)(struct SymTriggerRouterService *this, char *sourceNodeGroupId, char *targetNodeGroupId);
SymTriggerRouter * (*findTriggerRouterById)(struct SymTriggerRouterService *this, char *triggerId, char *routerId);
SymList * (*enhanceTriggerRouters)(struct SymTriggerRouterService *this, SymList *triggerRouters);
SymMap * (*getTriggerRoutersByChannel)(struct SymTriggerRouterService *this, char *nodeGroupId, unsigned short refreshCache);
void (*insert)(struct SymTriggerRouterService *this, SymTriggerHistory *newHistRecord);
void (*deleteTriggerRouterWithId)(struct SymTriggerRouterService *this, char *triggerId, char *routerId);
void (*deleteTriggerRouter)(struct SymTriggerRouterService *this, SymTriggerRouter *triggerRouter);
void (*saveTriggerRouter)(struct SymTriggerRouterService *this, SymTriggerRouter *triggerRouter, unsigned short updateTriggerRouterTableOnly);
void (*resetTriggerRouterCacheByNodeGroupId)(struct SymTriggerRouterService *this);
void (*saveRouter)(struct SymTriggerRouterService *this, SymRouter *router);
unsigned short (*isRouterBeingUsed)(struct SymTriggerRouterService *this, char *routerId);
void (*deleteRouter)(struct SymTriggerRouterService *this, SymRouter *router);
void (*saveTrigger)(struct SymTriggerRouterService *this, SymTrigger *trigger);
void (*clearCache)(struct SymTriggerRouterService *this);
SymList * (*getTriggerIdsFrom)(struct SymTriggerRouterService *this, SymList *triggersThatShouldBeActive);
SymTrigger * (*getTriggerFromList)(struct SymTriggerRouterService *this, char *triggerId, SymList *triggersThatShouldBeActive);
void (*inactivateTriggers)(struct SymTriggerRouterService *this, SymList *triggersThatShouldBeActive, SymStringBuilder *sqlBuffer, SymList *activeTriggerHistories);
unsigned short (*isEqual)(struct SymTriggerRouterService *this, char *one, char *two, unsigned short ignoreCase);
void (*dropTriggersForTriggerHistory)(struct SymTriggerRouterService *this, SymTriggerHistory *history, SymStringBuilder *sqlBuffer);
SymList * (*toList)(struct SymTriggerRouterService *this, SymList *source);
SymList * (*getTablesForTrigger)(struct SymTriggerRouterService *this, SymTrigger *trigger, SymList *triggers, unsigned short useTableCache);
unsigned short (*containsExactMatchForSourceTableName)(struct SymTriggerRouterService *this, SymTable *table, SymList *triggers, unsigned short ignoreCase);
void (*updateOrCreateDatabaseTriggers)(struct SymTriggerRouterService *this, SymList *triggers, SymStringBuilder *sqlBuffer, unsigned short force, unsigned short verifyInDatabase, SymList *activeTriggerHistories, unsigned short useTableCache);
void (*updateOrCreateDatabaseTrigger)(struct SymTriggerRouterService *this, SymTrigger *trigger, SymList *triggers, SymStringBuilder *sqlBuffer, unsigned short force, unsigned short verifyInDatabase, SymList *activeTriggerHistories, unsigned short useTableCache);
void (*syncTrigger)(struct SymTriggerRouterService *this, SymTrigger *trigger, unsigned short force, unsigned short verifyInDatabase);
void (*updateOrCreateDatabaseTriggersForTable)(struct SymTriggerRouterService *this, SymTrigger *trigger, SymTable *table, SymStringBuilder *sqlBuffer, unsigned short force, unsigned short verifyInDatabase, SymList *activeTriggerHistories);
SymTriggerHistory * (*rebuildTriggerIfNecessary)(struct SymTriggerRouterService *this, SymStringBuilder *sqlBuffer, unsigned short forceRebuild, SymTrigger *trigger, SymDataEventType *dmlType, char *reason, SymTriggerHistory *oldhist, SymTriggerHistory *hist, unsigned short triggerIsActive, SymTable *table, SymList *activeTriggerHistories);
char * (*replaceCharsToShortenName)(struct SymTriggerRouterService *this, char *triggerName);
char * (*getTriggerName)(struct SymTriggerRouterService *this, SymDataEventType *dml, int maxTriggerNameLength, SymTrigger *trigger, SymTable *table, SymList *activeTriggerHistories);
SymTriggerHistory * (*mapRow)(struct SymTriggerRouterService *this, SymRow *rs);
SymNodeGroupLink * (*getNodeGroupLink)(struct SymTriggerRouterService *this, char *sourceNodeGroupId, char *targetNodeGroupId);
// SymRouter * (*mapRow)(struct SymTriggerRouterService *this, SymRow *rs);
// SymTrigger * (*mapRow)(struct SymTriggerRouterService *this, SymRow *rs);
// SymTriggerRouter * (*mapRow)(struct SymTriggerRouterService *this, SymRow *rs);
void (*addExtraConfigTable)(struct SymTriggerRouterService *this, char *table);
SymMap * (*getFailedTriggers)(struct SymTriggerRouterService *this);
SymTriggerHistory * (*findTriggerHistoryForGenericSync)(struct SymTriggerRouterService *this);
SymMap * (*fillTriggerRoutersByHistIdAndSortHist)(struct SymTriggerRouterService *this, char *sourceNodeGroupId, char *targetNodeGroupId, SymList *triggerHistories);
SymList * (*getSortedTablesFor)(struct SymTriggerRouterService *this, SymList *histories);
int (*syncTriggers)(struct SymTriggerRouterService *this, SymStringBuilder *sqlBuffer, unsigned short force); //
int (*syncTriggersWithTable)(struct SymTriggerRouterService *this, SymTable *table, unsigned short force); //
void (*destroy)(struct SymTriggerRouterService *this); //
void (*syncTriggers)(struct SymTriggerRouterService *this, unsigned short force);
void (*destroy)(struct SymTriggerRouterService *this);
} SymTriggerRouterService;

SymTriggerRouterService * SymTriggerRouterService_new(SymTriggerRouterService * this, SymParameterService *parameterService, SymDatabasePlatform *platform, SymConfigurationService *configurationService);
SymTriggerRouterService * SymTriggerRouterService_new(SymTriggerRouterService *this,
SymConfigurationService *configurationService, SymSequenceService *sequenceService,
SymParameterService *parameterService, SymDatabasePlatform *platform);

#define SYM_SQL_SELECT_TRIGGER_ROUTERS "select tr.trigger_id, tr.router_id, tr.create_time, tr.last_update_time, tr.last_update_by, tr.initial_load_order, tr.initial_load_select, tr.initial_load_delete_stmt, tr.initial_load_batch_count, tr.ping_back_enabled, tr.enabled \
from sym_trigger_router tr \
inner join sym_trigger t on tr.trigger_id=t.trigger_id \
inner join sym_router r on tr.router_id=r.router_id "

#define SYM_SQL_SELECT_TRIGGERS "select t.trigger_id,t.channel_id,t.reload_channel_id,t.source_table_name,t.source_schema_name,t.source_catalog_name, \
t.sync_on_insert,t.sync_on_update,t.sync_on_delete,t.sync_on_incoming_batch,t.use_stream_lobs, \
t.use_capture_lobs,t.use_capture_old_data,t.use_handle_key_updates, \
t.excluded_column_names, t.sync_key_names, \
t.name_for_delete_trigger,t.name_for_insert_trigger,t.name_for_update_trigger, \
t.sync_on_insert_condition,t.sync_on_update_condition,t.sync_on_delete_condition, \
t.custom_on_insert_text,t.custom_on_update_text,t.custom_on_delete_text, \
t.tx_id_expression,t.external_select,t.channel_expression,t.create_time as t_create_time, \
t.last_update_time as t_last_update_time, t.last_update_by as t_last_update_by \
from sym_trigger t order by trigger_id asc "

#endif
2 changes: 2 additions & 0 deletions symmetric-client-clib/inc/util/StringUtils.h
Expand Up @@ -26,5 +26,7 @@
char *Sym_trim(char *str);
char *Sym_toUpperCase(char *str);
char *Sym_toLowerCase(char *str);
unsigned short Sym_isBlank(char *str);
unsigned short Sym_isNotBlank(char *str);

#endif /* INC_UTIL_STRINGUTILS_H_ */
11 changes: 7 additions & 4 deletions symmetric-client-clib/src/core/SymEngine.c
Expand Up @@ -62,7 +62,7 @@ unsigned short SymEngine_start(SymEngine *this) {
SymLog_info("Starting registered node [group=%s, id=%s, externalId=%s]", node->nodeGroupId, node->nodeId, node->externalId);

if (this->parameterService->is(this->parameterService, AUTO_SYNC_TRIGGERS_AT_STARTUP, 1)) {
this->triggerRouterService->syncTriggers(this->triggerRouterService, NULL, 0);
this->triggerRouterService->syncTriggers(this->triggerRouterService, 0);
}
else {
SymLog_info("%s is turned off.", AUTO_SYNC_TRIGGERS_AT_STARTUP);
Expand Down Expand Up @@ -96,8 +96,8 @@ unsigned short SymEngine_stop(SymEngine *this) {
return 0;
}

unsigned short SymEngine_syncTriggers(SymEngine *this) {
return this->triggerRouterService->syncTriggers(this->triggerRouterService, NULL, 0);
void SymEngine_syncTriggers(SymEngine *this) {
this->triggerRouterService->syncTriggers(this->triggerRouterService, 0);
}

unsigned short SymEngine_uninstall(SymEngine *this) {
Expand Down Expand Up @@ -129,10 +129,13 @@ SymEngine * SymEngine_new(SymEngine *this, SymProperties *properties) {
this->platform = SymDatabasePlatformFactory_create(properties);
this->dialect = SymDialectFactory_create(this->platform);

this->sequenceService = SymSequenceService_new(NULL);

this->configurationService = SymConfigurationService_new(NULL, this->platform);

this->parameterService = SymParameterService_new(NULL, properties);
this->triggerRouterService = SymTriggerRouterService_new(NULL, this->parameterService, this->platform, this->configurationService);
this->triggerRouterService = SymTriggerRouterService_new(NULL, this->configurationService,
this->sequenceService, this->parameterService, this->platform);
this->transportManager = SymTransportManagerFactory_create(SYM_PROTOCOL_HTTP, this->parameterService);
this->nodeService = SymNodeService_new(NULL, this->platform);
this->incomingBatchService = SymIncomingBatchService_new(NULL, this->platform, this->parameterService);
Expand Down

0 comments on commit e1ae5bb

Please sign in to comment.