Skip to content

Commit

Permalink
pool: Let DoorTransferFinished and PoolFileFlushed use FileAttributes
Browse files Browse the repository at this point in the history
We are slowly moving away from StorageInfo. This patch moves two
pool notification messages to FileAttributes, although it is still
used to transport StorageInfo.

Care is taken to maintain backwards compatibility with old pools
(two way for DoorTransferFinished as it may be sent between pools).

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: http://rb.dcache.org/r/5323/
  • Loading branch information
gbehrmann committed Apr 2, 2013
1 parent f31e94b commit 6c7de8a
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 81 deletions.
Expand Up @@ -2378,7 +2378,7 @@ public void poolPassiveIoFileMessage( PoolPassiveIoFileMessage<byte[]> reply) {

if( reply.getReturnCode() == 0 ){

long filesize = reply.getStorageInfo().getFileSize() ;
long filesize = reply.getFileAttributes().getSize() ;
_log.info("doorTransferArrived : fs={};strict={};m={}", filesize, _strictSize, _ioMode);
if( _strictSize && ( filesize > 0L ) && (_ioMode.contains("w")) ){

Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import diskCacheV111.util.AccessLatency;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.ChecksumFactory;
import diskCacheV111.util.FileMetaData;
Expand Down Expand Up @@ -54,7 +53,6 @@
import diskCacheV111.vehicles.PnfsSetChecksumMessage;
import diskCacheV111.vehicles.PnfsSetFileMetaDataMessage;
import diskCacheV111.vehicles.PoolFileFlushedMessage;
import diskCacheV111.vehicles.PoolSetStickyMessage;
import diskCacheV111.vehicles.StorageInfo;

import dmg.cells.nucleus.CDC;
Expand Down Expand Up @@ -1705,7 +1703,7 @@ public void processFlushMessage(PoolFileFlushedMessage pnfsMessage)
{
try {
FileAttributes attributesToUpdate = new FileAttributes();
attributesToUpdate.setStorageInfo(pnfsMessage.getStorageInfo());
attributesToUpdate.setStorageInfo(pnfsMessage.getFileAttributes().getStorageInfo());
_nameSpaceProvider.setFileAttributes(pnfsMessage.getSubject(), pnfsMessage.getPnfsId(), attributesToUpdate);
} catch (CacheException e) {
pnfsMessage.setFailed(e.getRc(), e.getMessage());
Expand Down
Expand Up @@ -556,7 +556,7 @@ public void poolDoorMessageArrived(DoorTransferFinishedMessage doorMessage) {

DoorTransferFinishedMessage finished = doorMessage;
if(store && tlog != null) {
tlog.middle(finished.getStorageInfo().getFileSize());
tlog.middle(finished.getFileAttributes().getSize());
}
sendSuccessReply();
}
Expand Down
Expand Up @@ -4254,8 +4254,7 @@ private void transferStarted(PnfsId pnfsId,boolean success) {
private void transferFinished(DoorTransferFinishedMessage finished) throws Exception {
boolean weDeleteStoredFileRecord = deleteStoredFileRecord;
PnfsId pnfsId = finished.getPnfsId();
StorageInfo storageInfo = finished.getStorageInfo();
long size = storageInfo.getFileSize();
long size = finished.getFileAttributes().getSize();
boolean success = finished.getReturnCode() == 0;
logger.debug("transferFinished({},{})", pnfsId, success);
Connection connection = null;
Expand Down Expand Up @@ -4379,14 +4378,14 @@ private void fileFlushed(PoolFileFlushedMessage fileFlushed) throws Exception {
return;
}
logger.debug("fileFlushed({})", pnfsId);
StorageInfo storageInfo = fileFlushed.getStorageInfo();
AccessLatency ac = storageInfo.getAccessLatency();
if ( ac != null && ac.equals(AccessLatency.ONLINE)) {
FileAttributes fileAttributes = fileFlushed.getFileAttributes();
AccessLatency ac = fileAttributes.getAccessLatency();
if (ac.equals(AccessLatency.ONLINE)) {
logger.debug("File Access latency is ONLINE " +
"fileFlushed does nothing");
return;
}
long size = storageInfo.getFileSize();
long size = fileAttributes.getSize();
Connection connection = null;
try {
connection = connection_pool.getConnection();
Expand Down
Expand Up @@ -434,9 +434,9 @@ public void putPnfsFlag(PnfsId pnfsId, String flag, String value)
notify(flagMessage);
}

public void fileFlushed(PnfsId pnfsId, StorageInfo storageInfo ) throws CacheException {
public void fileFlushed(PnfsId pnfsId, FileAttributes fileAttributes) throws CacheException {

PoolFileFlushedMessage fileFlushedMessage = new PoolFileFlushedMessage(_poolName, pnfsId, storageInfo);
PoolFileFlushedMessage fileFlushedMessage = new PoolFileFlushedMessage(_poolName, pnfsId, fileAttributes);

// throws exception if something goes wrong
pnfsRequest(fileFlushedMessage);
Expand Down
Expand Up @@ -4,59 +4,62 @@

import diskCacheV111.util.PnfsId;

import org.dcache.vehicles.FileAttributes;

/**
* Signals the completion of a transfer on a pool.
*/
public class DoorTransferFinishedMessage extends Message {
private ProtocolInfo _protocol;
private StorageInfo _info;
private PnfsId _pnfsId;
private String _poolName;
private String _ioQueueName;
private final ProtocolInfo _protocol;
private FileAttributes _fileAttributes;
@Deprecated // Can be removed in 2.7
private final StorageInfo _info;
private final PnfsId _pnfsId;
private final String _poolName;
private final String _ioQueueName;
private static final long serialVersionUID = -7563456962335030196L;

public DoorTransferFinishedMessage( long id ,
PnfsId pnfsId ,
ProtocolInfo protocol ,
StorageInfo info ){

setId( id ) ;
_protocol = protocol ;
_info = info ;
_pnfsId = pnfsId ;
public DoorTransferFinishedMessage(long id,
PnfsId pnfsId,
ProtocolInfo protocol,
FileAttributes fileAttributes,
String poolName,
String ioQueueName) {
setId(id);
_protocol = protocol;
_fileAttributes = fileAttributes;
_info = fileAttributes.getStorageInfo();
_pnfsId = pnfsId;
_poolName = poolName;
_ioQueueName = ioQueueName;
}
public DoorTransferFinishedMessage( long id ,
PnfsId pnfsId ,
ProtocolInfo protocol ,
StorageInfo info ,
String poolName ){

setId( id ) ;
_protocol = protocol ;
_info = info ;
_pnfsId = pnfsId ;
_poolName = poolName ;

public String getIoQueueName() {
return _ioQueueName;
}
/*
public DoorTransferFinishedMessage( long id , ProtocolInfo info,
int rc , String errorMsg ){
setFailed( rc , errorMsg ) ;
setId( id ) ;
_protocol = info ;

public ProtocolInfo getProtocolInfo() {
return _protocol;
}
public DoorTransferFinishedMessage( long id , ProtocolInfo info ){
setSucceeded() ;
setId(id) ;
_protocol = info ;

public FileAttributes getFileAttributes() {
if (_fileAttributes == null && _info != null) {
_fileAttributes = new FileAttributes();
_fileAttributes.setStorageInfo(_info);
_fileAttributes.setSize(_info.getFileSize());
_fileAttributes.setAccessLatency(_info.getAccessLatency());
_fileAttributes.setRetentionPolicy(_info.getRetentionPolicy());
}
return _fileAttributes;
}
*/
public void setIoQueueName( String ioQueueName ){
_ioQueueName = ioQueueName ;

public PnfsId getPnfsId() {
return _pnfsId;
}
public String getIoQueueName(){
return _ioQueueName ;

public String getPoolName() {
return _poolName;
}
public ProtocolInfo getProtocolInfo(){ return _protocol ; }
public StorageInfo getStorageInfo(){ return _info ; }
public PnfsId getPnfsId(){ return _pnfsId ; }
public String getPoolName(){ return _poolName ; }
}


@@ -1,28 +1,37 @@
// $Id: PoolFileFlushedMessage.java,v 1.3 2007-07-10 20:57:35 tigran Exp $

package diskCacheV111.vehicles;

import diskCacheV111.util.PnfsId;

/*
* @Immutable
*/
public class PoolFileFlushedMessage extends PnfsMessage {
import org.dcache.vehicles.FileAttributes;

private final StorageInfo _storageInfo;
/**
* Signals that a file was flushed.
*/
public class PoolFileFlushedMessage extends PnfsMessage
{
@Deprecated // Can be removed in 2.7
private StorageInfo _storageInfo;
private FileAttributes _fileAttributes;
private final String _poolName;

private static final long serialVersionUID = 1856537534158868883L;

public PoolFileFlushedMessage(String poolName, PnfsId pnfsId, StorageInfo storageInfo ) {
public PoolFileFlushedMessage(String poolName, PnfsId pnfsId, FileAttributes fileAttributes) {
super(pnfsId);
_poolName = poolName;
_storageInfo = storageInfo;
_fileAttributes = fileAttributes;
setReplyRequired(true);
}

public StorageInfo getStorageInfo() {
return _storageInfo;
public FileAttributes getFileAttributes() {
if (_fileAttributes == null && _storageInfo != null) {
_fileAttributes = new FileAttributes();
_fileAttributes.setStorageInfo(_storageInfo);
_fileAttributes.setAccessLatency(_storageInfo.getAccessLatency());
_fileAttributes.setRetentionPolicy(_storageInfo.getRetentionPolicy());
_fileAttributes.setSize(_storageInfo.getFileSize());
}
return _fileAttributes;
}

public String getPoolName() {
Expand Down
Expand Up @@ -942,12 +942,12 @@ public void run()

Set<OpenFlags> flags = Collections.emptySet();
ReplicaDescriptor handle = _repository.openEntry(pnfsId, flags);
StorageInfo storageInfo;
FileAttributes fileAttributesForNotification = new FileAttributes();
try {
doChecksum(handle);

FileAttributes fileAttributes = handle.getEntry().getFileAttributes();
storageInfo = fileAttributes.getStorageInfo().clone();
StorageInfo storageInfo = fileAttributes.getStorageInfo().clone();
_infoMsg.setStorageInfo(storageInfo);
_infoMsg.setFileSize(fileAttributes.getSize());
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -994,6 +994,11 @@ public void run()
throw new RuntimeException("Bug detected");
}
}

fileAttributesForNotification.setAccessLatency(storageInfo.getAccessLatency());
fileAttributesForNotification.setRetentionPolicy(storageInfo.getRetentionPolicy());
fileAttributesForNotification.setStorageInfo(storageInfo);
fileAttributesForNotification.setSize(fileAttributes.getSize());
} finally {
/* Surpress thread interruptions after this point.
*/
Expand All @@ -1005,7 +1010,7 @@ public void run()

while (true) {
try {
_pnfs.fileFlushed(pnfsId, storageInfo);
_pnfs.fileFlushed(pnfsId, fileAttributesForNotification);
break;
} catch (CacheException e) {
if (e.getRc() == CacheException.FILE_NOT_FOUND ||
Expand Down Expand Up @@ -1051,7 +1056,7 @@ public void run()
Thread.sleep(120000); // 2 minutes
}

notifyFlushMessageTarget(storageInfo);
notifyFlushMessageTarget(fileAttributesForNotification);

_log.info("File successfully stored to tape");

Expand Down Expand Up @@ -1119,14 +1124,11 @@ private void doChecksum(ReplicaDescriptor handle)
factory, null, checksum);
}

private void notifyFlushMessageTarget(StorageInfo info)
private void notifyFlushMessageTarget(FileAttributes fileAttributes)
{
try {
PoolFileFlushedMessage poolFileFlushedMessage =
new PoolFileFlushedMessage(getCellName(), getPnfsId(), info);
/*
* no replays from secondary message targets
*/
new PoolFileFlushedMessage(getCellName(), getPnfsId(), fileAttributes);
poolFileFlushedMessage.setReplyRequired(false);
CellMessage msg =
new CellMessage(new CellPath(_flushMessageTarget),
Expand Down
Expand Up @@ -122,9 +122,9 @@ void sendFinished() {
new DoorTransferFinishedMessage(getClientId(),
getFileAttributes().getPnfsId(),
getProtocolInfo(),
getFileAttributes().getStorageInfo(),
_poolName);
finished.setIoQueueName(_queue);
getFileAttributes(),
_poolName,
_queue);
if (_errorCode == 0) {
finished.setSucceeded();
} else {
Expand Down
Expand Up @@ -172,6 +172,7 @@ public void close()
/* Temporary workaround to ensure that the correct size is
* logged in billing and send back to the door.
*/
_fileAttributes.setSize(getFileSize());
_fileAttributes.getStorageInfo().setFileSize(getFileSize());
}
}
Expand Down
2 changes: 1 addition & 1 deletion modules/dcache/src/main/java/org/dcache/util/Transfer.java
Expand Up @@ -442,7 +442,7 @@ public final synchronized void finished(int rc, String error)
*/
public final synchronized void finished(DoorTransferFinishedMessage msg)
{
setStorageInfo(msg.getStorageInfo());
setFileAttributes(msg.getFileAttributes());
setProtocolInfo(msg.getProtocolInfo());
if (msg.getReturnCode() != 0) {
finished(CacheExceptionFactory.exceptionOf(msg));
Expand Down

0 comments on commit 6c7de8a

Please sign in to comment.