Skip to content

Commit

Permalink
[streaming] Initial rework of the operator state interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jun 25, 2015
1 parent d42c732 commit a7e2458
Show file tree
Hide file tree
Showing 38 changed files with 1,244 additions and 269 deletions.
Expand Up @@ -34,7 +34,7 @@
public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> { public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

@Override @Override
public abstract OUT map(IN value) throws Exception; public abstract OUT map(IN value) throws Exception;
} }
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.StateCheckpointer;


/** /**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
Expand Down Expand Up @@ -160,4 +162,49 @@ public interface RuntimeContext {
* @return The distributed cache of the worker executing this instance. * @return The distributed cache of the worker executing this instance.
*/ */
DistributedCache getDistributedCache(); DistributedCache getDistributedCache();

// --------------------------------------------------------------------------------------------

/**
* Returns the {@link OperatorState} of this operator instance, which can be
* used to store and update user state in a fault tolerant fashion. The
* state will be initialized by the provided default value, and the
* {@link StateCheckpointer} will be used to draw the state snapshots.
*
* <p>
* When storing a {@link Serializable} state the user can omit the
* {@link StateCheckpointer} by calling
* {@link #getOperatorState(Serializable)} instead, in which case the full
* state will be written as the snapshot.
* </p>
*
* @param <S>
* Type of the operator state.
* @param <C>
* Type of the snapshot created by the checkpointer.
* @param defaultState
* Default value for the operator state. This will be returned
* when {@link OperatorState#getState()} is called (for every
* state partition) before the first call to
* {@link OperatorState#updateState(Object)}.
* @param checkpointer
* The {@link StateCheckpointer} that will be used to draw
* snapshots from the user state.
* @return The {@link OperatorState} for this instance.
*/
<S,C extends Serializable> OperatorState<S> getOperatorState(S defaultState, StateCheckpointer<S,C> checkpointer);

/**
* Returns the {@link OperatorState} of this operator instance, which can be
* used to store and update user state in a fault tolerant fashion. The
* state will be initialized by the provided default value.
*
* <p>
* This variant takes no {@link StateCheckpointer}. When storing a
* non-{@link Serializable} state the user needs to specify a
* {@link StateCheckpointer} for drawing snapshots by calling
* {@link #getOperatorState(Object, StateCheckpointer)} instead.
* </p>
*
* @param <S>
* Type of the operator state; must be {@link Serializable}.
* @param defaultState
* Default value for the operator state. This will be returned
* when {@link OperatorState#getState()} is called (for every
* state partition) before the first call to
* {@link OperatorState#updateState(Object)}.
* @return The {@link OperatorState} for this instance.
*/
<S extends Serializable> OperatorState<S> getOperatorState(S defaultState);
} }
Expand Up @@ -33,6 +33,8 @@
import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;


/** /**
Expand Down Expand Up @@ -170,4 +172,14 @@ private <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name
} }
return (Accumulator<V, A>) accumulator; return (Accumulator<V, A>) accumulator;
} }

/**
 * Not supported by this context: every call fails.
 *
 * @throws UnsupportedOperationException
 *             always, regardless of the arguments.
 */
@Override
public <S, C extends Serializable> OperatorState<S> getOperatorState(S defaultState, StateCheckpointer<S, C> checkpointer) {
    final String reason = "Operator state is only accessible for streaming operators.";
    throw new UnsupportedOperationException(reason);
}

/**
 * Not supported by this context: every call fails.
 *
 * @throws UnsupportedOperationException
 *             always, regardless of the argument.
 */
@Override
public <S extends Serializable> OperatorState<S> getOperatorState(S defaultState) {
    final String reason = "Operator state is only accessible for streaming operators.";
    throw new UnsupportedOperationException(reason);
}
} }
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state;

import org.apache.flink.api.common.functions.MapFunction;

