Skip to content

Commit

Permalink
3 Bug Fixes. 1) retry channel when send fails. 2) stop sender when th…
Browse files Browse the repository at this point in the history
…e node drops out. 3) make TcpAddresses unique across restarts even when the port is fixed.
  • Loading branch information
homer-simpleton committed May 7, 2017
1 parent 74122c7 commit e47ee76
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 82 deletions.
@@ -1,6 +1,7 @@
package net.dempsy.transport.tcp;

import java.net.InetAddress;
import java.util.UUID;

import net.dempsy.transport.NodeAddress;

Expand All @@ -25,7 +26,8 @@ protected TcpAddress() {
/**
 * Creates an address for a TCP endpoint and derives its guid from the host address,
 * the port and a freshly generated random UUID, so that the guid differs between
 * instances even when the host and port are the same (the commit message describes
 * this as making addresses unique across restarts when the port is fixed).
 *
 * @param inetAddress    host address of the endpoint; must be non-null since
 *                       {@code getHostAddress()} is called on it
 * @param port           port of the endpoint
 * @param serializerId   stored as-is on the address
 * @param recvBufferSize stored as-is on the address
 */
public TcpAddress(final InetAddress inetAddress, final int port, final String serializerId, final int recvBufferSize) {
this.inetAddress = inetAddress;
this.port = port;
// NOTE(review): this rendering appears to show both the pre-commit (host:port only) and the
// post-commit guid assignment; presumably only the UUID-suffixed form below belongs in the
// committed file — confirm against the repository.
this.guid = inetAddress.getHostAddress() + ":" + port;
// The random suffix is what distinguishes two addresses that share the same host and port.
final String uuid = UUID.randomUUID().toString();
this.guid = inetAddress.getHostAddress() + ":" + port + "-" + uuid;
this.serializerId = serializerId;
this.recvBufferSize = recvBufferSize;
}
Expand Down
22 changes: 14 additions & 8 deletions dempsy-framework.impl/src/main/java/net/dempsy/Router.java
Expand Up @@ -135,7 +135,11 @@ public ApplicationState apply(final Update update, final TransportManager tmanag
final Set<NodeAddress> toDelete = update.toDelete;

if (toDelete.size() > 0) { // just clear all senders.
senders.clear();
for (final NodeAddress a : toDelete) {
final Sender s = senders.get(a);
if (s != null)
s.stop();
}
}

final Map<NodeAddress, NodeInformation> newCurrent = new HashMap<>();
Expand Down Expand Up @@ -197,7 +201,14 @@ public void stop() {
LOGGER.warn("Problem while shutting down an outbound router", rte);
}
});
senders.clear();

final List<Sender> tmps = new ArrayList<>();

// keep removing
while (senders.size() > 0)
senders.keySet().forEach(k -> tmps.add(senders.remove(k)));

tmps.forEach(s -> s.stop());
}

public Sender getSender(final NodeAddress na) {
Expand All @@ -212,6 +223,7 @@ public Sender getSender(final NodeAddress na) {
}
return ret;
}

}

