Skip to content

Commit

Permalink
spacemanager: Use imported PoolMonitor to select link group
Browse files Browse the repository at this point in the history
Space manager in some cases has to ask PoolManager to select a
link group. This patch changes the code to use an imported copy
of the PoolMonitor to achieve the same effect. This reduces
load on PoolManager and reduces latency in SpaceManager.

Target: trunk
Require-notes: yes
Require-book: no
Acked-by: Albert Rossi <arossi@fnal.gov>
Patch: http://rb.dcache.org/r/6351/
  • Loading branch information
gbehrmann committed Dec 18, 2013
1 parent 5273cfd commit d54d1eb
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 196 deletions.
Expand Up @@ -45,7 +45,6 @@
import diskCacheV111.vehicles.QuotaMgrCheckQuotaMessage;
import diskCacheV111.vehicles.StorageInfo;

import dmg.cells.nucleus.CDC;
import dmg.cells.nucleus.CellAddressCore;
import dmg.cells.nucleus.CellInfo;
import dmg.cells.nucleus.CellMessage;
Expand All @@ -65,7 +64,6 @@
import org.dcache.poolmanager.Utils;
import org.dcache.util.Version;
import org.dcache.vehicles.FileAttributes;
import org.dcache.vehicles.PoolManagerSelectLinkGroupForWriteMessage;

import static com.google.common.collect.Iterables.transform;

Expand Down Expand Up @@ -673,84 +671,6 @@ private boolean quotasExceeded(FileAttributes fileAttributes) {

}

public DelayedReply messageArrived(PoolManagerSelectLinkGroupForWriteMessage message)
{
return new LinkGroupSelectionTask(message);
}

/**
* Task for processing link group selection messages.
*/
public class LinkGroupSelectionTask
extends DelayedReply
implements Runnable
{
private final PoolManagerSelectLinkGroupForWriteMessage _message;
private final CDC _cdc;

public LinkGroupSelectionTask(PoolManagerSelectLinkGroupForWriteMessage message)
{
_message = message;
_cdc = new CDC();
new Thread(this, "LinkGroupSelectionTask").start();
}

@Override
public void run()
{
long started = System.currentTimeMillis();
try (CDC ignored = _cdc.restore()) {
_log.info("Select link group handler started");

_message.setLinkGroups(selectLinkGroups());
_message.setSucceeded();

_log.info("Select link group handler finished after {} ms",
(System.currentTimeMillis() - started));
} catch (Exception e) {
_message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
e.getMessage());
} finally {
reply(_message);
}
}

protected List<String> selectLinkGroups()
{
FileAttributes fileAttributes = _message.getFileAttributes();
ProtocolInfo protocolInfo = _message.getProtocolInfo();
String protocol =
protocolInfo.getProtocol() + "/" + protocolInfo.getMajorVersion();
String hostName =
(protocolInfo instanceof IpProtocolInfo)
? ((IpProtocolInfo) protocolInfo).getSocketAddress().getAddress().getHostAddress()
: null;

Collection<String> linkGroups = _message.getLinkGroups();
if (linkGroups == null) {
linkGroups =
Utils.linkGroupInfos(_selectionUnit, _costModule).keySet();
}

List<String> outputLinkGroups =
new ArrayList<>(linkGroups.size());

for (String linkGroup: linkGroups) {
PoolPreferenceLevel [] level =
_selectionUnit.match(DirectionType.WRITE,
hostName,
protocol,
fileAttributes.getStorageInfo(),
linkGroup);
if (level.length > 0) {
outputLinkGroups.add(linkGroup);
}
}

return outputLinkGroups;
}
}

