diff --git a/symmetric-client-clib/inc/common/Constants.h b/symmetric-client-clib/inc/common/Constants.h index 91bccc9ae1..5216ba919c 100644 --- a/symmetric-client-clib/inc/common/Constants.h +++ b/symmetric-client-clib/inc/common/Constants.h @@ -23,4 +23,6 @@ #define SYM_DEPLOYMENT_TYPE "cclient" +#define SYM_LONG_OPERATION_THRESHOLD 30000 + #endif diff --git a/symmetric-client-clib/inc/io/writer/ProtocolDataWriter.h b/symmetric-client-clib/inc/io/writer/ProtocolDataWriter.h index d6ad7bf5e6..17d6a63f14 100644 --- a/symmetric-client-clib/inc/io/writer/ProtocolDataWriter.h +++ b/symmetric-client-clib/inc/io/writer/ProtocolDataWriter.h @@ -27,13 +27,15 @@ #include "db/model/Table.h" #include "io/data/CsvData.h" #include "io/writer/DataWriter.h" +#include "io/reader/DataReader.h" #include "util/List.h" typedef struct SymProtocolDataWriter { SymDataWriter super; char *sourceNodeId; + SymDataReader *reader; } SymProtocolDataWriter; -SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId); +SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId, SymDataReader *reader); #endif diff --git a/symmetric-client-clib/inc/service/DataService.h b/symmetric-client-clib/inc/service/DataService.h index 019e87da9b..67b72136c2 100644 --- a/symmetric-client-clib/inc/service/DataService.h +++ b/symmetric-client-clib/inc/service/DataService.h @@ -35,7 +35,7 @@ typedef struct SymDataService { SymDatabasePlatform *platform; SymTriggerRouterService *triggerRouterService; - SymData * (*selectDataFor)(struct SymDataService *this, SymBatch *batch); + SymList * (*selectDataFor)(struct SymDataService *this, SymBatch *batch); void (*destroy)(struct SymDataService *this); } SymDataService; diff --git a/symmetric-client-clib/inc/service/OutgoingBatchService.h b/symmetric-client-clib/inc/service/OutgoingBatchService.h index ccc7e4f361..a518e48f02 100644 --- a/symmetric-client-clib/inc/service/OutgoingBatchService.h +++ b/symmetric-client-clib/inc/service/OutgoingBatchService.h @@ -23,19 +23,27 @@ #include #include +#include #include "model/OutgoingBatch.h" +#include "model/OutgoingBatches.h" #include "db/platform/DatabasePlatform.h" +#include "service/ParameterService.h" #include "util/List.h" +#include "util/StringBuilder.h" +#include "util/StringArray.h" +#include "util/AppUtils.h" +#include "common/Constants.h" typedef struct SymOutgoingBatchService { SymDatabasePlatform *platform; + SymParameterService *parameterService; SymOutgoingBatch * (*findOutgoingBatch)(struct SymOutgoingBatchService *this, long batchId, char *nodeId); - SymList * (*getOutgoingBatches)(struct SymOutgoingBatchService *this, char *nodeId); + SymOutgoingBatches * (*getOutgoingBatches)(struct SymOutgoingBatchService *this, char *nodeId); void (*updateOutgoingBatch)(struct SymOutgoingBatchService *this, SymOutgoingBatch *outgoingBatch); void (*destroy)(struct SymOutgoingBatchService *this); } SymOutgoingBatchService; -SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform); +SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform, SymParameterService *parameterService); #define SYM_SQL_INSERT_OUTGOING_BATCH \ "insert into sym_outgoing_batch \ @@ -61,4 +69,6 @@ from sym_outgoing_batch b" #define SYM_SQL_FIND_OUTGOING_BATCH "where batch_id = ? and node_id = ?" +#define SYM_SQL_SELECT_OUTGOING_BATCH "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc" + #endif diff --git a/symmetric-client-clib/inc/service/ParameterService.h b/symmetric-client-clib/inc/service/ParameterService.h index aec50b41f0..85fb662ee3 100644 --- a/symmetric-client-clib/inc/service/ParameterService.h +++ b/symmetric-client-clib/inc/service/ParameterService.h @@ -51,6 +51,8 @@ #define SYM_PARAMETER_INCOMING_BATCH_RECORD_OK_ENABLED "incoming.batches.record.ok.enabled" #define SYM_PARAMETER_INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED "incoming.batches.skip.duplicates" +#define SYM_OUTGOING_BATCH_MAX_BATCHES_TO_SELECT "outgoing.batches.max.to.select" + typedef struct SymParameterService { SymProperties *parameters; SymProperties *properties; diff --git a/symmetric-client-clib/src/core/SymEngine.c b/symmetric-client-clib/src/core/SymEngine.c index 7a20935ac9..ee4b8d27bb 100644 --- a/symmetric-client-clib/src/core/SymEngine.c +++ b/symmetric-client-clib/src/core/SymEngine.c @@ -136,7 +136,7 @@ SymEngine * SymEngine_new(SymEngine *this, SymProperties *properties) { this->transportManager = SymTransportManagerFactory_create(SYM_PROTOCOL_HTTP, this->parameterService); this->nodeService = SymNodeService_new(NULL, this->platform); this->incomingBatchService = SymIncomingBatchService_new(NULL, this->platform, this->parameterService); - this->outgoingBatchService = SymOutgoingBatchService_new(NULL, this->platform); + this->outgoingBatchService = SymOutgoingBatchService_new(NULL, this->platform, this->parameterService); this->dataLoaderService = SymDataLoaderService_new(NULL, this->parameterService, this->nodeService, this->transportManager, this->platform, this->dialect, this->incomingBatchService); this->dataService = SymDataService_new(NULL, this->platform, this->triggerRouterService); diff --git a/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c b/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c index 0eac9faef1..228f1f7638 100644 --- a/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c +++ b/symmetric-client-clib/src/io/writer/ProtocolDataWriter.c @@ -47,11 +47,12 @@ void SymProtocolDataWriter_destroy(SymProtocolDataWriter *this) { free(this); } -SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId) { +SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId, SymDataReader *reader) { if (this == NULL) { this = (SymProtocolDataWriter *) calloc(1, sizeof(SymProtocolDataWriter)); } this->sourceNodeId = sourceNodeId; + this->reader = reader; SymDataWriter *super = &this->super; super->open = (void *) &SymProtocolDataWriter_open; super->close = (void *) &SymProtocolDataWriter_close; diff --git a/symmetric-client-clib/src/service/DataExtractorService.c b/symmetric-client-clib/src/service/DataExtractorService.c index 1a36ce01ce..0575dd9f00 100644 --- a/symmetric-client-clib/src/service/DataExtractorService.c +++ b/symmetric-client-clib/src/service/DataExtractorService.c @@ -22,13 +22,12 @@ #include "common/Log.h" SymList * SymDataExtractorService_extract(SymDataExtractorService *this, SymNode *node, SymOutgoingTransport *transport) { - /* SymNode *nodeIdentity = this->nodeService->findIdentity(this->nodeService); - SymOutgoingBatch *outgoingBatch; - SymProtocolDataWriter *writer = SymProtocolDataWriter_new(NULL, node); - SymExtractDataReader *reader = SymExtractDataReader_new(NULL, outgoingBatch, nodeIdentity->nodeId, node->nodeId); - transport->process(transport, writer); -*/ + + + //SymExtractDataReader *reader = SymExtractDataReader_new(NULL, outgoingBatch, nodeIdentity->nodeId, node->nodeId); + //SymProtocolDataWriter *writer = SymProtocolDataWriter_new(NULL, node, reader); + //transport->process(transport, writer); return NULL; } diff --git a/symmetric-client-clib/src/service/DataService.c b/symmetric-client-clib/src/service/DataService.c index fef30dbd06..bd282daee1 100644 --- a/symmetric-client-clib/src/service/DataService.c +++ b/symmetric-client-clib/src/service/DataService.c @@ -22,20 +22,20 @@ SymData * SymDataService_dataMapper(SymRow *row) { SymData *data = SymData_new(NULL); - data->dataId = row->getLong(row, "DATA_ID"); - data->rowData = row->getString(row, "ROW_DATA"); - data->oldData = row->getString(row, "OLD_DATA"); - data->pkData = row->getString(row, "PK_DATA"); - data->channelId = row->getString(row, "CHANNEL_ID"); - data->transactionId = row->getString(row, "TRANSACTION_ID"); - data->tableName = row->getString(row, "TABLE_NAME"); - data->eventType = row->getString(row, "EVENT_TYPE"); - data->sourceNodeId = row->getString(row, "SOURCE_NODE_ID"); - data->externalData = row->getString(row, "EXTERNAL_DATA"); - data->nodeList = row->getString(row, "NODE_LIST"); - data->createTime = row->getDate(row, "CREATE_TIME"); - data->routerId = row->getString(row, "ROUTER_ID"); - data->triggerHistId = row->getInt(row, "TRIGGER_HIST_ID"); + data->dataId = row->getLong(row, "data_id"); + data->rowData = row->getString(row, "row_data"); + data->oldData = row->getString(row, "old_data"); + data->pkData = row->getString(row, "pk_data"); + data->channelId = row->getString(row, "channel_id"); + data->transactionId = row->getString(row, "transaction_id"); + data->tableName = row->getString(row, "table_name"); + data->eventType = row->getString(row, "event_type"); + data->sourceNodeId = row->getString(row, "source_node_id"); + data->externalData = row->getString(row, "external_data"); + data->nodeList = row->getString(row, "node_list"); + data->createTime = row->getDate(row, "create_time"); + data->routerId = row->getString(row, "router_id"); + data->triggerHistId = row->getInt(row, "trigger_hist_id"); // TODO: add triggerHistory /* @@ -54,15 +54,22 @@ SymData * SymDataService_dataMapper(SymRow *row) { */ return data; } -SymData * SymDataService_selectDataFor(SymDataService *this, SymBatch *batch) { +SymList * SymDataService_selectDataFor(SymDataService *this, SymBatch *batch) { SymStringBuilder *sb = SymStringBuilder_new(SYM_SQL_SELECT_EVENT_DATA_TO_EXTRACT); sb->append(sb, " order by d.data_id asc"); + SymStringArray *args = SymStringArray_new(NULL); args->addLong(args, batch->batchId)->add(args, batch->targetNodeId); + int error; + SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform); + SymList *list = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymDataService_dataMapper); + args->destroy(args); sb->destroy(sb); - return NULL; + + // TODO: return SymSqlReadCursor instead + return list; } void SymDataService_destroy(SymDataService *this) { diff --git a/symmetric-client-clib/src/service/OutgoingBatchService.c b/symmetric-client-clib/src/service/OutgoingBatchService.c index b5780a0f74..444c1b2a8e 100644 --- a/symmetric-client-clib/src/service/OutgoingBatchService.c +++ b/symmetric-client-clib/src/service/OutgoingBatchService.c @@ -93,9 +93,38 @@ SymOutgoingBatch * SymOutgoingBatchService_findOutgoingBatch(SymOutgoingBatchSer return batch; } -SymOutgoingBatch* SymOutgoingBatchService_getOutgoingBatches(SymOutgoingBatchService *this, char *nodeId) { - SymLog_info("SymOutgoingBatchService_get_outgoing_batches"); - return NULL; +SymOutgoingBatches * SymOutgoingBatchService_getOutgoingBatches(SymOutgoingBatchService *this, char *nodeId) { + time_t ts = time(NULL); + + SymStringBuilder *sb = SymStringBuilder_newWithString(SYM_SQL_SELECT_OUTGOING_BATCH_PREFIX); + sb->append(sb, SYM_SQL_SELECT_OUTGOING_BATCH); + + SymStringArray *args = SymStringArray_new(NULL); + args->add(args, nodeId)->add(args, SYM_OUGOING_BATCH_REQUEST)->add(args, SYM_OUGOING_BATCH_NEW); + args->add(args, SYM_OUGOING_BATCH_QUERYING)->add(args, SYM_OUGOING_BATCH_SENDING)->add(args, SYM_OUGOING_BATCH_LOADING); + args->add(args, SYM_OUGOING_BATCH_ERROR)->add(args, SYM_OUGOING_BATCH_IGNORED); + + // TODO: sqlTemplate->queryWithLimit with limit on results + //int maxNumberOfBatchesToSelect = this->parameterService->getInt(this->parameterService, SYM_OUTGOING_BATCH_MAX_BATCHES_TO_SELECT, 1000); + int error; + SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform); + SymList *list = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymOutgoingBatchService_outgoingBatchMapper); + + SymOutgoingBatches *batches = SymOutgoingBatches_newWithList(NULL, list); + + // TODO: sort channels by processing order and errors, and filter batches for only active channels + + // TODO: filter batches with channel window + + time_t executeTimeInMs = (time(NULL) - ts) * 1000; + if (executeTimeInMs > SYM_LONG_OPERATION_THRESHOLD) { + SymLog_info("Selecting %d outgoing batch rows for node %s took %ld ms", list->size, nodeId, executeTimeInMs); + } + + sb->destroy(sb); + args->destroy(args); + list->destroy(list); + return batches; } void SymOutgoingBatchService_updateOutgoingBatch(SymOutgoingBatchService *this, SymOutgoingBatch *batch) { @@ -125,11 +154,12 @@ void SymOutgoingBatchService_destroy(SymOutgoingBatchService *this) { free(this); } -SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform) { +SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform, SymParameterService *parameterService) { if (this == NULL) { this = (SymOutgoingBatchService *) calloc(1, sizeof(SymOutgoingBatchService)); } this->platform = platform; + this->parameterService = parameterService; this->findOutgoingBatch = (void *) &SymOutgoingBatchService_findOutgoingBatch; this->getOutgoingBatches = (void *) &SymOutgoingBatchService_getOutgoingBatches; this->updateOutgoingBatch = (void *) &SymOutgoingBatchService_updateOutgoingBatch;