From 4868c9410a42f85ee6b4b89cb55d18f714759093 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 13 Feb 2017 14:29:03 +0100 Subject: [PATCH] [FLINK-5788] [docs] Improve documentation of FileSystem and specify the data persistence contract. --- docs/internals/filesystems.md | 138 ++++++++++++++++++ .../flink/core/fs/FSDataInputStream.java | 11 +- .../flink/core/fs/FSDataOutputStream.java | 81 +++++++++- .../org/apache/flink/core/fs/FileSystem.java | 98 ++++++++++++- 4 files changed, 323 insertions(+), 5 deletions(-) create mode 100644 docs/internals/filesystems.md diff --git a/docs/internals/filesystems.md b/docs/internals/filesystems.md new file mode 100644 index 0000000000000..ada4b1a50c0bb --- /dev/null +++ b/docs/internals/filesystems.md @@ -0,0 +1,138 @@ +--- +title: "File Systems" +nav-parent_id: internals +nav-pos: 10 +--- + + +* Replaced by the TOC +{:toc} + +Flink has its own file system abstraction via the `org.apache.flink.core.fs.FileSystem` class. +This abstraction provides a common set of operations and minimal guarantees across various types +of file system implementations. + +The `FileSystem`'s set of available operations is quite limited, in order to support a wide +range of file systems. For example, appending to or mutating existing files is not supported. + +File systems are identified by a *file system scheme*, such as `file://`, `hdfs://`, etc. + +# Implementations + +Flink directly implements file systems for the following file system schemes: + + - `file`, which represents the machine's local file system. + +Other file system types are accessed by an implementation that bridges to the suite of file systems supported by +[Apache Hadoop](https://hadoop.apache.org/). The following is an incomplete list of examples: + + - `hdfs`: Hadoop Distributed File System + - `s3`, `s3n`, and `s3a`: Amazon S3 file system + - `gcs`: Google Cloud Storage + - `maprfs`: The MapR distributed file system + - ... 
+ +Flink loads Hadoop's file systems transparently if it finds the Hadoop File System classes in the class path and finds a valid +Hadoop configuration. By default, it looks for the Hadoop configuration in the class path. Alternatively, one can specify a +custom location via the configuration entry `fs.hdfs.hadoopconf`. + + +# Persistence Guarantees + +The `FileSystem` and its `FSDataOutputStream` instances are used to persistently store data, both for results of applications +and for fault tolerance and recovery. It is therefore crucial that the persistence semantics of these streams are well defined. + +## Definition of Persistence Guarantees + +Data written to an output stream is considered persistent if two requirements are met: + + 1. **Visibility Requirement:** It must be guaranteed that all other processes, machines, + virtual machines, containers, etc. that are able to access the file see the data consistently + when given the absolute file path. This requirement is similar to the *close-to-open* + semantics defined by POSIX, but restricted to the file itself (by its absolute path). + + 2. **Durability Requirement:** The file system's specific durability/persistence requirements + must be met. These are specific to the particular file system. For example the + `LocalFileSystem` does not provide any durability guarantees for crashes of both + hardware and operating system, while replicated distributed file systems (like HDFS) + typically guarantee durability in the presence of up to *n* concurrent node failures, + where *n* is the replication factor. + +Updates to the file's parent directory (such that the file shows up when +listing the directory contents) are not required to be complete for the data in the file stream +to be considered persistent. This relaxation is important for file systems where updates to +directory contents are only eventually consistent. 
+ +The `FSDataOutputStream` has to guarantee data persistence for the written bytes once the call to +`FSDataOutputStream.close()` returns. + +## Examples + + - For **fault-tolerant distributed file systems**, data is considered persistent once + it has been received and acknowledged by the file system, typically by having been replicated + to a quorum of machines (*durability requirement*). In addition, the absolute file path + must be visible to all other machines that will potentially access the file (*visibility requirement*). + + Whether data has hit non-volatile storage on the storage nodes depends on the specific + guarantees of the particular file system. + + The metadata updates to the file's parent directory are not required to have reached + a consistent state. It is permissible that some machines see the file when listing the parent + directory's contents while others do not, as long as access to the file by its absolute path + is possible on all nodes. + + - A **local file system** must support the POSIX *close-to-open* semantics. + Because the local file system does not have any fault tolerance guarantees, no further + requirements exist. + + The above implies specifically that data may still be in the OS cache when considered + persistent from the local file system's perspective. Crashes that cause the OS cache to lose + data are considered fatal to the local machine and not covered by the local file system's + guarantees as defined by Flink. + + That means that computed results, checkpoints, and savepoints that are written only to + the local filesystem are not guaranteed to be recoverable from the local machine's failure, + making local file systems unsuitable for production setups. + +# Updating File Contents + +Many file systems either do not support overwriting contents of existing files at all, or do not support consistent visibility of the +updated contents in that case. 
For that reason, Flink's `FileSystem` does not support appending to existing files, or seeking output streams +such that previously written data could be changed within the same file. + +# Overwriting Files + +Overwriting files is in general possible. A file is overwritten by deleting it and creating a new file. +However, certain filesystems cannot make that change synchronously visible to all parties that have access to the file. +For example [Amazon S3](https://aws.amazon.com/documentation/s3/) guarantees only *eventual consistency* in +the visibility of the file replacement: Some machines may see the old file, some machines may see the new file. + +To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink strictly avoid writing to +the same file path more than once. + +# Thread Safety + +Implementations of `FileSystem` must be thread-safe: The same instance of `FileSystem` is frequently shared across multiple threads +in Flink and must be able to concurrently create input/output streams and list file metadata. + +The `FSDataInputStream` and `FSDataOutputStream` implementations are strictly **not thread-safe**. +Instances of the streams should also not be passed between threads in between read or write operations, because there are no guarantees +about the visibility of operations across threads (many operations do not create memory fences). + diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java index 6ce1235eff510..44dbcb17174d5 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java @@ -25,6 +25,10 @@ /** * Interface for a data input stream to a file on a {@link FileSystem}. + * + *

This extends the {@link java.io.InputStream} with methods for accessing + * the stream's {@link #getPos() current position} and + * {@link #seek(long) seeking} to a desired position. */ @Public public abstract class FSDataInputStream extends InputStream { @@ -35,15 +39,16 @@ public abstract class FSDataInputStream extends InputStream { * * @param desired * the desired offset - * @throws IOException - * thrown if an error occurred while seeking inside the input stream + * @throws IOException Thrown if an error occurred while seeking inside the input stream. */ public abstract void seek(long desired) throws IOException; /** - * Get the current position in the input stream. + * Gets the current position in the input stream. * * @return current position in the input stream + * @throws IOException Thrown if an I/O error occurred in the underlying stream + * implementation while accessing the stream's position. */ public abstract long getPos() throws IOException; } diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java index 0318d1f16e678..a8df5c18c0d3f 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java @@ -24,14 +24,93 @@ import java.io.OutputStream; /** - * Interface for a data output stream to a file on a {@link FileSystem}. + * An output stream to a file that is created via a {@link FileSystem}. + * This class extends the base {@link java.io.OutputStream} with some additional important methods. + * + *

Data Persistence Guarantees

+ * + * These streams are used to persistently store data, both for results of streaming applications + * and for fault tolerance and recovery. It is therefore crucial that the persistence semantics + * of these streams are well defined. + * + *

Please refer to the class-level docs of {@link FileSystem} for the definition of data persistence + * via Flink's FileSystem abstraction and the {@code FSDataOutputStream}. + * + *

Thread Safety

+ * + * Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread safe. + * Instances of {@code FSDataOutputStream} should not be passed between threads, because there + * are no guarantees about the order of visibility of operations across threads. + * + * @see FileSystem + * @see FSDataInputStream */ @Public public abstract class FSDataOutputStream extends OutputStream { + /** + * Gets the position of the stream (non-negative), defined as the number of bytes + * from the beginning of the file to the current writing position. The position + * corresponds to the zero-based index of the next byte that will be written. + * + *

This method must accurately report the current position of the stream. + * Various components of the high-availability and recovery logic rely on the accurate position. + * + * @return The current position in the stream, defined as the number of bytes + * from the beginning of the file to the current writing position. + * + * @throws IOException Thrown if an I/O error occurs while obtaining the position from + * the stream implementation. + */ public abstract long getPos() throws IOException; + /** + * Flushes the stream, writing any data currently buffered in the stream implementation + * to the proper output stream. After this method has been called, the stream implementation + * must not hold onto any buffered data any more. + * + *

A completed flush does not mean that the data is necessarily persistent. Data + * persistence can only be assumed after calls to {@link #close()} or {@link #sync()}. + * + *

Implementation note: This overrides the method defined in {@link OutputStream} + * as abstract to force implementations of the {@code FSDataOutputStream} to implement + * this method directly. + * + * @throws IOException Thrown if an I/O error occurs while flushing the stream. + */ public abstract void flush() throws IOException; + /** + * Flushes the data all the way to the persistent non-volatile storage (for example disks). + * The method behaves similarly to the fsync function, forcing all data to + * be persistent on the devices. + * + *

+ * + * @throws IOException Thrown if an I/O error occurs + */ public abstract void sync() throws IOException; + + /** + * Closes the output stream. After this method returns, the implementation must guarantee + * that all data written to the stream is persistent/visible, as defined in the + * {@link FileSystem class-level docs}. + * + *

The above implies that the method must block until persistence can be guaranteed. + * For example for distributed replicated file systems, the method must block until the + * replication quorum has been reached. If the calling thread is interrupted in the + * process, it must fail with an {@code IOException} to indicate that persistence cannot + * be guaranteed. + * + *

If this method throws an exception, the data in the stream cannot be assumed to be + * persistent. + * + *

Implementation note: This overrides the method defined in {@link OutputStream} + * as abstract to force implementations of the {@code FSDataOutputStream} to implement + * this method directly. + * + * @throws IOException Thrown, if an error occurred while closing the stream or guaranteeing + * that the data is persistent. + */ + public abstract void close() throws IOException; } diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index d8efcbc03146e..d34da2dc22390 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -52,12 +52,108 @@ /** * Abstract base class of all file systems used by Flink. This class may be extended to implement * distributed file systems, or local file systems. The abstraction by this file system is very simple, - * and teh set of allowed operations quite limited, to support the common denominator of a wide + * and the set of allowed operations quite limited, to support the common denominator of a wide * range of file systems. For example, appending to or mutating existing files is not supported. * *

Flink implements and supports some file system types directly (for example the default * machine-local file system). Other file system types are accessed by an implementation that bridges * to the suite of file systems supported by Hadoop (such as for example HDFS). + * + *

Data Persistence

+ * + * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data, + * both for results of streaming applications and for fault tolerance and recovery. It is therefore + * crucial that the persistence semantics of these streams are well defined. + * + *

Definition of Persistence Guarantees

+ * + * Data written to an output stream is considered persistent, if two requirements are met: + * + *
    + *
  1. Visibility Requirement: It must be guaranteed that all other processes, machines, + * virtual machines, containers, etc. that are able to access the file see the data consistently + * when given the absolute file path. This requirement is similar to the close-to-open + * semantics defined by POSIX, but restricted to the file itself (by its absolute path).
  2. + * + *
  3. Durability Requirement: The file system's specific durability/persistence requirements + * must be met. These are specific to the particular file system. For example the + * {@link LocalFileSystem} does not provide any durability guarantees for crashes of both + * hardware and operating system, while replicated distributed file systems (like HDFS) + * typically guarantee durability in the presence of up to n concurrent node failures, + * where n is the replication factor.
  4. + *
+ * + *

Updates to the file's parent directory (such that the file shows up when + * listing the directory contents) are not required to be complete for the data in the file stream + * to be considered persistent. This relaxation is important for file systems where updates to + * directory contents are only eventually consistent. + * + *

The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes + * once the call to {@link FSDataOutputStream#close()} returns. + * + *

Examples

+ * + * + * + *

Updating File Contents

+ * + * Many file systems either do not support overwriting contents of existing files at all, or do + * not support consistent visibility of the updated contents in that case. For that reason, + * Flink's FileSystem does not support appending to existing files, or seeking output streams + * so that previously written data could be overwritten. + * + *

Overwriting Files

+ * + * Overwriting files is in general possible. A file is overwritten by deleting it and creating + * a new file. However, certain filesystems cannot make that change synchronously visible + * to all parties that have access to the file. + * For example Amazon S3 guarantees only + * eventual consistency in the visibility of the file replacement: Some machines may see + * the old file, some machines may see the new file. + * + *

To avoid these consistency issues, the implementations of failure/recovery mechanisms in + * Flink strictly avoid writing to the same file path more than once. + * + *

Thread Safety

+ * + * Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem + * is frequently shared across multiple threads in Flink and must be able to concurrently + * create input/output streams and list file metadata. + * + *

The {@link FSDataInputStream} and {@link FSDataOutputStream} implementations are strictly + * not thread-safe. Instances of the streams should also not be passed between threads + * in between read or write operations, because there are no guarantees about the visibility of + * operations across threads (many operations do not create memory fences). + * + * @see FSDataInputStream + * @see FSDataOutputStream */ @Public public abstract class FileSystem {
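
Reviewer note: the contract documented in this patch can be illustrated with a small, self-contained sketch. It does not use Flink's classes. `ContractOutputStream` and `PersistenceContractSketch` are hypothetical names that mimic the documented `getPos()`, `flush()`, `sync()`, and `close()` behavior for a local file, using only `java.nio`. As the patch describes for local file systems, `close()` here gives close-to-open visibility only; forcing data to the device is left to `sync()`. Opening with `CREATE_NEW` is one assumed way to enforce the "never write the same path twice" rule.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical local-file stream that follows the documented contract (not a Flink class). */
final class ContractOutputStream extends OutputStream {
    private final FileChannel channel;
    private final ByteBuffer buffer = ByteBuffer.allocate(4096);
    private long pos;          // number of bytes accepted so far, buffered or not
    private boolean closed;

    ContractOutputStream(Path path) throws IOException {
        // CREATE_NEW fails if the path already exists, so a path is never written twice.
        this.channel = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    @Override
    public void write(int b) throws IOException {
        if (!buffer.hasRemaining()) {
            flush();
        }
        buffer.put((byte) b);
        pos++;
    }

    /** Zero-based index of the next byte that will be written. */
    long getPos() {
        return pos;
    }

    /** Hands buffered bytes to the channel; this alone does not make them durable. */
    @Override
    public void flush() throws IOException {
        buffer.flip();
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        buffer.clear();
    }

    /** Similar in spirit to fsync: flushes, then forces data and metadata to the device. */
    void sync() throws IOException {
        flush();
        channel.force(true);
    }

    /** After this returns, readers opening the file by its path see all written bytes. */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            flush();
        } finally {
            channel.close();
        }
    }
}

final class PersistenceContractSketch {
    /** Writes data to a fresh path, closes the stream, then reads the file back by its path. */
    static byte[] writeOnce(Path path, byte[] data) throws IOException {
        try (ContractOutputStream out = new ContractOutputStream(path)) {
            out.write(data);
            if (out.getPos() != data.length) {
                throw new IllegalStateException("position must equal the number of bytes written");
            }
        }
        return Files.readAllBytes(path);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("fs-contract");
        byte[] readBack = writeOnce(dir.resolve("chk-1"), "state".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(readBack, StandardCharsets.UTF_8));
    }
}
```

A second call to `writeOnce` with the same path fails with `FileAlreadyExistsException`, which mirrors how the failure/recovery mechanisms described above avoid replacing files whose visibility might be only eventually consistent.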