[FLINK-1638] [streaming] Add StateHandle and include javadoc
This closes #459
senorcarbone authored and StephanEwen committed Mar 10, 2015
1 parent 490fa70 commit f2b5c21
Showing 11 changed files with 191 additions and 69 deletions.
@@ -23,12 +23,11 @@
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -79,7 +78,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
 
-	private Map<String, OperatorState<?>> operatorStates;
+	private StateHandle operatorStates;
 
 	/**
 	 * Constructs a task deployment descriptor.
@@ -129,13 +128,13 @@ public TaskDeploymentDescriptor(
 			Configuration taskConfiguration, String invokableClassName,
 			List<PartitionDeploymentDescriptor> producedPartitions,
 			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String,OperatorState<?>> operatorStates) {
+			List<BlobKey> requiredJarFiles, int targetSlotNumber, StateHandle operatorStates) {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
 				consumedPartitions, requiredJarFiles, targetSlotNumber);
 
-		setOperatorStates(operatorStates);
+		setOperatorState(operatorStates);
 	}
 
 	/**
@@ -244,11 +243,11 @@ public String toString() {
 				strProducedPartitions, strConsumedPartitions);
 	}
 
-	public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
+	public void setOperatorState(StateHandle operatorStates) {
 		this.operatorStates = operatorStates;
 	}
 
-	public Map<String, OperatorState<?>> getOperatorStates() {
+	public StateHandle getOperatorStates() {
 		return operatorStates;
 	}
 }
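
The deployment descriptor now ships a single StateHandle instead of a raw map of operator states. As a hedged sketch of how the receiving side could hand that state to an invokable, using only the accessors introduced in this commit (the helper class, its name, and the instanceof/null checks are illustrative assumptions, not code from this commit):

import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.StateHandle;

public class StateInjectionSketch {

	/** Hands the recovered state, if any, to an invokable that can accept it. */
	public static void injectIfPresent(TaskDeploymentDescriptor tdd, Object invokable) {
		StateHandle handle = tdd.getOperatorStates();
		if (handle != null && invokable instanceof OperatorStateCarrier) {
			// The OperatorStateCarrier interface added in this commit accepts the handle directly.
			((OperatorStateCarrier) invokable).injectState(handle);
		}
	}
}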
@@ -46,7 +46,7 @@
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
@@ -56,7 +56,6 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
@@ -124,7 +123,7 @@ public class Execution implements Serializable {
 
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 
-	private Map<String,OperatorState<?>> operatorStates;
+	private StateHandle operatorState;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -858,11 +857,11 @@ public String toString() {
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
 
-	public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
-		this.operatorStates = operatorStates;
+	public void setOperatorState(StateHandle operatorStates) {
+		this.operatorState = operatorStates;
 	}
 
-	public Map<String,OperatorState<?>> getOperatorStates() {
-		return operatorStates;
+	public StateHandle getOperatorState() {
+		return operatorState;
 	}
 }
@@ -34,7 +34,7 @@
 import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -567,9 +567,9 @@ public boolean updateState(TaskExecutionState state) {
 		}
 	}
 
-	public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> states)
+	public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states)
 	{
-		for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> state : states.entrySet())
+		for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , StateHandle> state : states.entrySet())
 		{
 			tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
 		}
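
Judging from the lookup above, the Tuple3 key combines the job vertex ID, the subtask index, and (presumably) the checkpoint ID. A minimal, hypothetical sketch of how a caller might assemble such a map out of the classes touched by this commit — the helper class, the variable names, and the checkpoint-ID semantics are assumptions for illustration only:

import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;
import scala.Tuple3;

import java.util.HashMap;
import java.util.Map;

public class StateMapSketch {

	public static Map<Tuple3<JobVertexID, Integer, Long>, StateHandle> buildStateMap(
			JobVertexID vertexId, int subtaskIndex, long checkpointId,
			Map<String, OperatorState<?>> snapshot) {

		// One entry per (vertex, subtask, checkpoint) triple; the handle wraps the
		// snapshotted operator states for that subtask.
		Map<Tuple3<JobVertexID, Integer, Long>, StateHandle> states = new HashMap<>();
		states.put(new Tuple3<JobVertexID, Integer, Long>(vertexId, subtaskIndex, checkpointId),
				new LocalStateHandle(snapshot));
		return states;
	}
}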
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
 import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -38,23 +38,18 @@
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 import org.slf4j.Logger;
-
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
@@ -91,7 +86,7 @@ public class ExecutionVertex implements Serializable {
 
 	private volatile boolean scheduleLocalOnly;
 
-	private Map<String,OperatorState<?>> operatorState;
+	private StateHandle operatorState;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -199,11 +194,11 @@ public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
 
-	public void setOperatorState(Map<String,OperatorState<?>> operatorState) {
+	public void setOperatorState(StateHandle operatorState) {
 		this.operatorState = operatorState;
 	}
 
-	public Map<String,OperatorState<?>> getOperatorState() {
+	public StateHandle getOperatorState() {
 		return operatorState;
 	}
 
@@ -382,7 +377,8 @@ public void resetForNewExecution() {
 			Execution execution = currentExecution;
 			ExecutionState state = execution.getState();
 
-			if (state == FINISHED || state == CANCELED || state == FAILED) {
+			if (state == ExecutionState.FINISHED || state == ExecutionState.CANCELED
+					|| state == ExecutionState.FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(this, execution.getAttemptNumber()+1,
 						System.currentTimeMillis(), timeout);
@@ -394,7 +390,7 @@ public void resetForNewExecution() {
 
 				if(operatorState!=null)
 				{
-					execution.setOperatorStates(operatorState);
+					execution.setOperatorState(operatorState);
 				}
 
 			}
@@ -440,8 +436,9 @@ public void prepareForArchiving() throws IllegalStateException {
 		ExecutionState state = execution.getState();
 
 		// sanity check
-		if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
-			throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
+		if (!(state == ExecutionState.FINISHED || state == ExecutionState.CANCELED || state == ExecutionState.FAILED)) {
+			throw new IllegalStateException(
+					"Cannot archive ExecutionVertex that is not in a finished state.");
 		}
 
 		// prepare the current execution for archiving
@@ -18,10 +18,24 @@
 package org.apache.flink.runtime.jobgraph.tasks;
 
 
+/**
+ * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for
+ * fault tolerance. In the most common case, {@link #broadcastBarrier(long)} is expected to be
+ * called periodically upon receiving a checkpoint barrier. Furthermore, a
+ * {@link #confirmBarrier(long)} method should be implemented and used for acknowledging a
+ * specific checkpoint.
+ */
 public interface BarrierTransceiver {
 
+	/**
+	 * A callback for notifying an operator of a new checkpoint barrier.
+	 * @param barrierID the ID of the checkpoint barrier to broadcast
+	 */
 	public void broadcastBarrier(long barrierID);
 
+	/**
+	 * A callback for confirming that a barrier checkpoint is complete.
+	 * @param barrierID the ID of the checkpoint barrier being confirmed
+	 */
 	public void confirmBarrier(long barrierID);
 
 }
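
For illustration, a minimal sketch of a class implementing BarrierTransceiver — the class name and the logging-only behavior are assumptions, not part of this commit; a real streaming task would forward the barrier downstream and acknowledge the checkpoint to a coordinator:

import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical sketch of a task participating in barrier checkpointing. */
public class LoggingBarrierTransceiver implements BarrierTransceiver {

	private static final Logger LOG = LoggerFactory.getLogger(LoggingBarrierTransceiver.class);

	@Override
	public void broadcastBarrier(long barrierID) {
		// A real operator would forward the barrier to all downstream channels;
		// here we only record that the barrier passed through.
		LOG.info("Broadcasting checkpoint barrier {}", barrierID);
	}

	@Override
	public void confirmBarrier(long barrierID) {
		// A real operator would acknowledge the checkpoint once its state snapshot is done.
		LOG.info("Confirming checkpoint barrier {}", barrierID);
	}
}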
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
 
-import java.util.Map;
-
+/**
+ * An interface meant to be implemented by any invokable that supports state recovery.
+ * It is mainly used by the TaskManager to identify such operators so that their initial
+ * state can be injected upon creation.
+ */
 public interface OperatorStateCarrier {
 
-	public void injectStates(Map<String, OperatorState<?>> state);
+	public void injectState(StateHandle stateHandle);
 
 }
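
A minimal sketch of an invokable accepting recovered state through this interface; the class name and the restore bookkeeping are illustrative assumptions, and only the getState() accessor defined by this commit is used:

import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;

import java.util.Map;

/** Hypothetical invokable that accepts recovered state from the TaskManager. */
public class StatefulInvokableSketch implements OperatorStateCarrier {

	/** The recovered states, keyed by operator name, or null on a fresh start. */
	private Map<String, OperatorState<?>> recoveredStates;

	@Override
	public void injectState(StateHandle stateHandle) {
		// Called before the task starts running, so the operator can
		// initialize itself from the recovered state.
		this.recoveredStates = stateHandle.getState();
	}

	public boolean hasRecoveredState() {
		return recoveredStates != null && !recoveredStates.isEmpty();
	}
}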
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;


import java.util.Map;

/**
 * A StateHandle that includes a copy of the state itself. This state handle is recommended
 * for cases where the operator state is lightweight enough to be passed over the network.
 */
public class LocalStateHandle implements StateHandle {

	private final Map<String, OperatorState<?>> state;

	public LocalStateHandle(Map<String, OperatorState<?>> state) {
		this.state = state;
	}

	@Override
	public Map<String, OperatorState<?>> getState() {
		return state;
	}
}
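
As a usage sketch (hypothetical caller code, not part of the commit): wrapping a snapshot of operator states only requires handing the map to the constructor, and getState() returns the same map, so this handle is only appropriate for state small enough to serialize along with the descriptor.

import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;

import java.util.HashMap;
import java.util.Map;

public class LocalStateHandleUsage {

	public static StateHandle wrapSnapshot(Map<String, OperatorState<?>> snapshot) {
		// The handle simply carries the map along with it.
		return new LocalStateHandle(snapshot);
	}

	public static void main(String[] args) {
		Map<String, OperatorState<?>> snapshot = new HashMap<>();
		// ... populated by the operator's checkpointing logic ...
		StateHandle handle = wrapSnapshot(snapshot);
		System.out.println("States carried by handle: " + handle.getState().size());
	}
}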
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;


import java.io.Serializable;
import java.util.Map;

/**
 * StateHandle is a general handle interface meant to abstract operator state fetching.
 * A StateHandle implementation can, for example, include the state itself in cases where
 * the state is lightweight, or fetch it lazily from some external storage when the state
 * is too large.
 */
public interface StateHandle extends Serializable {

	/**
	 * Retrieves and returns the state managed by this handle.
	 *
	 * @return the map of operator states managed by this handle
	 */
	public Map<String, OperatorState<?>> getState();

}
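
The javadoc above mentions fetching state lazily from external storage. Purely to illustrate that idea, a hypothetical handle backed by a file and plain Java serialization could look as follows — neither the class nor the serialization scheme is part of this commit:

import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateHandle;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;

/**
 * Hypothetical StateHandle that keeps only a file path and deserializes the
 * operator states on demand, instead of shipping them with the handle.
 */
public class FileStateHandle implements StateHandle {

	private static final long serialVersionUID = 1L;

	private final String path;

	public FileStateHandle(String path) {
		this.path = path;
	}

	@Override
	@SuppressWarnings("unchecked")
	public Map<String, OperatorState<?>> getState() {
		try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(path))) {
			// Assumes the state map was written earlier with an ObjectOutputStream.
			return (Map<String, OperatorState<?>>) in.readObject();
		} catch (IOException | ClassNotFoundException e) {
			throw new RuntimeException("Could not read operator state from " + path, e);
		}
	}
}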
