0002330 - Channel sync support for multiple threads
jumpmind-josh committed Apr 28, 2016
1 parent c3b0c22 commit c4535ea
Showing 1 changed file with 35 additions and 30 deletions.
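
In short: this commit threads the transport's BufferedWriter down through the extract pipeline so that, when serving a regular SymmetricDS client, a batch that is already staged can be streamed to the transport verbatim by the new SimpleStagingDataReader instead of being re-parsed by a ProtocolDataReader and re-serialized through a DataProcessor. A condensed sketch of the new dispatch in sendOutgoingBatch(...), using only identifiers that appear in the diff below (surrounding code elided):

    // Fast path: the staged file already holds wire-format protocol text,
    // so it can be copied to the transport writer as-is.
    if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) {
        DataContext ctx = new DataContext();
        new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(),
                currentBatch.getNodeId(), extractedBatch, writer, ctx).process();
    } else {
        // Slow path (previous behavior): parse the staged data and push it
        // through the DataProcessor pipeline, which also records statistics.
    }
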
@@ -20,6 +20,7 @@
  */
 package org.jumpmind.symmetric.service.impl;
 
+import java.io.BufferedWriter;
 import java.io.OutputStream;
 import java.io.Writer;
 import java.sql.SQLException;
@@ -68,6 +69,7 @@
 import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
 import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource;
 import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
+import org.jumpmind.symmetric.io.data.reader.SimpleStagingDataReader;
 import org.jumpmind.symmetric.io.data.transform.TransformPoint;
 import org.jumpmind.symmetric.io.data.transform.TransformTable;
 import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
@@ -377,10 +379,6 @@ private List<OutgoingBatch> filterBatchesForExtraction(OutgoingBatches batches,
             }
         }
 
-        if (suspendIgnoreChannelsList.getThreadChannel() != null && !suspendIgnoreChannelsList.getThreadChannel().equals("0")) {
-            batches.removeThreadChannelBatches(suspendIgnoreChannelsList.getThreadChannel());
-        }
-
         return batches.getBatches();
     }

@@ -413,7 +411,7 @@ public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo,
                         useDelimiterIdentifiers, symmetricDialect.getBinaryEncoding(),
                         useJdbcTimestampFormat, useUpsertStatements);
                 List<OutgoingBatch> extractedBatches = extract(processInfo, targetNode,
-                        activeBatches, writer, ExtractMode.FOR_PAYLOAD_CLIENT);
+                        activeBatches, writer, null, ExtractMode.FOR_PAYLOAD_CLIENT);
 
                 List<OutgoingBatchWithPayload> batchesWithPayload = new ArrayList<OutgoingBatchWithPayload>();
                 for (OutgoingBatch batch : extractedBatches) {
@@ -466,10 +464,11 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, Str
             List<OutgoingBatch> activeBatches = filterBatchesForExtraction(batches, channelMap);
 
             if (activeBatches.size() > 0) {
+                BufferedWriter writer = transport.openWriter();
                 IDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
-                        transport.openWriter(), targetNode.requires13Compatiblity());
+                        writer, targetNode.requires13Compatiblity());
 
-                return extract(processInfo, targetNode, activeBatches, dataWriter,
+                return extract(processInfo, targetNode, activeBatches, dataWriter, writer,
                         ExtractMode.FOR_SYM_CLIENT);
             }

@@ -498,7 +497,7 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ
                     writer, targetNode.requires13Compatiblity());
             List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(1);
             batches.add(batch);
-            batches = extract(new ProcessInfo(), targetNode, batches, dataWriter,
+            batches = extract(new ProcessInfo(), targetNode, batches, dataWriter, null,
                     ExtractMode.EXTRACT_ONLY);
             extracted = batches.size() > 0;
         }
@@ -507,7 +506,7 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writ
     }
 
     protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
-            List<OutgoingBatch> activeBatches, IDataWriter dataWriter, ExtractMode mode) {
+            List<OutgoingBatch> activeBatches, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
         boolean streamToFileEnabled = parameterService
                 .is(ParameterConstants.STREAM_TO_FILE_ENABLED);
         List<OutgoingBatch> processedBatches = new ArrayList<OutgoingBatch>(activeBatches.size());
@@ -566,7 +565,7 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
                 if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
                     processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
                     currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch,
-                            dataWriter, mode);
+                            dataWriter, writer, mode);
                 }
 
                 processedBatches.add(currentBatch);
@@ -842,7 +841,7 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
     }
 
     protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode,
-            OutgoingBatch currentBatch, IDataWriter dataWriter, ExtractMode mode) {
+            OutgoingBatch currentBatch, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
         if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
             currentBatch.setSentCount(currentBatch.getSentCount() + 1);
             changeBatchStatus(Status.SE, currentBatch, mode);
@@ -851,25 +850,31 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
 
             IStagedResource extractedBatch = getStagedResource(currentBatch);
             if (extractedBatch != null) {
-                IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
-                        currentBatch.getNodeId(), extractedBatch);
-
-                DataContext ctx = new DataContext();
-                ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
-                ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
-                new DataProcessor(dataReader, new ProcessInfoDataWriter(dataWriter, processInfo), "send from stage")
-                        .process(ctx);
-                if (dataWriter.getStatistics().size() > 0) {
-                    Statistics stats = dataWriter.getStatistics().values().iterator().next();
-                    statisticManager.incrementDataSent(currentBatch.getChannelId(),
-                            stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
-                    long byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
-                    statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), byteCount);
+                if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) {
+                    DataContext ctx = new DataContext();
+                    SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT,
+                            currentBatch.getBatchId(), currentBatch.getNodeId(), extractedBatch, writer, ctx);
+                    dataReader.process();
                 } else {
-                    log.warn("Could not find recorded statistics for batch {}",
-                            currentBatch.getNodeBatchId());
+                    IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
+                            currentBatch.getNodeId(), extractedBatch);
+
+                    DataContext ctx = new DataContext();
+                    ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
+                    ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
+                    new DataProcessor(dataReader, new ProcessInfoDataWriter(dataWriter, processInfo), "send from stage")
+                            .process(ctx);
+                    if (dataWriter.getStatistics().size() > 0) {
+                        Statistics stats = dataWriter.getStatistics().values().iterator().next();
+                        statisticManager.incrementDataSent(currentBatch.getChannelId(),
+                                stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
+                        long byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
+                        statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), byteCount);
+                    } else {
+                        log.warn("Could not find recorded statistics for batch {}",
+                                currentBatch.getNodeBatchId());
+                    }
                 }
 
             } else {
                 throw new IllegalStateException(String.format(
                         "Could not find the staged resource for batch %s",
@@ -1053,7 +1058,7 @@ public Map<String, String> getExtractRequestNodes() {
 
     public List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication) {
         return sqlTemplate.query(getSql("selectExtractRequestForNodeSql"),
-                new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getChannelId()
+                new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getQueue()
                 , ExtractRequest.ExtractStatus.NE.name());
     }

@@ -1069,7 +1074,7 @@ public void requestExtractRequest(ISqlTransaction transaction, String nodeId, St
                 new Object[] { requestId, nodeId, channelId, ExtractStatus.NE.name(), startBatchId,
                         endBatchId, triggerRouter.getTrigger().getTriggerId(),
                         triggerRouter.getRouter().getRouterId() }, new int[] { Types.BIGINT, Types.VARCHAR,
-                        Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR });
+                        Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR });
     }
 
     protected void updateExtractRequestStatus(ISqlTransaction transaction, long extractId,
