-
Notifications
You must be signed in to change notification settings - Fork 131
/
MovePinRequestProcessor.java
280 lines (244 loc) · 10.2 KB
/
MovePinRequestProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package org.dcache.pinmanager;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.dcache.pinmanager.model.Pin.State.FAILED_TO_UNPIN;
import static org.dcache.pinmanager.model.Pin.State.PINNED;
import static org.dcache.pinmanager.model.Pin.State.PINNING;
import static org.dcache.pinmanager.model.Pin.State.READY_TO_UNPIN;
import static org.dcache.pinmanager.model.Pin.State.UNPINNING;
import static org.springframework.transaction.annotation.Isolation.REPEATABLE_READ;
import diskCacheV111.poolManager.PoolSelectionUnit;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.InvalidMessageCacheException;
import diskCacheV111.util.PermissionDeniedCacheException;
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.TimeoutCacheException;
import diskCacheV111.vehicles.PoolSetStickyMessage;
import dmg.cells.nucleus.CellMessageReceiver;
import dmg.cells.nucleus.CellPath;
import dmg.cells.nucleus.NoRouteToCellException;
import java.util.Collection;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.dcache.auth.Subjects;
import org.dcache.cells.CellStub;
import org.dcache.pinmanager.model.Pin;
import org.dcache.pool.repository.StickyRecord;
import org.dcache.poolmanager.PoolMonitor;
import org.dcache.services.pinmanager1.PinManagerMovePinMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.transaction.annotation.Transactional;
/**
* Processes requests to move pins.
* <p>
* The strategy for moving a pin is the following:
* <ol>
* <li>Create a record in the DB for the target pool.</li>
* <li>Create a sticky flag on the target pool.</li>
* <li>Change the original record to point to the target pool and change the record created in
* step 1 to point to the old pool.</li>
* <li>Remove the sticky flag from the old pool.</li>
* </ol>
* <p>
* If the above process is aborted at any point the regular recovery tasks of the pin manager will
* remove any stale sticky flag from either the old or the new pool.
* <p>
* Pin lifetime extension is treated as a special case of move (moving to the same pool, but with a
* longer lifetime).
*/
public class MovePinRequestProcessor
      implements CellMessageReceiver {

    private static final Logger LOGGER =
          LoggerFactory.getLogger(MovePinRequestProcessor.class);

    /**
     * Safety margin added to a pin's expiration time when the sticky flag is set on a pool, so
     * that the pool-side flag outlives the database record by this amount.
     */
    private static final long POOL_LIFETIME_MARGIN = MINUTES.toMillis(30);

    private PinDao _dao;
    private CellStub _poolStub;
    private AuthorizationPolicy _pdp;

    /**
     * Upper bound on the lifetime accepted by extend requests, expressed in
     * {@link #_maxLifetimeUnit}. A value below zero disables the bound.
     */
    private long _maxLifetime;
    private TimeUnit _maxLifetimeUnit;
    private PoolMonitor _poolMonitor;

    @Required
    public void setDao(PinDao dao) {
        _dao = dao;
    }

    @Required
    public void setPoolStub(CellStub stub) {
        _poolStub = stub;
    }

    @Required
    public void setAuthorizationPolicy(AuthorizationPolicy pdp) {
        _pdp = pdp;
    }

    @Required
    public void setPoolMonitor(PoolMonitor poolMonitor) {
        _poolMonitor = poolMonitor;
    }

    @Required
    public void setMaxLifetimeUnit(TimeUnit unit) {
        _maxLifetimeUnit = unit;
    }

    public TimeUnit getMaxLifetimeUnit() {
        return _maxLifetimeUnit;
    }

    @Required
    public void setMaxLifetime(long maxLifetime) {
        _maxLifetime = maxLifetime;
    }

    // @Required marks setters that must be injected; it has no meaning on a getter and
    // was therefore dropped here.
    public long getMaxLifetime() {
        return _maxLifetime;
    }

    /**
     * Creates the temporary database record used while a pin is being moved.
     * <p>
     * The record is owned by {@code Subjects.ROOT}, is in state {@code PINNING}, carries a freshly
     * generated sticky owner name and expires after twice the pool stub's timeout.
     *
     * @param pnfsId the file the pin refers to
     * @param pool the pool the temporary record points to
     * @return the record as returned by the DAO
     */
    protected Pin createTemporaryPin(PnfsId pnfsId, String pool) {
        long now = System.currentTimeMillis();
        return _dao.create(_dao.set()
              .subject(Subjects.ROOT)
              .pnfsId(pnfsId)
              .state(PINNING)
              .pool(pool)
              .sticky("PinManager-" + UUID.randomUUID())
              .expirationTime(new Date(now + 2 * _poolStub.getTimeoutInMillis())));
    }

    /**
     * Swaps pool and sticky owner between the original pin and the temporary pin.
     * <p>
     * Afterwards the original record points to the pool and sticky owner of the temporary record
     * and has the given expiration time, while the temporary record points to the old pool and
     * sticky owner and is in state {@code READY_TO_UNPIN}.
     * <p>
     * NOTE(review): this method is called from {@link #move} on the same instance; the
     * {@code @Transactional} annotation only takes effect for such self-invocations if
     * transactions are woven in rather than proxy-based — confirm against the Spring setup.
     *
     * @param pin the pin being moved; must still be in state {@code PINNED}
     * @param tmpPin the temporary pin created for the target pool
     * @param expirationTime new expiration time of the moved pin, {@code null} for none
     * @return the updated temporary record, now referring to the old pool
     * @throws TimeoutCacheException if the temporary record is no longer present in state
     *                               {@code PINNING}
     * @throws CacheException if the original pin could not be updated
     */
    @Transactional(isolation = REPEATABLE_READ)
    protected Pin swapPins(Pin pin, Pin tmpPin, Date expirationTime)
          throws CacheException {
        Pin targetPin = _dao.get(_dao.where()
              .id(tmpPin.getPinId())
              .sticky(tmpPin.getSticky())
              .state(PINNING));
        if (targetPin == null) {
            /* The pin likely expired. We are now in a situation in
             * which we may or may not have a sticky flag on the
             * target pool, but no record in the database. To be on
             * the safe side we create a new record in the database
             * and then abort.
             */
            _dao.create(_dao.set()
                  .subject(Subjects.ROOT)
                  .pnfsId(tmpPin.getPnfsId())
                  .pool(tmpPin.getPool())
                  .sticky(tmpPin.getSticky())
                  .state(READY_TO_UNPIN));
            throw new TimeoutCacheException("Move expired");
        }

        /* Only update the source pin if it is still the PINNED record we started with. */
        Pin sourcePin =
              _dao.update(_dao.where()
                          .id(pin.getPinId())
                          .sticky(pin.getSticky())
                          .state(PINNED),
                    _dao.set()
                          .pool(targetPin.getPool())
                          .sticky(targetPin.getSticky())
                          .expirationTime(expirationTime));
        if (sourcePin == null) {
            /* The target pin will expire by itself.
             */
            throw new CacheException("Pin no longer valid");
        }

        return _dao.update(targetPin,
              _dao.set()
                    .pool(pin.getPool())
                    .sticky(pin.getSticky())
                    .state(READY_TO_UNPIN));
    }

    /**
     * Sets or clears a sticky flag on a pool and waits for the pool's reply.
     *
     * @param poolName name of the pool to contact
     * @param pnfsId the file whose sticky flag is changed
     * @param sticky {@code true} to set the flag, {@code false} to clear it
     * @param owner owner name of the sticky flag
     * @param validTill value passed on to the pool as the flag's validity; callers in this class
     *                  pass -1 when the pin has no expiration time
     * @throws CacheException if the pool is unknown to the pool selection unit or not active,
     *                        or if the request to the pool fails
     */
    private void setSticky(String poolName, PnfsId pnfsId, boolean sticky, String owner,
          long validTill)
          throws CacheException, InterruptedException, NoRouteToCellException {
        PoolSelectionUnit.SelectionPool pool = _poolMonitor.getPoolSelectionUnit()
              .getPool(poolName);
        if (pool == null || !pool.isActive()) {
            throw new CacheException(
                  "Unable to move sticky flag because pool " + poolName + " is unavailable");
        }
        PoolSetStickyMessage msg =
              new PoolSetStickyMessage(poolName, pnfsId, sticky, owner, validTill);
        _poolStub.sendAndWait(new CellPath(pool.getAddress()), msg);
    }

    /**
     * Moves a pin to the given pool and gives it the given expiration time.
     * <p>
     * A temporary record is created for the target pool, the sticky flag is set there (valid
     * until the expiration time plus {@link #POOL_LIFETIME_MARGIN}, or -1 if there is no
     * expiration time), and finally the records are swapped.
     *
     * @param pin the pin to move
     * @param pool the target pool; may be the pin's current pool
     * @param expirationTime new expiration time, {@code null} for none
     * @return the temporary record, which after the swap refers to the old pool
     */
    protected Pin move(Pin pin, String pool, Date expirationTime)
          throws CacheException, InterruptedException, NoRouteToCellException {
        Pin tmpPin = createTemporaryPin(pin.getPnfsId(), pool);
        setSticky(tmpPin.getPool(),
              tmpPin.getPnfsId(),
              true,
              tmpPin.getSticky(),
              (expirationTime == null) ? -1 : (expirationTime.getTime() + POOL_LIFETIME_MARGIN));
        return swapPins(pin, tmpPin, expirationTime);
    }

    /**
     * Returns whether any of the given pins has the given sticky owner name.
     */
    private boolean containsPin(Collection<Pin> pins, String sticky) {
        for (Pin pin : pins) {
            if (sticky.equals(pin.getSticky())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Handles a request to move all pins of a file from a source pool to a target pool.
     * <p>
     * Sticky records reported in the message that have no matching pin on the source pool are
     * cleared first. Then every pin found for the file on the source pool is moved to the target
     * pool, the old sticky flag is removed and the leftover temporary record is deleted.
     *
     * @param message the move request
     * @return the same message instance
     * @throws CacheException if moving a pin fails, including when a pool cannot be reached
     */
    public PinManagerMovePinMessage
    messageArrived(PinManagerMovePinMessage message)
          throws CacheException, InterruptedException {
        try {
            PnfsId pnfsId = message.getPnfsId();
            String source = message.getSourcePool();
            String target = message.getTargetPool();

            Collection<Pin> pins = _dao.get(_dao.where().pnfsId(pnfsId).pool(source));

            /* Remove all stale sticky flags.
             */
            for (StickyRecord record : message.getRecords()) {
                if (!containsPin(pins, record.owner())) {
                    setSticky(source, pnfsId, false, record.owner(), 0);
                }
            }

            /* Move all pins to the target pool.
             */
            for (Pin pin : pins) {
                Pin tmpPin = move(pin, target, pin.getExpirationTime());
                setSticky(tmpPin.getPool(),
                      tmpPin.getPnfsId(),
                      false,
                      tmpPin.getSticky(),
                      0);
                _dao.delete(tmpPin);
            }

            LOGGER.info("Moved pins for {} from {} to {}", pnfsId, source, target);
        } catch (NoRouteToCellException e) {
            throw new CacheException(
                  "Failed to move pin due to communication failure: " + e.getDestinationPath(), e);
        }
        return message;
    }

    /**
     * Handles a request to extend the lifetime of a pin.
     * <p>
     * The requested lifetime is capped by the configured maximum lifetime, if one is set. A
     * requested lifetime of -1 (no expiration) is treated as exceeding any configured maximum and
     * is therefore replaced by that maximum. If the pin's remaining lifetime is shorter than the
     * resulting lifetime, the pin is "moved" to its current pool with the new expiration time;
     * otherwise the pin is left untouched. In both cases the effective expiration time is
     * reported back in the message.
     *
     * @param message the extend request
     * @return the same message instance with lifetime and expiration time updated
     * @throws InvalidMessageCacheException if the pin does not exist, is still being pinned or
     *                                      is already on its way to being unpinned
     * @throws PermissionDeniedCacheException if the subject may not extend the pin
     * @throws CacheException if the extension fails, including when the pool cannot be reached
     */
    public PinManagerExtendPinMessage
    messageArrived(PinManagerExtendPinMessage message)
          throws CacheException, InterruptedException {
        try {
            Pin pin = _dao.get(_dao.where().pnfsId(message.getFileAttributes().getPnfsId())
                  .id(message.getPinId()));
            if (pin == null) {
                throw new InvalidMessageCacheException("Pin does not exist");
            } else if (!_pdp.canExtend(message.getSubject(), pin)) {
                throw new PermissionDeniedCacheException("Access denied");
            } else if (pin.getState() == PINNING) {
                throw new InvalidMessageCacheException("File is not pinned yet");
            } else if (pin.getState() == READY_TO_UNPIN
                  || pin.getState() == UNPINNING
                  || pin.getState() == FAILED_TO_UNPIN) {
                throw new InvalidMessageCacheException("Pin is no longer valid");
            }

            if (_maxLifetime > -1) {
                long maxLifetime = _maxLifetimeUnit.toMillis(_maxLifetime);
                long requested = message.getLifetime();
                /* -1 means "no expiration". A plain Math.min would keep the -1 and thereby
                 * let an infinite request bypass the configured maximum.
                 */
                message.setLifetime(
                      (requested == -1) ? maxLifetime : Math.min(maxLifetime, requested));
            }

            long lifetime = message.getLifetime();
            if (pin.hasRemainingLifetimeLessThan(lifetime)) {
                long now = System.currentTimeMillis();
                Date date = (lifetime == -1) ? null : new Date(now + lifetime);
                /* Extension is a move to the same pool with a later expiration time. */
                move(pin, pin.getPool(), date);
                message.setExpirationTime(date);
            } else {
                message.setExpirationTime(pin.getExpirationTime());
            }

            LOGGER.info("Extended pin for {} ({})", pin.getPnfsId(), pin.getPinId());

            return message;
        } catch (NoRouteToCellException e) {
            throw new CacheException(
                  "Failed to extend pin due to communication failure: " + e.getDestinationPath(),
                  e);
        }
    }
}