Skip to content
Browse files

BOOKKEEPER-55: SubscribeReconnectRetryTask might retry subscription e…

…ndlessly when another subscription is already successfully created previously (sijie via ivank)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1428059 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 7b2b915 commit a27b64e340a6ad12f1e9588d1d55fa51f0b2d1f0 @ivankelly ivankelly committed Jan 2, 2013
Showing with 324 additions and 99 deletions.
  1. +2 −0 CHANGES.txt
  2. +14 −1 hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
  3. +34 −0 hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java
  4. +44 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
  5. +34 −22 ...ig-client/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
  6. +6 −1 hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
  7. +7 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
  8. +20 −0 ...-client/src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
  9. +22 −3 ...rc/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
  10. +11 −4 hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
  11. +29 −35 ...ient/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
  12. +8 −0 hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java
  13. +29 −25 hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
  14. +1 −0 hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
  15. +60 −0 hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
  16. +3 −8 hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
View
2 CHANGES.txt
@@ -183,6 +183,8 @@ Trunk (unreleased changes)
BOOKKEEPER-470: Possible infinite loop in simple.SubscribeReconnectCallback (sijie via ivank)
+ BOOKKEEPER-55: SubscribeReconnectRetryTask might retry subscription endlessly when another subscription is already successfully created previously (sijie via ivank)
+
hedwig-server:
BOOKKEEPER-302: No more messages delivered when hub server scans messages over two ledgers. (sijie via ivank)
View
15 hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
@@ -20,6 +20,7 @@
import java.util.List;
import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
@@ -82,6 +83,8 @@
// For synchronous calls, this variable is used to know when the background
// async process for it has completed, set in the VoidCallback.
public boolean isDone = false;
+ // Record the original channel for a resubscribe request
+ private HChannel origChannel = null;
// Constructor for all types of PubSub request data to send to the server
public PubSubData(final ByteString topic, final Message msg, final ByteString subscriberId,
@@ -106,10 +109,20 @@ public void setCallback(Callback<PubSubProtocol.ResponseBody> callback) {
}
public void operationFinishedToCallback(Object context, PubSubProtocol.ResponseBody response){
-
callback.operationFinished(context, response);
}
+ public boolean isResubscribeRequest() {
+ return null != origChannel;
+ }
+
+ public HChannel getOriginalChannelForResubscribe() {
+ return origChannel;
+ }
+
+ public void setOriginalChannelForResubscribe(HChannel channel) {
+ this.origChannel = channel;
+ }
// Clear all of the stored servers we've contacted or attempted to in this
// request.
View
34 hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.exceptions;
+
+/**
+ * This is a Hedwig client side exception when the client failed to resubscribe
+ * when topic moved or subscription is closed.
+ */
+public class ResubscribeException extends Exception {
+
+ public ResubscribeException(String message) {
+ super(message);
+ }
+
+ public ResubscribeException(String message, Throwable t) {
+ super(message, t);
+ }
+
+}
View
44 hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java
@@ -76,6 +76,50 @@ public HChannel addChannel(T key, HChannel channel) {
}
/**
+ * Replace channel only if currently mapped to the given <code>oldChannel</code>.
+ *
+ * @param key
+ * Key
+ * @param oldChannel
+ * Old Channel
+ * @param newChannel
+ * New Channel
+ * @return true if replaced successfully, otherwise false.
+ */
+ public boolean replaceChannel(T key, HChannel oldChannel, HChannel newChannel) {
+ this.closedLock.readLock().lock();
+ try {
+ if (closed) {
+ if (null != oldChannel) oldChannel.close();
+ if (null != newChannel) newChannel.close();
+ return false;
+ }
+ if (null == oldChannel) {
+ HChannel existedChannel = channels.putIfAbsent(key, newChannel);
+ if (null != existedChannel) {
+ logger.info("Channel for {} already exists, so no need to replace it.", key);
+ newChannel.close();
+ return false;
+ } else {
+ logger.debug("Storing a new channel for {}.", key);
+ return true;
+ }
+ } else {
+ if (channels.replace(key, oldChannel, newChannel)) {
+ logger.debug("Replacd channel {} for {}.", oldChannel, key);
+ oldChannel.close();
+ return true;
+ } else {
+ newChannel.close();
+ return false;
+ }
+ }
+ } finally {
+ this.closedLock.readLock().unlock();
+ }
+ }
+
+ /**
* Returns the channel bound with <code>key</code>.
*
* @param key Key
View
56 ...t/src/main/java/org/apache/hedwig/client/netty/impl/AbstractSubscribeResponseHandler.java
@@ -62,6 +62,7 @@
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.SubscriptionListener;
import static org.apache.hedwig.util.VarArgs.va;
@@ -98,8 +99,8 @@ protected ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
protected ActiveSubscriber createActiveSubscriber(
ClientConfiguration cfg, AbstractHChannelManager channelManager,
TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
- Channel channel) {
- return new ActiveSubscriber(cfg, channelManager, ts, op, preferences, channel);
+ Channel channel, HChannel hChannel) {
+ return new ActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
}
@Override
@@ -129,33 +130,30 @@ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
}
}
- ActiveSubscriber ss = createActiveSubscriber(cfg, aChannelManager, ts,
- pubSubData, preferences, channel);
- boolean success = false;
+ Either<StatusCode, HChannel> result;
+ StatusCode statusCode;
+ ActiveSubscriber ss = null;
// Store the Subscribe state
disconnectLock.readLock().lock();
try {
- ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
- if (null != oldSS) {
- logger.warn("Subscribe {} has existed in channel {}.",
- va(ts, channel));
- success = false;
- } else {
- logger.debug("Succeed to add subscription {} in channel {}.",
- va(ts, channel));
- success = true;
+ result = handleSuccessResponse(ts, pubSubData, channel);
+ statusCode = result.left();
+ if (StatusCode.SUCCESS == statusCode) {
+ ss = createActiveSubscriber(
+ cfg, aChannelManager, ts, pubSubData, preferences, channel, result.right());
+ statusCode = addSubscription(ts, ss);
}
} finally {
disconnectLock.readLock().unlock();
}
- if (success) {
- handleSuccessResponse(ts, ss, channel);
+ if (StatusCode.SUCCESS == statusCode) {
+ postHandleSuccessResponse(ts, ss);
// Response was success so invoke the callback's operationFinished
// method.
pubSubData.getCallback().operationFinished(pubSubData.context, null);
} else {
- ClientAlreadySubscribedException exception =
- new ClientAlreadySubscribedException("Client is already subscribed for " + ts);
+ PubSubException exception = PubSubException.create(statusCode,
+ "Client is already subscribed for " + ts);
pubSubData.getCallback().operationFailed(pubSubData.context, exception);
}
break;
@@ -195,13 +193,27 @@ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
*
* @param ts
* Topic Subscriber.
- * @param ss
- * Active Subscriber Object handle subscription actions for the subscriber.
+ * @param pubSubData
+ * Pub/Sub Request data for this subscribe request.
* @param channel
* Subscription Channel.
+ * @return status code to indicate what happened
*/
- protected abstract void handleSuccessResponse(TopicSubscriber ts, ActiveSubscriber as,
- Channel channel);
+ protected abstract Either<StatusCode, HChannel> handleSuccessResponse(
+ TopicSubscriber ts, PubSubData pubSubData, Channel channel);
+
+ protected void postHandleSuccessResponse(TopicSubscriber ts, ActiveSubscriber ss) {
+ // do nothing now
+ }
+
+ private StatusCode addSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
+ ActiveSubscriber oldSS = subscriptions.putIfAbsent(ts, ss);
+ if (null != oldSS) {
+ return StatusCode.CLIENT_ALREADY_SUBSCRIBED;
+ } else {
+ return StatusCode.SUCCESS;
+ }
+ }
@Override
public void handleSubscribeMessage(PubSubResponse response) {
View
7 hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ActiveSubscriber.java
@@ -34,6 +34,7 @@
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
+import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.FilterableMessageHandler;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
@@ -64,6 +65,7 @@
// the underlying netty channel to send request
protected final Channel channel;
+ protected final HChannel hChannel;
// Counter for the number of consumed messages so far to buffer up before we
// send the Consume message back to the server along with the last/largest
@@ -99,13 +101,15 @@ public ActiveSubscriber(ClientConfiguration cfg,
AbstractHChannelManager channelManager,
TopicSubscriber ts, PubSubData op,
SubscriptionPreferences preferences,
- Channel channel) {
+ Channel channel,
+ HChannel hChannel) {
this.cfg = cfg;
this.channelManager = channelManager;
this.topicSubscriber = ts;
this.op = op;
this.preferences = preferences;
this.channel = channel;
+ this.hChannel = hChannel;
}
/**
@@ -368,6 +372,7 @@ public void resubscribeIfNecessary(SubscriptionEvent event) {
channelManager, retryWaitTime);
op.setCallback(resubscribeCb);
op.context = null;
+ op.setOriginalChannelForResubscribe(hChannel);
if (logger.isDebugEnabled()) {
logger.debug("Resubscribe {} with origSubData {}",
va(topicSubscriber, op));
View
7 hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/ResubscribeCallback.java
@@ -25,6 +25,7 @@
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ResubscribeException;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.util.Callback;
import static org.apache.hedwig.util.VarArgs.va;
@@ -77,6 +78,12 @@ public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
@Override
public void operationFailed(Object ctx, PubSubException exception) {
+ if (exception instanceof ResubscribeException) {
+ // it might be caused by closesub when resubscribing.
+ // so we don't need to retry resubscribe again
+ logger.warn("Failed to resubscribe {} : but it is caused by closesub when resubscribing. "
+ + "so we don't need to retry subscribe again.", origSubData);
+ }
// If the resubscribe fails, just keep retrying the subscribe
// request. There isn't a way to flag to the application layer that
// a topic subscription has failed. So instead, we'll just keep
View
20 ...src/main/java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexHChannelManager.java
@@ -48,6 +48,7 @@
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.Either;
import static org.apache.hedwig.util.VarArgs.va;
@@ -61,6 +62,9 @@
// Find which HChannel that a given TopicSubscriber used.
protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;
+ // A index map for each topic subscriber is served by which subscription channel
+ protected final CleanupChannelMap<TopicSubscriber> sub2Channels;
+
// Concurrent Map to store Message handler for each topic + sub id combination.
// Store it here instead of in SubscriberResponseHandler as we don't want to lose the handler
// user set when connection is recovered
@@ -74,6 +78,7 @@ public MultiplexHChannelManager(ClientConfiguration cfg,
ChannelFactory socketFactory) {
super(cfg, socketFactory);
subscriptionChannels = new CleanupChannelMap<InetSocketAddress>();
+ sub2Channels = new CleanupChannelMap<TopicSubscriber>();
subscriptionChannelPipelineFactory =
new MultiplexSubscriptionChannelPipelineFactory(cfg, this);
}
@@ -296,4 +301,19 @@ protected void closeSubscriptionChannels() {
subscriptionChannels.close();
}
+ protected Either<Boolean, HChannel> storeSubscriptionChannel(
+ TopicSubscriber topicSubscriber, PubSubData txn, HChannel channel) {
+ boolean replaced = sub2Channels.replaceChannel(
+ topicSubscriber, txn.getOriginalChannelForResubscribe(), channel);
+ if (replaced) {
+ return Either.of(replaced, channel);
+ } else {
+ return Either.of(replaced, null);
+ }
+ }
+
+ protected boolean removeSubscriptionChannel(
+ TopicSubscriber topicSubscriber, HChannel channel) {
+ return sub2Channels.removeChannel(topicSubscriber, channel);
+ }
}
View
25 ...java/org/apache/hedwig/client/netty/impl/multiplex/MultiplexSubscribeResponseHandler.java
@@ -36,7 +36,9 @@
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.Either;
import static org.apache.hedwig.util.VarArgs.va;
public class MultiplexSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
@@ -72,9 +74,25 @@ public void handleResponse(PubSubResponse response, PubSubData pubSubData,
}
@Override
- protected void handleSuccessResponse(TopicSubscriber ts, ActiveSubscriber as,
- Channel channel) {
- // do nothing now
+ protected Either<StatusCode, HChannel> handleSuccessResponse(
+ TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
+ // Store the mapping for the TopicSubscriber to the Channel.
+ // This is so we can control the starting and stopping of
+ // message deliveries from the server on that Channel. Store
+ // this only on a successful ack response from the server.
+ Either<Boolean, HChannel> result =
+ sChannelManager.storeSubscriptionChannel(ts, pubSubData, hChannel);
+ if (result.left()) {
+ return Either.of(StatusCode.SUCCESS, result.right());
+ } else {
+ StatusCode code;
+ if (pubSubData.isResubscribeRequest()) {
+ code = StatusCode.RESUBSCRIBE_EXCEPTION;
+ } else {
+ code = StatusCode.CLIENT_ALREADY_SUBSCRIBED;
+ }
+ return Either.of(code, null);
+ }
}
@Override
@@ -92,6 +110,7 @@ public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
@Override
public void operationFinished(Object ctx, ResponseBody respBody) {
removeSubscription(topicSubscriber, ss);
+ sChannelManager.removeSubscriptionChannel(topicSubscriber, hChannel);
callback.operationFinished(context, null);
}
View
15 ...lient/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleHChannelManager.java
@@ -31,10 +31,10 @@
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
-
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
@@ -48,6 +48,7 @@
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.Either;
import static org.apache.hedwig.util.VarArgs.va;
/**
@@ -103,12 +104,18 @@ protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
getSubscriptionChannelPipelineFactory());
}
- protected HChannel storeSubscriptionChannel(TopicSubscriber topicSubscriber,
- Channel channel) {
+ protected Either<Boolean, HChannel> storeSubscriptionChannel(
+ TopicSubscriber topicSubscriber, PubSubData txn, Channel channel) {
InetSocketAddress host = NetUtils.getHostFromChannel(channel);
HChannel newHChannel = new HChannelImpl(host, channel, this,
getSubscriptionChannelPipelineFactory());
- return topicSubscriber2Channel.addChannel(topicSubscriber, newHChannel);
+ boolean replaced = topicSubscriber2Channel.replaceChannel(
+ topicSubscriber, txn.getOriginalChannelForResubscribe(), newHChannel);
+ if (replaced) {
+ return Either.of(replaced, newHChannel);
+ } else {
+ return Either.of(replaced, null);
+ }
}
@Override
View
64 .../main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscribeResponseHandler.java
@@ -36,6 +36,7 @@
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
+import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
@@ -51,6 +52,7 @@
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.Either;
public class SimpleSubscribeResponseHandler extends AbstractSubscribeResponseHandler {
@@ -73,8 +75,9 @@ public SimpleActiveSubscriber(ClientConfiguration cfg,
AbstractHChannelManager channelManager,
TopicSubscriber ts, PubSubData op,
SubscriptionPreferences preferences,
- Channel channel) {
- super(cfg, channelManager, ts, op, preferences, channel);
+ Channel channel,
+ HChannel hChannel) {
+ super(cfg, channelManager, ts, op, preferences, channel, hChannel);
outstandingMsgSet = Collections.newSetFromMap(
new ConcurrentHashMap<Message, Boolean>(
cfg.getMaximumOutstandingMessages(), 1.0f));
@@ -175,8 +178,8 @@ protected SimpleSubscribeResponseHandler(ClientConfiguration cfg,
protected ActiveSubscriber createActiveSubscriber(
ClientConfiguration cfg, AbstractHChannelManager channelManager,
TopicSubscriber ts, PubSubData op, SubscriptionPreferences preferences,
- Channel channel) {
- return new SimpleActiveSubscriber(cfg, channelManager, ts, op, preferences, channel);
+ Channel channel, HChannel hChannel) {
+ return new SimpleActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
}
@Override
@@ -234,17 +237,32 @@ public void handleSubscribeMessage(PubSubResponse response) {
}
@Override
- protected void handleSuccessResponse(TopicSubscriber ts, ActiveSubscriber as,
- Channel channel) {
- synchronized (this) {
- origTopicSubscriber = ts;
- origActiveSubscriber = as;
- }
+ protected Either<StatusCode, HChannel> handleSuccessResponse(
+ TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
// Store the mapping for the TopicSubscriber to the Channel.
// This is so we can control the starting and stopping of
// message deliveries from the server on that Channel. Store
// this only on a successful ack response from the server.
- sChannelManager.storeSubscriptionChannel(ts, channel);
+ Either<Boolean, HChannel> result =
+ sChannelManager.storeSubscriptionChannel(ts, pubSubData, channel);
+ if (result.left()) {
+ return Either.of(StatusCode.SUCCESS, result.right());
+ } else {
+ StatusCode code;
+ if (pubSubData.isResubscribeRequest()) {
+ code = StatusCode.RESUBSCRIBE_EXCEPTION;
+ } else {
+ code = StatusCode.CLIENT_ALREADY_SUBSCRIBED;
+ }
+ return Either.of(code, null);
+ }
+ }
+
+ @Override
+ protected synchronized void postHandleSuccessResponse(
+ TopicSubscriber ts, ActiveSubscriber as) {
+ origTopicSubscriber = ts;
+ origActiveSubscriber = as;
}
@Override
@@ -256,28 +274,4 @@ public void asyncCloseSubscription(final TopicSubscriber topicSubscriber,
callback.operationFinished(context, (ResponseBody)null);
}
- @Override
- protected void resubscribeIfNecessary(final ActiveSubscriber ss,
- final SubscriptionEvent event) {
- final TopicSubscriber ts = ss.getTopicSubscriber();
- // clear subscription status
- sChannelManager.asyncCloseSubscription(ts, new Callback<ResponseBody>() {
-
- @Override
- public void operationFinished(Object ctx, ResponseBody result) {
- finish();
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- finish();
- }
-
- private void finish() {
- SimpleSubscribeResponseHandler.super.resubscribeIfNecessary(ss, event);
- }
-
- }, null);
- }
-
}
View
8 hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java
@@ -74,6 +74,8 @@ public static PubSubException create(StatusCode code, String msg) {
return new TopicOwnerInfoExistsException(msg);
} else if (code == StatusCode.INVALID_MESSAGE_FILTER) {
return new InvalidMessageFilterException(msg);
+ } else if (code == StatusCode.RESUBSCRIBE_EXCEPTION) {
+ return new ResubscribeException(msg);
}
/*
* Insert new ones here
@@ -103,6 +105,12 @@ public ClientNotSubscribedException(String msg) {
}
}
+ public static class ResubscribeException extends PubSubException {
+ public ResubscribeException(String msg) {
+ super(StatusCode.RESUBSCRIBE_EXCEPTION, msg);
+ }
+ }
+
public static class MalformedRequestException extends PubSubException {
public MalformedRequestException(String msg) {
super(StatusCode.MALFORMED_REQUEST, msg);
View
54 hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
@@ -236,19 +236,20 @@ private SubscriptionEvent(int index, int value) {
CLIENT_NOT_SUBSCRIBED(4, 404),
COULD_NOT_CONNECT(5, 405),
TOPIC_BUSY(6, 406),
- NOT_RESPONSIBLE_FOR_TOPIC(7, 501),
- SERVICE_DOWN(8, 502),
- UNCERTAIN_STATE(9, 503),
- INVALID_MESSAGE_FILTER(10, 504),
- BAD_VERSION(11, 520),
- NO_TOPIC_PERSISTENCE_INFO(12, 521),
- TOPIC_PERSISTENCE_INFO_EXISTS(13, 522),
- NO_SUBSCRIPTION_STATE(14, 523),
- SUBSCRIPTION_STATE_EXISTS(15, 524),
- NO_TOPIC_OWNER_INFO(16, 525),
- TOPIC_OWNER_INFO_EXISTS(17, 526),
- UNEXPECTED_CONDITION(18, 600),
- COMPOSITE(19, 700),
+ RESUBSCRIBE_EXCEPTION(7, 407),
+ NOT_RESPONSIBLE_FOR_TOPIC(8, 501),
+ SERVICE_DOWN(9, 502),
+ UNCERTAIN_STATE(10, 503),
+ INVALID_MESSAGE_FILTER(11, 504),
+ BAD_VERSION(12, 520),
+ NO_TOPIC_PERSISTENCE_INFO(13, 521),
+ TOPIC_PERSISTENCE_INFO_EXISTS(14, 522),
+ NO_SUBSCRIPTION_STATE(15, 523),
+ SUBSCRIPTION_STATE_EXISTS(16, 524),
+ NO_TOPIC_OWNER_INFO(17, 525),
+ TOPIC_OWNER_INFO_EXISTS(18, 526),
+ UNEXPECTED_CONDITION(19, 600),
+ COMPOSITE(20, 700),
;
public static final int SUCCESS_VALUE = 0;
@@ -258,6 +259,7 @@ private SubscriptionEvent(int index, int value) {
public static final int CLIENT_NOT_SUBSCRIBED_VALUE = 404;
public static final int COULD_NOT_CONNECT_VALUE = 405;
public static final int TOPIC_BUSY_VALUE = 406;
+ public static final int RESUBSCRIBE_EXCEPTION_VALUE = 407;
public static final int NOT_RESPONSIBLE_FOR_TOPIC_VALUE = 501;
public static final int SERVICE_DOWN_VALUE = 502;
public static final int UNCERTAIN_STATE_VALUE = 503;
@@ -284,6 +286,7 @@ public static StatusCode valueOf(int value) {
case 404: return CLIENT_NOT_SUBSCRIBED;
case 405: return COULD_NOT_CONNECT;
case 406: return TOPIC_BUSY;
+ case 407: return RESUBSCRIBE_EXCEPTION;
case 501: return NOT_RESPONSIBLE_FOR_TOPIC;
case 502: return SERVICE_DOWN;
case 503: return UNCERTAIN_STATE;
@@ -327,7 +330,7 @@ public StatusCode findValueByNumber(int number) {
}
private static final StatusCode[] VALUES = {
- SUCCESS, MALFORMED_REQUEST, NO_SUCH_TOPIC, CLIENT_ALREADY_SUBSCRIBED, CLIENT_NOT_SUBSCRIBED, COULD_NOT_CONNECT, TOPIC_BUSY, NOT_RESPONSIBLE_FOR_TOPIC, SERVICE_DOWN, UNCERTAIN_STATE, INVALID_MESSAGE_FILTER, BAD_VERSION, NO_TOPIC_PERSISTENCE_INFO, TOPIC_PERSISTENCE_INFO_EXISTS, NO_SUBSCRIPTION_STATE, SUBSCRIPTION_STATE_EXISTS, NO_TOPIC_OWNER_INFO, TOPIC_OWNER_INFO_EXISTS, UNEXPECTED_CONDITION, COMPOSITE,
+ SUCCESS, MALFORMED_REQUEST, NO_SUCH_TOPIC, CLIENT_ALREADY_SUBSCRIBED, CLIENT_NOT_SUBSCRIBED, COULD_NOT_CONNECT, TOPIC_BUSY, RESUBSCRIBE_EXCEPTION, NOT_RESPONSIBLE_FOR_TOPIC, SERVICE_DOWN, UNCERTAIN_STATE, INVALID_MESSAGE_FILTER, BAD_VERSION, NO_TOPIC_PERSISTENCE_INFO, TOPIC_PERSISTENCE_INFO_EXISTS, NO_SUBSCRIPTION_STATE, SUBSCRIPTION_STATE_EXISTS, NO_TOPIC_OWNER_INFO, TOPIC_OWNER_INFO_EXISTS, UNEXPECTED_CONDITION, COMPOSITE,
};
public static StatusCode valueOf(
@@ -16621,21 +16624,22 @@ public Builder clearNumTopics() {
"\013UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rST" +
"OP_DELIVERY\020\005\022\025\n\021CLOSESUBSCRIPTION\020\006*D\n\021" +
"SubscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SU" +
- "BSCRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusCod" +
+ "BSCRIPTION_FORCED_CLOSED\020\002*\241\004\n\nStatusCod" +
"e\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022" +
"\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_SUB" +
"SCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003\022\026",
"\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022" +
- "\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVI" +
- "CE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVA" +
- "LID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022" +
- "\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC" +
- "_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCR" +
- "IPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EX" +
- "ISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOP" +
- "IC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_CO" +
- "NDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apache",
- ".hedwig.protocolH\001"
+ "\032\n\025RESUBSCRIBE_EXCEPTION\020\227\003\022\036\n\031NOT_RESPO" +
+ "NSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024" +
+ "\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVALID_MESSAGE_" +
+ "FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_" +
+ "PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE" +
+ "_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE" +
+ "\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004\022\030\n\023N" +
+ "O_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INF" +
+ "O_EXISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016",
+ "\n\tCOMPOSITE\020\274\005B\036\n\032org.apache.hedwig.prot" +
+ "ocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
View
1 hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
@@ -248,6 +248,7 @@ enum StatusCode{
CLIENT_NOT_SUBSCRIBED = 404;
COULD_NOT_CONNECT = 405;
TOPIC_BUSY = 406;
+ RESUBSCRIBE_EXCEPTION = 407;
//server-side errors (5xx)
NOT_RESPONSIBLE_FOR_TOPIC = 501;
View
60 hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
@@ -605,4 +605,64 @@ public boolean isSubscriptionChannelSharingEnabled() {
assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take());
}
+ @Test
+ public void testCloseSubscribeDuringResubscribe() throws Exception {
+ client.close();
+
+ final long reconnectWaitTime = 2000L;
+ client = new HedwigClient(new ClientConfiguration() {
+ @Override
+ public HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
+ return getDefaultHedwigAddress();
+ }
+
+ @Override
+ public boolean isSubscriptionChannelSharingEnabled() {
+ return TestPubSubClient.this.isSubscriptionChannelSharingEnabled;
+ }
+
+ @Override
+ public long getSubscribeReconnectRetryWaitTime() {
+ return reconnectWaitTime;
+ }
+ });
+
+ publisher = client.getPublisher();
+ subscriber = client.getSubscriber();
+
+ ByteString topic = ByteString.copyFromUtf8("testCloseSubscribeDuringResubscribe");
+ ByteString subscriberId = ByteString.copyFromUtf8("mysub");
+ subscriber.addSubscriptionListener(new TestSubscriptionListener());
+ SubscriptionOptions options =
+ SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
+ .setForceAttach(false).setEnableResubscribe(true).build();
+ subscriber.subscribe(topic, subscriberId, options);
+ logger.info("Subscribed topic {}, subscriber {}.", topic.toStringUtf8(),
+ subscriberId.toStringUtf8());
+ subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+
+ // tear down the hub server to let subscribe enter
+ tearDownHubServer();
+ logger.info("Tear down the hub server");
+
+ // wait for client enter to resubscribe logic
+ Thread.sleep(reconnectWaitTime / 2);
+
+ // close sub
+ subscriber.closeSubscription(topic, subscriberId);
+
+ // start the hub server again
+ startHubServer(conf);
+
+ // publish a new message
+ publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(),
+ new TestCallback(), null);
+ assertTrue(queue.take());
+
+ // wait for another reconnect time period
+ assertNull("Should not receive any messages since the subscription has already been closed.",
+ consumeQueue.poll(reconnectWaitTime + reconnectWaitTime / 2, TimeUnit.MILLISECONDS));
+ }
+
}
View
11 hedwig-server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
@@ -64,13 +64,15 @@ public ServerConfiguration getStandAloneServerConfiguration() {
}
protected PubSubServer server;
+ protected ServerConfiguration conf;
protected HedwigSocketAddress defaultAddress;
@Override
@Before
public void setUp() throws Exception {
logger.info("STARTING " + getName());
- startHubServer();
+ conf = getStandAloneServerConfiguration();
+ startHubServer(conf);
logger.info("Standalone PubSubServer test setup finished");
}
@@ -83,13 +85,6 @@ public void tearDown() throws Exception {
logger.info("FINISHED " + getName());
}
- protected void startHubServer() throws Exception {
- ServerConfiguration conf = getStandAloneServerConfiguration();
- defaultAddress = new HedwigSocketAddress("localhost", conf.getServerPort(),
- conf.getSSLServerPort());
- startHubServer(conf);
- }
-
protected HedwigSocketAddress getDefaultHedwigAddress() {
return defaultAddress;
}

0 comments on commit a27b64e

Please sign in to comment.
Something went wrong with that request. Please try again.