Skip to content

Commit

Permalink
enable consuming and publishing for RabbitMQ connections after a reco…
Browse files Browse the repository at this point in the history
…nnect to the broker

Signed-off-by: Florian Fendt <Florian.Fendt@bosch-si.com>
  • Loading branch information
ffendt committed Sep 6, 2018
1 parent c08d26b commit cdd8636
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
Expand Up @@ -185,6 +185,8 @@ public void postStop() {
public Receive createReceive() {
return ReceiveBuilder.create()
.match(RetrieveConnectionStatusResponse.class, command -> {
log.debug("Retrieved connection status response for connection {} with status: {}",
command.getConnectionId(), command.getConnectionStatus());
if (!ConnectionStatus.OPEN.equals(command.getConnectionStatus())) {
connectionIds.remove(command.getConnectionId());
}
Expand Down
Expand Up @@ -37,13 +37,15 @@
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientActor;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientData;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientState;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientConnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;

import com.newmotion.akka.rabbitmq.ChannelActor;
import com.newmotion.akka.rabbitmq.ChannelCreated;
import com.newmotion.akka.rabbitmq.CreateChannel;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
Expand All @@ -59,6 +61,7 @@
import akka.actor.Status;
import akka.japi.Pair;
import akka.japi.pf.FI;
import akka.japi.pf.FSMStateFunctionBuilder;
import akka.pattern.PatternsCS;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
Expand Down Expand Up @@ -146,6 +149,19 @@ private static Connection validateConnection(final Connection connection) {
return connection;
}

@Override
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedState() {
return super.inConnectedState()
.event(ClientConnected.class, BaseClientData.class,
(event, data) -> {
// when connection is lost, the library (ChannelActor) will automatically reconnect
// without the state of this actor changing. But we will receive a new ClientConnected message
// that we can use to bind our consumers to the channels.
this.allocateResourcesOnConnection(event);
return stay();
});
}

@Override
protected CompletionStage<Status.Status> doTestConnection(final Connection connection) {
return connect(connection, FiniteDuration.apply(TEST_CONNECTION_TIMEOUT, TimeUnit.SECONDS));
Expand Down Expand Up @@ -263,9 +279,16 @@ private CompletionStage<Status.Status> connect(final Connection connection, fina
rmqPublisherActor = startRmqPublisherActor().orElse(null);

// create publisher channel
final ActorRef finalRmqPublisherActor = rmqPublisherActor;
final CreateChannel createChannel = CreateChannel.apply(
ChannelActor.props((channel, s) -> {
log.info("Did set up publisher channel: {}", channel);
ChannelActor.props((channel, channelActor) -> {
log.info("Did set up publisher channel: {}. Telling the publisher actor the new channel",
channel);
// provide the new channel to the publisher after the channel was connected (also includes reconnects)
if (finalRmqPublisherActor != null) {
final ChannelCreated channelCreated = new ChannelCreated(channelActor);
finalRmqPublisherActor.tell(channelCreated, channelActor);
}
return null;
}),
Option.apply(PUBLISHER_CHANNEL));
Expand All @@ -274,9 +297,6 @@ private CompletionStage<Status.Status> connect(final Connection connection, fina
if (throwable != null) {
future.complete(new Status.Failure(throwable));
} else {
if (rmqPublisherActor != null) {
rmqPublisherActor.tell(reply, rmqConnectionActor);
}
future.complete(new Status.Success("channel created"));
}
return null;
Expand All @@ -294,9 +314,9 @@ private static void createConsumerChannelAndNotifySelf(final Status.Status statu
@Nullable final ActorRef rmqConnectionActor,
final ActorRef self) {
if (consuming && status instanceof Status.Success && rmqConnectionActor != null) {
// send self the created channel once
// send self the created channel
final CreateChannel createChannel =
CreateChannel.apply(ChannelActor.props(SendChannelOnce.to(self)::apply),
CreateChannel.apply(ChannelActor.props(SendChannel.to(self)::apply),
Option.apply(CONSUMER_CHANNEL));
// connection actor sends ChannelCreated; use an ASK to swallow the reply in which we are disinterested
PatternsCS.ask(rmqConnectionActor, createChannel, askTimeoutMillis());
Expand Down Expand Up @@ -426,30 +446,27 @@ private Channel getChannel() {
public Optional<ActorRef> getOrigin() {
return Optional.empty();
}

}

private static final class SendChannelOnce implements FI.Apply2<Channel, ActorRef, Object> {
private static final class SendChannel implements FI.Apply2<Channel, ActorRef, Object> {

private boolean hasFired = false;
private final ActorRef recipient;

private SendChannelOnce(final ActorRef recipient) {
private SendChannel(final ActorRef recipient) {
this.recipient = recipient;
}

private static SendChannelOnce to(final ActorRef recipient) {
return new SendChannelOnce(recipient);
private static SendChannel to(final ActorRef recipient) {
return new SendChannel(recipient);
}

@Override
public Object apply(final Channel channel, final ActorRef channelActor) {
if (!hasFired) {
hasFired = true;
recipient.tell(new RmqConsumerChannelCreated(channel), channelActor);
}
// return value ignored by caller com.newmotion.akka.rabbitmq.ChannelActor
recipient.tell(new RmqConsumerChannelCreated(channel), channelActor);
return channel;
}

}

/**
Expand Down

0 comments on commit cdd8636

Please sign in to comment.