Skip to content

Commit

Permalink
Use FileAttributes class in pool and pool related messages
Browse files Browse the repository at this point in the history
A long time ago we introduced the FileAttributes class. Much code has been
ported to this class, however there is still plenty of code that uses StorageInfo
directly.

This patch updates the pool and pool related messages to use FileAttributes. Care has
been taken to maintain backwards compatibility with older pools.

StorageInfo is still used, and will likely also be used in the future: The
pool serializes this class and thus it will not be easy to get rid of. Our code
should however as far as possible use FileAttributes.

Now is a perfect time for the patch. The code in the messages that maintains
backwards compatibility can be removed between the release of 2.6 and 2.7.

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Patch: http://rb.dcache.org/r/5107/
  • Loading branch information
Gerd Behrmann committed Jan 22, 2013
1 parent e59531a commit 5245c98
Show file tree
Hide file tree
Showing 54 changed files with 549 additions and 436 deletions.
Expand Up @@ -58,7 +58,7 @@ public final RetentionPolicy getDefaultRetentionPolicy() {
/*
* (non-Javadoc)
*
* @see diskCacheV111.util.StorageInfoExtractable#getStorageInfo(java.lang.String,
* @see diskCacheV111.util.StorageInfoExtractable#getFileAttributes(java.lang.String,
* diskCacheV111.util.PnfsId)
*/

Expand Down
Expand Up @@ -2221,17 +2221,15 @@ private void storeChecksumInPnfs( PnfsId pnfsId , String checksumString){
poolMessage =
new PoolDeliverFileMessage(
pool,
_fileAttributes.getPnfsId(),
_protocolInfo ,
_fileAttributes.getStorageInfo());
_fileAttributes);
}else if( reply instanceof PoolMgrSelectWritePoolMsg ){

poolMessage =
new PoolAcceptFileMessage(
pool,
_fileAttributes.getPnfsId(),
_protocolInfo ,
_fileAttributes.getStorageInfo());
_fileAttributes);
}else{
sendReply( "poolMgrGetPoolArrived" , 7 ,
"Illegal Message arrived : "+reply.getClass().getName() ) ;
Expand Down
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.security.MessageDigest;

import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -278,15 +279,13 @@ protected String getCellDomainName()
}

