/
WriteHandleImpl.java
434 lines (383 loc) · 16.1 KB
/
WriteHandleImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
package org.dcache.pool.repository.v5;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.isEmpty;
import static com.google.common.collect.Iterables.unmodifiableIterable;
import static java.util.Objects.requireNonNull;
import static org.dcache.namespace.FileAttribute.ACCESS_LATENCY;
import static org.dcache.namespace.FileAttribute.CHECKSUM;
import static org.dcache.namespace.FileAttribute.RETENTION_POLICY;
import static org.dcache.namespace.FileAttribute.SIZE;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FileCorruptedCacheException;
import diskCacheV111.util.PnfsHandler;
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.TimeoutCacheException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.OpenOption;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.dcache.alarms.AlarmMarkerFactory;
import org.dcache.alarms.PredefinedAlarm;
import org.dcache.pool.repository.Allocator;
import org.dcache.pool.repository.AllocatorAwareRepositoryChannel;
import org.dcache.pool.repository.FileStore;
import org.dcache.pool.repository.ReplicaDescriptor;
import org.dcache.pool.repository.ReplicaRecord;
import org.dcache.pool.repository.ReplicaState;
import org.dcache.pool.repository.RepositoryChannel;
import org.dcache.pool.repository.StickyRecord;
import org.dcache.pool.repository.checksums.ChecksumReplicaRecord;
import org.dcache.pool.repository.inotify.InotifyReplicaRecord;
import org.dcache.pool.statistics.IoStatisticsReplicaRecord;
import org.dcache.util.Checksum;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class WriteHandleImpl implements ReplicaDescriptor {
/**
 * Lifecycle states of a write handle. The constructor puts the handle into
 * {@code OPEN}, a successful {@code commit()} moves it to {@code COMMITTED}
 * and {@code close()} moves it to {@code CLOSED}.
 */
enum HandleState {
    OPEN,
    COMMITTED,
    CLOSED
}
/**
 * Logger obtained under a fixed name rather than the name of this class.
 */
private static final Logger _log = LoggerFactory.getLogger("logger.org.dcache.repository");

/**
 * Options used for channels that are not the client-facing one:
 * {@code FileStore.O_RW} plus IO statistics and checksum calculation.
 */
private static final Set<OpenOption> OPEN_OPTIONS = ImmutableSet.<OpenOption>builder()
        .addAll(FileStore.O_RW)
        .add(IoStatisticsReplicaRecord.OpenFlags.ENABLE_IO_STATISTICS,
                ChecksumReplicaRecord.OpenFlags.ENABLE_CHECKSUM_CALCULATION)
        .build();

/**
 * Same as {@link #OPEN_OPTIONS}, with inotify monitoring enabled on top.
 */
private static final Set<OpenOption> OPEN_OPTIONS_WITH_INOTIFY =
        ImmutableSet.<OpenOption>builder()
                .addAll(OPEN_OPTIONS)
                .add(InotifyReplicaRecord.OpenFlags.ENABLE_INOTIFY_MONITORING)
                .build();

/**
 * Time, in milliseconds, that a new CACHED file with no sticky flags will be
 * marked sticky (five minutes).
 */
private static final long HOLD_TIME = TimeUnit.MINUTES.toMillis(5);
/**
 * Callback for resilience handling. Pool name can be accessed here. The
 * repository is also handed to every channel created by createChannel.
 */
private final ReplicaRepository _repository;
/**
 * Space allocation is delegated to this allocator; it is passed on to the
 * channels created by createChannel.
 */
private final Allocator _allocator;
/**
 * The replica record this handle writes to. All state changes, size queries
 * and channel creation go through it.
 */
private final ReplicaRecord _entry;
/**
 * File attributes of the file being written. The object is updated in place
 * (size, times, checksums) by commit, fail, getChecksums and addChecksums.
 */
private final FileAttributes _fileAttributes;
/**
 * Stub for talking to the PNFS manager.
 */
private final PnfsHandler _pnfs;
/**
 * Sticky flags to be applied after the transfer.
 */
private final List<StickyRecord> _stickyRecords;
/**
 * The entry state used during transfer; captured from the entry when the
 * handle is constructed.
 */
private final ReplicaState _initialState;
/**
 * The entry state used when the handle is committed. It is changed to
 * REMOVED when commit fails with FILE_NOT_FOUND or when fail() decides that
 * the replica has to be deleted.
 */
private ReplicaState _targetState;
/**
 * The state of the write handle.
 */
private HandleState _state;
/**
 * Last access time of new replica. Remains null until setLastAccessTime is
 * called; commit falls back to the current time in that case.
 */
private Long _atime;
/**
 * Set to true once createChannel has produced a channel. isChannelForClient
 * uses it to tell the first channel from later ones.
 */
private boolean hasChannelBeenCreated;
/**
 * Exception created at the end of a close() call. It is attached as the
 * cause of the IllegalStateException thrown when close() is called again.
 */
private Exception _closedBy;
WriteHandleImpl(ReplicaRepository repository,
Allocator allocator,
PnfsHandler pnfs,
ReplicaRecord entry,
FileAttributes fileAttributes,
ReplicaState targetState,
List<StickyRecord> stickyRecords) {
_repository = requireNonNull(repository);
_allocator = requireNonNull(allocator);
_pnfs = requireNonNull(pnfs);
_entry = requireNonNull(entry);
_fileAttributes = requireNonNull(fileAttributes);
_initialState = entry.getState();
_targetState = requireNonNull(targetState);
_stickyRecords = requireNonNull(stickyRecords);
_state = HandleState.OPEN;
checkState(_initialState != ReplicaState.FROM_CLIENT || _fileAttributes.isDefined(
EnumSet.of(RETENTION_POLICY, ACCESS_LATENCY)));
checkState(_initialState == ReplicaState.FROM_CLIENT || _fileAttributes.isDefined(SIZE));
}
/**
 * Records the new state of this write handle while holding the handle's
 * monitor.
 *
 * @param state the state to switch to
 */
private synchronized void setState(HandleState state) {
_state = state;
}
/**
 * Tells whether the channel about to be created is meant for direct client IO.
 *
 * <p>createChannel may be called several times; for example, by the onWrite
 * behaviour within ChecksumModuleV1#enforcePostTransferPolicy. The heuristic
 * is that only the first channel of a handle accepts client data and every
 * later channel serves dCache-internal activity.
 *
 * @return {@code true} if no channel has been created yet and the entry
 *         started out in state FROM_CLIENT
 */
private boolean isChannelForClient() {
    if (hasChannelBeenCreated) {
        return false;
    }
    return _initialState == ReplicaState.FROM_CLIENT;
}
/**
 * Opens a new channel to the replica.
 *
 * <p>The channel that is considered to be the client's (see
 * isChannelForClient) is opened with inotify monitoring enabled in addition
 * to the regular options. The channel returned is wrapped in an
 * AllocatorAwareRepositoryChannel.
 *
 * @return the newly opened channel
 * @throws IOException           if the underlying entry fails to open the channel
 * @throws IllegalStateException if the handle is closed
 */
@Override
public synchronized RepositoryChannel createChannel() throws IOException {
    if (_state == HandleState.CLOSED) {
        throw new IllegalStateException("Handle is closed");
    }

    Set<OpenOption> openOptions;
    if (isChannelForClient()) {
        openOptions = OPEN_OPTIONS_WITH_INOTIFY;
    } else {
        openOptions = OPEN_OPTIONS;
    }

    RepositoryChannel wrapped = new AllocatorAwareRepositoryChannel(
            _entry.openChannel(openOptions),
            _repository,
            _fileAttributes.getPnfsId(),
            _allocator);

    // Only flag the handle once the channel really exists.
    hasChannelBeenCreated = true;
    return wrapped;
}
/**
 * Pushes this pool as a location of the file to the namespace, together with
 * the attributes known to the handle.
 *
 * <p>Checksums are included whenever they are defined. For entries that
 * started in state FROM_CLIENT the access latency and retention policy are
 * included too, as well as the size if it is defined.
 *
 * @throws CacheException if the namespace update fails
 */
private void registerFileAttributesInNameSpace() throws CacheException {
    String poolName = _repository.getPoolName();
    FileAttributes update = FileAttributes.ofLocation(poolName);

    if (_fileAttributes.isDefined(CHECKSUM)) {
        /* PnfsManager detects conflicting checksums and will fail the update. */
        update.setChecksums(_fileAttributes.getChecksums());
    }

    boolean isClientUpload = _initialState == ReplicaState.FROM_CLIENT;
    if (isClientUpload) {
        update.setAccessLatency(_fileAttributes.getAccessLatency());
        update.setRetentionPolicy(_fileAttributes.getRetentionPolicy());
        if (_fileAttributes.isDefined(SIZE)) {
            update.setSize(_fileAttributes.getSize());
        }
    }

    _pnfs.setFileAttributes(_entry.getPnfsId(), update);
}
/**
 * Compares the size of the replica with the size recorded in the file
 * attributes.
 *
 * <p>For client uploads the recorded size is only enforced when it is
 * defined and greater than zero; for every other kind of transfer it is
 * always enforced.
 *
 * @param length actual size of the replica
 * @throws CacheException a FileCorruptedCacheException if the enforced size
 *                        differs from {@code length}
 */
private void verifyFileSize(long length) throws CacheException {
    assert _initialState == ReplicaState.FROM_CLIENT || _fileAttributes.isDefined(SIZE);

    boolean expectedSizeIsBinding;
    if (_initialState != ReplicaState.FROM_CLIENT) {
        expectedSizeIsBinding = true;
    } else {
        expectedSizeIsBinding = _fileAttributes.isDefined(SIZE) && _fileAttributes.getSize() > 0;
    }

    if (expectedSizeIsBinding && _fileAttributes.getSize() != length) {
        throw new FileCorruptedCacheException(_fileAttributes.getSize(), length);
    }
}
/**
 * Commits the replica: sets the access and creation times, verifies the
 * replica size, registers the file in the namespace and finally moves the
 * entry to the target state with its sticky flags applied.
 *
 * <p>Namespace registration is retried every 15 seconds for as long as it
 * fails with a timeout. If a CacheException with return code FILE_NOT_FOUND
 * escapes, the target state is switched to REMOVED before the exception is
 * rethrown, so that close() removes the replica.
 *
 * @throws IllegalStateException if the handle is not in state OPEN
 * @throws InterruptedException  if the thread is interrupted while waiting to retry
 * @throws CacheException        if size verification, the namespace update or the
 *                               entry update fails
 */
@Override
public synchronized void commit()
        throws IllegalStateException, InterruptedException, CacheException {
    if (_state != HandleState.OPEN) {
        throw new IllegalStateException("Handle is closed");
    }

    try {
        // Fall back to the current time if no access time was supplied.
        long accessTime = (_atime != null) ? _atime : System.currentTimeMillis();
        _entry.setLastAccessTime(accessTime);

        _fileAttributes.setCreationTime(System.currentTimeMillis());
        _fileAttributes.setAccessTime(System.currentTimeMillis());

        long replicaSize = _entry.getReplicaSize();
        verifyFileSize(replicaSize);
        _fileAttributes.setSize(replicaSize);

        /*
         * We may run into timeout if PnfsManager or network is down.
         * (NOTICE, that PnfsHandler converts NoRouteToCell into Timeout exception)
         * In such situations we should re-try the request. If timeout exception
         * is propagated, then file will be stored in error state, to recover it
         * during the next restart.
         */
        for (;;) {
            try {
                registerFileAttributesInNameSpace();
                break;
            } catch (TimeoutCacheException timeout) {
                _log.warn("Failed to update namespace: {}. Retrying in 15 s",
                        timeout.getMessage());
                TimeUnit.SECONDS.sleep(15);
            }
        }

        _entry.update("Committing new file", r -> {
            r.setFileAttributes(_fileAttributes);

            if (_stickyRecords.isEmpty()) {
                /* In several situations, dCache requests a CACHED file
                 * without having any sticky flags on it. Such files are
                 * subject to immediate garbage collection if we are short on
                 * disk space. Thus to give other clients time to access the
                 * file, we mark it sticky for a short amount of time.
                 */
                if (_targetState == ReplicaState.CACHED) {
                    long now = System.currentTimeMillis();
                    r.setSticky("self", now + HOLD_TIME, false);
                }
            } else {
                for (StickyRecord sticky : _stickyRecords) {
                    r.setSticky(sticky.owner(), sticky.expire(), false);
                }
            }

            /* Move entry to target state. */
            return r.setState(_targetState);
        });

        setState(HandleState.COMMITTED);
    } catch (CacheException failure) {
        /* If any of the PNFS operations return FILE_NOT_FOUND,
         * then we change the target state and the close method
         * will take care of removing the file.
         */
        if (failure.getRc() == CacheException.FILE_NOT_FOUND) {
            _targetState = ReplicaState.REMOVED;
        }
        throw failure;
    }
}
/**
 * Handles a transfer that ends without a successful commit; called by
 * close() while the handle is still OPEN.
 *
 * <p>Replicas that came from another pool or from tape, and replicas that
 * are empty, are removed. Anything else is registered in the namespace and
 * then marked BROKEN, unless the namespace reports that the file no longer
 * exists, in which case the replica is removed as well. Failures while
 * updating the entry are logged and not propagated.
 */
private synchronized void fail() {
    String why = null;

    /* Files from tape or from another pool are deleted in case of
     * errors.
     */
    boolean isCopy = _initialState == ReplicaState.FROM_POOL
            || _initialState == ReplicaState.FROM_STORE;
    if (isCopy) {
        _targetState = ReplicaState.REMOVED;
        why = "replica is " + _initialState;
    }

    long bytesOnDisk = _entry.getReplicaSize();
    if (bytesOnDisk == 0) {
        /* If nothing was uploaded, we delete the replica and leave the
         * name space entry in its virgin state.
         */
        _targetState = ReplicaState.REMOVED;
        if (why == null) {
            why = "replica is empty";
        }
    } else if (_initialState == ReplicaState.FROM_CLIENT) {
        /* ... otherwise, if the transfer was a client upload then we
         * register the file's actual size, which will update the
         * namespace value and trigger that the namespace marks the file
         * as having state STORED.
         *
         * Note that this may override a previous value in the namespace,
         * if the client provided an expected file size.
         */
        _fileAttributes.setSize(bytesOnDisk);
        _fileAttributes.setCreationTime(System.currentTimeMillis());
        _fileAttributes.setAccessTime(System.currentTimeMillis());
    }

    /* Unless replica is to be removed, register cache location and
     * other attributes.
     */
    if (_targetState != ReplicaState.REMOVED) {
        try {
            /* We register cache location separately in the failure flow, because
             * updating other attributes (such as checksums) may itself trigger
             * failures in PNFS, and at the very least our cache location should
             * be registered.
             */
            _pnfs.addCacheLocation(_entry.getPnfsId(), _repository.getPoolName());
            registerFileAttributesInNameSpace();
        } catch (CacheException failure) {
            if (failure.getRc() != CacheException.FILE_NOT_FOUND) {
                _log.warn("Failed to register {} after failed replica creation: {}",
                        _fileAttributes, failure.getMessage());
            } else {
                _targetState = ReplicaState.REMOVED;
                why = "file no longer exists in namespace";
            }
        }
    }

    if (_targetState != ReplicaState.REMOVED) {
        _log.warn(AlarmMarkerFactory.getMarker(PredefinedAlarm.BROKEN_FILE,
                        _entry.getPnfsId().toString(),
                        _repository.getPoolName()),
                "Marking pool entry {} on {} as BROKEN",
                _entry.getPnfsId(),
                _repository.getPoolName());
        try {
            _entry.update("Transfer failed for " + _initialState + " replica",
                    r -> r.setState(ReplicaState.BROKEN));
        } catch (CacheException failure) {
            _log.warn("Failed to mark replica as broken: {}", failure.getMessage());
        }
    } else {
        try {
            final String reason = (why != null)
                    ? "Transfer failed and " + why
                    : "Transfer failed";
            _entry.update(reason, r -> r.setState(ReplicaState.REMOVED));
        } catch (CacheException failure) {
            _log.warn("Failed to remove replica: {}", failure.getMessage());
        }
    }
}
/**
 * Closes the handle. A handle that is still OPEN, i.e. was never committed
 * successfully, goes through the failure handling first.
 *
 * <p>An exception object is recorded at the end of the call so that a later,
 * repeated close can report it as the cause.
 *
 * @throws IllegalStateException if the handle is already closed
 */
@Override
public synchronized void close()
        throws IllegalStateException {
    if (_state == HandleState.CLOSED) {
        throw new IllegalStateException("Handle is closed", _closedBy);
    }

    if (_state == HandleState.OPEN) {
        fail();
    }
    setState(HandleState.CLOSED);

    _closedBy = new Exception("Previous, successful close.");
}
/**
 * Returns the URI of the replica as reported by the underlying entry.
 *
 * @return disk file
 * @throws IllegalStateException if the handle is closed.
 */
@Override
public synchronized URI getReplicaFile() throws IllegalStateException {
    if (_state != HandleState.CLOSED) {
        return _entry.getReplicaUri();
    }
    throw new IllegalStateException("Handle is closed");
}
/**
 * Returns the file attributes object held by this handle. This is the same
 * instance that commit, fail, getChecksums and addChecksums update in place,
 * not a copy.
 *
 * <p>The method performs no state check itself, although the signature
 * declares IllegalStateException.
 *
 * @return the file attributes of the file being written
 */
@Override
public FileAttributes getFileAttributes() throws IllegalStateException {
return _fileAttributes;
}
/**
 * Returns the checksums known for the file. If the handle's file attributes
 * carry no checksums yet, they are fetched from the namespace and stored in
 * the file attributes first.
 *
 * @return an unmodifiable view of the checksums
 * @throws CacheException if the checksums have to be fetched from the
 *                        namespace and that request fails
 */
@Override
public synchronized Iterable<Checksum> getChecksums() throws CacheException {
    boolean alreadyKnown = _fileAttributes.isDefined(CHECKSUM);
    if (!alreadyKnown) {
        FileAttributes fromNamespace =
                _pnfs.getFileAttributes(_entry.getPnfsId(), EnumSet.of(CHECKSUM));
        _fileAttributes.setChecksums(fromNamespace.getChecksums());
    }
    return unmodifiableIterable(_fileAttributes.getChecksums());
}
/**
 * Adds the given checksums to those already present in the handle's file
 * attributes. An empty argument leaves the attributes untouched.
 *
 * @param checksums checksums to add
 */
@Override
public synchronized void addChecksums(Iterable<Checksum> checksums) {
    if (isEmpty(checksums)) {
        return;
    }

    Iterable<Checksum> merged = _fileAttributes.isDefined(CHECKSUM)
            ? concat(_fileAttributes.getChecksums(), checksums)
            : checksums;
    _fileAttributes.setChecksums(Sets.newHashSet(merged));
}
/**
 * Records the last access time that commit() applies to the replica. If this
 * method is never called, commit() uses the current time instead.
 *
 * <p>The method is synchronized because {@code _state} and {@code _atime}
 * are otherwise only accessed while holding the handle's monitor. Without
 * the lock, a value written here from one thread is not guaranteed to be
 * visible to a commit() running on another thread.
 *
 * @param time last access time in milliseconds, on the same scale as
 *             {@code System.currentTimeMillis()}
 * @throws IllegalStateException if the handle is closed
 */
@Override
public synchronized void setLastAccessTime(long time) {
    if (_state == HandleState.CLOSED) {
        throw new IllegalStateException("Handle is closed");
    }
    _atime = time;
}
/**
 * Returns the size of the replica as reported by the underlying entry. No
 * handle state check is performed.
 *
 * @return the replica size
 */
@Override
public long getReplicaSize() {
return _entry.getReplicaSize();
}
/**
 * Returns the creation time of the replica as reported by the underlying
 * entry. No handle state check is performed.
 *
 * @return the creation time of the entry
 */
@Override
public long getReplicaCreationTime() {
return _entry.getCreationTime();
}
}