Skip to content

Commit

Permalink
create sequence of fetchIds for every related shard of current job
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Feb 24, 2015
1 parent ff9b8e5 commit b818f92
Show file tree
Hide file tree
Showing 29 changed files with 402 additions and 130 deletions.
78 changes: 78 additions & 0 deletions core/src/main/java/io/crate/core/collections/TreeMapBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.core.collections;

import java.util.Map;

import static com.google.common.collect.Maps.newTreeMap;

/**
*
*/
public class TreeMapBuilder<K extends Comparable, V> {

public static <K extends Comparable, V> TreeMapBuilder<K, V> newMapBuilder() {
return new TreeMapBuilder<>();
}

private Map<K, V> map = newTreeMap();

public TreeMapBuilder() {
this.map = newTreeMap();
}

public TreeMapBuilder<K, V> putAll(Map<K, V> map) {
this.map.putAll(map);
return this;
}

public TreeMapBuilder<K, V> put(K key, V value) {
this.map.put(key, value);
return this;
}

public TreeMapBuilder<K, V> remove(K key) {
this.map.remove(key);
return this;
}

public TreeMapBuilder<K, V> clear() {
this.map.clear();
return this;
}

public V get(K key) {
return map.get(key);
}

public boolean containsKey(K key) {
return map.containsKey(key);
}

public boolean isEmpty() {
return map.isEmpty();
}

public Map<K, V> map() {
return this.map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.crate.executor.transport.NodeCollectRequest;
import io.crate.executor.transport.NodeCollectResponse;
import io.crate.executor.transport.TransportCollectNodeAction;
import io.crate.metadata.table.TableInfo;
import io.crate.operation.collect.HandlerSideDataCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.planner.node.dql.CollectNode;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void start() {
for (int i = 0; i < nodeIds.length; i++) {
final int resultIdx = i;

if (nodeIds[i] == null) {
if (nodeIds[i] == TableInfo.NULL_NODE_ID) {
handlerSideCollect(resultIdx);
continue;
}
Expand Down
47 changes: 40 additions & 7 deletions sql/src/main/java/io/crate/metadata/Routing.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
package io.crate.metadata;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class Routing implements Streamable {

private Map<String, Map<String, Set<Integer>>> locations;
private volatile int numShards = -1;
private int fetchIdBase = -1;

public Routing() {

}

public Routing(@Nullable Map<String, Map<String, Set<Integer>>> locations) {
assert assertLocationsAllTreeMap(locations) : "locations must be a TreeMap only and must contain only TreeMap's";
this.locations = locations;
}

Expand Down Expand Up @@ -103,9 +105,17 @@ public boolean containsShards(String nodeId) {
return false;
}

public void fetchIdBase(int fetchIdBase) {
this.fetchIdBase = fetchIdBase;
}

public int fetchIdBase() {
return fetchIdBase;
}

@Override
public String toString() {
Objects.ToStringHelper helper = Objects.toStringHelper(this);
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
if (hasLocations()) {
helper.add("locations", locations);
}
Expand All @@ -117,15 +127,15 @@ public String toString() {
public void readFrom(StreamInput in) throws IOException {
int numLocations = in.readVInt();
if (numLocations > 0) {
locations = new HashMap<>(numLocations);
locations = new TreeMap<>();

String nodeId;
int numInner;
Map<String, Set<Integer>> innerMap;
for (int i = 0; i < numLocations; i++) {
nodeId = in.readOptionalString();
nodeId = in.readString();
numInner = in.readVInt();
innerMap = new HashMap<>(numInner);
innerMap = new TreeMap<>();

locations.put(nodeId, innerMap);
for (int j = 0; j < numInner; j++) {
Expand All @@ -140,6 +150,9 @@ public void readFrom(StreamInput in) throws IOException {
}
}
}
if (in.readBoolean()) {
fetchIdBase = in.readVInt();
}
}

@Override
Expand All @@ -148,7 +161,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(locations.size());

for (Map.Entry<String, Map<String, Set<Integer>>> entry : locations.entrySet()) {
out.writeOptionalString(entry.getKey());
out.writeString(entry.getKey());

if (entry.getValue() == null) {
out.writeVInt(0);
Expand All @@ -172,5 +185,25 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeVInt(0);
}
if (fetchIdBase > -1) {
out.writeBoolean(true);
out.writeVInt(fetchIdBase);
} else {
out.writeBoolean(false);
}
}

private boolean assertLocationsAllTreeMap(@Nullable Map<String, Map<String, Set<Integer>>> locations) {
if (locations != null) {
if (!(locations instanceof TreeMap)) {
return false;
}
for (Map<String, Set<Integer>> innerMap : locations.values()) {
if (!(innerMap instanceof TreeMap)) {
return false;
}
}
}
return true;
}
}
4 changes: 2 additions & 2 deletions sql/src/main/java/io/crate/metadata/blob/BlobTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> location
node = shardRouting.currentNodeId();
Map<String, Set<Integer>> nodeMap = locations.get(node);
if (nodeMap == null) {
nodeMap = new HashMap<>();
nodeMap = new TreeMap<>();
locations.put(shardRouting.currentNodeId(), nodeMap);
}

Expand All @@ -145,7 +145,7 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> location

@Override
public Routing getRouting(WhereClause whereClause, @Nullable String preference) {
Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();
GroupShardsIterator shardIterators = clusterService.operationRouting().searchShards(
clusterService.state(),
Strings.EMPTY_ARRAY,
Expand Down
4 changes: 2 additions & 2 deletions sql/src/main/java/io/crate/metadata/doc/DocTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> location
String node = shardRouting.currentNodeId();
Map<String, Set<Integer>> nodeMap = locations.get(node);
if (nodeMap == null) {
nodeMap = new HashMap<>();
nodeMap = new TreeMap<>();
locations.put(shardRouting.currentNodeId(), nodeMap);
}

Expand Down Expand Up @@ -183,7 +183,7 @@ public Routing getRouting(WhereClause whereClause, @Nullable String preference)
private Routing getRouting(
final ClusterStateObserver observer, final WhereClause whereClause, @Nullable final String preference) {
ClusterState clusterState = observer.observedState();
final Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
final Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();

String[] routingIndices = concreteIndices;
if (whereClause.partitions().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ protected InformationTableInfo(InformationSchemaInfo schemaInfo,
this.references = references;
this.columns = columns;
this.concreteIndices = new String[]{ident.esName()};
Map<String, Map<String, Set<Integer>>> locations = new HashMap<>(1);
Map<String, Set<Integer>> tableLocation = new HashMap<>(1);
Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();
Map<String, Set<Integer>> tableLocation = new TreeMap<>();
tableLocation.put(ident.fqn(), null);
locations.put(null, tableLocation);
locations.put(NULL_NODE_ID, tableLocation);
this.routing = new Routing(locations);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@

import com.google.common.collect.ImmutableList;
import io.crate.analyze.WhereClause;
import io.crate.core.collections.TreeMapBuilder;
import io.crate.metadata.*;
import io.crate.metadata.settings.CrateSettings;
import io.crate.planner.RowGranularity;
import io.crate.types.DataType;
import io.crate.types.DataTypes;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;

import javax.annotation.Nullable;
Expand All @@ -39,9 +39,9 @@ public class SysClusterTableInfo extends SysTableInfo {

public static final TableIdent IDENT = new TableIdent(SCHEMA, "cluster");
public static final Routing ROUTING = new Routing(
MapBuilder.<String, Map<String, Set<Integer>>>newMapBuilder().put(
null,
MapBuilder.<String, Set<Integer>>newMapBuilder().put(IDENT.fqn(), null).map()
TreeMapBuilder.<String, Map<String, Set<Integer>>>newMapBuilder().put(
NULL_NODE_ID,
TreeMapBuilder.<String, Set<Integer>>newMapBuilder().put(IDENT.fqn(), null).map()
).map()
);
private static final String[] PARTITIONS = new String[]{IDENT.name()};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
package io.crate.metadata.sys;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.crate.analyze.WhereClause;
import io.crate.core.collections.TreeMapBuilder;
import io.crate.metadata.*;
import io.crate.metadata.table.ColumnPolicy;
import io.crate.planner.RowGranularity;
Expand Down Expand Up @@ -188,13 +188,13 @@ public TableIdent ident() {
@Override
public Routing getRouting(WhereClause whereClause, @Nullable String preference) {
DiscoveryNodes nodes = clusterService.state().nodes();
ImmutableMap.Builder<String, Map<String, Set<Integer>>> builder = ImmutableMap.builder();
TreeMapBuilder<String, Map<String, Set<Integer>>> builder = TreeMapBuilder.newMapBuilder();

for (DiscoveryNode node : nodes) {
builder.put(node.id(), ImmutableMap.<String, Set<Integer>>of());
builder.put(node.id(), new TreeMap<String, Set<Integer>>());
}

return new Routing(builder.build());
return new Routing(builder.map());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ private void processShardRouting(Map<String, Map<String, Set<Integer>>> routing,

node = shardRouting.currentNodeId();
if (!shardRouting.active()) {
node = null;
node = NULL_NODE_ID;
}
Map<String, Set<Integer>> nodeMap = routing.get(node);
if (nodeMap == null) {
nodeMap = new HashMap<>();
nodeMap = new TreeMap<>();
routing.put(node, nodeMap);
}

Expand All @@ -132,7 +132,7 @@ public TableIdent ident() {
@Override
public Routing getRouting(WhereClause whereClause, @Nullable String preference) {
// TODO: filter on whereClause
Map<String, Map<String, Set<Integer>>> locations = new HashMap<>();
Map<String, Map<String, Set<Integer>>> locations = new TreeMap<>();
for (ShardRouting shardRouting : clusterService.state().routingTable().allShards()) {
processShardRouting(locations, shardRouting, null);
}
Expand Down
9 changes: 4 additions & 5 deletions sql/src/main/java/io/crate/metadata/sys/SysTableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@

package io.crate.metadata.sys;

import com.google.common.collect.ImmutableMap;
import io.crate.analyze.WhereClause;
import io.crate.core.collections.TreeMapBuilder;
import io.crate.metadata.Routing;
import io.crate.metadata.table.AbstractTableInfo;
import io.crate.metadata.table.ColumnPolicy;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.MapBuilder;

import java.util.Map;
import java.util.Set;
Expand All @@ -47,14 +46,14 @@ protected SysTableInfo(ClusterService clusterService, SysSchemaInfo sysSchemaInf

protected Routing tableRouting(WhereClause whereClause) {
DiscoveryNodes nodes = clusterService.state().nodes();
ImmutableMap.Builder<String, Map<String, Set<Integer>>> builder = ImmutableMap.builder();
TreeMapBuilder<String, Map<String, Set<Integer>>> builder = TreeMapBuilder.newMapBuilder();
for (DiscoveryNode node : nodes) {
builder.put(
node.id(),
MapBuilder.<String, Set<Integer>>newMapBuilder().put(ident().fqn(), null).map()
TreeMapBuilder.<String, Set<Integer>>newMapBuilder().put(ident().fqn(), null).map()
);
}
return new Routing(builder.build());
return new Routing(builder.map());
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions sql/src/main/java/io/crate/metadata/table/TableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@

public interface TableInfo extends Iterable<ReferenceInfo> {

/**
* Because {@link java.util.TreeMap} does not support <code>null</code> keys,
* we use a placeholder(empty) string instead.
*/
public static final String NULL_NODE_ID = "";

/**
* the schemaInfo for the schema that contains this table.
*/
Expand Down
Loading

0 comments on commit b818f92

Please sign in to comment.