Skip to content
Permalink
Browse files

0003809: Data extract and routing fallback to contains_big_lob

  • Loading branch information...
erilong committed Mar 13, 2019
1 parent b1981c6 commit 43144256ab3541be5249eb7e0fcc0f3e050e56e7
@@ -177,6 +177,16 @@ public void resetStats() {
setLoadMillis(0);
}

public void resetRouterStats() {
dataRowCount = 0;
dataInsertRowCount = 0;
dataUpdateRowCount = 0;
dataDeleteRowCount = 0;
otherRowCount = 0;
reloadRowCount = 0;
tableCounts.clear();
}

public void revertStatsOnError() {
if (this.oldDataRowCount > 0) {
this.extractMillis = this.oldExtractMillis;
@@ -77,6 +77,9 @@
private Set<String> transactions = new HashSet<String>();
private long lastDataId = -1;
private List<Long> dataIds = new ArrayList<Long>();
private List<Long> uncommittedDataIds = new ArrayList<Long>();
private long uncommittedDataEventCount = 0;
private long committedDataEventCount = 0;

public ChannelRouterContext(String nodeId, NodeChannel channel, ISqlTransaction transaction)
throws SQLException {
@@ -96,18 +99,23 @@ public void clearDataEventsList() {
public void addDataEvent(long dataId, long batchId, String routerId) {
dataEventsToSend.add(new DataEvent(dataId, batchId, routerId));
if (dataId != lastDataId) {
dataIds.add(dataId);
uncommittedDataIds.add(dataId);
lastDataId = dataId;
}
uncommittedDataEventCount++;
}

public void addData(long dataId) {
if (dataId != lastDataId) {
dataIds.add(dataId);
uncommittedDataIds.add(dataId);
lastDataId = dataId;
}
}

public long getCommittedDataEventCount() {
return this.committedDataEventCount;
}

public Map<String, OutgoingBatch> getBatchesByNodes() {
return batchesByNodes;
}
@@ -119,6 +127,8 @@ public void addData(long dataId) {
public void commit() {
try {
sqlTransaction.commit();
dataIds.addAll(uncommittedDataIds);
committedDataEventCount += uncommittedDataEventCount;
} finally {
clearState();
}
@@ -131,6 +141,8 @@ protected void clearState() {
this.batchesByNodes.clear();
this.availableNodes.clear();
this.dataEventsToSend.clear();
this.uncommittedDataIds.clear();
this.uncommittedDataEventCount = 0;
}

public void rollback() {
@@ -1050,7 +1050,8 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
} catch (ProtocolException e) {
if (!configurationService.getNodeChannel(currentBatch.getChannelId(), false).getChannel().isContainsBigLob()) {
log.warn(e.getMessage());
log.info("Re-attempting extraction with contains_big_lobs enabled for channel " + currentBatch.getChannelId());
log.info("Re-attempting extraction for batch {} with contains_big_lobs enabled for channel {}",
currentBatch.getBatchId(), currentBatch.getChannelId());
extractInfo.setTotalDataCount(currentBatch.getDataRowCount());
currentBatch.resetStats();
IStagedResource resource = getStagedResource(currentBatch);
@@ -490,8 +490,8 @@ protected void sendReverseInitialLoad(ProcessInfo processInfo) {
* thread pool here and waiting for all channels to be processed. The other
* reason is to reduce the number of connections we are required to have.
*/
protected int routeDataForEachChannel() {
int dataCount = 0;
protected long routeDataForEachChannel() {
long dataCount = 0;
Node sourceNode = engine.getNodeService().findIdentity();
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(sourceNode.getNodeId(), null, ProcessType.ROUTER_JOB));
@@ -506,13 +506,7 @@ protected int routeDataForEachChannel() {
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);
if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) {
processInfo.setCurrentChannelId(nodeChannel.getChannelId());
int count = routeDataForChannel(processInfo, nodeChannel, sourceNode, false);
if (count >= 0) {
dataCount += count;
} else {
log.info("Re-attempting routing with contains_big_lobs enabled for channel " + nodeChannel.getChannelId());
dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode, true);
}
dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode, false, null);
} else if (!nodeChannel.isEnabled()) {
gapDetector.setIsAllDataRead(false);
if (log.isDebugEnabled()) {
@@ -707,10 +701,11 @@ protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId
return onlyDefaultRoutersAssigned;
}

protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode, boolean isOverrideContainsBigLob) {
protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode, boolean isOverrideContainsBigLob,
Map<String, OutgoingBatch> overrideBatchesByNodes) {
ChannelRouterContext context = null;
long ts = System.currentTimeMillis();
int dataCount = -1;
long dataCount = -1;
try {
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService().getTriggerRouters(false);
boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
@@ -724,6 +719,10 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
context.setOnlyDefaultRoutersAssigned(onlyDefaultRoutersAssigned);
context.setDataGaps(gapDetector.getDataGaps());
context.setOverrideContainsBigLob(isOverrideContainsBigLob);

if (overrideBatchesByNodes != null) {
context.getBatchesByNodes().putAll(overrideBatchesByNodes);
}

dataCount = selectDataAndRoute(processInfo, context);
return dataCount;
@@ -732,13 +731,13 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
if (context != null) {
context.rollback();
}
return 0;
return context.getCommittedDataEventCount();
} catch (InterruptedException ex) {
log.warn("The routing process was interrupted. Rolling back changes");
if (context != null) {
context.rollback();
}
return 0;
return context.getCommittedDataEventCount();
} catch (SyntaxParsingException ex) {
log.error(
String.format(
@@ -747,25 +746,38 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
if (context != null) {
context.rollback();
}
return 0;
return context.getCommittedDataEventCount();
} catch (ProtocolException ex) {
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(context.getBatchesByNodes().values());
Map<String, OutgoingBatch> batchesByNodes = new HashMap<String, OutgoingBatch>(context.getBatchesByNodes());
if (context != null) {
context.rollback();
}
for (OutgoingBatch batch : batches) {
batch.setStatus(Status.OK);
engine.getOutgoingBatchService().updateOutgoingBatch(batch);
if (isOverrideContainsBigLob) {
log.error(String.format("Failed to route and batch data on '%s' channel", nodeChannel.getChannelId()), ex);
return context.getCommittedDataEventCount();
} else {
long batchId = -1;
for (OutgoingBatch batch : batchesByNodes.values()) {
batch.resetRouterStats();
batchId = batch.getBatchId();
}
log.info("Re-attempting routing for batch {} with contains_big_lobs enabled for channel {}",
batchId, nodeChannel.getChannelId());
dataCount = 0;
gapDetector.addDataIds(context.getDataIds());
gapDetector.afterRouting();
gapDetector.beforeRouting();
long dataCountWithBigLob = routeDataForChannel(processInfo, nodeChannel, sourceNode, true, batchesByNodes);
return context.getCommittedDataEventCount() + dataCountWithBigLob;
}
return -1;
} catch (Throwable ex) {
log.error(
String.format("Failed to route and batch data on '%s' channel",
nodeChannel.getChannelId()), ex);
if (context != null) {
context.rollback();
}
return 0;
return context.getCommittedDataEventCount();
} finally {
try {
if (dataCount > 0) {
@@ -795,7 +807,9 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
engine.getStatisticManager().setDataUnRouted(channelId, dataLeftToRoute);
}
}
} else {
} else if (dataCount == -1) {
// rolled back as exception, but let gap detector know about what was committed before halting
gapDetector.addDataIds(context.getDataIds());
gapDetector.setIsAllDataRead(false);
}
} catch (Exception e) {
@@ -913,14 +927,14 @@ public Thread newThread(Runnable r) {
* @param context
* The current context of the routing process
*/
protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext context) throws InterruptedException {
protected long selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext context) throws InterruptedException {
IDataToRouteReader reader = startReading(context);
Data data = null;
Data nextData = null;
int totalDataCount = 0;
int totalDataEventCount = 0;
int statsDataCount = 0;
int statsDataEventCount = 0;
long totalDataCount = 0;
long totalDataEventCount = 0;
long statsDataCount = 0;
long statsDataEventCount = 0;
final int maxNumberOfEventsBeforeFlush = parameterService
.getInt(ParameterConstants.ROUTING_FLUSH_JDBC_BATCH_SIZE);
try {

0 comments on commit 4314425

Please sign in to comment.
You can’t perform that action at this time.