From defbfb54fc57c42002b2531d2d128b37014e95fa Mon Sep 17 00:00:00 2001 From: elong Date: Thu, 15 Oct 2015 14:41:39 -0400 Subject: [PATCH] batchesProcessed --- symmetric-client-clib/inc/io/data/DataProcessor.h | 2 +- symmetric-client-clib/inc/io/reader/DataReader.h | 1 + symmetric-client-clib/inc/io/writer/DataWriter.h | 1 + .../inc/io/writer/DefaultDatabaseWriter.h | 1 + symmetric-client-clib/src/io/reader/ExtractDataReader.c | 1 + symmetric-client-clib/src/io/reader/ProtocolDataReader.c | 8 +++++--- symmetric-client-clib/src/io/writer/ProtocolDataWriter.c | 5 +++++ symmetric-client-clib/src/service/DataExtractorService.c | 2 +- symmetric-client-clib/src/service/DataLoaderService.c | 2 +- 9 files changed, 17 insertions(+), 6 deletions(-) diff --git a/symmetric-client-clib/inc/io/data/DataProcessor.h b/symmetric-client-clib/inc/io/data/DataProcessor.h index aadd1b7b96..7d3f9d1028 100644 --- a/symmetric-client-clib/inc/io/data/DataProcessor.h +++ b/symmetric-client-clib/inc/io/data/DataProcessor.h @@ -26,9 +26,9 @@ #include "util/List.h" typedef struct SymDataProcessor { - SymList *batchesProcessed; void (*open)(struct SymDataProcessor *this); size_t (*process)(struct SymDataProcessor *this, char *data, size_t size, size_t count); + SymList * (*getBatchesProcessed)(struct SymDataProcessor *this); void (*close)(struct SymDataProcessor *this); void (*destroy)(struct SymDataProcessor *this); } SymDataProcessor; diff --git a/symmetric-client-clib/inc/io/reader/DataReader.h b/symmetric-client-clib/inc/io/reader/DataReader.h index afbf2aa102..91f54ba3f4 100644 --- a/symmetric-client-clib/inc/io/reader/DataReader.h +++ b/symmetric-client-clib/inc/io/reader/DataReader.h @@ -28,6 +28,7 @@ #include "io/data/CsvData.h" typedef struct SymDataReader { + SymList *batchesProcessed; void (*open)(struct SymDataReader *this); SymBatch * (*nextBatch)(struct SymDataReader *this); SymTable * (*nextTable)(struct SymDataReader *this); diff --git a/symmetric-client-clib/inc/io/writer/DataWriter.h b/symmetric-client-clib/inc/io/writer/DataWriter.h index b95ef5577f..dd1718c8c3 100644 --- a/symmetric-client-clib/inc/io/writer/DataWriter.h +++ b/symmetric-client-clib/inc/io/writer/DataWriter.h @@ -29,6 +29,7 @@ #include "util/List.h" typedef struct SymDataWriter { + SymList *batchesProcessed; void (*open)(struct SymDataWriter *this); void (*close)(struct SymDataWriter *this); void (*startBatch)(struct SymDataWriter *this, SymBatch *batch); diff --git a/symmetric-client-clib/inc/io/writer/DefaultDatabaseWriter.h b/symmetric-client-clib/inc/io/writer/DefaultDatabaseWriter.h index 7fa5022cac..ce22123fdb 100644 --- a/symmetric-client-clib/inc/io/writer/DefaultDatabaseWriter.h +++ b/symmetric-client-clib/inc/io/writer/DefaultDatabaseWriter.h @@ -30,6 +30,7 @@ #include "io/data/CsvData.h" #include "util/StringArray.h" #include "util/Map.h" +#include "util/List.h" #include "db/sql/DmlStatement.h" #include "db/sql/SqlTemplate.h" #include "db/sql/SqlTransaction.h" diff --git a/symmetric-client-clib/src/io/reader/ExtractDataReader.c b/symmetric-client-clib/src/io/reader/ExtractDataReader.c index 064d5a1a83..b4c7cc29f2 100644 --- a/symmetric-client-clib/src/io/reader/ExtractDataReader.c +++ b/symmetric-client-clib/src/io/reader/ExtractDataReader.c @@ -47,6 +47,7 @@ SymExtractDataReader * SymExtractDataReader_new(SymExtractDataReader *this, SymO this = (SymExtractDataReader *) calloc(1, sizeof(SymExtractDataReader)); } SymDataReader *super = &this->super; + super->batchesProcessed = SymList_new(NULL); super->open = (void *) &SymExtractDataReader_open; super->close = (void *) &SymExtractDataReader_close; super->nextBatch = (void *) &SymExtractDataReader_nextBatch; diff --git a/symmetric-client-clib/src/io/reader/ProtocolDataReader.c b/symmetric-client-clib/src/io/reader/ProtocolDataReader.c index 291b631e37..140e43520a 100644 --- a/symmetric-client-clib/src/io/reader/ProtocolDataReader.c +++ b/symmetric-client-clib/src/io/reader/ProtocolDataReader.c @@ -30,7 +30,6 @@ static void SymProtocolDataReader_parseField(void *data, size_t size, void *user static void SymProtocolDataReader_parseLine(int eol, void *userData) { SymProtocolDataReader *this = (SymProtocolDataReader *) userData; - SymDataProcessor *super = &this->super; SymBatch *batch = this->batch; SymStringArray *fields = this->fields; @@ -91,7 +90,6 @@ static void SymProtocolDataReader_parseLine(int eol, void *userData) { SymStringBuilder_copyToField(&batch->channelId, fields->get(fields, 1)); } else if (strcmp(token, SYM_CSV_BATCH) == 0) { batch->batchId = atol(fields->get(fields, 1)); - super->batchesProcessed->add(super->batchesProcessed, this->batch); this->writer->startBatch(this->writer, batch); } else if (strcmp(token, SYM_CSV_COMMIT) == 0) { this->writer->endBatch(this->writer, batch); @@ -128,6 +126,10 @@ size_t SymProtocolDataReader_process(SymProtocolDataReader *this, char *data, si return length; } +SymList * SymProtocolDataReader_getBatchesProcessed(SymProtocolDataReader *this) { + return this->writer->batchesProcessed; +} + void SymProtocolDataReader_close(SymProtocolDataReader *this) { csv_fini(this->csvParser, SymProtocolDataReader_parseField, SymProtocolDataReader_parseLine, this); this->writer->close(this->writer); @@ -155,10 +157,10 @@ SymProtocolDataReader * SymProtocolDataReader_new(SymProtocolDataReader *this, c this->parsedTables = SymMap_new(NULL, 100); SymDataProcessor *super = &this->super; - super->batchesProcessed = SymList_new(NULL); super->open = (void *) &SymProtocolDataReader_open; super->close = (void *) &SymProtocolDataReader_close; super->process = (void *) &SymProtocolDataReader_process; + super->getBatchesProcessed = (void *) &SymProtocolDataReader_getBatchesProcessed; super->destroy = (void *) &SymProtocolDataReader_destroy; return this; } diff --git a/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c b/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c index b3f14d6dfb..7fb31992a6 100644 --- a/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c +++ b/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c @@ -28,6 +28,10 @@ size_t SymProtocolDataWriter_process(SymProtocolDataWriter *this, char *data, s return length; } +SymList * SymProtocolDataWriter_getBatchesProcessed(SymProtocolDataWriter *this) { + return this->reader->batchesProcessed; +} + void SymProtocolDataWriter_close(SymProtocolDataWriter *this) { } @@ -45,6 +49,7 @@ SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, c super->open = (void *) &SymProtocolDataWriter_open; super->close = (void *) &SymProtocolDataWriter_close; super->process = (void *) &SymProtocolDataWriter_process; + super->getBatchesProcessed = (void *) &SymProtocolDataWriter_getBatchesProcessed; super->destroy = (void *) &SymProtocolDataWriter_destroy; return this; } diff --git a/symmetric-client-clib/src/service/DataExtractorService.c b/symmetric-client-clib/src/service/DataExtractorService.c index 0dce810ac4..767117f510 100644 --- a/symmetric-client-clib/src/service/DataExtractorService.c +++ b/symmetric-client-clib/src/service/DataExtractorService.c @@ -26,7 +26,7 @@ static SymList * SymDataExtractorService_extractOutgoingBatch(SymDataExtractorSe SymDataProcessor *processor = (SymDataProcessor *) SymProtocolDataWriter_new(NULL, sourceNode->nodeId, reader); long rc = transport->process(transport, processor); SymLog_debug("Transport rc = %ld" , rc); - SymList *batchesProcessed = processor->batchesProcessed; + SymList *batchesProcessed = processor->getBatchesProcessed(processor); reader->destroy(reader); processor->destroy(processor); return batchesProcessed; diff --git a/symmetric-client-clib/src/service/DataLoaderService.c b/symmetric-client-clib/src/service/DataLoaderService.c index 6baf737cf0..4cd67fbdb5 100644 --- a/symmetric-client-clib/src/service/DataLoaderService.c +++ b/symmetric-client-clib/src/service/DataLoaderService.c @@ -50,7 +50,7 @@ static SymList * SymDataLoaderService_loadDataFromTransport(SymDataLoaderService long rc = transport->process(transport, processor); SymLog_debug("Transport rc = %ld" , rc); - SymList *batchesProcessed = processor->batchesProcessed; + SymList *batchesProcessed = processor->getBatchesProcessed(processor); processor->destroy(processor); writer->destroy(writer); return batchesProcessed;