Skip to content

Commit

Permalink
batchesProcessed
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 15, 2015
1 parent 19e66bc commit defbfb5
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion symmetric-client-clib/inc/io/data/DataProcessor.h
Expand Up @@ -26,9 +26,9 @@
#include "util/List.h"

typedef struct SymDataProcessor {
SymList *batchesProcessed;
void (*open)(struct SymDataProcessor *this);
size_t (*process)(struct SymDataProcessor *this, char *data, size_t size, size_t count);
SymList * (*getBatchesProcessed)(struct SymDataProcessor *this);
void (*close)(struct SymDataProcessor *this);
void (*destroy)(struct SymDataProcessor *this);
} SymDataProcessor;
Expand Down
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/io/reader/DataReader.h
Expand Up @@ -28,6 +28,7 @@
#include "io/data/CsvData.h"

typedef struct SymDataReader {
SymList *batchesProcessed;
void (*open)(struct SymDataReader *this);
SymBatch * (*nextBatch)(struct SymDataReader *this);
SymTable * (*nextTable)(struct SymDataReader *this);
Expand Down
1 change: 1 addition & 0 deletions symmetric-client-clib/inc/io/writer/DataWriter.h
Expand Up @@ -29,6 +29,7 @@
#include "util/List.h"

typedef struct SymDataWriter {
SymList *batchesProcessed;
void (*open)(struct SymDataWriter *this);
void (*close)(struct SymDataWriter *this);
void (*startBatch)(struct SymDataWriter *this, SymBatch *batch);
Expand Down
Expand Up @@ -30,6 +30,7 @@
#include "io/data/CsvData.h"
#include "util/StringArray.h"
#include "util/Map.h"
#include "util/List.h"
#include "db/sql/DmlStatement.h"
#include "db/sql/SqlTemplate.h"
#include "db/sql/SqlTransaction.h"
Expand Down
1 change: 1 addition & 0 deletions symmetric-client-clib/src/io/reader/ExtractDataReader.c
Expand Up @@ -47,6 +47,7 @@ SymExtractDataReader * SymExtractDataReader_new(SymExtractDataReader *this, SymO
this = (SymExtractDataReader *) calloc(1, sizeof(SymExtractDataReader));
}
SymDataReader *super = &this->super;
super->batchesProcessed = SymList_new(NULL);
super->open = (void *) &SymExtractDataReader_open;
super->close = (void *) &SymExtractDataReader_close;
super->nextBatch = (void *) &SymExtractDataReader_nextBatch;
Expand Down
8 changes: 5 additions & 3 deletions symmetric-client-clib/src/io/reader/ProtocolDataReader.c
Expand Up @@ -30,7 +30,6 @@ static void SymProtocolDataReader_parseField(void *data, size_t size, void *user

static void SymProtocolDataReader_parseLine(int eol, void *userData) {
SymProtocolDataReader *this = (SymProtocolDataReader *) userData;
SymDataProcessor *super = &this->super;
SymBatch *batch = this->batch;
SymStringArray *fields = this->fields;

Expand Down Expand Up @@ -91,7 +90,6 @@ static void SymProtocolDataReader_parseLine(int eol, void *userData) {
SymStringBuilder_copyToField(&batch->channelId, fields->get(fields, 1));
} else if (strcmp(token, SYM_CSV_BATCH) == 0) {
batch->batchId = atol(fields->get(fields, 1));
super->batchesProcessed->add(super->batchesProcessed, this->batch);
this->writer->startBatch(this->writer, batch);
} else if (strcmp(token, SYM_CSV_COMMIT) == 0) {
this->writer->endBatch(this->writer, batch);
Expand Down Expand Up @@ -128,6 +126,10 @@ size_t SymProtocolDataReader_process(SymProtocolDataReader *this, char *data, si
return length;
}

SymList * SymProtocolDataReader_getBatchesProcessed(SymProtocolDataReader *this) {
return this->writer->batchesProcessed;
}

void SymProtocolDataReader_close(SymProtocolDataReader *this) {
csv_fini(this->csvParser, SymProtocolDataReader_parseField, SymProtocolDataReader_parseLine, this);
this->writer->close(this->writer);
Expand Down Expand Up @@ -155,10 +157,10 @@ SymProtocolDataReader * SymProtocolDataReader_new(SymProtocolDataReader *this, c
this->parsedTables = SymMap_new(NULL, 100);

SymDataProcessor *super = &this->super;
super->batchesProcessed = SymList_new(NULL);
super->open = (void *) &SymProtocolDataReader_open;
super->close = (void *) &SymProtocolDataReader_close;
super->process = (void *) &SymProtocolDataReader_process;
super->getBatchesProcessed = (void *) &SymProtocolDataReader_getBatchesProcessed;
super->destroy = (void *) &SymProtocolDataReader_destroy;
return this;
}
5 changes: 5 additions & 0 deletions symmetric-client-clib/src/io/writer/ProtocolDataWriter.c
Expand Up @@ -28,6 +28,10 @@ size_t SymProtocolDataWriter_process(SymProtocolDataWriter *this, char *data, s
return length;
}

SymList * SymProtocolDataWriter_getBatchesProcessed(SymProtocolDataWriter *this) {
return this->reader->batchesProcessed;
}

void SymProtocolDataWriter_close(SymProtocolDataWriter *this) {
}

Expand All @@ -45,6 +49,7 @@ SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, c
super->open = (void *) &SymProtocolDataWriter_open;
super->close = (void *) &SymProtocolDataWriter_close;
super->process = (void *) &SymProtocolDataWriter_process;
super->getBatchesProcessed = (void *) &SymProtocolDataWriter_getBatchesProcessed;
super->destroy = (void *) &SymProtocolDataWriter_destroy;
return this;
}
2 changes: 1 addition & 1 deletion symmetric-client-clib/src/service/DataExtractorService.c
Expand Up @@ -26,7 +26,7 @@ static SymList * SymDataExtractorService_extractOutgoingBatch(SymDataExtractorSe
SymDataProcessor *processor = (SymDataProcessor *) SymProtocolDataWriter_new(NULL, sourceNode->nodeId, reader);
long rc = transport->process(transport, processor);
SymLog_debug("Transport rc = %ld" , rc);
SymList *batchesProcessed = processor->batchesProcessed;
SymList *batchesProcessed = processor->getBatchesProcessed(processor);
reader->destroy(reader);
processor->destroy(processor);
return batchesProcessed;
Expand Down
2 changes: 1 addition & 1 deletion symmetric-client-clib/src/service/DataLoaderService.c
Expand Up @@ -50,7 +50,7 @@ static SymList * SymDataLoaderService_loadDataFromTransport(SymDataLoaderService
long rc = transport->process(transport, processor);
SymLog_debug("Transport rc = %ld" , rc);

SymList *batchesProcessed = processor->batchesProcessed;
SymList *batchesProcessed = processor->getBatchesProcessed(processor);
processor->destroy(processor);
writer->destroy(writer);
return batchesProcessed;
Expand Down

0 comments on commit defbfb5

Please sign in to comment.