Skip to content

Commit

Permalink
HADOOP-11757. NFS gateway should shutdown when it can't start UDP or …
Browse files Browse the repository at this point in the history
…TCP server. Contributed by Brandon Li
  • Loading branch information
Brandon Li committed Apr 2, 2015
1 parent 6ccf4fb commit 60ce825
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 28 deletions.
5 changes: 4 additions & 1 deletion hadoop-common-project/hadoop-common/CHANGES.txt
Expand Up @@ -1186,7 +1186,10 @@ Release 2.7.0 - UNRELEASED


HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is
wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth) wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth)


HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP
server (brandonli)

Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -60,7 +60,17 @@ private void startUDPServer() {
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(), SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1); rpcProgram, 1);
rpcProgram.startDaemons(); rpcProgram.startDaemons();
udpServer.run(); try {
udpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the UDP server.", e);
if (udpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP,
udpServer.getBoundPort());
}
udpServer.shutdown();
terminate(1, e);
}
udpBoundPort = udpServer.getBoundPort(); udpBoundPort = udpServer.getBoundPort();
} }


Expand All @@ -69,7 +79,17 @@ private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1); rpcProgram, 1);
rpcProgram.startDaemons(); rpcProgram.startDaemons();
tcpServer.run(); try {
tcpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the TCP server.", e);
if (tcpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
tcpServer.getBoundPort());
}
tcpServer.shutdown();
terminate(1, e);
}
tcpBoundPort = tcpServer.getBoundPort(); tcpBoundPort = tcpServer.getBoundPort();
} }


Expand All @@ -83,7 +103,7 @@ public void start(boolean register) {
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
} catch (Throwable e) { } catch (Throwable e) {
LOG.fatal("Failed to start the server. Cause:", e); LOG.fatal("Failed to register the MOUNT service.", e);
terminate(1, e); terminate(1, e);
} }
} }
Expand Down
Expand Up @@ -29,7 +29,6 @@


/** /**
* Nfs server. Supports NFS v3 using {@link RpcProgram}. * Nfs server. Supports NFS v3 using {@link RpcProgram}.
* Currently Mountd program is also started inside this class.
* Only TCP server is supported and UDP is not supported. * Only TCP server is supported and UDP is not supported.
*/ */
public abstract class Nfs3Base { public abstract class Nfs3Base {
Expand All @@ -55,7 +54,7 @@ public void start(boolean register) {
try { try {
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
} catch (Throwable e) { } catch (Throwable e) {
LOG.fatal("Failed to start the server. Cause:", e); LOG.fatal("Failed to register the NFSv3 service.", e);
terminate(1, e); terminate(1, e);
} }
} }
Expand All @@ -65,7 +64,17 @@ private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 0); rpcProgram, 0);
rpcProgram.startDaemons(); rpcProgram.startDaemons();
tcpServer.run(); try {
tcpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the TCP server.", e);
if (tcpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
tcpServer.getBoundPort());
}
tcpServer.shutdown();
terminate(1, e);
}
nfsBoundPort = tcpServer.getBoundPort(); nfsBoundPort = tcpServer.getBoundPort();
} }


Expand Down
Expand Up @@ -39,7 +39,9 @@ public class SimpleTcpServer {
protected final int port; protected final int port;
protected int boundPort = -1; // Will be set after server starts protected int boundPort = -1; // Will be set after server starts
protected final SimpleChannelUpstreamHandler rpcProgram; protected final SimpleChannelUpstreamHandler rpcProgram;

private ServerBootstrap server;
private Channel ch;

/** The maximum number of I/O worker threads */ /** The maximum number of I/O worker threads */
protected final int workerCount; protected final int workerCount;


Expand All @@ -53,7 +55,7 @@ public SimpleTcpServer(int port, RpcProgram program, int workercount) {
this.rpcProgram = program; this.rpcProgram = program;
this.workerCount = workercount; this.workerCount = workercount;
} }

public void run() { public void run() {
// Configure the Server. // Configure the Server.
ChannelFactory factory; ChannelFactory factory;
Expand All @@ -66,9 +68,9 @@ public void run() {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
workerCount); workerCount);
} }

ServerBootstrap bootstrap = new ServerBootstrap(factory); server = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { server.setPipelineFactory(new ChannelPipelineFactory() {


@Override @Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
Expand All @@ -77,14 +79,14 @@ public ChannelPipeline getPipeline() throws Exception {
RpcUtil.STAGE_RPC_TCP_RESPONSE); RpcUtil.STAGE_RPC_TCP_RESPONSE);
} }
}); });
bootstrap.setOption("child.tcpNoDelay", true); server.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true); server.setOption("child.keepAlive", true);

// Listen to TCP port // Listen to TCP port
Channel ch = bootstrap.bind(new InetSocketAddress(port)); ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
boundPort = socketAddr.getPort(); boundPort = socketAddr.getPort();

LOG.info("Started listening to TCP requests at port " + boundPort + " for " LOG.info("Started listening to TCP requests at port " + boundPort + " for "
+ rpcProgram + " with workerCount " + workerCount); + rpcProgram + " with workerCount " + workerCount);
} }
Expand All @@ -93,4 +95,13 @@ public ChannelPipeline getPipeline() throws Exception {
public int getBoundPort() { public int getBoundPort() {
return this.boundPort; return this.boundPort;
} }

public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
}
if (server != null) {
server.releaseExternalResources();
}
}
} }
Expand Up @@ -41,8 +41,11 @@ public class SimpleUdpServer {
protected final SimpleChannelUpstreamHandler rpcProgram; protected final SimpleChannelUpstreamHandler rpcProgram;
protected final int workerCount; protected final int workerCount;
protected int boundPort = -1; // Will be set after server starts protected int boundPort = -1; // Will be set after server starts
private ConnectionlessBootstrap server;
private Channel ch;


public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) { public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
int workerCount) {
this.port = port; this.port = port;
this.rpcProgram = program; this.rpcProgram = program;
this.workerCount = workerCount; this.workerCount = workerCount;
Expand All @@ -53,20 +56,19 @@ public void run() {
DatagramChannelFactory f = new NioDatagramChannelFactory( DatagramChannelFactory f = new NioDatagramChannelFactory(
Executors.newCachedThreadPool(), workerCount); Executors.newCachedThreadPool(), workerCount);


ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); server = new ConnectionlessBootstrap(f);
b.setPipeline(Channels.pipeline( server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram, rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
RpcUtil.STAGE_RPC_UDP_RESPONSE));
server.setOption("broadcast", "false");
server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);


b.setOption("broadcast", "false");
b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);

// Listen to the UDP port // Listen to the UDP port
Channel ch = b.bind(new InetSocketAddress(port)); ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
boundPort = socketAddr.getPort(); boundPort = socketAddr.getPort();

LOG.info("Started listening to UDP requests at port " + boundPort + " for " LOG.info("Started listening to UDP requests at port " + boundPort + " for "
+ rpcProgram + " with workerCount " + workerCount); + rpcProgram + " with workerCount " + workerCount);
} }
Expand All @@ -75,4 +77,13 @@ public void run() {
public int getBoundPort() { public int getBoundPort() {
return this.boundPort; return this.boundPort;
} }

public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
}
if (server != null) {
server.releaseExternalResources();
}
}
} }

0 comments on commit 60ce825

Please sign in to comment.