@Override
public void runIO(RepositoryChannel fileChannel,
public void runIO(FileAttributes fileAttributes,
RepositoryChannel fileChannel,
ProtocolInfo protocol,
StorageInfo storage,
PnfsId pnfsId,
Allocator allocator,
IoMode access )

throws Exception {

throws Exception
{
Exception ioException = null;

if(! (protocol instanceof DCapProtocolInfo)) {
Expand All @@ -295,7 +294,8 @@ public void runIO(RepositoryChannel fileChannel,
}
DCapProtocolInfo dcapProtocolInfo = (DCapProtocolInfo)protocol;

_pnfsId = pnfsId;
StorageInfo storage = fileAttributes.getStorageInfo();
_pnfsId = fileAttributes.getPnfsId();
_spaceMonitorHandler = new SpaceMonitorHandler(allocator);

////////////////////////////////////////////////////////////////////////
Expand Down
@@ -1,8 +1,6 @@
package org.dcache.pool.movers;

import java.text.MessageFormat;
import java.nio.channels.FileChannel;
import java.io.RandomAccessFile;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.InetAddress;
Expand All @@ -20,17 +18,16 @@
import diskCacheV111.vehicles.ProtocolInfo;
import diskCacheV111.vehicles.GFtpProtocolInfo;
import diskCacheV111.vehicles.GFtpTransferStartedMessage;
import diskCacheV111.vehicles.StorageInfo;
import org.dcache.util.Checksum;
import org.dcache.util.ChecksumType;
import java.security.NoSuchAlgorithmException;
import diskCacheV111.util.ChecksumFactory;
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.CacheException;
import org.dcache.pool.repository.Allocator;
import org.dcache.util.PortRange;
import org.dcache.util.NetworkUtils;

import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.dcache.ftp.*;
Expand Down Expand Up @@ -401,10 +398,9 @@ public void transfer(RepositoryChannel fileChannel, Role role,

/** Part of the MoverProtocol interface. */
@Override
public void runIO(RepositoryChannel fileChannel,
public void runIO(FileAttributes fileAttributes,
RepositoryChannel fileChannel,
ProtocolInfo protocol,
StorageInfo storage,
PnfsId pnfsId,
Allocator allocator,
IoMode access)
throws Exception
Expand Down Expand Up @@ -480,7 +476,7 @@ public void runIO(RepositoryChannel fileChannel,
int localPort =
channel.socket().getLocalPort();
message =
new GFtpTransferStartedMessage(pnfsId.getId(),
new GFtpTransferStartedMessage(fileAttributes.getPnfsId().getId(),
localHostName,
localPort);
mode.setPassive(channel);
Expand All @@ -489,7 +485,7 @@ public void runIO(RepositoryChannel fileChannel,
* active mode. When notified about this, the door
* will fall back to proxy mode.
*/
message = new GFtpTransferStartedMessage(pnfsId.getId());
message = new GFtpTransferStartedMessage(fileAttributes.getPnfsId().getId());
}
CellPath path = new CellPath(gftpProtocolInfo.getDoorCellName(),
gftpProtocolInfo.getDoorCellDomainName());
Expand Down
@@ -1,18 +1,15 @@
package org.dcache.xrootd.pool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.InetAddress;
import java.net.Inet4Address;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.net.SocketException;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import dmg.cells.nucleus.CellPath;
import dmg.cells.nucleus.CellEndpoint;
Expand All @@ -23,19 +20,15 @@
import org.dcache.pool.movers.MoverProtocol;
import org.dcache.pool.movers.MoverChannel;
import org.dcache.pool.movers.IoMode;
import org.dcache.pool.movers.MoverChannel;
import org.dcache.pool.repository.RepositoryChannel;
import org.dcache.pool.repository.Allocator;
import org.dcache.pool.repository.RepositoryChannel;
import org.dcache.util.NetworkUtils;
import org.dcache.vehicles.FileAttributes;
import org.dcache.vehicles.XrootdProtocolInfo;
import org.dcache.vehicles.XrootdDoorAdressInfoMessage;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.TimeoutCacheException;
import diskCacheV111.movers.NetIFContainer;
import diskCacheV111.vehicles.ProtocolInfo;
import diskCacheV111.vehicles.StorageInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -135,10 +128,9 @@ public XrootdProtocol_3(CellEndpoint endpoint)
}

@Override
public void runIO(RepositoryChannel fileChannel,
public void runIO(FileAttributes fileAttributes,
RepositoryChannel fileChannel,
ProtocolInfo protocol,
StorageInfo storage,
PnfsId pnfsId,
Allocator allocator,
IoMode access)
throws Exception
Expand Down
Expand Up @@ -968,8 +968,10 @@ private PnfsFlagReply setPnfsFlag(
String dest = args.argv(2) ;
PnfsId pnfsId = new PnfsId( args.argv(0) ) ;

FileAttributes fileAttributes = new FileAttributes();
fileAttributes.setPnfsId(pnfsId);
Pool2PoolTransferMsg p2p =
new Pool2PoolTransferMsg( source , dest , pnfsId , null ) ;
new Pool2PoolTransferMsg( source , dest , fileAttributes ) ;


cellEndPoint.sendMessage(
Expand Down
Expand Up @@ -13,6 +13,7 @@
import dmg.util.Args;
import dmg.util.CommandSyntaxException;

import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,14 +52,12 @@ public void messageArrived( CellMessage msg ){
}
String pnfsid = PnfsId.toCompleteId( args.argv(1) ) ;


FileAttributes fileAttributes = new FileAttributes();
fileAttributes.setPnfsId(new PnfsId(pnfsid));
fileAttributes.setStorageInfo(new GenericStorageInfo(hsmName, "any"));
_nucleus.sendMessage(
new CellMessage(
path ,
new PoolFetchFileMessage( poolName ,
new GenericStorageInfo(hsmName,"any") ,
pnfsid )
) ) ;
path, new PoolFetchFileMessage( poolName , fileAttributes)));
return "Stay tuned" ;
}
public static final String hh_send_getpool =
Expand Down
Expand Up @@ -1963,10 +1963,10 @@ public void getFileAttributes(PnfsGetFileAttributes message)
Set<FileAttribute> requested = message.getRequestedAttributes();
if(requested.contains(FileAttribute.STORAGEINFO)) {
/*
* TODO: The 'classic' result of getStorageInfo was a
* TODO: The 'classic' result of getFileAttributes was a
* cobination of fileMetadata + storageInfo. This was
* used add the owner and group information into
* sorageInfo's internal Map. Uid and Gid used by the
* used to add the owner and group information into
* storageInfo's internal Map. Uid and Gid are used by the
* HSM flush scripts.
*
* This atavism will have to be cut out when HSM
Expand Down
Expand Up @@ -14,6 +14,8 @@
import java.util.ArrayList;
import java.util.regex.Pattern;

import org.dcache.namespace.FileAttribute;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.dcache.cells.CellCommandListener;
Expand Down Expand Up @@ -262,15 +264,16 @@ public synchronized void messageToForward(PoolIoFileMessage msg)

int diff = 0;
long pinned = 0;
FileAttributes attributes = msg.getFileAttributes();
if (msg.isReply() && msg.getReturnCode() != 0) {
diff = -1;
if (msg instanceof PoolAcceptFileMessage) {
pinned = -msg.getStorageInfo().getFileSize();
if (msg instanceof PoolAcceptFileMessage && attributes.isDefined(FileAttribute.SIZE)) {
pinned = -msg.getFileAttributes().getSize();
}
} else if (!msg.isReply() && !_magic) {
diff = 1;
if (msg instanceof PoolAcceptFileMessage) {
pinned = msg.getStorageInfo().getFileSize();
if (msg instanceof PoolAcceptFileMessage && attributes.isDefined(FileAttribute.SIZE)) {
pinned = msg.getFileAttributes().getSize();
}
}

Expand Down Expand Up @@ -333,14 +336,19 @@ public synchronized void messageToForward(PoolFetchFileMessage msg)

int diff;
long pinned;
if (!msg.isReply()) {
diff = 1;
pinned = msg.getStorageInfo().getFileSize();
} else {
diff = -1;
pinned = 0;
}
queue.modifyQueue(diff);
if (msg.isReply()) {
diff = -1;
pinned = 0;
} else {
diff = 1;
FileAttributes attributes = msg.getFileAttributes();
if (attributes.isDefined(FileAttribute.SIZE)) {
pinned = attributes.getSize();
} else {
pinned = 0;
}
}
queue.modifyQueue(diff);
spaceInfo.modifyPinnedSpace(pinned);
considerInvalidatingCache(currentPerformanceCost, costInfo);
xsay("Restore", poolName, diff, pinned, msg);
Expand Down Expand Up @@ -421,7 +429,7 @@ public synchronized void messageToForward(Pool2PoolTransferMsg msg)
destinationCostInfo.getSpaceInfo();

int diff = msg.isReply() ? -1 : 1;
long pinned = msg.getStorageInfo().getFileSize();
long pinned = msg.getFileAttributes().getSize();

sourceQueue.modifyQueue(diff);
destinationQueue.modifyQueue(diff);
Expand Down
Expand Up @@ -1051,16 +1051,15 @@ private void setError( int errorCode , String errorMessage ){
_currentRc = errorCode ;
_currentRm = errorMessage ;
}
private boolean sendFetchRequest( String poolName , StorageInfo storageInfo )

private boolean sendFetchRequest(String poolName)
throws NoRouteToCellException
{

CellMessage cellMessage = new CellMessage(
new CellPath( poolName ),
new PoolFetchFileMessage(
poolName,
storageInfo,
_pnfsId )
_fileAttributes)
);
synchronized( _messageHash ){
if( ( _maxRestore >=0 ) &&
Expand All @@ -1078,7 +1077,7 @@ private void sendPool2PoolRequest( String sourcePool , String destPool )
throws NoRouteToCellException
{
Pool2PoolTransferMsg pool2pool =
new Pool2PoolTransferMsg(sourcePool,destPool,_pnfsId,_storageInfo) ;
new Pool2PoolTransferMsg(sourcePool, destPool, _fileAttributes);
pool2pool.setDestinationFileStatus( _destinationFileStatus ) ;
_log.info("[p2p] Sending transfer request: "+pool2pool);
CellMessage cellMessage =
Expand Down Expand Up @@ -2117,7 +2116,7 @@ private int askForStaging()
_stageCandidateHost = pool.getHostName();

_log.info("[staging] poolCandidate -> {}", _poolCandidate);
if (!sendFetchRequest(_poolCandidate, _storageInfo)) {
if (!sendFetchRequest(_poolCandidate)) {
return RT_OUT_OF_RESOURCES;
}

Expand Down
Expand Up @@ -15,6 +15,7 @@
import java.io.* ;
import java.util.*;

import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -511,8 +512,10 @@ private void startTransfer( PoolFileEntry entry ){

entry.timestamp = System.currentTimeMillis() ;

FileAttributes fileAttributes = new FileAttributes();
fileAttributes.setPnfsId(entry.getPnfsId());
Pool2PoolTransferMsg pool2pool =
new Pool2PoolTransferMsg( _source , entry._destination , entry.getPnfsId() , null ) ;
new Pool2PoolTransferMsg(_source, entry._destination, fileAttributes);

CellMessage msg = new CellMessage( new CellPath(entry._destination) , pool2pool ) ;

Expand Down

0 comments on commit 5245c98

Please sign in to comment.