Skip to content

Commit

Permalink
add batch to context. start reload command.
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 10, 2007
1 parent db0193e commit bcfecac
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 15 deletions.
Expand Up @@ -23,10 +23,13 @@
import java.util.ArrayList;
import java.util.List;

import org.jumpmind.symmetric.model.OutgoingBatch;

public class DataExtractorContext implements Cloneable {

private List<String> auditRecordsWritten = new ArrayList<String>();
private String lastTableName;
private OutgoingBatch batch;

public DataExtractorContext copy() {
DataExtractorContext newVersion;
Expand All @@ -51,4 +54,12 @@ public boolean isLastTable(String tableName) {
return lastTableName.equals(tableName);
}

public OutgoingBatch getBatch() {
return batch;
}

public void setBatch(OutgoingBatch batch) {
this.batch = batch;
}

}
Expand Up @@ -58,7 +58,7 @@ public void commit(OutgoingBatch batch, BufferedWriter writer)
public void write(BufferedWriter writer, Data data,
DataExtractorContext context) throws IOException {
preprocessTable(data, writer, context);
dictionary.get(data.getEventType().getCode()).execute(writer, data);
dictionary.get(data.getEventType().getCode()).execute(writer, data, null);
}

/**
Expand Down
Expand Up @@ -23,8 +23,9 @@
import java.io.BufferedWriter;
import java.io.IOException;

import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;

interface IStreamDataCommand {
void execute(BufferedWriter out, Data data) throws IOException;
void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException;
}
Expand Up @@ -24,11 +24,12 @@
import java.io.IOException;

import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;

class StreamDeleteDataCommand extends AbstractStreamDataCommand {

public void execute(BufferedWriter out, Data data) throws IOException {
public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException {
Util.write(out, CsvConstants.DELETE, DELIMITER, data.getPkData());
out.newLine();
}
Expand Down
Expand Up @@ -24,11 +24,12 @@
import java.io.IOException;

import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;

class StreamInsertDataCommand extends AbstractStreamDataCommand {

public void execute(BufferedWriter writer, Data data) throws IOException {
public void execute(BufferedWriter writer, Data data, DataExtractorContext context) throws IOException {
Util.write(writer, CsvConstants.INSERT, DELIMITER,data.getRowData());
writer.newLine();
}
Expand Down
Expand Up @@ -23,22 +23,39 @@
import java.io.BufferedWriter;
import java.io.IOException;

import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;

class StreamReloadDataCommand extends AbstractStreamDataCommand {

@SuppressWarnings("unused")

private IDataExtractorService dataExtractorService;

public void execute(BufferedWriter out, Data data) throws IOException {

// TriggerHistory hist = data.getAudit();
// dataExtractorService.extractInitialLoadFor(client, config, new InternalOutgoingTransport(out));

private IConfigurationService configurationService;

private INodeService nodeService;

public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException {
Trigger trigger = configurationService.getTriggerById(data.getAudit().getTriggerId());
Node node = nodeService.findNode(context.getBatch().getNodeId());
dataExtractorService.extractInitialLoadBatchFor(node, trigger, new InternalOutgoingTransport(out));
}

public void setDataExtractorService(IDataExtractorService dataExtractorService) {
this.dataExtractorService = dataExtractorService;
}


public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
}

public void setNodeService(INodeService nodeService) {
this.nodeService = nodeService;
}

}
Expand Up @@ -23,10 +23,11 @@
import java.io.BufferedWriter;
import java.io.IOException;

import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;

class StreamSQLDataCommand extends AbstractStreamDataCommand {
public void execute(BufferedWriter out, Data data) throws IOException {
public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException {
// ???
}
}
Expand Up @@ -24,11 +24,12 @@
import java.io.IOException;

import org.jumpmind.symmetric.common.csv.CsvConstants;
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;

class StreamUpdateDataCommand extends AbstractStreamDataCommand {

public void execute(BufferedWriter out, Data data) throws IOException {
public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException {
Util.write(out, CsvConstants.UPDATE, DELIMITER, data.getRowData(), DELIMITER, data.getPkData());
out.newLine();
}
Expand Down
Expand Up @@ -23,10 +23,11 @@
import java.io.BufferedWriter;
import java.io.IOException;

import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.model.Data;

class StreamValidateDataCommand extends AbstractStreamDataCommand {
public void execute(BufferedWriter out, Data data) throws IOException {
public void execute(BufferedWriter out, Data data, DataExtractorContext context) throws IOException {
// ???
}
}

0 comments on commit bcfecac

Please sign in to comment.