Skip to content

Commit

Permalink
Fixed a few race conditions and better vertx close process, also disa…
Browse files Browse the repository at this point in the history
…bled redeploy
  • Loading branch information
purplefox committed Apr 16, 2015
1 parent 0e8d370 commit 64852df
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 204 deletions.
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -161,6 +161,7 @@
<failIfNoSpecifiedTests>false</failIfNoSpecifiedTests>
<excludes>
<exclude>**/RedeploySourceVerticle.java</exclude>
<exclude>**/RedeploymentTest.java</exclude>
</excludes>
<additionalClasspathElements>
<additionalClasspathElement>${project.build.testSourceDirectory}</additionalClasspathElement>
Expand Down
9 changes: 3 additions & 6 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Expand Up @@ -714,12 +714,9 @@ private <T> void receiveMessage(MessageImpl msg, long timeoutID, Handler<AsyncRe
}

private <T> void sendNoHandlersFailure(String address, Handler<AsyncResult<Message<T>>> handler) {
vertx.runOnContext(new Handler<Void>() {
@Override
public void handle(Void v) {
metrics.replyFailure(address, ReplyFailure.NO_HANDLERS);
handler.handle(Future.failedFuture(new ReplyException(ReplyFailure.NO_HANDLERS)));
}
vertx.runOnContext(v -> {
metrics.replyFailure(address, ReplyFailure.NO_HANDLERS);
handler.handle(Future.failedFuture(new ReplyException(ReplyFailure.NO_HANDLERS)));
});
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -913,4 +913,5 @@ protected void finalize() throws Throwable {
close();
super.finalize();
}

}
44 changes: 9 additions & 35 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -17,30 +17,14 @@
package io.vertx.core.http.impl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
Expand All @@ -54,12 +38,7 @@
import io.vertx.core.AsyncResultHandler;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerRequestStream;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.ServerWebSocketStream;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.cgbystrom.FlashPolicyHandler;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
Expand All @@ -68,16 +47,9 @@
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.net.impl.*;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.net.impl.HandlerHolder;
import io.vertx.core.net.impl.HandlerManager;
import io.vertx.core.net.impl.KeyStoreHelper;
import io.vertx.core.net.impl.PartialPooledByteBufAllocator;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.net.impl.SocketAddressImpl;
import io.vertx.core.net.impl.VertxEventLoopGroup;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.streams.ReadStream;

Expand All @@ -89,7 +61,7 @@
import java.util.concurrent.ConcurrentHashMap;

import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
* This class is thread-safe
Expand Down Expand Up @@ -434,7 +406,9 @@ private void actualClose(final ContextImpl closeContext, final Handler<AsyncResu
throw new IllegalStateException("Context was changed");
}

metrics.close();
if (metrics != null) {
metrics.close();
}

ChannelGroupFuture fut = serverChannelGroup.close();
fut.addListener(cgf -> executeCloseDone(closeContext, done, fut.cause()));
Expand Down
54 changes: 34 additions & 20 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -53,6 +53,7 @@ public abstract class ContextImpl implements Context {
protected final Executor orderedInternalPoolExec;
protected final Executor workerExec;
protected VertxThread contextThread;
private volatile boolean closeHooksRun;

protected ContextImpl(VertxInternal vertx, Executor orderedInternalPoolExec, Executor workerExec, String deploymentID, JsonObject config,
ClassLoader tccl) {
Expand Down Expand Up @@ -94,7 +95,8 @@ public Deployment getDeployment() {

public void addCloseHook(Closeable hook) {
if (closeHooks == null) {
closeHooks = new HashSet<>();
// Has to be concurrent as can be removed from non context thread
closeHooks = new ConcurrentHashSet<>();
}
closeHooks.add(hook);
}
Expand All @@ -105,29 +107,41 @@ public void removeCloseHook(Closeable hook) {
}
}


public void runCloseHooks(Handler<AsyncResult<Void>> completionHandler) {
if (closeHooksRun) {
// Sanity check
throw new IllegalStateException("Close hooks already run");
}
closeHooksRun = true;
if (closeHooks != null && !closeHooks.isEmpty()) {
final int num = closeHooks.size();
AtomicInteger count = new AtomicInteger();
AtomicBoolean failed = new AtomicBoolean();
// Copy to avoid ConcurrentModificationException
for (Closeable hook: new HashSet<>(closeHooks)) {
try {
hook.close(ar -> {
if (ar.failed()) {
if (failed.compareAndSet(false, true)) {
// Only report one failure
completionHandler.handle(Future.failedFuture(ar.cause()));
// Must copy before looping as can be removed during loop otherwise
Set<Closeable> copy = new HashSet<>(closeHooks);
int num = copy.size();
if (num != 0) {
AtomicInteger count = new AtomicInteger();
AtomicBoolean failed = new AtomicBoolean();
for (Closeable hook: copy) {
try {
hook.close(ar -> {
if (ar.failed()) {
if (failed.compareAndSet(false, true)) {
// Only report one failure
completionHandler.handle(Future.failedFuture(ar.cause()));
}
} else {
if (count.incrementAndGet() == num) {
// closeHooksRun = true;
completionHandler.handle(Future.succeededFuture());
}
}
} else {
if (count.incrementAndGet() == num) {
completionHandler.handle(Future.succeededFuture());
}
}
});
} catch (Throwable t) {
log.warn("Failed to run close hooks", t);
});
} catch (Throwable t) {
log.warn("Failed to run close hooks", t);
}
}
} else {
completionHandler.handle(Future.succeededFuture());
}
} else {
completionHandler.handle(Future.succeededFuture());
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/io/vertx/core/impl/DeploymentManager.java
Expand Up @@ -527,6 +527,7 @@ public void doUndeploy(ContextImpl undeployingContext, Handler<AsyncResult<Void>
deployments.remove(deploymentID);
vertx.metricsSPI().verticleUndeployed(verticleHolder.verticle);
context.runCloseHooks(ar2 -> {

if (ar2.failed()) {
// Log error but we report success anyway
log.error("Failed to run close hook", ar2.cause());
Expand Down Expand Up @@ -585,9 +586,10 @@ public String deploymentID() {

// This is run on the context of the actual verticle, not the context that did the deploy
private void startRedeployTimer() {
if (redeployer != null) {
doStartRedeployTimer();
}
// Redeployment is disabled.
// if (redeployer != null) {
// doStartRedeployTimer();
// }
}

private void doStartRedeployTimer() {
Expand Down
15 changes: 9 additions & 6 deletions src/main/java/io/vertx/core/impl/FutureImpl.java
Expand Up @@ -104,9 +104,7 @@ public void setHandler(Handler<AsyncResult<T>> handler) {
* Set the result. Any handler will be called, if there is one
*/
public void complete(T result) {
if (this.succeeded || this.failed) {
throw new IllegalStateException("Result has already been set");
}
checkComplete();
this.result = result;
succeeded = true;
checkCallHandler();
Expand All @@ -121,9 +119,7 @@ public void complete() {
* Set the failure. Any handler will be called, if there is one
*/
public void fail(Throwable throwable) {
if (this.succeeded || this.failed) {
throw new IllegalStateException("Failure has already been set");
}
checkComplete();
this.throwable = throwable;
failed = true;
checkCallHandler();
Expand All @@ -139,4 +135,11 @@ private void checkCallHandler() {
handler.handle(this);
}
}

private void checkComplete() {
if (succeeded || failed) {
throw new IllegalStateException("Result is already complete: " + (succeeded ? "succeeded" : "failed"));
}
}

}
27 changes: 12 additions & 15 deletions src/main/java/io/vertx/core/impl/HAManager.java
Expand Up @@ -224,18 +224,15 @@ public void failDuringFailover(boolean fail) {

private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
final Handler<AsyncResult<String>> wrappedHandler = new Handler<AsyncResult<String>>() {
@Override
public void handle(AsyncResult<String> asyncResult) {
if (asyncResult.succeeded()) {
// Tell the other nodes of the cluster about the verticle for HA purposes
addToHA(asyncResult.result(), verticleName, deploymentOptions);
}
if (doneHandler != null) {
doneHandler.handle(asyncResult);
} else if (asyncResult.failed()) {
log.error("Failed to deploy verticle", asyncResult.cause());
}
final Handler<AsyncResult<String>> wrappedHandler = asyncResult -> {
if (asyncResult.succeeded()) {
// Tell the other nodes of the cluster about the verticle for HA purposes
addToHA(asyncResult.result(), verticleName, deploymentOptions);
}
if (doneHandler != null) {
doneHandler.handle(asyncResult);
} else if (asyncResult.failed()) {
log.error("Failed to deploy verticle", asyncResult.cause());
}
};
deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler);
Expand All @@ -257,8 +254,8 @@ private synchronized void nodeLeft(String leftNodeID) {
if (attainedQuorum) {

// Check for failover

String sclusterInfo = clusterMap.get(leftNodeID);

if (sclusterInfo == null) {
// Clean close - do nothing
} else {
Expand Down Expand Up @@ -429,7 +426,7 @@ private void checkFailover(String failedNodeID, JsonObject theHAInfo) {
String chosen = chooseHashedNode(group, failedNodeID.hashCode());
if (chosen != null && chosen.equals(this.nodeID)) {
if (deployments != null) {
log.info("Node " + failedNodeID + " has failed. This node will deploy " + deployments.size() + " deploymentIDs from that node.");
log.info("node" + nodeID + " says: Node " + failedNodeID + " has failed. This node will deploy " + deployments.size() + " deploymentIDs from that node.");
for (Object obj: deployments) {
JsonObject app = (JsonObject)obj;
processFailover(app);
Expand All @@ -454,7 +451,7 @@ private void callFailoverCompleteHandler(boolean result) {
latch.countDown();
});
try {
latch.await(10, TimeUnit.SECONDS);
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
}
}
Expand Down

0 comments on commit 64852df

Please sign in to comment.