Skip to content

Commit

Permalink
remove Engine.Searcher sharing logic from JobCollectContext
Browse files Browse the repository at this point in the history
This was (probably) intended to optimize self-joins but for
that to work it would actually have to be done in the
JobExecutionContext because the JobCollectContext is
specific to one relation/CollectNode.
  • Loading branch information
mfussenegger committed Jun 26, 2015
1 parent 97ab212 commit 46f0d56
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class JobCollectContext implements ExecutionSubContext, RowUpstream, Exec

private final IntObjectOpenHashMap<JobQueryShardContext> queryContexts = new IntObjectOpenHashMap<>();
private final IntObjectOpenHashMap<JobFetchShardContext> fetchContexts = new IntObjectOpenHashMap<>();
private final ConcurrentMap<ShardId, EngineSearcherDelegate> shardsSearcherMap = new ConcurrentHashMap<>();
private final AtomicInteger activeQueryContexts = new AtomicInteger(0);
private final AtomicInteger activeFetchContexts = new AtomicInteger(0);
private final Object subContextLock = new Object();
Expand Down Expand Up @@ -239,18 +238,7 @@ private void callContextCallback() {
* If none is found, create new one with new acquired {@link Engine.Searcher}.
*/
protected EngineSearcherDelegate acquireSearcher(IndexShard indexShard) {
EngineSearcherDelegate engineSearcherDelegate;
for (;;) {
engineSearcherDelegate = shardsSearcherMap.get(indexShard.shardId());
if (engineSearcherDelegate == null) {
engineSearcherDelegate = new EngineSearcherDelegate(acquireNewSearcher(indexShard));
if (shardsSearcherMap.putIfAbsent(indexShard.shardId(), engineSearcherDelegate) == null) {
return engineSearcherDelegate;
}
} else {
return engineSearcherDelegate;
}
}
return new EngineSearcherDelegate(acquireNewSearcher(indexShard));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,8 @@

import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -170,86 +163,6 @@ public void testGetFetchContext() throws Exception {
}
}

@Test
public void testSharedEngineSearcher() throws Exception {
Field engineSearcherDelegate = JobQueryShardContext.class.getDeclaredField("engineSearcherDelegate");
engineSearcherDelegate.setAccessible(true);
Field refCount = EngineSearcherDelegate.class.getDeclaredField("refCount");
refCount.setAccessible(true);

JobQueryShardContext queryContext1 = new JobQueryShardContext(
indexShard,
1,
false,
CONTEXT_FUNCTION);
jobCollectContext.addContext(1, queryContext1);

JobQueryShardContext queryContext2 = new JobQueryShardContext(
indexShard,
2,
false,
CONTEXT_FUNCTION);
jobCollectContext.addContext(2, queryContext2);

assertEquals(queryContext1.engineSearcher(), queryContext1.engineSearcher());
assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryContext1))).get(), is(2));

queryContext1.close();
queryContext2.close();
assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryContext1))).get(), is(0));
}

@Test
public void testSharedEngineSearcherConcurrent() throws Exception {
final Field engineSearcherDelegate = JobQueryShardContext.class.getDeclaredField("engineSearcherDelegate");
engineSearcherDelegate.setAccessible(true);
final Field refCount = EngineSearcherDelegate.class.getDeclaredField("refCount");
refCount.setAccessible(true);

// open contexts concurrent (all sharing same engine searcher)
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final CountDownLatch latch = new CountDownLatch(10);
List<Callable<Void>> tasks = new ArrayList<>(10);
final List<JobQueryShardContext> queryShardContexts = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
final int jobSearchContextId = i;
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
JobQueryShardContext queryContext = new JobQueryShardContext(
indexShard,
jobSearchContextId,
false,
CONTEXT_FUNCTION);
jobCollectContext.addContext(jobSearchContextId, queryContext);
queryShardContexts.add(queryContext);
latch.countDown();
return null;
}
});
}
executorService.invokeAll(tasks);
latch.await();
assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryShardContexts.get(0)))).get(), is(10));

// close contexts concurrent (
final CountDownLatch latch2 = new CountDownLatch(10);
List<Callable<Void>> tasks2 = new ArrayList<>(10);
for (final JobQueryShardContext queryShardContext : queryShardContexts) {
tasks2.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
queryShardContext.close();
latch2.countDown();
return null;
}
});
}
executorService.invokeAll(tasks2);
latch2.await();
assertThat(((AtomicInteger) refCount.get(engineSearcherDelegate.get(queryShardContexts.get(0)))).get(), is(0));
}

@Test
public void testClose() throws Exception {
Field closed = JobCollectContext.class.getDeclaredField("closed");
Expand Down

0 comments on commit 46f0d56

Please sign in to comment.