From 67d27ee0993e99c27326edf9f13b165e2480112d Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Tue, 24 Feb 2015 14:54:49 +0100 Subject: [PATCH] implement node level searchcontext registry waiting for job-scoped shard sequence ids --- .../crate/executor/task/LocalCollectTask.java | 4 +- .../transport/CollectContextService.java | 118 ++++++++++++++++++ .../transport/TransportCollectNodeAction.java | 6 +- .../executor/transport/TransportExecutor.java | 6 +- .../collect/CollectOperationModule.java | 3 + .../collect/DistributingCollectOperation.java | 3 +- .../operation/collect/LuceneDocCollector.java | 83 +++++++----- .../collect/MapSideDataCollectOperation.java | 1 + .../collect/ShardCollectService.java | 14 ++- .../operation/collect/CollectContextTest.java | 84 +++++++++++++ .../collect/DocLevelCollectTest.java | 6 +- .../collect/LocalDataCollectTest.java | 81 +++--------- .../MapSideDataCollectOperationTest.java | 9 +- 13 files changed, 298 insertions(+), 120 deletions(-) create mode 100644 sql/src/main/java/io/crate/executor/transport/CollectContextService.java create mode 100644 sql/src/test/java/io/crate/operation/collect/CollectContextTest.java diff --git a/sql/src/main/java/io/crate/executor/task/LocalCollectTask.java b/sql/src/main/java/io/crate/executor/task/LocalCollectTask.java index 720ce439b2d7..5452c3dbe64b 100644 --- a/sql/src/main/java/io/crate/executor/task/LocalCollectTask.java +++ b/sql/src/main/java/io/crate/executor/task/LocalCollectTask.java @@ -26,8 +26,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.crate.breaker.RamAccountingContext; -import io.crate.executor.QueryResult; import io.crate.executor.JobTask; +import io.crate.executor.QueryResult; import io.crate.executor.TaskResult; import io.crate.operation.collect.CollectOperation; import io.crate.planner.node.dql.CollectNode; @@ -41,7 +41,7 @@ /** - * A collect task which returns one future and runs a collectOperation locally and synchronous + * A collect task which returns one future and runs a collectOperation locally */ public class LocalCollectTask extends JobTask { diff --git a/sql/src/main/java/io/crate/executor/transport/CollectContextService.java b/sql/src/main/java/io/crate/executor/transport/CollectContextService.java new file mode 100644 index 000000000000..9ee37566885d --- /dev/null +++ b/sql/src/main/java/io/crate/executor/transport/CollectContextService.java @@ -0,0 +1,118 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.executor.transport; + +import com.carrotsearch.hppc.IntObjectMap; +import com.carrotsearch.hppc.IntObjectOpenHashMap; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.google.common.base.Throwables; +import com.google.common.cache.*; +import com.google.common.util.concurrent.UncheckedExecutionException; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.internal.SearchContext; + +import javax.annotation.Nonnull; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class CollectContextService implements Releasable { + + public static final String EXPIRATION_SETTING = "collect.context_keep_alive"; + public static final TimeValue EXPIRATION_DEFAULT = new TimeValue(5, TimeUnit.MINUTES); + + private final ReentrantLock lock; + private final LoadingCache> contexts; + + @Inject + public CollectContextService(Settings settings) { + long expirationMillis = settings.getAsTime(EXPIRATION_SETTING, EXPIRATION_DEFAULT).millis(); + if (expirationMillis == 0) { + Loggers.getLogger(getClass()).info("invalid {}. using default of {}", EXPIRATION_SETTING, EXPIRATION_DEFAULT); + expirationMillis = EXPIRATION_DEFAULT.millis(); + } + this.contexts = CacheBuilder.newBuilder().expireAfterAccess( + expirationMillis, + TimeUnit.MILLISECONDS + ).removalListener(new RemovalListener>() { + @Override + public void onRemoval(@Nonnull RemovalNotification> notification) { + for (ObjectCursor cursor : notification.getValue().values()) { + Releasables.close(cursor.value); + } + } + }).build(new CacheLoader>() { + @Override + public IntObjectMap load(@Nonnull UUID key) throws Exception { + return new IntObjectOpenHashMap<>(); + } + }); + this.lock = new ReentrantLock(); + } + + + @Nullable + public SearchContext getContext(UUID jobId, int searchContextId) { + final IntObjectMap searchContexts; + try { + searchContexts = contexts.get(jobId); + } catch (ExecutionException|UncheckedExecutionException e) { + throw Throwables.propagate(e); + } + return searchContexts.get(searchContextId); + } + + public SearchContext getOrCreateContext(UUID jobId, + int searchContextId, + Callable createSearchContextCallable) { + try { + final IntObjectMap searchContexts = contexts.get(jobId); + lock.lock(); + try { + SearchContext searchContext = searchContexts.get(searchContextId); + if (searchContext == null) { + searchContext = createSearchContextCallable.call(); + searchContexts.put(searchContextId, searchContext); + } + return searchContext; + } finally { + lock.unlock(); + } + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void close() throws ElasticsearchException { + this.contexts.invalidateAll(); + } +} diff --git a/sql/src/main/java/io/crate/executor/transport/TransportCollectNodeAction.java b/sql/src/main/java/io/crate/executor/transport/TransportCollectNodeAction.java index b539bfb2ffb9..0115981908f1 100644 --- a/sql/src/main/java/io/crate/executor/transport/TransportCollectNodeAction.java +++ b/sql/src/main/java/io/crate/executor/transport/TransportCollectNodeAction.java @@ -104,11 +104,11 @@ private void nodeOperation(final NodeCollectRequest request, operationId = UUID.randomUUID(); statsTables.operationStarted(operationId, node.jobId().get(), node.id()); } else { - operationId = null; + collectResponse.onFailure(new IllegalArgumentException("no jobId given for CollectOperation")); + return; } String ramAccountingContextId = String.format("%s: %s", node.id(), operationId); - final RamAccountingContext ramAccountingContext = - new RamAccountingContext(ramAccountingContextId, circuitBreaker); + final RamAccountingContext ramAccountingContext = new RamAccountingContext(ramAccountingContextId, circuitBreaker); try { if (node.hasDownstreams()) { diff --git a/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java b/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java index 7b2f6f211c1a..f61b8a3b8928 100644 --- a/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java +++ b/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java @@ -82,6 +82,8 @@ public class TransportExecutor implements Executor, TaskExecutor { private final QueryThenFetchOperation queryThenFetchOperation; + private final CollectContextService collectContextService; + @Inject public TransportExecutor(Settings settings, TransportActionProvider transportActionProvider, @@ -93,7 +95,8 @@ public TransportExecutor(Settings settings, StatsTables statsTables, ClusterService clusterService, CrateCircuitBreakerService breakerService, - QueryThenFetchOperation queryThenFetchOperation) { + QueryThenFetchOperation queryThenFetchOperation, + CollectContextService collectContextService) { this.settings = settings; this.transportActionProvider = transportActionProvider; this.handlerSideDataCollectOperation = handlerSideDataCollectOperation; @@ -103,6 +106,7 @@ public TransportExecutor(Settings settings, this.statsTables = statsTables; this.clusterService = clusterService; this.queryThenFetchOperation = queryThenFetchOperation; + this.collectContextService = collectContextService; this.nodeVisitor = new NodeVisitor(); this.planVisitor = new TaskCollectingVisitor(); this.circuitBreaker = breakerService.getBreaker(CrateCircuitBreakerService.QUERY_BREAKER); diff --git a/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java b/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java index 0444ef466232..3a90fead1396 100644 --- a/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java +++ b/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java @@ -21,11 +21,14 @@ package io.crate.operation.collect; +import io.crate.executor.transport.CollectContextService; import org.elasticsearch.common.inject.AbstractModule; public class CollectOperationModule extends AbstractModule { @Override protected void configure() { + bind(CollectContextService.class).asEagerSingleton(); + bind(MapSideDataCollectOperation.class).asEagerSingleton(); bind(HandlerSideDataCollectOperation.class).asEagerSingleton(); bind(InformationSchemaCollectService.class).asEagerSingleton(); diff --git a/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java b/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java index adb3e2a0746b..62aac92915f6 100644 --- a/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java +++ b/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java @@ -333,7 +333,8 @@ public DiscoveryNode apply(@Nullable String input) { } @Override - protected ListenableFuture handleShardCollect(CollectNode collectNode, RamAccountingContext ramAccountingContext) { + protected ListenableFuture handleShardCollect(CollectNode collectNode, + RamAccountingContext ramAccountingContext) { assert collectNode.hasDownstreams() : "no downstreams"; return super.handleShardCollect(collectNode, ramAccountingContext); } diff --git a/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java b/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java index 70691b8d9225..3b5a93e258fa 100644 --- a/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java +++ b/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java @@ -26,6 +26,7 @@ import io.crate.analyze.WhereClause; import io.crate.breaker.CrateCircuitBreakerService; import io.crate.breaker.RamAccountingContext; +import io.crate.executor.transport.CollectContextService; import io.crate.lucene.LuceneQueryBuilder; import io.crate.metadata.Functions; import io.crate.operation.Input; @@ -56,6 +57,9 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.Callable; /** * collect documents from ES shard, a lucene index @@ -98,54 +102,69 @@ public void required(boolean required) { } } - private final SearchContext searchContext; private Projector downstream; private final List> topLevelInputs; private final List> collectorExpressions; + private final SearchContext searchContext; - public LuceneDocCollector(ThreadPool threadPool, + public LuceneDocCollector(final UUID jobId, + final ThreadPool threadPool, ClusterService clusterService, + CollectContextService collectorContextService, ShardId shardId, - IndexService indexService, - ScriptService scriptService, - CacheRecycler cacheRecycler, - PageCacheRecycler pageCacheRecycler, - BigArrays bigArrays, + final IndexService indexService, + final ScriptService scriptService, + final CacheRecycler cacheRecycler, + final PageCacheRecycler pageCacheRecycler, + final BigArrays bigArrays, List> inputs, List> collectorExpressions, - Functions functions, - WhereClause whereClause, + final Functions functions, + final WhereClause whereClause, Projector downStreamProjector) throws Exception { downstream(downStreamProjector); - SearchShardTarget searchShardTarget = new SearchShardTarget( + final SearchShardTarget searchShardTarget = new SearchShardTarget( clusterService.localNode().id(), shardId.getIndex(), shardId.id()); this.topLevelInputs = inputs; this.collectorExpressions = collectorExpressions; this.fieldsVisitor = new CollectorFieldsVisitor(collectorExpressions.size()); - ShardSearchLocalRequest searchRequest = new ShardSearchLocalRequest( - new String[] { Constants.DEFAULT_MAPPING_TYPE }, - System.currentTimeMillis() + final IndexShard indexShard = indexService.shardSafe(shardId.id()); + final int searchContextId = Objects.hash(jobId, searchShardTarget); + this.searchContext = collectorContextService.getOrCreateContext( + jobId, + searchContextId, + new Callable() { + @Override + public SearchContext call() throws Exception { + ShardSearchLocalRequest searchRequest = new ShardSearchLocalRequest( + new String[] { Constants.DEFAULT_MAPPING_TYPE }, + System.currentTimeMillis() + ); + SearchContext localContext = new DefaultSearchContext( + searchContextId, + searchRequest, + searchShardTarget, + EngineSearcher.getSearcherWithRetry(indexShard, null), // TODO: use same searcher/reader for same jobId and searchContextId + indexService, + indexShard, + scriptService, + cacheRecycler, + pageCacheRecycler, + bigArrays, + threadPool.estimatedTimeInMillisCounter() + ); + LuceneQueryBuilder builder = new LuceneQueryBuilder(functions, localContext, indexService.cache()); + LuceneQueryBuilder.Context ctx = builder.convert(whereClause); + localContext.parsedQuery(new ParsedQuery(ctx.query(), ImmutableMap.of())); + Float minScore = ctx.minScore(); + if (minScore != null) { + localContext.minimumScore(minScore); + } + return localContext; + } + } ); - IndexShard indexShard = indexService.shardSafe(shardId.id()); - searchContext = new DefaultSearchContext(0, searchRequest, - searchShardTarget, - EngineSearcher.getSearcherWithRetry(indexShard, null), - indexService, - indexShard, - scriptService, - cacheRecycler, - pageCacheRecycler, - bigArrays, - threadPool.estimatedTimeInMillisCounter() - ); - LuceneQueryBuilder builder = new LuceneQueryBuilder(functions, searchContext, indexService.cache()); - LuceneQueryBuilder.Context ctx = builder.convert(whereClause); - searchContext.parsedQuery(new ParsedQuery(ctx.query(), ImmutableMap.of())); - Float minScore = ctx.minScore(); - if (minScore != null) { - searchContext.minimumScore(minScore); - } } @Override diff --git a/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java b/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java index 2f4ab2ccee3c..bfcdecc7ddda 100644 --- a/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java +++ b/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java @@ -147,6 +147,7 @@ public MapSideDataCollectOperation(ClusterService clusterService, @Override public ListenableFuture collect(CollectNode collectNode, RamAccountingContext ramAccountingContext) { assert collectNode.isRouted(); // not routed collect is not handled here + assert collectNode.jobId().isPresent() : "no jobId present for collect operation"; String localNodeId = clusterService.localNode().id(); if (collectNode.executionNodes().contains(localNodeId)) { if (!collectNode.routing().containsShards(localNodeId)) { diff --git a/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java b/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java index 3370bff39582..2f204625451f 100644 --- a/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java +++ b/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java @@ -24,8 +24,8 @@ import io.crate.analyze.EvaluatingNormalizer; import io.crate.blob.v2.BlobIndices; import io.crate.breaker.CrateCircuitBreakerService; -import io.crate.breaker.RamAccountingContext; import io.crate.exceptions.UnhandledServerException; +import io.crate.executor.transport.CollectContextService; import io.crate.executor.transport.TransportActionProvider; import io.crate.metadata.Functions; import io.crate.metadata.shard.ShardReferenceResolver; @@ -44,7 +44,6 @@ import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -53,8 +52,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; -import java.util.UUID; - public class ShardCollectService { private final CollectInputSymbolVisitor docInputSymbolVisitor; @@ -73,6 +70,8 @@ public class ShardCollectService { private final Functions functions; private final BlobIndices blobIndices; + private final CollectContextService collectContextService; + @Inject public ShardCollectService(ThreadPool threadPool, ClusterService clusterService, @@ -88,11 +87,11 @@ public ShardCollectService(ThreadPool threadPool, ShardReferenceResolver referenceResolver, BlobIndices blobIndices, BlobShardReferenceResolver blobShardReferenceResolver, - CrateCircuitBreakerService breakerService) { + CrateCircuitBreakerService breakerService, + CollectContextService collectContextService) { this.threadPool = threadPool; this.clusterService = clusterService; this.shardId = shardId; - this.indexService = indexService; this.scriptService = scriptService; this.cacheRecycler = cacheRecycler; @@ -125,6 +124,7 @@ public ShardCollectService(ThreadPool threadPool, shardNormalizer, shardId, docInputSymbolVisitor); + this.collectContextService = collectContextService; } /** @@ -178,8 +178,10 @@ private CrateCollector getBlobIndexCollector(CollectNode collectNode, Projector private CrateCollector getLuceneIndexCollector(CollectNode collectNode, Projector downstream) throws Exception { CollectInputSymbolVisitor.Context docCtx = docInputSymbolVisitor.process(collectNode); return new LuceneDocCollector( + collectNode.jobId().get(), threadPool, clusterService, + collectContextService, shardId, indexService, scriptService, diff --git a/sql/src/test/java/io/crate/operation/collect/CollectContextTest.java b/sql/src/test/java/io/crate/operation/collect/CollectContextTest.java new file mode 100644 index 000000000000..c56c6f583220 --- /dev/null +++ b/sql/src/test/java/io/crate/operation/collect/CollectContextTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.operation.collect; + +import io.crate.executor.transport.CollectContextService; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.test.TestSearchContext; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.UUID; +import java.util.concurrent.Callable; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +public class CollectContextTest { + + private CollectContextService collectContextService; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @After + public void cleanup() { + Releasables.close(collectContextService); + } + + private static final Callable CONTEXT_CALLABLE = new Callable() { + @Override + public SearchContext call() throws Exception { + return new TestSearchContext(); + } + }; + + @Test + public void testSameForSameArgs() throws Throwable { + collectContextService = new CollectContextService(ImmutableSettings.EMPTY); + UUID jobId = UUID.randomUUID(); + SearchContext ctx1 = collectContextService.getOrCreateContext(jobId, 0, CONTEXT_CALLABLE); + SearchContext ctx2 = collectContextService.getOrCreateContext(jobId, 0, CONTEXT_CALLABLE); + assertThat(ctx1, is(ctx2)); + SearchContext ctx3 = collectContextService.getOrCreateContext(UUID.randomUUID(), 0, CONTEXT_CALLABLE); + assertThat(ctx3, is(not(ctx1))); + + SearchContext ctx4 = collectContextService.getOrCreateContext(jobId, 1, CONTEXT_CALLABLE); + assertThat(ctx4, is(not(ctx1))); + } + + @Test + public void testNoZeroExpirationSettingUseDefault() throws Throwable { + collectContextService = new CollectContextService(ImmutableSettings.builder().put(CollectContextService.EXPIRATION_SETTING, "0ms").build()); + + UUID jobId = UUID.randomUUID(); + SearchContext ctx1 = collectContextService.getOrCreateContext(jobId, 0, CONTEXT_CALLABLE); + SearchContext ctx2 = collectContextService.getOrCreateContext(jobId, 0, CONTEXT_CALLABLE); + assertThat(ctx1, is(ctx2)); + + } +} diff --git a/sql/src/test/java/io/crate/operation/collect/DocLevelCollectTest.java b/sql/src/test/java/io/crate/operation/collect/DocLevelCollectTest.java index 542b3ae322a5..90d53bfbe565 100644 --- a/sql/src/test/java/io/crate/operation/collect/DocLevelCollectTest.java +++ b/sql/src/test/java/io/crate/operation/collect/DocLevelCollectTest.java @@ -138,7 +138,7 @@ public void testCollectDocLevel() throws Exception { CollectNode collectNode = new CollectNode("docCollect", routing(TEST_TABLE_NAME)); collectNode.toCollect(Arrays.asList(testDocLevelReference, underscoreRawReference, underscoreIdReference)); collectNode.maxRowGranularity(RowGranularity.DOC); - + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, is(2)); @@ -164,7 +164,7 @@ public void testCollectDocLevelWhereClause() throws Exception { op.info(), Arrays.asList(testDocLevelReference, Literal.newLiteral(2))) )); - + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, is(1)); assertThat(result[0].length, is(1)); @@ -184,6 +184,7 @@ public void testCollectWithShardAndNodeExpressions() throws Exception { new Reference(SysClusterTableInfo.INFOS.get(new ColumnIdent("name"))) )); collectNode.maxRowGranularity(RowGranularity.DOC); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); @@ -217,6 +218,7 @@ public void testCollectWithPartitionedColumns() throws Exception { )); collectNode.maxRowGranularity(RowGranularity.DOC); collectNode.isPartitioned(true); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, is(2)); diff --git a/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java b/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java index a38018b4e44b..50341de577dc 100644 --- a/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java +++ b/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java @@ -94,10 +94,7 @@ import org.junit.rules.ExpectedException; import org.mockito.Answers; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutionException; import static org.hamcrest.MatcherAssert.assertThat; @@ -322,8 +319,6 @@ public void configure() { when(indexService.shardSafe(1)).thenReturn(shard1Injector.getInstance(IndexShard.class)); when(indicesService.indexServiceSafe(TEST_TABLE_NAME)).thenReturn(indexService); - - NodeSettingsService nodeSettingsService = mock(NodeSettingsService.class); operation = new MapSideDataCollectOperation( @@ -336,8 +331,7 @@ public void configure() { discoveryService, functions, new StatsTables(ImmutableSettings.EMPTY, nodeSettingsService)) - ) - ); + )); } private Routing shardRouting(final Integer... shardIds) { @@ -353,7 +347,7 @@ public void testCollectExpressions() throws Exception { CollectNode collectNode = new CollectNode("collect", testRouting); collectNode.maxRowGranularity(RowGranularity.NODE); collectNode.toCollect(Arrays.asList(testNodeReference)); - + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, equalTo(1)); @@ -374,6 +368,7 @@ public void testWrongRouting() throws Exception { }}); }})); collectNode.maxRowGranularity(RowGranularity.DOC); + collectNode.jobId(UUID.randomUUID()); operation.collect(collectNode, null); } @@ -395,6 +390,7 @@ public void testCollectUnknownReference() throws Throwable { ); collectNode.toCollect(Arrays.asList(unknownReference)); collectNode.maxRowGranularity(RowGranularity.NODE); + collectNode.jobId(UUID.randomUUID()); try { operation.collect(collectNode, null).get(); } catch (ExecutionException e) { @@ -411,6 +407,7 @@ public void testCollectFunction() throws Exception { ); collectNode.toCollect(Arrays.asList(twoTimesTruthFunction, testNodeReference)); collectNode.maxRowGranularity(RowGranularity.NODE); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, equalTo(1)); assertThat(result[0].length, equalTo(2)); @@ -434,6 +431,7 @@ public void testUnknownFunction() throws Throwable { ImmutableList.of() ); collectNode.toCollect(Arrays.asList(unknownFunction)); + collectNode.jobId(UUID.randomUUID()); try { operation.collect(collectNode, null).get(); } catch (ExecutionException e) { @@ -450,6 +448,7 @@ public void testCollectLiterals() throws Exception { Literal.newLiteral(1), Literal.newLiteral(4.2) )); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, equalTo(1)); assertThat((BytesRef) result[0][0], equalTo(new BytesRef("foobar"))); @@ -467,6 +466,7 @@ public void testCollectWithFalseWhereClause() throws Exception { AndOperator.INFO, Arrays.asList(Literal.newLiteral(false), Literal.newLiteral(false)) ))); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertArrayEquals(new Object[0][], result); } @@ -479,6 +479,7 @@ public void testCollectWithTrueWhereClause() throws Exception { AndOperator.INFO, Arrays.asList(Literal.newLiteral(true), Literal.newLiteral(true)) ))); + collectNode.jobId(UUID.randomUUID()); collectNode.maxRowGranularity(RowGranularity.NODE); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, equalTo(1)); @@ -496,6 +497,7 @@ public void testCollectWithNullWhereClause() throws Exception { op.info(), Arrays.asList(Literal.NULL, Literal.NULL) ))); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertArrayEquals(new Object[0][], result); } @@ -505,6 +507,7 @@ public void testCollectShardExpressions() throws Exception { CollectNode collectNode = new CollectNode("shardCollect", shardRouting(0, 1)); collectNode.toCollect(Arrays.asList(testShardIdReference)); collectNode.maxRowGranularity(RowGranularity.SHARD); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, is(equalTo(2))); assertThat((Integer) result[0][0], isOneOf(0, 1)); @@ -522,6 +525,7 @@ public void testCollectShardExpressionsWhereShardIdIs0() throws Exception { collectNode.whereClause(new WhereClause( new Function(op.info(), Arrays.asList(testShardIdReference, Literal.newLiteral(0))))); collectNode.maxRowGranularity(RowGranularity.SHARD); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, is(equalTo(1))); assertThat((Integer) result[0][0], is(0)); @@ -532,6 +536,7 @@ public void testCollectShardExpressionsLiteralsAndNodeExpressions() throws Excep CollectNode collectNode = new CollectNode("shardCollect", shardRouting(0, 1)); collectNode.toCollect(Arrays.asList(testShardIdReference, Literal.newLiteral(true), testNodeReference)); collectNode.maxRowGranularity(RowGranularity.SHARD); + collectNode.jobId(UUID.randomUUID()); Object[][] result = operation.collect(collectNode, null).get(); assertThat(result.length, is(equalTo(2))); assertThat(result[0].length, is(equalTo(3))); @@ -551,62 +556,4 @@ public void testCollectShardExpressionsLiteralsAndNodeExpressions() throws Excep assertThat((Boolean) result[j][1], is(true)); assertThat((Integer) result[j][2], is(42)); } - - /** - @Test public void testCollectShardExpressionsLimit1() throws Exception { - CollectNode collectNode = new CollectNode("shardCollect", shardRouting(0,1), null, 1, CollectNode.NO_OFFSET); - collectNode.toCollect(testShardIdReference, testNodeReference); - Object result[][] = operation.collect(collectNode).get(); - assertThat(result.length, is(1)); - assertThat(result[0].length, is(2)); - assertThat((Integer)result[0][0], isOneOf(0,1)); - assertThat((Integer)result[0][1], is(42)); - } - - @Test public void testCollectShardExpressionsNoLimitOffset2() throws Exception { - CollectNode collectNode = new CollectNode("shardCollect", shardRouting(0,1), null, CollectNode.NO_LIMIT, 2); - collectNode.toCollect(testShardIdReference, testNodeReference); - Object result[][] = operation.collect(collectNode).get(); - assertThat(result.length, is(0)); - } - - @Test public void testCollectShardExpressionsLimit0() throws Exception { - CollectNode collectNode = new CollectNode("shardCollect", shardRouting(0,1), null, 0, CollectNode.NO_OFFSET); - collectNode.toCollect(testShardIdReference, testNodeReference); - Object result[][] = operation.collect(collectNode).get(); - assertThat(result.length, is(0)); - } - - @Test public void testCollectNodeExpressionsLimit0() throws Exception { - CollectNode collectNode = new CollectNode("nodeCollect", testRouting, null, 0, CollectNode.NO_OFFSET); - collectNode.toCollect(testNodeReference); - Object result[][] = operation.collect(collectNode).get(); - assertThat(result.length, is(0)); - } - - @Test public void testCollectNodeExpressionsOffset1() throws Exception { - CollectNode collectNode = new CollectNode("nodeCollect", testRouting, null, CollectNode.NO_LIMIT, 1); - collectNode.toCollect(testNodeReference); - Object result[][] = operation.collect(collectNode).get(); - assertThat(result.length, is(0)); - } - - @Test public void testCollectShardExpressionsOrderByAsc() throws Exception { - CollectNode collectNode = new CollectNode("shardCollect", shardRouting(0,1), null, CollectNode.NO_LIMIT, CollectNode.NO_OFFSET, new int[]{0}, new boolean[]{false}); - collectNode.toCollect(testShardIdReference, testNodeReference); - Object result[][] = operation.collect(collectNode).get(); - assertThat(result.length, is(2)); - assertThat((Integer)result[0][0], is(0)); - assertThat((Integer)result[1][0], is(1)); - } - - @Test public void testCollectShardExpressionsOrderByDesc() throws Exception { - CollectNode collectNode = new CollectNode("shardCollect", shardRouting(0,1), null, CollectNode.NO_LIMIT, CollectNode.NO_OFFSET, new int[]{0}, new boolean[]{true}); - collectNode.toCollect(testShardIdReference, testNodeReference); - Object result[][] = operation.collect(collectNode).get(); - assertThat(result.length, is(2)); - assertThat((Integer)result[0][0], is(1)); - assertThat((Integer)result[1][0], is(0)); - } - */ } diff --git a/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java b/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java index 7e93d50cf065..6c6824df3969 100644 --- a/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java +++ b/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java @@ -43,10 +43,7 @@ import java.io.File; import java.io.FileWriter; import java.nio.file.Paths; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; import static io.crate.testing.TestingHelpers.createReference; import static org.hamcrest.core.Is.is; @@ -91,8 +88,7 @@ public ReferenceImplementation getImplementation(ReferenceIdent ident) { functions, new StatsTables(ImmutableSettings.EMPTY, nodeSettingsService) ) - ) - ); + )); File tmpFile = File.createTempFile("fileUriCollectOperation", ".json"); try (FileWriter writer = new FileWriter(tmpFile)) { @@ -115,6 +111,7 @@ public ReferenceImplementation getImplementation(ReferenceIdent ident) { null, false ); + collectNode.jobId(UUID.randomUUID()); ListenableFuture resultFuture = collectOperation.collect(collectNode, null); Object[][] objects = resultFuture.get();