Skip to content

Commit

Permalink
changes for extract service
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 14, 2015
1 parent 4f9230a commit 258e764
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 32 deletions.
2 changes: 2 additions & 0 deletions symmetric-client-clib/inc/common/Constants.h
Expand Up @@ -23,4 +23,6 @@

#define SYM_DEPLOYMENT_TYPE "cclient"

#define SYM_LONG_OPERATION_THRESHOLD 30000

#endif
4 changes: 3 additions & 1 deletion symmetric-client-clib/inc/io/writer/ProtocolDataWriter.h
Expand Up @@ -27,13 +27,15 @@
#include "db/model/Table.h"
#include "io/data/CsvData.h"
#include "io/writer/DataWriter.h"
#include "io/reader/DataReader.h"
#include "util/List.h"

typedef struct SymProtocolDataWriter {
SymDataWriter super;
char *sourceNodeId;
SymDataReader *reader;
} SymProtocolDataWriter;

SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId);
SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId, SymDataReader *reader);

#endif
2 changes: 1 addition & 1 deletion symmetric-client-clib/inc/service/DataService.h
Expand Up @@ -35,7 +35,7 @@
typedef struct SymDataService {
SymDatabasePlatform *platform;
SymTriggerRouterService *triggerRouterService;
SymData * (*selectDataFor)(struct SymDataService *this, SymBatch *batch);
SymList * (*selectDataFor)(struct SymDataService *this, SymBatch *batch);
void (*destroy)(struct SymDataService *this);
} SymDataService;

Expand Down
14 changes: 12 additions & 2 deletions symmetric-client-clib/inc/service/OutgoingBatchService.h
Expand Up @@ -23,19 +23,27 @@

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "model/OutgoingBatch.h"
#include "model/OutgoingBatches.h"
#include "db/platform/DatabasePlatform.h"
#include "service/ParameterService.h"
#include "util/List.h"
#include "util/StringBuilder.h"
#include "util/StringArray.h"
#include "util/AppUtils.h"
#include "common/Constants.h"

typedef struct SymOutgoingBatchService {
SymDatabasePlatform *platform;
SymParameterService *parameterService;
SymOutgoingBatch * (*findOutgoingBatch)(struct SymOutgoingBatchService *this, long batchId, char *nodeId);
SymList * (*getOutgoingBatches)(struct SymOutgoingBatchService *this, char *nodeId);
SymOutgoingBatches * (*getOutgoingBatches)(struct SymOutgoingBatchService *this, char *nodeId);
void (*updateOutgoingBatch)(struct SymOutgoingBatchService *this, SymOutgoingBatch *outgoingBatch);
void (*destroy)(struct SymOutgoingBatchService *this);
} SymOutgoingBatchService;

SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform);
SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform, SymParameterService *parameterService);

#define SYM_SQL_INSERT_OUTGOING_BATCH \
"insert into sym_outgoing_batch \
Expand All @@ -61,4 +69,6 @@ from sym_outgoing_batch b"

#define SYM_SQL_FIND_OUTGOING_BATCH "where batch_id = ? and node_id = ?"

#define SYM_SQL_SELECT_OUTGOING_BATCH "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc"

#endif
2 changes: 2 additions & 0 deletions symmetric-client-clib/inc/service/ParameterService.h
Expand Up @@ -51,6 +51,8 @@
#define SYM_PARAMETER_INCOMING_BATCH_RECORD_OK_ENABLED "incoming.batches.record.ok.enabled"
#define SYM_PARAMETER_INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED "incoming.batches.skip.duplicates"

#define SYM_OUTGOING_BATCH_MAX_BATCHES_TO_SELECT "outgoing.batches.max.to.select"

