Skip to content

Commit

Permalink
Merge pull request #993 from actiontech/issue-989
Browse files Browse the repository at this point in the history
#989 global table switch back conn in transaction for complex query
  • Loading branch information
yanhuqing666 committed Feb 15, 2019
2 parents a76d365 + d77cfde commit 2c240dd
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
Expand Up @@ -165,8 +165,27 @@ protected final void noShardBuild() {
GlobalVisitor visitor = new GlobalVisitor(node, true);
visitor.visit();
String sql = visitor.getSql().toString();
RouteResultsetNode[] rrss = getTableSources(node.getNoshardNode(), sql);
String randomDataNode = getRandomNode(node.getNoshardNode());
RouteResultsetNode rrsNode = new RouteResultsetNode(randomDataNode, ServerParse.SELECT, sql);
RouteResultsetNode[] rrss = new RouteResultsetNode[]{rrsNode};
hBuilder.checkRRSs(rrss);
if (session.getTargetCount() > 0 && session.getTarget(rrss[0]) == null) {
for (String dataNode : node.getNoshardNode()) {
if (!dataNode.equals(randomDataNode)) {
RouteResultsetNode tmpRrsNode = new RouteResultsetNode(dataNode, ServerParse.SELECT, sql);
RouteResultsetNode[] tmpRrss = new RouteResultsetNode[]{tmpRrsNode};
hBuilder.checkRRSs(tmpRrss);
if (session.getTarget(tmpRrsNode) != null) {
rrss = tmpRrss;
hBuilder.removeRrs(rrsNode);
break;
} else {
hBuilder.removeRrs(tmpRrsNode);
}
}
}
}

MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
session, null);
addHandler(mh);
Expand Down Expand Up @@ -397,7 +416,7 @@ protected void buildMergeHandler(PlanNode planNode, RouteResultsetNode[] rrssArr
addHandler(mh);
}

protected RouteResultsetNode[] getTableSources(Set<String> dataNodes, String sql) {
protected String getRandomNode(Set<String> dataNodes) {
String randomDatenode = null;
int index = (int) (System.currentTimeMillis() % dataNodes.size());
int i = 0;
Expand All @@ -408,8 +427,7 @@ protected RouteResultsetNode[] getTableSources(Set<String> dataNodes, String sql
}
i++;
}
RouteResultsetNode rrss = new RouteResultsetNode(randomDatenode, ServerParse.SELECT, sql);
return new RouteResultsetNode[]{rrss};
return randomDatenode;
}

protected TableConfig getTableConfig(String schema, String table) {
Expand Down
Expand Up @@ -39,6 +39,10 @@ synchronized void checkRRSs(RouteResultsetNode[] rrssArray) {
}
}

synchronized void removeRrs(RouteResultsetNode rrsNode) {
rrsNodes.remove(rrsNode);
}

/**
* start all leaf handler of children of special handler
*/
Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.actiontech.dble.plan.node.NoNameNode;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.parser.ServerParse;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -50,7 +51,8 @@ public void buildOwn() {
String sql = visitor.getSql().toString();
String schema = session.getSource().getSchema();
SchemaConfig schemaConfig = schemaConfigMap.get(schema);
RouteResultsetNode[] rrss = getTableSources(schemaConfig.getAllDataNodes(), sql);
String randomDatenode = getRandomNode(schemaConfig.getAllDataNodes());
RouteResultsetNode[] rrss = new RouteResultsetNode[]{new RouteResultsetNode(randomDatenode, ServerParse.SELECT, sql)};
hBuilder.checkRRSs(rrss);
MultiNodeMergeHandler mh = new MultiNodeMergeHandler(getSequenceId(), rrss, session.getSource().isAutocommit() && !session.getSource().isTxStart(),
session, null);
Expand Down

0 comments on commit 2c240dd

Please sign in to comment.