fix: performing queries on a partitioned table could cause unnecessary “execution rejected”
Philipp Bogensberger committed May 4, 2015
1 parent 3008857 commit b595737
Showing 5 changed files with 157 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -7,6 +7,9 @@ Unreleased
 
 NOTE: Upgrading from 0.47 or earlier versions requires a full cluster restart
 
+- Fix: performing queries on a partitioned table could cause unnecessary
+  ``execution rejected`` errors
+
 - Fixed an issue that caused GROUP BY queries with multiple group keys, no
   order by and limit to hang.
 
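The root cause is visible in the FetchProjector diff below: fetch buckets were keyed by Objects.hash(nodeId, index), so a query over a partitioned table built one bucket, and therefore flushed one fetch request, per (node, partition) pair. With enough partitions this flood of requests could exceed what the target nodes accept and surface as ``execution rejected``. The fix keys buckets by node only and attaches the partition row to each buffered entry. A minimal sketch of the two keying strategies, using simplified, hypothetical stand-ins for the real io.crate classes:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    class BucketingSketch {

        interface Row {}

        static class NodeBucket {
            final String nodeId;
            NodeBucket(String nodeId) { this.nodeId = nodeId; }
            void add(long docId, Row partitionRow) { /* buffer docId plus its partition row */ }
        }

        private final Map<Integer, NodeBucket> nodeBuckets = new HashMap<>();

        // Before the fix: one bucket per (node, partition index) pair, so a query
        // touching N partitions could flush up to N fetch requests per node.
        void addKeyedByNodeAndPartition(String nodeId, String index, long docId) {
            int key = Objects.hash(nodeId, index);
            nodeBuckets.computeIfAbsent(key, k -> new NodeBucket(nodeId)).add(docId, null);
        }

        // After the fix: one bucket per node; the partition row travels with each
        // buffered entry, so a single fetch request per node suffices.
        void addKeyedByNode(String nodeId, Row partitionRow, long docId) {
            int key = Objects.hash(nodeId);
            nodeBuckets.computeIfAbsent(key, k -> new NodeBucket(nodeId)).add(docId, partitionRow);
        }
    }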
49 changes: 29 additions & 20 deletions sql/src/main/java/io/crate/operation/projectors/FetchProjector.java
@@ -77,8 +77,8 @@ public class FetchProjector implements Projector, RowDownstreamHandle {
     private final List<String> executionNodes;
     private final int numNodes;
     private final AtomicInteger remainingRequests = new AtomicInteger(0);
-    private final Map<String, Object[]> partitionValuesCache = new HashMap<>();
-    private final Object partitionValuesCacheLock = new Object();
+    private final Map<String, Row> partitionRowsCache = new HashMap<>();
+    private final Object partitionRowsCacheLock = new Object();
 
     private int inputCursor = 0;
     private boolean consumedRows = false;
@@ -171,15 +171,16 @@ public synchronized boolean setNextRow(Row row) {
 
         String nodeId = jobSearchContextIdToNode.get(jobSearchContextId);
         String index = jobSearchContextIdToShard.get(jobSearchContextId).getIndex();
-        int nodeIdIndex = Objects.hash(nodeId, index);
+        int nodeIdIndex = Objects.hash(nodeId);
 
         NodeBucket nodeBucket = nodeBuckets.get(nodeIdIndex);
         if (nodeBucket == null) {
-            nodeBucket = new NodeBucket(nodeId, jobSearchContextIdToShard.get(jobSearchContextId).getIndex(), executionNodes.indexOf(nodeId));
+            nodeBucket = new NodeBucket(nodeId, executionNodes.indexOf(nodeId));
             nodeBuckets.put(nodeIdIndex, nodeBucket);
 
         }
-        nodeBucket.add(inputCursor++, docId, row);
+        Row partitionRow = partitionedByRow(index);
+        nodeBucket.add(inputCursor++, docId, partitionRow, row);
         if (bulkSize != NO_BULK_REQUESTS && nodeBucket.size() >= bulkSize) {
             flushNodeBucket(nodeBucket);
             nodeBuckets.remove(nodeIdIndex);
@@ -230,18 +231,22 @@ public void fail(Throwable throwable) {
     private Row partitionedByRow(String index) {
         if (!partitionedBy.isEmpty() && PartitionName.isPartition(index)) {
             Object[] partitionValues;
-            synchronized (partitionValuesCacheLock) {
-                partitionValues = partitionValuesCache.get(index);
-                if (partitionValues == null) {
-                    List<BytesRef> partitionRawValues = PartitionName.fromStringSafe(index).values();
-                    partitionValues = new Object[partitionRawValues.size()];
-                    for (int i = 0; i < partitionRawValues.size(); i++) {
-                        partitionValues[i] = partitionedBy.get(i).type().value(partitionRawValues.get(i));
-                    }
-                    partitionValuesCache.put(index, partitionValues);
+            synchronized (partitionRowsCacheLock) {
+                Row partitionValuesRow = partitionRowsCache.get(index);
+                if (partitionValuesRow != null) {
+                    return partitionValuesRow;
                 }
             }
-            return new RowN(partitionValues);
+            List<BytesRef> partitionRowValues = PartitionName.fromStringSafe(index).values();
+            partitionValues = new Object[partitionRowValues.size()];
+            for (int i = 0; i < partitionRowValues.size(); i++) {
+                partitionValues[i] = partitionedBy.get(i).type().value(partitionRowValues.get(i));
+            }
+            Row partitionValuesRow = new RowN(partitionValues);
+            synchronized (partitionRowsCacheLock) {
+                partitionRowsCache.put(index, partitionValuesRow);
+            }
+            return partitionValuesRow;
         }
         return null;
     }
@@ -259,7 +264,6 @@ private void flushNodeBucket(final NodeBucket nodeBucket) {
         if (bulkSize > NO_BULK_REQUESTS) {
             request.closeContext(false);
         }
-        final Row partitionRow = partitionedByRow(nodeBucket.index);
         transportFetchNodeAction.execute(nodeBucket.nodeId, request, new ActionListener<NodeFetchResponse>() {
             @Override
             public void onResponse(NodeFetchResponse response) {
@@ -271,6 +275,7 @@ public void onResponse(NodeFetchResponse response) {
                         if (needInputRow) {
                             collectRowDelegate.delegate(nodeBucket.inputRow(idx));
                         }
+                        Row partitionRow = nodeBucket.partitionRow(idx);
                         if (partitionRow != null) {
                             partitionRowDelegate.delegate(partitionRow);
                         }
@@ -328,20 +333,20 @@ private static class NodeBucket {
 
         private final int nodeIdx;
         private final String nodeId;
-        private final String index;
+        private final List<Row> partitionRows = new ArrayList<>();
         private final List<Row> inputRows = new ArrayList<>();
         private final IntArrayList cursors = new IntArrayList();
         private final LongArrayList docIds = new LongArrayList();
 
-        public NodeBucket(String nodeId, String index, int nodeIdx) {
+        public NodeBucket(String nodeId, int nodeIdx) {
             this.nodeId = nodeId;
-            this.index = index;
             this.nodeIdx = nodeIdx;
         }
 
-        public void add(int cursor, Long docId, Row row) {
+        public void add(int cursor, Long docId, @Nullable Row partitionRow, Row row) {
             cursors.add(cursor);
             docIds.add(docId);
+            partitionRows.add(partitionRow);
             inputRows.add(new RowN(row.materialize()));
         }
 
@@ -360,6 +365,10 @@ public int cursor(int index) {
         public Row inputRow(int index) {
             return inputRows.get(index);
         }
+
+        public @Nullable Row partitionRow(int idx) {
+            return partitionRows.get(idx);
+        }
     }
 
     private static class RowDelegate implements Row {
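A note on the rewritten partitionedByRow() above: the lock is now held only while reading from or writing to the cache map, and the partition values are parsed outside of it. Two threads may occasionally build the same row twice, but the rows are equal and effectively immutable, so the duplicate work is harmless. A minimal sketch of that pattern, with hypothetical names (parsePartitionValues() stands in for PartitionName.fromStringSafe(index).values() plus the type-conversion loop):

    import java.util.HashMap;
    import java.util.Map;

    class PartitionRowCache {
        private final Map<String, Object[]> cache = new HashMap<>();
        private final Object lock = new Object();

        Object[] rowFor(String index) {
            synchronized (lock) {                             // fast path: cache hit
                Object[] cached = cache.get(index);
                if (cached != null) {
                    return cached;
                }
            }
            Object[] built = parsePartitionValues(index);     // potentially costly, done unlocked
            synchronized (lock) {
                cache.put(index, built);                      // last writer wins; values are equal
            }
            return built;
        }

        private Object[] parsePartitionValues(String index) {
            return new Object[]{index};                       // placeholder for the real parsing
        }
    }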
@@ -44,6 +44,7 @@
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.hamcrest.Matchers;
 import org.hamcrest.core.Is;
+import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -1773,4 +1774,29 @@ public void testGroupOnDynamicColumn() throws Exception {
         assertThat((String)response.rows()[0][0], Is.is("hello"));
         assertThat(response.rows()[1][0], Is.is(nullValue()));
     }
+
+    @Test
+    public void testFetchPartitionedTable() throws Exception {
+        execute("SET GLOBAL stats.enabled = true");
+        execute("create table t (name string, p string) partitioned by (p) with (number_of_replicas=0)");
+        ensureYellow();
+        Object[][] bulkArgs = new Object[100][];
+        for (int i = 0; i < 100; i++) {
+            bulkArgs[i] = new Object[]{"Marvin", i};
+        }
+        execute("insert into t (name, p) values (?, ?)", bulkArgs);
+        execute("refresh table t");
+        execute("select * from t");
+        assertThat(response.rowCount(), is(100L));
+        waitNoPendingTasksOnAll();
+        execute("select count(*), job_id, arbitrary(name) from sys.operations_log where name='fetch' group by 2");
+        assertThat(response.rowCount(), is(lessThanOrEqualTo(1L)));
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        execute("SET GLOBAL stats.enabled = false");
+        super.tearDown();
+    }
 }
18 changes: 15 additions & 3 deletions stresstest/src/test/java/io/crate/benchmark/BenchmarkBase.java
@@ -21,12 +21,11 @@
 
 package io.crate.benchmark;
 
-import io.crate.action.sql.SQLAction;
-import io.crate.action.sql.SQLRequest;
-import io.crate.action.sql.SQLResponse;
+import io.crate.action.sql.*;
 import io.crate.test.integration.CrateTestCluster;
 import io.crate.test.integration.NodeSettingsSource;
 import io.crate.test.integration.PathAccessor;
+import io.crate.testing.SQLTransportExecutor;
 import org.apache.lucene.util.AbstractRandomizedTest;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -81,6 +80,15 @@ public class BenchmarkBase {
 
     public final ESLogger logger = Loggers.getLogger(getClass());
 
+    protected SQLTransportExecutor sqlExecutor = new SQLTransportExecutor(
+            new SQLTransportExecutor.ClientProvider() {
+                @Override
+                public Client client() {
+                    return getClient(true);
+                }
+            }
+    );
+
     @Rule
     public TestRule ruleChain = RuleChain.emptyRuleChain();
 
@@ -96,6 +104,10 @@ public SQLResponse execute(String stmt, Object[] args) {
         return getClient(true).execute(SQLAction.INSTANCE, new SQLRequest(stmt, args)).actionGet();
     }
 
+    public SQLBulkResponse execute(String stmt, Object[][] bulkArgs) {
+        return sqlExecutor.exec(new SQLBulkRequest(stmt, bulkArgs));
+    }
+
     @Before
     public void setUp() throws Exception {
         if (NODE1 == null) {
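The execute(String, Object[][]) overload added above routes bulk statements through the SQLTransportExecutor instead of the raw client. A hedged usage sketch; the subclass, table, and values are illustrative and not part of this commit:

    import io.crate.benchmark.BenchmarkBase;

    public class ExampleBulkBenchmark extends BenchmarkBase {

        void insertSampleRows() {
            // one bulk request carrying all parameter sets, instead of one request per row
            Object[][] bulkArgs = new Object[3][];
            for (int i = 0; i < bulkArgs.length; i++) {
                bulkArgs[i] = new Object[]{"Marvin", i};
            }
            execute("insert into t (name, p) values (?, ?)", bulkArgs);
        }
    }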
84 changes: 84 additions & 0 deletions stresstest/src/test/java/io/crate/benchmark/PartitionedTableBenchmark.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
+ * license agreements. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. Crate licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ * However, if you have executed another commercial license agreement
+ * with Crate these terms will supersede the license and you may use the
+ * software solely pursuant to the terms of the relevant commercial agreement.
+ */
+
+package io.crate.benchmark;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkHistoryChart;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
+import com.carrotsearch.junitbenchmarks.annotation.LabelType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+@BenchmarkHistoryChart(filePrefix="benchmark-partitionedtablebenchmark-history", labelWith = LabelType.CUSTOM_KEY)
+@BenchmarkMethodChart(filePrefix = "benchmark-partitionedtablebenchmark")
+public class PartitionedTableBenchmark extends BenchmarkBase {
+
+    static {
+        ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(true);
+    }
+
+    private static boolean dataGenerated = false;
+
+    private static final Integer NUMBER_OF_PARTITIONS = 500;
+    private static final Integer NUMBER_OF_DOCS_PER_PARTITION = 1;
+
+    public static final int BENCHMARK_ROUNDS = 100;
+    public static final int WARMUP_ROUNDS = 10;
+
+    @Rule
+    public TestRule benchmarkRun = RuleChain.outerRule(new BenchmarkRule()).around(super.ruleChain);
+
+    @Override
+    public boolean generateData() {
+        return !dataGenerated;
+    }
+
+    @Override
+    protected void createTable() {
+        execute("create table \"" + INDEX_NAME + "\" (" +
+                " name string," +
+                " p string) partitioned by (p) with (number_of_replicas=0, refresh_interval = 0)", new Object[0], true);
+        client().admin().cluster().prepareHealth(INDEX_NAME).setWaitForGreenStatus().execute().actionGet();
+    }
+
+    @Override
+    protected void doGenerateData() throws Exception {
+        Object[][] bulkArgs = new Object[NUMBER_OF_PARTITIONS * NUMBER_OF_DOCS_PER_PARTITION][];
+        for (int i = 0; i < NUMBER_OF_PARTITIONS * NUMBER_OF_DOCS_PER_PARTITION; i+= NUMBER_OF_DOCS_PER_PARTITION) {
+            for (int j = 0; j < NUMBER_OF_DOCS_PER_PARTITION; j++) {
+                bulkArgs[i+j] = new Object[]{"Marvin", i};
+            }
+        }
+        execute("insert into " + INDEX_NAME + " (name, p) values (?, ?)", bulkArgs);
+        execute("refresh table " + INDEX_NAME);
+        dataGenerated = true;
+    }
+
+    @BenchmarkOptions(benchmarkRounds = BENCHMARK_ROUNDS, warmupRounds = WARMUP_ROUNDS)
+    @Test
+    public void testFetchAll() throws Exception {
+        execute("select * from " + INDEX_NAME + " limit " + NUMBER_OF_PARTITIONS * NUMBER_OF_DOCS_PER_PARTITION);
+    }
+}
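The benchmark's shape (500 partitions, one document each) deliberately maximizes the number of (node, partition) pairs, which was the worst case before this fix. A back-of-the-envelope sketch, assuming a single-node benchmark cluster and no intermediate bulk flushes:

    public class FetchRequestCount {
        public static void main(String[] args) {
            int partitions = 500; // NUMBER_OF_PARTITIONS
            int nodes = 1;        // assumption: single-node benchmark cluster
            // before: buckets keyed per (node, partition) -> one fetch request each
            System.out.println("before the fix: " + partitions * nodes + " fetch requests");
            // after: buckets keyed per node -> one fetch request per node
            System.out.println("after the fix:  " + nodes + " fetch requests");
        }
    }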
