-
Notifications
You must be signed in to change notification settings - Fork 497
/
FileMetadataLoader.java
388 lines (345 loc) · 14.6 KB
/
FileMetadataLoader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Reference;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.HudiUtil;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
/**
* Utility for loading file metadata within a partition directory.
*/
public class FileMetadataLoader {
  private final static Logger LOG = LoggerFactory.getLogger(FileMetadataLoader.class);
  private static final Configuration CONF = new Configuration();

  // The number of unfinished instances. Incremented in the constructor and decremented
  // at the end of load().
  public static final AtomicInteger TOTAL_TASKS = new AtomicInteger();

  protected final Path partDir_;
  protected final boolean recursive_;
  // Pre-existing file descriptors indexed by relative path, used to skip re-loading
  // metadata for files that have not changed since the last load.
  protected final ImmutableMap<String, FileDescriptor> oldFdsByPath_;
  private final ListMap<TNetworkAddress> hostIndex_;
  @Nullable
  private final ValidWriteIdList writeIds_;
  @Nullable
  private final ValidTxnList validTxnList_;
  @Nullable
  private final HdfsFileFormat fileFormat_;

  // If true, block locations are re-fetched even for files that appear unchanged.
  protected boolean forceRefreshLocations = false;

  // Results of the load; null until load() succeeds.
  protected List<FileDescriptor> loadedFds_;
  // Populated only for ACID tables (writeIds_ != null): loadedFds_ partitioned into
  // insert-delta and delete-delta descriptors.
  private List<FileDescriptor> loadedInsertDeltaFds_;
  private List<FileDescriptor> loadedDeleteDeltaFds_;
  protected LoadStats loadStats_;
  protected String debugAction_;

  /**
   * @param partDir the dir for which to fetch file metadata
   * @param recursive whether to recursively list files
   * @param oldFds any pre-existing file descriptors loaded for this table, used
   *   to optimize refresh if available.
   * @param hostIndex the host index with which to associate the file descriptors
   * @param validTxnList if non-null, it can tell whether a given transaction id is
   *   committed or not. We need it to ignore base directories of in-progress
   *   compactions.
   * @param writeIds if non-null, a write-id list which will filter the returned
   *   file descriptors to only include those indicated to be valid.
   * @param fileFormat if non-null and equal to HdfsFileFormat.HUDI_PARQUET,
   *   this loader will filter files based on Hudi's HoodieROTablePathFilter method
   */
  public FileMetadataLoader(Path partDir, boolean recursive, List<FileDescriptor> oldFds,
      ListMap<TNetworkAddress> hostIndex, @Nullable ValidTxnList validTxnList,
      @Nullable ValidWriteIdList writeIds, @Nullable HdfsFileFormat fileFormat) {
    // Either both validTxnList and writeIds are null, or none of them.
    Preconditions.checkState((validTxnList == null && writeIds == null)
        || (validTxnList != null && writeIds != null));
    partDir_ = Preconditions.checkNotNull(partDir);
    recursive_ = recursive;
    hostIndex_ = Preconditions.checkNotNull(hostIndex);
    oldFdsByPath_ = Maps.uniqueIndex(oldFds, FileDescriptor::getPath);
    writeIds_ = writeIds;
    validTxnList_ = validTxnList;
    fileFormat_ = fileFormat;
    if (writeIds_ != null) {
      Preconditions.checkArgument(recursive_, "ACID tables must be listed recursively");
    }
    TOTAL_TASKS.incrementAndGet();
  }

  /**
   * Convenience constructor for non-Hudi tables (no file-format based filtering).
   */
  public FileMetadataLoader(Path partDir, boolean recursive, List<FileDescriptor> oldFds,
      ListMap<TNetworkAddress> hostIndex, @Nullable ValidTxnList validTxnList,
      @Nullable ValidWriteIdList writeIds) {
    this(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds, null);
  }

  /**
   * If 'refresh' is true, force re-fetching block locations even if a file does not
   * appear to have changed.
   */
  public void setForceRefreshBlockLocations(boolean refresh) {
    forceRefreshLocations = refresh;
  }

  /**
   * @return the file descriptors that were loaded after an invocation of load()
   */
  public List<FileDescriptor> getLoadedFds() {
    Preconditions.checkState(loadedFds_ != null,
        "Must have successfully loaded first");
    return loadedFds_;
  }

  /**
   * @return the insert-delta file descriptors loaded for an ACID table, or null if
   *   this loader was not constructed with a write-id list.
   */
  public List<FileDescriptor> getLoadedInsertDeltaFds() {
    return loadedInsertDeltaFds_;
  }

