Navigation Menu

Skip to content

Commit

Permalink
Merge branch '3.7' into 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Dec 16, 2015
2 parents 650f583 + e9c3e45 commit 50870b6
Show file tree
Hide file tree
Showing 15 changed files with 93 additions and 55 deletions.
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/io/writer/DataWriter.h
Expand Up @@ -38,6 +38,7 @@ typedef struct SymDataWriter {
void (*endTable)(struct SymDataWriter *this, SymTable *table);
void (*endBatch)(struct SymDataWriter *this, SymBatch *batch);
void (*destroy)(struct SymDataWriter *this);
unsigned short isSyncTriggersNeeded;
} SymDataWriter;

SymDataWriter * SymDataWriter_new(SymDataWriter *this);
Expand Down
7 changes: 6 additions & 1 deletion symmetric-client-clib/inc/io/writer/DefaultDatabaseWriter.h
Expand Up @@ -40,10 +40,15 @@
#include "db/SymDialect.h"
#include "model/IncomingBatch.h"
#include "service/IncomingBatchService.h"
#include "service/ParameterService.h"
#include "common/TableConstants.h"
#include "common/ParameterConstants.h"
#include "common/Constants.h"

typedef struct SymDefaultDatabaseWriter {
SymDataWriter super;
SymIncomingBatchService *incomingBatchService;
SymParameterService *parameterService;
SymDatabasePlatform *platform;
SymDialect *dialect;
SymDatabaseWriterSettings *settings;
Expand All @@ -59,6 +64,6 @@ typedef struct SymDefaultDatabaseWriter {
} SymDefaultDatabaseWriter;

SymDefaultDatabaseWriter * SymDefaultDatabaseWriter_new(SymDefaultDatabaseWriter *this, SymIncomingBatchService *incomingBatchService,
SymDatabasePlatform *platform, SymDialect *dialect, SymDatabaseWriterSettings *settings);
SymParameterService *parameterService, SymDatabasePlatform *platform, SymDialect *dialect, SymDatabaseWriterSettings *settings);

#endif
2 changes: 1 addition & 1 deletion symmetric-client-clib/inc/model/Node.h
Expand Up @@ -23,7 +23,7 @@

#include <stdlib.h>

#define SYM_VERSION "3.7.27.1"
#define SYM_VERSION "3.7.27.3"

