Skip to content

Commit d180e90

Browse files
committed
pool: Use ListenableFuture rather than CompletionHandler
The pool queuing and mover wrapping code relies on the CompletionHandler interface from the NIO 2 async code. This interface is a poor fit for our needs as it doesn't provide means of cancelling the result (which means we had to add a custom Cancellable interface) and has features like attachables, that we don't need. The Guava ListenableFuture - an extension of the regular JDK Future concept - fits our needs much better. This patch refactors the code to use ListenableFuture and this allows custom cancellation code to be removed, and also stream lines the code a bit. The patch moves some error handling from SimpleIoScheduler to PoolIORequest. The code was tightly coupled to PoolIORequest anyway and fits more naturally inside that class. The patch is part of a series of pathes to clean up the pool mover handling. Target: trunk Require-notes: no Require-book: no Acked-by: Paul Millar <paul.millar@desy.de> Patch: http://rb.dcache.org/r/5308
1 parent 3c255f3 commit d180e90

File tree

7 files changed

+105
-146
lines changed

7 files changed

+105
-146
lines changed

modules/dcache/src/main/java/org/dcache/chimera/nfsv41/mover/NfsExcecutionService.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package org.dcache.chimera.nfsv41.mover;
22

3+
import com.google.common.util.concurrent.AbstractFuture;
4+
import com.google.common.util.concurrent.Futures;
5+
import com.google.common.util.concurrent.ListenableFuture;
36
import org.ietf.jgss.GSSException;
47
import org.slf4j.Logger;
58
import org.slf4j.LoggerFactory;
69

710
import java.io.IOException;
811
import java.net.InetAddress;
912
import java.net.InetSocketAddress;
10-
import java.nio.channels.CompletionHandler;
1113
import java.util.List;
1214

1315
import diskCacheV111.vehicles.PoolPassiveIoFileMessage;
@@ -17,7 +19,6 @@
1719
import org.dcache.cells.CellMessageSender;
1820
import org.dcache.chimera.ChimeraFsException;
1921
import org.dcache.chimera.nfs.v4.xdr.stateid4;
20-
import org.dcache.pool.classic.Cancelable;
2122
import org.dcache.pool.classic.MoverExecutorService;
2223
import org.dcache.pool.classic.PoolIORequest;
2324
import org.dcache.pool.classic.PoolIOTransfer;
@@ -62,8 +63,7 @@ public void shutdown() throws IOException {
6263
}
6364

