Skip to content

Commit

Permalink
[FLINK-2916] [streaming] Expose operator and task information to Stat…
Browse files Browse the repository at this point in the history
…eBackend
  • Loading branch information
gyfora committed Nov 24, 2015
1 parent 8cabe67 commit ad6f826
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 187 deletions.
Expand Up @@ -18,96 +18,98 @@

package org.apache.flink.runtime.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.execution.Environment;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;

/**
* A state backend defines how state is stored and snapshotted during checkpoints.
*
*
* @param <Backend> The type of backend itself. This generic parameter is used to refer to the
* type of backend when creating state backed by this backend.
*/
public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {

private static final long serialVersionUID = 4620413814639220247L;

// ------------------------------------------------------------------------
// initialization and cleanup
// ------------------------------------------------------------------------

/**
* This method is called by the task upon deployment to initialize the state backend for
* data for a specific job.
*
* @param job The ID of the job for which the state backend instance checkpoints data.
*
* @param The {@link Environment} of the task that instantiated the state backend
* @throws Exception Overwritten versions of this method may throw exceptions, in which
* case the job that uses the state backend is considered failed during
* deployment.
*/
public abstract void initializeForJob(JobID job) throws Exception;
public abstract void initializeForJob(Environment env) throws Exception;

/**
* Disposes all state associated with the current job.
*
*
* @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
*/
public abstract void disposeAllStateForCurrentJob() throws Exception;

/**
* Closes the state backend, releasing all internal resources, but does not delete any persistent
* checkpoint data.
*
*
* @throws Exception Exceptions can be forwarded and will be logged by the system
*/
public abstract void close() throws Exception;

// ------------------------------------------------------------------------
// key/value state
// ------------------------------------------------------------------------

/**
* Creates a key/value state backed by this state backend.
*
*
* @param operatorId Unique id for the operator creating the state
* @param stateName Name of the created state
* @param keySerializer The serializer for the key.
* @param valueSerializer The serializer for the value.
* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
* @param <K> The type of the key.
* @param <V> The type of the value.
*
*
* @return A new key/value state backed by this backend.
*
*
* @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
*/
public abstract <K, V> KvState<K, V, Backend> createKvState(
public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
V defaultValue) throws Exception;


// ------------------------------------------------------------------------
// storing state for a checkpoint
// ------------------------------------------------------------------------

/**
* Creates an output stream that writes into the state of the given checkpoint. When the stream
* is closes, it returns a state handle that can retrieve the state back.
*
*
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @return An output stream that writes state for the given checkpoint.
*
*
* @throws Exception Exceptions may occur while creating the stream and should be forwarded.
*/
public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception;

/**
* Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
* When the stream is closes, it returns a state handle that can retrieve the state back.
Expand All @@ -125,20 +127,20 @@ public CheckpointStateOutputView createCheckpointStateOutputView(

/**
* Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
*
*
* @param state The state to be checkpointed.
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param <S> The type of the state.
*
*
* @return A state handle that can retrieve the checkpoined state.
*
*
* @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
*/
public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
S state, long checkpointID, long timestamp) throws Exception;


// ------------------------------------------------------------------------
// Checkpoint state output stream
// ------------------------------------------------------------------------
Expand All @@ -151,7 +153,7 @@ public static abstract class CheckpointStateOutputStream extends OutputStream {
/**
* Closes the stream and gets a state handle that can create an input stream
* producing the data written to this stream.
*
*
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
Expand All @@ -162,9 +164,9 @@ public static abstract class CheckpointStateOutputStream extends OutputStream {
* A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
*/
public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {

private final CheckpointStateOutputStream out;

public CheckpointStateOutputView(CheckpointStateOutputStream out) {
super(out);
this.out = out;
Expand Down Expand Up @@ -193,7 +195,7 @@ public void close() throws IOException {
private static final class DataInputViewHandle implements StateHandle<DataInputView> {

private static final long serialVersionUID = 2891559813513532079L;

private final StreamStateHandle stream;

private DataInputViewHandle(StreamStateHandle stream) {
Expand All @@ -202,7 +204,7 @@ private DataInputViewHandle(StreamStateHandle stream) {

@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
}

@Override
Expand Down

0 comments on commit ad6f826

Please sign in to comment.