typedef enum SymNodeStatus {
SYM_NODE_STATUS_DATA_LOAD_NOT_STARTED,
Expand Down
4 changes: 3 additions & 1 deletion symmetric-client-clib/inc/service/DataLoaderService.h
Expand Up @@ -27,6 +27,7 @@
#include "service/NodeService.h"
#include "service/IncomingBatchService.h"
#include "service/ParameterService.h"
#include "service/TriggerRouterService.h"
#include "transport/TransportManager.h"
#include "transport/IncomingTransport.h"
#include "model/Node.h"
Expand All @@ -44,6 +45,7 @@
typedef struct SymDataLoaderService {
SymParameterService *parameterService;
SymNodeService *nodeService;
SymTriggerRouterService *triggerRouterService;
SymTransportManager *transportManager;
SymTransportManager *fileTransportManager;
SymDatabasePlatform *platform;
Expand All @@ -56,7 +58,7 @@ typedef struct SymDataLoaderService {
void (*destroy)(struct SymDataLoaderService *this);
} SymDataLoaderService;

SymDataLoaderService * SymDataLoaderService_new(SymDataLoaderService *this, SymParameterService *parameterService, SymNodeService *nodeService,
SymDataLoaderService * SymDataLoaderService_new(SymDataLoaderService *this, SymParameterService *parameterService, SymNodeService *nodeService, SymTriggerRouterService *triggerRouterService,
SymTransportManager *transportManager, SymTransportManager *fileTransportManager, SymDatabasePlatform *platform, SymDialect *dialect, SymIncomingBatchService *incomingBatchService);

#endif
3 changes: 3 additions & 0 deletions symmetric-client-clib/inc/service/IncomingBatchService.h
Expand Up @@ -41,6 +41,7 @@ typedef struct SymIncomingBatchService {
int (*updateIncomingBatch)(struct SymIncomingBatchService *this, SymIncomingBatch *incomingBatch);
int (*deleteIncomingBatch)(struct SymIncomingBatchService *this, SymIncomingBatch *incomingBatch);
unsigned short (*isRecordOkBatchesEnabled)(struct SymIncomingBatchService *this);
int (*countIncomingBatchesInError)(struct SymIncomingBatchService *this);
void (*destroy)(struct SymIncomingBatchService *this);
} SymIncomingBatchService;

Expand Down Expand Up @@ -69,4 +70,6 @@ where batch_id = ? and node_id = ?"

#define SYM_SQL_DELETE_INCOMING_BATCH "delete from sym_incoming_batch where batch_id = ? and node_id = ?"

#define SYM_SQL_COUNT_INCOMING_BATCHES_ERRORS "select count(*) from sym_incoming_batch where error_flag = 1"

#endif
4 changes: 2 additions & 2 deletions symmetric-client-clib/src/core/SymEngine.c
Expand Up @@ -186,8 +186,8 @@ SymEngine * SymEngine_new( SymEngine *this, SymProperties *properties) {
this->incomingBatchService = SymIncomingBatchService_new(NULL, this->platform, this->parameterService);
this->outgoingBatchService = SymOutgoingBatchService_new(NULL, this->platform, this->parameterService, this->sequenceService);
this->acknowledgeService = SymAcknowledgeService_new(NULL, this->outgoingBatchService, this->platform);
this->dataLoaderService = SymDataLoaderService_new(NULL, this->parameterService, this->nodeService, this->transportManager, this->offlineTransportManager,
this->platform, this->dialect, this->incomingBatchService);
this->dataLoaderService = SymDataLoaderService_new(NULL, this->parameterService, this->nodeService, this->triggerRouterService,
this->transportManager, this->offlineTransportManager, this->platform, this->dialect, this->incomingBatchService);
this->dataService = SymDataService_new(NULL, this->platform, this->triggerRouterService, this->nodeService, this->dialect, this->outgoingBatchService, this->parameterService);
this->routerService = SymRouterService_new(NULL, this->outgoingBatchService, this->sequenceService, this->dataService, this->nodeService, this->configurationService,
this->parameterService, this->triggerRouterService, this->platform);
Expand Down
14 changes: 13 additions & 1 deletion symmetric-client-clib/src/io/writer/DefaultDatabaseWriter.c
Expand Up @@ -207,6 +207,17 @@ unsigned short SymDefaultDatabaseWriter_write(SymDefaultDatabaseWriter *this, Sy
} else {
this->targetTable = this->sourceTable;
}
} else if (!this->super.isSyncTriggersNeeded) {
unsigned short autoSync = this->parameterService->is(this->parameterService, SYM_PARAMETER_AUTO_SYNC_CONFIGURATION, 1) ||
this->incomingBatch->batchId == SYM_VIRTUAL_BATCH_FOR_REGISTRATION;
if (autoSync && (SymStringUtils_equalsIgnoreCase(this->targetTable->name, SYM_TRIGGER) ||
SymStringUtils_equalsIgnoreCase(this->targetTable->name, SYM_ROUTER) ||
SymStringUtils_equalsIgnoreCase(this->targetTable->name, SYM_TRIGGER_ROUTER) ||
SymStringUtils_equalsIgnoreCase(this->targetTable->name, SYM_TRIGGER_ROUTER_GROUPLET) ||
SymStringUtils_equalsIgnoreCase(this->targetTable->name, SYM_GROUPLET_LINK) ||
SymStringUtils_equalsIgnoreCase(this->targetTable->name, SYM_NODE_GROUP_LINK))) {
this->super.isSyncTriggersNeeded = 1;
}
}
int error = 0;
switch (data->dataEventType) {
Expand Down Expand Up @@ -292,12 +303,13 @@ void SymDefaultDatabaseWriter_destroy(SymDefaultDatabaseWriter *this) {
}

SymDefaultDatabaseWriter * SymDefaultDatabaseWriter_new(SymDefaultDatabaseWriter *this, SymIncomingBatchService *incomingBatchService,
SymDatabasePlatform *platform, SymDialect *dialect, SymDatabaseWriterSettings *settings) {
SymParameterService *parameterService, SymDatabasePlatform *platform, SymDialect *dialect, SymDatabaseWriterSettings *settings) {
if (this == NULL) {
this = (SymDefaultDatabaseWriter *) calloc(1, sizeof(SymDefaultDatabaseWriter));
}
SymDataWriter *super = &this->super;
this->incomingBatchService = incomingBatchService;
this->parameterService = parameterService;
this->platform = platform;
this->dialect = dialect;
this->settings = settings;
Expand Down
3 changes: 2 additions & 1 deletion symmetric-client-clib/src/load/DefaultDataLoaderFactory.c
Expand Up @@ -29,7 +29,8 @@ static SymDatabaseWriterSettings * buildDatabaseWriterSettings(SymDefaultDataLoa

SymDataWriter * SymDefaultDataLoaderFactory_getDataWriter(SymDefaultDataLoaderFactory *this) {
SymDatabaseWriterSettings *settings = buildDatabaseWriterSettings(this);
SymDataWriter *writer = (SymDataWriter *) SymDefaultDatabaseWriter_new(NULL, this->incomingBatchService, this->platform, this->dialect, settings);
SymDataWriter *writer = (SymDataWriter *) SymDefaultDatabaseWriter_new(NULL, this->incomingBatchService, this->parameterService,
this->platform, this->dialect, settings);
return writer;
}

Expand Down
7 changes: 6 additions & 1 deletion symmetric-client-clib/src/service/DataLoaderService.c
Expand Up @@ -50,6 +50,10 @@ static SymList * SymDataLoaderService_loadDataFromTransport(SymDataLoaderService
SymLog_debug("Transport rc = %ld" , rc);

SymList *batchesProcessed = processor->getBatchesProcessed(processor);
if (writer->isSyncTriggersNeeded) {
this->triggerRouterService->syncTriggers(this->triggerRouterService, 0);
}

processor->destroy(processor);
writer->destroy(writer);
return batchesProcessed;
Expand Down Expand Up @@ -150,13 +154,14 @@ void SymDataLoaderService_destroy(SymDataLoaderService *this) {
free(this);
}

SymDataLoaderService * SymDataLoaderService_new(SymDataLoaderService *this, SymParameterService *parameterService, SymNodeService *nodeService,
SymDataLoaderService * SymDataLoaderService_new(SymDataLoaderService *this, SymParameterService *parameterService, SymNodeService *nodeService, SymTriggerRouterService *triggerRouterService,
SymTransportManager *transportManager, SymTransportManager *fileTransportManager, SymDatabasePlatform *platform, SymDialect *dialect, SymIncomingBatchService *incomingBatchService) {
if (this == NULL) {
this = (SymDataLoaderService *) calloc(1, sizeof(SymDataLoaderService));
}
this->parameterService = parameterService;
this->nodeService = nodeService;
this->triggerRouterService = triggerRouterService;
this->transportManager = transportManager;
this->fileTransportManager = fileTransportManager;
this->platform = platform;
Expand Down
88 changes: 44 additions & 44 deletions symmetric-client-clib/src/service/DataService.c
Expand Up @@ -74,55 +74,55 @@ SymList * SymDataService_selectDataFor(SymDataService *this, SymBatch *batch) {
void SymDataService_heartbeat(SymDataService *this, unsigned short force) {
SymNode *me = this->nodeService->findIdentity(this->nodeService);

if (this->parameterService->is(this->parameterService, SYM_PARAMETER_HEARTBEAT_ENABLED, 1)) {
unsigned short updateWithBatchStatus = this->parameterService->is(this->parameterService,
SYM_PARAMETER_HEARTBEAT_UPDATE_NODE_WITH_BATCH_STATUS, 0);

char *syncUrl = this->parameterService->getSyncUrl(this->parameterService);
char *schemaVersion = this->parameterService->getString(this->parameterService, SYM_PARAMETER_SCHEMA_VERSION, "");

int outgoingErrorCount = -1;
int outgoingUnsentCount = -1;
if (updateWithBatchStatus) {
outgoingUnsentCount = this->outgoingBatchService->countOutgoingBatchesUnsent(this->outgoingBatchService);
outgoingErrorCount = this->outgoingBatchService->countOutgoingBatchesInError(this->outgoingBatchService);
}
if (me != NULL) {
if (this->parameterService->is(this->parameterService, SYM_PARAMETER_HEARTBEAT_ENABLED, 1)) {
unsigned short updateWithBatchStatus = this->parameterService->is(this->parameterService,
SYM_PARAMETER_HEARTBEAT_UPDATE_NODE_WITH_BATCH_STATUS, 0);

char *syncUrl = this->parameterService->getSyncUrl(this->parameterService);
char *schemaVersion = this->parameterService->getString(this->parameterService, SYM_PARAMETER_SCHEMA_VERSION, "");

int outgoingErrorCount = -1;
int outgoingUnsentCount = -1;
if (updateWithBatchStatus) {
outgoingUnsentCount = this->outgoingBatchService->countOutgoingBatchesUnsent(this->outgoingBatchService);
outgoingErrorCount = this->outgoingBatchService->countOutgoingBatchesInError(this->outgoingBatchService);
}

if (! SymStringUtils_equals(this->parameterService->getExternalId(this->parameterService), me->externalId)
|| ! SymStringUtils_equals(this->parameterService->getNodeGroupId(this->parameterService), me->nodeGroupId)
|| (syncUrl != NULL && ! SymStringUtils_equals(syncUrl, me->syncUrl))
|| ! SymStringUtils_equals(schemaVersion, me->schemaVersion) // TODO empty string vs. null is causing refresh everytime.
|| ! SymStringUtils_equals(SYM_VERSION, me->symmetricVersion)
|| ! SymStringUtils_equals(this->platform->name, me->databaseType)
|| ! SymStringUtils_equals(this->platform->version, me->databaseVersion)
|| me->batchInErrorCount != outgoingErrorCount
|| me->batchToSendCount != outgoingUnsentCount) {
SymLog_info("Some attribute(s) of node changed. Recording changes");

me->deploymentType = SYM_DEPLOYMENT_TYPE;
me->symmetricVersion = SYM_VERSION;
me->databaseType = this->platform->name;
me->databaseVersion = this->platform->version;
me->batchInErrorCount = outgoingErrorCount;
me->batchToSendCount = outgoingUnsentCount;
me->schemaVersion = schemaVersion;
if (this->parameterService->is(this->parameterService, SYM_PARAMETER_AUTO_UPDATE_NODE_VALUES, 0)) {
SymLog_info("Updating my node configuration info according to the symmetric properties");
me->externalId = this->parameterService->getExternalId(this->parameterService);
me->nodeGroupId = this->parameterService->getNodeGroupId(this->parameterService);
if (SymStringUtils_isNotBlank(this->parameterService->getSyncUrl(this->parameterService))) {
me->syncUrl = this->parameterService->getSyncUrl(this->parameterService);
if (! SymStringUtils_equals(this->parameterService->getExternalId(this->parameterService), me->externalId)
|| ! SymStringUtils_equals(this->parameterService->getNodeGroupId(this->parameterService), me->nodeGroupId)
|| (syncUrl != NULL && ! SymStringUtils_equals(syncUrl, me->syncUrl))
|| ! SymStringUtils_equals(schemaVersion, me->schemaVersion) // TODO empty string vs. null is causing refresh everytime.
|| ! SymStringUtils_equals(SYM_VERSION, me->symmetricVersion)
|| ! SymStringUtils_equals(this->platform->name, me->databaseType)
|| ! SymStringUtils_equals(this->platform->version, me->databaseVersion)
|| me->batchInErrorCount != outgoingErrorCount
|| me->batchToSendCount != outgoingUnsentCount) {
SymLog_info("Some attribute(s) of node changed. Recording changes");

me->deploymentType = SYM_DEPLOYMENT_TYPE;
me->symmetricVersion = SYM_VERSION;
me->databaseType = this->platform->name;
me->databaseVersion = this->platform->version;
me->batchInErrorCount = outgoingErrorCount;
me->batchToSendCount = outgoingUnsentCount;
me->schemaVersion = schemaVersion;
if (this->parameterService->is(this->parameterService, SYM_PARAMETER_AUTO_UPDATE_NODE_VALUES, 0)) {
SymLog_info("Updating my node configuration info according to the symmetric properties");
me->externalId = this->parameterService->getExternalId(this->parameterService);
me->nodeGroupId = this->parameterService->getNodeGroupId(this->parameterService);
if (SymStringUtils_isNotBlank(this->parameterService->getSyncUrl(this->parameterService))) {
me->syncUrl = this->parameterService->getSyncUrl(this->parameterService);
}
}
}

this->nodeService->save(this->nodeService, me);
this->nodeService->save(this->nodeService, me);
}
}

SymLog_debug("Updating my node info");
this->nodeService->updateNodeHostForCurrentNode(this->nodeService);
SymLog_debug("Done updating my node info");
}

SymLog_debug("Updating my node info");
this->nodeService->updateNodeHostForCurrentNode(this->nodeService);
SymLog_debug("Done updating my node info");
}

void SymDataService_insertDataEvents(SymDataService *this, SymSqlTransaction *transaction, SymList *events) {
Expand Down
7 changes: 7 additions & 0 deletions symmetric-client-clib/src/service/IncomingBatchService.c
Expand Up @@ -182,6 +182,12 @@ int SymIncomingBatchService_deleteIncomingBatch(SymIncomingBatchService *this, S
return count;
}

int SymIncomingBatchService_countIncomingBatchesInError(SymIncomingBatchService *this) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
int error;
return sqlTemplate->queryForInt(sqlTemplate, SYM_SQL_COUNT_INCOMING_BATCHES_ERRORS, NULL, NULL, &error);
}

void SymIncomingBatchService_destroy(SymIncomingBatchService *this) {
free(this);
}
Expand All @@ -198,6 +204,7 @@ SymIncomingBatchService * SymIncomingBatchService_new(SymIncomingBatchService *t
this->updateIncomingBatch = (void *) &SymIncomingBatchService_updateIncomingBatch;
this->deleteIncomingBatch = (void *) &SymIncomingBatchService_deleteIncomingBatch;
this->isRecordOkBatchesEnabled = (void *) &SymIncomingBatchService_isRecordOkBatchesEnabled;
this->countIncomingBatchesInError = (void *) &SymIncomingBatchService_countIncomingBatchesInError;
this->destroy = (void *) &SymIncomingBatchService_destroy;
return this;
}
Expand Up @@ -34,7 +34,7 @@ static void append(SymStringBuilder *sb, char *name, char *value) {

static char * buildUrl(char *action, SymNode *remote, SymNode *local, char *securityToken, char *registrationUrl) {
SymStringBuilder *sb = SymStringBuilder_new();
if (strcmp(remote->syncUrl, "") == 0) {
if (SymStringUtils_isBlank(remote->syncUrl)) {
sb->append(sb, registrationUrl);
} else {
sb->append(sb, remote->syncUrl);
Expand Down
Expand Up @@ -35,7 +35,7 @@ public OracleTriggerTemplate(ISymmetricDialect symmetricDialect) {
stringColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, $(oracleToClob)'', '\"'||replace(replace($(oracleToClob)$(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ;
geometryColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then to_clob('') else '\"'||replace(replace(SDO_UTIL.TO_WKTGEOMETRY($(tableAlias).\"$(columnName)\"),'\\','\\\\'),'\"','\\\"')||'\"' end";
numberColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number("+symmetricDialect.getTemplateNumberPrecisionSpec()+"))||'\"')" ;
datetimeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF3')),'\"'))" ;
datetimeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'))" ;
dateTimeWithTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM')),'\"'))" ;
dateTimeWithLocalTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char(cast($(tableAlias).\"$(columnName)\" as timestamp), 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'))" ;
timeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
Expand Down
Expand Up @@ -807,7 +807,7 @@ pull.lock.timeout.ms=7200000
#
# DatabaseOverridable: true
# Tags: jobs
push.thread.per.server.count=100
push.thread.per.server.count=1

# The amount of time a single push worker node_communication lock will timeout after.
#
Expand Down
Expand Up @@ -317,6 +317,8 @@ protected boolean perform(DataContext context, TransformedData data,
TargetDmlAction targetAction = null;
switch (data.getTargetDmlType()) {
case INSERT:
targetAction = TargetDmlAction.INS_ROW;
break;
case UPDATE:
targetAction = transformation.evaluateTargetDmlAction(context, data);
break;
Expand Down

0 comments on commit 50870b6

Please sign in to comment.