Skip to content

Commit

Permalink
pool: Add support for xrootd ChannelHandler plugins
Browse files Browse the repository at this point in the history
This patch adds support for injecting xrootd channel handler
plugins in pools. Such plugins can intercept and alter any
xrootd message exchanged between the client and the server.
Such plugins were already supported on xrootd doors, but this
patch adds the same mechanism to pools. Obvious plugins to add
are additional authentication plugins or monitoring plugins.

The xrootdPlugins property is now defined separately for pools
and doors. If specified outside the scope of a door or pool,
the property has to be prefixed with the scoping operator. 
Existing deployments may have to update their configuration
accordingly!

Require-book: yes
Require-notes: yes
Target: trunk
Acked-by: Karsten Schwank <karsten.schwank@desy.de>
Patch: http://rb.dcache.org/r/4864/
  • Loading branch information
Gerd Behrmann committed Jan 31, 2013
1 parent d5a4df0 commit 4acadb6
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 32 deletions.
Expand Up @@ -3,6 +3,7 @@
import static org.jboss.netty.channel.Channels.pipeline;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.dcache.vehicles.XrootdProtocolInfo;
Expand All @@ -11,6 +12,7 @@
import org.dcache.xrootd.core.XrootdDecoder;
import org.dcache.xrootd.core.XrootdEncoder;
import org.dcache.xrootd.core.XrootdHandshakeHandler;
import org.dcache.xrootd.plugins.ChannelHandlerFactory;
import org.dcache.xrootd.protocol.XrootdProtocol;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
Expand Down Expand Up @@ -44,28 +46,33 @@ public class XrootdPoolNettyServer
private final long _clientIdleTimeout;

private int _numberClientConnections;
private List<ChannelHandlerFactory> _plugins;

public XrootdPoolNettyServer(int threadPoolSize,
int memoryPerConnection,
int maxMemory,
long clientIdleTimeout) {
long clientIdleTimeout,
List<ChannelHandlerFactory> plugins) {
this(threadPoolSize,
memoryPerConnection,
maxMemory,
clientIdleTimeout,
plugins,
-1);
}

