Skip to content

Conversation

StefanRRichter
Copy link
Contributor

@StefanRRichter StefanRRichter commented Oct 17, 2016

Partitionable operator and keyed state are currently only available by using backends. However, the serialization code for many operators is build around reading/writing their state to a stream for checkpointing. We want to provide partitionable states also through streams, so that migrating existing operators becomes more easy.

This PR includes the following main changes:
#1) KeyedStateCheckpointOutputStream and OperatorStateCheckpointedOutputStream

Those class allow writing partitionable keyed (and operator) state in a stream for checkpointing. They enhance the basic stream interface with methods to signal the start of new partitions.
#2) Changes to StreamTask and AbstractStreamOperator

The lifecycle of StreamTask is slightly modified for the initialization of operator states. In AbstractStreamOperator, two new hooks have ben added that new operators can override:

    /**
     * Stream operators with state, which want to participate in a snapshot need to override this hook method.
     *
     * @param context context that provides information and means required for taking a snapshot
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {

    }

    /**
     * Stream operators with state which can be restored need to override this hook method.
     *
     * @param context context that allows to register different states.
     */
    public void initializeState(StateInitializationContext context) throws Exception {

    }

Access to snapshot/restore partitionable state is provided through the respective context
#3) Exposing partitionable states to UDFs

The interface CheckpointedFunction must be implemented by stateful UDFs:

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when an operator is initialized, so that the function can set up it's state through
     * the provided context. Initialization typically includes registering user states through the state stores
     * that the context offers.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;

Contexts for initialization and snapshot provide a subset of the functionality of the internal contexts from AbstractStreamOperator and which is safe to present to user code.
#4) Repartitioning in CheckpointCoordinator was enhanced and moved

The code now also handles the state handles created through streams and was moved out of CheckpointCoordinator into StateAssignmentOperation.
#5) This PR also introduces serval classes that bundle state handles

One example of this would be TaskStateHandles. The purpose of this is a) reducing the number of parameters passed through several methods and b) making adding/removing state handles simpler.

@StefanRRichter
Copy link
Contributor Author

Please review @aljoscha and whoever else wants to take a look.

Copy link
Contributor

@aljoscha aljoscha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the design/changes of this PR are very good! 👍

I had inline comments about naming, placement of code in module/package and Javadocs. These should be addressed.

In hindsight, it would have been better to do some changes separately (such as renaming and simplifying/consolidating the StateHandle hierarchies) because it would have simplified reviewing a lot. I realise, however, that it would be very hard to split up these changes now.

}
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods don't need to be reordered here. Also, the state store is not used anywhere, as far as I can see.

* Base class of all Flink Kafka Consumer data sources.
* This implements the common behavior across all Kafka versions.
*
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file contains a lot of whitespace changes. It would be good to remove them before we merge this.

private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

private transient OperatorStateStore stateStore;
private transient ListState<Serializable> offsetsStateForCheckpoint;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can can have a more concrete type. You changed OperatorStateStore.getSerializableListState to this:

<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;

offsetsStateForCheckpoint.clear();

final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a workaround for the fact that we initialise the fetcher in run() and not in open(). Might be worthwhile to change that in a follow-up, if at all possible.

*/
@Public
public interface RuntimeContext {
public interface RuntimeContext extends KeyedStateStore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to not have RuntimeContext be a KeyedStateStore. In the not-so-far future RuntimeContext will probably provide a KeyedStateStore or at least use one internally to implement the state methods. Properly separating the two now seems prudent.

import java.io.IOException;
import java.io.InputStream;

public class NonClosingStreamDecorator extends InputStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's quite clear what it does but Javadocs would still be nice.

"non-partitioned state changed.");
}

@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good additions! 👍 😺


@Override
public RunnableFuture<OperatorStateHandle> snapshotState(
public SnapshotInProgressSubtaskState snapshotState(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be final, similarly to how initializeState(OperatorStateHandles) is final.

/**
* This class holds all state handles for one operator.
*/
public class OperatorStateHandles {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be @Internal or at least @PublicEvolving. Also, the name clashes a bit with OperatorStateHandle which does something quite different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed about @Internal. For the name, however, I think it should somehow reflect how this is related to TaskStateHandles.

*
*/
public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep the old method signature by doing something like this:

    /**
     * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
     */
    public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
        synchronized (checkpointLock) {
            CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
                    new JobID(),
                    "test_op").createCheckpointStateOutputStream(checkpointId, timestamp);

            if (operator instanceof StreamCheckpointedOperator) {
                ((StreamCheckpointedOperator) operator).snapshotState(
                        outStream,
                        checkpointId,
                        timestamp);
            }

            RunnableFuture<OperatorStateHandle> snapshotRunnable = operator.snapshotState(
                    checkpointId,
                    timestamp,
                    stateBackend.createStreamFactory(new JobID(), "test_op"));

            if (snapshotRunnable != null) {
                outStream.write(1);
                snapshotRunnable.run();
                OperatorStateHandle operatorStateHandle = snapshotRunnable.get();

                InstantiationUtil.serializeObject(outStream, operatorStateHandle);
            } else {
                outStream.write(0);
            }

            snapshotToStream(checkpointId, timestamp, outStream);

            return outStream.closeAndGetHandle();
        }
    }

This multiplexes the results from the different operator snapshotting methods into the same stream. The restore method just tweezes out the correct items from the stream and hands them to the correct operator methods.

This would let all tests use the same method and we can keep the name/signature the same if we evolve the operator/snapshot interfaces.

Copy link
Contributor Author

@StefanRRichter StefanRRichter Oct 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea of having a single method is nice, and if there is no special reason why we should keep the old signature, I suggest to do it the other way around. OperatorSnapshotResultis already a container for all operator states (except the legacy state that will be removed in the near future). Using this removes the need for the multiplexing.

However, OperatorSnapshotResult does not contain the legacy state anymore, so for the time being, we might return a Tuple2 of both, or some special container class which could also strip away the RunnableFuture part.

What do you think?

@StefanRRichter StefanRRichter force-pushed the stream-keyed-state-checkpointing branch 4 times, most recently from 3c27fcb to a667eac Compare October 19, 2016 08:28
@StefanRRichter StefanRRichter force-pushed the stream-keyed-state-checkpointing branch 2 times, most recently from 3575092 to a678fb0 Compare October 19, 2016 13:59
@StefanRRichter StefanRRichter force-pushed the stream-keyed-state-checkpointing branch from b8beab3 to a996bc1 Compare October 20, 2016 08:16
@StefanRRichter StefanRRichter deleted the stream-keyed-state-checkpointing branch January 18, 2017 13:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants