Skip to content

Commit

Permalink
0001977: The bean shell router swallows errors and does not route the…
Browse files Browse the repository at this point in the history
… data
  • Loading branch information
chenson42 committed Sep 18, 2014
1 parent 2548936 commit 23dd95e
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,33 @@

import bsh.EvalError;
import bsh.Interpreter;
import bsh.TargetError;

/**
* This data router is invoked when the router_type is 'bsh'. The router_expression is always a bean shell expression. See
* <a href='http://www.beanshell.org'>the bean shell site</a> for information about the capabilities of the bean shell scripting
* language.
* This data router is invoked when the router_type is 'bsh'. The
* router_expression is always a bean shell expression. See <a
* href='http://www.beanshell.org'>the bean shell site</a> for information about
* the capabilities of the bean shell scripting language.
* <P/>
* Bound to the interpreter are the names of both the current and old column values. They can be used in the expression. They should
* always be referenced using upper case. Also bound to the interpreter is a {@link Collection} of targetNodes. The script is expected
* to add the the list of target nodes a list of the node_ids that should be routed to.
* Bound to the interpreter are the names of both the current and old column
* values. They can be used in the expression. They should always be referenced
* using upper case. Also bound to the interpreter is a {@link Collection} of
* targetNodes. The script is expected to add the the list of target nodes a
* list of the node_ids that should be routed to.
*/
public class BshDataRouter extends AbstractDataRouter {

protected ISymmetricEngine engine;

final String INTERPRETER_KEY = String.format("%d.BshInterpreter", hashCode());

public BshDataRouter(ISymmetricEngine engine) {
this.engine = engine;
}

public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter) {
public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData,
Set<Node> nodes, boolean initialLoad, boolean initialLoadSelectUsed,
TriggerRouter triggerRouter) {
try {
long ts = System.currentTimeMillis();
Interpreter interpreter = getInterpreter(context);
Expand All @@ -67,11 +72,19 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
Object returnValue = interpreter.eval(dataMetaData.getRouter().getRouterExpression());
context.incrementStat(System.currentTimeMillis() - ts, "bsh.eval.ms");
return eval(returnValue, nodes, targetNodes);
} catch (EvalError e) {
log.error("Error in data router: " + dataMetaData.getRouter() + ". Routing to nobody.", e);
return Collections.emptySet();
} catch (EvalError e) {
if (e instanceof TargetError) {
Throwable t = ((TargetError)e).getTarget();
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
} else {
throw new RuntimeException(e);
}
}
}
}

protected Interpreter getInterpreter(SimpleRouterContext context) {
Interpreter interpreter = (Interpreter) context.getContextCache().get(INTERPRETER_KEY);
Expand Down Expand Up @@ -106,16 +119,17 @@ protected Set<String> eval(Object value, Set<Node> nodes, Set<String> targetNode
}
}

protected void bind(Interpreter interpreter, DataMetaData dataMetaData, Set<Node> nodes, Set<String> targetNodes, boolean initialLoad)
throws EvalError {
protected void bind(Interpreter interpreter, DataMetaData dataMetaData, Set<Node> nodes,
Set<String> targetNodes, boolean initialLoad) throws EvalError {
interpreter.set("log", log);
interpreter.set("initialLoad", initialLoad);
interpreter.set("initialLoad", initialLoad);
interpreter.set("dataMetaData", dataMetaData);
interpreter.set("nodes", nodes);
interpreter.set("identityNodeId", engine.getNodeService().findIdentityNodeId());
interpreter.set("targetNodes", targetNodes);
interpreter.set("engine", engine);
Map<String, Object> params = getDataObjectMap(dataMetaData, engine.getSymmetricDialect(), true);
Map<String, Object> params = getDataObjectMap(dataMetaData, engine.getSymmetricDialect(),
true);
if (params != null) {
for (String param : params.keySet()) {
interpreter.set(param, params.get(param));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class DataGapRouteReader implements IDataToRouteReader {

protected int takeTimeout;

protected ProcessInfo processInfo;

protected static Map<String, Boolean> lastSelectUsedGreaterThanQueryByEngineName = new HashMap<String, Boolean>();

public DataGapRouteReader(ChannelRouterContext context, ISymmetricEngine engine) {
Expand Down Expand Up @@ -110,7 +112,7 @@ public void run() {
protected void execute() {
ISymmetricDialect symmetricDialect = engine.getSymmetricDialect();
ISqlReadCursor<Data> cursor = null;
ProcessInfo processInfo = engine.getStatisticManager().
processInfo = engine.getStatisticManager().
newProcessInfo(new ProcessInfoKey(engine.getNodeService().findIdentityNodeId(), null, ProcessType.ROUTER_READER));
processInfo.setCurrentChannelId(context.getChannel().getChannelId());
try {
Expand Down Expand Up @@ -388,6 +390,9 @@ public boolean isReading() {

public void setReading(boolean reading) {
this.reading = reading;
if (processInfo.getStatus() != Status.ERROR) {
processInfo.setStatus(Status.OK);
}
}

public BlockingQueue<Data> getDataQueue() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.route;

public class DelayRoutingException extends RuntimeException {

private static final long serialVersionUID = 1L;

public DelayRoutingException() {
super();
}

public DelayRoutingException(String message, Throwable cause) {
super(message, cause);
}

public DelayRoutingException(String message) {
super(message);
}

public DelayRoutingException(Throwable cause) {
super(cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
*/
package org.jumpmind.symmetric.service.impl;

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -66,6 +68,7 @@
import org.jumpmind.symmetric.route.DataGapRouteReader;
import org.jumpmind.symmetric.route.DefaultBatchAlgorithm;
import org.jumpmind.symmetric.route.DefaultDataRouter;
import org.jumpmind.symmetric.route.DelayRoutingException;
import org.jumpmind.symmetric.route.FileSyncDataRouter;
import org.jumpmind.symmetric.route.IBatchAlgorithm;
import org.jumpmind.symmetric.route.IDataRouter;
Expand Down Expand Up @@ -406,6 +409,12 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
context.setProduceCommonBatches(produceCommonBatches);
dataCount = selectDataAndRoute(processInfo, context);
return dataCount;
} catch (DelayRoutingException ex) {
log.info("The routing process for the {} channel is being delayed. {}", nodeChannel.getChannelId(), isNotBlank(ex.getMessage()) ? ex.getMessage() : "");
if (context != null) {
context.rollback();
}
return 0;
} catch (InterruptedException ex) {
log.warn("The routing process was interrupted. Rolling back changes");
if (context != null) {
Expand Down

0 comments on commit 23dd95e

Please sign in to comment.