Skip to content

Commit

Permalink
use table stats to decide which side of NL to broadcast
Browse files Browse the repository at this point in the history
The smaller table should be the one that is broadcast. This can
significantly speed up query execution if the size
difference between the tables is very large.

In a simple test it reduced the time for a query from 125
sec to 0.8 sec
  • Loading branch information
mfussenegger committed Oct 22, 2015
1 parent ea6a25c commit 45057ff
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 35 deletions.
2 changes: 2 additions & 0 deletions sql/src/main/java/io/crate/action/sql/TransportSQLAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
Expand All @@ -51,6 +52,7 @@
import java.util.List;


@Singleton
public class TransportSQLAction extends TransportBaseSQLAction<SQLRequest, SQLResponse> {

@Inject
Expand Down
120 changes: 120 additions & 0 deletions sql/src/main/java/io/crate/planner/TableStatsService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to Crate under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership. Crate licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial
* agreement.
*/

package io.crate.planner;


import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.ObjectLongOpenHashMap;
import io.crate.action.sql.SQLRequest;
import io.crate.action.sql.SQLResponse;
import io.crate.action.sql.TransportSQLAction;
import io.crate.metadata.TableIdent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.BindingAnnotation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

/**
 * Keeps a cached per-table document count that is obtained by running an
 * aggregation over {@code sys.shards}.
 *
 * <p>
 * The service registers itself with the {@link ThreadPool} on construction so that the
 * cache is refreshed with a fixed delay between runs. Readers always see either no
 * snapshot at all or a completely built one, because the snapshot reference is swapped
 * through a {@code volatile} field.
 * </p>
 */
@Singleton
public class TableStatsService extends AbstractComponent implements Runnable {

    /**
     * Binding annotation used to qualify the injected {@link TimeValue} that is
     * passed to the scheduler as delay between two refresh runs.
     */
    @BindingAnnotation
    @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface StatsUpdateInterval {}

    // Result columns: [0] summed num_docs, [1] schema_name, [2] table_name
    private static final SQLRequest REQUEST = new SQLRequest(
        "select cast(sum(num_docs) as long), schema_name, table_name from sys.shards group by 2, 3");

    private final Provider<TransportSQLAction> transportSQLAction;

    // null until the first response (scheduled or on-demand) has been processed
    private volatile ObjectLongMap<TableIdent> tableStats = null;

    /**
     * Creates the service and schedules it on the given thread pool.
     *
     * @param settings           settings handed to {@link AbstractComponent}
     * @param threadPool         pool on which the periodic refresh is scheduled
     * @param updateInterval     delay between two refresh runs
     * @param transportSQLAction provider of the action used to execute the stats query;
     *                           it is resolved on each use rather than in the constructor
     */
    @Inject
    public TableStatsService(Settings settings,
                             ThreadPool threadPool,
                             @StatsUpdateInterval TimeValue updateInterval,
                             Provider<TransportSQLAction> transportSQLAction) {
        super(settings);
        this.transportSQLAction = transportSQLAction;
        threadPool.scheduleWithFixedDelay(this, updateInterval);
    }

    /**
     * Scheduler entry point: triggers an asynchronous refresh of the cached stats.
     */
    @Override
    public void run() {
        refreshStats();
    }

    /**
     * Executes the stats query asynchronously. On success the cached snapshot is replaced,
     * on failure the error is logged and the previous snapshot (if any) is kept.
     */
    private void refreshStats() {
        ActionListener<SQLResponse> listener = new ActionListener<SQLResponse>() {

            @Override
            public void onResponse(SQLResponse sqlResponse) {
                tableStats = statsFromResponse(sqlResponse);
            }

            @Override
            public void onFailure(Throwable e) {
                logger.error("error retrieving table stats", e);
            }
        };
        transportSQLAction.get().execute(REQUEST, listener);
    }

    /**
     * Converts the rows of a stats query response into a map from table ident to doc count.
     */
    private static ObjectLongMap<TableIdent> statsFromResponse(SQLResponse sqlResponse) {
        int expectedSize = (int) sqlResponse.rowCount();
        ObjectLongMap<TableIdent> docsPerTable = new ObjectLongOpenHashMap<>(expectedSize);
        for (Object[] cells : sqlResponse.rows()) {
            TableIdent ident = new TableIdent((String) cells[1], (String) cells[2]);
            docsPerTable.put(ident, (long) cells[0]);
        }
        return docsPerTable;
    }

