From dc9e2ce50b01631172c2f1b2b51120cd9d723339 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 29 Oct 2015 10:56:07 -0400 Subject: [PATCH 1/7] Support LOB in SyncTriggers. --- symmetric-client-clib/src/core/JobManager.c | 76 +++++++++---------- .../platform/sqlite/SqliteTriggerTemplate.c | 10 ++- 2 files changed, 47 insertions(+), 39 deletions(-) diff --git a/symmetric-client-clib/src/core/JobManager.c b/symmetric-client-clib/src/core/JobManager.c index 5caa83d944..3d3845eb7c 100644 --- a/symmetric-client-clib/src/core/JobManager.c +++ b/symmetric-client-clib/src/core/JobManager.c @@ -47,44 +47,44 @@ void SymJobManager_invoke(SymJobManager *this) { this->engine->syncTriggers(this->engine); time(&this->lastSyncTriggersTime); } - if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_ROUTE_JOB, SYM_PARAMETER_ROUTE_PERIOD_MS, this->lastRouteTime)) { - SymLog_info("ROUTE ============================)"); - this->engine->route(this->engine); - time(&this->lastRouteTime); - } - if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PUSH_JOB, SYM_PARAMETER_PUSH_PERIOD_MS, this->lastPushTime)) { - SymRemoteNodeStatuses *pushStatus = this->engine->push(this->engine); - if (pushStatus->wasBatchProcessed(pushStatus) - || pushStatus->wasDataProcessed(pushStatus)) { - // Only run heartbeat after a successful push to avoid queueing up lots of offline heartbeats. - if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_HEARTBEAT_JOB, SYM_PARAMETER_HEARTBEAT_JOB_PERIOD_MS, this->lastHeartbeatTime)) { - SymLog_info("HEARTBEAT ============================)"); - this->engine->heartbeat(this->engine, 0); - time(&this->lastHeartbeatTime); - } - } - time(&this->lastPushTime); - } - if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PULL_JOB, SYM_PARAMETER_PULL_PERIOD_MS, this->lastPullTime)) { - SymLog_info("PULL ============================)"); - this->engine->pull(this->engine); - time(&this->lastPullTime); - } - if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PURGE_JOB, SYM_PARAMETER_PURGE_PERIOD_MS, this->lastPurgeTime)) { - SymLog_info("PURGE ============================)"); - this->engine->purge(this->engine); - time(&this->lastPurgeTime); - } - if (1) { // TODO. - SymLog_info("OFFLINE PUSH ============================)"); - this->engine->offlinePushService->pushData(this->engine->offlinePushService); - //time(&this->lastPurgeTime); - } - if (1) { // TODO. - SymLog_info("OFFLINE PULL ============================)"); - this->engine->offlinePullService->pullData(this->engine->offlinePullService); - //time(&this->lastPurgeTime); - } +// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_ROUTE_JOB, SYM_PARAMETER_ROUTE_PERIOD_MS, this->lastRouteTime)) { +// SymLog_info("ROUTE ============================)"); +// this->engine->route(this->engine); +// time(&this->lastRouteTime); +// } +// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PUSH_JOB, SYM_PARAMETER_PUSH_PERIOD_MS, this->lastPushTime)) { +// SymRemoteNodeStatuses *pushStatus = this->engine->push(this->engine); +// if (pushStatus->wasBatchProcessed(pushStatus) +// || pushStatus->wasDataProcessed(pushStatus)) { +// // Only run heartbeat after a successful push to avoid queueing up lots of offline heartbeats. +// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_HEARTBEAT_JOB, SYM_PARAMETER_HEARTBEAT_JOB_PERIOD_MS, this->lastHeartbeatTime)) { +// SymLog_info("HEARTBEAT ============================)"); +// this->engine->heartbeat(this->engine, 0); +// time(&this->lastHeartbeatTime); +// } +// } +// time(&this->lastPushTime); +// } +// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PULL_JOB, SYM_PARAMETER_PULL_PERIOD_MS, this->lastPullTime)) { +// SymLog_info("PULL ============================)"); +// this->engine->pull(this->engine); +// time(&this->lastPullTime); +// } +// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PURGE_JOB, SYM_PARAMETER_PURGE_PERIOD_MS, this->lastPurgeTime)) { +// SymLog_info("PURGE ============================)"); +// this->engine->purge(this->engine); +// time(&this->lastPurgeTime); +// } +// if (1) { // TODO. +// SymLog_info("OFFLINE PUSH ============================)"); +// this->engine->offlinePushService->pushData(this->engine->offlinePushService); +// //time(&this->lastPurgeTime); +// } +// if (1) { // TODO. +// SymLog_info("OFFLINE PULL ============================)"); +// this->engine->offlinePullService->pullData(this->engine->offlinePullService); +// //time(&this->lastPurgeTime); +// } } void SymJobManager_startJobs(SymJobManager *this) { diff --git a/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c b/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c index 43ad0397c5..631751e288 100644 --- a/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c +++ b/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c @@ -31,23 +31,31 @@ char * SymSqliteTriggerTemplate_fillOutColumnTemplate(SymSqliteTriggerTemplate * char *templateToUse; switch (column->sqlType) { + case SYM_SQL_TYPE_BOOLEAN: case SYM_SQL_TYPE_TINYINT: case SYM_SQL_TYPE_SMALLINT: case SYM_SQL_TYPE_INTEGER: case SYM_SQL_TYPE_BIGINT: case SYM_SQL_TYPE_FLOAT: - case SYM_SQL_TYPE_REAL: case SYM_SQL_TYPE_DOUBLE: case SYM_SQL_TYPE_NUMERIC: case SYM_SQL_TYPE_DECIMAL: + case SYM_SQL_TYPE_REAL: templateToUse = "case when %s.%s is null then '' else ('\"' || cast(%s.%s as varchar) || '\"') end"; break; case SYM_SQL_TYPE_CHAR: case SYM_SQL_TYPE_NCHAR: case SYM_SQL_TYPE_VARCHAR: case SYM_SQL_TYPE_NVARCHAR: + case SYM_SQL_TYPE_CLOB: + case SYM_SQL_TYPE_LONGVARCHAR: + case SYM_SQL_TYPE_LONGNVARCHAR: templateToUse = "case when %s.%s is null then '' else '\"' || replace(replace(%s.%s,'\\','\\\\'),'\"','\\\"') || '\"' end"; break; + case SYM_SQL_TYPE_BLOB: + case SYM_SQL_TYPE_LONGVARBINARY: + templateToUse = "case when %s.%s is null then '' else '\"' || replace(replace(hex(%s.%s),'\\','\\\\'),'\"','\\\"') || '\"' end "; + break; default: templateToUse = NULL; SymLog_error("Unknown sqlType %d", column->sqlType); From d759c0e330ec4dff18bbc2b64bab6982a7795502 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 29 Oct 2015 13:40:41 -0400 Subject: [PATCH 2/7] syncTriggers support for Date and boolean. --- symmetric-client-clib/src/core/JobManager.c | 76 +++++++++---------- .../src/db/platform/sqlite/SqliteDdlReader.c | 4 + .../platform/sqlite/SqliteTriggerTemplate.c | 13 +++- 3 files changed, 51 insertions(+), 42 deletions(-) diff --git a/symmetric-client-clib/src/core/JobManager.c b/symmetric-client-clib/src/core/JobManager.c index 3d3845eb7c..5caa83d944 100644 --- a/symmetric-client-clib/src/core/JobManager.c +++ b/symmetric-client-clib/src/core/JobManager.c @@ -47,44 +47,44 @@ void SymJobManager_invoke(SymJobManager *this) { this->engine->syncTriggers(this->engine); time(&this->lastSyncTriggersTime); } -// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_ROUTE_JOB, SYM_PARAMETER_ROUTE_PERIOD_MS, this->lastRouteTime)) { -// SymLog_info("ROUTE ============================)"); -// this->engine->route(this->engine); -// time(&this->lastRouteTime); -// } -// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PUSH_JOB, SYM_PARAMETER_PUSH_PERIOD_MS, this->lastPushTime)) { -// SymRemoteNodeStatuses *pushStatus = this->engine->push(this->engine); -// if (pushStatus->wasBatchProcessed(pushStatus) -// || pushStatus->wasDataProcessed(pushStatus)) { -// // Only run heartbeat after a successful push to avoid queueing up lots of offline heartbeats. -// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_HEARTBEAT_JOB, SYM_PARAMETER_HEARTBEAT_JOB_PERIOD_MS, this->lastHeartbeatTime)) { -// SymLog_info("HEARTBEAT ============================)"); -// this->engine->heartbeat(this->engine, 0); -// time(&this->lastHeartbeatTime); -// } -// } -// time(&this->lastPushTime); -// } -// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PULL_JOB, SYM_PARAMETER_PULL_PERIOD_MS, this->lastPullTime)) { -// SymLog_info("PULL ============================)"); -// this->engine->pull(this->engine); -// time(&this->lastPullTime); -// } -// if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PURGE_JOB, SYM_PARAMETER_PURGE_PERIOD_MS, this->lastPurgeTime)) { -// SymLog_info("PURGE ============================)"); -// this->engine->purge(this->engine); -// time(&this->lastPurgeTime); -// } -// if (1) { // TODO. -// SymLog_info("OFFLINE PUSH ============================)"); -// this->engine->offlinePushService->pushData(this->engine->offlinePushService); -// //time(&this->lastPurgeTime); -// } -// if (1) { // TODO. -// SymLog_info("OFFLINE PULL ============================)"); -// this->engine->offlinePullService->pullData(this->engine->offlinePullService); -// //time(&this->lastPurgeTime); -// } + if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_ROUTE_JOB, SYM_PARAMETER_ROUTE_PERIOD_MS, this->lastRouteTime)) { + SymLog_info("ROUTE ============================)"); + this->engine->route(this->engine); + time(&this->lastRouteTime); + } + if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PUSH_JOB, SYM_PARAMETER_PUSH_PERIOD_MS, this->lastPushTime)) { + SymRemoteNodeStatuses *pushStatus = this->engine->push(this->engine); + if (pushStatus->wasBatchProcessed(pushStatus) + || pushStatus->wasDataProcessed(pushStatus)) { + // Only run heartbeat after a successful push to avoid queueing up lots of offline heartbeats. + if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_HEARTBEAT_JOB, SYM_PARAMETER_HEARTBEAT_JOB_PERIOD_MS, this->lastHeartbeatTime)) { + SymLog_info("HEARTBEAT ============================)"); + this->engine->heartbeat(this->engine, 0); + time(&this->lastHeartbeatTime); + } + } + time(&this->lastPushTime); + } + if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PULL_JOB, SYM_PARAMETER_PULL_PERIOD_MS, this->lastPullTime)) { + SymLog_info("PULL ============================)"); + this->engine->pull(this->engine); + time(&this->lastPullTime); + } + if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PURGE_JOB, SYM_PARAMETER_PURGE_PERIOD_MS, this->lastPurgeTime)) { + SymLog_info("PURGE ============================)"); + this->engine->purge(this->engine); + time(&this->lastPurgeTime); + } + if (1) { // TODO. + SymLog_info("OFFLINE PUSH ============================)"); + this->engine->offlinePushService->pushData(this->engine->offlinePushService); + //time(&this->lastPurgeTime); + } + if (1) { // TODO. + SymLog_info("OFFLINE PULL ============================)"); + this->engine->offlinePullService->pullData(this->engine->offlinePullService); + //time(&this->lastPurgeTime); + } } void SymJobManager_startJobs(SymJobManager *this) { diff --git a/symmetric-client-clib/src/db/platform/sqlite/SqliteDdlReader.c b/symmetric-client-clib/src/db/platform/sqlite/SqliteDdlReader.c index 96babcfad5..cac907f897 100644 --- a/symmetric-client-clib/src/db/platform/sqlite/SqliteDdlReader.c +++ b/symmetric-client-clib/src/db/platform/sqlite/SqliteDdlReader.c @@ -47,6 +47,10 @@ static int SymSqliteDdlReader_toSqlType(char *type) { sqlType = SYM_SQL_TYPE_TIMESTAMP; } else if (strncasecmp(type, "TIME", 4) == 0) { sqlType = SYM_SQL_TYPE_TIME; + } else if (strncasecmp(type, "BOOLEAN", 7) == 0) { + sqlType = SYM_SQL_TYPE_BOOLEAN; + } else if (strncasecmp(type, "BIT", 3) == 0) { + sqlType = SYM_SQL_TYPE_BIT; } else { sqlType = SYM_SQL_TYPE_VARCHAR; } diff --git a/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c b/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c index 631751e288..83bb278caf 100644 --- a/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c +++ b/symmetric-client-clib/src/db/platform/sqlite/SqliteTriggerTemplate.c @@ -25,13 +25,13 @@ char * SymSqliteTriggerTemplate_fillOutColumnTemplate(SymSqliteTriggerTemplate * char *columnPrefix, SymColumn *column, SymDataEventType dml, unsigned short isOld, SymChannel *channel, SymTrigger *trigger) { - // TODO: handle LOBs - //unsigned short isLob = 0; - char *templateToUse; switch (column->sqlType) { + case SYM_SQL_TYPE_BIT: case SYM_SQL_TYPE_BOOLEAN: + templateToUse = "case when %s.%s is null then '' when %s.%s = 1 then '\"1\"' else '\"0\"' end"; + break; case SYM_SQL_TYPE_TINYINT: case SYM_SQL_TYPE_SMALLINT: case SYM_SQL_TYPE_INTEGER: @@ -56,13 +56,18 @@ char * SymSqliteTriggerTemplate_fillOutColumnTemplate(SymSqliteTriggerTemplate * case SYM_SQL_TYPE_LONGVARBINARY: templateToUse = "case when %s.%s is null then '' else '\"' || replace(replace(hex(%s.%s),'\\','\\\\'),'\"','\\\"') || '\"' end "; break; + case SYM_SQL_TYPE_DATE: + case SYM_SQL_TYPE_TIMESTAMP: + case SYM_SQL_TYPE_TIME: + templateToUse = "case when strftime('%%Y-%%m-%%d %%H:%%M:%%f',%s.%s) is null then '' else ('\"' || strftime('%%Y-%%m-%%d %%H:%%M:%%f', %s.%s) || '\"') end"; + break; default: templateToUse = NULL; SymLog_error("Unknown sqlType %d", column->sqlType); break; } - char* columnName = column->name; // TODO + char* columnName = column->name; char* formattedColumnText = SymStringUtils_format(templateToUse, tableAlias, columnName, tableAlias, columnName); From aca9e8c3869eba335b195b8d90a23bb8a82bad9a Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 29 Oct 2015 14:50:29 -0400 Subject: [PATCH 3/7] Support parameterized logging. --- symmetric-client-clib/inc/common/Log.h | 9 ++ symmetric-client-clib/inc/util/Properties.h | 1 - symmetric-client-clib/src/common/Log.c | 86 ++++++++++++++++--- symmetric-client-clib/src/core/SymEngine.c | 2 + symmetric-client-clib/src/util/Properties.c | 5 +- .../run/symmetric.properties | 17 +++- symmetric-client-native/src/SymClientNative.c | 2 + 7 files changed, 103 insertions(+), 19 deletions(-) diff --git a/symmetric-client-clib/inc/common/Log.h b/symmetric-client-clib/inc/common/Log.h index 271582e15f..07eb003b6a 100644 --- a/symmetric-client-clib/inc/common/Log.h +++ b/symmetric-client-clib/inc/common/Log.h @@ -26,10 +26,18 @@ #include #include #include "util/StringBuilder.h" +#include "util/Properties.h" #include "util/Date.h" +#include "util/StringUtils.h" typedef enum {SYM_LOG_LEVEL_DEBUG, SYM_LOG_LEVEL_INFO, SYM_LOG_LEVEL_WARN, SYM_LOG_LEVEL_ERROR} SymLogLevel; +#define SYM_LOG_DESTINATION_CONSOLE "console" + +#define SYM_LOG_SETTINGS_LOG_LEVEL "client.log.level" +#define SYM_LOG_SETTINGS_LOG_DESTINATION "client.log.destination" +#define SYM_LOG_SETTINGS_LOG_SHOW_SOURCE_FILE "client.log.show.source.file" + #define SYM_LOG_LEVEL_DESC_DEBUG "DEBUG" #define SYM_LOG_LEVEL_DESC_INFO "INFO" #define SYM_LOG_LEVEL_DESC_WARN "WARN" @@ -42,5 +50,6 @@ typedef enum {SYM_LOG_LEVEL_DEBUG, SYM_LOG_LEVEL_INFO, SYM_LOG_LEVEL_WARN, SYM_L #define SymLog_error(M, ...) SymLog_log(3, __func__, __FILE__, __LINE__, M, ##__VA_ARGS__) void SymLog_log(SymLogLevel logLevel, const char *functionName, const char *filename, int lineNumber, const char* message, ...); +void SymLog_configure(SymProperties *settings); #endif diff --git a/symmetric-client-clib/inc/util/Properties.h b/symmetric-client-clib/inc/util/Properties.h index cec08a3ec5..4f73547a8f 100644 --- a/symmetric-client-clib/inc/util/Properties.h +++ b/symmetric-client-clib/inc/util/Properties.h @@ -23,7 +23,6 @@ #include #include -#include "common/Log.h" #include "util/StringUtils.h" #include "util/StringBuilder.h" #include "util/StringArray.h" diff --git a/symmetric-client-clib/src/common/Log.c b/symmetric-client-clib/src/common/Log.c index be4cc1c7eb..e17252e16f 100644 --- a/symmetric-client-clib/src/common/Log.c +++ b/symmetric-client-clib/src/common/Log.c @@ -20,7 +20,11 @@ */ #include "common/Log.h" -static char* logLevelDescription(SymLogLevel logLevel) { +static int SymLog_logLevel = SYM_LOG_LEVEL_DEBUG; +static unsigned short SymLog_showSourceFile = 1; +static char* SymLog_destination = "console"; + +static char* SymLog_getlogLevelDescription(SymLogLevel logLevel) { switch (logLevel) { case SYM_LOG_LEVEL_DEBUG: return SYM_LOG_LEVEL_DESC_DEBUG; @@ -35,17 +39,44 @@ static char* logLevelDescription(SymLogLevel logLevel) { } } +static SymLogLevel SymLog_getLogLevelValue(char* logLevelDescription) { + if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_DEBUG)) { + return SYM_LOG_LEVEL_DEBUG; + } else if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_INFO)) { + return SYM_LOG_LEVEL_INFO; + } else if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_WARN)) { + return SYM_LOG_LEVEL_WARN; + } else if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_ERROR)) { + return SYM_LOG_LEVEL_ERROR; + } + return SYM_LOG_LEVEL_DEBUG; +} + +void SymLog_configure(SymProperties *settings) { + char *logLevelDescription = settings->get(settings, SYM_LOG_SETTINGS_LOG_LEVEL, "DEBUG"); + if (! SymStringUtils_isBlank(logLevelDescription)) { + SymLog_logLevel = SymLog_getLogLevelValue(logLevelDescription); + } + + char *logDestination = settings->get(settings, SYM_LOG_SETTINGS_LOG_DESTINATION, "console"); + if (! SymStringUtils_isBlank(logDestination)) { + SymLog_destination = logDestination; + } + + char *showSourceFile = settings->get(settings, SYM_LOG_SETTINGS_LOG_SHOW_SOURCE_FILE, "1"); + if (! SymStringUtils_isBlank(showSourceFile)) { + SymLog_showSourceFile = SymStringUtils_equals(showSourceFile, "1") + || SymStringUtils_equalsIgnoreCase(showSourceFile, "true"); + } +} + /** This is the central place where all logging funnels through. */ void SymLog_log(SymLogLevel logLevel, const char *functionName, const char *fileName, int lineNumber, const char* message, ...) { - FILE *destination; - if (logLevel <= SYM_LOG_LEVEL_INFO) { - destination = stdout; - } - else { - destination = stderr; - } + if (logLevel < SymLog_logLevel) { + return; + } - char* levelDescription = logLevelDescription(logLevel); + char* levelDescription = SymLog_getlogLevelDescription(logLevel); SymStringBuilder *messageBuffer = SymStringBuilder_new(); @@ -56,22 +87,51 @@ void SymLog_log(SymLogLevel logLevel, const char *functionName, const char *file messageBuffer->append(messageBuffer, levelDescription); messageBuffer->append(messageBuffer, "] ["); messageBuffer->append(messageBuffer, functionName); -// messageBuffer->append(messageBuffer, " "); -// messageBuffer->append(messageBuffer, fileName); -// messageBuffer->append(messageBuffer, ":"); -// messageBuffer->appendInt(messageBuffer, lineNumber); messageBuffer->append(messageBuffer, "] "); va_list varargs; va_start(varargs, message); messageBuffer->appendfv(messageBuffer, message, varargs); va_end(varargs); + if (SymLog_showSourceFile) { + messageBuffer->append(messageBuffer, " ("); + messageBuffer->append(messageBuffer, fileName); + messageBuffer->append(messageBuffer, ":"); + messageBuffer->appendInt(messageBuffer, lineNumber); + messageBuffer->append(messageBuffer, ")"); + } messageBuffer->append(messageBuffer, "\n"); + unsigned short physicalFile = 0; + FILE *destination; + + if (SymStringUtils_equalsIgnoreCase(SymLog_destination, SYM_LOG_DESTINATION_CONSOLE)) { + if (logLevel <= SYM_LOG_LEVEL_INFO) { + destination = stdout; + } + else { + destination = stderr; + } + } + else { + destination = fopen(SymLog_destination, "a+"); + if (!destination) { + printf("Failed to open log file destination '%s'. Check the path our use 'console'\n", SymLog_destination); + destination = stdout; + } + else { + physicalFile = 1; + } + + } + fprintf(destination, "%s", messageBuffer->toString(messageBuffer)); // stdout may not flush before stderr does. // Do this to keep log messages more or less in order. fflush(destination); + if (physicalFile) { + fclose(destination); + } date->destroy(date); messageBuffer->destroy(messageBuffer); diff --git a/symmetric-client-clib/src/core/SymEngine.c b/symmetric-client-clib/src/core/SymEngine.c index 95f8d22ef0..5c27e36f11 100644 --- a/symmetric-client-clib/src/core/SymEngine.c +++ b/symmetric-client-clib/src/core/SymEngine.c @@ -42,6 +42,8 @@ static unsigned short SymEngine_isConfigured(SymEngine *this) { unsigned short SymEngine_start(SymEngine *this) { unsigned short isStarted = 0; + SymLog_configure(this->properties); + SymLog_info("About to start SymmetricDS"); this->dialect->initTablesAndDatabaseObjects(this->dialect); diff --git a/symmetric-client-clib/src/util/Properties.c b/symmetric-client-clib/src/util/Properties.c index 85e44816ce..fc08605da8 100644 --- a/symmetric-client-clib/src/util/Properties.c +++ b/symmetric-client-clib/src/util/Properties.c @@ -126,16 +126,13 @@ SymProperties * SymProperties_newWithFile(SymProperties *this, char *argPath) { this = SymProperties_new(this); } - // SymStringArray / split to read properties file. - FILE *file; int BUFFER_SIZE = 1024; char inputBuffer[BUFFER_SIZE]; file = fopen(argPath,"r"); if (!file) { - SymLog_error("Failed to load properties from file %s", argPath); - return this; + return NULL; } SymStringBuilder *buff = SymStringBuilder_new(NULL); diff --git a/symmetric-client-native/run/symmetric.properties b/symmetric-client-native/run/symmetric.properties index 0988dba61f..b36535855a 100644 --- a/symmetric-client-native/run/symmetric.properties +++ b/symmetric-client-native/run/symmetric.properties @@ -50,6 +50,9 @@ start.push.job=true start.heartbeat.job=true start.purge.job=true start.synctriggers.job=true +start.synctriggers.job=true +start.offline.pull.job=true +start.offline.push.job=true # This is how often the job manager will sleep between potentially running jobs. job.manager.sleep.period.ms=5000 @@ -61,4 +64,16 @@ job.pull.period.time.ms=10000 job.push.period.time.ms=10000 job.heartbeat.period.time.ms=300000 job.purge.period.time.ms=300000 -job.synctriggers.period.time.ms=300000 \ No newline at end of file +job.synctriggers.period.time.ms=300000 +job.offline.push.period.time.ms=60000 +job.offline.pull.period.time.ms=60000 + +node.offline.incoming.dir=./tmp/incoming +node.offline.outgoing.dir=./tmp/outgoing +node.offline.error.dir=./tmp/error +node.offline.archive.dir=./tmp/archive + +client.log.level=INFO +#client.log.destination=console +client.log.destination=./sym.log +client.log.show.source.file=true \ No newline at end of file diff --git a/symmetric-client-native/src/SymClientNative.c b/symmetric-client-native/src/SymClientNative.c index 3d832ccacb..04e31759c1 100644 --- a/symmetric-client-native/src/SymClientNative.c +++ b/symmetric-client-native/src/SymClientNative.c @@ -73,6 +73,8 @@ int main(int argCount, char **argValues) { return 1; } + SymLog_configure(properties); + SymNativeClient_runSymmetricEngine(properties); properties->destroy(properties); From bfa56a66b695cecc5c9422250ec1f6cb388e85c3 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 29 Oct 2015 16:23:20 -0400 Subject: [PATCH 4/7] Archive pull files after processing. --- .../transport/file/FileIncomingTransport.c | 53 +++++++++++++++---- symmetric-client-native/inc/SymClientNative.h | 1 + 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/symmetric-client-clib/src/transport/file/FileIncomingTransport.c b/symmetric-client-clib/src/transport/file/FileIncomingTransport.c index f57c3dd822..c0894c0116 100644 --- a/symmetric-client-clib/src/transport/file/FileIncomingTransport.c +++ b/symmetric-client-clib/src/transport/file/FileIncomingTransport.c @@ -49,15 +49,16 @@ char* SymFileIncomingTransport_getIncomingFile(SymFileIncomingTransport *this, c firstFile = files->get(files, 0); } + // seems we have stack memory here for firstFile. + // This memory gets overwritten after it's returned. + if (firstFile) { + firstFile = SymStringUtils_format("%s", firstFile); + } + free(startFilter); files->destroy(files); - if (firstFile) { - char *path = SymStringUtils_format("%s/%s", this->offlineIncomingDir, firstFile); - return path; - } else { - return firstFile; - } + return firstFile; } long SymFileIncomingTransport_process(SymFileIncomingTransport *this, SymDataProcessor *processor) { @@ -66,22 +67,56 @@ long SymFileIncomingTransport_process(SymFileIncomingTransport *this, SymDataPro char inputBuffer[BUFFER_SIZE]; char *fileName = SymFileIncomingTransport_getIncomingFile(this, ".csv"); + if (!fileName || SymStringUtils_isBlank(fileName)) { + SymLog_info("No incoming files found at '%s'", this->offlineIncomingDir); + return SYM_TRANSPORT_SC_SERVICE_UNAVAILABLE; + } + char *path = SymStringUtils_format("%s/%s", this->offlineIncomingDir, fileName); - file = fopen(fileName,"r"); + file = fopen(path,"r"); if (!file) { - SymLog_warn("Failed to load file %s", this->offlineIncomingDir); + SymLog_warn("Failed to load file '%s'", path); return SYM_TRANSPORT_SC_SERVICE_UNAVAILABLE; } processor->open(processor); + unsigned short success = 1; + int count; while ((count = fread(inputBuffer, sizeof(char), BUFFER_SIZE, file)) > 0) { - processor->process(processor, inputBuffer, sizeof(char), count); + int size = processor->process(processor, inputBuffer, sizeof(char), count); + if (size == 0) { + SymLog_warn("Failed to process file %s", this->offlineIncomingDir); + success = 0; + break; + } } processor->close(processor); + fclose(file); + + if (success) { + if (SymStringUtils_isNotBlank(this->offlineArchiveDir)) { + char *archivePath = SymStringUtils_format("%s/%s", this->offlineArchiveDir, fileName); + int result = rename(path, archivePath); + if (result) { + SymLog_warn("Failed to archive '%s' to '%s'", path, archivePath); + } + } else { + int result = remove(path); + if (result) { + SymLog_warn("Failed to delete '%s'", path); + } + } + } else if (SymStringUtils_isNotBlank(this->offlineErrorDir)) { + char *errorPath = SymStringUtils_format("%s/%s", this->offlineErrorDir, fileName); + int result = rename(path, errorPath); + if (result) { + SymLog_warn("Failed to archive '%s' to '%s'", path, errorPath); + } + } return SYM_TRANSPORT_OK; } diff --git a/symmetric-client-native/inc/SymClientNative.h b/symmetric-client-native/inc/SymClientNative.h index 89fdd3925a..87e34dc012 100644 --- a/symmetric-client-native/inc/SymClientNative.h +++ b/symmetric-client-native/inc/SymClientNative.h @@ -29,6 +29,7 @@ #include "util/StringArray.h" #include "util/StringUtils.h" #include "util/Properties.h" +#include "common/Log.h" int SymNativeClient_runSymmetricEngine(SymProperties *properties); From 8035f09dd0b27cf98bb132218f2c94b009f9ac43 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 29 Oct 2015 16:48:59 -0400 Subject: [PATCH 5/7] Configure offline push and pull jobs. --- .../inc/common/ParameterConstants.h | 4 ++++ symmetric-client-clib/inc/core/JobManager.h | 2 ++ symmetric-client-clib/src/core/JobManager.c | 21 ++++++++++--------- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/symmetric-client-clib/inc/common/ParameterConstants.h b/symmetric-client-clib/inc/common/ParameterConstants.h index d747c50d8d..e65c155705 100644 --- a/symmetric-client-clib/inc/common/ParameterConstants.h +++ b/symmetric-client-clib/inc/common/ParameterConstants.h @@ -83,6 +83,10 @@ #define SYM_PARAMETER_NODE_OFFLINE_ERROR_DIR "node.offline.error.dir" #define SYM_PARAMETER_NODE_OFFLINE_ARCHIVE_DIR "node.offline.archive.dir" #define SYM_PARAMETER_NODE_OFFLINE_INCOMING_ACCEPT_ALL "node.offline.incoming.accept.all" +#define SYM_PARAMETER_START_OFFLINE_PULL_JOB "start.offline.pull.job" +#define SYM_PARAMETER_START_OFFLINE_PUSH_JOB "start.offline.push.job" +#define SYM_PARAMETER_OFFLINE_PUSH_PERIOD_MS "job.offline.push.period.time.ms" +#define SYM_PARAMETER_OFFLINE_PULL_PERIOD_MS "job.offline.pull.period.time.ms" #define SYM_PARAMETER_HTTPS_VERIFIED_SERVERS "https.verified.server.names" #define SYM_PARAMETER_HTTPS_ALLOW_SELF_SIGNED_CERTS "https.allow.self.signed.certs" diff --git a/symmetric-client-clib/inc/core/JobManager.h b/symmetric-client-clib/inc/core/JobManager.h index beef6d48fd..dd49354732 100644 --- a/symmetric-client-clib/inc/core/JobManager.h +++ b/symmetric-client-clib/inc/core/JobManager.h @@ -34,6 +34,8 @@ typedef struct SymJobManager { long lastPurgeTime; long lastRouteTime; long lastSyncTriggersTime; + long lastOfflinePushTime; + long lastOfflinePullTime; void (*startJobs)(struct SymJobManager *this); void (*stopJobs)(struct SymJobManager *this); void (*destroy)(struct SymJobManager *this); diff --git a/symmetric-client-clib/src/core/JobManager.c b/symmetric-client-clib/src/core/JobManager.c index 5caa83d944..1c796e95f5 100644 --- a/symmetric-client-clib/src/core/JobManager.c +++ b/symmetric-client-clib/src/core/JobManager.c @@ -75,26 +75,27 @@ void SymJobManager_invoke(SymJobManager *this) { this->engine->purge(this->engine); time(&this->lastPurgeTime); } - if (1) { // TODO. + if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_OFFLINE_PUSH_JOB, SYM_PARAMETER_OFFLINE_PUSH_PERIOD_MS, this->lastOfflinePushTime)) { SymLog_info("OFFLINE PUSH ============================)"); this->engine->offlinePushService->pushData(this->engine->offlinePushService); - //time(&this->lastPurgeTime); + time(&this->lastOfflinePushTime); } - if (1) { // TODO. + if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_OFFLINE_PULL_JOB, SYM_PARAMETER_OFFLINE_PULL_PERIOD_MS, this->lastOfflinePullTime)) { SymLog_info("OFFLINE PULL ============================)"); this->engine->offlinePullService->pullData(this->engine->offlinePullService); - //time(&this->lastPurgeTime); + time(&this->lastOfflinePullTime); } } void SymJobManager_startJobs(SymJobManager *this) { this->engine->start(this->engine); - time(&this->lastRouteTime); - time(&this->lastPullTime); - time(&this->lastPushTime); - time(&this->lastHeartbeatTime); - time(&this->lastPurgeTime); - time(&this->lastSyncTriggersTime); + + long now; + time(&now); + + this->lastRouteTime = this->lastPullTime = this->lastPushTime = + this->lastHeartbeatTime = this->lastPurgeTime = this->lastSyncTriggersTime = + this->lastOfflinePullTime = this->lastOfflinePushTime = now; this->started = 1; From 6e3211114d1153e6f705aa668276c2c0da51983d Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 29 Oct 2015 18:18:29 -0400 Subject: [PATCH 6/7] Fix for offline push batch acknowledgement. --- .../inc/service/OutgoingBatchService.h | 1 + .../src/service/OutgoingBatchService.c | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/symmetric-client-clib/inc/service/OutgoingBatchService.h b/symmetric-client-clib/inc/service/OutgoingBatchService.h index 4d196a897c..bf233f1510 100644 --- a/symmetric-client-clib/inc/service/OutgoingBatchService.h +++ b/symmetric-client-clib/inc/service/OutgoingBatchService.h @@ -74,6 +74,7 @@ b.failed_data_id, b.last_update_hostname, b.last_update_time, b.create_time, b.b from sym_outgoing_batch b " #define SYM_SQL_FIND_OUTGOING_BATCH "where batch_id = ? and node_id = ?" +#define SYM_SQL_FIND_OUTGOING_BATCH_BY_ID_ONLY "where batch_id=? " #define SYM_SQL_SELECT_OUTGOING_BATCH "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc" diff --git a/symmetric-client-clib/src/service/OutgoingBatchService.c b/symmetric-client-clib/src/service/OutgoingBatchService.c index f2a3cba814..0b6e13401d 100644 --- a/symmetric-client-clib/src/service/OutgoingBatchService.c +++ b/symmetric-client-clib/src/service/OutgoingBatchService.c @@ -78,13 +78,19 @@ void SymOutgoingBatchService_insertOutgoingBatch(SymOutgoingBatchService *this, } SymOutgoingBatch * SymOutgoingBatchService_findOutgoingBatch(SymOutgoingBatchService *this, long batchId, char *nodeId) { + SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform); SymStringArray *args = SymStringArray_new(NULL); args->addLong(args, batchId); args->add(args, nodeId); - SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform); + SymStringBuilder *sb = SymStringBuilder_newWithString(SYM_SQL_SELECT_OUTGOING_BATCH_PREFIX); - sb->append(sb, SYM_SQL_FIND_OUTGOING_BATCH); + if (SymStringUtils_isNotBlank(nodeId)) { + sb->append(sb, SYM_SQL_FIND_OUTGOING_BATCH); + } else { + sb->append(sb, SYM_SQL_FIND_OUTGOING_BATCH_BY_ID_ONLY); + } + int error; SymList *batches = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymOutgoingBatchService_outgoingBatchMapper); From 8235c5d4cece3186f17f3c51094ca8f4d8690ca5 Mon Sep 17 00:00:00 2001 From: gwilmer Date: Thu, 29 Oct 2015 22:37:41 -0400 Subject: [PATCH 7/7] 0002430: Fix Windows Service Issue when install directory has a space. --- .../src/main/java/org/jumpmind/symmetric/wrapper/Wrapper.java | 2 +- .../java/org/jumpmind/symmetric/wrapper/WrapperService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/Wrapper.java b/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/Wrapper.java index 3965129b5b..40d52e85af 100644 --- a/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/Wrapper.java +++ b/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/Wrapper.java @@ -43,7 +43,7 @@ public static void main(String[] args) throws Exception { } } String configFile = dir + File.separator + "conf" + File.separator + "sym_service.conf"; - String jarFile = dir + File.separator + "lib" + File.pathSeparator + "symmetric-wrapper.jar"; + String jarFile = dir + File.separator + "lib" + File.separator + "symmetric-wrapper.jar"; WrapperHelper.run(args, dir, configFile, jarFile); } diff --git a/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WrapperService.java b/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WrapperService.java index 7470d33aa9..eac056c343 100644 --- a/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WrapperService.java +++ b/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WrapperService.java @@ -304,7 +304,7 @@ protected ArrayList getWrapperCommand(String arg) { ArrayList cmd = new ArrayList(); String quote = getWrapperCommandQuote(); cmd.add(quote + config.getJavaCommand() + quote); - cmd.add("-Djava.io.tmpdir="+System.getProperty("java.io.tmpdir")); + cmd.add("-Djava.io.tmpdir="+quote+System.getProperty("java.io.tmpdir")+quote); cmd.add("-jar"); cmd.add(quote + config.getWrapperJarPath() + quote); cmd.add(arg);