Skip to content

Commit

Permalink
Merge branch '3.7' of https://github.com/JumpMind/symmetric-ds into 3.7
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 30, 2015
2 parents d941a48 + 8235c5d commit 1ef4660
Show file tree
Hide file tree
Showing 18 changed files with 198 additions and 47 deletions.
9 changes: 9 additions & 0 deletions symmetric-client-clib/inc/common/Log.h
Expand Up @@ -26,10 +26,18 @@
#include <stdarg.h>
#include <time.h>
#include "util/StringBuilder.h"
#include "util/Properties.h"
#include "util/Date.h"
#include "util/StringUtils.h"

typedef enum {SYM_LOG_LEVEL_DEBUG, SYM_LOG_LEVEL_INFO, SYM_LOG_LEVEL_WARN, SYM_LOG_LEVEL_ERROR} SymLogLevel;

#define SYM_LOG_DESTINATION_CONSOLE "console"

#define SYM_LOG_SETTINGS_LOG_LEVEL "client.log.level"
#define SYM_LOG_SETTINGS_LOG_DESTINATION "client.log.destination"
#define SYM_LOG_SETTINGS_LOG_SHOW_SOURCE_FILE "client.log.show.source.file"

#define SYM_LOG_LEVEL_DESC_DEBUG "DEBUG"
#define SYM_LOG_LEVEL_DESC_INFO "INFO"
#define SYM_LOG_LEVEL_DESC_WARN "WARN"
Expand All @@ -42,5 +50,6 @@ typedef enum {SYM_LOG_LEVEL_DEBUG, SYM_LOG_LEVEL_INFO, SYM_LOG_LEVEL_WARN, SYM_L
#define SymLog_error(M, ...) SymLog_log(3, __func__, __FILE__, __LINE__, M, ##__VA_ARGS__)

void SymLog_log(SymLogLevel logLevel, const char *functionName, const char *filename, int lineNumber, const char* message, ...);
void SymLog_configure(SymProperties *settings);

#endif
4 changes: 4 additions & 0 deletions symmetric-client-clib/inc/common/ParameterConstants.h
Expand Up @@ -85,6 +85,10 @@
#define SYM_PARAMETER_NODE_OFFLINE_ERROR_DIR "node.offline.error.dir"
#define SYM_PARAMETER_NODE_OFFLINE_ARCHIVE_DIR "node.offline.archive.dir"
#define SYM_PARAMETER_NODE_OFFLINE_INCOMING_ACCEPT_ALL "node.offline.incoming.accept.all"
#define SYM_PARAMETER_START_OFFLINE_PULL_JOB "start.offline.pull.job"
#define SYM_PARAMETER_START_OFFLINE_PUSH_JOB "start.offline.push.job"
#define SYM_PARAMETER_OFFLINE_PUSH_PERIOD_MS "job.offline.push.period.time.ms"
#define SYM_PARAMETER_OFFLINE_PULL_PERIOD_MS "job.offline.pull.period.time.ms"

#define SYM_PARAMETER_HTTPS_VERIFIED_SERVERS "https.verified.server.names"
#define SYM_PARAMETER_HTTPS_ALLOW_SELF_SIGNED_CERTS "https.allow.self.signed.certs"
Expand Down
2 changes: 2 additions & 0 deletions symmetric-client-clib/inc/core/JobManager.h
Expand Up @@ -34,6 +34,8 @@ typedef struct SymJobManager {
long lastPurgeTime;
long lastRouteTime;
long lastSyncTriggersTime;
long lastOfflinePushTime;
long lastOfflinePullTime;
void (*startJobs)(struct SymJobManager *this);
void (*stopJobs)(struct SymJobManager *this);
void (*destroy)(struct SymJobManager *this);
Expand Down
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/service/OutgoingBatchService.h
Expand Up @@ -74,6 +74,7 @@ b.failed_data_id, b.last_update_hostname, b.last_update_time, b.create_time, b.b
from sym_outgoing_batch b "

#define SYM_SQL_FIND_OUTGOING_BATCH "where batch_id = ? and node_id = ?"
#define SYM_SQL_FIND_OUTGOING_BATCH_BY_ID_ONLY "where batch_id=? "

#define SYM_SQL_SELECT_OUTGOING_BATCH "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc"

Expand Down
1 change: 0 additions & 1 deletion symmetric-client-clib/inc/util/Properties.h
Expand Up @@ -23,7 +23,6 @@

#include <string.h>
#include <stdlib.h>
#include "common/Log.h"
#include "util/StringUtils.h"
#include "util/StringBuilder.h"
#include "util/StringArray.h"
Expand Down
86 changes: 73 additions & 13 deletions symmetric-client-clib/src/common/Log.c
Expand Up @@ -20,7 +20,11 @@
*/
#include "common/Log.h"

static char* logLevelDescription(SymLogLevel logLevel) {
static int SymLog_logLevel = SYM_LOG_LEVEL_DEBUG;
static unsigned short SymLog_showSourceFile = 1;
static char* SymLog_destination = "console";

static char* SymLog_getlogLevelDescription(SymLogLevel logLevel) {
switch (logLevel) {
case SYM_LOG_LEVEL_DEBUG:
return SYM_LOG_LEVEL_DESC_DEBUG;
Expand All @@ -35,17 +39,44 @@ static char* logLevelDescription(SymLogLevel logLevel) {
}
}

static SymLogLevel SymLog_getLogLevelValue(char* logLevelDescription) {
if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_DEBUG)) {
return SYM_LOG_LEVEL_DEBUG;
} else if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_INFO)) {
return SYM_LOG_LEVEL_INFO;
} else if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_WARN)) {
return SYM_LOG_LEVEL_WARN;
} else if (SymStringUtils_equals(logLevelDescription, SYM_LOG_LEVEL_DESC_ERROR)) {
return SYM_LOG_LEVEL_ERROR;
}
return SYM_LOG_LEVEL_DEBUG;
}