    /**
     * Looks up the number of docs of a table.
     *
     * <p>
     * The value comes from the cached snapshot and is therefore not an accurate real-time
     * number. If no snapshot exists yet, the stats query is executed synchronously
     * (waiting up to 30 seconds for the result) and its result is stored as the snapshot.
     * </p>
     *
     * @param tableIdent the table to look up
     * @return the cached number of docs or -1 if the table isn't part of the snapshot
     */
    public long numDocs(TableIdent tableIdent) {
        ObjectLongMap<TableIdent> snapshot = tableStats;
        if (snapshot == null) {
            SQLResponse response = transportSQLAction.get().execute(REQUEST).actionGet(30, TimeUnit.SECONDS);
            snapshot = statsFromResponse(response);
            tableStats = snapshot;
        }
        return snapshot.containsKey(tableIdent) ? snapshot.get(tableIdent) : -1;
    }
}
99 changes: 69 additions & 30 deletions sql/src/main/java/io/crate/planner/consumer/CrossJoinConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.crate.exceptions.ValidationException;
import io.crate.metadata.OutputName;
import io.crate.operation.projectors.TopN;
import io.crate.planner.TableStatsService;
import io.crate.planner.distribution.DistributionInfo;
import io.crate.planner.distribution.UpstreamPhase;
import io.crate.planner.fetch.FetchRequiredVisitor;
Expand All @@ -49,6 +50,8 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import javax.annotation.Nullable;
import java.util.*;
Expand All @@ -60,11 +63,11 @@
public class CrossJoinConsumer implements Consumer {

private final Visitor visitor;
private final static ESLogger LOGGER = Loggers.getLogger(CrossJoinConsumer.class);

@Inject
public CrossJoinConsumer(ClusterService clusterService,
AnalysisMetaData analysisMetaData) {
visitor = new Visitor(clusterService, analysisMetaData);
public CrossJoinConsumer(ClusterService clusterService, AnalysisMetaData analysisMetaData, TableStatsService tableStatsService) {
visitor = new Visitor(clusterService, analysisMetaData, tableStatsService);
}

@Override
Expand All @@ -76,11 +79,13 @@ private static class Visitor extends RelationPlanningVisitor {

private final ClusterService clusterService;
private final AnalysisMetaData analysisMetaData;
private final TableStatsService tableStatsService;
private final SubRelationConverter subRelationConverter;

public Visitor(ClusterService clusterService, AnalysisMetaData analysisMetaData) {
public Visitor(ClusterService clusterService, AnalysisMetaData analysisMetaData, TableStatsService tableStatsService) {
this.clusterService = clusterService;
this.analysisMetaData = analysisMetaData;
this.tableStatsService = tableStatsService;
subRelationConverter = new SubRelationConverter(analysisMetaData);
}

Expand Down Expand Up @@ -129,6 +134,18 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
QueriedTableRelation<?> left = queriedTables.get(0);
QueriedTableRelation<?> right = queriedTables.get(1);

boolean broadcastLeftTable = false;
if (isDistributed) {
long leftNumDocs = tableStatsService.numDocs(left.tableRelation().tableInfo().ident());
long rightNumDocs = tableStatsService.numDocs(right.tableRelation().tableInfo().ident());

if (rightNumDocs > leftNumDocs) {
broadcastLeftTable = true;
LOGGER.debug("Right table is larger with {} docs (left has {}. Will change left plan to broadcast its result",
rightNumDocs, leftNumDocs);
}
}

Integer limit = statement.querySpec().limit();
if (!isFilterNeeded && limit != null) {
context.requiredPageSize(limit + statement.querySpec().offset());
Expand All @@ -137,38 +154,61 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
PlannedAnalyzedRelation rightPlan = context.plannerContext().planSubRelation(right, context);
context.requiredPageSize(null);


Set<String> localExecutionNodes = ImmutableSet.of(clusterService.localNode().id());
Collection<String> nlExecutionNodes = localExecutionNodes;

MergePhase leftMerge = null;
MergePhase rightMerge = null;
if (isDistributed) {
leftPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_SAME_NODE);
nlExecutionNodes = leftPlan.resultPhase().executionNodes();
} else {
leftMerge = mergePhase(
context,
nlExecutionNodes,
leftPlan.resultPhase(),
left.querySpec().orderBy(),
left.querySpec().outputs(),
false);
}
if (nlExecutionNodes.size() == 1
&& nlExecutionNodes.equals(rightPlan.resultPhase().executionNodes())) {
// if the left and the right plan are executed on the same single node the mergePhase
// should be omitted. This is the case if the left and right table have only one shards which
// are on the same node
if (isDistributed && broadcastLeftTable) {
rightPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_SAME_NODE);
nlExecutionNodes = rightPlan.resultPhase().executionNodes();

if (nlExecutionNodes.size() == 1
&& nlExecutionNodes.equals(leftPlan.resultPhase().executionNodes())) {
// if the left and the right plan are executed on the same single node the mergePhase
// should be omitted. This is the case if the left and right table have only one shards which
// are on the same node
leftPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_SAME_NODE);
} else {
leftMerge = mergePhase(
context,
nlExecutionNodes,
leftPlan.resultPhase(),
left.querySpec().orderBy(),
left.querySpec().outputs(),
true);
leftPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_BROADCAST);
}
} else {
rightMerge = mergePhase(
context,
nlExecutionNodes,
rightPlan.resultPhase(),
right.querySpec().orderBy(),
right.querySpec().outputs(),
isDistributed);
rightPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_BROADCAST);
if (isDistributed) {
leftPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_SAME_NODE);
nlExecutionNodes = leftPlan.resultPhase().executionNodes();
} else {
leftMerge = mergePhase(
context,
nlExecutionNodes,
leftPlan.resultPhase(),
left.querySpec().orderBy(),
left.querySpec().outputs(),
false);
}
if (nlExecutionNodes.size() == 1
&& nlExecutionNodes.equals(rightPlan.resultPhase().executionNodes())) {
// if the left and the right plan are executed on the same single node the mergePhase
// should be omitted. This is the case if the left and right table have only one shards which
// are on the same node
rightPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_SAME_NODE);
} else {
rightMerge = mergePhase(
context,
nlExecutionNodes,
rightPlan.resultPhase(),
right.querySpec().orderBy(),
right.querySpec().outputs(),
isDistributed);
rightPlan.resultPhase().distributionInfo(DistributionInfo.DEFAULT_BROADCAST);
}
}