  /**
   * @return the delete-delta file descriptors loaded for an ACID table, or null if
   *   this loader was not constructed with a write-id list.
   */
  public List<FileDescriptor> getLoadedDeleteDeltaFds() {
    return loadedDeleteDeltaFds_;
  }

  /**
   * @return statistics about the descriptor loading process, after an invocation of
   *   load()
   */
  public LoadStats getStats() {
    Preconditions.checkState(loadedFds_ != null,
        "Must have successfully loaded first");
    return loadStats_;
  }

  /** @return the partition directory this loader lists. */
  Path getPartDir() { return partDir_; }

  /**
   * Load the file descriptors, which may later be fetched using {@link #getLoadedFds()}.
   * After a successful load, stats may be fetched using {@link #getStats()}.
   *
   * If the directory does not exist, this succeeds and yields an empty list of
   * descriptors.
   *
   * @throws IOException if listing fails.
   * @throws CatalogException on ACID errors. TODO: remove this once IMPALA-9042 is
   *   resolved.
   */
  public void load() throws CatalogException, IOException {
    try {
      loadInternal();
    } finally {
      // Balance the increment in the constructor, even if the load failed.
      TOTAL_TASKS.decrementAndGet();
    }
  }

  private void loadInternal() throws CatalogException, IOException {
    Preconditions.checkState(loadStats_ == null, "already loaded");
    loadStats_ = new LoadStats(partDir_);
    FileSystem fs = partDir_.getFileSystem(CONF);

    // If we don't have any prior FDs from which we could re-use old block location info,
    // we'll need to fetch info for every returned file. In this case we can inline
    // that request with the 'list' call and save a round-trip per file.
    //
    // In the case that we _do_ have existing FDs which we can reuse, we'll optimistically
    // assume that most _can_ be reused, in which case it's faster to _not_ prefetch
    // the locations.
    boolean listWithLocations = FileSystemUtil.supportsStorageIds(fs) &&
        (oldFdsByPath_.isEmpty() || forceRefreshLocations);

    String msg = String.format("%s file metadata%s from path %s",
        oldFdsByPath_.isEmpty() ? "Loading" : "Refreshing",
        listWithLocations ? " with eager location-fetching" : "", partDir_);
    LOG.trace(msg);
    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
      List<FileStatus> fileStatuses = getFileStatuses(fs, listWithLocations);
      loadedFds_ = new ArrayList<>();
      // A null listing means the directory does not exist: succeed with no descriptors.
      if (fileStatuses == null) return;

      Reference<Long> numUnknownDiskIds = new Reference<>(0L);

      if (writeIds_ != null) {
        // Drop files belonging to uncommitted transactions or superseded ACID dirs.
        fileStatuses = AcidUtils.filterFilesForAcidState(fileStatuses, partDir_,
            validTxnList_, writeIds_, loadStats_);
      }

      if (fileFormat_ == HdfsFileFormat.HUDI_PARQUET) {
        fileStatuses = HudiUtil.filterFilesForHudiROPath(fileStatuses);
      }

      for (FileStatus fileStatus : fileStatuses) {
        if (fileStatus.isDirectory()) {
          continue;
        }

        if (!FileSystemUtil.isValidDataFile(fileStatus)) {
          ++loadStats_.hiddenFiles;
          continue;
        }
        FileDescriptor fd = getFileDescriptor(fs, listWithLocations, numUnknownDiskIds,
            fileStatus);
        loadedFds_.add(Preconditions.checkNotNull(fd));
      }
      if (writeIds_ != null) {
        // Partition the loaded descriptors into delete-delta vs. insert-delta files.
        loadedInsertDeltaFds_ = new ArrayList<>();
        loadedDeleteDeltaFds_ = new ArrayList<>();
        for (FileDescriptor fd : loadedFds_) {
          if (AcidUtils.isDeleteDeltaFd(fd)) {
            loadedDeleteDeltaFds_.add(fd);
          } else {
            loadedInsertDeltaFds_.add(fd);
          }
        }
      }
      loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
      if (LOG.isTraceEnabled()) {
        LOG.trace(loadStats_.debugString());
      }
    }
  }

  /**
   * Return fd created by the given fileStatus or from the cache(oldFdsByPath_).
   */
  protected FileDescriptor getFileDescriptor(FileSystem fs, boolean listWithLocations,
      Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws IOException {
    String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(), partDir_);
    FileDescriptor fd = oldFdsByPath_.get(relPath);
    if (listWithLocations || forceRefreshLocations || fd == null ||
        fd.isChanged(fileStatus)) {
      fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds);
      ++loadStats_.loadedFiles;
    } else {
      // Unchanged file: reuse the cached descriptor and skip the metadata fetch.
      ++loadStats_.skippedFiles;
    }
    return fd;
  }

  /**
   * Return located file status list when listWithLocations is true.
   * Returns null if the directory does not exist.
   */
  protected List<FileStatus> getFileStatuses(FileSystem fs, boolean listWithLocations)
      throws IOException {
    RemoteIterator<? extends FileStatus> fileStatuses;
    if (listWithLocations) {
      fileStatuses = FileSystemUtil
          .listFiles(fs, partDir_, recursive_, debugAction_);
    } else {
      fileStatuses = FileSystemUtil
          .listStatus(fs, partDir_, recursive_, debugAction_);

      // TODO(todd): we could look at the result of listing without locations, and if
      // we see that a substantial number of the files have changed, it may be better
      // to go back and re-list with locations vs doing an RPC per file.
    }
    if (fileStatuses == null) return null;
    List<FileStatus> stats = new ArrayList<>();
    while (fileStatuses.hasNext()) {
      stats.add(fileStatuses.next());
    }
    return stats;
  }

  /**
   * Create a FileDescriptor for the given FileStatus. If the FS supports block locations,
   * and FileStatus is a LocatedFileStatus (i.e. the location was prefetched) this uses
   * the already-loaded information; otherwise, this may have to remotely look up the
   * locations.
   * 'absPath' is null except for the Iceberg tables, because datafiles of the
   * Iceberg tables may not be in the table location.
   */
  protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
      String relPath, Reference<Long> numUnknownDiskIds, String absPath)
      throws IOException {
    if (!FileSystemUtil.supportsStorageIds(fs)) {
      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
    }
    BlockLocation[] locations;
    if (fileStatus instanceof LocatedFileStatus) {
      locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
    } else {
      // Location was not prefetched by the listing; fetch it with a per-file RPC.
      locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }
    return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
        fileStatus.isEncrypted(), fileStatus.isErasureCoded(), numUnknownDiskIds,
        absPath);
  }

  /** Convenience overload of createFd() for non-Iceberg files (absPath == null). */
  private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
      String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
    return createFd(fs, fileStatus, relPath, numUnknownDiskIds, null);
  }

  /**
   * Given a file descriptor list 'oldFds', returns true if the loaded file descriptors
   * are the same as them.
   */
  public boolean hasFilesChangedCompareTo(List<FileDescriptor> oldFds) {
    if (oldFds.size() != loadedFds_.size()) return true;
    ImmutableMap<String, FileDescriptor> oldFdsByRelPath =
        Maps.uniqueIndex(oldFds, FileDescriptor::getPath);
    for (FileDescriptor fd : loadedFds_) {
      FileDescriptor oldFd = oldFdsByRelPath.get(fd.getPath());
      if (fd.isChanged(oldFd)) return true;
    }
    return false;
  }

  /**
   * Enables injection of a debug actions to introduce delays in HDFS listStatus or
   * listFiles call during the file-metadata loading.
   */
  public void setDebugAction(String debugAction) {
    this.debugAction_ = debugAction;
  }

  // File/Block metadata loading stats for a single HDFS path.
  public static class LoadStats {
    private final Path partDir_;

    LoadStats(Path partDir) {
      this.partDir_ = Preconditions.checkNotNull(partDir);
    }

    /** Number of files skipped because they pertain to an uncommitted ACID transaction */
    public int uncommittedAcidFilesSkipped = 0;

    /**
     * Number of files skipped because they pertain to ACID directories superseded
     * by compaction or newer base.
     */
    public int filesSupersededByAcidState = 0;

    // Number of files for which the metadata was loaded.
    public int loadedFiles = 0;

    // Number of hidden files excluded from file metadata loading. More details at
    // isValidDataFile().
    public int hiddenFiles = 0;

    // Number of files skipped from file metadata loading because the files have not
    // changed since the last load. More details at hasFileChanged().
    //
    // TODO(todd) rename this to something indicating it was fast-pathed, not skipped
    public int skippedFiles = 0;

    // Number of unknown disk IDs encountered while loading block
    // metadata for this path.
    public int unknownDiskIds = 0;

    public String debugString() {
      // Fixed spelling of "uncommitted"/"superseded" to match the field names.
      return MoreObjects.toStringHelper("")
          .add("path", partDir_)
          .add("loaded files", loadedFiles)
          .add("hidden files", nullIfZero(hiddenFiles))
          .add("skipped files", nullIfZero(skippedFiles))
          .add("uncommitted files", nullIfZero(uncommittedAcidFilesSkipped))
          .add("superseded files", nullIfZero(filesSupersededByAcidState))
          .add("unknown diskIds", nullIfZero(unknownDiskIds))
          .omitNullValues()
          .toString();
    }

    // Helper so zero-valued counters are omitted from debugString() output.
    private Integer nullIfZero(int x) {
      return x > 0 ? x : null;
    }
  }
}