Skip to content

Commit

Permalink
0000720: Variable Transform source-node-id incorrect on Extract type …
Browse files Browse the repository at this point in the history
…transforms
  • Loading branch information
chenson42 committed Jul 19, 2012
1 parent d208fb7 commit 4c78374
Show file tree
Hide file tree
Showing 18 changed files with 104 additions and 68 deletions.
Expand Up @@ -25,8 +25,6 @@
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterFilterAdapter;

/**
Expand All @@ -44,7 +42,7 @@ public boolean beforeWrite(
DataContext context, Table table, CsvData data) {
if (!table.getName().startsWith(tablePrefix)) {
Batch batch = context.getBatch();
String sourceNodeId = batch.getNodeId();
String sourceNodeId = batch.getSourceNodeId();
table.setSchema(schemaPrefix != null ? schemaPrefix + sourceNodeId : sourceNodeId);
}
return true;
Expand Down
Expand Up @@ -100,7 +100,7 @@ public IncomingBatch() {

public IncomingBatch(Batch batch) {
this.batchId = batch.getBatchId();
this.nodeId = batch.getNodeId();
this.nodeId = batch.getSourceNodeId();
this.channelId = batch.getChannelId();
this.status = Status.LD;
}
Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
Expand Down Expand Up @@ -153,8 +154,8 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer,
String... tablesToExclude) {
Node sourceNode = nodeService.findIdentity();

Batch batch = new Batch(BatchAck.VIRTUAL_BATCH_FOR_REGISTRATION, Constants.CHANNEL_CONFIG,
symmetricDialect.getBinaryEncoding(), targetNode.getNodeId(), false);
Batch batch = new Batch(BatchType.EXTRACT, BatchAck.VIRTUAL_BATCH_FOR_REGISTRATION, Constants.CHANNEL_CONFIG,
symmetricDialect.getBinaryEncoding(), sourceNode.getNodeId(), targetNode.getNodeId(), false);

NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(),
targetNode.getNodeGroupId());
Expand Down Expand Up @@ -445,8 +446,8 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode,
long byteCount = 0l;

if (currentBatch.getStatus() == Status.IG) {
Batch batch = new Batch(currentBatch.getBatchId(), currentBatch.getChannelId(),
symmetricDialect.getBinaryEncoding(), currentBatch.getNodeId(),
Batch batch = new Batch(BatchType.EXTRACT, currentBatch.getBatchId(), currentBatch.getChannelId(),
symmetricDialect.getBinaryEncoding(), sourceNode.getNodeId(), currentBatch.getNodeId(),
currentBatch.isCommonFlag());
batch.setIgnored(true);
try {
Expand Down Expand Up @@ -490,7 +491,7 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode,

IDataReader dataReader = new ExtractDataReader(
symmetricDialect.getPlatform(),
new SelectFromSymDataSource(currentBatch, targetNode));
new SelectFromSymDataSource(currentBatch, sourceNode, targetNode));
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode);
Expand Down Expand Up @@ -570,7 +571,7 @@ protected OutgoingBatch sendOutgoingBatch(Node targetNode, OutgoingBatch current

IStagedResource extractedBatch = getStagedResource(currentBatch);
if (extractedBatch != null) {
IDataReader dataReader = new ProtocolDataReader(extractedBatch);
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT, currentBatch.getNodeId(), extractedBatch);

DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
Expand All @@ -595,11 +596,12 @@ protected OutgoingBatch sendOutgoingBatch(Node targetNode, OutgoingBatch current
public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId,
long endBatchId) {
boolean foundBatch = false;
Node sourceNode = nodeService.findIdentity();
for (long batchId = startBatchId; batchId <= endBatchId; batchId++) {
OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId);
Node targetNode = nodeService.findNode(batch.getNodeId());
IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(),
new SelectFromSymDataSource(batch, targetNode));
new SelectFromSymDataSource(batch, sourceNode, targetNode));
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
Expand Down Expand Up @@ -707,10 +709,10 @@ class SelectFromSymDataSource implements IExtractDataReaderSource {

private Node targetNode;

public SelectFromSymDataSource(OutgoingBatch outgoingBatch, Node targetNode) {
public SelectFromSymDataSource(OutgoingBatch outgoingBatch, Node sourceNode, Node targetNode) {
this.outgoingBatch = outgoingBatch;
this.batch = new Batch(outgoingBatch.getBatchId(), outgoingBatch.getChannelId(),
symmetricDialect.getBinaryEncoding(), outgoingBatch.getNodeId(),
this.batch = new Batch(BatchType.EXTRACT, outgoingBatch.getBatchId(), outgoingBatch.getChannelId(),
symmetricDialect.getBinaryEncoding(), sourceNode.getNodeId(), outgoingBatch.getNodeId(),
outgoingBatch.isCommonFlag());
this.targetNode = targetNode;
}
Expand Down Expand Up @@ -838,10 +840,10 @@ protected void init(Batch batch, List<SelectFromTableEvent> initialLoadEvents) {
this.selectFromTableEventsToSend = new ArrayList<SelectFromTableEvent>(
initialLoadEvents);
this.batch = batch;
this.node = nodeService.findNode(batch.getNodeId());
this.node = nodeService.findNode(batch.getTargetNodeId());
if (node == null) {
throw new SymmetricException("Could not find a node represented by %s",
this.batch.getNodeId());
this.batch.getTargetNodeId());
}
}

Expand Down Expand Up @@ -886,7 +888,7 @@ protected CsvData selectNext() {
NodeChannel channel = batch != null ? configurationService.getNodeChannel(
batch.getChannelId(), false) : new NodeChannel(this.triggerRouter
.getTrigger().getChannelId());
this.routingContext = new SimpleRouterContext(batch.getNodeId(), channel);
this.routingContext = new SimpleRouterContext(batch.getTargetNodeId(), channel);
this.currentTable = lookupAndOrderColumnsAccordingToTriggerHistory(
triggerRouter.getRouter().getRouterId(), history, currentTable, false);
this.startNewCursor(history, triggerRouter);
Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.jumpmind.symmetric.io.data.IDataProcessorListener;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.io.data.transform.TransformTable;
Expand Down Expand Up @@ -308,16 +309,17 @@ protected List<IncomingBatch> loadDataFromTransport(final Node sourceNode,
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode);

long totalNetworkMillis = System.currentTimeMillis();
String targetNodeId = nodeService.findIdentityNodeId();
if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
IDataReader dataReader = new ProtocolDataReader(transport.open());
IDataReader dataReader = new ProtocolDataReader(BatchType.LOAD, targetNodeId, transport.open());
IDataWriter dataWriter = new StagingDataWriter(sourceNode.getNodeId(),
Constants.STAGING_CATEGORY_INCOMING, stagingManager,
new LoadIntoDatabaseOnArrivalListener(sourceNode.getNodeId(), listener));
new DataProcessor(dataReader, dataWriter).process(ctx);
totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis;
} else {
DataProcessor processor = new DataProcessor(
new ProtocolDataReader(transport.open()), null, listener) {
new ProtocolDataReader(BatchType.LOAD, targetNodeId, transport.open()), null, listener) {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(sourceNode.getNodeId(), batch.getChannelId(),
Expand Down Expand Up @@ -637,7 +639,7 @@ public void end(DataContext ctx, Batch batch, IStagedResource resource) {
}

try {
DataProcessor processor = new DataProcessor(new ProtocolDataReader(resource), null,
DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD, batch.getTargetNodeId(), resource), null,
listener) {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
Expand Down Expand Up @@ -681,7 +683,7 @@ public boolean beforeBatchStarted(DataContext context) {

public void afterBatchStarted(DataContext context) {
Batch batch = context.getBatch();
symmetricDialect.disableSyncTriggers(context.findTransaction(), batch.getNodeId());
symmetricDialect.disableSyncTriggers(context.findTransaction(), batch.getSourceNodeId());
}

public void batchSuccessful(DataContext context) {
Expand Down Expand Up @@ -789,7 +791,7 @@ public void batchInError(DataContext context, Exception ex) {
} catch (Exception e) {
log.error("Failed to record status of batch {}",
this.currentBatch != null ? this.currentBatch.getNodeBatchId() : context
.getBatch().getNodeBatchId(), e);
.getBatch().getSourceNodeBatchId(), e);
}
}

Expand Down
Expand Up @@ -1058,7 +1058,7 @@ public Data mapData(Row row) {
public ISqlReadCursor<Data> selectDataFor(Batch batch) {
return sqlTemplate
.queryForCursor(getDataSelectSql(batch.getBatchId(), -1l, batch.getChannelId()),
dataMapper, new Object[] { batch.getBatchId(), batch.getNodeId() },
dataMapper, new Object[] { batch.getBatchId(), batch.getTargetNodeId() },
new int[] { Types.NUMERIC });
}

Expand Down
Expand Up @@ -10,8 +10,11 @@ public class Batch {

public static final long UNKNOWN_BATCH_ID = -9999;

public enum BatchType { EXTRACT, LOAD };

protected long batchId = UNKNOWN_BATCH_ID;
protected String nodeId;
protected String sourceNodeId;
protected String targetNodeId;
protected boolean initialLoad;
protected String channelId;
protected BinaryEncoding binaryEncoding;
Expand All @@ -22,13 +25,16 @@ public class Batch {
protected boolean ignored = false;
protected boolean common = false;
protected boolean complete = false;
protected BatchType batchType;

protected Map<String, Long> timers = new HashMap<String, Long>();

public Batch(long batchId, String channelId, BinaryEncoding binaryEncoding, String nodeId, boolean common) {
public Batch(BatchType batchType, long batchId, String channelId, BinaryEncoding binaryEncoding, String sourceNodeId, String targetNodeId, boolean common) {
this.batchType = batchType;
this.batchId = batchId;
this.channelId = channelId;
this.nodeId = nodeId;
this.sourceNodeId = sourceNodeId;
this.targetNodeId = targetNodeId;
this.binaryEncoding = binaryEncoding;
this.common = common;
}
Expand Down Expand Up @@ -86,10 +92,18 @@ public void setStartTime(Date startTime) {
this.startTime = startTime;
}

public String getNodeId() {
return nodeId;
public String getSourceNodeId() {
return sourceNodeId;
}


public String getTargetNodeId() {
return targetNodeId;
}

public String getSourceNodeBatchId() {
return String.format("%s-%d", sourceNodeId, batchId);
}

public long getBatchId() {
return batchId;
}
Expand All @@ -105,10 +119,6 @@ public boolean isInitialLoad() {
public BinaryEncoding getBinaryEncoding() {
return binaryEncoding;
}

public String getNodeBatchId() {
return nodeId + "-" + batchId;
}

public void setIgnored(boolean ignored) {
this.ignored = ignored;
Expand All @@ -127,7 +137,11 @@ public boolean isCommon() {
}

public String getStagedLocation() {
return getStagedLocation(common, nodeId);
if (batchType == BatchType.EXTRACT) {
return getStagedLocation(common, targetNodeId);
} else {
return getStagedLocation(common, sourceNodeId);
}
}

public static String getStagedLocation(boolean common, String nodeId) {
Expand Down
Expand Up @@ -86,7 +86,7 @@ public void process(DataContext context) {
forEachTableInBatch(context, processBatch, currentBatch);

if (currentBatch != null && !currentBatch.isComplete()) {
throw new ProtocolException("The batch %s was not complete", currentBatch.getNodeBatchId());
throw new ProtocolException("The batch %s was not complete", currentBatch.getSourceNodeBatchId());
}

if (processBatch) {
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.IDataReader;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.util.CollectionUtils;
import org.jumpmind.util.FormatUtils;
Expand All @@ -52,47 +53,55 @@ public class ProtocolDataReader implements IDataReader {
protected Table table;
protected String channelId;
protected String sourceNodeId;
protected String targetNodeId;
protected BinaryEncoding binaryEncoding;
protected BatchType batchType;
protected int lineNumber = 0;

public ProtocolDataReader(StringBuilder input) {
this(new BufferedReader(new StringReader(input.toString())));
public ProtocolDataReader(BatchType batchType, String targetNodeId, StringBuilder input) {
this(batchType, targetNodeId, new BufferedReader(new StringReader(input.toString())));
}

public ProtocolDataReader(InputStream is) {
this(toReader(is));
public ProtocolDataReader(BatchType batchType, String targetNodeId, InputStream is) {
this(batchType, targetNodeId, toReader(is));
}

public ProtocolDataReader(IStagedResource stagedResource) {
public ProtocolDataReader(BatchType batchType, String targetNodeId, IStagedResource stagedResource) {
this.stagedResource = stagedResource;
this.targetNodeId = targetNodeId;
this.batchType = batchType;
}

protected static Reader toReader(InputStream is) {
try {
return new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (IOException ex) {
throw new IoException(ex);
}
}

public ProtocolDataReader(String input) {
this(new BufferedReader(new StringReader(input)));

public ProtocolDataReader(BatchType batchType, String targetNodeId, String input) {
this(batchType, targetNodeId, new BufferedReader(new StringReader(input)));
}

public ProtocolDataReader(Reader reader) {
public ProtocolDataReader(BatchType batchType, String targetNodeId, Reader reader) {
this.reader = reader;
this.targetNodeId = targetNodeId;
this.batchType = batchType;
}

public ProtocolDataReader(File file) {
public ProtocolDataReader(BatchType batchType, String targetNodeId, File file) {
try {
FileInputStream fis = new FileInputStream(file);
InputStreamReader in = new InputStreamReader(fis, "UTF-8");
this.targetNodeId = targetNodeId;
this.batchType = batchType;
this.reader = new BufferedReader(in);
} catch (IOException ex) {
throw new IoException(ex);
}
}

protected static Reader toReader(InputStream is) {
try {
return new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (IOException ex) {
throw new IoException(ex);
}
}

public IStagedResource getStagedResource() {
return stagedResource;
}
Expand Down Expand Up @@ -142,8 +151,8 @@ protected Object readNext() {
bytesRead = 0;
}
if (tokens[0].equals(CsvConstants.BATCH)) {
Batch batch = new Batch(Long.parseLong(tokens[1]), channelId, binaryEncoding,
sourceNodeId, false);
Batch batch = new Batch(batchType, Long.parseLong(tokens[1]), channelId, binaryEncoding,
sourceNodeId, targetNodeId, false);
statistics.put(batch, new DataReaderStatistics());
return batch;
} else if (tokens[0].equals(CsvConstants.NODEID)) {
Expand Down
Expand Up @@ -12,6 +12,7 @@
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.Batch.BatchType;

/**
* Convert a source table's rows to {@link CsvData}
Expand All @@ -31,7 +32,7 @@ public class TableExtractDataReaderSource implements IExtractDataReaderSource {
protected boolean streamLobs;

public TableExtractDataReaderSource(IDatabasePlatform platform, String catalogName,
String schemaName, String tableName, String whereClause, boolean streamLobs) {
String schemaName, String tableName, String whereClause, boolean streamLobs, String sourceNodeId, String targetNodeId) {
this.platform = platform;
this.table = platform.getTableFromCache(catalogName, schemaName, tableName, true);
if (table == null) {
Expand All @@ -40,7 +41,7 @@ public TableExtractDataReaderSource(IDatabasePlatform platform, String catalogNa
}
this.whereClause = whereClause;
this.streamLobs = streamLobs;
this.batch = new Batch(-1, "default", BinaryEncoding.BASE64, "localhost", false);
this.batch = new Batch(BatchType.EXTRACT, -1, "default", BinaryEncoding.BASE64, sourceNodeId, targetNodeId, false);

}

Expand Down

0 comments on commit 4c78374

Please sign in to comment.