void SymLog_configure(SymProperties *settings) {
char *logLevelDescription = settings->get(settings, SYM_LOG_SETTINGS_LOG_LEVEL, "DEBUG");
if (! SymStringUtils_isBlank(logLevelDescription)) {
SymLog_logLevel = SymLog_getLogLevelValue(logLevelDescription);
}

char *logDestination = settings->get(settings, SYM_LOG_SETTINGS_LOG_DESTINATION, "console");
if (! SymStringUtils_isBlank(logDestination)) {
SymLog_destination = logDestination;
}

char *showSourceFile = settings->get(settings, SYM_LOG_SETTINGS_LOG_SHOW_SOURCE_FILE, "1");
if (! SymStringUtils_isBlank(showSourceFile)) {
SymLog_showSourceFile = SymStringUtils_equals(showSourceFile, "1")
|| SymStringUtils_equalsIgnoreCase(showSourceFile, "true");
}
}

/** This is the central place where all logging funnels through. */
void SymLog_log(SymLogLevel logLevel, const char *functionName, const char *fileName, int lineNumber, const char* message, ...) {
FILE *destination;
if (logLevel <= SYM_LOG_LEVEL_INFO) {
destination = stdout;
}
else {
destination = stderr;
}
if (logLevel < SymLog_logLevel) {
return;
}

char* levelDescription = logLevelDescription(logLevel);
char* levelDescription = SymLog_getlogLevelDescription(logLevel);

SymStringBuilder *messageBuffer = SymStringBuilder_new();

Expand All @@ -56,22 +87,51 @@ void SymLog_log(SymLogLevel logLevel, const char *functionName, const char *file
messageBuffer->append(messageBuffer, levelDescription);
messageBuffer->append(messageBuffer, "] [");
messageBuffer->append(messageBuffer, functionName);
// messageBuffer->append(messageBuffer, " ");
// messageBuffer->append(messageBuffer, fileName);
// messageBuffer->append(messageBuffer, ":");
// messageBuffer->appendInt(messageBuffer, lineNumber);
messageBuffer->append(messageBuffer, "] ");
va_list varargs;
va_start(varargs, message);
messageBuffer->appendfv(messageBuffer, message, varargs);
va_end(varargs);
if (SymLog_showSourceFile) {
messageBuffer->append(messageBuffer, " (");
messageBuffer->append(messageBuffer, fileName);
messageBuffer->append(messageBuffer, ":");
messageBuffer->appendInt(messageBuffer, lineNumber);
messageBuffer->append(messageBuffer, ")");
}
messageBuffer->append(messageBuffer, "\n");

unsigned short physicalFile = 0;
FILE *destination;

if (SymStringUtils_equalsIgnoreCase(SymLog_destination, SYM_LOG_DESTINATION_CONSOLE)) {
if (logLevel <= SYM_LOG_LEVEL_INFO) {
destination = stdout;
}
else {
destination = stderr;
}
}
else {
destination = fopen(SymLog_destination, "a+");
if (!destination) {
printf("Failed to open log file destination '%s'. Check the path our use 'console'\n", SymLog_destination);
destination = stdout;
}
else {
physicalFile = 1;
}

}

fprintf(destination, "%s", messageBuffer->toString(messageBuffer));

// stdout may not flush before stderr does.
// Do this to keep log messages more or less in order.
fflush(destination);
if (physicalFile) {
fclose(destination);
}

date->destroy(date);
messageBuffer->destroy(messageBuffer);
Expand Down
21 changes: 11 additions & 10 deletions symmetric-client-clib/src/core/JobManager.c
Expand Up @@ -75,26 +75,27 @@ void SymJobManager_invoke(SymJobManager *this) {
this->engine->purge(this->engine);
time(&this->lastPurgeTime);
}
if (1) { // TODO.
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_OFFLINE_PUSH_JOB, SYM_PARAMETER_OFFLINE_PUSH_PERIOD_MS, this->lastOfflinePushTime)) {
SymLog_info("OFFLINE PUSH ============================)");
this->engine->offlinePushService->pushData(this->engine->offlinePushService);
//time(&this->lastPurgeTime);
time(&this->lastOfflinePushTime);
}
if (1) { // TODO.
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_OFFLINE_PULL_JOB, SYM_PARAMETER_OFFLINE_PULL_PERIOD_MS, this->lastOfflinePullTime)) {
SymLog_info("OFFLINE PULL ============================)");
this->engine->offlinePullService->pullData(this->engine->offlinePullService);
//time(&this->lastPurgeTime);
time(&this->lastOfflinePullTime);
}
}

