Skip to content

Commit

Permalink
fallback insert and update
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 10, 2015
1 parent 36e7075 commit 34aeb3a
Showing 1 changed file with 57 additions and 14 deletions.
71 changes: 57 additions & 14 deletions symmetric-client-clib/src/io/writer/DefaultDatabaseWriter.c
Expand Up @@ -35,7 +35,7 @@ void SymDefaultDatabaseWriter_startBatch(SymDefaultDatabaseWriter *this, SymBatc
SymDataWriter *super = (SymDataWriter *) &this->super;
super->batchesProcessed->add(super->batchesProcessed, this->incomingBatch);
if (!this->incomingBatchService->acquireIncomingBatch(this->incomingBatchService, this->incomingBatch)) {
// TODO: how to handle when incoming batch is not acquired
this->isError = 1;
}

// IDataProcessorListener.afterBatchStarted
Expand All @@ -62,7 +62,20 @@ unsigned short SymDefaultDatabaseWriter_requiresNewStatement(SymDefaultDatabaseW
return requiresNew;
}

void SymDefaultDatabaseWriter_insert(SymDefaultDatabaseWriter *this, SymCsvData *data) {
void SymDefaultDatabaseWriter_buildTargetValues(SymDefaultDatabaseWriter *this, SymStringArray *sourceValues, SymStringArray *targetValues,
unsigned int onlyPrimaryKeys) {
SymIterator *iter = this->sourceTable->columns->iterator(this->sourceTable->columns);
while (iter->hasNext(iter)) {
SymColumn *column = (SymColumn *) iter->next(iter);
SymColumn *targetColumn = this->targetTable->findColumn(this->targetTable, column->name, 0);
if (targetColumn && (!onlyPrimaryKeys || targetColumn->isPrimaryKey)) {
targetValues->add(targetValues, sourceValues->array[iter->index]);
}
}
iter->destroy(iter);
}

int SymDefaultDatabaseWriter_insert(SymDefaultDatabaseWriter *this, SymCsvData *data) {
if (SymDefaultDatabaseWriter_requiresNewStatement(this, SYM_DML_TYPE_INSERT, data)) {
if (this->dmlStatement) {
this->sqlTransaction->close(this->sqlTransaction);
Expand All @@ -73,11 +86,17 @@ void SymDefaultDatabaseWriter_insert(SymDefaultDatabaseWriter *this, SymCsvData
this->sqlTransaction->prepare(this->sqlTransaction, this->dmlStatement->sql);
}
// TODO: need to know length of each rowData
this->sqlTransaction->addRow(this->sqlTransaction, data->rowData, this->dmlStatement->sqlTypes);
SymStringArray *values = SymStringArray_new(NULL);
SymDefaultDatabaseWriter_buildTargetValues(this, data->rowData, values, 0);
int count = this->sqlTransaction->addRow(this->sqlTransaction, values, this->dmlStatement->sqlTypes);
values->destroy(values);
if (count > 0) {
this->incomingBatch->statementCount++;
}
return count;
}

void SymDefaultDatabaseWriter_update(SymDefaultDatabaseWriter *this, SymCsvData *data) {
printf("update\n");
int SymDefaultDatabaseWriter_update(SymDefaultDatabaseWriter *this, SymCsvData *data) {
if (SymDefaultDatabaseWriter_requiresNewStatement(this, SYM_DML_TYPE_UPDATE, data)) {
if (this->dmlStatement) {
this->sqlTransaction->close(this->sqlTransaction);
Expand All @@ -88,15 +107,24 @@ void SymDefaultDatabaseWriter_update(SymDefaultDatabaseWriter *this, SymCsvData
this->sqlTransaction->prepare(this->sqlTransaction, this->dmlStatement->sql);
}
// TODO: need to know length of each rowData and pkData

SymStringArray *values = SymStringArray_new(NULL);
values->addAll(values, data->rowData);
values->addAll(values, data->pkData);
this->sqlTransaction->addRow(this->sqlTransaction, values, this->dmlStatement->sqlTypes);
SymDefaultDatabaseWriter_buildTargetValues(this, data->rowData, values, 0);
if (data->oldData) {
SymDefaultDatabaseWriter_buildTargetValues(this, data->oldData, values, 1);
} else {
SymDefaultDatabaseWriter_buildTargetValues(this, data->rowData, values, 1);
}
int count = this->sqlTransaction->addRow(this->sqlTransaction, values, this->dmlStatement->sqlTypes);
values->destroy(values);
this->incomingBatch->statementCount++;
if (count > 0) {
this->incomingBatch->statementCount++;
}
return count;
}

void SymDefaultDatabaseWriter_delete(SymDefaultDatabaseWriter *this, SymCsvData *data) {
printf("delete\n");
int SymDefaultDatabaseWriter_delete(SymDefaultDatabaseWriter *this, SymCsvData *data) {
if (SymDefaultDatabaseWriter_requiresNewStatement(this, SYM_DML_TYPE_DELETE, data)) {
if (this->dmlStatement) {
this->sqlTransaction->close(this->sqlTransaction);
Expand All @@ -107,7 +135,16 @@ void SymDefaultDatabaseWriter_delete(SymDefaultDatabaseWriter *this, SymCsvData
this->sqlTransaction->prepare(this->sqlTransaction, this->dmlStatement->sql);
}
// TODO: need to know length of each pkData
this->sqlTransaction->addRow(this->sqlTransaction, data->pkData, this->dmlStatement->sqlTypes);
SymStringArray *values = SymStringArray_new(NULL);
SymDefaultDatabaseWriter_buildTargetValues(this, data->oldData, values, 1);
int count = this->sqlTransaction->addRow(this->sqlTransaction, data->pkData, this->dmlStatement->sqlTypes);
values->destroy(values);
if (count > 0) {
this->incomingBatch->statementCount++;
} else {
this->incomingBatch->missingDeleteCount++;
}
return count;
}

void SymDefaultDatabaseWriter_sql(SymDefaultDatabaseWriter *this, SymCsvData *data) {
Expand All @@ -122,10 +159,16 @@ void SymDefaultDatabaseWriter_sql(SymDefaultDatabaseWriter *this, SymCsvData *da
unsigned short SymDefaultDatabaseWriter_write(SymDefaultDatabaseWriter *this, SymCsvData *data) {
switch (data->dataEventType) {
case SYM_DATA_EVENT_INSERT:
SymDefaultDatabaseWriter_insert(this, data);
if (SymDefaultDatabaseWriter_insert(this, data) == 0) {
this->incomingBatch->fallbackUpdateCount++;
SymDefaultDatabaseWriter_update(this, data);
}
break;
case SYM_DATA_EVENT_UPDATE:
SymDefaultDatabaseWriter_update(this, data);
if (SymDefaultDatabaseWriter_update(this, data) == 0) {
this->incomingBatch->fallbackInsertCount++;
SymDefaultDatabaseWriter_insert(this, data);
}
break;
case SYM_DATA_EVENT_DELETE:
SymDefaultDatabaseWriter_delete(this, data);
Expand Down Expand Up @@ -166,7 +209,7 @@ void SymDefaultDatabaseWriter_endBatch(SymDefaultDatabaseWriter *this, SymBatch
// IDataProcessorListener.batchInError
// TODO: update batch statistics
// TODO: update batch sql code, state, message

this->incomingBatch->status = SYM_INCOMING_BATCH_STATUS_ERROR;
if (this->incomingBatchService->isRecordOkBatchesEnabled(this->incomingBatchService) || this->incomingBatch->retry) {
this->incomingBatchService->updateIncomingBatch(this->incomingBatchService, this->incomingBatch);
} else {
Expand Down

0 comments on commit 34aeb3a

Please sign in to comment.