From a22f0bd39060a46905a3c7df11ea559efb022acf Mon Sep 17 00:00:00 2001 From: Sebastian Utz Date: Wed, 25 Feb 2015 22:05:00 +0100 Subject: [PATCH] make CollectContextService jobSearchContextId aware added more tests for CollectContextService --- sql/build.gradle | 7 + .../action/sql/query/CrateSearchContext.java | 46 +++- .../action/sql/query/CrateSearchService.java | 3 - .../transport/CollectContextService.java | 116 -------- .../executor/transport/TransportExecutor.java | 1 + .../main/java/io/crate/metadata/Routing.java | 8 +- .../collect/CollectContextService.java | 144 ++++++++++ .../collect/CollectOperationModule.java | 3 - .../collect/DistributingCollectOperation.java | 3 +- .../operation/collect/JobCollectContext.java | 221 +++++++++++++++ .../operation/collect/LuceneDocCollector.java | 106 +------ .../collect/MapSideDataCollectOperation.java | 46 +++- .../collect/ShardCollectService.java | 95 +++++-- .../io/crate/planner/PlanNodeBuilder.java | 45 +-- .../main/java/io/crate/planner/Planner.java | 12 +- .../consumer/DistributedGroupByConsumer.java | 1 + .../consumer/GlobalAggregateConsumer.java | 6 +- .../NonDistributedGroupByConsumer.java | 1 + .../consumer/QueryAndFetchConsumer.java | 14 +- .../ReduceOnCollectorGroupByConsumer.java | 1 + .../planner/consumer/UpdateConsumer.java | 12 +- .../main/java/io/crate/plugin/SQLPlugin.java | 5 +- .../SQLTransportIntegrationTest.java | 9 - .../collect/CollectContextServiceTest.java | 108 ++++++++ .../operation/collect/CollectContextTest.java | 76 ----- .../collect/DistributingCollectTest.java | 1 + .../collect/JobCollectContextTest.java | 259 ++++++++++++++++++ .../collect/LocalDataCollectTest.java | 14 +- .../MapSideDataCollectOperationTest.java | 4 +- 29 files changed, 982 insertions(+), 385 deletions(-) delete mode 100644 sql/src/main/java/io/crate/executor/transport/CollectContextService.java create mode 100644 sql/src/main/java/io/crate/operation/collect/CollectContextService.java create mode 100644 sql/src/main/java/io/crate/operation/collect/JobCollectContext.java create mode 100644 sql/src/test/java/io/crate/operation/collect/CollectContextServiceTest.java delete mode 100644 sql/src/test/java/io/crate/operation/collect/CollectContextTest.java create mode 100644 sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java diff --git a/sql/build.gradle b/sql/build.gradle index 978be68f7fd1..82e58a640237 100644 --- a/sql/build.gradle +++ b/sql/build.gradle @@ -19,6 +19,13 @@ dependencies { testCompile project(':testing') testCompile 'org.skyscreamer:jsonassert:1.2.0' testCompile 'com.h2database:h2:1.3.173' + testCompile ('org.powermock:powermock-module-junit4:1.6.1') { + exclude group: 'junit', module: 'junit' + } + testCompile ('org.powermock:powermock-api-mockito:1.6.1') { + exclude group: 'junit', module: 'junit' + exclude group: 'org.mockito', module: 'mockito-all' + } } buildscript { diff --git a/sql/src/main/java/io/crate/action/sql/query/CrateSearchContext.java b/sql/src/main/java/io/crate/action/sql/query/CrateSearchContext.java index b07525d9e5ad..4c31070963c4 100644 --- a/sql/src/main/java/io/crate/action/sql/query/CrateSearchContext.java +++ b/sql/src/main/java/io/crate/action/sql/query/CrateSearchContext.java @@ -22,12 +22,15 @@ package io.crate.action.sql.query; import com.google.common.base.Optional; +import io.crate.Constants; import org.apache.lucene.util.Counter; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.service.IndexService; @@ -43,9 +46,10 @@ public class CrateSearchContext extends DefaultSearchContext { + private final Engine.Searcher engineSearcher; + private volatile boolean isEngineSearcherShared = false; + public CrateSearchContext(long id, - final int numShards, - final String[] types, final long nowInMillis, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, @@ -58,30 +62,52 @@ public CrateSearchContext(long id, Counter timeEstimateCounter, Optional scroll, long keepAlive) { - super(id, new CrateSearchShardRequest(numShards, types, nowInMillis, scroll, indexShard), + super(id, new CrateSearchShardRequest(nowInMillis, scroll, indexShard), shardTarget, engineSearcher, indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays, timeEstimateCounter); + this.engineSearcher = engineSearcher; if (scroll.isPresent()) { scroll(scroll.get()); } keepAlive(keepAlive); } + public Engine.Searcher engineSearcher() { + return engineSearcher; + } + + public void sharedEngineSearcher(boolean isShared) { + isEngineSearcherShared = isShared; + } + + public boolean isEngineSearcherShared() { + return isEngineSearcherShared; + } + + @Override + public void doClose() throws ElasticsearchException { + if (scanContext() != null) { + scanContext().clear(); + } + // clear and scope phase we have + Releasables.close(searcher()); + if (!isEngineSearcherShared) { + Releasables.close(engineSearcher); + } + } + + private static class CrateSearchShardRequest implements ShardSearchRequest { - private final int numShards; - private final String[] types; + private final String[] types = new String[]{Constants.DEFAULT_MAPPING_TYPE}; private final long nowInMillis; private final Scroll scroll; private final String index; private final int shardId; - private CrateSearchShardRequest(int numShards, String[] types, - long nowInMillis, Optional scroll, + private CrateSearchShardRequest(long nowInMillis, Optional scroll, IndexShard indexShard) { - this.numShards = numShards; - this.types = types; this.nowInMillis = nowInMillis; this.scroll = scroll.orNull(); this.index = indexShard.indexService().index().name(); @@ -121,7 +147,7 @@ public BytesReference extraSource() { @Override public int numberOfShards() { - return numShards; + return 0; } @Override diff --git a/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java b/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java index c8162352ee08..aa6c390b6ebc 100644 --- a/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java +++ b/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java @@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import io.crate.Constants; import io.crate.core.StringUtils; import io.crate.executor.transport.task.elasticsearch.SortOrder; import io.crate.lucene.LuceneQueryBuilder; @@ -217,8 +216,6 @@ private SearchContext createContext(QueryShardRequest request, @Nullable Engine. } SearchContext context = new CrateSearchContext( idGenerator.incrementAndGet(), - 0, // TODO: is this necessary? seems not, wasn't set before - new String[] { Constants.DEFAULT_MAPPING_TYPE }, System.currentTimeMillis(), searchShardTarget, engineSearcher, diff --git a/sql/src/main/java/io/crate/executor/transport/CollectContextService.java b/sql/src/main/java/io/crate/executor/transport/CollectContextService.java deleted file mode 100644 index 4793ed41b4cf..000000000000 --- a/sql/src/main/java/io/crate/executor/transport/CollectContextService.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor - * license agreements. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. Crate licenses - * this file to you under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - * - * However, if you have executed another commercial license agreement - * with Crate these terms will supersede the license and you may use the - * software solely pursuant to the terms of the relevant commercial agreement. - */ - -package io.crate.executor.transport; - -import com.carrotsearch.hppc.IntObjectMap; -import com.carrotsearch.hppc.IntObjectOpenHashMap; -import com.carrotsearch.hppc.cursors.ObjectCursor; -import com.google.common.base.Function; -import com.google.common.base.Throwables; -import com.google.common.cache.*; -import com.google.common.util.concurrent.UncheckedExecutionException; -import org.apache.lucene.index.IndexReader; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.internal.SearchContext; - -import javax.annotation.Nonnull; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - -public class CollectContextService implements Releasable { - - // TODO: maybe make configurable - public static final TimeValue EXPIRATION_DEFAULT = new TimeValue(5, TimeUnit.MINUTES); - - private final ReentrantLock lock; - // TODO: wrap search context map into object and add shardId -> Reader map - // so we can get a reader for a shard if one is open already - private final LoadingCache> contexts; - - public CollectContextService() { - this.contexts = CacheBuilder.newBuilder().expireAfterAccess( - EXPIRATION_DEFAULT.millis(), - TimeUnit.MILLISECONDS - ).removalListener(new RemovalListener>() { - @Override - public void onRemoval(@Nonnull RemovalNotification> notification) { - IntObjectMap removedMap = notification.getValue(); - if (removedMap != null) { - for (ObjectCursor cursor : removedMap.values()){ - Releasables.close(cursor.value); - } - } - } - }).build(new CacheLoader>() { - @Override - public IntObjectMap load(@Nonnull UUID key) throws Exception { - return new IntObjectOpenHashMap<>(); - } - }); - this.lock = new ReentrantLock(); - } - - - @Nullable - public SearchContext getContext(UUID jobId, int searchContextId) { - final IntObjectMap searchContexts; - try { - searchContexts = contexts.get(jobId); - } catch (ExecutionException|UncheckedExecutionException e) { - throw Throwables.propagate(e); - } - return searchContexts.get(searchContextId); - } - - public SearchContext getOrCreateContext(UUID jobId, - int searchContextId, - Function createSearchContextFunction) { - try { - final IntObjectMap searchContexts = contexts.get(jobId); - lock.lock(); - try { - SearchContext searchContext = searchContexts.get(searchContextId); - if (searchContext == null) { - // TODO: insert shard IndexReader here - searchContext = createSearchContextFunction.apply(null); - searchContexts.put(searchContextId, searchContext); - } - return searchContext; - } finally { - lock.unlock(); - } - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public void close() throws ElasticsearchException { - this.contexts.invalidateAll(); - } -} diff --git a/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java b/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java index aa0a9cc5b886..03585b519add 100644 --- a/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java +++ b/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java @@ -35,6 +35,7 @@ import io.crate.metadata.Functions; import io.crate.metadata.ReferenceResolver; import io.crate.operation.ImplementationSymbolVisitor; +import io.crate.operation.collect.CollectContextService; import io.crate.operation.collect.HandlerSideDataCollectOperation; import io.crate.operation.collect.StatsTables; import io.crate.operation.projectors.ProjectionToProjectorVisitor; diff --git a/sql/src/main/java/io/crate/metadata/Routing.java b/sql/src/main/java/io/crate/metadata/Routing.java index 5e3933fe5663..95e57d9a0ce8 100644 --- a/sql/src/main/java/io/crate/metadata/Routing.java +++ b/sql/src/main/java/io/crate/metadata/Routing.java @@ -57,7 +57,9 @@ public int numShards(String nodeId) { Map> nodeRouting = locations.get(nodeId); if (nodeRouting != null) { for (List shardIds : nodeRouting.values()) { - count += shardIds.size(); + if (shardIds != null) { + count += shardIds.size(); + } } } } @@ -75,7 +77,9 @@ public int numShards() { for (Map> nodeRouting : locations.values()) { if (nodeRouting != null) { for (List shardIds : nodeRouting.values()) { - count += shardIds.size(); + if (shardIds != null) { + count += shardIds.size(); + } } } } diff --git a/sql/src/main/java/io/crate/operation/collect/CollectContextService.java b/sql/src/main/java/io/crate/operation/collect/CollectContextService.java new file mode 100644 index 000000000000..61fe6fb1068a --- /dev/null +++ b/sql/src/main/java/io/crate/operation/collect/CollectContextService.java @@ -0,0 +1,144 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.operation.collect; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Singleton; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; + +import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; + +@Singleton +public class CollectContextService extends AbstractLifecycleComponent { + + // TODO: maybe make configurable + protected static long DEFAULT_KEEP_ALIVE = timeValueMinutes(5).millis(); + protected static TimeValue DEFAULT_KEEP_ALIVE_INTERVAL = timeValueMinutes(1); + + private final ThreadPool threadPool; + private final ScheduledFuture keepAliveReaper; + private final ConcurrentMap activeContexts = + ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + + @Inject + public CollectContextService(Settings settings, ThreadPool threadPool) { + super(settings); + this.threadPool = threadPool; + this.keepAliveReaper = threadPool.scheduleWithFixedDelay( + new Reaper(), + DEFAULT_KEEP_ALIVE_INTERVAL); + } + + @Override + protected void doStart() throws ElasticsearchException { + } + + @Override + protected void doStop() throws ElasticsearchException { + for (JobCollectContext context : activeContexts.values()) { + context.close(); + } + activeContexts.clear(); + } + + @Override + protected void doClose() throws ElasticsearchException { + keepAliveReaper.cancel(false); + } + + /** + * Return a {@link JobCollectContext} for given jobId, create new one if not found. + */ + public JobCollectContext acquireContext(UUID jobId) { + JobCollectContext jobCollectContext = new JobCollectContext(jobId); + jobCollectContext.keepAlive(DEFAULT_KEEP_ALIVE); + JobCollectContext jobCollectContextExisting = + activeContexts.putIfAbsent(jobId, jobCollectContext); + if (jobCollectContextExisting != null) { + jobCollectContext = jobCollectContextExisting; + } + contextProcessing(jobCollectContext); + return jobCollectContext; + } + + /** + * Release a {@link JobCollectContext}, just settings its last accessed time. + */ + public void releaseContext(UUID jobId) { + JobCollectContext jobCollectContext = activeContexts.get(jobId); + if (jobCollectContext != null) { + contextProcessedSuccessfully(jobCollectContext); + } + } + + /** + * Close {@link JobCollectContext} for given jobId and remove if from active map. + */ + public void closeContext(UUID jobId) { + JobCollectContext jobCollectContext = activeContexts.get(jobId); + if (jobCollectContext != null) { + activeContexts.remove(jobId, jobCollectContext); + jobCollectContext.close(); + } + } + + protected void contextProcessing(JobCollectContext context) { + // disable timeout while executing a job + context.accessed(-1); + } + + protected void contextProcessedSuccessfully(JobCollectContext context) { + context.accessed(threadPool.estimatedTimeInMillis()); + } + + + class Reaper implements Runnable { + @Override + public void run() { + final long time = threadPool.estimatedTimeInMillis(); + for (JobCollectContext context : activeContexts.values()) { + // Use the same value for both checks since lastAccessTime can + // be modified by another thread between checks! + final long lastAccessTime = context.lastAccessTime(); + if (lastAccessTime == -1l) { // its being processed or timeout is disabled + continue; + } + if ((time - lastAccessTime > context.keepAlive())) { + logger.debug("closing job collect context [{}], time [{}], " + + "lastAccessTime [{}], keepAlive [{}]", + context.id(), time, lastAccessTime, context.keepAlive()); + closeContext(context.id()); + } + } + } + } + +} diff --git a/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java b/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java index 3a90fead1396..0444ef466232 100644 --- a/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java +++ b/sql/src/main/java/io/crate/operation/collect/CollectOperationModule.java @@ -21,14 +21,11 @@ package io.crate.operation.collect; -import io.crate.executor.transport.CollectContextService; import org.elasticsearch.common.inject.AbstractModule; public class CollectOperationModule extends AbstractModule { @Override protected void configure() { - bind(CollectContextService.class).asEagerSingleton(); - bind(MapSideDataCollectOperation.class).asEagerSingleton(); bind(HandlerSideDataCollectOperation.class).asEagerSingleton(); bind(InformationSchemaCollectService.class).asEagerSingleton(); diff --git a/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java b/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java index 62aac92915f6..fc96d8c0e4fb 100644 --- a/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java +++ b/sql/src/main/java/io/crate/operation/collect/DistributingCollectOperation.java @@ -236,10 +236,11 @@ public DistributingCollectOperation(ClusterService clusterService, TransportService transportService, PlanNodeStreamerVisitor streamerVisitor, CollectServiceResolver collectServiceResolver, + CollectContextService collectContextService, CrateCircuitBreakerService breakerService) { super(clusterService, settings, transportActionProvider, functions, referenceResolver, indicesService, - threadPool, collectServiceResolver); + threadPool, collectServiceResolver, collectContextService); this.transportService = transportService; this.streamerVisitor = streamerVisitor; this.circuitBreaker = breakerService.getBreaker(CrateCircuitBreakerService.QUERY_BREAKER); diff --git a/sql/src/main/java/io/crate/operation/collect/JobCollectContext.java b/sql/src/main/java/io/crate/operation/collect/JobCollectContext.java new file mode 100644 index 000000000000..7d9cf60cd1ef --- /dev/null +++ b/sql/src/main/java/io/crate/operation/collect/JobCollectContext.java @@ -0,0 +1,221 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.operation.collect; + +import com.google.common.base.Function; +import io.crate.action.sql.query.CrateSearchContext; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.search.internal.SearchContext; + +import javax.annotation.Nullable; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class JobCollectContext implements Releasable { + + private final UUID id; + private final Map activeContexts = new HashMap<>(); + private final ConcurrentMap> shardsMap = new ConcurrentHashMap<>(); + private final ConcurrentMap jobContextIdMap = new ConcurrentHashMap<>(); + private final ConcurrentMap engineSearchersRefCount = new ConcurrentHashMap<>(); + private final Object lock = new Object(); + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final ESLogger logger = Loggers.getLogger(getClass()); + + private volatile long lastAccessTime = -1; + private volatile long keepAlive; + + public JobCollectContext(UUID id) { + this.id = id; + } + + public UUID id() { + return id; + } + + public void registerJobContextId(ShardId shardId, int jobContextId) { + if (jobContextIdMap.putIfAbsent(jobContextId, shardId) == null) { + List oldShardContextIds; + List shardContextIds = new ArrayList<>(); + shardContextIds.add(jobContextId); + for (;;) { + oldShardContextIds = shardsMap.putIfAbsent(shardId, shardContextIds); + if (oldShardContextIds == null) { + return; + } + shardContextIds = new ArrayList<>(); + shardContextIds.addAll(oldShardContextIds); + shardContextIds.add(jobContextId); + if (shardsMap.replace(shardId, oldShardContextIds, shardContextIds)) { + return; + } + } + } + } + + public CrateSearchContext createContext(IndexShard indexShard, + int jobSearchContextId, + Function createSearchContextFunction) { + assert shardsMap.containsKey(indexShard.shardId()) : "all jobSearchContextId's must be registered first using registerJobContextId(..)"; + CrateSearchContext searchContext; + synchronized (lock) { + searchContext = activeContexts.get(jobSearchContextId); + if (searchContext == null) { + boolean sharedEngineSearcher = true; + Engine.Searcher engineSearcher = acquireSearcher(indexShard); + if (engineSearcher == null) { + sharedEngineSearcher = false; + engineSearcher = acquireNewSearcher(indexShard); + engineSearchersRefCount.put(indexShard.shardId(), 1); + } + searchContext = createSearchContextFunction.apply(engineSearcher); + assert searchContext != null; // should be never null, but interface marks it as nullable + searchContext.sharedEngineSearcher(sharedEngineSearcher); + activeContexts.put(jobSearchContextId, searchContext); + if (logger.isTraceEnabled()) { + logger.trace("Created context {} on shard {} for job {}", + jobSearchContextId, indexShard.shardId(), id); + } + } + } + return searchContext; + } + + @Nullable + public CrateSearchContext findContext(int jobSearchContextId) { + return activeContexts.get(jobSearchContextId); + } + + public void closeContext(int jobSearchContextId) { + if (logger.isTraceEnabled()) { + logger.trace("Closing context {} on shard {} for job {}", + jobSearchContextId, jobContextIdMap.get(jobSearchContextId), id); + } + synchronized (lock) { + CrateSearchContext searchContext = activeContexts.get(jobSearchContextId); + if (searchContext != null) { + if (searchContext.isEngineSearcherShared()) { + ShardId shardId = jobContextIdMap.get(jobSearchContextId); + Integer refCount = engineSearchersRefCount.get(shardId); + assert refCount != null : "refCount should be initialized while creating context"; + if (logger.isTraceEnabled()) { + logger.trace("[closeContext] Current engine searcher refCount {} for context {} of shard {}", + refCount, jobSearchContextId, shardId); + } + while (!engineSearchersRefCount.replace(shardId, refCount, refCount - 1)) { + refCount = engineSearchersRefCount.get(shardId); + } + if (engineSearchersRefCount.get(shardId) == 0) { + searchContext.sharedEngineSearcher(false); + } + } + activeContexts.remove(jobSearchContextId); + searchContext.close(); + } + } + } + + /** + * Try to find a {@link CrateSearchContext} for the same shard. + * If one is found return its {@link Engine.Searcher}, otherwise return null. + */ + protected Engine.Searcher acquireSearcher(IndexShard indexShard) { + List jobSearchContextIds = shardsMap.get(indexShard.shardId()); + if (jobSearchContextIds != null && jobSearchContextIds.size() > 0) { + CrateSearchContext searchContext = null; + Integer jobSearchContextId = null; + synchronized (lock) { + Iterator it = jobSearchContextIds.iterator(); + while (searchContext == null && it.hasNext()) { + jobSearchContextId = it.next(); + searchContext = activeContexts.get(jobSearchContextId); + } + } + if (searchContext != null) { + logger.trace("Reusing engine searcher of shard {}", indexShard.shardId()); + Integer refCount = engineSearchersRefCount.get(indexShard.shardId()); + assert refCount != null : "refCount should be initialized while creating context"; + while (!engineSearchersRefCount.replace(indexShard.shardId(), refCount, refCount+1)) { + refCount = engineSearchersRefCount.get(indexShard.shardId()); + } + if (logger.isTraceEnabled()) { + logger.trace("[acquireSearcher] Current engine searcher refCount {}:{} for context {} of shard {}", + refCount, engineSearchersRefCount.get(indexShard.shardId()), jobSearchContextId, indexShard.shardId()); + } + searchContext.sharedEngineSearcher(true); + return searchContext.engineSearcher(); + } + } + return null; + } + + public void acquireContext(SearchContext context) { + SearchContext.setCurrent(context); + } + + public void releaseContext(SearchContext context) { + assert context == SearchContext.current(); + context.clearReleasables(SearchContext.Lifetime.PHASE); + SearchContext.removeCurrent(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { // prevent double release + for (Integer jobSearchContextId : activeContexts.keySet()) { + closeContext(jobSearchContextId); + } + } + } + + public void accessed(long accessTime) { + this.lastAccessTime = accessTime; + } + + public long lastAccessTime() { + return this.lastAccessTime; + } + + public long keepAlive() { + return this.keepAlive; + } + + public void keepAlive(long keepAlive) { + this.keepAlive = keepAlive; + } + + /** + * Acquire a new searcher, wrapper method needed for simplified testing + */ + protected Engine.Searcher acquireNewSearcher(IndexShard indexShard) { + return EngineSearcher.getSearcherWithRetry(indexShard, null); + } + +} diff --git a/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java b/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java index 255f4304de08..f5fcdb3e6ea0 100644 --- a/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java +++ b/sql/src/main/java/io/crate/operation/collect/LuceneDocCollector.java @@ -21,15 +21,8 @@ package io.crate.operation.collect; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; -import io.crate.Constants; -import io.crate.analyze.WhereClause; import io.crate.breaker.CrateCircuitBreakerService; import io.crate.breaker.RamAccountingContext; -import io.crate.executor.transport.CollectContextService; -import io.crate.lucene.LuceneQueryBuilder; -import io.crate.metadata.Functions; import io.crate.operation.Input; import io.crate.operation.projectors.Projector; import io.crate.operation.reference.doc.lucene.CollectorContext; @@ -37,31 +30,17 @@ import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.search.*; -import org.elasticsearch.cache.recycler.CacheRecycler; -import org.elasticsearch.cache.recycler.PageCacheRecycler; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.common.util.BigArrays; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorer; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.internal.SourceFieldMapper; -import org.elasticsearch.index.query.ParsedQuery; -import org.elasticsearch.index.service.IndexService; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.service.IndexShard; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.internal.DefaultSearchContext; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.internal.ShardSearchLocalRequest; -import org.elasticsearch.threadpool.ThreadPool; -import javax.annotation.Nullable; import java.io.IOException; import java.util.HashSet; import java.util.List; -import java.util.Objects; -import java.util.UUID; /** * collect documents from ES shard, a lucene index @@ -107,80 +86,23 @@ public void required(boolean required) { private final CollectorFieldsVisitor fieldsVisitor; private final List> topLevelInputs; private final List> collectorExpressions; + private final JobCollectContext jobCollectContext; private final SearchContext searchContext; private final int jobSearchContextId; - public LuceneDocCollector(final UUID jobId, - final ThreadPool threadPool, - ClusterService clusterService, - CollectContextService collectorContextService, - ShardId shardId, - final IndexService indexService, - final ScriptService scriptService, - final CacheRecycler cacheRecycler, - final PageCacheRecycler pageCacheRecycler, - final BigArrays bigArrays, - List> inputs, + public LuceneDocCollector(List> inputs, List> collectorExpressions, - final Functions functions, - final WhereClause whereClause, Projector downStreamProjector, + JobCollectContext jobCollectContext, + SearchContext searchContext, int jobSearchContextId) throws Exception { downstream(downStreamProjector); - final SearchShardTarget searchShardTarget = new SearchShardTarget( - clusterService.localNode().id(), shardId.getIndex(), shardId.id()); this.topLevelInputs = inputs; this.collectorExpressions = collectorExpressions; this.fieldsVisitor = new CollectorFieldsVisitor(collectorExpressions.size()); this.jobSearchContextId = jobSearchContextId; - - final IndexShard indexShard = indexService.shardSafe(shardId.id()); - final int searchContextId = Objects.hash(jobId, shardId); - this.searchContext = collectorContextService.getOrCreateContext( - jobId, - searchContextId, - new Function() { - - @Nullable - @Override - public SearchContext apply(@Nullable IndexReader indexReader) { - // TODO: handle IndexReader - ShardSearchLocalRequest searchRequest = new ShardSearchLocalRequest( - new String[] { Constants.DEFAULT_MAPPING_TYPE }, - System.currentTimeMillis() - ); - SearchContext localContext = null; - try { - localContext = new DefaultSearchContext( - searchContextId, - searchRequest, - searchShardTarget, - EngineSearcher.getSearcherWithRetry(indexShard, null), // TODO: use same searcher/reader for same jobId and searchContextId - indexService, - indexShard, - scriptService, - cacheRecycler, - pageCacheRecycler, - bigArrays, - threadPool.estimatedTimeInMillisCounter() - ); - LuceneQueryBuilder builder = new LuceneQueryBuilder(functions, localContext, indexService.cache()); - LuceneQueryBuilder.Context ctx = builder.convert(whereClause); - localContext.parsedQuery(new ParsedQuery(ctx.query(), ImmutableMap.of())); - Float minScore = ctx.minScore(); - if (minScore != null) { - localContext.minimumScore(minScore); - } - } catch (Throwable t) { - if (localContext != null) { - localContext.close(); - } - throw t; - } - return localContext; - } - } - ); + this.jobCollectContext = jobCollectContext; + this.searchContext = searchContext; } @Override @@ -243,7 +165,7 @@ public void doCollect(RamAccountingContext ramAccountingContext) throws Exceptio collectorExpression.startCollect(collectorContext); } visitorEnabled = fieldsVisitor.required(); - SearchContext.setCurrent(searchContext); + jobCollectContext.acquireContext(searchContext); Query query = searchContext.query(); if (query == null) { query = new MatchAllDocsQuery(); @@ -260,8 +182,10 @@ public void doCollect(RamAccountingContext ramAccountingContext) throws Exceptio downstream.upstreamFailed(e); throw e; } finally { - searchContext.close(); - SearchContext.removeCurrent(); + jobCollectContext.releaseContext(searchContext); + // should only be done on QAF not QTF! + jobCollectContext.closeContext(jobSearchContextId); } } + } diff --git a/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java b/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java index c43c0f169819..980884c38fa4 100644 --- a/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java +++ b/sql/src/main/java/io/crate/operation/collect/MapSideDataCollectOperation.java @@ -68,17 +68,17 @@ */ public class MapSideDataCollectOperation implements CollectOperation { - private final FileCollectInputSymbolVisitor fileInputSymbolVisitor; - private final CollectServiceResolver collectServiceResolver; - private final ProjectionToProjectorVisitor projectorVisitor; - private final ThreadPoolExecutor executor; - private final int poolSize; - private ESLogger logger = Loggers.getLogger(getClass()); - private static class SimpleShardCollectFuture extends ShardCollectFuture { - public SimpleShardCollectFuture(int numShards, ShardProjectorChain projectorChain) { + private final CollectContextService collectContextService; + private final UUID jobId; + + public SimpleShardCollectFuture(int numShards, ShardProjectorChain projectorChain, + CollectContextService collectContextService, + UUID jobId) { super(numShards, projectorChain); + this.collectContextService = collectContextService; + this.jobId = jobId; } @Override @@ -86,11 +86,13 @@ protected void onAllShardsFinished() { Futures.addCallback(resultProvider.result(), new FutureCallback() { @Override public void onSuccess(@Nullable Object[][] result) { + collectContextService.releaseContext(jobId); set(result); } @Override public void onFailure(@Nonnull Throwable t) { + collectContextService.releaseContext(jobId); setException(t); } }); @@ -101,6 +103,13 @@ public void onFailure(@Nonnull Throwable t) { protected final EvaluatingNormalizer nodeNormalizer; protected final ClusterService clusterService; private final ImplementationSymbolVisitor nodeImplementationSymbolVisitor; + private final CollectContextService collectContextService; + private final FileCollectInputSymbolVisitor fileInputSymbolVisitor; + private final CollectServiceResolver collectServiceResolver; + private final ProjectionToProjectorVisitor projectorVisitor; + private final ThreadPoolExecutor executor; + private final int poolSize; + private ESLogger logger = Loggers.getLogger(getClass()); @Inject public MapSideDataCollectOperation(ClusterService clusterService, @@ -110,11 +119,13 @@ public MapSideDataCollectOperation(ClusterService clusterService, ReferenceResolver referenceResolver, IndicesService indicesService, ThreadPool threadPool, - CollectServiceResolver collectServiceResolver) { + CollectServiceResolver collectServiceResolver, + CollectContextService collectContextService) { executor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH); poolSize = executor.getPoolSize(); this.clusterService = clusterService; this.indicesService = indicesService; + this.collectContextService = collectContextService; this.nodeNormalizer = new EvaluatingNormalizer(functions, RowGranularity.NODE, referenceResolver); this.collectServiceResolver = collectServiceResolver; this.nodeImplementationSymbolVisitor = new ImplementationSymbolVisitor( @@ -252,8 +263,10 @@ protected ListenableFuture handleShardCollect(CollectNode collectNod return result; } - int jobSearchContextId = collectNode.routing().jobSearchContextIdBase(); + assert collectNode.jobId().isPresent() : "jobId must be set on CollectNode"; + JobCollectContext jobCollectContext = collectContextService.acquireContext(collectNode.jobId().get()); + int jobSearchContextId = collectNode.routing().jobSearchContextIdBase(); // get shardCollectors from single shards final List shardCollectors = new ArrayList<>(numShards); for (Map.Entry>> nodeEntry : collectNode.routing().locations().entrySet()) { @@ -269,6 +282,8 @@ protected ListenableFuture handleShardCollect(CollectNode collectNod } for (Integer shardId : entry.getValue()) { + jobCollectContext.registerJobContextId( + indexService.shardSafe(shardId).shardId(), jobSearchContextId); Injector shardInjector; try { shardInjector = indexService.shardInjectorSafe(shardId); @@ -276,11 +291,9 @@ protected ListenableFuture handleShardCollect(CollectNode collectNod CrateCollector collector = shardCollectService.getCollector( collectNode, projectorChain, + jobCollectContext, jobSearchContextId ); - if (jobSearchContextId > -1) { - jobSearchContextId++; - } shardCollectors.add(collector); } catch (IndexShardMissingException e) { throw new UnhandledServerException( @@ -290,6 +303,7 @@ protected ListenableFuture handleShardCollect(CollectNode collectNod logger.error("Error while getting collector", e); throw new UnhandledServerException(e); } + jobSearchContextId++; } } } else if (jobSearchContextId > -1) { @@ -384,7 +398,9 @@ private void doCollect(ShardCollectFuture result, CrateCollector shardCollector, * @param collectNode in case any other properties need to be extracted * @return a fancy ShardCollectFuture implementation */ - protected ShardCollectFuture getShardCollectFuture(int numShards, ShardProjectorChain projectorChain, CollectNode collectNode) { - return new SimpleShardCollectFuture(numShards, projectorChain); + protected ShardCollectFuture getShardCollectFuture(int numShards, + ShardProjectorChain projectorChain, + CollectNode collectNode) { + return new SimpleShardCollectFuture(numShards, projectorChain, collectContextService, collectNode.jobId().get()); } } diff --git a/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java b/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java index eb7fcb00476a..1a0bf15216ee 100644 --- a/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java +++ b/sql/src/main/java/io/crate/operation/collect/ShardCollectService.java @@ -21,12 +21,16 @@ package io.crate.operation.collect; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import io.crate.action.sql.query.CrateSearchContext; import io.crate.analyze.EvaluatingNormalizer; +import io.crate.analyze.WhereClause; import io.crate.blob.v2.BlobIndices; -import io.crate.breaker.CrateCircuitBreakerService; import io.crate.exceptions.UnhandledServerException; -import io.crate.executor.transport.CollectContextService; import io.crate.executor.transport.TransportActionProvider; +import io.crate.lucene.LuceneQueryBuilder; import io.crate.metadata.Functions; import io.crate.metadata.shard.ShardReferenceResolver; import io.crate.metadata.shard.blob.BlobShardReferenceResolver; @@ -41,17 +45,26 @@ import io.crate.planner.RowGranularity; import io.crate.planner.node.dql.CollectNode; import io.crate.planner.symbol.Literal; +import org.apache.lucene.search.Filter; import org.elasticsearch.cache.recycler.CacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.threadpool.ThreadPool; +import javax.annotation.Nullable; + public class ShardCollectService { private final CollectInputSymbolVisitor docInputSymbolVisitor; @@ -70,8 +83,6 @@ public class ShardCollectService { private final Functions functions; private final BlobIndices blobIndices; - private final CollectContextService collectContextService; - @Inject public ShardCollectService(ThreadPool threadPool, ClusterService clusterService, @@ -86,9 +97,7 @@ public ShardCollectService(ThreadPool threadPool, Functions functions, ShardReferenceResolver referenceResolver, BlobIndices blobIndices, - BlobShardReferenceResolver blobShardReferenceResolver, - CrateCircuitBreakerService breakerService, - CollectContextService collectContextService) { + BlobShardReferenceResolver blobShardReferenceResolver) { this.threadPool = threadPool; this.clusterService = clusterService; this.shardId = shardId; @@ -124,7 +133,6 @@ public ShardCollectService(ThreadPool threadPool, shardNormalizer, shardId, docInputSymbolVisitor); - this.collectContextService = collectContextService; } /** @@ -137,6 +145,7 @@ public ShardCollectService(ThreadPool threadPool, */ public CrateCollector getCollector(CollectNode collectNode, ShardProjectorChain projectorChain, + JobCollectContext jobCollectContext, int jobSearchContextId) throws Exception { CollectNode normalizedCollectNode = collectNode.normalize(shardNormalizer); Projector downstream = projectorChain.newShardDownstreamProjector(projectorVisitor); @@ -149,7 +158,7 @@ public CrateCollector getCollector(CollectNode collectNode, if (isBlobShard) { return getBlobIndexCollector(normalizedCollectNode, downstream); } else { - return getLuceneIndexCollector(normalizedCollectNode, downstream, jobSearchContextId); + return getLuceneIndexCollector(normalizedCollectNode, downstream, jobCollectContext, jobSearchContextId); } } else if (granularity == RowGranularity.SHARD) { ImplementationSymbolVisitor.Context shardCtx = shardImplementationSymbolVisitor.process(normalizedCollectNode); @@ -178,24 +187,68 @@ private CrateCollector getBlobIndexCollector(CollectNode collectNode, Projector private CrateCollector getLuceneIndexCollector(CollectNode collectNode, Projector downstream, + JobCollectContext jobCollectContext, int jobSearchContextId) throws Exception { CollectInputSymbolVisitor.Context docCtx = docInputSymbolVisitor.process(collectNode); + SearchContext searchContext = getSearchContext(jobCollectContext, jobSearchContextId, + collectNode.whereClause()); return new LuceneDocCollector( - collectNode.jobId().get(), - threadPool, - clusterService, - collectContextService, - shardId, - indexService, - scriptService, - cacheRecycler, - pageCacheRecycler, - bigArrays, docCtx.topLevelInputs(), docCtx.docLevelExpressions(), - functions, - collectNode.whereClause(), downstream, + jobCollectContext, + searchContext, jobSearchContextId); } + + private SearchContext getSearchContext(JobCollectContext jobCollectContext, + final int jobSearchContextId, + final WhereClause whereClause) { + final SearchShardTarget searchShardTarget = new SearchShardTarget( + clusterService.localNode().id(), shardId.getIndex(), shardId.id()); + final IndexShard indexShard = indexService.shardSafe(shardId.id()); + return jobCollectContext.createContext( + indexShard, + jobSearchContextId, + new Function() { + + @Nullable + @Override + public CrateSearchContext apply(Engine.Searcher engineSearcher) { + CrateSearchContext localContext = null; + try { + localContext = new CrateSearchContext( + jobSearchContextId, + System.currentTimeMillis(), + searchShardTarget, + engineSearcher, + indexService, + indexShard, + scriptService, + cacheRecycler, + pageCacheRecycler, + bigArrays, + threadPool.estimatedTimeInMillisCounter(), + Optional.absent(), + CollectContextService.DEFAULT_KEEP_ALIVE + ); + LuceneQueryBuilder builder = new LuceneQueryBuilder(functions, localContext, indexService.cache()); + LuceneQueryBuilder.Context ctx = builder.convert(whereClause); + localContext.parsedQuery(new ParsedQuery(ctx.query(), ImmutableMap.of())); + Float minScore = ctx.minScore(); + if (minScore != null) { + localContext.minimumScore(minScore); + } + } catch (Throwable t) { + if (localContext != null) { + localContext.close(); + } + throw t; + } + return localContext; + } + } + ); + + } } diff --git a/sql/src/main/java/io/crate/planner/PlanNodeBuilder.java b/sql/src/main/java/io/crate/planner/PlanNodeBuilder.java index e0739b6c4298..90620e829084 100644 --- a/sql/src/main/java/io/crate/planner/PlanNodeBuilder.java +++ b/sql/src/main/java/io/crate/planner/PlanNodeBuilder.java @@ -43,11 +43,14 @@ public class PlanNodeBuilder { public static CollectNode distributingCollect(TableInfo tableInfo, - WhereClause whereClause, - List toCollect, - List downstreamNodes, - ImmutableList projections) { - CollectNode node = new CollectNode("distributing collect", tableInfo.getRouting(whereClause, null)); + Planner.Context plannerContext, + WhereClause whereClause, + List toCollect, + List downstreamNodes, + ImmutableList projections) { + Routing routing = tableInfo.getRouting(whereClause, null); + plannerContext.allocateJobSearchContextIds(routing); + CollectNode node = new CollectNode("distributing collect", routing); node.whereClause(whereClause); node.maxRowGranularity(tableInfo.rowGranularity()); node.downStreamNodes(downstreamNodes); @@ -102,17 +105,19 @@ public static void connectTypes(DQLPlanNode previousNode, DQLPlanNode nextNode) } public static CollectNode collect(TableInfo tableInfo, - WhereClause whereClause, - List toCollect, - ImmutableList projections, - @Nullable String partitionIdent, - @Nullable String routingPreference) { + Planner.Context plannerContext, + WhereClause whereClause, + List toCollect, + ImmutableList projections, + @Nullable String partitionIdent, + @Nullable String routingPreference) { assert !Iterables.any(toCollect, Predicates.instanceOf(InputColumn.class)) : "cannot collect inputcolumns"; Routing routing = tableInfo.getRouting(whereClause, routingPreference); if (partitionIdent != null && routing.hasLocations()) { routing = filterRouting(routing, PartitionName.fromPartitionIdent( tableInfo.ident().schema(), tableInfo.ident().name(), partitionIdent).stringValue()); } + plannerContext.allocateJobSearchContextIds(routing); CollectNode node = new CollectNode("collect", routing); node.whereClause(whereClause); node.toCollect(toCollect); @@ -149,17 +154,19 @@ private static Routing filterRouting(Routing routing, String includeTableName) { } public static CollectNode collect(TableInfo tableInfo, - WhereClause whereClause, - List toCollect, - ImmutableList projections) { - return collect(tableInfo, whereClause, toCollect, projections, null, null); + Planner.Context plannerContext, + WhereClause whereClause, + List toCollect, + ImmutableList projections) { + return collect(tableInfo, plannerContext, whereClause, toCollect, projections, null, null); } public static CollectNode collect(TableInfo tableInfo, - WhereClause whereClause, - List toCollect, - ImmutableList projections, - @Nullable String partitionIdent) { - return collect(tableInfo, whereClause, toCollect, projections, partitionIdent, null); + Planner.Context plannerContext, + WhereClause whereClause, + List toCollect, + ImmutableList projections, + @Nullable String partitionIdent) { + return collect(tableInfo, plannerContext, whereClause, toCollect, projections, partitionIdent, null); } } diff --git a/sql/src/main/java/io/crate/planner/Planner.java b/sql/src/main/java/io/crate/planner/Planner.java index e3ab014b7a47..c29721ce5023 100644 --- a/sql/src/main/java/io/crate/planner/Planner.java +++ b/sql/src/main/java/io/crate/planner/Planner.java @@ -95,7 +95,8 @@ public static class Context { * set on the given routing. */ public void allocateJobSearchContextIds(Routing routing) { - if (routing.jobSearchContextIdBase() > -1) { + if (routing.jobSearchContextIdBase() > -1 || routing.hasLocations() == false + || routing.numShards() == 0) { return; } int jobSearchContextId = jobSearchContextIdBaseSeq; @@ -189,15 +190,15 @@ protected Plan visitDeleteStatement(DeleteAnalyzedStatement analyzedStatement, C protected Plan visitCopyStatement(final CopyAnalyzedStatement analysis, Context context) { IterablePlan plan = new IterablePlan(); if (analysis.mode() == CopyAnalyzedStatement.Mode.FROM) { - copyFromPlan(analysis, plan); + copyFromPlan(analysis, plan, context); } else if (analysis.mode() == CopyAnalyzedStatement.Mode.TO) { - copyToPlan(analysis, plan); + copyToPlan(analysis, plan, context); } return plan; } - private void copyToPlan(CopyAnalyzedStatement analysis, IterablePlan plan) { + private void copyToPlan(CopyAnalyzedStatement analysis, IterablePlan plan, Context context) { TableInfo tableInfo = analysis.table(); WriterProjection projection = new WriterProjection(); projection.uri(analysis.uri()); @@ -230,6 +231,7 @@ private void copyToPlan(CopyAnalyzedStatement analysis, IterablePlan plan) { } CollectNode collectNode = PlanNodeBuilder.collect( tableInfo, + context, WhereClause.MATCH_ALL, outputs, ImmutableList.of(projection), @@ -241,7 +243,7 @@ private void copyToPlan(CopyAnalyzedStatement analysis, IterablePlan plan) { plan.add(mergeNode); } - private void copyFromPlan(CopyAnalyzedStatement analysis, IterablePlan plan) { + private void copyFromPlan(CopyAnalyzedStatement analysis, IterablePlan plan, Context context) { /** * copy from has two "modes": * diff --git a/sql/src/main/java/io/crate/planner/consumer/DistributedGroupByConsumer.java b/sql/src/main/java/io/crate/planner/consumer/DistributedGroupByConsumer.java index 1d6507296aad..f45c5c48e4e0 100644 --- a/sql/src/main/java/io/crate/planner/consumer/DistributedGroupByConsumer.java +++ b/sql/src/main/java/io/crate/planner/consumer/DistributedGroupByConsumer.java @@ -112,6 +112,7 @@ public AnalyzedRelation visitQueriedTable(QueriedTable table, Context context) { CollectNode collectNode = PlanNodeBuilder.distributingCollect( tableInfo, + context.consumerContext.plannerContext(), table.querySpec().where(), splitPoints.leaves(), Lists.newArrayList(routing.nodes()), diff --git a/sql/src/main/java/io/crate/planner/consumer/GlobalAggregateConsumer.java b/sql/src/main/java/io/crate/planner/consumer/GlobalAggregateConsumer.java index 02c3c4dd8892..8e25c90298fb 100644 --- a/sql/src/main/java/io/crate/planner/consumer/GlobalAggregateConsumer.java +++ b/sql/src/main/java/io/crate/planner/consumer/GlobalAggregateConsumer.java @@ -89,7 +89,7 @@ public PlannedAnalyzedRelation visitQueriedTable(QueriedTable table, ConsumerCon context.validationException(new VersionInvalidException()); return null; } - return globalAggregates(table, table.tableRelation(), table.querySpec().where()); + return globalAggregates(table, table.tableRelation(), table.querySpec().where(), context); } @Override @@ -110,7 +110,8 @@ private static boolean noGroupBy(List groupBy) { public static PlannedAnalyzedRelation globalAggregates(QueriedTable table, TableRelation tableRelation, - WhereClause whereClause) { + WhereClause whereClause, + ConsumerContext context) { assert noGroupBy(table.querySpec().groupBy()) : "must not have group by clause for global aggregate queries"; validateAggregationOutputs(tableRelation, table.querySpec().outputs()); // global aggregate: collect and partial aggregate on C and final agg on H @@ -126,6 +127,7 @@ public static PlannedAnalyzedRelation globalAggregates(QueriedTable table, CollectNode collectNode = PlanNodeBuilder.collect( tableRelation.tableInfo(), + context.plannerContext(), whereClause, splitPoints.leaves(), ImmutableList.of(ap) diff --git a/sql/src/main/java/io/crate/planner/consumer/NonDistributedGroupByConsumer.java b/sql/src/main/java/io/crate/planner/consumer/NonDistributedGroupByConsumer.java index c6aa2f261e56..2d5fe809edc8 100644 --- a/sql/src/main/java/io/crate/planner/consumer/NonDistributedGroupByConsumer.java +++ b/sql/src/main/java/io/crate/planner/consumer/NonDistributedGroupByConsumer.java @@ -140,6 +140,7 @@ private AnalyzedRelation nonDistributedGroupBy(QueriedTable table, Context conte CollectNode collectNode = PlanNodeBuilder.collect( tableInfo, + context.consumerContext.plannerContext(), table.querySpec().where(), splitPoints.leaves(), ImmutableList.of(groupProjection) diff --git a/sql/src/main/java/io/crate/planner/consumer/QueryAndFetchConsumer.java b/sql/src/main/java/io/crate/planner/consumer/QueryAndFetchConsumer.java index 5858224bf8a0..ecc7dd0a96e7 100644 --- a/sql/src/main/java/io/crate/planner/consumer/QueryAndFetchConsumer.java +++ b/sql/src/main/java/io/crate/planner/consumer/QueryAndFetchConsumer.java @@ -101,7 +101,7 @@ public AnalyzedRelation visitQueriedTable(QueriedTable table, Context context) { } if (table.querySpec().hasAggregates()) { context.result = true; - return GlobalAggregateConsumer.globalAggregates(table, tableRelation, table.querySpec().where()); + return GlobalAggregateConsumer.globalAggregates(table, tableRelation, table.querySpec().where(), context.consumerContext); } else { context.result = true; return normalSelect(table, table.querySpec().where(), tableRelation, context); @@ -163,7 +163,9 @@ private AnalyzedRelation normalSelect(QueriedTable table, "Analyzer should have thrown an exception already."; ImmutableList projections = ImmutableList.of(); - collectNode = PlanNodeBuilder.collect(tableInfo, whereClause, outputSymbols, projections); + collectNode = PlanNodeBuilder.collect(tableInfo, + context.consumerContext.plannerContext(), + whereClause, outputSymbols, projections); } else if (querySpec.isLimited() || orderBy != null) { /** * select id, name, order by id, date @@ -217,7 +219,9 @@ private AnalyzedRelation normalSelect(QueriedTable table, ); } tnp.outputs(allOutputs); - collectNode = PlanNodeBuilder.collect(tableInfo, whereClause, toCollect, ImmutableList.of(tnp)); + collectNode = PlanNodeBuilder.collect(tableInfo, + context.consumerContext.plannerContext(), + whereClause, toCollect, ImmutableList.of(tnp)); if (orderBy == null) { tnp = new TopNProjection(limit, querySpec.offset()); @@ -230,7 +234,9 @@ private AnalyzedRelation normalSelect(QueriedTable table, tnp.outputs(finalOutputs); mergeNode = PlanNodeBuilder.localMerge(ImmutableList.of(tnp), collectNode); } else { - collectNode = PlanNodeBuilder.collect(tableInfo, whereClause, outputSymbols, ImmutableList.of()); + collectNode = PlanNodeBuilder.collect(tableInfo, + context.consumerContext.plannerContext(), + whereClause, outputSymbols, ImmutableList.of()); mergeNode = PlanNodeBuilder.localMerge(ImmutableList.of(), collectNode); } return new QueryAndFetch(collectNode, mergeNode); diff --git a/sql/src/main/java/io/crate/planner/consumer/ReduceOnCollectorGroupByConsumer.java b/sql/src/main/java/io/crate/planner/consumer/ReduceOnCollectorGroupByConsumer.java index 0aed9cc4c840..058cbc72bfe6 100644 --- a/sql/src/main/java/io/crate/planner/consumer/ReduceOnCollectorGroupByConsumer.java +++ b/sql/src/main/java/io/crate/planner/consumer/ReduceOnCollectorGroupByConsumer.java @@ -187,6 +187,7 @@ private AnalyzedRelation optimizedReduceOnCollectorGroupBy(QueriedTable table, T CollectNode collectNode = PlanNodeBuilder.collect( tableInfo, + context.plannerContext(), table.querySpec().where(), splitPoints.leaves(), ImmutableList.copyOf(projections) diff --git a/sql/src/main/java/io/crate/planner/consumer/UpdateConsumer.java b/sql/src/main/java/io/crate/planner/consumer/UpdateConsumer.java index 6cdd10ad4842..63b81e667174 100644 --- a/sql/src/main/java/io/crate/planner/consumer/UpdateConsumer.java +++ b/sql/src/main/java/io/crate/planner/consumer/UpdateConsumer.java @@ -78,7 +78,7 @@ public UpdateConsumer(AnalysisMetaData analysisMetaData) { @Override public boolean consume(AnalyzedRelation rootRelation, ConsumerContext context) { - PlannedAnalyzedRelation plannedAnalyzedRelation = visitor.process(rootRelation, null); + PlannedAnalyzedRelation plannedAnalyzedRelation = visitor.process(rootRelation, context); if (plannedAnalyzedRelation == null) { return false; } @@ -86,7 +86,7 @@ public boolean consume(AnalyzedRelation rootRelation, ConsumerContext context) { return true; } - class Visitor extends AnalyzedRelationVisitor { + class Visitor extends AnalyzedRelationVisitor { private final AnalysisMetaData analysisMetaData; @@ -95,7 +95,7 @@ public Visitor(AnalysisMetaData analysisMetaData) { } @Override - public PlannedAnalyzedRelation visitUpdateAnalyzedStatement(UpdateAnalyzedStatement statement, Void context) { + public PlannedAnalyzedRelation visitUpdateAnalyzedStatement(UpdateAnalyzedStatement statement, ConsumerContext context) { assert statement.sourceRelation() instanceof TableRelation : "sourceRelation of update statement must be a TableRelation"; TableRelation tableRelation = (TableRelation) statement.sourceRelation(); TableInfo tableInfo = tableRelation.tableInfo(); @@ -119,7 +119,7 @@ public PlannedAnalyzedRelation visitUpdateAnalyzedStatement(UpdateAnalyzedStatem } upsertById(nestedAnalysis, tableInfo, whereClause, upsertByIdNode); } else { - childNodes.add(upsertByQuery(nestedAnalysis, tableInfo, whereClause)); + childNodes.add(upsertByQuery(nestedAnalysis, context, tableInfo, whereClause)); } } if (childNodes.size()>0){ @@ -130,6 +130,7 @@ public PlannedAnalyzedRelation visitUpdateAnalyzedStatement(UpdateAnalyzedStatem } private List upsertByQuery(UpdateAnalyzedStatement.NestedAnalyzedStatement nestedAnalysis, + ConsumerContext consumerContext, TableInfo tableInfo, WhereClause whereClause) { @@ -162,6 +163,7 @@ private List upsertByQuery(UpdateAnalyzedStatement.NestedAnalyzedSt CollectNode collectNode = PlanNodeBuilder.collect( tableInfo, + consumerContext.plannerContext(), whereClause, ImmutableList.of(uidReference), ImmutableList.of(updateProjection), @@ -210,7 +212,7 @@ private Tuple convertAssignments(Map assi } @Override - protected PlannedAnalyzedRelation visitAnalyzedRelation(AnalyzedRelation relation, Void context) { + protected PlannedAnalyzedRelation visitAnalyzedRelation(AnalyzedRelation relation, ConsumerContext context) { return null; } } diff --git a/sql/src/main/java/io/crate/plugin/SQLPlugin.java b/sql/src/main/java/io/crate/plugin/SQLPlugin.java index ca479d6be099..409436362ecb 100644 --- a/sql/src/main/java/io/crate/plugin/SQLPlugin.java +++ b/sql/src/main/java/io/crate/plugin/SQLPlugin.java @@ -40,6 +40,7 @@ import io.crate.metadata.shard.MetaDataShardModule; import io.crate.metadata.sys.MetaDataSysModule; import io.crate.operation.aggregation.impl.AggregationImplModule; +import io.crate.operation.collect.CollectContextService; import io.crate.operation.collect.CollectOperationModule; import io.crate.operation.collect.CollectShardModule; import io.crate.operation.operator.OperatorModule; @@ -98,7 +99,9 @@ public String description() { @Override public Collection> services() { - return ImmutableList.>of(SQLService.class); + return ImmutableList.>of( + SQLService.class, + CollectContextService.class); } @Override diff --git a/sql/src/test/java/io/crate/integrationtests/SQLTransportIntegrationTest.java b/sql/src/test/java/io/crate/integrationtests/SQLTransportIntegrationTest.java index eb22a1f191d9..c67b46639910 100644 --- a/sql/src/test/java/io/crate/integrationtests/SQLTransportIntegrationTest.java +++ b/sql/src/test/java/io/crate/integrationtests/SQLTransportIntegrationTest.java @@ -25,7 +25,6 @@ import io.crate.action.sql.*; import io.crate.action.sql.parser.SQLXContentSourceContext; import io.crate.action.sql.parser.SQLXContentSourceParser; -import io.crate.executor.transport.CollectContextService; import io.crate.test.integration.CrateIntegrationTest; import io.crate.testing.SQLTransportExecutor; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; @@ -36,14 +35,12 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.junit.After; import java.io.IOException; import java.util.Map; @@ -72,12 +69,6 @@ public Settings indexSettings() { return ImmutableSettings.builder().put("number_of_replicas", 0).build(); } - @After - public void releaseCachedSearchContexts() { - // release all cached SearchContexts, so no open Searchers are leaked - Releasables.close(cluster().getInstances(CollectContextService.class)); - } - /** * Execute an SQL Statement on a random node of the cluster * diff --git a/sql/src/test/java/io/crate/operation/collect/CollectContextServiceTest.java b/sql/src/test/java/io/crate/operation/collect/CollectContextServiceTest.java new file mode 100644 index 000000000000..9991126e145b --- /dev/null +++ b/sql/src/test/java/io/crate/operation/collect/CollectContextServiceTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.operation.collect; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; +import static org.hamcrest.Matchers.*; + +public class CollectContextServiceTest extends RandomizedTest { + + private final ThreadPool testThreadPool = new ThreadPool(getClass().getSimpleName()); + private final Settings settings = ImmutableSettings.EMPTY; + + private final CollectContextService collectContextService = new CollectContextService(settings, testThreadPool); + + @After + public void cleanUp() throws Exception { + collectContextService.close(); + testThreadPool.shutdown(); + } + + @Test + public void testAcquireContext() throws Exception { + // create new context + JobCollectContext ctx1 = collectContextService.acquireContext(UUID.randomUUID()); + assertThat(ctx1, instanceOf(JobCollectContext.class)); + assertThat(ctx1.lastAccessTime(), is(-1L)); + + // using same UUID must return existing context + JobCollectContext ctx2 = collectContextService.acquireContext(ctx1.id()); + assertThat(ctx2, is(ctx1)); + } + + @Test + public void testReleaseContext() throws Exception { + JobCollectContext ctx1 = collectContextService.acquireContext(UUID.randomUUID()); + collectContextService.releaseContext(ctx1.id()); + assertThat(ctx1.lastAccessTime(), greaterThan(-1L)); + } + + @Test + public void testCloseContext() throws Exception { + JobCollectContext ctx1 = collectContextService.acquireContext(UUID.randomUUID()); + collectContextService.closeContext(ctx1.id()); + + // context must be closed + Field closed = JobCollectContext.class.getDeclaredField("closed"); + closed.setAccessible(true); + assertThat(((AtomicBoolean)closed.get(ctx1)).get(), is(true)); + + Field activeContexts = CollectContextService.class.getDeclaredField("activeContexts"); + activeContexts.setAccessible(true); + assertThat(((Map) activeContexts.get(collectContextService)).size(), is(0)); + } + + @Test + public void testKeepAliveExpiration() throws Exception { + CollectContextService.DEFAULT_KEEP_ALIVE_INTERVAL = timeValueMillis(1); + CollectContextService.DEFAULT_KEEP_ALIVE = timeValueMillis(0).millis(); + CollectContextService collectContextService1 = new CollectContextService(settings, testThreadPool); + JobCollectContext ctx1 = collectContextService1.acquireContext(UUID.randomUUID()); + collectContextService1.releaseContext(ctx1.id()); + Field activeContexts = CollectContextService.class.getDeclaredField("activeContexts"); + activeContexts.setAccessible(true); + + Thread.sleep(300); + + assertThat(((Map) activeContexts.get(collectContextService1)).size(), is(0)); + + // close service, stop reaper thread + collectContextService1.close(); + + // set back original values + CollectContextService.DEFAULT_KEEP_ALIVE_INTERVAL = timeValueMinutes(1); + CollectContextService.DEFAULT_KEEP_ALIVE = timeValueMinutes(5).millis(); + } +} diff --git a/sql/src/test/java/io/crate/operation/collect/CollectContextTest.java b/sql/src/test/java/io/crate/operation/collect/CollectContextTest.java deleted file mode 100644 index 27ebfbdc61ba..000000000000 --- a/sql/src/test/java/io/crate/operation/collect/CollectContextTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor - * license agreements. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. Crate licenses - * this file to you under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - * - * However, if you have executed another commercial license agreement - * with Crate these terms will supersede the license and you may use the - * software solely pursuant to the terms of the relevant commercial agreement. - */ - -package io.crate.operation.collect; - -import com.google.common.base.Function; -import io.crate.executor.transport.CollectContextService; -import org.apache.lucene.index.IndexReader; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.test.TestSearchContext; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import javax.annotation.Nullable; -import java.util.UUID; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; - -public class CollectContextTest { - - private CollectContextService collectContextService; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @After - public void cleanup() { - Releasables.close(collectContextService); - } - - private static final Function CONTEXT_FUNCTION = new Function() { - - @Nullable - @Override - public SearchContext apply(IndexReader input) { - return new TestSearchContext(); - } - }; - - @Test - public void testSameForSameArgs() throws Throwable { - collectContextService = new CollectContextService(); - UUID jobId = UUID.randomUUID(); - SearchContext ctx1 = collectContextService.getOrCreateContext(jobId, 0, CONTEXT_FUNCTION); - SearchContext ctx2 = collectContextService.getOrCreateContext(jobId, 0, CONTEXT_FUNCTION); - assertThat(ctx1, is(ctx2)); - SearchContext ctx3 = collectContextService.getOrCreateContext(UUID.randomUUID(), 0, CONTEXT_FUNCTION); - assertThat(ctx3, is(not(ctx1))); - - SearchContext ctx4 = collectContextService.getOrCreateContext(jobId, 1, CONTEXT_FUNCTION); - assertThat(ctx4, is(not(ctx1))); - } -} diff --git a/sql/src/test/java/io/crate/operation/collect/DistributingCollectTest.java b/sql/src/test/java/io/crate/operation/collect/DistributingCollectTest.java index 0d5b7d246685..9759bf6bb13f 100644 --- a/sql/src/test/java/io/crate/operation/collect/DistributingCollectTest.java +++ b/sql/src/test/java/io/crate/operation/collect/DistributingCollectTest.java @@ -225,6 +225,7 @@ public TestShardModule(int shardId) { protected void configure() { IndexShard shard = mock(InternalIndexShard.class); bind(IndexShard.class).toInstance(shard); + when(shard.shardId()).thenReturn(shardId); Index index = new Index(TEST_TABLE_NAME); bind(Index.class).toInstance(index); bind(ShardId.class).toInstance(shardId); diff --git a/sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java b/sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java new file mode 100644 index 000000000000..8ea50fb3b9f0 --- /dev/null +++ b/sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.operation.collect; + +import com.google.common.base.Function; +import io.crate.action.sql.query.CrateSearchContext; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.search.internal.SearchContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import javax.annotation.Nullable; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; +import static org.powermock.api.mockito.PowerMockito.*; + +/** + * This class requires PowerMock in order to mock the final {@link SearchContext#close} method. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(CrateSearchContext.class) +public class JobCollectContextTest { + + static final Function CONTEXT_FUNCTION = + new Function() { + @Nullable + @Override + public CrateSearchContext apply(Engine.Searcher input) { + CrateSearchContext searchContext = mock(CrateSearchContext.class); + when(searchContext.engineSearcher()).thenReturn(input); + when(searchContext.isEngineSearcherShared()).thenCallRealMethod(); + doCallRealMethod().when(searchContext).sharedEngineSearcher(Mockito.anyBoolean()); + doNothing().when(searchContext).close(); + return searchContext; + } + }; + + + private JobCollectContext jobCollectContext; + private IndexShard indexShard; + private ShardId shardId; + + @Before + public void setUp() throws Exception { + jobCollectContext = spy(new JobCollectContext(UUID.randomUUID())); + indexShard = mock(IndexShard.class); + shardId = new ShardId("dummy", 1); + when(indexShard.shardId()).thenReturn(shardId); + doReturn(mock(Engine.Searcher.class)).when(jobCollectContext).acquireNewSearcher(indexShard); + } + + @After + public void cleanUp() throws Exception { + jobCollectContext.close(); + } + + @Test + public void testRegisterJobContextId() throws Exception { + final Field jobContextIdMap = JobCollectContext.class.getDeclaredField("jobContextIdMap"); + jobContextIdMap.setAccessible(true); + final Field shardsMap = JobCollectContext.class.getDeclaredField("shardsMap"); + shardsMap.setAccessible(true); + + final ExecutorService executorService = Executors.newFixedThreadPool(10); + final CountDownLatch latch = new CountDownLatch(10); + List> tasks = new ArrayList<>(10); + for (int i = 0; i < 10; i++) { + final int jobSearchContextId = i; + tasks.add(new Callable() { + @Override + public Void call() throws Exception { + jobCollectContext.registerJobContextId(shardId, jobSearchContextId); + latch.countDown(); + assertThat((ShardId) ((Map) jobContextIdMap.get(jobCollectContext)).get(jobSearchContextId), is(shardId)); + return null; + } + }); + } + executorService.invokeAll(tasks); + latch.await(); + + assertThat(((Map) jobContextIdMap.get(jobCollectContext)).size(), is(10)); + assertThat(((Map)shardsMap.get(jobCollectContext)).size(), is(1)); + assertThat((List) ((Map) shardsMap.get(jobCollectContext)).get(shardId), containsInAnyOrder(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + } + + @Test + public void testCreateAndCloseContext() throws Exception { + final Field activeContexts = JobCollectContext.class.getDeclaredField("activeContexts"); + activeContexts.setAccessible(true); + + int jobSearchContextId = 1; + jobCollectContext.registerJobContextId(shardId, jobSearchContextId); + + CrateSearchContext ctx1 = jobCollectContext.createContext(indexShard, jobSearchContextId, CONTEXT_FUNCTION); + assertThat(ctx1, instanceOf(CrateSearchContext.class)); + + // calling again with same arguments results in same context + CrateSearchContext ctx2 = jobCollectContext.createContext(indexShard, jobSearchContextId, CONTEXT_FUNCTION); + assertEquals(ctx1, ctx2); + assertThat(((Map)activeContexts.get(jobCollectContext)).size(), is(1)); + + jobCollectContext.closeContext(jobSearchContextId); + assertThat(((Map) activeContexts.get(jobCollectContext)).size(), is(0)); + } + + @Test + public void testFindContext() throws Exception { + int jobSearchContextId = 1; + jobCollectContext.registerJobContextId(shardId, jobSearchContextId); + + // no context created, expect null + assertNull(jobCollectContext.findContext(1)); + + CrateSearchContext ctx1 = jobCollectContext.createContext(indexShard, jobSearchContextId, CONTEXT_FUNCTION); + CrateSearchContext ctx2 = jobCollectContext.findContext(jobSearchContextId); + assertEquals(ctx1, ctx2); + } + + @Test + public void testSharedEngineSearcher() throws Exception { + final Field engineSearchersRefCount = JobCollectContext.class.getDeclaredField("engineSearchersRefCount"); + engineSearchersRefCount.setAccessible(true); + + jobCollectContext.registerJobContextId(shardId, 1); + jobCollectContext.registerJobContextId(shardId, 2); + + CrateSearchContext ctx1 = jobCollectContext.createContext(indexShard, 1, CONTEXT_FUNCTION); + assertThat(ctx1, instanceOf(CrateSearchContext.class)); + + CrateSearchContext ctx2 = jobCollectContext.createContext(indexShard, 2, CONTEXT_FUNCTION); + + assertEquals(ctx1.engineSearcher(), ctx2.engineSearcher()); + assertThat(ctx1.isEngineSearcherShared(), is(true)); + assertThat(ctx2.isEngineSearcherShared(), is(true)); + assertThat(((Map)engineSearchersRefCount.get(jobCollectContext)).get(shardId), is(2)); + + jobCollectContext.closeContext(1); + assertThat(((Map) engineSearchersRefCount.get(jobCollectContext)).get(shardId), is(1)); + jobCollectContext.closeContext(2); + assertThat(((Map) engineSearchersRefCount.get(jobCollectContext)).get(shardId), is(0)); + } + + @Test + public void testSharedEngineSearcherConcurrent() throws Exception { + final Field engineSearchersRefCount = JobCollectContext.class.getDeclaredField("engineSearchersRefCount"); + engineSearchersRefCount.setAccessible(true); + + // open contexts concurrent (all sharing same engine searcher) + final ExecutorService executorService = Executors.newFixedThreadPool(10); + final CountDownLatch latch = new CountDownLatch(10); + List> tasks = new ArrayList<>(10); + for (int i = 0; i < 10; i++) { + final int jobSearchContextId = i; + jobCollectContext.registerJobContextId(shardId, jobSearchContextId); + tasks.add(new Callable() { + @Override + public Void call() throws Exception { + jobCollectContext.createContext(indexShard, jobSearchContextId, CONTEXT_FUNCTION); + latch.countDown(); + return null; + } + }); + } + executorService.invokeAll(tasks); + latch.await(); + assertThat(((Map) engineSearchersRefCount.get(jobCollectContext)).get(shardId), is(10)); + + // close contexts concurrent ( + final CountDownLatch latch2 = new CountDownLatch(10); + List> tasks2 = new ArrayList<>(10); + for (int i = 0; i < 10; i++) { + final int jobSearchContextId = i; + tasks2.add(new Callable() { + @Override + public Void call() throws Exception { + jobCollectContext.closeContext(jobSearchContextId); + latch2.countDown(); + return null; + } + }); + } + executorService.invokeAll(tasks2); + latch2.await(); + assertThat(((Map) engineSearchersRefCount.get(jobCollectContext)).get(shardId), is(0)); + } + + @Test + public void testClose() throws Exception { + final Field closed = JobCollectContext.class.getDeclaredField("closed"); + closed.setAccessible(true); + final Field activeContexts = JobCollectContext.class.getDeclaredField("activeContexts"); + activeContexts.setAccessible(true); + + assertThat(((AtomicBoolean)closed.get(jobCollectContext)).get(), is(false)); + + int jobSearchContextId = 1; + jobCollectContext.registerJobContextId(shardId, jobSearchContextId); + + CrateSearchContext ctx1 = jobCollectContext.createContext(indexShard, jobSearchContextId, CONTEXT_FUNCTION); + assertThat(ctx1, instanceOf(CrateSearchContext.class)); + + jobCollectContext.close(); + assertThat(((AtomicBoolean) closed.get(jobCollectContext)).get(), is(true)); + assertThat(((Map) activeContexts.get(jobCollectContext)).size(), is(0)); + } + + @Test + public void testAcquireAndReleaseContext() throws Exception { + int jobSearchContextId = 1; + jobCollectContext.registerJobContextId(shardId, jobSearchContextId); + + SearchContext ctx1 = jobCollectContext.createContext(indexShard, jobSearchContextId, CONTEXT_FUNCTION); + assertThat(ctx1, instanceOf(CrateSearchContext.class)); + + jobCollectContext.acquireContext(ctx1); + assertThat(SearchContext.current(), is(ctx1)); + + jobCollectContext.releaseContext(ctx1); + assertThat(SearchContext.current(), nullValue()); + } +} diff --git a/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java b/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java index 9bfb3fc86a14..f0475e1e8eb3 100644 --- a/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java +++ b/sql/src/test/java/io/crate/operation/collect/LocalDataCollectTest.java @@ -92,6 +92,9 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.mockito.Answers; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.*; import java.util.concurrent.ExecutionException; @@ -275,6 +278,7 @@ public TestShardModule(int shardId) { protected void configure() { IndexShard shard = mock(InternalIndexShard.class); bind(IndexShard.class).toInstance(shard); + when(shard.shardId()).thenReturn(shardId); Index index = new Index(TEST_TABLE_NAME); bind(Index.class).toInstance(index); bind(ShardId.class).toInstance(shardId); @@ -319,6 +323,13 @@ public void configure() { when(indicesService.indexServiceSafe(TEST_TABLE_NAME)).thenReturn(indexService); NodeSettingsService nodeSettingsService = mock(NodeSettingsService.class); + CollectContextService collectContextService = mock(CollectContextService.class); + when(collectContextService.acquireContext(Mockito.any(UUID.class))).thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return new JobCollectContext((UUID)invocation.getArguments()[0]); + } + }); operation = new MapSideDataCollectOperation( injector.getInstance(ClusterService.class), @@ -330,7 +341,8 @@ public void configure() { discoveryService, functions, new StatsTables(ImmutableSettings.EMPTY, nodeSettingsService)) - )); + ), + collectContextService); } private Routing shardRouting(final Integer... shardIds) { diff --git a/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java b/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java index 77eba2c68088..500849d54b94 100644 --- a/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java +++ b/sql/src/test/java/io/crate/operation/collect/MapSideDataCollectOperationTest.java @@ -75,6 +75,7 @@ public ReferenceImplementation getImplementation(ReferenceIdent ident) { }; NodeSettingsService nodeSettingsService = mock(NodeSettingsService.class); + CollectContextService collectContextService = mock(CollectContextService.class); MapSideDataCollectOperation collectOperation = new MapSideDataCollectOperation( clusterService, ImmutableSettings.EMPTY, @@ -89,7 +90,8 @@ public ReferenceImplementation getImplementation(ReferenceIdent ident) { functions, new StatsTables(ImmutableSettings.EMPTY, nodeSettingsService) ) - )); + ), + collectContextService); File tmpFile = File.createTempFile("fileUriCollectOperation", ".json"); try (FileWriter writer = new FileWriter(tmpFile)) {