void SymJobManager_startJobs(SymJobManager *this) {
this->engine->start(this->engine);
time(&this->lastRouteTime);
time(&this->lastPullTime);
time(&this->lastPushTime);
time(&this->lastHeartbeatTime);
time(&this->lastPurgeTime);
time(&this->lastSyncTriggersTime);

long now;
time(&now);

this->lastRouteTime = this->lastPullTime = this->lastPushTime =
this->lastHeartbeatTime = this->lastPurgeTime = this->lastSyncTriggersTime =
this->lastOfflinePullTime = this->lastOfflinePushTime = now;

this->started = 1;

Expand Down
2 changes: 2 additions & 0 deletions symmetric-client-clib/src/core/SymEngine.c
Expand Up @@ -42,6 +42,8 @@ static unsigned short SymEngine_isConfigured(SymEngine *this) {
unsigned short SymEngine_start(SymEngine *this) {
unsigned short isStarted = 0;

SymLog_configure(this->properties);

SymLog_info("About to start SymmetricDS");

this->dialect->initTablesAndDatabaseObjects(this->dialect);
Expand Down
Expand Up @@ -47,6 +47,10 @@ static int SymSqliteDdlReader_toSqlType(char *type) {
sqlType = SYM_SQL_TYPE_TIMESTAMP;
} else if (strncasecmp(type, "TIME", 4) == 0) {
sqlType = SYM_SQL_TYPE_TIME;
} else if (strncasecmp(type, "BOOLEAN", 7) == 0) {
sqlType = SYM_SQL_TYPE_BOOLEAN;
} else if (strncasecmp(type, "BIT", 3) == 0) {
sqlType = SYM_SQL_TYPE_BIT;
} else {
sqlType = SYM_SQL_TYPE_VARCHAR;
}
Expand Down
Expand Up @@ -25,36 +25,49 @@ char * SymSqliteTriggerTemplate_fillOutColumnTemplate(SymSqliteTriggerTemplate *
char *columnPrefix, SymColumn *column, SymDataEventType dml, unsigned short isOld,
SymChannel *channel, SymTrigger *trigger) {

// TODO: handle LOBs
//unsigned short isLob = 0;

char *templateToUse;

switch (column->sqlType) {
case SYM_SQL_TYPE_BIT:
case SYM_SQL_TYPE_BOOLEAN:
templateToUse = "case when %s.%s is null then '' when %s.%s = 1 then '\"1\"' else '\"0\"' end";
break;
case SYM_SQL_TYPE_TINYINT:
case SYM_SQL_TYPE_SMALLINT:
case SYM_SQL_TYPE_INTEGER:
case SYM_SQL_TYPE_BIGINT:
case SYM_SQL_TYPE_FLOAT:
case SYM_SQL_TYPE_REAL:
case SYM_SQL_TYPE_DOUBLE:
case SYM_SQL_TYPE_NUMERIC:
case SYM_SQL_TYPE_DECIMAL:
case SYM_SQL_TYPE_REAL:
templateToUse = "case when %s.%s is null then '' else ('\"' || cast(%s.%s as varchar) || '\"') end";
break;
case SYM_SQL_TYPE_CHAR:
case SYM_SQL_TYPE_NCHAR:
case SYM_SQL_TYPE_VARCHAR:
case SYM_SQL_TYPE_NVARCHAR:
case SYM_SQL_TYPE_CLOB:
case SYM_SQL_TYPE_LONGVARCHAR:
case SYM_SQL_TYPE_LONGNVARCHAR:
templateToUse = "case when %s.%s is null then '' else '\"' || replace(replace(%s.%s,'\\','\\\\'),'\"','\\\"') || '\"' end";
break;
case SYM_SQL_TYPE_BLOB:
case SYM_SQL_TYPE_LONGVARBINARY:
templateToUse = "case when %s.%s is null then '' else '\"' || replace(replace(hex(%s.%s),'\\','\\\\'),'\"','\\\"') || '\"' end ";
break;
case SYM_SQL_TYPE_DATE:
case SYM_SQL_TYPE_TIMESTAMP:
case SYM_SQL_TYPE_TIME:
templateToUse = "case when strftime('%%Y-%%m-%%d %%H:%%M:%%f',%s.%s) is null then '' else ('\"' || strftime('%%Y-%%m-%%d %%H:%%M:%%f', %s.%s) || '\"') end";
break;
default:
templateToUse = NULL;
SymLog_error("Unknown sqlType %d", column->sqlType);
break;
}

char* columnName = column->name; // TODO
char* columnName = column->name;
char* formattedColumnText =
SymStringUtils_format(templateToUse, tableAlias, columnName, tableAlias, columnName);

Expand Down
10 changes: 8 additions & 2 deletions symmetric-client-clib/src/service/OutgoingBatchService.c
Expand Up @@ -78,13 +78,19 @@ void SymOutgoingBatchService_insertOutgoingBatch(SymOutgoingBatchService *this,
}

SymOutgoingBatch * SymOutgoingBatchService_findOutgoingBatch(SymOutgoingBatchService *this, long batchId, char *nodeId) {
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
SymStringArray *args = SymStringArray_new(NULL);
args->addLong(args, batchId);
args->add(args, nodeId);

SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);

SymStringBuilder *sb = SymStringBuilder_newWithString(SYM_SQL_SELECT_OUTGOING_BATCH_PREFIX);
sb->append(sb, SYM_SQL_FIND_OUTGOING_BATCH);
if (SymStringUtils_isNotBlank(nodeId)) {
sb->append(sb, SYM_SQL_FIND_OUTGOING_BATCH);
} else {
sb->append(sb, SYM_SQL_FIND_OUTGOING_BATCH_BY_ID_ONLY);
}

int error;
SymList *batches = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymOutgoingBatchService_outgoingBatchMapper);

Expand Down

0 comments on commit 1ef4660

Please sign in to comment.