typedef struct SymParameterService {
SymProperties *parameters;
SymProperties *properties;
Expand Down
2 changes: 1 addition & 1 deletion symmetric-client-clib/src/core/SymEngine.c
Expand Up @@ -136,7 +136,7 @@ SymEngine * SymEngine_new(SymEngine *this, SymProperties *properties) {
this->transportManager = SymTransportManagerFactory_create(SYM_PROTOCOL_HTTP, this->parameterService);
this->nodeService = SymNodeService_new(NULL, this->platform);
this->incomingBatchService = SymIncomingBatchService_new(NULL, this->platform, this->parameterService);
this->outgoingBatchService = SymOutgoingBatchService_new(NULL, this->platform);
this->outgoingBatchService = SymOutgoingBatchService_new(NULL, this->platform, this->parameterService);
this->dataLoaderService = SymDataLoaderService_new(NULL, this->parameterService, this->nodeService, this->transportManager, this->platform,
this->dialect, this->incomingBatchService);
this->dataService = SymDataService_new(NULL, this->platform, this->triggerRouterService);
Expand Down
3 changes: 2 additions & 1 deletion symmetric-client-clib/src/io/writer/ProtocolDataWriter.c
Expand Up @@ -47,11 +47,12 @@ void SymProtocolDataWriter_destroy(SymProtocolDataWriter *this) {
free(this);
}

SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId) {
SymProtocolDataWriter * SymProtocolDataWriter_new(SymProtocolDataWriter *this, char *sourceNodeId, SymDataReader *reader) {
if (this == NULL) {
this = (SymProtocolDataWriter *) calloc(1, sizeof(SymProtocolDataWriter));
}
this->sourceNodeId = sourceNodeId;
this->reader = reader;
SymDataWriter *super = &this->super;
super->open = (void *) &SymProtocolDataWriter_open;
super->close = (void *) &SymProtocolDataWriter_close;
Expand Down
11 changes: 5 additions & 6 deletions symmetric-client-clib/src/service/DataExtractorService.c
Expand Up @@ -22,13 +22,12 @@
#include "common/Log.h"

SymList * SymDataExtractorService_extract(SymDataExtractorService *this, SymNode *node, SymOutgoingTransport *transport) {
/*
SymNode *nodeIdentity = this->nodeService->findIdentity(this->nodeService);
SymOutgoingBatch *outgoingBatch;
SymProtocolDataWriter *writer = SymProtocolDataWriter_new(NULL, node);
SymExtractDataReader *reader = SymExtractDataReader_new(NULL, outgoingBatch, nodeIdentity->nodeId, node->nodeId);
transport->process(transport, writer);
*/


//SymExtractDataReader *reader = SymExtractDataReader_new(NULL, outgoingBatch, nodeIdentity->nodeId, node->nodeId);
//SymProtocolDataWriter *writer = SymProtocolDataWriter_new(NULL, node, reader);
//transport->process(transport, writer);
return NULL;
}

Expand Down
39 changes: 23 additions & 16 deletions symmetric-client-clib/src/service/DataService.c
Expand Up @@ -22,20 +22,20 @@

SymData * SymDataService_dataMapper(SymRow *row) {
SymData *data = SymData_new(NULL);
data->dataId = row->getLong(row, "DATA_ID");
data->rowData = row->getString(row, "ROW_DATA");
data->oldData = row->getString(row, "OLD_DATA");
data->pkData = row->getString(row, "PK_DATA");
data->channelId = row->getString(row, "CHANNEL_ID");
data->transactionId = row->getString(row, "TRANSACTION_ID");
data->tableName = row->getString(row, "TABLE_NAME");
data->eventType = row->getString(row, "EVENT_TYPE");
data->sourceNodeId = row->getString(row, "SOURCE_NODE_ID");
data->externalData = row->getString(row, "EXTERNAL_DATA");
data->nodeList = row->getString(row, "NODE_LIST");
data->createTime = row->getDate(row, "CREATE_TIME");
data->routerId = row->getString(row, "ROUTER_ID");
data->triggerHistId = row->getInt(row, "TRIGGER_HIST_ID");
data->dataId = row->getLong(row, "data_id");
data->rowData = row->getString(row, "row_data");
data->oldData = row->getString(row, "old_data");
data->pkData = row->getString(row, "pk_data");
data->channelId = row->getString(row, "channel_id");
data->transactionId = row->getString(row, "transaction_id");
data->tableName = row->getString(row, "table_name");
data->eventType = row->getString(row, "event_type");
data->sourceNodeId = row->getString(row, "source_node_id");
data->externalData = row->getString(row, "external_data");
data->nodeList = row->getString(row, "node_list");
data->createTime = row->getDate(row, "create_time");
data->routerId = row->getString(row, "router_id");
data->triggerHistId = row->getInt(row, "trigger_hist_id");

// TODO: add triggerHistory
/*
Expand All @@ -54,15 +54,22 @@ SymData * SymDataService_dataMapper(SymRow *row) {
*/
return data;
}
SymData * SymDataService_selectDataFor(SymDataService *this, SymBatch *batch) {
SymList * SymDataService_selectDataFor(SymDataService *this, SymBatch *batch) {
SymStringBuilder *sb = SymStringBuilder_new(SYM_SQL_SELECT_EVENT_DATA_TO_EXTRACT);
sb->append(sb, " order by d.data_id asc");

SymStringArray *args = SymStringArray_new(NULL);
args->addLong(args, batch->batchId)->add(args, batch->targetNodeId);

int error;
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
SymList *list = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymDataService_dataMapper);

args->destroy(args);
sb->destroy(sb);
return NULL;

// TODO: return SymSqlReadCursor instead
return list;
}

void SymDataService_destroy(SymDataService *this) {
Expand Down
38 changes: 34 additions & 4 deletions symmetric-client-clib/src/service/OutgoingBatchService.c
Expand Up @@ -93,9 +93,38 @@ SymOutgoingBatch * SymOutgoingBatchService_findOutgoingBatch(SymOutgoingBatchSer
return batch;
}

SymOutgoingBatch* SymOutgoingBatchService_getOutgoingBatches(SymOutgoingBatchService *this, char *nodeId) {
SymLog_info("SymOutgoingBatchService_get_outgoing_batches");
return NULL;
SymOutgoingBatches * SymOutgoingBatchService_getOutgoingBatches(SymOutgoingBatchService *this, char *nodeId) {
time_t ts = time(NULL);

SymStringBuilder *sb = SymStringBuilder_newWithString(SYM_SQL_SELECT_OUTGOING_BATCH_PREFIX);
sb->append(sb, SYM_SQL_SELECT_OUTGOING_BATCH);

SymStringArray *args = SymStringArray_new(NULL);
args->add(args, nodeId)->add(args, SYM_OUGOING_BATCH_REQUEST)->add(args, SYM_OUGOING_BATCH_NEW);
args->add(args, SYM_OUGOING_BATCH_QUERYING)->add(args, SYM_OUGOING_BATCH_SENDING)->add(args, SYM_OUGOING_BATCH_LOADING);
args->add(args, SYM_OUGOING_BATCH_ERROR)->add(args, SYM_OUGOING_BATCH_IGNORED);

// TODO: sqlTemplate->queryWithLimit with limit on results
//int maxNumberOfBatchesToSelect = this->parameterService->getInt(this->parameterService, SYM_OUTGOING_BATCH_MAX_BATCHES_TO_SELECT, 1000);
int error;
SymSqlTemplate *sqlTemplate = this->platform->getSqlTemplate(this->platform);
SymList *list = sqlTemplate->query(sqlTemplate, sb->str, args, NULL, &error, (void *) SymOutgoingBatchService_outgoingBatchMapper);

SymOutgoingBatches *batches = SymOutgoingBatches_newWithList(NULL, list);

// TODO: sort channels by processing order and errors, and filter batches for only active channels

// TODO: filter batches with channel window

time_t executeTimeInMs = (time(NULL) - ts) * 1000;
if (executeTimeInMs > SYM_LONG_OPERATION_THRESHOLD) {
SymLog_info("Selecting %d outgoing batch rows for node %s took %ld ms", list->size, nodeId, executeTimeInMs);
}

sb->destroy(sb);
args->destroy(args);
list->destroy(list);
return batches;
}

void SymOutgoingBatchService_updateOutgoingBatch(SymOutgoingBatchService *this, SymOutgoingBatch *batch) {
Expand Down Expand Up @@ -125,11 +154,12 @@ void SymOutgoingBatchService_destroy(SymOutgoingBatchService *this) {
free(this);
}

SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform) {
SymOutgoingBatchService * SymOutgoingBatchService_new(SymOutgoingBatchService *this, SymDatabasePlatform *platform, SymParameterService *parameterService) {
if (this == NULL) {
this = (SymOutgoingBatchService *) calloc(1, sizeof(SymOutgoingBatchService));
}
this->platform = platform;
this->parameterService = parameterService;
this->findOutgoingBatch = (void *) &SymOutgoingBatchService_findOutgoingBatch;
this->getOutgoingBatches = (void *) &SymOutgoingBatchService_getOutgoingBatches;
this->updateOutgoingBatch = (void *) &SymOutgoingBatchService_updateOutgoingBatch;
Expand Down

0 comments on commit 258e764

Please sign in to comment.