Skip to content

Commit

Permalink
Merge branch '3.7' of https://github.com/JumpMind/symmetric-ds into 3.7
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 26, 2015
2 parents 0c5e2cf + 16a5219 commit 07c3054
Show file tree
Hide file tree
Showing 15 changed files with 702 additions and 2 deletions.
15 changes: 14 additions & 1 deletion symmetric-client-clib-test/src/util/PropertiesTest.c
Expand Up @@ -65,6 +65,17 @@ void SymPropertiesTest_test3() {
prop2->destroy(prop2);
}

void SymPropertiesTest_testNewWithString() {
char *propertiesFileContents = " prop1=value1\n#comment\n \nprop2=value 2 ";

SymProperties *prop = SymProperties_newWithString(NULL, propertiesFileContents);

CU_ASSERT(strcmp(prop->get(prop, "prop1", NOT_FOUND), "value1") == 0);
CU_ASSERT(strcmp(prop->get(prop, "prop2", NOT_FOUND), "value 2 ") == 0);

prop->destroy(prop);
}

int SymPropertiesTest_CUnit() {
CU_pSuite suite = CU_add_suite("SymPropertiesTest", NULL, NULL);
if (suite == NULL) {
Expand All @@ -73,7 +84,9 @@ int SymPropertiesTest_CUnit() {

if (CU_add_test(suite, "SymPropertiesTest_test1", SymPropertiesTest_test1) == NULL ||
CU_add_test(suite, "SymPropertiesTest_test2", SymPropertiesTest_test2) == NULL ||
CU_add_test(suite, "SymPropertiesTest_test3", SymPropertiesTest_test3) == NULL) {
CU_add_test(suite, "SymPropertiesTest_test3", SymPropertiesTest_test3) == NULL ||
CU_add_test(suite, "SymPropertiesTest_testNewWithString", SymPropertiesTest_testNewWithString) == NULL ||
1==0) {
return CUE_NOTEST;
}
return CUE_SUCCESS;
Expand Down
14 changes: 14 additions & 0 deletions symmetric-client-clib/inc/common/ParameterConstants.h
Expand Up @@ -64,4 +64,18 @@

#define SYM_PARAMETER_ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED "routing.data.reader.order.by.gap.id.enabled"

#define SYM_PARAMETER_JOB_MANAGER_SLEEP_PERIOD_MS "job.manager.sleep.period.ms"
#define SYM_PARAMETER_START_ROUTE_JOB "start.route.job"
#define SYM_PARAMETER_START_PUSH_JOB "start.pull.job"
#define SYM_PARAMETER_START_PULL_JOB "start.push.job"
#define SYM_PARAMETER_START_HEARTBEAT_JOB "start.heartbeat.job"
#define SYM_PARAMETER_START_PURGE_JOB "start.purge.job"
#define SYM_PARAMETER_START_SYNCTRIGGERS_JOB "start.synctriggers.job"
#define SYM_PARAMETER_ROUTE_PERIOD_MS "job.routing.period.time.ms"
#define SYM_PARAMETER_PURGE_PERIOD_MS "job.purge.period.time.ms"
#define SYM_PARAMETER_PULL_PERIOD_MS "job.pull.period.time.ms"
#define SYM_PARAMETER_PUSH_PERIOD_MS "job.push.period.time.ms"
#define SYM_PARAMETER_HEARTBEAT_JOB_PERIOD_MS "job.heartbeat.period.time.ms"
#define SYM_PARAMETER_SYNCTRIGGERS_PERIOD_MS "job.synctriggers.period.time.ms"

#endif
43 changes: 43 additions & 0 deletions symmetric-client-clib/inc/core/JobManager.h
@@ -0,0 +1,43 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef SYM_JOBMANAGER_H
#define SYM_JOBMANAGER_H

#include <stdlib.h>
#include "core/SymEngine.h"

typedef struct SymJobManager {
SymEngine *engine;
unsigned short started;
long lastPullTime;
long lastPushTime;
long lastHeartbeatTime;
long lastPurgeTime;
long lastRouteTime;
long lastSyncTriggersTime;
void (*startJobs)(struct SymJobManager *this);
void (*stopJobs)(struct SymJobManager *this);
void (*destroy)(struct SymJobManager *this);
} SymJobManager;

SymJobManager * SymJobManager_new(SymJobManager *this, SymEngine *engine);

#endif
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/libsymclient.h
Expand Up @@ -25,6 +25,7 @@
#include <stdlib.h>
#include <stdbool.h>
#include "core/SymEngine.h"
#include "core/JobManager.h"
#include "db/SymDialect.h"
#include "model/BatchAck.h"
#include "model/IncomingBatch.h"
Expand Down
7 changes: 7 additions & 0 deletions symmetric-client-clib/inc/util/Properties.h
Expand Up @@ -23,6 +23,10 @@

#include <string.h>
#include <stdlib.h>
#include "common/Log.h"
#include "util/StringUtils.h"
#include "util/StringBuilder.h"
#include "util/StringArray.h"

typedef struct {
char *key;
Expand All @@ -35,9 +39,12 @@ typedef struct SymProperties {
char * (*get)(struct SymProperties *this, char *key, char *defaultValue);
void (*put)(struct SymProperties *this, char *key, char *value);
void (*putAll)(struct SymProperties *this, void *properties);
char * (*toString)(struct SymProperties *this);
void (*destroy)(struct SymProperties *this);
} SymProperties;

SymProperties * SymProperties_new(SymProperties *);
SymProperties * SymProperties_newWithString(SymProperties *this, char *propertiesFileContents);
SymProperties * SymProperties_newWithFile(SymProperties *, char * argPath);

#endif
112 changes: 112 additions & 0 deletions symmetric-client-clib/src/core/JobManager.c
@@ -0,0 +1,112 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "core/JobManager.h"

unsigned short SymJobManager_shouldRun(SymJobManager *this, char* startJobProperty, char *periodTimeProperty, long lastTime) {

if (! this->engine->parameterService->is(this->engine->parameterService, startJobProperty, 1)) {
return 0;
}

long periodInMs =
this->engine->parameterService->getInt(this->engine->parameterService, periodTimeProperty, 60000);
long periodInSec = periodInMs / 1000;

long now;
time(&now);

if (periodInSec < now-lastTime) {
return 1;
} else {
return 0;
}
}

void SymJobManager_invoke(SymJobManager *this) {
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_SYNCTRIGGERS_JOB, SYM_PARAMETER_SYNCTRIGGERS_PERIOD_MS, this->lastSyncTriggersTime)) {
this->engine->syncTriggers(this->engine);
time(&this->lastSyncTriggersTime);
}
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_ROUTE_JOB, SYM_PARAMETER_ROUTE_PERIOD_MS, this->lastRouteTime)) {
this->engine->route(this->engine);
time(&this->lastRouteTime);
}
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PUSH_JOB, SYM_PARAMETER_PUSH_PERIOD_MS, this->lastPushTime)) {
SymRemoteNodeStatuses *pushStatus = this->engine->push(this->engine);
if (pushStatus->wasBatchProcessed(pushStatus)
|| pushStatus->wasDataProcessed(pushStatus)) {
// Only run heartbeat after a successful push to avoid queueing up lots of offline heartbeats.
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_HEARTBEAT_JOB, SYM_PARAMETER_HEARTBEAT_JOB_PERIOD_MS, this->lastHeartbeatTime)) {
this->engine->heartbeat(this->engine, 0);
time(&this->lastHeartbeatTime);
}
}
time(&this->lastPushTime);
}
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PULL_JOB, SYM_PARAMETER_PULL_PERIOD_MS, this->lastPullTime)) {
this->engine->pull(this->engine);
time(&this->lastPullTime);
}
if (SymJobManager_shouldRun(this, SYM_PARAMETER_START_PURGE_JOB, SYM_PARAMETER_PURGE_PERIOD_MS, this->lastPurgeTime)) {
this->engine->purge(this->engine);
time(&this->lastPurgeTime);
}
}