/**
* Base interface for all streaming operator states. It can represent both
* partitioned (when state partitioning is defined in the program) and
* non-partitioned user states.
*
* <p>
* State can be accessed and manipulated using the {@link #getState()} and
* {@link #updateState(Object)} methods. These calls are only valid in the
* transformation call the operator represents, for instance inside
* {@link MapFunction#map(Object)}, and are invalid in the user function's
* {@code open(Configuration)} or {@code close()} methods.
* </p>
*
* @param <T>
* Type of the operator state
*/
public interface OperatorState<T> {

/**
* Gets the current state for the operator. When the state is not
* partitioned the returned state is the same for all inputs. If state
* partitioning is applied the state returned depends on the current
* operator input, as the operator maintains an independent state for each
* partition.
*
* <p>
* {@link #getState()} returns <code>null</code> if there is no state stored
* in the operator. This is the expected behaviour before initializing the
* state with {@link #updateState(Object)}.
* </p>
*
* <p>
* NOTE(review): the documentation of RuntimeContext#getOperatorState says
* that the supplied default value is returned before the first update,
* which disagrees with the <code>null</code> contract above. Confirm which
* one the implementations follow.
* </p>
*
* @return The operator state corresponding to the current input.
*/
T getState();

/**
* Updates the operator state accessible by {@link #getState()} to the given
* value. The next time {@link #getState()} is called (for the same state
* partition) the returned state will represent the updated value.
*
* @param state
* The updated state.
*/
void updateState(T state);

}
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state;

import java.io.Serializable;

/**
 * Contract for turning an {@link OperatorState} value into a persistable
 * snapshot and back again in stateful streaming programs.
 *
 * <p>
 * Implementations provide {@link #snapshotState(Object, long, long)} and
 * {@link #restoreState(Serializable)}; these are called to create and to
 * restore the snapshots of the given states.
 * </p>
 *
 * <p>
 * The {@link OperatorState} is checkpointed <i>synchronously</i>: while the
 * state is being written it cannot be accessed or modified. A snapshot may
 * therefore be a reference to the live state; returning a copy is not
 * required.
 * </p>
 *
 * @param <S>
 *            Type of the operator state.
 * @param <C>
 *            Type of the snapshot that will be persisted.
 */
public interface StateCheckpointer<S, C extends Serializable> {

    /**
     * Creates the snapshot for one operator state. The returned snapshot is
     * persisted in the state backend of the job and restored upon failure.
     * For partitioned state this method is called for every state partition
     * when a checkpoint is created.
     *
     * @param state
     *            The state to snapshot.
     * @param checkpointId
     *            The ID of the checkpoint.
     * @param checkpointTimestamp
     *            The timestamp of the checkpoint, as derived by
     *            System.currentTimeMillis() on the JobManager.
     * @return A snapshot of the operator state.
     */
    C snapshotState(S state, long checkpointId, long checkpointTimestamp);

