diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java index cad6ea300e..f5478923e9 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/DataExtractorContext.java @@ -23,10 +23,13 @@ import java.util.ArrayList; import java.util.List; +import org.jumpmind.symmetric.model.OutgoingBatch; + public class DataExtractorContext implements Cloneable { private List auditRecordsWritten = new ArrayList(); private String lastTableName; + private OutgoingBatch batch; public DataExtractorContext copy() { DataExtractorContext newVersion; @@ -51,4 +54,12 @@ public boolean isLastTable(String tableName) { return lastTableName.equals(tableName); } + public OutgoingBatch getBatch() { + return batch; + } + + public void setBatch(OutgoingBatch batch) { + this.batch = batch; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/CsvExtractor.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/CsvExtractor.java index fafaf1e3e3..3bee2cf83a 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/CsvExtractor.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/CsvExtractor.java @@ -58,7 +58,7 @@ public void commit(OutgoingBatch batch, BufferedWriter writer) public void write(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException { preprocessTable(data, writer, context); - dictionary.get(data.getEventType().getCode()).execute(writer, data); + dictionary.get(data.getEventType().getCode()).execute(writer, data, null); } /** diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/IStreamDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/IStreamDataCommand.java index d4dcc76131..0544bd0bf5 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/IStreamDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/IStreamDataCommand.java @@ -23,8 +23,9 @@ import java.io.BufferedWriter; import java.io.IOException; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Data; interface IStreamDataCommand { - void execute(BufferedWriter out, Data data) throws IOException; + void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException; } \ No newline at end of file diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamDeleteDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamDeleteDataCommand.java index 8b2fca292f..d60a5b312c 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamDeleteDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamDeleteDataCommand.java @@ -24,11 +24,12 @@ import java.io.IOException; import org.jumpmind.symmetric.common.csv.CsvConstants; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Data; class StreamDeleteDataCommand extends AbstractStreamDataCommand { - public void execute(BufferedWriter out, Data data) throws IOException { + public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException { Util.write(out, CsvConstants.DELETE, DELIMITER, data.getPkData()); out.newLine(); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamInsertDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamInsertDataCommand.java index 625d5310e4..bbd997ec94 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamInsertDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamInsertDataCommand.java @@ -24,11 +24,12 @@ import java.io.IOException; import org.jumpmind.symmetric.common.csv.CsvConstants; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Data; class StreamInsertDataCommand extends AbstractStreamDataCommand { - public void execute(BufferedWriter writer, Data data) throws IOException { + public void execute(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException { Util.write(writer, CsvConstants.INSERT, DELIMITER,data.getRowData()); writer.newLine(); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java index 2464b60299..9c5317ce41 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamReloadDataCommand.java @@ -23,22 +23,39 @@ import java.io.BufferedWriter; import java.io.IOException; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Data; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.Trigger; +import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataExtractorService; +import org.jumpmind.symmetric.service.INodeService; +import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport; class StreamReloadDataCommand extends AbstractStreamDataCommand { - - @SuppressWarnings("unused") + private IDataExtractorService dataExtractorService; - - public void execute(BufferedWriter out, Data data) throws IOException { - -// TriggerHistory hist = data.getAudit(); -// dataExtractorService.extractInitialLoadFor(client, config, new InternalOutgoingTransport(out)); + + private IConfigurationService configurationService; + + private INodeService nodeService; + + public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException { + Trigger trigger = configurationService.getTriggerById(data.getAudit().getTriggerId()); + Node node = nodeService.findNode(context.getBatch().getNodeId()); + dataExtractorService.extractInitialLoadBatchFor(node, trigger, new InternalOutgoingTransport(out)); } public void setDataExtractorService(IDataExtractorService dataExtractorService) { this.dataExtractorService = dataExtractorService; } - + + public void setConfigurationService(IConfigurationService configurationService) { + this.configurationService = configurationService; + } + + public void setNodeService(INodeService nodeService) { + this.nodeService = nodeService; + } + } \ No newline at end of file diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamSQLDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamSQLDataCommand.java index 59f13dd553..9e64f7c54b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamSQLDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamSQLDataCommand.java @@ -23,10 +23,11 @@ import java.io.BufferedWriter; import java.io.IOException; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Data; class StreamSQLDataCommand extends AbstractStreamDataCommand { - public void execute(BufferedWriter out, Data data) throws IOException { + public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException { // ??? } } \ No newline at end of file diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamUpdateDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamUpdateDataCommand.java index be7e7dc68c..8e5bbec036 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamUpdateDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamUpdateDataCommand.java @@ -24,11 +24,12 @@ import java.io.IOException; import org.jumpmind.symmetric.common.csv.CsvConstants; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Data; class StreamUpdateDataCommand extends AbstractStreamDataCommand { - public void execute(BufferedWriter out, Data data) throws IOException { + public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException { Util.write(out, CsvConstants.UPDATE, DELIMITER, data.getRowData(), DELIMITER, data.getPkData()); out.newLine(); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamValidateDataCommand.java b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamValidateDataCommand.java index 91b86fb959..89ff6262c2 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamValidateDataCommand.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/extract/csv/StreamValidateDataCommand.java @@ -23,10 +23,11 @@ import java.io.BufferedWriter; import java.io.IOException; +import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.model.Data; class StreamValidateDataCommand extends AbstractStreamDataCommand { - public void execute(BufferedWriter out, Data data) throws IOException { + public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException { // ??? } }