public Router(final RoutingStrategyManager manager, final NodeAddress thisNode, final String thisNodeId, final NodeReceiver nodeReciever,
Expand All @@ -226,12 +238,8 @@ public Router(final RoutingStrategyManager manager, final NodeAddress thisNode,

@Override
public void dispatch(final KeyedMessageWithType message) {
// if (LOGGER.isTraceEnabled())
// LOGGER.trace("Dispatching " + SafeString.objectDescription(message.message) + ".");

boolean messageSentSomewhere = false;
try {

ApplicationState tmp = outbounds.get();

// if we're in the midst of an update then we really want to wait for the new state.
Expand Down Expand Up @@ -310,7 +318,6 @@ public void dispatch(final KeyedMessageWithType message) {
LOGGER.error("Couldn't send message to " + curNode + " from " + thisNodeId + " because there's no "
+ Sender.class.getSimpleName());
} else {
// TODO: more error handling
sf.send(toSend);
messageSentSomewhere = true;
}
Expand Down Expand Up @@ -380,7 +387,6 @@ public boolean execute() {
try {
final ApplicationState newState = obs.apply(ud, tmanager, statsCollector, manager);
outbounds.set(newState);

isReady.set(true);
return true;
} catch (final RuntimeException rte) {
Expand Down
@@ -0,0 +1,11 @@
package net.dempsy.transport;

/**
 * Test-support contract for {@link Receiver} implementations whose connectivity can be
 * deliberately broken, so that tests can check that the corresponding {@link Sender}
 * recovers from the disruption.
 */
public interface DisruptableRecevier {

    /**
     * Disrupts the connectivity associated with the given sender.
     *
     * @param senderToDistrupt address identifying the sender whose connectivity should be disrupted
     * @return presumably {@code true} when something was actually disrupted and {@code false}
     *         otherwise — confirm against the implementing receivers
     */
    boolean disrupt(NodeAddress senderToDistrupt);

}
Expand Up @@ -127,7 +127,7 @@ public synchronized void start(final Listener listener, final Infrastructure inf
"Cannot set a new Listener (" + SafeString.objectDescription(listener) + ") on a " + BlockingQueueReceiver.class.getSimpleName()
+ " when there's one already set (" + SafeString.objectDescription(this.listener) + ")");
this.listener = listener;
infra.getThreadingModel().runDaemon(this, "BQReceiver-" + address.getGuid());
infra.getThreadingModel().runDaemon(this, "BQReceiver-" + address.toString());
}

@Override
Expand Down
Expand Up @@ -12,6 +12,7 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -24,16 +25,18 @@
import net.dempsy.Infrastructure;
import net.dempsy.serialization.Serializer;
import net.dempsy.threading.ThreadingModel;
import net.dempsy.transport.DisruptableRecevier;
import net.dempsy.transport.Listener;
import net.dempsy.transport.MessageTransportException;
import net.dempsy.transport.NodeAddress;
import net.dempsy.transport.RoutedMessage;
import net.dempsy.transport.tcp.AbstractTcpReceiver;
import net.dempsy.transport.tcp.TcpUtils;
import net.dempsy.transport.tcp.nio.internal.NioUtils;
import net.dempsy.transport.tcp.nio.internal.NioUtils.ReturnableBufferOutput;
import net.dempsy.util.io.MessageBufferInput;

public class NioReceiver<T> extends AbstractTcpReceiver<NioAddress, NioReceiver<T>> {
public class NioReceiver<T> extends AbstractTcpReceiver<NioAddress, NioReceiver<T>> implements DisruptableRecevier {
private static Logger LOGGER = LoggerFactory.getLogger(NioReceiver.class);

private final AtomicBoolean isRunning = new AtomicBoolean(true);
Expand Down Expand Up @@ -97,10 +100,10 @@ public void start(final Listener<?> listener, final Infrastructure infra) throws
// before starting the acceptor, make sure we have Readers created.
try {
for (int i = 0; i < readers.length; i++)
readers[i] = new Reader<T>(isRunning, address.getGuid(), (Listener<T>) listener, serializer, maxMessageSize);
readers[i] = new Reader<T>(isRunning, address, (Listener<T>) listener, serializer, maxMessageSize);
} catch (final IOException ioe) {
LOGGER.error(address.getGuid() + " failed to start up readers", ioe);
throw new MessageTransportException(address.getGuid() + " failed to start up readers", ioe);
LOGGER.error(address.toString() + " failed to start up readers", ioe);
throw new MessageTransportException(address.toString() + " failed to start up readers", ioe);
}

final ThreadingModel threadingModel = infra.getThreadingModel();
Expand All @@ -109,7 +112,7 @@ public void start(final Listener<?> listener, final Infrastructure infra) throws
threadingModel.runDaemon(readers[i], "nio-reader-" + i + "-" + address);

// start the acceptor
threadingModel.runDaemon(acceptor = new Acceptor(binding, isRunning, readers, address.getGuid()), "nio-acceptor-" + address);
threadingModel.runDaemon(acceptor = new Acceptor(binding, isRunning, readers, address), "nio-acceptor-" + address);
}

@Override
Expand All @@ -119,6 +122,16 @@ public NioReceiver<T> setNumHandlers(final int numHandlers) {
return this;
}

// =============================================================================
// These methods are to support testing
// =============================================================================
/**
 * Asks each reader in turn to disrupt the connection for the given address, stopping at
 * the first reader that reports success.
 *
 * @param nodeAddress address handed to each reader's {@code disrupt}
 * @return {@code true} if some reader reported a disruption, {@code false} if none did
 */
@Override
public boolean disrupt(final NodeAddress nodeAddress) {
    // anyMatch short-circuits on the first reader that returns true, so later readers are not asked.
    return Arrays.stream(readers).anyMatch(reader -> reader.disrupt(nodeAddress));
}

// =============================================================================
// These classes manage accepting external connections.
// =============================================================================
Expand Down Expand Up @@ -150,9 +163,9 @@ private static class Acceptor implements Runnable {
final long numReaders;
final AtomicLong messageNum = new AtomicLong(0);
final AtomicBoolean done = new AtomicBoolean(false);
final String thisNode;
final NioAddress thisNode;

private Acceptor(final Binding binding, final AtomicBoolean isRunning, final Reader<?>[] readers, final String thisNode) {
private Acceptor(final Binding binding, final AtomicBoolean isRunning, final Reader<?>[] readers, final NioAddress thisNode) {
this.binding = binding;
this.isRunning = isRunning;
this.readers = readers;
Expand Down Expand Up @@ -230,12 +243,12 @@ private void close() {
// =============================================================================
private static class Client<T> {
ReturnableBufferOutput partialRead = null;
private final String thisNode;
private final NioAddress thisNode;
private final Listener<T> typedListener;
private final Serializer serializer;
private final int maxMessageSize;

private Client(final String thisNode, final Listener<T> listener, final Serializer serializer, final int maxMessageSize) {
private Client(final NioAddress thisNode, final Listener<T> listener, final Serializer serializer, final int maxMessageSize) {
this.thisNode = thisNode;
this.typedListener = listener;
this.serializer = serializer;
Expand All @@ -250,17 +263,6 @@ private Client(final String thisNode, final Listener<T> listener, final Serializ
private final int readSize(final SocketChannel channel, final ByteBuffer bb) throws IOException {
final int size;

// if (bb.position() < 4) {
// bb.limit(4);
// if (channel.read(bb) == -1)
// return -2;
// }
//
// if (bb.position() >= 4)
// size = bb.getInt(0);
// else
// size = -1;

if (bb.position() < 2) {
// read a Short
bb.limit(2);
Expand Down Expand Up @@ -375,19 +377,18 @@ public void read(final SelectionKey key) throws IOException {
}

public static class Reader<T> implements Runnable {

private final AtomicReference<SocketChannel> landing = new AtomicReference<SocketChannel>(null);
private final Selector selector;
private final AtomicBoolean isRunning;
private final String thisNode;
private final NioAddress thisNode;
private final Listener<T> typedListener;
private final Serializer serializer;
private final int maxMessageSize;
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<CloseCommand> clientToClose = new AtomicReference<CloseCommand>(null);

public Reader(final AtomicBoolean isRunning, final String thisNode, final Listener<T> typedListener, final Serializer serializer,
final int maxMessageSize)
throws IOException {
public Reader(final AtomicBoolean isRunning, final NioAddress thisNode, final Listener<T> typedListener, final Serializer serializer,
final int maxMessageSize) throws IOException {
selector = Selector.open();
this.isRunning = isRunning;
this.thisNode = thisNode;
Expand All @@ -398,7 +399,6 @@ public Reader(final AtomicBoolean isRunning, final String thisNode, final Listen

@Override
public void run() {

try {
while (isRunning.get()) {
try {
Expand All @@ -420,7 +420,6 @@ public void run() {
LOGGER.info(thisNode + " reciever got an unexpexted selection key " + key);
}
} else if (isRunning.get() && !done.get()) {

// if we processed no keys then maybe we have a new client passed over to us.
final SocketChannel newClient = landing.getAndSet(null); // mark it as retrieved.
if (newClient != null) {
Expand All @@ -431,17 +430,69 @@ public void run() {
LOGGER.debug(thisNode + " received connection from " + remote);
newClient.register(selector, SelectionKey.OP_READ,
new Client<T>(thisNode, typedListener, serializer, maxMessageSize));
} else if (clientToClose.get() != null) {
final NioAddress addr = clientToClose.get().addrToClose;
final Object[] toClose = selector.keys().stream()
.map(k -> new Object[] { k, (Client<?>) k.attachment() })
.filter(c -> ((Client<?>) c[1]).thisNode.equals(addr))
.findFirst()
.orElse(null);

if (toClose != null) {
final SelectionKey key = (SelectionKey) toClose[0];
final Client<?> client = ((Client<?>) toClose[1]);
try {
client.closeup((SocketChannel) key.channel(), key);
} finally {
clientToClose.get().set(client);
}
} else
clientToClose.set(null);
}
}
} catch (final IOException ioe) {
LOGGER.error("Failed during read loop.", ioe);
LOGGER.error("Failed during reader loop.", ioe);
}
}
} finally {
done.set(true);
}
}

/**
 * A request, handed from a caller of {@code disrupt} to the reader thread, to close the
 * client matching {@link #addrToClose}. The reader thread answers through {@link #set}.
 */
private static class CloseCommand {
    /** Address the reader thread compares clients against. */
    public final NioAddress addrToClose;

    /** Flipped to {@code true} by {@link #set}; volatile so the waiting thread sees the update. */
    public volatile boolean done = false;

    /** The client that was closed; stays {@code null} until {@link #set} is called. */
    public Client<?> clientClosed;

    CloseCommand(final NioAddress target) {
        addrToClose = target;
    }

    /**
     * Records the closed client and then marks the command as done.
     *
     * @param closed the client that was closed in response to this command
     */
    public void set(final Client<?> closed) {
        // Write the result before the volatile flag so a reader that observes done == true also sees it.
        clientClosed = closed;
        done = true;
    }
}

/**
 * Posts a {@link CloseCommand} for the given address to the reader thread and waits until
 * the reader thread either answers it, discards it, or the receiver stops running.
 *
 * @param addr address to close; must be a {@link NioAddress} (it is cast unconditionally)
 * @return {@code true} if the reader thread reported a closed client for this command,
 *         {@code false} otherwise (no match, or the receiver stopped first)
 */
boolean disrupt(final NodeAddress addr) {
    final CloseCommand cmd = new CloseCommand((NioAddress) addr);

    // wait for the command landing pad to be clear and claim it once available
    while (!clientToClose.compareAndSet(null, cmd)) {
        if (!isRunning.get())
            return false; // the command was never posted, so there is nothing to wait for or clear
        Thread.yield();
    }

    // Now wait for the reader thread to pick up the command and respond. The reader thread
    // resets the landing pad to null when it finds no matching client, so watch our own
    // command instance rather than dereferencing whatever currently sits in the landing pad.
    do {
        selector.wakeup();
        Thread.yield();
    } while (!cmd.done && clientToClose.get() == cmd && isRunning.get());

    // Clear the landing pad only if it still holds our command; another caller may already
    // have claimed it after the reader thread discarded ours.
    clientToClose.compareAndSet(cmd, null);

    return cmd.clientClosed != null;
}

// assumes isRunning is already set to false
private void close() {
while (!done.get()) {
Expand All @@ -450,7 +501,7 @@ private void close() {
}
}

public synchronized void newClient(final SocketChannel newClient) {
public synchronized void newClient(final SocketChannel newClient) throws IOException {
// attempt to set the landing as long as it's null
while (landing.compareAndSet(null, newClient))
Thread.yield();
Expand Down

0 comments on commit e47ee76

Please sign in to comment.