Navigation Menu

Skip to content

Commit

Permalink
0002323: Don't select row_data, old_data or pk_data when routing if o…
Browse files Browse the repository at this point in the history
…nly default routers are assigned
  • Loading branch information
chenson42 committed Jun 19, 2015
1 parent 96d8185 commit 6cd7794
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
Expand Up @@ -64,6 +64,7 @@ public class ChannelRouterContext extends SimpleRouterContext {
private Data lastDataProcessed;
private List<DataEvent> dataEventsToSend = new ArrayList<DataEvent>();
private boolean produceCommonBatches = false;
private boolean onlyDefaultRoutersAssigned = false;
private long lastLoadId = -1;
private long startDataId;
private long endDataId;
Expand Down Expand Up @@ -258,5 +259,13 @@ public void addTransaction(String transactionId) {
this.transactions.add(transactionId);
}
}

public void setOnlyDefaultRoutersAssigned(boolean onlyDefaultRoutersAssigned) {
this.onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned;
}

public boolean isOnlyDefaultRoutersAssigned() {
return onlyDefaultRoutersAssigned;
}

}
Expand Up @@ -121,8 +121,7 @@ public void run() {
}
}

protected void execute() {

protected void execute() {
long maxPeekAheadSizeInBytes = (long)(Runtime.getRuntime().maxMemory() * percentOfHeapToUse);
ISymmetricDialect symmetricDialect = engine.getSymmetricDialect();
ISqlReadCursor<Data> cursor = null;
Expand Down Expand Up @@ -365,13 +364,13 @@ protected String qualifyUsingDataGaps(List<DataGap> dataGaps, int numberOfGapsTo

protected String getSql(String sqlName, Channel channel) {
String select = engine.getRouterService().getSql(sqlName);
if (!channel.isUseOldDataToRoute()) {
if (!channel.isUseOldDataToRoute() || context.isOnlyDefaultRoutersAssigned()) {
select = select.replace("d.old_data", "''");
}
if (!channel.isUseRowDataToRoute()) {
if (!channel.isUseRowDataToRoute() || context.isOnlyDefaultRoutersAssigned()) {
select = select.replace("d.row_data", "''");
}
if (!channel.isUsePkDataToRoute()) {
if (!channel.isUsePkDataToRoute() || context.isOnlyDefaultRoutersAssigned()) {
select = select.replace("d.pk_data", "''");
}
return engine.getSymmetricDialect().massageDataExtractionSql(
Expand Down
Expand Up @@ -90,7 +90,10 @@
*/
public class RouterService extends AbstractService implements IRouterService {

protected Map<String, Boolean> defaultRouterOnlyLastState = new HashMap<String, Boolean>();
protected Map<String, Boolean> commonBatchesLastKnownState = new HashMap<String, Boolean>();


protected Map<String, Boolean> defaultRouterOnlyLastKnownState = new HashMap<String, Boolean>();

protected transient ExecutorService readThread = null;

Expand Down Expand Up @@ -324,15 +327,13 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
try {
final List<NodeChannel> channels = engine.getConfigurationService().getNodeChannels(
false);

for (NodeChannel nodeChannel : channels) {
if (nodeChannel.isEnabled()) {
processInfo.setCurrentChannelId(nodeChannel.getChannelId());
dataCount += routeDataForChannel(processInfo,
nodeChannel,
sourceNode,
producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
engine.getTriggerRouterService().getTriggerRouters(false)), gapDetector);
sourceNode
, gapDetector);
} else {
if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -416,26 +417,59 @@ protected boolean producesCommonBatches(Channel channel, String nodeGroupId, Lis
}
}

if (!producesCommonBatches.equals(defaultRouterOnlyLastState.get(channelId))) {
if (!producesCommonBatches.equals(commonBatchesLastKnownState.get(channelId))) {
if (producesCommonBatches) {
log.info("The '{}' channel is in common batch mode", channelId);
} else {
log.info("The '{}' channel is NOT in common batch mode", channelId);
}
defaultRouterOnlyLastState.put(channelId, producesCommonBatches);
commonBatchesLastKnownState.put(channelId, producesCommonBatches);
}
return producesCommonBatches;
}

protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) {
String channelId = channel.getChannelId();
Boolean onlyDefaultRoutersAssigned = !Constants.CHANNEL_CONFIG.equals(channelId)
&& !channel.isFileSyncFlag()
&& !channel.isReloadFlag()
&& !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false;
if (onlyDefaultRoutersAssigned && triggerRouters != null) {
for (TriggerRouter triggerRouter : triggerRouters) {
if (triggerRouter.getTrigger().getChannelId().equals(channel.getChannelId()) &&
triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId()
.equals(nodeGroupId) && !"default".equals(triggerRouter.getRouter().getRouterType())) {
onlyDefaultRoutersAssigned = false;
}
}
}

if (!onlyDefaultRoutersAssigned.equals(defaultRouterOnlyLastKnownState.get(channelId))) {
if (onlyDefaultRoutersAssigned) {
log.info("The '{}' channel for the '{}' node group has only default routers assigned to it. Change data won't be selected during routing", channelId, nodeGroupId);
}
defaultRouterOnlyLastKnownState.put(channelId, onlyDefaultRoutersAssigned);
}
return onlyDefaultRoutersAssigned;
}

protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode,
boolean produceCommonBatches, DataGapDetector gapDetector) {
DataGapDetector gapDetector) {
ChannelRouterContext context = null;
long ts = System.currentTimeMillis();
int dataCount = -1;
try {
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService().getTriggerRouters(false);
boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
triggerRouters);
boolean onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned(nodeChannel.getChannel(),
parameterService.getNodeGroupId(), triggerRouters);

context = new ChannelRouterContext(sourceNode.getNodeId(), nodeChannel,
symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction());
context.setProduceCommonBatches(produceCommonBatches);
context.setProduceCommonBatches(producesCommonBatches);
context.setOnlyDefaultRoutersAssigned(onlyDefaultRoutersAssigned);

dataCount = selectDataAndRoute(processInfo, context);
return dataCount;
} catch (DelayRoutingException ex) {
Expand Down

0 comments on commit 6cd7794

Please sign in to comment.