Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions storm-core/src/jvm/org/apache/storm/messaging/IContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public interface IContext {

/**
* This method establish a client side connection to a remote server
* implementation should return a new connection every call
*
* @param storm_id topology ID
* @param host remote host
* @param port remote port
Expand Down
12 changes: 2 additions & 10 deletions storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private final ClientBootstrap bootstrap;
private final InetSocketAddress dstAddress;
protected final String dstAddressPrefixedName;
//The actual name of the host we are trying to connect to so that
// when we remove ourselves from the connection cache there is no concern that
// the resolved host name is different.
private final String dstHost;

private volatile Map<Integer, Double> serverLoad = null;

/**
Expand Down Expand Up @@ -133,20 +130,17 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
*/
private volatile boolean closing = false;

private final Context context;

private final HashedWheelTimer scheduler;

private final MessageBuffer batcher;

private final Object writeLock = new Object();

@SuppressWarnings("rawtypes")
Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
this.stormConf = stormConf;
closing = false;
this.scheduler = scheduler;
this.context = context;
int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
// if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
saslChannelReady.set(!Utils.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
Expand All @@ -160,7 +154,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa

// Initiate connection to remote destination
bootstrap = createClientBootstrap(factory, bufferSize, stormConf);
dstHost = host;
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
launchChannelAliveThread();
Expand Down Expand Up @@ -425,7 +418,6 @@ private boolean closeChannelAndReconnect(Channel channel) {
public void close() {
if (!closing) {
LOG.info("closing Netty Client {}", dstAddressPrefixedName);
context.removeClient(dstHost, dstAddress.getPort());
// Set closing to true to prevent any further reconnection attempts.
closing = true;
waitForPendingMessagesToBeSent();
Expand Down
40 changes: 12 additions & 28 deletions storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
Expand All @@ -32,7 +34,7 @@
public class Context implements IContext {
@SuppressWarnings("rawtypes")
private Map storm_conf;
private Map<String, IConnection> connections;
private List<Server> serverConnections;
private NioClientSocketChannelFactory clientChannelFactory;

private HashedWheelTimer clientScheduleService;
Expand All @@ -43,7 +45,7 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
public void prepare(Map storm_conf) {
this.storm_conf = storm_conf;
connections = new HashMap<>();
serverConnections = new ArrayList<>();

//each context will have a single client channel factory
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
Expand All @@ -64,30 +66,17 @@ public void prepare(Map storm_conf) {
* establish a server with a binding port
*/
public synchronized IConnection bind(String storm_id, int port) {
IConnection server = new Server(storm_conf, port);
connections.put(key(storm_id, port), server);
Server server = new Server(storm_conf, port);
serverConnections.add(server);
return server;
}

/**
* establish a connection to a remote server
*/
public synchronized IConnection connect(String storm_id, String host, int port) {
IConnection connection = connections.get(key(host,port));
if(connection !=null)
{
return connection;
}
IConnection client = new Client(storm_conf, clientChannelFactory,
clientScheduleService, host, port, this);
connections.put(key(host, port), client);
return client;
}

synchronized void removeClient(String host, int port) {
if (connections != null) {
connections.remove(key(host, port));
}
public IConnection connect(String storm_id, String host, int port) {
return new Client(storm_conf, clientChannelFactory,
clientScheduleService, host, port);
}

/**
Expand All @@ -96,18 +85,13 @@ synchronized void removeClient(String host, int port) {
public synchronized void term() {
clientScheduleService.stop();

for (IConnection conn : connections.values()) {
for (Server conn : serverConnections) {
conn.close();
}

connections = null;
serverConnections = null;

//we need to release resources associated with client channel factory
clientChannelFactory.releaseExternalResources();

}

private String key(String host, int port) {
return String.format("%s:%d", host, port);
}
}