Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

Commit

Permalink
[REEF-53]: Currently there is no interface for NameServer. And all th…
Browse files Browse the repository at this point in the history
…e parameters of

the NameServer have default values. This means as long as a client puts NameServer in the
Driver constructor, no matter it is binded or not, it will be started.

This introduces NameServer interface and renames original NameServer as NameServerImpl,
so that if NameServer is not in a Driver constructor and clients do not bind NameServerImpl
into Driver configuration, NameServer will be not started.

- Introduce NameServer interface and add default Name Server implementation
- remove DefaultNameServerImpl

Author: Julia Wang jwang98052@yahoo.com

JIRA:
  [REEF-53]: https://issues.apache.org/jira/browse/REEF-53

Pull Request:
  Closes #27
  • Loading branch information
jwang98052 authored and Byung-Gon Chun committed Dec 2, 2014
1 parent 37c2e5c commit 7492d26
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 236 deletions.
233 changes: 13 additions & 220 deletions reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.reef.io.network.naming;

import org.apache.reef.io.naming.NameAssignment;
import org.apache.reef.io.network.naming.serialization.*;
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
Expand All @@ -43,253 +45,44 @@
import java.util.logging.Logger;

/**
* Naming server
* Naming server interface
*/
public class NameServer implements Stage {

private static final Logger LOG = Logger.getLogger(NameServer.class.getName());

private final Transport transport;
private final Map<Identifier, InetSocketAddress> idToAddrMap;
private final ReefEventStateManager reefEventStateManager;
private final int port;

/**
* @param port a listening port number
* @param factory an identifier factory
* @deprecated inject the NameServer instead of new it up
* Constructs a name server
*/
// TODO: All existing NameServer usage is currently new-up, need to make them injected as well.
@Deprecated
public NameServer(
final int port,
final IdentifierFactory factory) {

this.reefEventStateManager = null;
final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
final EventHandler<NamingMessage> handler = createEventHandler(codec);

this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);

this.port = transport.getListeningPort();
this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());

LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
}


/**
* Constructs a name server
*
* @param port a listening port number
* @param factory an identifier factory
* @param reefEventStateManager the event state manager used to register name server info
*/
@Inject
public NameServer(
final @Parameter(NameServerParameters.NameServerPort.class) int port,
final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory,
final ReefEventStateManager reefEventStateManager) {

this.reefEventStateManager = reefEventStateManager;
final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
final EventHandler<NamingMessage> handler = createEventHandler(codec);

this.transport = new NettyMessagingTransport(NetUtils.getLocalAddress(), port, null,
new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000);

this.port = transport.getListeningPort();
this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>());

this.reefEventStateManager.registerServiceInfo(
AvroReefServiceInfo.newBuilder()
.setServiceName("NameServer")
.setServiceInfo(getNameServerId())
.build());
LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port);
}

private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) {

final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>>
clazzToHandlerMap = new HashMap<>();

clazzToHandlerMap.put(NamingLookupRequest.class, new NamingLookupRequestHandler(this, codec));
clazzToHandlerMap.put(NamingRegisterRequest.class, new NamingRegisterRequestHandler(this, codec));
clazzToHandlerMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestHandler(this));
final EventHandler<NamingMessage> handler = new MultiEventHandler<>(clazzToHandlerMap);

