Skip to content

Commit

Permalink
srmmanager: Add interface to query and abort transfers
Browse files Browse the repository at this point in the history
Motivation:

To allow a clustered SrmManager to detect and abort transfers
on other instances.

Modification:

Adds two messages for querying puts and aborting transfers on a particular
SURL.

Drops two unused messages.

Result:

Currently dead code, but I pushed this out quickly so that the code making
use of these messages could be backported to 3.0 if it doesn't get finished
in time for the 3.0.0 release.

Target: trunk
Request: 3.0
Require-notes: no
Require-book: no
Acked-by: Paul Millar <paul.millar@desy.de>

Reviewed at https://rb.dcache.org/r/9865/

(cherry picked from commit df12c09)
  • Loading branch information
gbehrmann committed Oct 26, 2016
1 parent d0d7a1d commit a1d7b4d
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 293 deletions.
@@ -0,0 +1,86 @@
/*
* dCache - http://www.dcache.org/
*
* Copyright (C) 2016 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package diskCacheV111.srm.dcache;

import com.google.common.collect.Iterables;
import org.springframework.beans.factory.annotation.Required;

import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.srm.SrmAbortTransfersMessage;
import diskCacheV111.vehicles.srm.SrmGetPutRequestMessage;

import dmg.cells.nucleus.CellMessageReceiver;

import org.dcache.srm.SRM;
import org.dcache.srm.SRMException;
import org.dcache.srm.SRMInvalidRequestException;
import org.dcache.srm.request.GetFileRequest;
import org.dcache.srm.request.PutFileRequest;
import org.dcache.srm.scheduler.IllegalStateTransition;

/**
* A message processor embedded in the SrmManager for handling the SURL locking
* semantics of SRM uploads.
*
* From the point of view of SRM, a SURL exists while a file is being uploaded to
* it. Due to the unique TURL feature of SRM uploads in dCache, the name space entry
* is not yet created under its final name. Thus we have to resort to querying all
* SrmManager instances for active uploads on a particular SURL.
*/
public class SurlService implements CellMessageReceiver
{
private SRM srm;

@Required
public void setSrm(SRM srm)
{
this.srm = srm;
}

public SrmGetPutRequestMessage messageArrived(SrmGetPutRequestMessage msg) throws CacheException
{
PutFileRequest request = Iterables.getFirst(
srm.getActiveFileRequests(PutFileRequest.class, msg.getSurl()), null);
if (request == null) {
throw new CacheException("No upload on SURL");
}
msg.setFileId(request.getFileId());
msg.setRequestId(request.getId());
msg.setSucceeded();
return msg;
}

public SrmAbortTransfersMessage messageArrived(SrmAbortTransfersMessage msg) throws SRMException
{
for (PutFileRequest request : srm.getActiveFileRequests(PutFileRequest.class, msg.getSurl())) {
try {
request.abort(msg.getReason());
} catch (SRMInvalidRequestException | IllegalStateTransition ignored) {
}
}
for (GetFileRequest request : srm.getActiveFileRequests(GetFileRequest.class, msg.getSurl())) {
try {
request.abort(msg.getReason());
} catch (SRMInvalidRequestException | IllegalStateTransition ignored) {
}
}
msg.setSucceeded();
return msg;
}
}
Expand Up @@ -147,6 +147,10 @@
<property name="poolManagerStub" ref="pool-manager-stub"/>
</bean>

<bean id="surl-service" class="diskCacheV111.srm.dcache.SurlService">
<property name="srm" ref="srm"/>
</bean>

<!--
AUTHORIZATION
=====================================================================================
Expand Down
@@ -0,0 +1,52 @@
/*
* dCache - http://www.dcache.org/
*
* Copyright (C) 2016 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package diskCacheV111.vehicles.srm;

import java.net.URI;

/**
* Aborts uploads or downloads on a particular SURL.
*
* The intended use of this message is when a SURL is deleted in one SrmManager and
* other SRM managers need to be informed about this event.
*/
public class SrmAbortTransfersMessage extends SrmMessage
{
private static final long serialVersionUID = 4505891598942836136L;

private final URI surl;

private final String reason;

public SrmAbortTransfersMessage(URI surl, String reason)
{
this.surl = surl;
this.reason = reason;
}

public URI getSurl()
{
return surl;
}

public String getReason()
{
return reason;
}
}

This file was deleted.

@@ -0,0 +1,71 @@
/*
* dCache - http://www.dcache.org/
*
* Copyright (C) 2016 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package diskCacheV111.vehicles.srm;

import java.net.URI;

/**
* Queries for the existing of an upload on a particular SURL.
*
* Although a particular SrmManager instance could have several put requests on the
* same SURL, the reply to this message at most refers to a single one of those.
*
* The intended use of this message is when an SrmManager queries other SrmManagers
* for the existence of other uploads.
*/
public class SrmGetPutRequestMessage extends SrmMessage
{
private static final long serialVersionUID = -2970662416496090431L;

private final URI surl;

private Long requestId;

private String fileId;

public SrmGetPutRequestMessage(URI surl)
{
this.surl = surl;
}

public URI getSurl()
{
return surl;
}

public Long getRequestId()
{
return requestId;
}

public void setRequestId(Long requestId)
{
this.requestId = requestId;
}

public String getFileId()
{
return fileId;
}

public void setFileId(String fileId)
{
this.fileId = fileId;
}
}

0 comments on commit a1d7b4d

Please sign in to comment.