Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This {@link FileIO} implementation should be used when the FileSystem behind the Iceberg tables
* is also used for other purposes in a Flink job, like checkpoints/savepoints. Setting
* FlinkFileSystemFileIO prevents duplicated FileSystem related configurations and allows the usage
* of features already provided by the Flink FileSystem plugins, like Delegation Tokens.
*
* <p>The FlinkFileSystemFileIO should be set during catalog creation using the {@link
* org.apache.iceberg.CatalogProperties#FILE_IO_IMPL} property.
*
* <p>The FlinkFileSystemFileIO should never be set using table properties, as other engines will
* not be able to use the table in this case.
*/
public class FlinkFileSystemFileIO implements DelegateFileIO {
  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileSystemFileIO.class);

  /** Property key that overrides the number of threads used by the bulk-delete pool. */
  private static final String DELETE_FILE_PARALLELISM =
      "iceberg.flinkfilesystem.delete-file-parallelism";

  /** Name passed to the worker pool created for bulk deletes. */
  private static final String DELETE_FILE_POOL_NAME = "iceberg-flinkfilesystem-delete";

  /** Retry count handed to {@link Tasks} for every file in a bulk delete. */
  private static final int DELETE_RETRY_ATTEMPTS = 3;

  /** Default pool size is the number of available processors multiplied by this factor. */
  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;

  /** Bulk-delete pool shared by all instances; created on first use by {@link #executorService()}. */
  private static volatile ExecutorService executorService;

  /** Properties received through {@link #initialize(Map)}; empty until then. */
  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());

  /**
   * Creates an input file for the given location.
   *
   * @param path location of the file
   * @return a {@link FlinkInputFile} for the location
   */
  @Override
  public InputFile newInputFile(String path) {
    return new FlinkInputFile(new Path(path));
  }

  /**
   * Creates an input file for the given location with an already known length.
   *
   * @param path location of the file
   * @param length length of the file, passed on to the {@link FlinkInputFile}
   * @return a {@link FlinkInputFile} for the location
   */
  @Override
  public InputFile newInputFile(String path, long length) {
    return new FlinkInputFile(new Path(path), length);
  }

  /**
   * Creates an output file for the given location.
   *
   * @param path location of the file
   * @return a {@link FlinkOutputFile} for the location
   */
  @Override
  public OutputFile newOutputFile(String path) {
    return new FlinkOutputFile(new Path(path));
  }

  /**
   * Deletes a single path without recursion.
   *
   * @param path location to delete
   * @throws UncheckedIOException if the file system reports an {@link IOException}
   */
  @Override
  public void deleteFile(String path) {
    Path toDelete = new Path(path);
    try {
      toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
    } catch (IOException e) {
      throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
    }
  }

  /**
   * Lists all files below the given prefix, descending into sub-directories.
   *
   * @param prefix location to list
   * @return the files found under the prefix
   * @throws UncheckedIOException if the file system cannot be resolved or the listing fails
   */
  @Override
  public Iterable<FileInfo> listPrefix(String prefix) {
    LOG.debug("Listing {}", prefix);
    Path prefixToList = new Path(prefix);
    try {
      return listPrefix(prefixToList.getFileSystem(), prefixToList);
    } catch (IOException e) {
      throw new UncheckedIOException(String.format("Failed to listing prefix: %s", prefix), e);
    }
  }

  /**
   * Recursively deletes everything under the given prefix.
   *
   * @param prefix location to delete
   * @throws UncheckedIOException if the file system reports an {@link IOException}
   */
  @Override
  public void deletePrefix(String prefix) {
    Path prefixToDelete = new Path(prefix);

    try {
      prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
    } catch (IOException e) {
      throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
    }
  }

  /**
   * Deletes the given paths on the shared delete pool, calling {@link #deleteFile(String)} for
   * each of them. Individual failures are logged and counted instead of aborting the run.
   *
   * @param pathsToDelete locations to delete
   * @throws BulkDeletionFailureException carrying the number of failed paths, if any failed
   */
  @Override
  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
    AtomicInteger failureCount = new AtomicInteger(0);
    // NOTE(review): deleteFile wraps IOException in UncheckedIOException, so stopRetryOn may never
    // see a bare FileNotFoundException - confirm against Tasks' exception matching.
    Tasks.foreach(pathsToDelete)
        .executeWith(executorService())
        .retry(DELETE_RETRY_ATTEMPTS)
        .stopRetryOn(FileNotFoundException.class)
        .suppressFailureWhenFinished()
        .onFailure(
            (f, e) -> {
              LOG.error("Failure during bulk delete on file: {} ", f, e);
              failureCount.incrementAndGet();
            })
        .run(this::deleteFile);

    if (failureCount.get() != 0) {
      throw new BulkDeletionFailureException(failureCount.get());
    }
  }

  /**
   * Stores a copy of the given properties for later use.
   *
   * @param props configuration properties
   */
  @Override
  public void initialize(Map<String, String> props) {
    this.properties = SerializableMap.copyOf(props);
  }

  /**
   * Returns the stored properties.
   *
   * @return an immutable view of the properties
   */
  @Override
  public Map<String, String> properties() {
    return properties.immutableMap();
  }

  /**
   * Lists a path with the given file system, recursing into every directory entry.
   *
   * @param fs file system used for all lookups
   * @param fileName path to list
   * @return the files found under the path
   * @throws UncheckedIOException if listing or the status lookup fails
   */
  private Iterable<FileInfo> listPrefix(FileSystem fs, Path fileName) {
    try {
      FileStatus[] statuses = fs.listStatus(fileName);
      // Log the listing that was already fetched; listing the path again only for the log message
      // would cost a second file system call for every directory.
      LOG.debug("Listing path {} {}", fileName, statuses);
      if (statuses == null) {
        // Check the file is there and ready. If so, then we can assume this is an empty dir.
        fs.getFileStatus(fileName);
        statuses = new FileStatus[0];
      }

      return Iterables.concat(
          Arrays.stream(statuses)
              .map(
                  fileStatus -> {
                    if (fileStatus.isDir()) {
                      return listPrefix(fs, fileStatus.getPath());
                    } else {
                      return Collections.singleton(
                          new FileInfo(
                              fileStatus.getPath().toString(),
                              fileStatus.getLen(),
                              fileStatus.getModificationTime()));
                    }
                  })
              .collect(Collectors.toList()));
    } catch (IOException e) {
      throw new UncheckedIOException(
          String.format("Failed to list path recursively: %s", fileName), e);
    }
  }

  /**
   * Returns the shared delete pool, creating it on first use. The pool is static, so its size is
   * taken from the properties of whichever instance triggers the creation.
   *
   * @return the executor used for bulk deletes
   */
  private ExecutorService executorService() {
    if (executorService == null) {
      synchronized (FlinkFileSystemFileIO.class) {
        if (executorService == null) {
          executorService = ThreadPools.newWorkerPool(DELETE_FILE_POOL_NAME, deleteThreads());
        }
      }
    }

    return executorService;
  }

  /**
   * Resolves the delete pool size: the {@link #DELETE_FILE_PARALLELISM} property when present,
   * otherwise the number of available processors times {@link #DEFAULT_DELETE_CORE_MULTIPLE}.
   *
   * @return number of threads for the delete pool
   */
  private int deleteThreads() {
    int defaultValue = Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
    return properties.containsKey(DELETE_FILE_PARALLELISM)
        ? Integer.parseInt(properties.get(DELETE_FILE_PARALLELISM))
        : defaultValue;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.iceberg.encryption.NativeFileCryptoParameters;
import org.apache.iceberg.encryption.NativelyEncryptedFile;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.DelegatingInputStream;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link InputFile} implementation that reads through a Flink {@link FileSystem}.
 *
 * <p>The file status and the length are looked up on first use and kept for later calls.
 */
