[streaming] StateHandleProvider added for configurable state backend
gyfora committed May 19, 2015
1 parent 59bee4a commit 197cd6c
Showing 14 changed files with 192 additions and 30 deletions.
@@ -176,7 +176,7 @@ public void shutdown() {


// clean and discard all successful checkpoints
for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
-checkpoint.dispose(userClassLoader);
+checkpoint.discard(userClassLoader);
}
completedCheckpoints.clear();
}
@@ -334,7 +334,7 @@ public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message) {
completed = checkpoint.toCompletedCheckpoint();
completedCheckpoints.addLast(completed);
if (completedCheckpoints.size() > numSuccessfulCheckpointsToRetain) {
-completedCheckpoints.removeFirst().dispose(userClassLoader);;
+completedCheckpoints.removeFirst().discard(userClassLoader);
}
pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);
@@ -409,7 +409,7 @@ public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex>
boolean allOrNothingState) throws Exception {
synchronized (lock) {
if (shutdown) {
-throw new IllegalStateException("CheckpointCoordinator is hut down");
+throw new IllegalStateException("CheckpointCoordinator is shut down");
}


if (completedCheckpoints.isEmpty()) {
@@ -18,8 +18,6 @@


package org.apache.flink.runtime.checkpoint;

-import java.io.IOException;

import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;
@@ -66,7 +66,7 @@ public List<StateForTask> getStates() {


// --------------------------------------------------------------------------------------------


-public void dispose(ClassLoader userClassLoader) {
+public void discard(ClassLoader userClassLoader) {
for(StateForTask state: states){
state.discard(userClassLoader);
}
@@ -18,7 +18,6 @@


package org.apache.flink.runtime.state;

-import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -37,7 +36,7 @@ public abstract class ByteStreamStateHandle implements StateHandle<Serializable>


transient Serializable state;


-public ByteStreamStateHandle(Serializable state) throws IOException {
+public ByteStreamStateHandle(Serializable state) {
this.state = state;
}


@@ -29,7 +29,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringUtils;


-import scala.util.Random;
+import java.util.Random;


/**
* Statehandle that writes the checkpointed state to a random file in the
@@ -44,7 +44,7 @@ public class FileStateHandle extends ByteStreamStateHandle {


private String pathString;


-public FileStateHandle(Serializable state, String folder) throws IOException {
+public FileStateHandle(Serializable state, String folder) {
super(state);
this.pathString = folder + "/" + randomString();
}
@@ -68,4 +68,34 @@ public void discardState() throws Exception {
FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
}


/**
* Creates a {@link StateHandleProvider} for creating
* {@link FileStateHandle}s for a given checkpoint directory.
*
*/
public static StateHandleProvider<Serializable> createProvider(String checkpointDir) {
return new FileStateHandleProvider(checkpointDir);
}

/**
* {@link StateHandleProvider} to generate {@link FileStateHandle}s for the
* given checkpoint directory.
*
*/
private static class FileStateHandleProvider implements StateHandleProvider<Serializable> {

private static final long serialVersionUID = 3496670017955260518L;
private String path;

public FileStateHandleProvider(String path) {
this.path = path;
}

@Override
public FileStateHandle createStateHandle(Serializable state) {
return new FileStateHandle(state, path);
}

}

}
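
As a rough usage sketch (not part of the commit; the class name and the HDFS checkpoint URI below are made-up placeholders), the new factory method can be combined with the setStateHandleProvider call introduced further down in this commit to back operator state by a file system:

import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileBackendExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Store each operator state checkpoint as a file under the given directory.
		// The URI is only an illustrative placeholder for any Flink supported file system.
		env.setStateHandleProvider(
				FileStateHandle.createProvider("hdfs://namenode:9000/flink/checkpoints"));
		env.enableCheckpointing();

		// ... define sources, transformations and sinks, then call env.execute()
	}
}
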
@@ -26,7 +26,7 @@
public class LocalStateHandle implements StateHandle<Serializable> {


private static final long serialVersionUID = 2093619217898039610L;

private final Serializable state;


public LocalStateHandle(Serializable state) {
@@ -41,4 +41,19 @@ public Serializable getState() {
@Override
public void discardState() throws Exception {
}

public static LocalStateHandleProvider createProvider(){
return new LocalStateHandleProvider();
}

private static class LocalStateHandleProvider implements StateHandleProvider<Serializable> {

private static final long serialVersionUID = 4665419208932921425L;

@Override
public LocalStateHandle createStateHandle(Serializable state) {
return new LocalStateHandle(state);
}

}
}
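
For illustration only (a minimal sketch, not part of the commit; the class name is hypothetical), the default in-memory provider simply hands the state object back, which shows the provider/handle contract in its simplest form:

import java.io.Serializable;

import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;

public class LocalHandleRoundTrip {

	public static void main(String[] args) throws Exception {
		// Default backend: one handle is created per checkpointed state object.
		StateHandleProvider<Serializable> provider = LocalStateHandle.createProvider();

		StateHandle<Serializable> handle = provider.createStateHandle("operator state");

		// LocalStateHandle keeps the object on the heap, so this prints the wrapped state.
		System.out.println(handle.getState());
	}
}
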
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import java.io.Serializable;

/**
* Stateful streaming operators use a StateHandleProvider to create new
* {@link StateHandle}s to store each checkpoint in a persistent storage layer.
*/
public interface StateHandleProvider<T> extends Serializable {

/**
* Creates a new {@link StateHandle} instance that will be used to store the
* state checkpoint. This method is called for each state checkpoint saved.
*
* @param state
* State to be stored in the handle.
*
*/
public StateHandle<T> createStateHandle(T state);

}
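
To make the contract concrete, here is a minimal sketch of a custom implementation (not part of the commit; the class name is hypothetical), modeled on the providers added above. Note that a provider must be serializable because it is shipped to the tasks inside the stream configuration:

import java.io.Serializable;

import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;

/**
 * Hypothetical provider that keeps every checkpointed state object on the JVM
 * heap by wrapping it in a LocalStateHandle; createStateHandle is invoked once
 * per state checkpoint.
 */
public class HeapStateHandleProvider implements StateHandleProvider<Serializable> {

	private static final long serialVersionUID = 1L;

	@Override
	public StateHandle<Serializable> createStateHandle(Serializable state) {
		return new LocalStateHandle(state);
	}
}
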
@@ -40,6 +40,8 @@
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
@@ -236,6 +238,19 @@ public StreamExecutionEnvironment enableCheckpointing() {
return this;
}


/**
* Sets the {@link StateHandleProvider} used for storing operator state
* checkpoints when checkpointing is enabled.
* <p>
* An example would be using a {@link FileStateHandle#createProvider(Path)}
* to use any Flink supported file system as a state backend
*
*/
public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> provider) {
streamGraph.setStateHandleProvider(provider);
return this;
}

/**
* Sets the number of times that failed tasks are re-executed. A value of
* zero effectively disables fault tolerance. A value of {@code -1}
@@ -25,6 +25,7 @@
import java.util.Map;

import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -57,6 +58,7 @@ public class StreamConfig implements Serializable {
private static final String EDGES_IN_ORDER = "edgesInOrder";
private static final String OUT_STREAM_EDGES = "outStreamEdges";
private static final String IN_STREAM_EDGES = "inStreamEdges";
+private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";


// DEFAULT VALUES
private static final long DEFAULT_TIMEOUT = 100;
@@ -377,6 +379,25 @@ public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl
throw new StreamTaskException("Could not instantiate configuration.", e);
}
}

public void setStateHandleProvider(StateHandleProvider<?> provider) {

try {
InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
} catch (IOException e) {
throw new StreamTaskException("Could not serialize stateHandle provider.", e);
}
}

@SuppressWarnings("unchecked")
public <R> StateHandleProvider<R> getStateHandleProvider(ClassLoader cl) {
try {
return (StateHandleProvider<R>) InstantiationUtil
.readObjectFromConfig(this.config, STATEHANDLE_PROVIDER, cl);
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate statehandle provider.", e);
}
}


public void setChainStart() {
config.setBoolean(IS_CHAINED_VERTEX, true);
@@ -38,6 +38,8 @@
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -76,6 +78,7 @@ public class StreamGraph extends StreamingPlan {


private Map<Integer, StreamLoop> streamLoops;
protected Map<Integer, StreamLoop> vertexIDtoLoop;
+private StateHandleProvider<?> stateHandleProvider = LocalStateHandle.createProvider();


public StreamGraph(StreamExecutionEnvironment environment) {


@@ -116,6 +119,14 @@ public void setCheckpointingInterval(long checkpointingInterval) {
this.checkpointingInterval = checkpointingInterval;
}


public void setStateHandleProvider(StateHandleProvider<?> provider) {
this.stateHandleProvider = provider;
}

public StateHandleProvider<?> getStateHandleProvider() {
return this.stateHandleProvider;
}

public long getCheckpointingInterval() {
return checkpointingInterval;
}
@@ -258,6 +258,7 @@ private void setVertexConfig(Integer vertexID, StreamConfig config,
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+config.setStateHandleProvider(streamGraph.getStateHandleProvider());


Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();


@@ -30,8 +30,8 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,6 +61,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
protected StreamingRuntimeContext context;


protected ClassLoader userClassLoader;

+private StateHandleProvider<Serializable> stateHandleProvider;


private EventListener<TaskEvent> superstepListener;


@@ -74,6 +76,7 @@ public void registerInputOutput() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.context = createRuntimeContext(getEnvironment().getTaskName());
+this.stateHandleProvider = configuration.getStateHandleProvider(userClassLoader);


outputHandler = new OutputHandler<OUT>(this);


@@ -212,7 +215,7 @@ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exceptio
: null;
}


-state = userState == null ? null : new LocalStateHandle(userState);
+state = userState == null ? null : stateHandleProvider.createStateHandle(userState);
} }
catch (Exception e) {
throw new Exception("Error while drawing snapshot of the user state.", e);
@@ -19,19 +19,17 @@
package org.apache.flink.streaming.api.scala


import scala.reflect.ClassTag

import com.esotericsoftware.kryo.Serializer
import org.apache.commons.lang.Validate
import org.joda.time.Instant

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}

import scala.reflect.ClassTag
+import org.apache.flink.runtime.state.StateHandleProvider


class StreamExecutionEnvironment(javaEnv: JavaEnv) {


@@ -125,7 +123,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.enableCheckpointing()
this
}


/**
* Sets the given StateHandleProvider to be used for storing operator state
* checkpoints when checkpointing is enabled.
*/
def setStateHandleProvider(provider: StateHandleProvider[_]): StreamExecutionEnvironment = {
javaEnv.setStateHandleProvider(provider)
this
}

/**
* Disables operator chaining for streaming operators. Operator chaining
* allows non-shuffle operations to be co-located in the same thread fully