    /**
     * Rebuilds an operator state from a snapshot. The restored state is
     * loaded back into the function. For partitioned state every partition
     * is restored independently.
     *
     * @param stateSnapshot
     *            The snapshot to restore from.
     * @return The state corresponding to the snapshot.
     */
    S restoreState(C stateSnapshot);
}
Expand Up @@ -23,36 +23,33 @@
/** /**
* A StateHandle that includes the operator states directly. * A StateHandle that includes the operator states directly.
*/ */
public class LocalStateHandle implements StateHandle<Serializable> { public class LocalStateHandle<T extends Serializable> implements StateHandle<T> {


private static final long serialVersionUID = 2093619217898039610L; private static final long serialVersionUID = 2093619217898039610L;


private final Serializable state; private final T state;


public LocalStateHandle(Serializable state) { public LocalStateHandle(T state) {
this.state = state; this.state = state;
} }


@Override @Override
public Serializable getState() { public T getState() {
return state; return state;
} }


@Override @Override
public void discardState() throws Exception { public void discardState() throws Exception {
} }

public static LocalStateHandleProvider createProvider(){
return new LocalStateHandleProvider();
}


private static class LocalStateHandleProvider implements StateHandleProvider<Serializable> { public static class LocalStateHandleProvider<R extends Serializable> implements
StateHandleProvider<R> {


private static final long serialVersionUID = 4665419208932921425L; private static final long serialVersionUID = 4665419208932921425L;


@Override @Override
public LocalStateHandle createStateHandle(Serializable state) { public LocalStateHandle<R> createStateHandle(R state) {
return new LocalStateHandle(state); return new LocalStateHandle<R>(state);
} }


} }
Expand Down
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import java.io.Serializable;
import java.util.Map;

/**
 * A {@link StateHandle} whose state is a map from a {@link Serializable} key
 * to a further {@link StateHandle}. Discarding this handle discards every
 * handle contained in the map.
 */
public class PartitionedStateHandle implements
        StateHandle<Map<Serializable, StateHandle<Serializable>>> {

    private static final long serialVersionUID = 7505365403501402100L;

    /** The wrapped handles, stored exactly as passed to the constructor (no copy is made). */
    Map<Serializable, StateHandle<Serializable>> handles;

    /**
     * Wraps the given map of handles.
     *
     * @param handles
     *            The handles to wrap; the reference is kept as is.
     */
    public PartitionedStateHandle(Map<Serializable, StateHandle<Serializable>> handles) {
        this.handles = handles;
    }

    /**
     * @return The map of handles this instance was created with.
     */
    @Override
    public Map<Serializable, StateHandle<Serializable>> getState() throws Exception {
        return this.handles;
    }

    /**
     * Calls {@link StateHandle#discardState()} on each wrapped handle in the
     * iteration order of the map's values. An exception thrown by one handle
     * propagates immediately, so later handles are not discarded.
     */
    @Override
    public void discardState() throws Exception {
        final java.util.Iterator<StateHandle<Serializable>> remaining = this.handles.values().iterator();
        while (remaining.hasNext()) {
            remaining.next().discardState();
        }
    }

}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import java.io.Serializable;
import java.util.Map;

/**
* Interface for storing and accessing partitioned state. The interface is
* designed in a way that allows implementations with lazy state access.
*
* <p>
* NOTE(review): no implementation is visible alongside this interface, so
* the per-method descriptions below are derived from the signatures only
* and should be confirmed against the implementations.
* </p>
*
* @param <S>
* Type of the state.
* @param <C>
* Type of the state snapshot.
*/
public interface PartitionedStateStore<S, C extends Serializable> {

/**
* Returns the state stored for the given key.
*
* NOTE(review): the result for a key without stored state is not specified
* here (see {@link #containsKey(Serializable)}) - confirm.
*
* @param key
* The partition key.
* @return The state for the key.
* @throws Exception
* Declared by the signature; the cause is implementation specific.
*/
S getStateForKey(Serializable key) throws Exception;

/**
* Stores the given state for the given key.
*
* @param key
* The partition key.
* @param state
* The state to store for the key.
*/
void setStateForKey(Serializable key, S state);

/**
* Returns the partitioned state as a map from key to state.
*
* NOTE(review): whether the returned map is a copy or a live view is not
* specified here - confirm.
*
* @return Map from partition key to state.
* @throws Exception
* Declared by the signature; the cause is implementation specific.
*/
Map<Serializable, S> getPartitionedState() throws Exception;

/**
* Creates snapshots of the stored states, returned as one
* {@link StateHandle} per key.
*
* @param checkpointId
* The ID of the checkpoint.
* @param checkpointTimestamp
* The timestamp of the checkpoint.
* @return Map from partition key to the handle of its snapshot.
* @throws Exception
* Declared by the signature; the cause is implementation specific.
*/
Map<Serializable, StateHandle<C>> snapshotStates(long checkpointId, long checkpointTimestamp) throws Exception;

/**
* Restores the states from the given per-key snapshot handles.
*
* NOTE(review): presumably the counterpart of
* {@link #snapshotStates(long, long)}; whether previously stored states
* are replaced or merged is not specified here - confirm.
*
* @param snapshots
* Map from partition key to the handle of its snapshot.
* @throws Exception
* Declared by the signature; the cause is implementation specific.
*/
void restoreStates(Map<Serializable, StateHandle<C>> snapshots) throws Exception;

/**
* Checks whether the store holds state for the given key.
*
* @param key
* The partition key.
* @return <code>true</code> if the key is present, <code>false</code>
* otherwise.
*/
boolean containsKey(Serializable key);

}

0 comments on commit a7e2458

Please sign in to comment.