ProjectionBuilder projectionBuilder = new ProjectionBuilder(analysisMetaData.functions(), statement.querySpec());
Expand All @@ -192,7 +232,6 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
}
}


int topNLimit = firstNonNull(statement.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT);
TopNProjection topN = projectionBuilder.topNProjection(
inputs,
Expand Down
4 changes: 4 additions & 0 deletions sql/src/main/java/io/crate/plugin/SQLModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

import io.crate.action.sql.DDLStatementDispatcher;
import io.crate.metadata.FulltextAnalyzerResolver;
import io.crate.planner.TableStatsService;
import io.crate.service.SQLService;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.unit.TimeValue;


public class SQLModule extends AbstractModule {
Expand All @@ -34,5 +36,7 @@ protected void configure() {
bind(SQLService.class).asEagerSingleton();
bind(DDLStatementDispatcher.class).asEagerSingleton();
bind(FulltextAnalyzerResolver.class).asEagerSingleton();

bind(TimeValue.class).annotatedWith(TableStatsService.StatsUpdateInterval.class).toInstance(TimeValue.timeValueSeconds(60));
}
}
6 changes: 3 additions & 3 deletions sql/src/test/java/io/crate/analyze/BaseAnalyzerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public abstract class BaseAnalyzerTest extends CrateUnitTest {
.put("nodeOne", TreeMapBuilder.<String, List<Integer>>newMapBuilder().put("t1", Arrays.asList(1, 2)).map())
.put("nodeTow", TreeMapBuilder.<String, List<Integer>>newMapBuilder().put("t1", Arrays.asList(3, 4)).map())
.map());
static final TableIdent TEST_DOC_TABLE_IDENT = new TableIdent(Schemas.DEFAULT_SCHEMA_NAME, "users");
public static final TableIdent TEST_DOC_TABLE_IDENT = new TableIdent(Schemas.DEFAULT_SCHEMA_NAME, "users");
public static final TableInfo userTableInfo = TestingTableInfo.builder(TEST_DOC_TABLE_IDENT, shardRouting)
.add("id", DataTypes.LONG, null)
.add("other_id", DataTypes.LONG, null)
Expand All @@ -83,8 +83,8 @@ public abstract class BaseAnalyzerTest extends CrateUnitTest {
.addPrimaryKey("id")
.clusteredBy("id")
.build();
static final TableIdent TEST_DOC_TABLE_IDENT_MULTI_PK = new TableIdent(Schemas.DEFAULT_SCHEMA_NAME, "users_multi_pk");
static final TableInfo userTableInfoMultiPk = TestingTableInfo.builder(TEST_DOC_TABLE_IDENT_MULTI_PK, shardRouting)
public static final TableIdent TEST_DOC_TABLE_IDENT_MULTI_PK = new TableIdent(Schemas.DEFAULT_SCHEMA_NAME, "users_multi_pk");
public static final TableInfo userTableInfoMultiPk = TestingTableInfo.builder(TEST_DOC_TABLE_IDENT_MULTI_PK, shardRouting)
.add("id", DataTypes.LONG, null)
.add("name", DataTypes.STRING, null)
.add("details", DataTypes.OBJECT, null)
Expand Down
1 change: 1 addition & 0 deletions sql/src/test/java/io/crate/planner/PlannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class TestModule extends MetaDataModule {

@Override
protected void configure() {
bind(TableStatsService.class).toInstance(mock(TableStatsService.class));
bind(ThreadPool.class).toInstance(threadPool);
clusterService = mock(ClusterService.class);
DiscoveryNode localNode = mock(DiscoveryNode.class);
Expand Down
Loading

0 comments on commit 45057ff

Please sign in to comment.