Skip to content

Commit

Permalink
create sequence of fetchIds for every related shard of current job
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Feb 24, 2015
1 parent ff9b8e5 commit f91cba0
Show file tree
Hide file tree
Showing 17 changed files with 147 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.crate.executor.transport.NodeCollectRequest;
import io.crate.executor.transport.NodeCollectResponse;
import io.crate.executor.transport.TransportCollectNodeAction;
import io.crate.metadata.table.TableInfo;
import io.crate.operation.collect.HandlerSideDataCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.planner.node.dql.CollectNode;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void start() {
for (int i = 0; i < nodeIds.length; i++) {
final int resultIdx = i;

if (nodeIds[i] == null) {
if (nodeIds[i] == TableInfo.NULL_NODE_ID) {
handlerSideCollect(resultIdx);
continue;
}
Expand Down
32 changes: 25 additions & 7 deletions sql/src/main/java/io/crate/metadata/Routing.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package io.crate.metadata;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class Routing implements Streamable {

private Map<String, Map<String, Set<Integer>>> locations;
private volatile int numShards = -1;
private int fetchIdBase = -1;

public Routing() {

Expand Down Expand Up @@ -103,9 +104,17 @@ public boolean containsShards(String nodeId) {
return false;
}

public void fetchIdBase(int fetchIdBase) {
this.fetchIdBase = fetchIdBase;
}

public int fetchIdBase() {
return fetchIdBase;
}

@Override
public String toString() {
Objects.ToStringHelper helper = Objects.toStringHelper(this);
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
if (hasLocations()) {
helper.add("locations", locations);
}
Expand All @@ -117,15 +126,15 @@ public String toString() {
public void readFrom(StreamInput in) throws IOException {
int numLocations = in.readVInt();
if (numLocations > 0) {
locations = new HashMap<>(numLocations);
locations = new TreeMap<>();

String nodeId;
int numInner;
Map<String, Set<Integer>> innerMap;
for (int i = 0; i < numLocations; i++) {
nodeId = in.readOptionalString();
nodeId = in.readString();
numInner = in.readVInt();
innerMap = new HashMap<>(numInner);
innerMap = new TreeMap<>();

locations.put(nodeId, innerMap);
for (int j = 0; j < numInner; j++) {
Expand All @@ -140,6 +149,9 @@ public void readFrom(StreamInput in) throws IOException {
}
}
}
if (in.readBoolean()) {
fetchIdBase = in.readVInt();
}
}

@Override
Expand All @@ -148,7 +160,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(locations.size());

for (Map.Entry<String, Map<String, Set<Integer>>> entry : locations.entrySet()) {
out.writeOptionalString(entry.getKey());
out.writeString(entry.getKey());

if (entry.getValue() == null) {
out.writeVInt(0);
Expand All @@ -172,5 +184,11 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeVInt(0);
}
if (fetchIdBase > -1) {
out.writeBoolean(true);
out.writeVInt(fetchIdBase);
} else {
out.writeBoolean(false);
}
}
}
4 changes: 2 additions & 2 deletions sql/src/main/java/io/crate/metadata/blob/BlobTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> location
node = shardRouting.currentNodeId();
Map<String, Set<Integer>> nodeMap = locations.get(node);
if (nodeMap == null) {
nodeMap = new HashMap<>();
nodeMap = new TreeMap<>();
locations.put(shardRouting.currentNodeId(), nodeMap);
}

Expand All @@ -145,7 +145,7 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> location

@Override
public Routing getRouting(WhereClause whereClause, @Nullable String preference) {
Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();
GroupShardsIterator shardIterators = clusterService.operationRouting().searchShards(
clusterService.state(),
Strings.EMPTY_ARRAY,
Expand Down
4 changes: 2 additions & 2 deletions sql/src/main/java/io/crate/metadata/doc/DocTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> location
String node = shardRouting.currentNodeId();
Map<String, Set<Integer>> nodeMap = locations.get(node);
if (nodeMap == null) {
nodeMap = new HashMap<>();
nodeMap = new TreeMap<>();
locations.put(shardRouting.currentNodeId(), nodeMap);
}

Expand Down Expand Up @@ -183,7 +183,7 @@ public Routing getRouting(WhereClause whereClause, @Nullable String preference)
private Routing getRouting(
final ClusterStateObserver observer, final WhereClause whereClause, @Nullable final String preference) {
ClusterState clusterState = observer.observedState();
final Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
final Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();

String[] routingIndices = concreteIndices;
if (whereClause.partitions().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ protected InformationTableInfo(InformationSchemaInfo schemaInfo,
this.references = references;
this.columns = columns;
this.concreteIndices = new String[]{ident.esName()};
Map<String, Map<String, Set<Integer>>> locations = new HashMap<>(1);
Map<String, Set<Integer>> tableLocation = new HashMap<>(1);
Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();
Map<String, Set<Integer>> tableLocation = new TreeMap<>();
tableLocation.put(ident.fqn(), null);
locations.put(null, tableLocation);
locations.put(NULL_NODE_ID, tableLocation);
this.routing = new Routing(locations);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class SysClusterTableInfo extends SysTableInfo {
public static final TableIdent IDENT = new TableIdent(SCHEMA, "cluster");
public static final Routing ROUTING = new Routing(
MapBuilder.<String, Map<String, Set<Integer>>>newMapBuilder().put(
null,
NULL_NODE_ID,
MapBuilder.<String, Set<Integer>>newMapBuilder().put(IDENT.fqn(), null).map()
).map()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> routing,

node = shardRouting.currentNodeId();
if (!shardRouting.active()) {
node = null;
node = NULL_NODE_ID;
}
Map<String, Set<Integer>> nodeMap = routing.get(node);
if (nodeMap == null) {
nodeMap = new HashMap<>();
nodeMap = new TreeMap<>();
routing.put(node, nodeMap);
}

Expand All @@ -132,7 +132,7 @@ public TableIdent ident() {
@Override
public Routing getRouting(WhereClause whereClause, @Nullable String preference) {
// TODO: filter on whereClause
Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();
for (ShardRouting shardRouting : clusterService.state().routingTable().allShards()) {
processShardRouting(locations, shardRouting, null);
}
Expand Down
6 changes: 6 additions & 0 deletions sql/src/main/java/io/crate/metadata/table/TableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@

public interface TableInfo extends Iterable<ReferenceInfo> {

/**
* Because {@link java.util.TreeMap} does not support <code>null</code> keys,
* we use a placeholder(empty) string instead.
*/
public static final String NULL_NODE_ID = "";

/**
* the schemaInfo for the schema that contains this table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ public CrateCollector getCollector(CollectNode collectNode, Projector downstream
return CrateCollector.NOOP;
}
Routing routing = collectNode.routing();
assert routing.locations().containsKey(null);
assert routing.locations().get(null).size() == 1;
String fqTableName = routing.locations().get(null).keySet().iterator().next();
assert routing.locations().containsKey(TableInfo.NULL_NODE_ID);
assert routing.locations().get(TableInfo.NULL_NODE_ID).size() == 1;
String fqTableName = routing.locations().get(TableInfo.NULL_NODE_ID).keySet().iterator().next();
Iterable<?> iterator = iterables.get(fqTableName);
CollectInputSymbolVisitor.Context ctx = docInputSymbolVisitor.process(collectNode);

Expand Down
4 changes: 2 additions & 2 deletions sql/src/main/java/io/crate/planner/PlanNodeBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ public static CollectNode collect(TableInfo tableInfo,
private static Routing filterRouting(Routing routing, String includeTableName) {
assert routing.hasLocations();
assert includeTableName != null;
Map<String, Map<String, Set<Integer>>> newLocations = new HashMap<>();
Map<String, Map<String, Set<Integer>>> newLocations = new TreeMap<>();

for (Map.Entry<String, Map<String, Set<Integer>>> entry : routing.locations().entrySet()) {
Map<String, Set<Integer>> tableMap = new HashMap<>();
Map<String, Set<Integer>> tableMap = new TreeMap<>();
for (Map.Entry<String, Set<Integer>> tableEntry : entry.getValue().entrySet()) {
if (includeTableName.equals(tableEntry.getKey())) {
tableMap.put(tableEntry.getKey(), tableEntry.getValue());
Expand Down
63 changes: 49 additions & 14 deletions sql/src/main/java/io/crate/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package io.crate.planner;

import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.carrotsearch.hppc.procedures.ObjectProcedure;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -38,8 +39,8 @@
import io.crate.planner.node.ddl.*;
import io.crate.planner.node.dml.ESDeleteByQueryNode;
import io.crate.planner.node.dml.ESDeleteNode;
import io.crate.planner.node.dml.Upsert;
import io.crate.planner.node.dml.SymbolBasedUpsertByIdNode;
import io.crate.planner.node.dml.Upsert;
import io.crate.planner.node.dql.CollectNode;
import io.crate.planner.node.dql.DQLPlanNode;
import io.crate.planner.node.dql.FileUriCollectNode;
Expand All @@ -62,6 +63,7 @@
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;

import javax.annotation.Nullable;
import java.io.IOException;
Expand All @@ -78,12 +80,45 @@ public class Planner extends AnalyzedStatementVisitor<Planner.Context, Plan> {
private Functions functions;
private AggregationProjection localMergeProjection;

protected static class Context {
public static class Context {

private final IntObjectOpenHashMap<ShardId> fetchIdShardId = new IntObjectOpenHashMap<>();
private int fetchBaseIdSeq = 0;

private final Analysis analysis;
/**
* Increase current {@link #fetchBaseIdSeq} by number of shards affected by given
* <code>routing</code> parameter and register a {@link org.elasticsearch.index.shard.ShardId}
* under each incremented fetchId.
* The current {@link #fetchBaseIdSeq} is set on the {@link io.crate.metadata.Routing} instance,
* in order to be able to re-generate fetchId's for every shard in a deterministic way.
*
* Skip generating fetchId's if {@link io.crate.metadata.Routing#fetchIdBase} is already
* set on the given <code>routing</code>.
*/
public void allocateFetchIds(Routing routing) {
if (routing.fetchIdBase() > -1) {
return;
}
int fetchIdBase = fetchBaseIdSeq;
fetchBaseIdSeq += routing.numShards();
routing.fetchIdBase(fetchIdBase);
for (Map<String, Set<Integer>> nodeRouting : routing.locations().values()) {
if (nodeRouting != null) {
for (Map.Entry<String, Set<Integer>> entry : nodeRouting.entrySet()) {
for (Integer shardId : entry.getValue()) {
fetchIdShardId.putIfAbsent(fetchIdBase++, new ShardId(entry.getKey(), shardId));
}
}
}
}
}

Context(Analysis analysis) {
this.analysis = analysis;
/**
* Return a {@link org.elasticsearch.index.shard.ShardId} for a given <code>fetchId</code>
* if exists at the fetchId-to-shardId registry map.
*/
public ShardId shardId(int fetchId) {
return fetchIdShardId.get(fetchId);
}
}

Expand All @@ -102,7 +137,7 @@ public Planner(ClusterService clusterService, AnalysisMetaData analysisMetaData,
*/
public Plan plan(Analysis analysis) {
AnalyzedStatement analyzedStatement = analysis.analyzedStatement();
return process(analyzedStatement, new Context(analysis));
return process(analyzedStatement, new Context());
}

@Override
Expand All @@ -112,23 +147,23 @@ protected Plan visitAnalyzedStatement(AnalyzedStatement analyzedStatement, Conte

@Override
protected Plan visitSelectStatement(SelectAnalyzedStatement statement, Context context) {
return consumingPlanner.plan(statement.relation());
return consumingPlanner.plan(statement.relation(), context);
}

@Override
protected Plan visitInsertFromValuesStatement(InsertFromValuesAnalyzedStatement analysis, Context context) {
Preconditions.checkState(!analysis.sourceMaps().isEmpty(), "no values given");
return processInsertStatement(analysis);
protected Plan visitInsertFromValuesStatement(InsertFromValuesAnalyzedStatement statement, Context context) {
Preconditions.checkState(!statement.sourceMaps().isEmpty(), "no values given");
return processInsertStatement(statement);
}

@Override
protected Plan visitInsertFromSubQueryStatement(InsertFromSubQueryAnalyzedStatement analysis, Context context) {
return consumingPlanner.plan(analysis);
protected Plan visitInsertFromSubQueryStatement(InsertFromSubQueryAnalyzedStatement statement, Context context) {
return consumingPlanner.plan(statement, context);
}

@Override
protected Plan visitUpdateStatement(UpdateAnalyzedStatement statement, Context context) {
return consumingPlanner.plan(statement);
return consumingPlanner.plan(statement, context);
}

@Override
Expand Down Expand Up @@ -304,7 +339,7 @@ private void copyFromPlan(CopyAnalyzedStatement analysis, IterablePlan plan) {

private Routing generateRouting(DiscoveryNodes allNodes, int maxNodes) {
final AtomicInteger counter = new AtomicInteger(maxNodes);
final Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
final Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();
allNodes.dataNodes().keys().forEach(new ObjectProcedure<String>() {
@Override
public void apply(String value) {
Expand Down
10 changes: 8 additions & 2 deletions sql/src/main/java/io/crate/planner/consumer/ConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@

import io.crate.analyze.relations.AnalyzedRelation;
import io.crate.exceptions.ValidationException;
import io.crate.planner.Planner;
import org.elasticsearch.common.Nullable;

public class ConsumerContext {

private AnalyzedRelation rootRelation;

private ValidationException validationException;
private Planner.Context plannerContext;

public ConsumerContext(AnalyzedRelation rootRelation) {
public ConsumerContext(AnalyzedRelation rootRelation, Planner.Context plannerContext) {
this.rootRelation = rootRelation;
this.plannerContext = plannerContext;
}

public void rootRelation(AnalyzedRelation relation) {
Expand All @@ -52,4 +54,8 @@ public void validationException(ValidationException validationException){
public ValidationException validationException(){
return validationException;
}

public Planner.Context plannerContext() {
return plannerContext;
}
}
Loading

0 comments on commit f91cba0

Please sign in to comment.