public class FlinkInputFile implements InputFile, NativelyEncryptedFile {
  private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class);

  private final Path path;
  private final FileSystem fs;
  private FileStatus stat = null;
  private Long length = null;
  private NativeFileCryptoParameters nativeDecryptionParameters;

  /**
   * Creates an input file whose length is resolved from the file system when first requested.
   *
   * @param path location of the file
   * @throws UncheckedIOException if the file system for the path cannot be obtained
   */
  public FlinkInputFile(Path path) {
    this.path = path;
    this.fs = fileSystemOf(path);
  }

  /**
   * Creates an input file with a known length.
   *
   * @param path location of the file
   * @param length length returned by {@link #getLength()}
   * @throws UncheckedIOException if the file system for the path cannot be obtained
   */
  public FlinkInputFile(Path path, long length) {
    this(path);
    this.length = length;
  }

  /** Resolves the file system of a path, converting the checked failure to an unchecked one. */
  private static FileSystem fileSystemOf(Path target) {
    try {
      return target.getFileSystem();
    } catch (IOException e) {
      throw new UncheckedIOException(
          String.format("Failed to get file system for path: %s", target), e);
    }
  }

  @Override
  public NativeFileCryptoParameters nativeCryptoParameters() {
    return nativeDecryptionParameters;
  }

  @Override
  public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
    this.nativeDecryptionParameters = nativeCryptoParameters;
  }

  /** Returns the path of this file as a string. */
  @Override
  public String location() {
    return path.toString();
  }

  /**
   * Returns the length given at construction, or the length from the file status otherwise.
   *
   * @throws NotFoundException if the status lookup reports that the file is missing
   * @throws UncheckedIOException if the status lookup fails for another reason
   */
  @Override
  public long getLength() {
    Long known = length;
    if (known != null) {
      return known;
    }

    long resolved = lazyStat().getLen();
    this.length = resolved;
    return resolved;
  }

  /**
   * Reports whether a file status could be obtained for the path.
   *
   * @return {@code false} when the status lookup ends in a {@link NotFoundException} or yields
   *     no status, {@code true} otherwise
   */
  @Override
  public boolean exists() {
    FileStatus status;
    try {
      status = lazyStat();
    } catch (NotFoundException e) {
      return false;
    }

    return status != null;
  }

  /**
   * Opens the file for reading.
   *
   * @return a seekable stream over the file contents
   * @throws NotFoundException if opening fails with a {@link FileNotFoundException}
   * @throws UncheckedIOException if opening fails with any other {@link IOException}
   */
  @Override
  public SeekableInputStream newStream() {
    FSDataInputStream opened;
    try {
      opened = path.getFileSystem().open(path);
    } catch (FileNotFoundException e) {
      throw new NotFoundException(e, "Failed to open input stream for file: %s", path);
    } catch (IOException e) {
      throw new UncheckedIOException(
          String.format("Failed to open input stream for file: %s", path), e);
    }

    return new FlinkSeekableInputStream(opened);
  }

  /** Fetches the file status on the first call and returns the remembered value afterwards. */
  private FileStatus lazyStat() {
    if (stat != null) {
      return stat;
    }

    try {
      this.stat = fs.getFileStatus(path);
    } catch (FileNotFoundException e) {
      throw new NotFoundException(e, "File does not exist: %s", path);
    } catch (IOException e) {
      throw new UncheckedIOException(String.format("Failed to get status for file: %s", path), e);
    }

    return stat;
  }

  /** Adapts a Flink {@link FSDataInputStream} to Iceberg's {@link SeekableInputStream}. */
  private static class FlinkSeekableInputStream extends SeekableInputStream
      implements DelegatingInputStream {
    private final FSDataInputStream stream;
    // Stack of the creating thread, reported if the stream is never closed.
    private final StackTraceElement[] createStack;
    private boolean closed;

    FlinkSeekableInputStream(FSDataInputStream stream) {
      this.stream = stream;
      this.createStack = Thread.currentThread().getStackTrace();
      this.closed = false;
    }

    @Override
    public InputStream getDelegate() {
      return stream;
    }

    @Override
    public int read() throws IOException {
      return stream.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return stream.read(b, off, len);
    }

    @Override
    public long getPos() throws IOException {
      return stream.getPos();
    }

    @Override
    public void seek(long newPos) throws IOException {
      stream.seek(newPos);
    }

    @Override
    public void close() throws IOException {
      stream.close();
      // Only reached when the wrapped stream closed without throwing.
      this.closed = true;
    }

    @SuppressWarnings({"checkstyle:NoFinalizer", "Finalize", "deprecation"})
    @Override
    protected void finalize() throws Throwable {
      super.finalize();
      if (closed) {
        return;
      }

      close(); // releasing resources is more important than printing the warning
      StackTraceElement[] callerFrames =
          Arrays.copyOfRange(createStack, 1, createStack.length);
      String trace = Joiner.on("\n\t").join(callerFrames);
      LOG.warn("Unclosed input stream created by:\n\t{}", trace);
    }
  }
}
Loading