Skip to content

Commit

Permalink
reorganize data process, read, write pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 15, 2015
1 parent de375ff commit 2318811
Show file tree
Hide file tree
Showing 32 changed files with 248 additions and 101 deletions.
3 changes: 2 additions & 1 deletion symmetric-client-clib-test/src/core/SymEngineTest.c
Expand Up @@ -34,10 +34,11 @@ void SymEngineTest_test1() {
prop->put(prop, SYM_PARAMETER_GROUP_ID, "store");
prop->put(prop, SYM_PARAMETER_EXTERNAL_ID, "003");
prop->put(prop, SYM_PARAMETER_REGISTRATION_URL, "http://localhost:31415/sync/corp-000");
//prop->put(prop, "auto.sync.triggers.at.startup", "0");

SymEngine *engine = SymEngine_new(NULL, prop);
CU_ASSERT(engine->start(engine) == 0);
CU_ASSERT(engine->syncTriggers(engine) == 0);
//CU_ASSERT(engine->syncTriggers(engine) == 0);

engine->pullService->pullData(engine->pullService);

Expand Down
38 changes: 38 additions & 0 deletions symmetric-client-clib/inc/io/data/DataProcessor.h
@@ -0,0 +1,38 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef SYM_DATA_PROCESSOR_H
#define SYM_DATA_PROCESSOR_H

#include <stdio.h>
#include <stdlib.h>
#include "util/List.h"

typedef struct SymDataProcessor {
SymList *batchesProcessed;
void (*open)(struct SymDataProcessor *this);
size_t (*process)(struct SymDataProcessor *this, char *data, size_t size, size_t count);
void (*close)(struct SymDataProcessor *this);
void (*destroy)(struct SymDataProcessor *this);
} SymDataProcessor;

SymDataProcessor * SymDataProcessor_new(SymDataProcessor *this);

#endif
4 changes: 3 additions & 1 deletion symmetric-client-clib/inc/io/reader/DataReader.h
Expand Up @@ -29,7 +29,9 @@

typedef struct SymDataReader {
void (*open)(struct SymDataReader *this);
size_t (*process)(struct SymDataReader *this, char *data, size_t size, size_t count);
SymBatch * (*nextBatch)(struct SymDataReader *this);
SymTable * (*nextTable)(struct SymDataReader *this);
SymCsvData * (*nextData)(struct SymDataReader *this);
void (*close)(struct SymDataReader *this);
void (*destroy)(struct SymDataReader *this);
} SymDataReader;
Expand Down
4 changes: 2 additions & 2 deletions symmetric-client-clib/inc/io/reader/ProtocolDataReader.h
Expand Up @@ -26,7 +26,7 @@
#include <string.h>
#include <curl/curl.h>
#include <csv.h>
#include "io/reader/DataReader.h"
#include "io/data/DataProcessor.h"
#include "io/writer/DataWriter.h"
#include "model/Node.h"
#include "transport/IncomingTransport.h"
Expand All @@ -36,7 +36,7 @@
#include "io/data/CsvData.h"

typedef struct SymProtocolDataReader {
SymDataReader super;
SymDataProcessor super;
char *targetNodeId;
SymDataWriter *writer;
struct csv_parser *csvParser;
Expand Down
1 change: 0 additions & 1 deletion symmetric-client-clib/inc/io/writer/DataWriter.h
Expand Up @@ -29,7 +29,6 @@
#include "util/List.h"

typedef struct SymDataWriter {
SymList *batchesProcessed;
void (*open)(struct SymDataWriter *this);
void (*close)(struct SymDataWriter *this);
void (*startBatch)(struct SymDataWriter *this, SymBatch *batch);
Expand Down
5 changes: 3 additions & 2 deletions symmetric-client-clib/inc/io/writer/ProtocolDataWriter.h
Expand Up @@ -26,12 +26,13 @@
#include "io/data/Batch.h"
#include "db/model/Table.h"
#include "io/data/CsvData.h"
#include "io/writer/DataWriter.h"
#include "io/data/DataProcessor.h"
#include "io/reader/DataReader.h"
#include "util/List.h"
#include "common/Log.h"

typedef struct SymProtocolDataWriter {
SymDataWriter super;
SymDataProcessor super;
char *sourceNodeId;
SymDataReader *reader;
} SymProtocolDataWriter;
Expand Down
18 changes: 9 additions & 9 deletions symmetric-client-clib/inc/model/OutgoingBatch.h
Expand Up @@ -25,15 +25,15 @@
#include <stdlib.h>
#include "util/Date.h"

#define SYM_OUGOING_BATCH_OK "OK"
#define SYM_OUGOING_BATCH_ERROR "ER"
#define SYM_OUGOING_BATCH_REQUEST "RQ"
#define SYM_OUGOING_BATCH_NEW "NE"
#define SYM_OUGOING_BATCH_QUERYING "QY"
#define SYM_OUGOING_BATCH_SENDING "SE"
#define SYM_OUGOING_BATCH_LOADING "LD"
#define SYM_OUGOING_BATCH_ROUTING "RT"
#define SYM_OUGOING_BATCH_IGNORED "IG"
#define SYM_OUTGOING_BATCH_OK "OK"
#define SYM_OUTGOING_BATCH_ERROR "ER"
#define SYM_OUTGOING_BATCH_REQUEST "RQ"
#define SYM_OUTGOING_BATCH_NEW "NE"
#define SYM_OUTGOING_BATCH_QUERYING "QY"
#define SYM_OUTGOING_BATCH_SENDING "SE"
#define SYM_OUTGOING_BATCH_LOADING "LD"
#define SYM_OUTGOING_BATCH_ROUTING "RT"
#define SYM_OUTGOING_BATCH_IGNORED "IG"

typedef struct SymOutgoingBatch {
long batchId;
Expand Down
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/model/OutgoingBatches.h
Expand Up @@ -28,6 +28,7 @@

typedef struct SymOutgoingBatches {
SymList *batches;
unsigned short (*containsBatches)(struct SymOutgoingBatches *this);
void (*destroy)(struct SymOutgoingBatches *this);
} SymOutgoingBatches;

Expand Down
9 changes: 8 additions & 1 deletion symmetric-client-clib/inc/service/DataExtractorService.h
Expand Up @@ -25,18 +25,25 @@
#include "model/Node.h"
#include "model/OutgoingBatch.h"
#include "service/NodeService.h"
#include "service/OutgoingBatchService.h"
#include "service/ParameterService.h"
#include "transport/TransportManager.h"
#include "transport/OutgoingTransport.h"
#include "io/data/DataProcessor.h"
#include "io/writer/ProtocolDataWriter.h"
#include "io/reader/ExtractDataReader.h"
#include "util/List.h"
#include "common/Log.h"

typedef struct SymDataExtractorService {
SymNodeService *nodeService;
SymOutgoingBatchService *outgoingBatchService;
SymParameterService *parameterService;
SymList * (*extract)(struct SymDataExtractorService *this, SymNode *node, SymOutgoingTransport *transport);
void (*destroy)(struct SymDataExtractorService *this);
} SymDataExtractorService;

SymDataExtractorService * SymDataExtractorService_new(SymDataExtractorService *this, SymNodeService *nodeService);
SymDataExtractorService * SymDataExtractorService_new(SymDataExtractorService *this, SymNodeService *nodeService, SymOutgoingBatchService *outgoingBatchService,
SymParameterService *parameterService);

#endif
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/service/DataLoaderService.h
Expand Up @@ -30,6 +30,7 @@
#include "transport/IncomingTransport.h"
#include "model/Node.h"
#include "model/RemoteNodeStatus.h"
#include "io/data/DataProcessor.h"
#include "io/reader/ProtocolDataReader.h"
#include "io/writer/DefaultDatabaseWriter.h"
#include "db/platform/DatabasePlatform.h"
Expand Down
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/service/ParameterService.h
Expand Up @@ -52,6 +52,7 @@
#define SYM_PARAMETER_INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED "incoming.batches.skip.duplicates"

#define SYM_OUTGOING_BATCH_MAX_BATCHES_TO_SELECT "outgoing.batches.max.to.select"
#define SYM_TRANSPORT_MAX_BYTES_TO_SYNC "transport.max.bytes.to.sync"

typedef struct SymParameterService {
SymProperties *parameters;
Expand Down
11 changes: 2 additions & 9 deletions symmetric-client-clib/inc/transport/IncomingTransport.h
Expand Up @@ -23,18 +23,11 @@

#include <stdio.h>
#include <stdlib.h>
#include "io/reader/DataReader.h"

#define SYM_TRANSPORT_OK 200
#define SYM_TRANSPORT_REGISTRATION_NOT_OPEN 656
#define SYM_TRANSPORT_REGISTRATION_REQUIRED 657
#define SYM_TRANSPORT_SYNC_DISABLED 658
#define SYM_TRANSPORT_SC_SERVICE_UNAVAILABLE 503
#define SYM_TRANSPORT_SC_FORBIDDEN 403
#include "io/data/DataProcessor.h"

typedef struct SymIncomingTransport {
char * (*getUrl)(struct SymIncomingTransport *this);
long (*process)(struct SymIncomingTransport *this, SymDataReader *reader);
long (*process)(struct SymIncomingTransport *this, SymDataProcessor *processor);
void (*destroy)(struct SymIncomingTransport *this);
} SymIncomingTransport;

Expand Down
5 changes: 2 additions & 3 deletions symmetric-client-clib/inc/transport/OutgoingTransport.h
Expand Up @@ -23,11 +23,10 @@

#include <stdio.h>
#include <stdlib.h>
#include "io/data/DataProcessor.h"

typedef struct SymOutgoingTransport {
FILE * (*open)(struct SymOutgoingTransport *this);
void (*close)(struct SymOutgoingTransport *this);
int (*isOpen)(struct SymOutgoingTransport *this);
long (*process)(struct SymOutgoingTransport *this, SymDataProcessor *processor);
void (*destroy)(struct SymOutgoingTransport *this);
} SymOutgoingTransport;

Expand Down
7 changes: 7 additions & 0 deletions symmetric-client-clib/inc/transport/TransportManager.h
Expand Up @@ -32,6 +32,13 @@
#include "transport/OutgoingTransport.h"
#include "util/List.h"

#define SYM_TRANSPORT_OK 200
#define SYM_TRANSPORT_REGISTRATION_NOT_OPEN 656
#define SYM_TRANSPORT_REGISTRATION_REQUIRED 657
#define SYM_TRANSPORT_SYNC_DISABLED 658
#define SYM_TRANSPORT_SC_SERVICE_UNAVAILABLE 503
#define SYM_TRANSPORT_SC_FORBIDDEN 403

typedef struct SymTransportManager {
SymParameterService *parameterService;
int (*sendAcknowledgement)(struct SymTransportManager *this, SymNode *remote, SymList *batches, SymNode *local, char *securityToken, char *registrationUrl);
Expand Down
Expand Up @@ -26,7 +26,9 @@
#include <curl/curl.h>
#include <util/Properties.h>
#include "model/Node.h"
#include "transport/http/HttpTransportManager.h"
#include "transport/IncomingTransport.h"
#include "common/Log.h"

typedef struct SymHttpIncomingTransport {
SymIncomingTransport super;
Expand Down
Expand Up @@ -23,13 +23,14 @@

#include <stdio.h>
#include <stdlib.h>
#include <curl/curl.h>
#include "transport/http/HttpTransportManager.h"
#include "transport/OutgoingTransport.h"
#include "util/List.h"

typedef struct SymHttpOutgoingTransport {
SymOutgoingTransport super;
char *url;
FILE *outputStream;
} SymHttpOutgoingTransport;

SymHttpOutgoingTransport * SymHttpOutgoingTransport_new(SymHttpOutgoingTransport *this, char *url);
Expand Down
Expand Up @@ -39,4 +39,6 @@ typedef struct SymHttpTransportManager {

SymHttpTransportManager * SymHttpTransportManager_new(SymHttpTransportManager *this, SymParameterService *parameterService);

char * SymHttpTransportManager_strerror(long rc);

#endif
2 changes: 1 addition & 1 deletion symmetric-client-clib/src/core/SymEngine.c
Expand Up @@ -140,7 +140,7 @@ SymEngine * SymEngine_new(SymEngine *this, SymProperties *properties) {
this->dataLoaderService = SymDataLoaderService_new(NULL, this->parameterService, this->nodeService, this->transportManager, this->platform,
this->dialect, this->incomingBatchService);
this->dataService = SymDataService_new(NULL, this->platform, this->triggerRouterService);
this->dataExtractorService = SymDataExtractorService_new(NULL, this->nodeService);
this->dataExtractorService = SymDataExtractorService_new(NULL, this->nodeService, this->outgoingBatchService, this->parameterService);
this->registrationService = SymRegistrationService_new(NULL, this->nodeService, this->dataLoaderService, this->parameterService,
this->configurationService);
this->pullService = SymPullService_new(NULL, this->nodeService, this->dataLoaderService, this->registrationService, this->configurationService);
Expand Down
26 changes: 23 additions & 3 deletions symmetric-client-clib/src/io/reader/ExtractDataReader.c
Expand Up @@ -20,6 +20,24 @@
*/
#include "io/reader/ExtractDataReader.h"

void SymExtractDataReader_open(SymExtractDataReader *this) {
}

SymBatch * SymExtractDataReader_nextBatch(SymExtractDataReader *this) {
return NULL;
}

SymTable * SymExtractDataReader_nextTable(SymExtractDataReader *this) {
return NULL;
}

SymCsvData * SymExtractDataReader_nextData(SymExtractDataReader *this) {
return NULL;
}

void SymExtractDataReader_close(SymExtractDataReader *this) {
}

void SymExtractDataReader_destroy(SymExtractDataReader *this) {
free(this);
}
Expand All @@ -29,9 +47,11 @@ SymExtractDataReader * SymExtractDataReader_new(SymExtractDataReader *this, SymO
this = (SymExtractDataReader *) calloc(1, sizeof(SymExtractDataReader));
}
SymDataReader *super = &this->super;
//super->open = (void *) &SymProtocolDataReader_open;
//super->close = (void *) &SymProtocolDataReader_close;
//super->process = (void *) &SymProtocolDataReader_process;
super->open = (void *) &SymExtractDataReader_open;
super->close = (void *) &SymExtractDataReader_close;
super->nextBatch = (void *) &SymExtractDataReader_nextBatch;
super->nextTable = (void *) &SymExtractDataReader_nextTable;
super->nextData = (void *) &SymExtractDataReader_nextData;
super->destroy = (void *) &SymExtractDataReader_destroy;
return this;
}
5 changes: 4 additions & 1 deletion symmetric-client-clib/src/io/reader/ProtocolDataReader.c
Expand Up @@ -30,6 +30,7 @@ static void SymProtocolDataReader_parseField(void *data, size_t size, void *user

static void SymProtocolDataReader_parseLine(int eol, void *userData) {
SymProtocolDataReader *this = (SymProtocolDataReader *) userData;
SymDataProcessor *super = &this->super;
SymBatch *batch = this->batch;
SymStringArray *fields = this->fields;

Expand Down Expand Up @@ -90,6 +91,7 @@ static void SymProtocolDataReader_parseLine(int eol, void *userData) {
SymStringBuilder_copyToField(&batch->channelId, fields->get(fields, 1));
} else if (strcmp(token, SYM_CSV_BATCH) == 0) {
batch->batchId = atol(fields->get(fields, 1));
super->batchesProcessed->add(super->batchesProcessed, this->batch);
this->writer->startBatch(this->writer, batch);
} else if (strcmp(token, SYM_CSV_COMMIT) == 0) {
this->writer->endBatch(this->writer, batch);
Expand Down Expand Up @@ -152,7 +154,8 @@ SymProtocolDataReader * SymProtocolDataReader_new(SymProtocolDataReader *this, c
this->batch->targetNodeId = SymStringBuilder_copy(targetNodeId);
this->parsedTables = SymMap_new(NULL, 100);

SymDataReader *super = &this->super;
SymDataProcessor *super = &this->super;
super->batchesProcessed = SymList_new(NULL);
super->open = (void *) &SymProtocolDataReader_open;
super->close = (void *) &SymProtocolDataReader_close;
super->process = (void *) &SymProtocolDataReader_process;
Expand Down
2 changes: 0 additions & 2 deletions symmetric-client-clib/src/io/writer/DefaultDatabaseWriter.c
Expand Up @@ -34,7 +34,6 @@ void SymDefaultDatabaseWriter_startBatch(SymDefaultDatabaseWriter *this, SymBatc
// TODO: if batchId < 0, remove outgoing configuration batches
this->incomingBatch = SymIncomingBatch_newWithBatch(NULL, batch);
SymDataWriter *super = (SymDataWriter *) &this->super;
super->batchesProcessed->add(super->batchesProcessed, this->incomingBatch);
if (!this->incomingBatchService->acquireIncomingBatch(this->incomingBatchService, this->incomingBatch)) {
this->isError = 1;
}
Expand Down Expand Up @@ -246,7 +245,6 @@ SymDefaultDatabaseWriter * SymDefaultDatabaseWriter_new(SymDefaultDatabaseWriter
this->platform = platform;
this->dialect = dialect;
this->targetTables = SymMap_new(NULL, 50);
super->batchesProcessed = SymList_new(NULL);
super->open = (void *) &SymDefaultDatabaseWriter_open;
super->startBatch = (void *) &SymDefaultDatabaseWriter_startBatch;
super->startTable = (void *) &SymDefaultDatabaseWriter_startTable;
Expand Down
26 changes: 5 additions & 21 deletions symmetric-client-clib/src/io/writer/ProtocolDataWriter.c
Expand Up @@ -19,25 +19,13 @@
* under the License.
*/
#include "io/writer/ProtocolDataWriter.h"
#include "common/Log.h"

void SymProtocolDataWriter_open(SymProtocolDataWriter *this) {
}

void SymProtocolDataWriter_startBatch(SymProtocolDataWriter *this, SymBatch *batch) {
}

void SymProtocolDataWriter_startTable(SymProtocolDataWriter *this, SymTable *table) {
}

unsigned short SymProtocolDataWriter_write(SymProtocolDataWriter *this, SymCsvData *data) {
return 0;
}

void SymProtocolDataWriter_endBatch(SymProtocolDataWriter *this, SymBatch *batch) {
}

void SymProtocolDataWriter_endTable(SymProtocolDataWriter *this, SymTable *table) {
size_t SymProtocolDataWriter_process(SymProtocolDataWriter *this, char *data, size_t size, size_t count) {
size_t length = size * count;
return length;
}

void SymProtocolDataWriter_close(SymProtocolDataWriter *this) {
Expand All @@ -53,14 +41,10 @@ SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, c
}
this->sourceNodeId = sourceNodeId;
this->reader = reader;
SymDataWriter *super = &this->super;
SymDataProcessor *super = &this->super;
super->open = (void *) &SymProtocolDataWriter_open;
super->close = (void *) &SymProtocolDataWriter_close;
super->startBatch = (void *) &SymProtocolDataWriter_startBatch;
super->startTable = (void *) &SymProtocolDataWriter_startTable;
super->write = (void *) &SymProtocolDataWriter_write;
super->endTable = (void *) &SymProtocolDataWriter_endTable;
super->endBatch = (void *) &SymProtocolDataWriter_endBatch;
super->process = (void *) &SymProtocolDataWriter_process;
super->destroy = (void *) &SymProtocolDataWriter_destroy;
return this;
}

0 comments on commit 2318811

Please sign in to comment.