Skip to content

Commit

Permalink
Merge pull request #4791 from apache/nouveau-indexmanager-improvements
Browse files Browse the repository at this point in the history
Nouveau indexmanager improvements
  • Loading branch information
rnewson committed Oct 4, 2023
2 parents aaf9005 + 891bd02 commit 170c58d
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

package org.apache.couchdb.nouveau;

import com.github.benmanes.caffeine.cache.Scheduler;
import io.dropwizard.core.Application;
import io.dropwizard.core.setup.Environment;
import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.couchdb.nouveau.core.IndexManager;
import org.apache.couchdb.nouveau.health.AnalyzeHealthCheck;
import org.apache.couchdb.nouveau.health.IndexHealthCheck;
Expand All @@ -25,7 +27,6 @@
import org.apache.couchdb.nouveau.resources.AnalyzeResource;
import org.apache.couchdb.nouveau.resources.IndexResource;
import org.apache.couchdb.nouveau.tasks.CloseAllIndexesTask;
import org.apache.lucene.search.SearcherFactory;

public class NouveauApplication extends Application<NouveauApplicationConfiguration> {

Expand All @@ -40,17 +41,20 @@ public String getName() {

@Override
public void run(NouveauApplicationConfiguration configuration, Environment environment) throws Exception {

// configure index manager
final IndexManager indexManager = new IndexManager();
indexManager.setCommitIntervalSeconds(configuration.getCommitIntervalSeconds());
indexManager.setIdleSeconds(configuration.getIdleSeconds());
indexManager.setMaxIndexesOpen(configuration.getMaxIndexesOpen());
indexManager.setMetricRegistry(environment.metrics());
indexManager.setScheduler(environment
final ScheduledExecutorService schedulerExecutorService = environment
.lifecycle()
.scheduledExecutorService("index-manager-%d")
.threads(5)
.build());
.threads(configuration.getSchedulerThreadCount())
.build();
indexManager.setScheduler(Scheduler.forScheduledExecutorService(schedulerExecutorService));
indexManager.setSearcherFactory(new ParallelSearcherFactory(ForkJoinPool.commonPool()));
indexManager.setObjectMapper(environment.getObjectMapper());
indexManager.setRootDir(configuration.getRootDir());
environment.lifecycle().manage(indexManager);
Expand All @@ -63,8 +67,7 @@ public void run(NouveauApplicationConfiguration configuration, Environment envir
environment.jersey().register(analyzeResource);

// IndexResource
final SearcherFactory searcherFactory = new ParallelSearcherFactory(ForkJoinPool.commonPool());
final IndexResource indexResource = new IndexResource(indexManager, searcherFactory);
final IndexResource indexResource = new IndexResource(indexManager);
environment.jersey().register(indexResource);

// Health checks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class NouveauApplicationConfiguration extends Configuration {
@NotNull
private Path rootDir = null;

@Min(2)
private int schedulerThreadCount = 5;

@JsonProperty
public void setMaxIndexesOpen(int maxIndexesOpen) {
this.maxIndexesOpen = maxIndexesOpen;
Expand Down Expand Up @@ -68,4 +71,12 @@ public void setRootDir(Path rootDir) {
public Path getRootDir() {
return rootDir;
}

public int getSchedulerThreadCount() {
return schedulerThreadCount;
}

public void setSchedulerThreadCount(int schedulerThreadCount) {
this.schedulerThreadCount = schedulerThreadCount;
}
}
68 changes: 14 additions & 54 deletions nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
import org.apache.couchdb.nouveau.api.IndexInfo;
Expand All @@ -40,8 +39,6 @@ public abstract class Index implements Closeable {
private long updateSeq;
private long purgeSeq;
private boolean deleteOnClose = false;
private long lastCommit = now();
private volatile boolean closed;
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);

protected Index(final long updateSeq, final long purgeSeq) {
Expand All @@ -50,25 +47,7 @@ protected Index(final long updateSeq, final long purgeSeq) {
}

public final boolean tryAcquire() {
if (permits.tryAcquire() == false) {
return false;
}
if (closed) {
permits.release();
return false;
}
return true;
}

public final boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
if (permits.tryAcquire(timeout, unit) == false) {
return false;
}
if (closed) {
permits.release();
return false;
}
return true;
return permits.tryAcquire();
}

public final void release() {
Expand Down Expand Up @@ -117,17 +96,13 @@ public final boolean commit() throws IOException {
final long updateSeq;
final long purgeSeq;
synchronized (this) {
if (deleteOnClose) {
return false;
}
updateSeq = this.updateSeq;
purgeSeq = this.purgeSeq;
}
final boolean result = doCommit(updateSeq, purgeSeq);
if (result) {
final long now = now();
synchronized (this) {
this.lastCommit = now;
}
}
return result;
return doCommit(updateSeq, purgeSeq);
}

protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException;
Expand All @@ -154,28 +129,24 @@ public final synchronized void setPurgeSeq(final long matchSeq, final long purge

@Override
public final void close() throws IOException {
synchronized (this) {
closed = true;
}
// Ensures exclusive access to the index before closing.
permits.acquireUninterruptibly(Integer.MAX_VALUE);
try {
doClose();
} finally {
permits.release(Integer.MAX_VALUE);
}
doClose();
// Never release permits.
}

protected abstract void doClose() throws IOException;

public boolean isDeleteOnClose() {
public synchronized boolean isDeleteOnClose() {
return deleteOnClose;
}

public void setDeleteOnClose(final boolean deleteOnClose) {
synchronized (this) {
this.deleteOnClose = deleteOnClose;
}
public synchronized void setDeleteOnClose(final boolean deleteOnClose) {
this.deleteOnClose = deleteOnClose;
}

public final boolean isActive() {
return permits.availablePermits() < Integer.MAX_VALUE || permits.hasQueuedThreads();
}

protected final void assertUpdateSeqProgress(final long matchSeq, final long updateSeq)
Expand Down Expand Up @@ -211,15 +182,4 @@ protected final void incrementPurgeSeq(final long matchSeq, final long purgeSeq)
assertPurgeSeqProgress(matchSeq, purgeSeq);
this.purgeSeq = purgeSeq;
}

public boolean needsCommit(final long duration, final TimeUnit unit) {
final long commitNeededSince = now() - unit.toNanos(duration);
synchronized (this) {
return this.lastCommit < commitNeededSince;
}
}

private long now() {
return System.nanoTime();
}
}

This file was deleted.

0 comments on commit 170c58d

Please sign in to comment.