Skip to content

Commit

Permalink
[FLINK-4218] [checkpoints] Do not rely on FileSystem to determing sta…
Browse files Browse the repository at this point in the history
…te sizes

This prevents failures on eventually consistent S3, where the operations for
keys (=entries in the parent directory/bucket) are not guaranteed to be immediately
 consistent (visible) after a blob was written.
  • Loading branch information
StephanEwen committed Sep 26, 2016
1 parent 6f237cf commit 95e9004
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 56 deletions.
Expand Up @@ -108,7 +108,7 @@ public void discardState() throws Exception {
}

@Override
public long getStateSize() throws Exception {
public long getStateSize() throws IOException {
long result = 0L;

for (TaskState taskState : taskStates.values()) {
Expand Down
Expand Up @@ -152,7 +152,7 @@ public void discardState() throws Exception {


@Override
public long getStateSize() throws Exception {
public long getStateSize() throws IOException {
long result = 0L;

for (int i = 0; i < parallelism; i++) {
Expand Down
Expand Up @@ -197,6 +197,7 @@ public static void serializeStreamStateHandle(StreamStateHandle stateHandle, Dat
} else if (stateHandle instanceof FileStateHandle) {
dos.writeByte(FILE_STREAM_STATE_HANDLE);
FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
dos.writeLong(stateHandle.getStateSize());
dos.writeUTF(fileStateHandle.getFilePath().toString());

} else if (stateHandle instanceof ByteStreamStateHandle) {
Expand All @@ -218,12 +219,13 @@ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis
if (NULL_HANDLE == type) {
return null;
} else if (FILE_STREAM_STATE_HANDLE == type) {
long size = dis.readLong();
String pathString = dis.readUTF();
return new FileStateHandle(new Path(pathString));
return new FileStateHandle(new Path(pathString), size);
} else if (BYTE_STREAM_STATE_HANDLE == type) {
int numBytes = dis.readInt();
byte[] data = new byte[numBytes];
dis.read(data);
dis.readFully(data);
return new ByteStreamStateHandle(data);
} else {
throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
Expand Down
Expand Up @@ -85,7 +85,7 @@ public void discardState() throws Exception {
}

@Override
public long getStateSize() throws Exception {
public long getStateSize() throws IOException {
long sumStateSize = 0;

if (operatorStateHandles != null) {
Expand Down
Expand Up @@ -118,7 +118,7 @@ public void discardState() throws Exception {
}

@Override
public long getStateSize() throws Exception {
public long getStateSize() throws IOException {
return stateHandle.getStateSize();
}

Expand Down
Expand Up @@ -20,8 +20,6 @@

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
Expand All @@ -41,16 +39,17 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements
StreamStateHandle, RetrievableStateHandle<T>, Closeable {

private static final long serialVersionUID = 314567453677355L;

/** wrapped inner stream state handle from which we deserialize on retrieval */
private final StreamStateHandle wrappedStreamStateHandle;

public RetrievableStreamStateHandle(StreamStateHandle streamStateHandle) {
this.wrappedStreamStateHandle = Preconditions.checkNotNull(streamStateHandle);
}

public RetrievableStreamStateHandle(Path filePath) {
public RetrievableStreamStateHandle(Path filePath, long stateSize) {
Preconditions.checkNotNull(filePath);
this.wrappedStreamStateHandle = new FileStateHandle(filePath);
this.wrappedStreamStateHandle = new FileStateHandle(filePath, stateSize);
}

@Override
Expand All @@ -71,7 +70,7 @@ public void discardState() throws Exception {
}

@Override
public long getStateSize() throws Exception {
public long getStateSize() throws IOException {
return wrappedStreamStateHandle.getStateSize();
}

Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.state;

import java.io.IOException;

/**
* Base of all types that represent checkpointed state. Specializations are for
* example {@link StateHandle StateHandles} (directly resolve to state).
Expand Down Expand Up @@ -47,7 +49,7 @@ public interface StateObject extends java.io.Closeable, java.io.Serializable {
* <p>If the the size is not known, return {@code 0}.
*
* @return Size of the state in bytes.
* @throws Exception If the operation fails during size retrieval.
* @throws IOException If the operation fails during size retrieval.
*/
long getStateSize() throws Exception;
long getStateSize() throws IOException;
}
Expand Up @@ -26,7 +26,9 @@

import java.io.IOException;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;


/**
* {@link StreamStateHandle} for state that was written to a file stream. The written data is
Expand All @@ -36,23 +38,24 @@ public class FileStateHandle extends AbstractCloseableHandle implements StreamSt

private static final long serialVersionUID = 350284443258002355L;

/**
* The path to the file in the filesystem, fully describing the file system
*/
/** The path to the file in the filesystem, fully describing the file system */
private final Path filePath;

/**
* Cached file system handle
*/
/** The size of the state in the file */
private final long stateSize;

/** Cached file system handle */
private transient FileSystem fs;

/**
* Creates a new file state for the given file path.
*
* @param filePath The path to the file that stores the state.
*/
public FileStateHandle(Path filePath) {
this.filePath = requireNonNull(filePath);
public FileStateHandle(Path filePath, long stateSize) {
checkArgument(stateSize >= -1);
this.filePath = checkNotNull(filePath);
this.stateSize = stateSize;
}

/**
Expand Down Expand Up @@ -86,8 +89,7 @@ public void discardState() throws Exception {
// fail (and be ignored) when some files still exist
try {
getFileSystem().delete(filePath.getParent(), false);
} catch (IOException ignored) {
}
} catch (IOException ignored) {}
}

/**
Expand All @@ -98,7 +100,7 @@ public void discardState() throws Exception {
*/
@Override
public long getStateSize() throws IOException {
return getFileSystem().getFileStatus(filePath).getLen();
return stateSize;
}

/**
Expand All @@ -114,6 +116,7 @@ private FileSystem getFileSystem() throws IOException {
return fs;
}

// ------------------------------------------------------------------------

@Override
public boolean equals(Object o) {
Expand All @@ -133,4 +136,9 @@ public boolean equals(Object o) {
public int hashCode() {
return filePath.hashCode();
}

@Override
public String toString() {
return String.format("File State: %s [%d bytes]", filePath, stateSize);
}
}
Expand Up @@ -301,9 +301,16 @@ public StreamStateHandle closeAndGetHandle() throws IOException {
}
else {
flush();

long size = -1;
// make a best effort attempt to figure out the size
try {
size = outStream.getPos();
} catch (Exception ignored) {}

outStream.close();
closed = true;
return new FileStateHandle(statePath);
return new FileStateHandle(statePath, size);
}
}
else {
Expand Down
Expand Up @@ -25,10 +25,10 @@
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
Expand Down Expand Up @@ -61,19 +61,14 @@ public RetrievableStateHandle<T> store(T state) throws Exception {

for (int attempt = 0; attempt < 10; attempt++) {
Path filePath = getNewFilePath();
FSDataOutputStream outStream;
try {
outStream = fs.create(filePath, false);

try (FSDataOutputStream outStream = fs.create(filePath, false)) {
InstantiationUtil.serializeObject(outStream, state);
return new RetrievableStreamStateHandle<T>(filePath, outStream.getPos());
}
catch (Exception e) {
latestException = e;
continue;
}

try(ObjectOutputStream os = new ObjectOutputStream(outStream)) {
os.writeObject(state);
}
return new RetrievableStreamStateHandle<T>(filePath);
}

throw new Exception("Could not open output stream for state backend", latestException);
Expand Down
Expand Up @@ -183,7 +183,7 @@ public void discardState() throws Exception {
}

@Override
public long getStateSize() throws Exception {
public long getStateSize() throws IOException {
return 0;
}

Expand Down
Expand Up @@ -334,7 +334,7 @@ private static CompletedCheckpoint[] generateRandomCheckpoints(
StreamStateHandle proxy = new StateHandleProxy(new Path(), proxySize);

SubtaskState subtaskState = new SubtaskState(
new ChainedStateHandle<>(Arrays.asList(proxy)),
new ChainedStateHandle<>(Collections.singletonList(proxy)),
duration);

taskState.putState(subtaskIndex, subtaskState);
Expand Down Expand Up @@ -371,21 +371,11 @@ private static class StateHandleProxy extends FileStateHandle {

private static final long serialVersionUID = 35356735683568L;

public StateHandleProxy(Path filePath, long proxySize) {
super(filePath);
this.proxySize = proxySize;
}

private long proxySize;

@Override
public void discardState() throws Exception {

public StateHandleProxy(Path filePath, long size) {
super(filePath, size);
}

@Override
public long getStateSize() {
return proxySize;
}
public void discardState() {}
}
}
Expand Up @@ -87,12 +87,10 @@ private static final class CloseableHandle extends AbstractCloseableHandle {
private static final long serialVersionUID = 1L;

@Override
public void discardState() throws Exception {

}
public void discardState() {}

@Override
public long getStateSize() throws Exception {
public long getStateSize() {
return 0;
}
}
Expand Down
Expand Up @@ -198,6 +198,7 @@ private void block() {
// an interrupt on a waiting object leads to an infinite loop
try {
synchronized (this) {
//noinspection WaitNotInLoop
wait();
}
}
Expand All @@ -216,7 +217,7 @@ private void block() {
public void discardState() throws Exception {}

@Override
public long getStateSize() throws Exception {
public long getStateSize() throws IOException {
return 0;
}
}
Expand Down

0 comments on commit 95e9004

Please sign in to comment.