return handler;
}
public interface NameServer extends Stage {

/**
* Gets port
* get port number
* @return
*/
public int getPort() {
return port;
}

/**
* Closes resources
*/
@Override
public void close() throws Exception {
transport.close();
}
public int getPort();

/**
* Registers an (identifier, address) mapping locally
*
* @param id an identifier
* @param addr an Internet socket address
*/
public void register(final Identifier id, final InetSocketAddress addr) {
LOG.log(Level.FINE, "id: " + id + " addr: " + addr);
idToAddrMap.put(id, addr);
}
public void register(final Identifier id, final InetSocketAddress addr);

/**
* Unregisters an identifier locally
*
* @param id an identifier
*/
public void unregister(final Identifier id) {
LOG.log(Level.FINE, "id: " + id);
idToAddrMap.remove(id);
}
public void unregister(final Identifier id);

/**
* Finds an address for an identifier locally
*
* @param id an identifier
* @return an Internet socket address
*/
public InetSocketAddress lookup(final Identifier id) {
LOG.log(Level.FINE, "id: {0}", id);
return idToAddrMap.get(id);
}
public InetSocketAddress lookup(final Identifier id);

/**
* Finds addresses for identifiers locally
*
* @param identifiers an iterable of identifiers
* @param identifiers an Iterable of identifiers
* @return a list of name assignments
*/
public List<NameAssignment> lookup(final Iterable<Identifier> identifiers) {
LOG.log(Level.FINE, "identifiers");
final List<NameAssignment> nas = new ArrayList<>();
for (final Identifier id : identifiers) {
final InetSocketAddress addr = idToAddrMap.get(id);
LOG.log(Level.FINEST, "id : {0} addr: {1}", new Object[]{id, addr});
if (addr != null) {
nas.add(new NameAssignmentTuple(id, addr));
}
}
return nas;
}

private String getNameServerId() {
return NetUtils.getLocalAddress() + ":" + getPort();
}
}

/**
* Naming server transport event handler that invokes a specific naming message handler
*/
class NamingServerHandler implements EventHandler<TransportEvent> {

private final Codec<NamingMessage> codec;
private final EventHandler<NamingMessage> handler;

NamingServerHandler(final EventHandler<NamingMessage> handler, final Codec<NamingMessage> codec) {
this.codec = codec;
this.handler = handler;
}

@Override
public void onNext(final TransportEvent value) {
final byte[] data = value.getData();
final NamingMessage message = codec.decode(data);
message.setLink(value.getLink());
handler.onNext(message);
}
}

/**
* Naming lookup request handler
*/
class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> {

private static final Logger LOG = Logger.getLogger(NamingLookupRequestHandler.class.getName());


private final NameServer server;
private final Codec<NamingMessage> codec;

public NamingLookupRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
this.server = server;
this.codec = codec;
}

@Override
public void onNext(final NamingLookupRequest value) {
final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
final byte[] resp = codec.encode(new NamingLookupResponse(nas));
try {
value.getLink().write(resp);
} catch (final IOException e) {
//Actually, there is no way Link.write can throw and IOException
//after netty4 merge. This needs to cleaned up
LOG.throwing("NamingLookupRequestHandler", "onNext", e);
}
}
}

/**
* Naming register request handler
*/
class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest> {

private static final Logger LOG = Logger.getLogger(NamingRegisterRequestHandler.class.getName());


private final NameServer server;
private final Codec<NamingMessage> codec;

public NamingRegisterRequestHandler(final NameServer server, final Codec<NamingMessage> codec) {
this.server = server;
this.codec = codec;
}

@Override
public void onNext(final NamingRegisterRequest value) {
server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress());
final byte[] resp = codec.encode(new NamingRegisterResponse(value));
try {
value.getLink().write(resp);
} catch (final IOException e) {
//Actually, there is no way Link.write can throw and IOException
//after netty4 merge. This needs to cleaned up
LOG.throwing("NamingRegisterRequestHandler", "onNext", e);
}
}
}

/**
* Naming unregister request handler
*/
class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterRequest> {

private final NameServer server;

public NamingUnregisterRequestHandler(final NameServer server) {
this.server = server;
}

@Override
public void onNext(final NamingUnregisterRequest value) {
server.unregister(value.getIdentifier());
}
}
public List<NameAssignment> lookup(final Iterable<Identifier> identifiers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.reef.io.network.naming;

import org.apache.reef.tang.formats.ConfigurationModule;
Expand Down Expand Up @@ -45,5 +46,6 @@ public final class NameServerConfiguration extends ConfigurationModuleBuilder {
.bindNamedParameter(NameServerParameters.NameServerPort.class, NAME_SERVICE_PORT)
.bindNamedParameter(NameServerParameters.NameServerAddr.class, NAME_SERVER_HOSTNAME)
.bindNamedParameter(NameServerParameters.NameServerIdentifierFactory.class, NAME_SERVER_IDENTIFIER_FACTORY)
.bindImplementation(NameServer.class, NameServerImpl.class)
.build();
}

0 comments on commit 7492d26

Please sign in to comment.