6465
@Override
65-
public Cancelable execute(PoolIORequest request, final CompletionHandler completionHandler) {
66-
66+
public ListenableFuture<Void> execute(PoolIORequest request) {
6767
try {
6868
NFS4ProtocolInfo nfs4ProtocolInfo = (NFS4ProtocolInfo) request.getTransfer().getProtocolInfo();
6969
PoolIOTransfer transfer = request.getTransfer();
@@ -80,22 +80,26 @@ public Cancelable execute(PoolIORequest request, final CompletionHandler complet
8080
request.sendToDoor(new PoolPassiveIoFileMessage<>(
8181
request.getPoolAddress().getCellName(), _localSocketAddresses, stateid));
8282

83-
return new Cancelable() {
83+
/* An NFS mover doesn't complete until it is cancelled (the door sends a mover kill
84+
* message when the file is closed).
85+
*/
86+
return new AbstractFuture<Void>() {
8487
@Override
85-
public void cancel() {
88+
public boolean cancel(boolean mayInterruptIfRunning)
89+
{
8690
_nfsIO.removeHandler(moverBridge);
8791
try {
8892
repositoryChannel.close();
8993
} catch (IOException e) {
9094
_log.error("failed to close RAF", e);
9195
}
92-
completionHandler.completed(null, null);
96+
set(null);
97+
return false;
9398
}
9499
};
95-
} catch (Throwable e) {
96-
completionHandler.failed(e, null);
100+
} catch (Exception e) {
101+
return Futures.immediateFailedFuture(e);
97102
}
98-
return null;
99103
}
100104

101105
public void setEnableGss(boolean withGss) {

modules/dcache/src/main/java/org/dcache/pool/classic/Cancelable.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

modules/dcache/src/main/java/org/dcache/pool/classic/IoProcessable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ public interface IoProcessable {
2525
*
2626
* @return true if successful, false otherwise
2727
*/
28-
public boolean kill();
28+
public void kill();
2929
}
Lines changed: 21 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package org.dcache.pool.classic;
22

3+
import com.google.common.util.concurrent.ListenableFuture;
4+
import com.google.common.util.concurrent.ListeningExecutorService;
5+
import com.google.common.util.concurrent.MoreExecutors;
6+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
37
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
59

6-
import java.nio.channels.CompletionHandler;
7-
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Callable;
811
import java.util.concurrent.Executors;
9-
import java.util.concurrent.ThreadFactory;
1012

1113
import dmg.cells.nucleus.CDC;
1214

@@ -21,94 +23,50 @@ public class LegacyMoverExecutorService implements MoverExecutorService
2123
private final static String _name =
2224
LegacyMoverExecutorService.class.getSimpleName();
2325

24-
private final ExecutorService _executor =
25-
Executors.newCachedThreadPool(
26-
new ThreadFactory()
27-
{
28-
private int _counter;
29-
30-
private ThreadFactory _factory =
31-
Executors.defaultThreadFactory();
32-
33-
@Override
34-
public Thread newThread(Runnable r) {
35-
Thread t = _factory.newThread(r);
36-
t.setName(_name + "-worker-" + ++_counter);
37-
return t;
38-
}
39-
});
26+
private final ListeningExecutorService _executor =
27+
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(
28+
new ThreadFactoryBuilder().setNameFormat(_name + "-worker-%d").build()));
4029

4130
@Override
42-
public Cancelable execute(final PoolIORequest request,
43-
final CompletionHandler completionHandler)
31+
public ListenableFuture<Void> execute(PoolIORequest request)
4432
{
45-
final MoverTask moverTask = new MoverTask(request, completionHandler);
46-
_executor.execute(moverTask);
47-
return moverTask;
33+
return _executor.submit(new MoverTask(request));
4834
}
4935

5036
public void shutdown()
5137
{
5238
_executor.shutdown();
5339
}
5440

55-
private static class MoverTask implements Runnable, Cancelable {
56-
41+
private static class MoverTask implements Callable<Void>
42+
{
5743
private final PoolIORequest _request;
5844
private final CDC _cdc = new CDC();
59-
private final CompletionHandler _completionHandler;
6045

61-
private Thread _thread;
62-
private boolean _needInterruption;
63-
64-
public MoverTask(PoolIORequest request, CompletionHandler completionHandler) {
46+
public MoverTask(PoolIORequest request) {
6547
_request = request;
66-
_completionHandler = completionHandler;
6748
}
6849

6950
@Override
70-
public void run() {
51+
public Void call() throws Exception {
7152
try {
72-
setThread();
7353
_cdc.restore();
74-
try {
75-
_request.getTransfer().transfer();
76-
} catch (Throwable e) {
77-
_completionHandler.failed(e, null);
78-
throw e;
79-
}
80-
_completionHandler.completed(null, null);
54+
_request.getTransfer().transfer();
55+
return null;
56+
} catch (RuntimeException e) {
57+
_log.error("Transfer failed due to a bug: {}", e);
58+
throw e;
8159
} catch (Exception e) {
8260
_log.error("Transfer failed: {}", e.toString());
61+
throw e;
8362
} catch (Throwable e) {
8463
_log.error("Transfer failed:", e);
8564
Thread t = Thread.currentThread();
8665
t.getUncaughtExceptionHandler().uncaughtException(t, e);
66+
throw e;
8767
} finally {
88-
cleanThread();
8968
CDC.clear();
9069
}
9170
}
92-
93-
private synchronized void setThread() throws InterruptedException {
94-
if(_needInterruption) {
95-
throw new InterruptedException("Thread interrupted before excecution");
96-
}
97-
98-
_thread = Thread.currentThread();
99-
}
100-
101-
private synchronized void cleanThread() {
102-
_thread = null;
103-
}
104-
105-
@Override
106-
public synchronized void cancel() {
107-
if (_thread != null) {
108-
_thread.interrupt();
109-
} else {
110-
_needInterruption = true;
111-
}
112-
}
11371
}
11472
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package org.dcache.pool.classic;
22

3-
import java.nio.channels.CompletionHandler;
3+
import com.google.common.util.concurrent.ListenableFuture;
44

55
/**
66
*
77
* @since 1.9.11
88
*/
99
public interface MoverExecutorService {
10-
11-
Cancelable execute(PoolIORequest transfer, CompletionHandler completionHandler);
10+
ListenableFuture<Void> execute(PoolIORequest transfer);
1211
}

modules/dcache/src/main/java/org/dcache/pool/classic/PoolIORequest.java

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package org.dcache.pool.classic;
22

3+
import com.google.common.util.concurrent.FutureCallback;
4+
import com.google.common.util.concurrent.Futures;
5+
import com.google.common.util.concurrent.ListenableFuture;
6+
import com.google.common.util.concurrent.SettableFuture;
37
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
59

610
import java.io.IOException;
711
import java.io.Serializable;
8-
import java.nio.channels.CompletionHandler;
12+
import java.util.concurrent.CancellationException;
913

1014
import diskCacheV111.util.CacheException;
1115
import diskCacheV111.util.PnfsId;
@@ -17,6 +21,8 @@
1721
import dmg.cells.nucleus.NoRouteToCellException;
1822

1923
import org.dcache.cells.CellStub;
24+
import org.dcache.pool.FaultAction;
25+
import org.dcache.pool.FaultEvent;
2026
import org.dcache.pool.FaultListener;
2127
import org.dcache.vehicles.FileAttributes;
2228

@@ -42,7 +48,7 @@ public class PoolIORequest implements IoProcessable {
4248
private final static Logger _log = LoggerFactory.getLogger(PoolIORequest.class);
4349
private final FaultListener _faultListener;
4450

45-
private Cancelable _mover;
51+
private ListenableFuture<Void> _mover;
4652
/**
4753
* Request creation time.
4854
*/
@@ -61,7 +67,6 @@ public class PoolIORequest implements IoProcessable {
6167
/** transfer status error message */
6268
private volatile String _errorMessage = "";
6369

64-
private boolean _canceled;
6570
private final CellStub _billing;
6671
private final CellStub _door;
6772
private final CellAddressCore _pool;
@@ -176,26 +181,54 @@ public long getClientId() {
176181
}
177182

178183
@Override
179-
public synchronized boolean kill() {
184+
public synchronized void kill() {
180185
_state = CANCELED;
181-
_canceled = true;
182-
183-
if (_mover == null) {
184-
return false;
186+
if (_mover != null) {
187+
_mover.cancel(true);
185188
}
186-
187-
_mover.cancel();
188-
return true;
189189
}
190190

191-
synchronized void transfer(MoverExecutorService moverExecutorService, CompletionHandler<Object,Object> completionHandler) {
192-
_startTime = System.currentTimeMillis();
193-
if(_canceled) {
194-
completionHandler.failed( new InterruptedException("Mover canceled"), null);
195-
} else {
196-
_state = RUNNING;
197-
_mover = moverExecutorService.execute(this, completionHandler);
191+
synchronized ListenableFuture<Void> transfer(MoverExecutorService moverExecutorService) {
192+
if (_state != QUEUED) {
193+
return Futures.immediateFailedFuture(new InterruptedException("Mover canceled"));
198194
}
195+
196+
_state = RUNNING;
197+
_startTime = System.currentTimeMillis();
198+
_mover = moverExecutorService.execute(this);
199+
final SettableFuture<Void> future = SettableFuture.create();
200+
Futures.addCallback(_mover,
201+
new FutureCallback<Void>()
202+
{
203+
@Override
204+
public void onSuccess(Void result)
205+
{
206+
future.set(null);
207+
}
208+
209+
@Override
210+
public void onFailure(Throwable e)
211+
{
212+
int rc;
213+
String msg;
214+
if (e instanceof CancellationException) {
215+
rc = CacheException.DEFAULT_ERROR_CODE;
216+
msg = "Transfer was killed";
217+
} else if (e instanceof CacheException) {
218+
rc = ((CacheException) e).getRc();
219+
msg = e.getMessage();
220+
if (rc == CacheException.ERROR_IO_DISK) {
221+
getFaultListener().faultOccurred(new FaultEvent("repository", FaultAction.DISABLED, msg, e));
222+
}
223+
} else {
224+
rc = CacheException.UNEXPECTED_SYSTEM_EXCEPTION;
225+
msg = "Transfer failed due to unexpected exception: " + e;
226+
}
227+
setTransferStatus(rc, msg);
228+
future.set(null);
229+
}
230+
});
231+
return future;
199232
}
200233

201234
void close()

0 commit comments

Comments
 (0)