Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-16906. Abortable #2684

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Abort data being written to a stream, so that {@code close()} does
* not write the data. It is implemented by output streams in
* some object stores, and passed through {@link FSDataOutputStream}.
* <p>
* The capability name {@code "fs.capability.outputstream.abortable"}
* is declared for probing whether a stream supports the operation
* without having to invoke it.
* </p>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface Abortable {

/**
* Abort the active operation without the output becoming visible.
* <p>
* This is to provide the ability to cancel the write on a stream; once
* a stream is aborted, the write MUST NOT become visible.
* </p>
*
* @return the result of the abort, as an {@link AbortableResult}.
* @throws UnsupportedOperationException if the operation is not supported.
*/
AbortableResult abort();

/**
* Interface for the result of aborts; allows subclasses to extend
* (IOStatistics etc) or for future enhancements if ever needed.
*/
interface AbortableResult {

/**
* Was the stream already closed/aborted?
*
* @return {@code true} if a close/abort operation had already
* taken place.
*/
boolean alreadyClosed();

/**
* Any exception caught during cleanup operations:
* exceptions whose raising/catching does not change
* the semantics of the abort.
*
* @return an exception or {@code null}.
*/
IOException anyCleanupException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,11 @@ private CommonPathCapabilities() {
public static final String FS_MULTIPART_UPLOADER =
"fs.capability.multipart.uploader";


/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* Value: {@value}.
*/
public static final String ABORTABLE_STREAM =
"fs.capability.outputstream.abortable";
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities,
IOStatisticsSource {
IOStatisticsSource, Abortable {
private final OutputStream wrappedStream;

private static class PositionCache extends FilterOutputStream {
Expand Down Expand Up @@ -168,4 +168,21 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
}

/**
 * Forward the abort request to the wrapped stream.
 * <p>
 * The call is only passed on when the wrapped stream implements
 * {@link Abortable}; any other stream is rejected.
 * </p>
 *
 * @return the result returned by the wrapped stream's {@code abort()}.
 * @throws UnsupportedOperationException if the wrapped stream
 * is not {@link Abortable}.
 */
@Override
public AbortableResult abort() {
  // Guard clause: reject streams which cannot be aborted.
  if (!(wrappedStream instanceof Abortable)) {
    throw new UnsupportedOperationException(
        FSExceptionMessages.ABORTABLE_UNSUPPORTED);
  }
  final Abortable delegate = (Abortable) wrappedStream;
  return delegate.abort();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ public class FSExceptionMessages {

public static final String PERMISSION_DENIED_BY_STICKY_BIT =
"Permission denied by sticky bit";

/**
* A call was made to {@code abort()}, but it is not supported.
* Used as the message of the {@code UnsupportedOperationException}
* raised when the target stream is not {@code Abortable}.
*/
public static final String ABORTABLE_UNSUPPORTED =
"Abortable.abort() is not supported";
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public interface StreamCapabilities {
*/
String IOSTATISTICS = "iostatistics";

/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* This matches the Path Capability
* {@link CommonPathCapabilities#ABORTABLE_STREAM}.
* Value: {@value}.
*/
String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM;

/**
* Capabilities that a stream can support and be queried for.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
@InterfaceStability.Evolving
public final class StoreStatisticNames {

/** {@value}. */
public static final String OP_ABORT = "op_abort";

/** {@value}. */
public static final String OP_APPEND = "op_append";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->


<!-- ============================================================= -->
<!-- INTERFACE: Abortable -->
<!-- ============================================================= -->

# interface `org.apache.hadoop.fs.Abortable`

<!-- MACRO{toc|fromDepth=1|toDepth=2} -->

Abort the active operation such that the output does not become
manifest.

Specifically, if supported on an [output stream](outputstream.html),
a successful `abort()` MUST guarantee that the stream will not be made visible in the `close()`
operation.

```java

@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface Abortable {

/**
* Abort the active operation without the output becoming visible.
*
* This is to provide ability to cancel the write on stream; once
* a stream is aborted, the write MUST NOT become visible.
*
* @throws UnsupportedOperationException if the operation is not supported.
* @return the result.
*/
AbortableResult abort();

/**
* Interface for the result of aborts; allows subclasses to extend
* (IOStatistics etc) or for future enhancements if ever needed.
*/
interface AbortableResult {

/**
* Was the stream already closed/aborted?
* @return true if a close/abort operation had already
* taken place.
*/
boolean alreadyClosed();

/**
* Any exception caught during cleanup operations,
* exceptions whose raising/catching does not change
* the semantics of the abort.
* @return an exception or null.
*/
IOException anyCleanupException();
}
}
```

## Method `abort()`

Aborts the ongoing operation such that no output SHALL become visible
when the operation is completed.

Unless and until other File System classes implement `Abortable`, the
interface is specified purely for output streams.

## Method `abort()` on an output stream

`Abortable.abort()` MUST only be supported on output streams
whose output is only made visible when `close()` is called,
for example, output streams returned by the S3A FileSystem.

## Preconditions

The stream MUST implement `Abortable` and `StreamCapabilities`.

```python
if unsupported:
throw UnsupportedOperationException

if not isOpen(stream):
no-op

StreamCapabilities.hasCapability("fs.capability.outputstream.abortable") == True

```


## Postconditions

After `abort()` returns, the filesystem MUST be unchanged:

```
FS' = FS
```

A successful `abort()` operation MUST guarantee that
when the stream's `close()` is invoked no output shall be manifest.

* The stream MUST retry any remote calls needed to force the abort outcome.
* If any file was present at the destination path, it MUST remain unchanged.

Strictly then:

> if `Abortable.abort()` does not raise `UnsupportedOperationException`
> and returns, then it guarantees that the write SHALL NOT become visible
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then -> and ?

> and that any existing data in the filesystem at the destination path SHALL
> continue to be available.


1. Calls to `write()` methods MUST fail.
1. Calls to `flush()` MUST be no-ops (applications sometimes call this on closed streams)
1. Subsequent calls to `abort()` MUST be no-ops.
1. `close()` MUST NOT manifest the file, and MUST NOT raise an exception

That is, the postcondition of `close()` becomes:

```
FS' = FS
```

### Cleanup

* If temporary data is stored in the local filesystem or in the store's upload
infrastructure then this MAY be cleaned up; best-effort is expected here.

* The stream SHOULD NOT retry cleanup operations; any failure there MUST be
caught and added to the returned `AbortableResult`.

#### Returned `AbortableResult`

The `AbortableResult` value returned is primarily for testing and logging.

`alreadyClosed()`: MUST return `true` if the write had already been aborted or closed.

`anyCleanupException()`: SHOULD return any `IOException` raised during any optional
cleanup operations.


### Thread safety and atomicity

Output streams themselves aren't formally required to be thread safe,
but as applications do sometimes assume they are, this call MUST be thread safe.

## Path/Stream capability "fs.capability.outputstream.abortable"


An application MUST be able to verify that a stream supports the `Abortable.abort()`
operation without actually calling it. This is done through the `StreamCapabilities`
interface.

1. If a stream instance supports `Abortable` then it MUST return `true`
in the probe `hasCapability("fs.capability.outputstream.abortable")`

1. If a stream instance does not support `Abortable` then it MUST return `false`
in the probe `hasCapability("fs.capability.outputstream.abortable")`

That is: if a stream declares its support for the feature, a call to `abort()`
SHALL meet the defined semantics of the operation.

FileSystem/FileContext implementations SHOULD declare support similarly, to
allow for applications to probe for the feature in the destination directory/path.

If a filesystem supports `Abortable` under a path `P` then it SHOULD return `true` to
`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`.
This is to allow applications to verify that the store supports the feature.

If a filesystem does not support `Abortable` under a path `P` then it MUST
return `false` to
`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`.



Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [Model](model.html)
1. [FileSystem class](filesystem.html)
1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
1. [Abortable](abortable.html)
1. [FSDataInputStream class](fsdatainputstream.html)
1. [PathCapabilities interface](pathcapabilities.html)
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ Object store streams MAY buffer the entire stream's output
until the final `close()` operation triggers a single `PUT` of the data
and materialization of the final output.

This significantly change's their behaviour compared to that of
This significantly changes their behaviour compared to that of
POSIX filesystems and that specified in this document.

#### Visibility of newly created objects
Expand Down Expand Up @@ -961,6 +961,10 @@ is present: the act of instantiating the object, while potentially exhibiting
create inconsistency, is atomic. Applications may be able to use that fact
to their advantage.

The [Abortable](abortable.html) interface exposes this ability to abort an output
stream before its data is made visible, so it can be used for checkpointing and similar
operations.

## <a name="implementors"></a> Implementors notes.

### Always implement `Syncable` -even if just to throw `UnsupportedOperationException`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
public static void verifyFileContents(FileSystem fs,
Path path,
byte[] original) throws IOException {
assertIsFile(fs, path);
FileStatus stat = fs.getFileStatus(path);
assertIsFile(path, stat);
String statText = stat.toString();
assertEquals("wrong length " + statText, original.length, stat.getLen());
byte[] bytes = readDataset(fs, path, original.length);
Expand Down