public XrootdPoolNettyServer(int threadPoolSize,
int memoryPerConnection,
int maxMemory,
long clientIdleTimeout,
List<ChannelHandlerFactory> plugins,
int socketThreads) {
super(threadPoolSize,
memoryPerConnection,
maxMemory,
socketThreads);
_clientIdleTimeout = clientIdleTimeout;
_plugins = plugins;

String range = System.getProperty("org.globus.tcp.port.range");
PortRange portRange =
Expand Down Expand Up @@ -112,6 +119,10 @@ public ChannelPipeline getPipeline() throws Exception {
pipeline.addLast("handshake",
new XrootdHandshakeHandler(XrootdProtocol.DATA_SERVER));
pipeline.addLast("executor", new ExecutionHandler(getDiskExecutor()));
for (ChannelHandlerFactory plugin: _plugins) {
pipeline.addLast("plugin:" + plugin.getName(),
plugin.createHandler());
}
pipeline.addLast("timeout", new IdleStateHandler(_timer,
0,
0,
Expand Down
@@ -1,5 +1,6 @@
package org.dcache.xrootd.pool;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.InetAddress;
import java.net.Inet4Address;
Expand All @@ -8,28 +9,39 @@
import java.util.Collection;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import dmg.cells.nucleus.CellPath;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;

import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import dmg.cells.nucleus.CellEndpoint;
import dmg.cells.nucleus.CellMessage;
import dmg.cells.nucleus.NoRouteToCellException;

import dmg.util.Args;
import org.dcache.pool.movers.MoverProtocol;
import dmg.cells.nucleus.CellPath;
import dmg.util.Formats;
import dmg.util.Replaceable;
import org.dcache.pool.movers.MoverChannel;
import org.dcache.pool.movers.IoMode;
import org.dcache.pool.movers.MoverProtocol;
import org.dcache.pool.repository.RepositoryChannel;
import org.dcache.pool.repository.Allocator;
import org.dcache.util.NetworkUtils;
import org.dcache.vehicles.FileAttributes;
import org.dcache.pool.movers.IoMode;
import org.dcache.vehicles.XrootdProtocolInfo;
import org.dcache.vehicles.XrootdDoorAdressInfoMessage;
import diskCacheV111.util.CacheException;
import diskCacheV111.movers.NetIFContainer;
import diskCacheV111.vehicles.ProtocolInfo;

import org.dcache.xrootd.plugins.ChannelHandlerFactory;
import org.dcache.xrootd.plugins.ChannelHandlerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,6 +86,9 @@ public class XrootdProtocol_3
private static final Logger _log =
LoggerFactory.getLogger(XrootdProtocol_3.class);

private final static ServiceLoader<ChannelHandlerProvider> _channelHandlerProviders =
ServiceLoader.load(ChannelHandlerProvider.class);

/**
* Communication endpoint.
*/
Expand All @@ -96,35 +111,113 @@ public class XrootdProtocol_3
*/
private XrootdProtocolInfo _protocolInfo;

private static synchronized void initSharedResources(Args args) {
if (_server == null) {
int threads = args.getIntOption("xrootd-mover-disk-threads");
int perChannelLimit = args.getIntOption("xrootd-mover-max-memory-per-connection");
int totalLimit = args.getIntOption("xrootd-mover-max-memory");
private static List<ChannelHandlerFactory> createPluginFactories(
Properties properties)
throws Exception
{
List<ChannelHandlerFactory> factories = Lists.newArrayList();
String plugins = properties.getProperty("xrootdPlugins");
for (String plugin: Splitter.on(",").omitEmptyStrings().split(plugins)) {
factories.add(createChannelHandlerFactory(properties, plugin));
}
return factories;
}

int clientIdleTimeout = args.getIntOption("xrootd-mover-idle-client-timeout");
private static ChannelHandlerFactory createChannelHandlerFactory(
Properties properties, String plugin)
throws Exception
{
for (ChannelHandlerProvider provider: _channelHandlerProviders) {
ChannelHandlerFactory factory =
provider.createFactory(plugin, properties);
if (factory != null) {
return factory;
}
}
throw new IllegalArgumentException("Channel handler plugin not found: " + plugin);
}

String socketThreads = args.getOpt("xrootd-mover-socket-threads");
private static synchronized void initSharedResources(CellEndpoint endpoint)
throws Exception
{
if (_server == null) {
Properties properties = getConfiguration(endpoint);

int threads = Integer.parseInt(properties.getProperty("xrootd-mover-disk-threads"));
int perChannelLimit = Integer.parseInt(properties
.getProperty("xrootd-mover-max-memory-per-connection"));
int totalLimit = Integer.parseInt(properties
.getProperty("xrootd-mover-max-memory"));
int clientIdleTimeout = Integer.parseInt(properties
.getProperty("xrootd-mover-idle-client-timeout"));
String socketThreads = properties.getProperty("xrootd-mover-socket-threads");
List<ChannelHandlerFactory> plugins = createPluginFactories(properties);

if (socketThreads == null || socketThreads.isEmpty()) {
_server = new XrootdPoolNettyServer(threads,
perChannelLimit,
totalLimit,
clientIdleTimeout);
_server = new XrootdPoolNettyServer(
threads,
perChannelLimit,
totalLimit,
clientIdleTimeout,
plugins);
} else {
_server = new XrootdPoolNettyServer(threads,
perChannelLimit,
totalLimit,
clientIdleTimeout,
Integer.parseInt(socketThreads));
_server = new XrootdPoolNettyServer(
threads,
perChannelLimit,
totalLimit,
clientIdleTimeout,
plugins,
Integer.parseInt(socketThreads));
}
}
}

private static Properties getConfiguration(CellEndpoint endpoint)
{
try {
/* REVISIT: UGLY hack to get to the environment. We should change
* CellEndpoint to provide access to the configuration rather
* just to the cell arguments.
*
* For now we do it like this because we need this code to be
* backportable to 2.2 and 1.9.12.
*/
Field field = endpoint.getClass().getDeclaredField("_environment");
field.setAccessible(true);

final Map<String,Object> env = (Map<String,Object>) field.get(endpoint);
Replaceable replaceable = new Replaceable() {
@Override
public String getReplacement(String name)
{
Object value = env.get(name);
return (value == null) ? null : value.toString().trim();
}
};

Properties properties = new Properties();
for (Map.Entry<String,Object> e: env.entrySet()) {
String key = e.getKey();
String value = String.valueOf(e.getValue());
properties.setProperty(key, Formats.replaceKeywords(value, replaceable));
}

for (Map.Entry<String,String> opt: endpoint.getArgs().optionsAsMap().entrySet()) {
properties.setProperty(opt.getKey(), opt.getValue());
}

return properties;
} catch (NoSuchFieldException e) {
throw Throwables.propagate(e);
} catch (IllegalAccessException e) {
throw Throwables.propagate(e);
}
}

public XrootdProtocol_3(CellEndpoint endpoint)
public XrootdProtocol_3(CellEndpoint endpoint) throws Exception
{
_endpoint = endpoint;
initSharedResources(_endpoint.getArgs());
initSharedResources(_endpoint);
}

@Override
Expand Down
Expand Up @@ -97,7 +97,7 @@ xrootdAllowedWritePaths=/
[dCacheDomain/xrootd]
cell.name=Xrootd/gsi-${host.name}
port=1095
xrootdPlugins=gplazma:gsi
xrootd/xrootdPlugins=gplazma:gsi
xrootdAllowedWritePaths=/

[dCacheDomain/webdav]
Expand Down
35 changes: 35 additions & 0 deletions skel/share/defaults/pool.properties
Expand Up @@ -138,6 +138,41 @@ xrootdMoverMaxMemory=67108864
#
xrootdMoverMaxFrameSize=2097152

# ---- Xrootd plugins
#
# Comma seperated list of plugins to inject into the xrootd
# request processing chain.
#
# dCache ships with a few authentication and authorization plugins:
#
# authn:gsi - any xrootd request to the door will use a key-exchange
# process to identify the end-user (pool only).
#
# authz:alice-token - ALICE token based authorization plugin.
#
# The property applies to both the xrootd door and the xrootd mover.
# The defaults are scoped to the xrootd service and pool service,
# respectively. Hence the property must either be defined per service
# or with an explicit scope prefix. See pool.properties for information
# about xrootd plugins for doors.
#
# For pools no plugins are required. If an authentication plugin is
# specified, then note that the subject will *not* be mapped by
# gPlazma.
#
# Pools authorize access using a one-time token generated by the door
# when redirecting the client to the pool. For this reason an
# authentication or authorization plugin is usually *not* needed
# on pools.
#
# Third party plugins can be used by adding the plugin to the plugin
# directory of dCache and specifing the plugin name here. Authentication
# and authorization plugins can be loaded as authn:<plugin> and
# authz:<plugin> respectively.
#
pool/xrootdPlugins=


# ---- Thread pool size for http disk IO threads
httpMoverDiskThreads=20

Expand Down
21 changes: 13 additions & 8 deletions skel/share/defaults/xrootd.properties
Expand Up @@ -116,30 +116,35 @@ xrootdAllowedWritePaths=

# ---- Xrootd plugins
#
# Comman seperated list of plugins to inject into the xrootd
# Comma seperated list of plugins to inject into the xrootd
# request processing chain.
#
# dCache ships with a few authentication and authorization plugins:
#
# gplazma:none - no authentication is performed; user identity is
# set to the value of xrootdUser
# set to the value of xrootdUser (door only).
#
# gplazma:gsi - any xrootd request to the door will use a key-exchange
# process to identify the end-user.
# process to identify the end-user (door only).
#
# authz:none - no additional authorization is performed (file system
# level authorization checks are still enforced).
# authz:alice-token - ALICE token based authorization plugin.
#
# authz:alice-token - ALICE token based authorization plugin
# The property applies to both the xrootd door and the xrootd mover.
# The defaults are scoped to the xrootd service and pool service,
# respectively. Hence the property must either be defined per service
# or with an explicit scope prefix. See pool.properties for information
# about xrootd plugins for pools.
#
# For xrootd doors, a gplazma authentication plugin is required; use
# gplazma:none if no authentication is desired. Authorization plugins
# have to be placed after the authentication plugin.
#
# Third party plugins can be used by adding the plugin to the plugin
# directory of dCache and specifing the plugin name here.
# directory of dCache and specifing the plugin name here. Note that
# third party authentication plugins have to be loaded with
# gplazma:<plugin> in doors.
#
xrootdPlugins=gplazma:${xrootdAuthNPlugin},authz:${xrootdAuthzPlugin}
xrootd/xrootdPlugins=gplazma:${xrootdAuthNPlugin},authz:${xrootdAuthzPlugin}

# ---- User identity used for authorizing operations
#
Expand Down

0 comments on commit 4acadb6

Please sign in to comment.