Skip to content

Commit

Permalink
[RM] Feature #59148. NettyMulticastService > fixed service disposal
Browse files Browse the repository at this point in the history
 * dropped bindIp setting
 * renamed localIp to interfaceIp
 * added TTL setting

Change-Id: I145ff4b120860f1dda9dce314ee4aaedd2191e74
  • Loading branch information
cordwelt committed Dec 20, 2019
1 parent 7374864 commit 137bfe0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 57 deletions.
Expand Up @@ -15,16 +15,23 @@
******************************************************************************/
package com.exactpro.sf.services.netty;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.util.LinkedHashMap;

import com.exactpro.sf.common.messages.structures.IDictionaryStructure;
import com.exactpro.sf.configuration.IDictionaryManager;
import com.exactpro.sf.services.IServiceSettings;
import com.exactpro.sf.services.ServiceException;
import com.exactpro.sf.services.ServiceStatus;
import com.exactpro.sf.services.netty.handlers.ExceptionInboundHandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
Expand All @@ -35,12 +42,6 @@
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

public abstract class NettyMulticastClientService extends NettyClientService {

//
Expand All @@ -63,6 +64,7 @@ public void start() {
serviceName);

nettySession = createSession();
nioEventLoopGroup = new NioEventLoopGroup();

initChannelHandlers(serviceContext);

Expand All @@ -82,16 +84,21 @@ public void connect() throws Exception {
channelLock.writeLock().lock();
LinkedHashMap<String, ChannelHandler> handlers = getChannelHandlers();

String localIP = getSettings().getLocalIP();
String bindIp = (getSettings().getBindIp() == null) ? getSettings().getMulticastIp() : getSettings().getBindIp();
String interfaceIp = getSettings().getInterfaceIp();
String mcastIp = getSettings().getMulticastIp();
int mcastPort = getSettings().getMulticastPort();
this.localNetworkInterface = NetworkInterface.getByInetAddress(Inet4Address.getByName(localIP));
this.multicastGroup = new InetSocketAddress(Inet4Address.getByName(mcastIp), mcastPort);

this.localNetworkInterface = NetworkInterface.getByInetAddress(Inet4Address.getByName(interfaceIp));

if (this.localNetworkInterface == null) {
throw new ServiceException("Failed to resolve network interface via IP: " + interfaceIp);
}

this.multicastGroup = new InetSocketAddress(InetAddress.getByName(mcastIp), mcastPort);

Bootstrap cb = new Bootstrap();
// Fixme: use ITaskExecutor ?
cb.group(new NioEventLoopGroup());
cb.group(nioEventLoopGroup);
cb.channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
Expand All @@ -101,15 +108,14 @@ public Channel newChannel() {
});
cb.option(ChannelOption.SO_REUSEADDR, true);
cb.option(ChannelOption.IP_MULTICAST_IF, localNetworkInterface);
cb.localAddress(new InetSocketAddress(Inet4Address.getByName(bindIp), mcastPort));
cb.option(ChannelOption.IP_MULTICAST_TTL, getSettings().getTtl());
cb.localAddress(new InetSocketAddress(InetAddress.getByName(mcastIp), mcastPort));
// we can configure java -Dio.netty.allocator.numDirectArenas=... -Dio.netty.allocator.numHeapArenas=...
cb.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
for (Entry<String, ChannelHandler> entry : handlers.entrySet()) {
ch.pipeline().addLast(entry.getKey(), entry.getValue());
}
handlers.forEach((key, value) -> ch.pipeline().addLast(key, value));
// add exception handler for inbound messages
// outbound exceptions will be routed here by ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE
ch.pipeline().addLast(new ExceptionInboundHandler(nettySession));
Expand All @@ -131,7 +137,7 @@ public void operationComplete(ChannelFuture bindFuture) throws Exception {
if (sourceIP == null) {
future = channel.joinGroup(multicastGroup, localNetworkInterface);
} else {
future = channel.joinGroup(multicastGroup.getAddress(), localNetworkInterface, Inet4Address.getByName(sourceIP));
future = channel.joinGroup(multicastGroup.getAddress(), localNetworkInterface, InetAddress.getByName(sourceIP));
}
future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
Expand All @@ -144,29 +150,6 @@ public void operationComplete(ChannelFuture bindFuture) throws Exception {
}
}

@Override
public void dispose() {
changeStatus(ServiceStatus.DISPOSING, "Service disposing", null);

NettySession session = nettySession;
if (session != null) {
nettySession = null;
session.close();
}

Channel localChannel = getChannel();

if (localChannel != null) {
localChannel.close().syncUninterruptibly();
}

changeStatus(ServiceStatus.DISPOSED, "Service disposed", null); // FIXME: the same called from closeFuture.listen

logConfigurator.destroyIndividualAppender(getClass().getName() + "@" + Integer.toHexString(hashCode()),
serviceName);
}


public void stop(String message, Throwable cause) {
changeStatus(ServiceStatus.DISPOSING, message, cause);
stopSendHeartBeats();
Expand Down
Expand Up @@ -24,11 +24,8 @@
public class NettyMulticastClientSettings extends NettyClientSettings {
private static final long serialVersionUID = -5243141927769743936L;

@Description("Specify default interface for outgoing multicast (service will resolve this IP to network interface)")
private String localIP;

@Description("Local IP (service will lookup this ip to determine network interface)")
private String bindIp;
@Description("Specify IP of an interface for outgoing multicast (service will resolve this IP to network interface)")
private String interfaceIp;

@Description("If specified - service will drop all messages receved from non-sourceIp addresses")
private String sourceIp;
Expand All @@ -39,20 +36,15 @@ public class NettyMulticastClientSettings extends NettyClientSettings {
@Description("Multicast port (service have to bind to this port in order to listen multicast)")
private int multicastPort;

public String getLocalIP() {
return localIP;
}
@Description("Specifies TTL for outgoing packets")
private int ttl = 1;

public void setLocalIP(String localIP) {
this.localIP = localIP;
}

public String getBindIp() {
return bindIp;
}
public String getInterfaceIp() {
return interfaceIp;
}

public void setBindIp(String bindIp) {
this.bindIp = bindIp;
public void setInterfaceIp(String interfaceIp) {
this.interfaceIp = interfaceIp;
}

public String getSourceIp() {
Expand All @@ -79,6 +71,14 @@ public void setMulticastPort(int multicastPort) {
this.multicastPort = multicastPort;
}

public int getTtl() {
return ttl;
}

public void setTtl(int ttl) {
this.ttl = ttl;
}

@Override
public SailfishURI getDictionaryName() {
return null;
Expand Down

0 comments on commit 137bfe0

Please sign in to comment.