void SymJobManager_startJobs(SymJobManager *this) {
this->engine->start(this->engine);
time(&this->lastRouteTime);
time(&this->lastPullTime);
time(&this->lastPushTime);
time(&this->lastHeartbeatTime);
time(&this->lastPurgeTime);
time(&this->lastSyncTriggersTime);

this->started = 1;

while (this->started) {
long sleepPeriodInSec = this->engine->parameterService->getInt(this->engine->parameterService,
SYM_PARAMETER_JOB_MANAGER_SLEEP_PERIOD_MS, 5000) / 1000;

sleep(sleepPeriodInSec);
SymJobManager_invoke(this);
}
this->engine->stop(this->engine);
}

void SymJobManager_stopJobs(SymJobManager *this) {
this->started = 0;
}

void SymJobManager_destroy(SymJobManager *this) {
free(this);
}

SymJobManager * SymJobManager_new(SymJobManager *this, SymEngine *engine) {
if (this == NULL) {
this = (SymJobManager *) calloc(1, sizeof(SymJobManager));
}
this->engine = engine;
this->startJobs = (void *) &SymJobManager_startJobs;
this->stopJobs = (void *) &SymJobManager_stopJobs;
this->destroy = (void *) &SymJobManager_destroy;
return this;
}
6 changes: 5 additions & 1 deletion symmetric-client-clib/src/service/ParameterService.c
Expand Up @@ -92,7 +92,11 @@ unsigned short SymParameterService_is(SymParameterService *this, char *name, uns
unsigned short value = defaultValue;
char *stringValue = this->getString(this, name, NULL);
if (stringValue != NULL) {
value = atoi(stringValue);
if (strcmp("true", stringValue) == 0) {
value = 1;
} else {
value = atoi(stringValue);
}
}
return value;
}
Expand Down
92 changes: 92 additions & 0 deletions symmetric-client-clib/src/util/Properties.c
Expand Up @@ -43,6 +43,22 @@ void SymProperties_putAll(SymProperties *this, SymProperties *properties) {
}
}

