Skip to content

Commit

Permalink
Cluster State Update APIs (master node) to respect master_timeout better
Browse files Browse the repository at this point in the history
also respect the timeout when trying to obtain the md lock
relates #3365
  • Loading branch information
kimchy committed Jul 22, 2013
1 parent 677f126 commit 3d3cd14
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 41 deletions.
Expand Up @@ -69,6 +69,8 @@
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -126,9 +128,12 @@ public void createIndex(final Request request, final Listener userListener) {

// we lock here, and not within the cluster service callback since we don't want to
// block the whole cluster state handling
MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index);
Semaphore mdLock = metaDataService.indexMetaDataLock(request.index);
try {
mdLock.lock();
if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) {
userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock"));
return;
}
} catch (InterruptedException e) {
userListener.onFailure(e);
return;
Expand Down Expand Up @@ -396,16 +401,12 @@ public void clusterStateProcessed(ClusterState clusterState) {
class CreateIndexListener implements Listener {

private final AtomicBoolean notified = new AtomicBoolean();

private final MetaDataService.MdLock mdLock;

private final Semaphore mdLock;
private final Request request;

private final Listener listener;

volatile ScheduledFuture future;

private CreateIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) {
private CreateIndexListener(Semaphore mdLock, Request request, Listener listener) {
this.mdLock = mdLock;
this.request = request;
this.listener = listener;
Expand All @@ -414,7 +415,7 @@ private CreateIndexListener(MetaDataService.MdLock mdLock, Request request, List
@Override
public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
mdLock.unlock();
mdLock.release();
if (future != null) {
future.cancel(false);
}
Expand All @@ -425,7 +426,7 @@ public void onResponse(final Response response) {
@Override
public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
mdLock.unlock();
mdLock.release();
if (future != null) {
future.cancel(false);
}
Expand Down
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -73,9 +75,12 @@ public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, Clus
public void deleteIndex(final Request request, final Listener userListener) {
// we lock here, and not within the cluster service callback since we don't want to
// block the whole cluster state handling
MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index);
Semaphore mdLock = metaDataService.indexMetaDataLock(request.index);
try {
mdLock.lock();
if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) {
userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock"));
return;
}
} catch (InterruptedException e) {
userListener.onFailure(e);
return;
Expand Down Expand Up @@ -156,16 +161,12 @@ public void clusterStateProcessed(ClusterState clusterState) {
class DeleteIndexListener implements Listener {

private final AtomicBoolean notified = new AtomicBoolean();

private final MetaDataService.MdLock mdLock;

private final Semaphore mdLock;
private final Request request;

private final Listener listener;

volatile ScheduledFuture future;

private DeleteIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) {
private DeleteIndexListener(Semaphore mdLock, Request request, Listener listener) {
this.mdLock = mdLock;
this.request = request;
this.listener = listener;
Expand All @@ -174,7 +175,7 @@ private DeleteIndexListener(MetaDataService.MdLock mdLock, Request request, List
@Override
public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
mdLock.unlock();
mdLock.release();
if (future != null) {
future.cancel(false);
}
Expand All @@ -185,7 +186,7 @@ public void onResponse(final Response response) {
@Override
public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
mdLock.unlock();
mdLock.release();
if (future != null) {
future.cancel(false);
}
Expand Down
Expand Up @@ -24,39 +24,24 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;

import java.util.concurrent.Semaphore;

/**
*/
public class MetaDataService extends AbstractComponent {

private final MdLock[] indexMdLocks;
private final Semaphore[] indexMdLocks;

@Inject
public MetaDataService(Settings settings) {
super(settings);
indexMdLocks = new MdLock[500];
indexMdLocks = new Semaphore[1];
for (int i = 0; i < indexMdLocks.length; i++) {
indexMdLocks[i] = new MdLock();
indexMdLocks[i] = new Semaphore(1);
}
}

public MdLock indexMetaDataLock(String index) {
public Semaphore indexMetaDataLock(String index) {
return indexMdLocks[Math.abs(DjbHashFunction.DJB_HASH(index) % indexMdLocks.length)];
}

public class MdLock {

private boolean isLocked = false;

public synchronized void lock() throws InterruptedException {
while (isLocked) {
wait();
}
isLocked = true;
}

public synchronized void unlock() {
isLocked = false;
notifyAll();
}
}
}

0 comments on commit 3d3cd14

Please sign in to comment.