HdfsUnderFileSystem.java
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs.hdfs;

import alluxio.AlluxioURI;
import alluxio.Configuration;
import alluxio.Constants;
import alluxio.PropertyKey;
import alluxio.retry.CountingRetry;
import alluxio.retry.RetryPolicy;
import alluxio.security.authorization.Permission;
import alluxio.underfs.BaseUnderFileSystem;
import alluxio.underfs.UnderFileStatus;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.DeleteOptions;
import alluxio.underfs.options.FileLocationOptions;
import alluxio.underfs.options.MkdirsOptions;

import com.google.common.base.Throwables;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.SecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Stack;
import javax.annotation.concurrent.ThreadSafe;

/**
* HDFS {@link UnderFileSystem} implementation.
*/
@ThreadSafe
public class HdfsUnderFileSystem extends BaseUnderFileSystem {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private static final int MAX_TRY = 5;
// TODO(hy): Add a sticky bit and narrow down the permission in hadoop 2.
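  // Applying an empty umask (0000) to 0777 leaves the full permission intact.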
private static final FsPermission PERMISSION = new FsPermission((short) 0777)
.applyUMask(FsPermission.createImmutable((short) 0000));
private FileSystem mFileSystem;
/**
* Constructs a new HDFS {@link UnderFileSystem}.
*
* @param uri the {@link AlluxioURI} for this UFS
* @param conf the configuration for Hadoop
*/
public HdfsUnderFileSystem(AlluxioURI uri, Object conf) {
super(uri);
final String ufsPrefix = uri.toString();
final org.apache.hadoop.conf.Configuration hadoopConf;
    // instanceof is null-safe, so a separate null check is redundant
    if (conf instanceof org.apache.hadoop.conf.Configuration) {
hadoopConf = (org.apache.hadoop.conf.Configuration) conf;
} else {
hadoopConf = new org.apache.hadoop.conf.Configuration();
}
prepareConfiguration(ufsPrefix, hadoopConf);
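    // Load any additional configuration resources the user specified, and propagate any
    // configured S3 credentials into the Hadoop configuration.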
hadoopConf.addResource(
new Path(hadoopConf.get(PropertyKey.UNDERFS_HDFS_CONFIGURATION.toString())));
HdfsUnderFileSystemUtils.addS3Credentials(hadoopConf);
Path path = new Path(ufsPrefix);
try {
mFileSystem = path.getFileSystem(hadoopConf);
} catch (IOException e) {
LOG.error("Exception thrown when trying to get FileSystem for {}", ufsPrefix, e);
throw Throwables.propagate(e);
}
}
@Override
public String getUnderFSType() {
return "hdfs";
}
/**
* Prepares the Hadoop configuration necessary to successfully obtain a {@link FileSystem}
* instance that can access the provided path.
* <p>
* Derived implementations that work with specialised Hadoop {@linkplain FileSystem} API
* compatible implementations can override this method to add implementation specific
* configuration necessary for obtaining a usable {@linkplain FileSystem} instance.
* </p>
*
* @param path file system path
* @param hadoopConf Hadoop configuration
*/
protected void prepareConfiguration(String path,
org.apache.hadoop.conf.Configuration hadoopConf) {
    // On Hadoop 2.x this is unnecessary, since Hadoop uses a ServiceLoader to discover available
    // file system implementations automatically. However, the setting is required on earlier
    // Hadoop versions, and 2.x still honors it as an override, so propagate it to the Hadoop
    // configuration when present.
String ufsHdfsImpl = Configuration.get(PropertyKey.UNDERFS_HDFS_IMPL);
if (!StringUtils.isEmpty(ufsHdfsImpl)) {
hadoopConf.set("fs.hdfs.impl", ufsHdfsImpl);
}
    // Disable HDFS client caching so that the input configuration is respected. This can be
    // overridden via a system property.
hadoopConf.set("fs.hdfs.impl.disable.cache",
System.getProperty("fs.hdfs.impl.disable.cache", "true"));
HdfsUnderFileSystemUtils.addKey(hadoopConf, PropertyKey.UNDERFS_HDFS_CONFIGURATION);
}
@Override
public void close() throws IOException {
// Don't close; file systems are singletons and closing it here could break other users
}
@Override
public OutputStream createDirect(String path, CreateOptions options) throws IOException {
IOException te = null;
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
Permission perm = options.getPermission();
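    // Retry the create a bounded number of times; transient HDFS errors may clear on a later
    // attempt.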
while (retryPolicy.attemptRetry()) {
try {
LOG.debug("Creating HDFS file at {} with perm {}", path, perm.toString());
// TODO(chaomin): support creating HDFS files with specified block size and replication.
return FileSystem.create(mFileSystem, new Path(path),
new FsPermission(perm.getMode().toShort()));
} catch (IOException e) {
LOG.error("Retry count {} : {} ", retryPolicy.getRetryCount(), e.getMessage(), e);
te = e;
}
}
throw te;
}
@Override
public boolean deleteDirectory(String path, DeleteOptions options) throws IOException {
return isDirectory(path) && delete(path, options.isRecursive());
}
@Override
public boolean deleteFile(String path) throws IOException {
return isFile(path) && delete(path, false);
}
@Override
public long getBlockSizeByte(String path) throws IOException {
Path tPath = new Path(path);
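    // Check existence up front so a missing path consistently surfaces as FileNotFoundException.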
if (!mFileSystem.exists(tPath)) {
throw new FileNotFoundException(path);
}
FileStatus fs = mFileSystem.getFileStatus(tPath);
return fs.getBlockSize();
}
@Override
public Object getConf() {
return mFileSystem.getConf();
}
@Override
public List<String> getFileLocations(String path) throws IOException {
return getFileLocations(path, FileLocationOptions.defaults());
}
@Override
public List<String> getFileLocations(String path, FileLocationOptions options)
throws IOException {
List<String> ret = new ArrayList<>();
try {
FileStatus fStatus = mFileSystem.getFileStatus(new Path(path));
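      // Request only the single block containing the given offset and collect its hosts.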
BlockLocation[] bLocations =
mFileSystem.getFileBlockLocations(fStatus, options.getOffset(), 1);
if (bLocations.length > 0) {
String[] names = bLocations[0].getHosts();
Collections.addAll(ret, names);
}
} catch (IOException e) {
LOG.error("Unable to get file location for {}", path, e);
}
return ret;
}
@Override
public long getFileSize(String path) throws IOException {
Path tPath = new Path(path);
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
while (retryPolicy.attemptRetry()) {
try {
FileStatus fs = mFileSystem.getFileStatus(tPath);
return fs.getLen();
} catch (IOException e) {
LOG.error("{} try to get file size for {} : {}", retryPolicy.getRetryCount(), path,
e.getMessage(), e);
}
}
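    // All attempts failed; -1 signals that the size could not be determined.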
return -1;
}
@Override
public long getModificationTimeMs(String path) throws IOException {
Path tPath = new Path(path);
if (!mFileSystem.exists(tPath)) {
throw new FileNotFoundException(path);
}
FileStatus fs = mFileSystem.getFileStatus(tPath);
return fs.getModificationTime();
}
@Override
public long getSpace(String path, SpaceType type) throws IOException {
    // Ignore the given path and report usage for the entire cluster, since Alluxio can
    // load/store data across the whole HDFS cluster.
if (mFileSystem instanceof DistributedFileSystem) {
switch (type) {
case SPACE_TOTAL:
// Due to Hadoop 1 support we stick with the deprecated version. If we drop support for it
// FileSystem.getStatus().getCapacity() will be the new one.
return ((DistributedFileSystem) mFileSystem).getDiskStatus().getCapacity();
case SPACE_USED:
// Due to Hadoop 1 support we stick with the deprecated version. If we drop support for it
// FileSystem.getStatus().getUsed() will be the new one.
return ((DistributedFileSystem) mFileSystem).getDiskStatus().getDfsUsed();
case SPACE_FREE:
// Due to Hadoop 1 support we stick with the deprecated version. If we drop support for it
// FileSystem.getStatus().getRemaining() will be the new one.
return ((DistributedFileSystem) mFileSystem).getDiskStatus().getRemaining();
default:
throw new IOException("Unknown getSpace parameter: " + type);
}
}
return -1;
}
@Override
public boolean isDirectory(String path) throws IOException {
return mFileSystem.isDirectory(new Path(path));
}
@Override
public boolean isFile(String path) throws IOException {
return mFileSystem.isFile(new Path(path));
}
@Override
public UnderFileStatus[] list(String path) throws IOException {
FileStatus[] files = listStatus(path);
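    // listStatus() returns null when the path does not exist; also return null for plain files
    // so the result matches java.io.File.list() semantics.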
if (files != null && !isFile(path)) {
UnderFileStatus[] rtn = new UnderFileStatus[files.length];
int i = 0;
for (FileStatus status : files) {
        // Only return the relative path, consistent with java.io.File.list().
rtn[i++] = new UnderFileStatus(status.getPath().getName(), status.isDirectory());
}
return rtn;
} else {
return null;
}
}
@Override
public void connectFromMaster(String host) throws IOException {
if (!Configuration.containsKey(PropertyKey.MASTER_KEYTAB_KEY_FILE)
|| !Configuration.containsKey(PropertyKey.MASTER_PRINCIPAL)) {
return;
}
String masterKeytab = Configuration.get(PropertyKey.MASTER_KEYTAB_KEY_FILE);
String masterPrincipal = Configuration.get(PropertyKey.MASTER_PRINCIPAL);
login(PropertyKey.MASTER_KEYTAB_KEY_FILE, masterKeytab, PropertyKey.MASTER_PRINCIPAL,
masterPrincipal, host);
}
@Override
public void connectFromWorker(String host) throws IOException {
if (!Configuration.containsKey(PropertyKey.WORKER_KEYTAB_FILE)
|| !Configuration.containsKey(PropertyKey.WORKER_PRINCIPAL)) {
return;
}
String workerKeytab = Configuration.get(PropertyKey.WORKER_KEYTAB_FILE);
String workerPrincipal = Configuration.get(PropertyKey.WORKER_PRINCIPAL);
login(PropertyKey.WORKER_KEYTAB_FILE, workerKeytab, PropertyKey.WORKER_PRINCIPAL,
workerPrincipal, host);
}
private void login(PropertyKey keytabFileKey, String keytabFile, PropertyKey principalKey,
String principal, String hostname) throws IOException {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set(keytabFileKey.toString(), keytabFile);
conf.set(principalKey.toString(), principal);
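    // Perform a Kerberos login from the keytab; SecurityUtil substitutes the given hostname for
    // the _HOST token in the principal, if present.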
SecurityUtil.login(conf, keytabFileKey.toString(), principalKey.toString(), hostname);
}
@Override
public boolean mkdirs(String path, MkdirsOptions options) throws IOException {
IOException te = null;
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
while (retryPolicy.attemptRetry()) {
try {
Path hdfsPath = new Path(path);
if (mFileSystem.exists(hdfsPath)) {
LOG.debug("Trying to create existing directory at {}", path);
return false;
}
// Create directories one by one with explicit permissions to ensure no umask is applied,
// using mkdirs will apply the permission only to the last directory
Stack<Path> dirsToMake = new Stack<>();
dirsToMake.push(hdfsPath);
Path parent = hdfsPath.getParent();
while (!mFileSystem.exists(parent)) {
dirsToMake.push(parent);
parent = parent.getParent();
}
while (!dirsToMake.empty()) {
if (!FileSystem.mkdirs(mFileSystem, dirsToMake.pop(),
new FsPermission(options.getPermission().getMode().toShort()))) {
return false;
}
}
return true;
} catch (IOException e) {
LOG.error("{} try to make directory for {} : {}", retryPolicy.getRetryCount(), path,
e.getMessage(), e);
te = e;
}
}
throw te;
}
@Override
public FSDataInputStream open(String path) throws IOException {
IOException te = null;
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
while (retryPolicy.attemptRetry()) {
try {
return mFileSystem.open(new Path(path));
} catch (IOException e) {
LOG.error("{} try to open {} : {}", retryPolicy.getRetryCount(), path, e.getMessage(), e);
te = e;
}
}
throw te;
}
@Override
public boolean renameDirectory(String src, String dst) throws IOException {
LOG.debug("Renaming directory from {} to {}", src, dst);
if (!isDirectory(src)) {
LOG.error("Unable to rename {} to {} because source does not exist or is a file", src, dst);
return false;
}
return rename(src, dst);
}
@Override
public boolean renameFile(String src, String dst) throws IOException {
if (!isFile(src)) {
LOG.error("Unable to rename {} to {} because source does not exist or is a directory",
src, dst);
return false;
}
LOG.debug("Renaming file from {} to {}", src, dst);
return rename(src, dst);
}
@Override
public void setConf(Object conf) {
mFileSystem.setConf((org.apache.hadoop.conf.Configuration) conf);
}
@Override
public void setOwner(String path, String user, String group) throws IOException {
try {
FileStatus fileStatus = mFileSystem.getFileStatus(new Path(path));
LOG.info("Changing file '{}' user from: {} to {}, group from: {} to {}", fileStatus.getPath(),
fileStatus.getOwner(), user, fileStatus.getGroup(), group);
mFileSystem.setOwner(fileStatus.getPath(), user, group);
} catch (IOException e) {
LOG.error("Fail to set owner for {} with user: {}, group: {}", path, user, group, e);
LOG.warn("In order for Alluxio to create HDFS files with the correct user and groups, "
+ "Alluxio should be added to the HDFS superusers.");
throw e;
}
}
@Override
public void setMode(String path, short mode) throws IOException {
try {
FileStatus fileStatus = mFileSystem.getFileStatus(new Path(path));
LOG.info("Changing file '{}' permissions from: {} to {}", fileStatus.getPath(),
fileStatus.getPermission(), mode);
mFileSystem.setPermission(fileStatus.getPath(), new FsPermission(mode));
} catch (IOException e) {
LOG.error("Fail to set permission for {} with perm {}", path, mode, e);
throw e;
}
}
@Override
public String getOwner(String path) throws IOException {
try {
return mFileSystem.getFileStatus(new Path(path)).getOwner();
} catch (IOException e) {
LOG.error("Fail to get owner for {} ", path, e);
throw e;
}
}
@Override
public String getGroup(String path) throws IOException {
try {
return mFileSystem.getFileStatus(new Path(path)).getGroup();
} catch (IOException e) {
LOG.error("Fail to get group for {} ", path, e);
throw e;
}
}
@Override
public short getMode(String path) throws IOException {
try {
return mFileSystem.getFileStatus(new Path(path)).getPermission().toShort();
} catch (IOException e) {
LOG.error("Fail to get permission for {} ", path, e);
throw e;
}
}
@Override
public boolean supportsFlush() {
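    // HDFS output streams can be flushed, unlike those of most object store UFS implementations.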
return true;
}
/**
   * Deletes the file or directory at the given path.
   *
   * @param path file or directory path
   * @param recursive whether to delete the path recursively
   * @return true if the delete succeeded
   * @throws IOException if a non-Alluxio error occurs
*/
private boolean delete(String path, boolean recursive) throws IOException {
LOG.debug("deleting {} {}", path, recursive);
IOException te = null;
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
while (retryPolicy.attemptRetry()) {
try {
return mFileSystem.delete(new Path(path), recursive);
} catch (IOException e) {
LOG.error("Retry count {} : {}", retryPolicy.getRetryCount(), e.getMessage(), e);
te = e;
}
}
throw te;
}
/**
   * Lists the status of the given path. Returns an array of {@link FileStatus} with an entry for
   * each file and directory in the directory denoted by this path.
   *
   * @param path the pathname to list
   * @return the statuses of the entries, or {@code null} if the path does not exist
   * @throws IOException if a non-Alluxio error occurs
*/
private FileStatus[] listStatus(String path) throws IOException {
FileStatus[] files;
try {
files = mFileSystem.listStatus(new Path(path));
} catch (FileNotFoundException e) {
return null;
}
return files;
}
/**
   * Renames a file or directory to the given destination path.
   *
   * @param src path of the source file or directory
   * @param dst path of the destination file or directory
   * @return true if the rename succeeded
   * @throws IOException if a non-Alluxio error occurs
*/
private boolean rename(String src, String dst) throws IOException {
IOException te = null;
RetryPolicy retryPolicy = new CountingRetry(MAX_TRY);
while (retryPolicy.attemptRetry()) {
try {
return mFileSystem.rename(new Path(src), new Path(dst));
} catch (IOException e) {
LOG.error("{} try to rename {} to {} : {}", retryPolicy.getRetryCount(), src, dst,
e.getMessage(), e);
te = e;
}
}
throw te;
}
}