/
RedirectedTransfer.java
91 lines (79 loc) · 2.87 KB
/
RedirectedTransfer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package org.dcache.util;
import javax.security.auth.Subject;
import java.util.concurrent.TimeUnit;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.PnfsHandler;
import diskCacheV111.util.TimeoutCacheException;
import static org.dcache.util.MathUtils.addWithInfinity;
import static org.dcache.util.MathUtils.subWithInfinity;
/**
* A transfer where the mover can send a redirect message to the door.
*/
public class RedirectedTransfer<T> extends Transfer
{
private boolean _isRedirected;
private T _redirectObject;
public RedirectedTransfer(PnfsHandler pnfs, Subject namespaceSubject, Subject subject, FsPath path) {
super(pnfs, namespaceSubject, subject, path);
}
public RedirectedTransfer(PnfsHandler pnfs, Subject subject, FsPath path) {
super(pnfs, subject, path);
}
/**
* Signals that the transfer is redirected.
*/
public synchronized void redirect(T object)
{
_isRedirected = true;
_redirectObject = object;
notifyAll();
}
/**
* Returns the redirect object injected through a call to
* <code>redirect</code>, or null if <code>redirect</code> has not
* been called.
*/
public synchronized T getRedirect()
{
return _redirectObject;
}
/**
* Blocks until the mover of this transfer has send a redirect
* notification, until a timeout is reached, or until the mover
* failed. Relies on the redirect being injected into the transfer
* through the <code>redirect</code> method.
*
* @param millis The timeout in milliseconds
* @return The redirect object
* @throws CacheException if the mover failed
* @throws TimeoutCacheException when the timeout was reached
* @throws InterruptedException if the thread was interrupted
*/
public synchronized T waitForRedirect(long millis)
throws CacheException, InterruptedException
{
try {
setStatus("Mover " + getPool() + "/" +
getMoverId() + ": Waiting for redirect");
long deadline = addWithInfinity(System.currentTimeMillis(), millis);
while (hasMover() && !_isRedirected &&
System.currentTimeMillis() < deadline) {
wait(subWithInfinity(deadline, System.currentTimeMillis()));
}
if (waitForMover(0)) {
throw new CacheException("Mover finished without redirect");
} else if (!_isRedirected) {
throw new TimeoutCacheException("No redirect from mover");
}
} finally {
setStatus(null);
}
return _redirectObject;
}
public synchronized T waitForRedirect(long timeout, TimeUnit unit)
throws CacheException, InterruptedException
{
return waitForRedirect(unit.toMillis(timeout));
}
}