From c6c86682b308e485bc7d073dea492624210dc83e Mon Sep 17 00:00:00 2001
From: 5herhom <543872547@qq.com>
Date: Sun, 10 Jul 2022 15:30:16 +0800
Subject: [PATCH] [HUDI-4282] Wrap FSDataInputStream with BoundedFsDataInputStream to avoid IOException on some DFS implementations

---
 .../common/fs/BoundedFsDataInputStream.java   | 68 +++++++++++++++++++
 .../org/apache/hudi/common/fs/FSUtils.java    | 12 ++++
 .../common/table/log/HoodieLogFileReader.java | 33 +++++++--
 3 files changed, 108 insertions(+), 5 deletions(-)
 create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/fs/BoundedFsDataInputStream.java

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/BoundedFsDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/BoundedFsDataInputStream.java
new file mode 100644
index 0000000000000..27315f85e62c9
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/BoundedFsDataInputStream.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link FSDataInputStream} that validates seek/skip targets against the file length, so that
+ * out-of-range positions surface as {@link EOFException} rather than a filesystem-specific {@link IOException}.
+ */
+public class BoundedFsDataInputStream extends FSDataInputStream {
+  private final FileSystem fs;
+  private final Path file;
+  private long fileLen = -1L;
+
+  public BoundedFsDataInputStream(FileSystem fs, Path file, InputStream in) {
+    super(in);
+    this.fs = fs;
+    this.file = file;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  /* Return the file length, fetched lazily and cached */
+  private long getFileLength() throws IOException {
+    if (fileLen == -1L) {
+      fileLen = fs.getContentSummary(file).getLength();
+    }
+    return fileLen;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    // Reject out-of-range positions up front instead of delegating to the underlying stream.
+    if (pos < 0 || pos > getFileLength()) {
+      throw new EOFException("Attempted to seek to pos [" + pos + "], but file size is " + getFileLength());
+    }
+    super.seek(pos);
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    // Clamp the skip to the number of bytes remaining in the file.
+    long curPos = getPos();
+    long fileLength = getFileLength();
+    if (n + curPos > fileLength) {
+      n = fileLength - curPos;
+    }
+    return super.skip(n);
+  }
+
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index cfc143e3d0caa..6dcfa92a292b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -85,6 +85,9 @@ public class FSUtils {
 
   private static final PathFilter ALLOW_ALL_FILTER = file -> true;
 
+  private static final Set<StorageSchemes> WRAPPED_BOUNDED_FS_DATA_STREAM_SCHEMES =
+      Stream.of(StorageSchemes.CHDFS).collect(Collectors.toSet());
+
   public static Configuration prepareHadoopConf(Configuration conf) {
     // look for all properties, prefixed to be picked up
     for (Entry<String, String> prop : System.getenv().entrySet()) {
@@ -632,6 +635,15 @@ public static boolean isGCSFileSystem(FileSystem fs) {
     return fs.getScheme().equals(StorageSchemes.GCS.getScheme());
   }
 
+  /**
+   * Some filesystems (such as CHDFS) throw an {@code IOException} instead of an {@code EOFException} when reading past the end of a file, which breaks isBlockCorrupted().
+   * Streams on those filesystems are wrapped in {@code BoundedFsDataInputStream} so that out-of-range offsets are checked against the file size in advance.
+   */
+  public static boolean shouldWrappedByBoundedDataStream(FileSystem fs) {
+    return WRAPPED_BOUNDED_FS_DATA_STREAM_SCHEMES.stream()
+        .map(e -> e.getScheme()).anyMatch(scheme -> scheme.equals(fs.getScheme()));
+  }
+
   public static Configuration registerFileSystem(Path file, Configuration conf) {
     Configuration returnConf = new Configuration(conf);
     String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index c7c457b2e9ddd..4bc51faf90532 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.fs.BoundedFsDataInputStream;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
 import org.apache.hudi.common.fs.TimedFSDataInputStream;
@@ -225,7 +226,7 @@ private HoodieLogBlock readBlock() throws IOException {
             String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
         return new HoodieParquetDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
-                Option.ofNullable(readerSchema), header, footer, keyField);
+            Option.ofNullable(readerSchema), header, footer, keyField);
       case DELETE_BLOCK:
         return new HoodieDeleteBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer);
@@ -267,7 +268,7 @@ private HoodieLogBlock createCorruptBlock() throws IOException {
   private boolean isBlockCorrupted(int blocksize) throws IOException {
     long currentPos = inputStream.getPos();
     long blockSizeFromFooter;
-    
+
     try {
       // check if the blocksize mentioned in the footer is the same as the header;
       // by seeking and checking the length of a long. We do not seek `currentPos + blocksize`
@@ -464,7 +465,8 @@ public void remove() {
 
   /**
    * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
-   * @param fs instance of {@link FileSystem} in use.
+   *
+   * @param fs         instance of {@link FileSystem} in use.
    * @param bufferSize buffer size to be used.
   * @return the right {@link FSDataInputStream} as required.
   */
@@ -478,6 +480,10 @@ private static FSDataInputStream getFSDataInputStream(FileSystem fs,
       return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
     }
 
+    if (FSUtils.shouldWrappedByBoundedDataStream(fs)) {
+      return wrapStreamByBoundedFsDataInputStream(fs, logFile, fsDataInputStream, bufferSize);
+    }
+
     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
       return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
@@ -491,8 +497,9 @@ private static FSDataInputStream getFSDataInputStream(FileSystem fs,
   /**
    * GCS FileSystem needs some special handling for seek and hence this method assists to fetch the right {@link FSDataInputStream} to be
    * used by wrapping with required input streams.
+   *
    * @param fsDataInputStream original instance of {@link FSDataInputStream}.
-   * @param bufferSize buffer size to be used.
+   * @param bufferSize        buffer size to be used.
    * @return the right {@link FSDataInputStream} as required.
    */
   private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream,
@@ -509,11 +516,27 @@ private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fs
     if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
         && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
-      FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
+      FSInputStream inputStream = (FSInputStream) ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
       return new TimedFSDataInputStream(logFile.getPath(),
           new FSDataInputStream(new BufferedFSInputStream(inputStream, bufferSize)));
     }
     return fsDataInputStream;
   }
+
+  /**
+   * Some filesystems (such as CHDFS) throw an {@code IOException} instead of an {@code EOFException} when reading past the end of a file, which breaks isBlockCorrupted().
+   * Wrap the stream in {@code BoundedFsDataInputStream} so that out-of-range offsets are checked against the file size in advance.
+   */
+  private static FSDataInputStream wrapStreamByBoundedFsDataInputStream(FileSystem fs,
+                                                                        HoodieLogFile logFile,
+                                                                        FSDataInputStream fsDataInputStream,
+                                                                        int bufferSize) {
+    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
+      return new TimedFSDataInputStream(logFile.getPath(), new BoundedFsDataInputStream(fs, logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))));
+    }
+
+    return new BoundedFsDataInputStream(fs, logFile.getPath(), fsDataInputStream);
+  }
 }
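---

Note (not part of the patch): the behavior the change relies on can be exercised in isolation with the sketch below. It is illustrative only; the local FileSystem, the /tmp/sample.log path, and the BoundedStreamDemo class name are assumptions made for the demo, while BoundedFsDataInputStream is the class added above. The sketch shows a past-EOF seek surfacing as an EOFException up front, which is what HoodieLogFileReader.isBlockCorrupted() expects on CHDFS-like filesystems, and a skip() being clamped to the bytes remaining in the file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.BoundedFsDataInputStream;

import java.io.EOFException;

public class BoundedStreamDemo {
  public static void main(String[] args) throws Exception {
    // Local FileSystem and file path are assumptions for this demo only.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/sample.log");
    long fileLen = fs.getFileStatus(path).getLen();

    FSDataInputStream raw = fs.open(path);
    try (FSDataInputStream bounded = new BoundedFsDataInputStream(fs, path, raw)) {
      try {
        // A past-EOF seek is rejected by the wrapper before reaching the underlying stream,
        // so the caller sees an EOFException rather than a filesystem-specific IOException.
        bounded.seek(fileLen + 8);
      } catch (EOFException e) {
        System.out.println("Out-of-range seek rejected: " + e.getMessage());
      }

      // skip() is clamped to the bytes remaining in the file instead of over-skipping.
      bounded.seek(0);
      long skipped = bounded.skip(fileLen + 100);
      System.out.println("Skipped " + skipped + " of the requested " + (fileLen + 100) + " bytes");
    }
  }
}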
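Two design points in the patch are worth calling out. First, the wrapper fetches the file length lazily via getContentSummary() and caches it in fileLen, so the extra metadata call is paid at most once per log file rather than on every seek. Second, the wrapping is gated by FSUtils.shouldWrappedByBoundedDataStream(), whose scheme set currently contains only StorageSchemes.CHDFS, presumably because the other supported filesystems already raise EOFException on out-of-range reads and need no additional bounding.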