char * SymProperties_toString(struct SymProperties *this) {
SymStringBuilder *buff = SymStringBuilder_new(NULL);

int i;
for (i = 0; i < this->index; i++) {
buff->append(buff, this->propArray[i].key);
buff->append(buff, "=");
buff->append(buff, this->propArray[i].value);
if (i < this->index) {
buff->append(buff, ", ");
}
}

return buff->destroyAndReturn(buff);
}

void SymProperties_destroy(SymProperties *this) {
free(this->propArray);
free(this);
Expand All @@ -56,6 +72,82 @@ SymProperties * SymProperties_new(SymProperties *this) {
this->get = (void *) &SymProperties_get;
this->put = (void *) &SymProperties_put;
this->putAll = (void *) &SymProperties_putAll;
this->toString = (void *) &SymProperties_toString;
this->destroy = (void *) &SymProperties_destroy;
return this;
}

SymProperties * SymProperties_newWithString(SymProperties *this, char *propertiesFileContents) {
if (this == NULL) {
this = SymProperties_new(this);
}

SymStringArray *propertyLines = SymStringArray_split(propertiesFileContents, "\n");
int i;
for (i = 0; i < propertyLines->size; ++i) {
char *propertyLine = propertyLines->get(propertyLines, i);
if (SymStringUtils_isBlank(propertyLine)) {
continue;
}

char *trimmed = SymStringUtils_ltrim(propertyLine);
if (trimmed[0] == '#' || trimmed[0] == '!' ) {
continue;
}

int delimiterIndex = 0;
int length = strlen(trimmed);
int i;
for (i = 0; i < length; ++i) {
if (trimmed[i] == '=' || trimmed[i] == ':') {
delimiterIndex = i;
break;
}
}

if (delimiterIndex > 0) {
SymStringBuilder *buff = SymStringBuilder_newWithString(trimmed);
char *propertyName = buff->substring(buff, 0, delimiterIndex);
char *propertyValue = buff->substring(buff, delimiterIndex+1, length);
this->put(this, propertyName, propertyValue);
buff->destroy(buff);
}

free(trimmed);
}

propertyLines->destroy(propertyLines);

return this;
}

SymProperties * SymProperties_newWithFile(SymProperties *this, char *argPath) {
if (this == NULL) {
this = SymProperties_new(this);
}

// SymStringArray / split to read properties file.

FILE *file;
int BUFFER_SIZE = 1024;
char intputBuffer[BUFFER_SIZE];

file = fopen(argPath,"r");
if (!file) {
SymLog_error("Failed to load properties from file %s", argPath);
return this;
}

SymStringBuilder *buff = SymStringBuilder_new(NULL);

while (fgets(intputBuffer, BUFFER_SIZE, file) != NULL) {
buff->append(buff, intputBuffer);
}

this = SymProperties_newWithString(this, buff->destroyAndReturn(buff));

fclose(file);


return this;
}

0 comments on commit 07c3054

Please sign in to comment.