public PoolManagerGetPoolMonitor
messageArrived(PoolManagerGetPoolMonitor msg)
{
Expand Down
Expand Up @@ -47,6 +47,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
Expand All @@ -58,6 +59,8 @@
import java.util.Objects;
import java.util.Set;

import diskCacheV111.poolManager.PoolPreferenceLevel;
import diskCacheV111.poolManager.PoolSelectionUnit;
import diskCacheV111.services.space.message.CancelUse;
import diskCacheV111.services.space.message.ExtendLifetime;
import diskCacheV111.services.space.message.GetFileSpaceTokensMessage;
Expand All @@ -81,9 +84,9 @@
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.RetentionPolicy;
import diskCacheV111.util.ThreadManager;
import diskCacheV111.util.TimeoutCacheException;
import diskCacheV111.util.VOInfo;
import diskCacheV111.vehicles.DoorTransferFinishedMessage;
import diskCacheV111.vehicles.IpProtocolInfo;
import diskCacheV111.vehicles.Message;
import diskCacheV111.vehicles.PnfsDeleteEntryNotificationMessage;
import diskCacheV111.vehicles.PoolAcceptFileMessage;
Expand All @@ -108,9 +111,10 @@
import org.dcache.auth.Subjects;
import org.dcache.cells.CellStub;
import org.dcache.namespace.FileAttribute;
import org.dcache.poolmanager.PoolMonitor;
import org.dcache.poolmanager.Utils;
import org.dcache.util.JdbcConnectionPool;
import org.dcache.vehicles.FileAttributes;
import org.dcache.vehicles.PoolManagerSelectLinkGroupForWriteMessage;

/**
* <pre> Space Manager dCache service provides ability
Expand Down Expand Up @@ -164,7 +168,7 @@ public final class Manager
private static IoPackage<File> fileIO = new FileIO();
private static IoPackage<Space> spaceReservationIO = new SpaceReservationIO();
private static IoPackage<LinkGroup> linkGroupIO = new LinkGroupIO();

private PoolMonitor poolMonitor;

@Required
public void setPoolManagerStub(CellStub poolManagerStub)
Expand All @@ -184,6 +188,11 @@ public void setPnfsHandler(PnfsHandler pnfs)
this.pnfs = pnfs;
}

public void setPoolMonitor(PoolMonitor poolMonitor)
{
this.poolMonitor = poolMonitor;
}

@Required
public void setJdbcUrl(String jdbcUrl)
{
Expand Down Expand Up @@ -4316,35 +4325,11 @@ private long reserveSpace(Subject subject,
protocolInfo != null &&
fileAttributes != null) {
try {
PoolManagerSelectLinkGroupForWriteMessage msg=
new PoolManagerSelectLinkGroupForWriteMessage(pnfsId,
fileAttributes,
protocolInfo,
sizeInBytes);
msg.setLinkGroups(linkGroupNames);
logger.trace("Sending PoolManagerSelectLinkGroupForWriteMessage");
msg=poolManagerStub.sendAndWait(msg);
linkGroupNames=msg.getLinkGroups();
logger.trace("received PoolManagerSelectLink" +
"GroupForWriteMessage reply, number " +
"of LinkGroups={}", linkGroupNames.size());
linkGroupNames = selectLinkGroupForWrite(protocolInfo, fileAttributes, linkGroupNames);
if(linkGroupNames.isEmpty()) {
throw new SpaceAuthorizationException("PoolManagerSelectLinkGroupForWriteMessage: Failed to find LinkGroup where user is authorized to reserve space.");
}
}
catch (TimeoutCacheException e) {
throw new SpaceException(
"PoolManagerSelectLinkGroupForWriteMessage: request timed out ",
e);
}
catch (CacheException e) {
throw new SpaceException("Internal error : PoolManagerSelectLinkGroupForWriteMessage exception ",
e);
}
catch (InterruptedException e) {
throw new SpaceException("Request was interrupted",
e);
}
catch (SpaceAuthorizationException e) {
logger.warn("authorization problem: {}",
e.getMessage());
Expand Down Expand Up @@ -4375,6 +4360,30 @@ private long reserveSpace(Subject subject,
description);
}

private List<String> selectLinkGroupForWrite(ProtocolInfo protocolInfo, FileAttributes fileAttributes,
Collection<String> linkGroups)
{
String protocol = protocolInfo.getProtocol() + "/" + protocolInfo.getMajorVersion();
String hostName =
(protocolInfo instanceof IpProtocolInfo)
? ((IpProtocolInfo) protocolInfo).getSocketAddress().getAddress().getHostAddress()
: null;

List<String> outputLinkGroups = new ArrayList<>(linkGroups.size());
for (String linkGroup: linkGroups) {
PoolPreferenceLevel[] level =
poolMonitor.getPoolSelectionUnit().match(PoolSelectionUnit.DirectionType.WRITE,
hostName,
protocol,
fileAttributes.getStorageInfo(),
linkGroup);
if (level.length > 0) {
outputLinkGroups.add(linkGroup);
}
}
return outputLinkGroups;
}

private long reserveSpaceInLinkGroup(long linkGroupId,
String voGroup,
String voRole,
Expand Down

This file was deleted.

Expand Up @@ -14,13 +14,27 @@
<property name="location" value="arguments:"/>
</bean>

<bean id="scheduled-executor"
class="java.util.concurrent.Executors"
factory-method="newScheduledThreadPool"
destroy-method="shutdown">
<description>Task scheduler</description>
<constructor-arg value="1"/>
</bean>

<bean id="pool-manager-stub" class="org.dcache.cells.CellStub">
<description>Pool manager communication stub</description>
<property name="destination" value="${srm.service.poolmanager}"/>
<property name="timeout" value="${srm.service.poolmanager.timeout}"/>
<property name="timeoutUnit" value="${srm.service.poolmanager.timeout.unit}"/>
</bean>

<bean id="pool-monitor" class="org.dcache.poolmanager.RemotePoolMonitorFactoryBean" init-method="init">
<description>Maintains runtime information about all pools</description>
<property name="poolManagerStub" ref="pool-manager-stub"/>
<property name="executor" ref="scheduled-executor"/>
</bean>

<bean id="pnfs-stub" class="org.dcache.cells.CellStub">
<description>PNFS manager communication stub</description>
<property name="destination" value="${srm.service.pnfsmanager}"/>
Expand Down Expand Up @@ -74,6 +88,7 @@
<property name="pnfsStub" ref="pnfs-stub"/>
<property name="pnfsHandler" ref="pnfs"/>
<property name="poolManagerStub" ref="pool-manager-stub"/>
<property name="poolMonitor" ref="pool-monitor"/>
<property name="jdbcUrl" value="${spacemanager.db.url}"/>
<property name="jdbcDriver" value="${spacemanager.db.driver}"/>
<property name="jdbcUser" value="${spacemanager.db.user}"/>
Expand Down

0 comments on commit d54d1eb

Please sign in to comment.