0001249: Add REST API methods to allow a "Pull Only" client
chenson42 committed Jun 11, 2013
1 parent db6db6b commit 12f0b30
Showing 3 changed files with 46 additions and 15 deletions.
@@ -323,7 +323,9 @@ public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo,
         List<OutgoingBatch> activeBatches = filterBatchesForExtraction(batches, channelMap);
 
         if (activeBatches.size() > 0) {
-            StructureDataWriter writer = new StructureDataWriter(symmetricDialect.getPlatform(), payloadType);
+            StructureDataWriter writer = new StructureDataWriter(symmetricDialect.getPlatform(), targetNode.getDatabaseType(), payloadType,
+                    // TODO might want to pass this into the rest call as an optional parameter
+                    parameterService.is(ParameterConstants.DB_DELIMITED_IDENTIFIER_MODE), symmetricDialect.getBinaryEncoding());
             List<OutgoingBatch> extractedBatches = extract(processInfo, targetNode,
                     activeBatches, writer, false);
 
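For readers unfamiliar with the flag being threaded through above: ParameterConstants.DB_DELIMITED_IDENTIFIER_MODE controls whether the generated DML delimits (quotes) identifiers, and it is passed into the writer as useQuotedIdentifiers. A tiny, hypothetical illustration of the difference in the payload SQL; the table name, columns, and double-quote style are made up and are not output captured from this commit:

    public class QuotingIllustration {
        public static void main(String[] args) {
            // delimited-identifier mode on: identifiers are quoted in the generated DML
            String quoted = "insert into \"SAMPLE_TABLE\" (\"ID\", \"NAME\") values (?, ?)";
            // delimited-identifier mode off: identifiers are emitted bare
            String bare = "insert into SAMPLE_TABLE (ID, NAME) values (?, ?)";
            System.out.println(quoted);
            System.out.println(bare);
        }
    }

The TODO in the hunk suggests later exposing this choice as an optional REST parameter instead of reading it from the engine's parameter service.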
@@ -25,10 +25,13 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jumpmind.db.model.Column;
 import org.jumpmind.db.model.Table;
+import org.jumpmind.db.platform.DmlStatementFactory;
 import org.jumpmind.db.platform.IDatabasePlatform;
-import org.jumpmind.db.sql.DmlStatement;
 import org.jumpmind.db.sql.DmlStatement.DmlType;
+import org.jumpmind.db.sql.Row;
+import org.jumpmind.db.util.BinaryEncoding;
 import org.jumpmind.symmetric.io.data.Batch;
 import org.jumpmind.symmetric.io.data.CsvData;
 import org.jumpmind.symmetric.io.data.DataContext;
@@ -47,17 +50,29 @@ public enum PayloadType {
         CSV, SQL
     };
 
-    // map is comprised of a batch and a list of payload data
-    // that goes with the batch
-    private Map<Long, List<String>> payloadMap = new HashMap<Long, List<String>>();
+    /*
+     * map is comprised of a batch and a list of payload data that goes with the
+     * batch
+     */
+    protected Map<Long, List<String>> payloadMap = new HashMap<Long, List<String>>();
 
-    private PayloadType payloadType = PayloadType.SQL;
+    protected PayloadType payloadType = PayloadType.SQL;
 
-    private long currentBatch;
+    protected long currentBatch;
 
-    public StructureDataWriter(IDatabasePlatform platform, PayloadType payloatType) {
+    protected String databaseType;
+
+    protected boolean useQuotedIdentifiers;
+
+    protected BinaryEncoding binaryEncoding;
+
+    public StructureDataWriter(IDatabasePlatform platform, String databaseType,
+            PayloadType payloatType, boolean useQuotedIdentifiers, BinaryEncoding binaryEncoding) {
         this.platform = platform;
         this.payloadType = payloatType;
+        this.databaseType = databaseType;
+        this.useQuotedIdentifiers = useQuotedIdentifiers;
+        this.binaryEncoding = binaryEncoding;
     }
 
     public void open(DataContext context) {
Expand All @@ -83,32 +98,44 @@ public boolean start(Table table) {
}

public void write(CsvData data) {
DmlStatement dml = null;
String sql = null;
switch (data.getDataEventType()) {
case UPDATE:
dml = platform.createDmlStatement(DmlType.UPDATE, currentTable);
sql = makeDynamic(DmlStatementFactory.createDmlStatement(databaseType, DmlType.UPDATE, currentTable, useQuotedIdentifiers).getSql(), data.getParsedData(CsvData.ROW_DATA), currentTable.getColumns());
break;
case INSERT:
dml = platform.createDmlStatement(DmlType.INSERT, currentTable);
sql = makeDynamic(DmlStatementFactory.createDmlStatement(databaseType, DmlType.INSERT, currentTable, useQuotedIdentifiers).getSql(), data.getParsedData(CsvData.ROW_DATA), currentTable.getColumns());
break;
case DELETE:
dml = platform.createDmlStatement(DmlType.DELETE, currentTable);
sql = makeDynamic(DmlStatementFactory.createDmlStatement(databaseType, DmlType.DELETE, currentTable, useQuotedIdentifiers).getSql(), data.getParsedData(CsvData.PK_DATA), currentTable.getPrimaryKeyColumns());
break;
case SQL:
// TODO: figure out what to do with these
sql = data.getParsedData(CsvData.ROW_DATA)[0];
break;
case CREATE:
// TODO: figure out what to do with these
break;
default:
break;
}

// TODO: change the ? to the actual data
// platform.replaceSql(dml.getSql(), BinaryEncoding.NONE, currentTable,
// data.getParsedData(CsvData.ROW_DATA),
// true);

this.payloadMap.get(this.currentBatch).add(dml.getSql());
if (sql != null) {
this.payloadMap.get(this.currentBatch).add(sql);
}
}

protected String makeDynamic(String sql, String[] values, Column[] columns) {
Object[] objects = platform.getObjectValues(
binaryEncoding, values, columns, false);
Row row = new Row(columns.length);
for (int i = 0; i < columns.length; i++) {
row.put(columns[i].getName(), objects[i]);
}
return platform.replaceSql(sql, binaryEncoding, currentTable, row, false);
}

public void end(Table table) {
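Taken together, the changes above mean write() no longer stores a parameterized statement: each row is run through makeDynamic(), which substitutes the actual values for the '?' placeholders before the string is added to payloadMap. A minimal construction sketch, not part of the commit, using only the constructor shown above; the StructureDataWriter package, the "h2" database type, and BinaryEncoding.NONE are assumed placeholders:

    import org.jumpmind.db.platform.IDatabasePlatform;
    import org.jumpmind.db.util.BinaryEncoding;
    import org.jumpmind.symmetric.io.data.writer.StructureDataWriter; // package assumed; not shown in this diff

    public class StructureDataWriterSketch {

        /*
         * Builds a writer whose payload is literal SQL strings; CSV is the
         * other PayloadType option declared in the class above.
         */
        public static StructureDataWriter newSqlWriter(IDatabasePlatform platform) {
            return new StructureDataWriter(
                    platform,
                    "h2",                                // assumed target database type
                    StructureDataWriter.PayloadType.SQL, // render rows as SQL statements
                    false,                               // useQuotedIdentifiers off
                    BinaryEncoding.NONE);                // assumed binary encoding
        }
    }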
@@ -91,6 +91,8 @@ protected void test(ISymmetricEngine rootServer, ISymmetricEngine clientServer)
                 registrationInfo.getNodePassword());
         Assert.assertNotNull("Should have a non null results object", results);
         Assert.assertEquals(1, results.getNbrBatches());
+
+        log.info(results.getBatches().get(0).getSqlStatements().get(0));
 
     }
 
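The new log line exercises the part a "Pull Only" client would care about: getBatches() and getSqlStatements() expose ready-to-run SQL for each extracted batch. A rough sketch of such a client, assumed usage rather than code from this commit; the jdbcUrl, class name, and transaction handling are hypothetical, and only plain JDBC is used on the client side:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.List;

    public class PullOnlyClientSketch {

        /*
         * Applies one extracted batch. Each entry is a complete SQL statement;
         * makeDynamic() has already inlined the values, so no parameter binding
         * or SymmetricDS runtime is needed on this side.
         */
        public static void applyBatch(String jdbcUrl, List<String> sqlStatements) throws Exception {
            try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
                conn.setAutoCommit(false); // apply the whole batch atomically
                try (Statement stmt = conn.createStatement()) {
                    for (String sql : sqlStatements) {
                        stmt.execute(sql);
                    }
                }
                conn.commit();
            }
        }
    }

A client loop would feed results.getBatches().get(i).getSqlStatements() from the REST response into applyBatch().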
