-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
UnderFileSystemBlockStore.java
392 lines (355 loc) · 12.8 KB
/
UnderFileSystemBlockStore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/
package alluxio.worker.block;
import alluxio.exception.BlockAlreadyExistsException;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.exception.ExceptionMessage;
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.LockResource;
import alluxio.underfs.UfsManager;
import alluxio.worker.SessionCleanable;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.UnderFileSystemBlockMeta;
import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
/**
* This class manages the virtual blocks in the UFS for delegated UFS reads/writes.
*
* The usage pattern:
* acquireAccess(sessionId, blockId, options)
* closeReaderOrWriter(sessionId, blockId)
* releaseAccess(sessionId, blockId)
*
* If the client is lost before releasing or cleaning up the session, the session cleaner will
* clean the data.
*/
public final class UnderFileSystemBlockStore implements SessionCleanable {
private static final Logger LOG = LoggerFactory.getLogger(UnderFileSystemBlockStore.class);
/**
* This lock protects mBlocks, mSessionIdToBlockIds and mBlockIdToSessionIds. For any read/write
* operations to these maps, the lock needs to be acquired. But once you get the block
* information from the map (e.g. mBlocks), the lock does not need to be acquired. For example,
* the block reader/writer within the BlockInfo can be updated without acquiring this lock.
* This is based on the assumption that one session won't open multiple readers/writers on the
* same block. If the client do that, the client can see failures but the worker won't crash.
*/
private final ReentrantLock mLock = new ReentrantLock();
@GuardedBy("mLock")
/** Maps from the {@link Key} to the {@link BlockInfo}. */
private final Map<Key, BlockInfo> mBlocks = new HashMap<>();
@GuardedBy("mLock")
/** Maps from the session ID to the block IDs. */
private final Map<Long, Set<Long>> mSessionIdToBlockIds = new HashMap<>();
@GuardedBy("mLock")
/** Maps from the block ID to the session IDs. */
private final Map<Long, Set<Long>> mBlockIdToSessionIds = new HashMap<>();
/** The Local block store. */
private final BlockStore mLocalBlockStore;
/** The manager for all ufs. */
private final UfsManager mUfsManager;
/** The manager for all ufs instream */
private final UfsInputStreamManager mUfsInstreamManager;
/**
* Creates an instance of {@link UnderFileSystemBlockStore}.
*
* @param localBlockStore the local block store
* @param ufsManager the file manager
*/
public UnderFileSystemBlockStore(BlockStore localBlockStore, UfsManager ufsManager) {
mLocalBlockStore = localBlockStore;
mUfsManager = ufsManager;
mUfsInstreamManager = new UfsInputStreamManager();
}
/**
* Acquires access for a UFS block given a {@link UnderFileSystemBlockMeta} and the limit on
* the maximum concurrency on the block. If the number of concurrent readers on this UFS block
* exceeds a threshold, the token is not granted and this method returns false.
*
* @param sessionId the session ID
* @param blockId maximum concurrency
* @param options the options
* @return whether an access token is acquired
* @throws BlockAlreadyExistsException if the block already exists for a session ID
*/
public boolean acquireAccess(long sessionId, long blockId, Protocol.OpenUfsBlockOptions options)
throws BlockAlreadyExistsException {
UnderFileSystemBlockMeta blockMeta = new UnderFileSystemBlockMeta(sessionId, blockId, options);
try (LockResource lr = new LockResource(mLock)) {
Key key = new Key(sessionId, blockId);
if (mBlocks.containsKey(key)) {
throw new BlockAlreadyExistsException(ExceptionMessage.UFS_BLOCK_ALREADY_EXISTS_FOR_SESSION,
blockId, blockMeta.getUnderFileSystemPath(), sessionId);
}
Set<Long> sessionIds = mBlockIdToSessionIds.get(blockId);
if (sessionIds != null && sessionIds.size() >= options.getMaxUfsReadConcurrency()) {
return false;
}
if (sessionIds == null) {
sessionIds = new HashSet<>();
mBlockIdToSessionIds.put(blockId, sessionIds);
}
sessionIds.add(sessionId);
mBlocks.put(key, new BlockInfo(blockMeta));
Set<Long> blockIds = mSessionIdToBlockIds.get(sessionId);
if (blockIds == null) {
blockIds = new HashSet<>();
mSessionIdToBlockIds.put(sessionId, blockIds);
}
blockIds.add(blockId);
}
return true;
}
/**
* Closes the block reader or writer and checks whether it is necessary to commit the block
* to Local block store.
*
* During UFS block read, this is triggered when the block is unlocked.
* During UFS block write, this is triggered when the UFS block is committed.
*
* @param sessionId the session ID
* @param blockId the block ID
*/
public void closeReaderOrWriter(long sessionId, long blockId) throws IOException {
BlockInfo blockInfo;
try (LockResource lr = new LockResource(mLock)) {
blockInfo = mBlocks.get(new Key(sessionId, blockId));
if (blockInfo == null) {
LOG.warn("Key (block ID: {}, session ID {}) is not found when cleaning up the UFS block.",
blockId, sessionId);
return;
}
}
blockInfo.closeReaderOrWriter();
}
/**
* Releases the access token of this block by removing this (sessionId, blockId) pair from the
* store.
*
* @param sessionId the session ID
* @param blockId the block ID
*/
public void releaseAccess(long sessionId, long blockId) {
try (LockResource lr = new LockResource(mLock)) {
Key key = new Key(sessionId, blockId);
if (!mBlocks.containsKey(key)) {
LOG.warn("Key (block ID: {}, session ID {}) is not found when releasing the UFS block.",
blockId, sessionId);
}
mBlocks.remove(key);
Set<Long> blockIds = mSessionIdToBlockIds.get(sessionId);
if (blockIds != null) {
blockIds.remove(blockId);
}
Set<Long> sessionIds = mBlockIdToSessionIds.get(blockId);
if (sessionIds != null) {
sessionIds.remove(sessionId);
}
}
}
/**
* Cleans up all the block information(e.g. block reader/writer) that belongs to this session.
*
* @param sessionId the session ID
*/
@Override
public void cleanupSession(long sessionId) {
Set<Long> blockIds;
try (LockResource lr = new LockResource(mLock)) {
blockIds = mSessionIdToBlockIds.get(sessionId);
if (blockIds == null) {
return;
}
}
for (Long blockId : blockIds) {
try {
// Note that we don't need to explicitly call abortBlock to cleanup the temp block
// in Local block store because they will be cleanup by the session cleaner in the
// Local block store.
closeReaderOrWriter(sessionId, blockId);
releaseAccess(sessionId, blockId);
} catch (Exception e) {
LOG.warn("Failed to cleanup UFS block {}, session {}.", blockId, sessionId);
}
}
}
/**
* Creates a block reader that reads from UFS and optionally caches the block to the Alluxio
* block store.
*
* @param sessionId the client session ID that requested this read
* @param blockId the ID of the block to read
* @param offset the read offset within the block (NOT the file)
* @return the block reader instance
* @throws BlockDoesNotExistException if the UFS block does not exist in the
* {@link UnderFileSystemBlockStore}
*/
public BlockReader getBlockReader(final long sessionId, long blockId, long offset)
throws BlockDoesNotExistException, IOException {
final BlockInfo blockInfo;
try (LockResource lr = new LockResource(mLock)) {
blockInfo = getBlockInfo(sessionId, blockId);
BlockReader blockReader = blockInfo.getBlockReader();
if (blockReader != null) {
return blockReader;
}
}
BlockReader reader =
UnderFileSystemBlockReader.create(blockInfo.getMeta(), offset, mLocalBlockStore,
mUfsManager, mUfsInstreamManager);
blockInfo.setBlockReader(reader);
return reader;
}
/**
* Gets the {@link UnderFileSystemBlockMeta} for a session ID and block ID pair.
*
* @param sessionId the session ID
* @param blockId the block ID
* @return the {@link UnderFileSystemBlockMeta} instance
* @throws BlockDoesNotExistException if the UFS block does not exist in the
* {@link UnderFileSystemBlockStore}
*/
private BlockInfo getBlockInfo(long sessionId, long blockId) throws BlockDoesNotExistException {
Key key = new Key(sessionId, blockId);
BlockInfo blockInfo = mBlocks.get(key);
if (blockInfo == null) {
throw new BlockDoesNotExistException(ExceptionMessage.UFS_BLOCK_DOES_NOT_EXIST_FOR_SESSION,
blockId, sessionId);
}
return blockInfo;
}
/**
* This class is to wrap session ID amd block ID.
*/
private static class Key {
private final long mSessionId;
private final long mBlockId;
/**
* Creates an instance of the Key class.
*
* @param sessionId the session ID
* @param blockId the block ID
*/
public Key(long sessionId, long blockId) {
mSessionId = sessionId;
mBlockId = blockId;
}
/**
* @return the block ID
*/
public long getBlockId() {
return mBlockId;
}
/**
* @return the session ID
*/
public long getSessionId() {
return mSessionId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Key)) {
return false;
}
Key that = (Key) o;
return Objects.equal(mBlockId, that.mBlockId) && Objects.equal(mSessionId, that.mSessionId);
}
@Override
public int hashCode() {
return Objects.hashCode(mBlockId, mSessionId);
}
@Override
public String toString() {
return Objects.toStringHelper(this).add("blockId", mBlockId).add("sessionId", mSessionId)
.toString();
}
}
/**
* This class is to wrap block reader/writer and the block meta into one class. The block
* reader/writer is not part of the {@link UnderFileSystemBlockMeta} because
* 1. UnderFileSystemBlockMeta only keeps immutable information.
* 2. We do not want a cyclic dependency between {@link UnderFileSystemBlockReader} and
* {@link UnderFileSystemBlockMeta}.
*/
private static class BlockInfo {
private final UnderFileSystemBlockMeta mMeta;
// A correct client implementation should never access the following reader/writer
// concurrently. But just to avoid crashing the server thread with runtime exception when
// the client is mis-behaving, we access them with locks acquired.
private BlockReader mBlockReader;
private BlockWriter mBlockWriter;
/**
* Creates an instance of {@link BlockInfo}.
*
* @param meta the UFS block meta
*/
public BlockInfo(UnderFileSystemBlockMeta meta) {
mMeta = meta;
}
/**
* @return the UFS block meta
*/
public UnderFileSystemBlockMeta getMeta() {
return mMeta;
}
/**
* @return the cached the block reader if it is not closed
*/
public synchronized BlockReader getBlockReader() {
if (mBlockReader != null && mBlockReader.isClosed()) {
mBlockReader = null;
}
return mBlockReader;
}
/**
* @param blockReader the block reader to be set
*/
public synchronized void setBlockReader(BlockReader blockReader) {
mBlockReader = blockReader;
}
/**
* @return the block writer
*/
public synchronized BlockWriter getBlockWriter() {
return mBlockWriter;
}
/**
* @param blockWriter the block writer to be set
*/
public synchronized void setBlockWriter(BlockWriter blockWriter) {
mBlockWriter = blockWriter;
}
/**
* Closes the block reader or writer.
*/
public synchronized void closeReaderOrWriter() throws IOException {
if (mBlockReader != null) {
mBlockReader.close();
mBlockReader = null;
}
if (mBlockWriter != null) {
mBlockWriter.close();
mBlockWriter = null;
}
}
}
}