From 46f0d56dfd3d1c7fb0e925ef7a20c9f439cd6ff5 Mon Sep 17 00:00:00 2001 From: Mathias Fussenegger Date: Thu, 25 Jun 2015 10:30:46 +0200 Subject: [PATCH] remove Engine.Searcher sharing logic from JobCollectContext This was (probably) intended to optimize self-joins but for that to work it would actually have to be done in the JobExecutionContext because the JobCollectContext is specific to one relation/CollectNode. --- .../operation/collect/JobCollectContext.java | 14 +-- .../collect/JobCollectContextTest.java | 87 ------------------- 2 files changed, 1 insertion(+), 100 deletions(-) diff --git a/sql/src/main/java/io/crate/operation/collect/JobCollectContext.java b/sql/src/main/java/io/crate/operation/collect/JobCollectContext.java index 4800570cee6c..6aa0babdc96c 100644 --- a/sql/src/main/java/io/crate/operation/collect/JobCollectContext.java +++ b/sql/src/main/java/io/crate/operation/collect/JobCollectContext.java @@ -58,7 +58,6 @@ public class JobCollectContext implements ExecutionSubContext, RowUpstream, Exec private final IntObjectOpenHashMap queryContexts = new IntObjectOpenHashMap<>(); private final IntObjectOpenHashMap fetchContexts = new IntObjectOpenHashMap<>(); - private final ConcurrentMap shardsSearcherMap = new ConcurrentHashMap<>(); private final AtomicInteger activeQueryContexts = new AtomicInteger(0); private final AtomicInteger activeFetchContexts = new AtomicInteger(0); private final Object subContextLock = new Object(); @@ -239,18 +238,7 @@ private void callContextCallback() { * If none is found, create new one with new acquired {@link Engine.Searcher}. */ protected EngineSearcherDelegate acquireSearcher(IndexShard indexShard) { - EngineSearcherDelegate engineSearcherDelegate; - for (;;) { - engineSearcherDelegate = shardsSearcherMap.get(indexShard.shardId()); - if (engineSearcherDelegate == null) { - engineSearcherDelegate = new EngineSearcherDelegate(acquireNewSearcher(indexShard)); - if (shardsSearcherMap.putIfAbsent(indexShard.shardId(), engineSearcherDelegate) == null) { - return engineSearcherDelegate; - } - } else { - return engineSearcherDelegate; - } - } + return new EngineSearcherDelegate(acquireNewSearcher(indexShard)); } /** diff --git a/sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java b/sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java index c0ac6b9fd5af..d15f08121076 100644 --- a/sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java +++ b/sql/src/test/java/io/crate/operation/collect/JobCollectContextTest.java @@ -45,15 +45,8 @@ import javax.annotation.Nullable; import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.List; import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -170,86 +163,6 @@ public void testGetFetchContext() throws Exception { } } - @Test - public void testSharedEngineSearcher() throws Exception { - Field engineSearcherDelegate = JobQueryShardContext.class.getDeclaredField("engineSearcherDelegate"); - engineSearcherDelegate.setAccessible(true); - Field refCount = EngineSearcherDelegate.class.getDeclaredField("refCount"); - refCount.setAccessible(true); - - JobQueryShardContext queryContext1 = new JobQueryShardContext( - indexShard, - 1, - false, - CONTEXT_FUNCTION); - jobCollectContext.addContext(1, queryContext1); - - JobQueryShardContext queryContext2 = new JobQueryShardContext( - indexShard, - 2, - false, - CONTEXT_FUNCTION); - jobCollectContext.addContext(2, queryContext2); - - assertEquals(queryContext1.engineSearcher(), queryContext1.engineSearcher()); - assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryContext1))).get(), is(2)); - - queryContext1.close(); - queryContext2.close(); - assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryContext1))).get(), is(0)); - } - - @Test - public void testSharedEngineSearcherConcurrent() throws Exception { - final Field engineSearcherDelegate = JobQueryShardContext.class.getDeclaredField("engineSearcherDelegate"); - engineSearcherDelegate.setAccessible(true); - final Field refCount = EngineSearcherDelegate.class.getDeclaredField("refCount"); - refCount.setAccessible(true); - - // open contexts concurrent (all sharing same engine searcher) - final ExecutorService executorService = Executors.newFixedThreadPool(10); - final CountDownLatch latch = new CountDownLatch(10); - List> tasks = new ArrayList<>(10); - final List queryShardContexts = new ArrayList<>(10); - for (int i = 0; i < 10; i++) { - final int jobSearchContextId = i; - tasks.add(new Callable() { - @Override - public Void call() throws Exception { - JobQueryShardContext queryContext = new JobQueryShardContext( - indexShard, - jobSearchContextId, - false, - CONTEXT_FUNCTION); - jobCollectContext.addContext(jobSearchContextId, queryContext); - queryShardContexts.add(queryContext); - latch.countDown(); - return null; - } - }); - } - executorService.invokeAll(tasks); - latch.await(); - assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryShardContexts.get(0)))).get(), is(10)); - - // close contexts concurrent ( - final CountDownLatch latch2 = new CountDownLatch(10); - List> tasks2 = new ArrayList<>(10); - for (final JobQueryShardContext queryShardContext : queryShardContexts) { - tasks2.add(new Callable() { - @Override - public Void call() throws Exception { - queryShardContext.close(); - latch2.countDown(); - return null; - } - }); - } - executorService.invokeAll(tasks2); - latch2.await(); - assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryShardContexts.get(0)))).get(), is(0)); - } - @Test public void testClose() throws Exception { Field closed = JobCollectContext.class.getDeclaredField("closed");