[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction #3479

Closed
wants to merge 7 commits into from

sjwiesman
Contributor

Right now, the state that a WindowFunction or ProcessWindowFunction can
access is scoped to the key of the window but not the window itself.
That is, state is global across all windows for a given key.
For some use cases it is beneficial to keep state scoped to a window.
For example, if you expect to have several Trigger firings (due to early
and late firings) a user can keep state per window to keep some
information between those firings.
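
To make the distinction concrete, here is a minimal plain-Java sketch (not Flink code) that models the two scopes with maps. The class `StateScopes`, its methods, and the string window ids are hypothetical stand-ins; the review below refers to the real accessors as `windowState()` and `globalState()` on the function's context.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for illustration only; none of these names are Flink API.
public class StateScopes {
    // Global state: one counter per key, shared by every window of that key.
    private final Map<String, Integer> globalFirings = new HashMap<>();
    // Per-window state: one counter per (key, window) pair.
    private final Map<String, Integer> windowFirings = new HashMap<>();

    // Simulates one trigger firing for the given key and window.
    public void fire(String key, String window) {
        globalFirings.merge(key, 1, Integer::sum);
        windowFirings.merge(key + "@" + window, 1, Integer::sum);
    }

    public int globalCount(String key) {
        return globalFirings.getOrDefault(key, 0);
    }

    public int windowCount(String key, String window) {
        return windowFirings.getOrDefault(key + "@" + window, 0);
    }

    public static void main(String[] args) {
        StateScopes s = new StateScopes();
        s.fire("user-1", "[0,10)");   // early firing
        s.fire("user-1", "[0,10)");   // late firing of the same window
        s.fire("user-1", "[10,20)");  // first firing of the next window
        System.out.println(s.globalCount("user-1"));            // 3
        System.out.println(s.windowCount("user-1", "[0,10)"));  // 2
        System.out.println(s.windowCount("user-1", "[10,20)")); // 1
    }
}
```

The global counter keeps growing across windows, while the per-window counter starts fresh for each window, which is the behaviour a function would want when it tracks information between early and late firings of a single window.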

@aljoscha

Contributor

aljoscha commented Mar 6, 2017

Thanks @sjwiesman! I'll have a look.

@aljoscha
Contributor

aljoscha commented Mar 6, 2017

One quick initial remark: instead of each time having an anonymous inner class for the Context you can create a reusable class for that like this:

class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
        extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {

    W window;
    InternalWindowFunction.InternalWindowContext internalContext;

    InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
        function.super();
    }

    @Override
    public W window() {
        return window;
    }

    @Override
    public KeyedStateStore windowState() {
        return internalContext.windowState();
    }

    @Override
    public KeyedStateStore globalState() {
        return internalContext.globalState();
    }
}

Using it would look something like this:

public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends Window>
		extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
		implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> {

	private static final long serialVersionUID = 1L;

	private transient InternalProcessWindowContext<IN, OUT, KEY, W> context;

	public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
		super(wrappedFunction);
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		context = new InternalProcessWindowContext<>(wrappedFunction);
	}

	@Override
	public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
		// can deprecate
	}

	@Override
	public void process(KEY key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
		this.context.window = window;
		this.context.internalContext = context;
		wrappedFunction.process(key, this.context, input, out);
	}

	@Override
	public void clear(final W window, final InternalWindowContext context) throws Exception {
		this.context.window = window;
		this.context.internalContext = context;
		wrappedFunction.clear(this.context);
	}

	@Override
	public RuntimeContext getRuntimeContext() {
		throw new RuntimeException("This should never be called.");
	}

	@Override
	public IterationRuntimeContext getIterationRuntimeContext() {
		throw new RuntimeException("This should never be called.");
	}

}

The function.super() call in there makes it work even though Context is itself defined as an inner abstract class of ProcessWindowFunction. It's a bit of black magic and not really too well known, I think. 😉
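
For readers who have not seen this syntax, the following is a small self-contained sketch of a qualified superclass constructor call, which is what `function.super()` is. The names `QualifiedSuperDemo`, `Function`, `Context`, and `ReusableContext` are hypothetical stand-ins and are much simpler than the Flink classes; only the `function.super()` form itself is taken from the snippet above.

```java
// Hypothetical stand-ins; only the `function.super()` syntax is the point here.
public class QualifiedSuperDemo {

    // Plays the role of ProcessWindowFunction with its inner abstract Context.
    static class Function {
        final String name;

        Function(String name) {
            this.name = name;
        }

        // Non-static inner class: every instance needs an enclosing Function.
        abstract class Context {
            abstract String window();

            // Inner classes can read the enclosing instance's fields.
            String describe() {
                return name + " sees " + window();
            }
        }
    }

    // Subclass declared outside of Function. It has no enclosing Function
    // instance of its own, so the constructor must supply one explicitly.
    static class ReusableContext extends Function.Context {
        String window; // mutable, reset before every invocation

        ReusableContext(Function function) {
            function.super(); // binds `function` as the enclosing instance
        }

        @Override
        String window() {
            return window;
        }
    }

    public static void main(String[] args) {
        ReusableContext ctx = new ReusableContext(new Function("f"));
        ctx.window = "[0,10)";
        System.out.println(ctx.describe()); // f sees [0,10)
        ctx.window = "[10,20)";             // same object, next window
        System.out.println(ctx.describe()); // f sees [10,20)
    }
}
```

Because the context object is created once and only its fields are reassigned, no new anonymous class instance is needed for each `process()` or `clear()` call.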


@Override
public KeyedStateStore windowState() {
if (windowAssigner instanceof MergingWindowAssigner) {
Contributor

Instead of checking every time you could initialise the WindowContext with either a WindowStateStore or the (exception throwing) MergingStateStore at the beginning.


@Override
public KeyedStateStore globalState() {
if (windowAssigner instanceof MergingWindowAssigner) {
Contributor

I think access to global state is fine for merging windows.

@@ -39,5 +40,31 @@
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
@Deprecated
void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
Contributor Author

@aljoscha I meant to ask, should I leave this method or remove it?

Contributor

Ah, I meant to actually write that earlier. Yes: please remove. 😅

Contributor Author

I noticed an issue when removing apply. The method is used inside of AccumulatingKeyedTimePanes which takes in an AbstractStreamOperator as an argument to its evaluateWindow method. When creating the context I can get the global keyed state backend from the operator, but not the partitioned state because those methods are protected. Now the only two uses of this class are its subclasses which have both been deprecated. My question is, do you think I should modify the evaluateWindow method to accept a keyed state store which wraps the operator partitioned state or just throw an exception on context.windowState() because all valid uses of this method have been deprecated?

Contributor

I think for now it's OK to throw an Exception here.

@sjwiesman
Contributor Author

@aljoscha I made the changes you asked for. Just a heads up, there are a number of files that were superficially changed when migrating from apply -> process but are otherwise untouched.

Contributor

@aljoscha aljoscha left a comment

This looks almost done now! 👍

I had some comments about leftover inner anonymous class contexts that were not yet moved over to the reusable context class.

@@ -153,6 +161,8 @@

protected transient Context context = new Context(null, null);

protected transient WindowContext windowContext = new WindowContext(null);
Contributor

I think that, to make it clearer what they do, we should rename these two contexts to triggerContext and processContext.

@@ -628,6 +644,123 @@ protected final boolean isCleanupTime(W window, long time) {
return time == cleanupTime(window);
}

public abstract class KeyedStateStoreWithWindow implements KeyedStateStore {
Contributor

Maybe add a comment explaining that we have a base class on which we can set the window, so that we can create either a MergingKeyStore or a WindowPaneKeyStore once (depending on the window assigner) and then not care about the distinction anymore.

I remember I suggested this but now struggled to see why there is the base class. 😉
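
A rough sketch of the idea, in plain Java rather than Flink code: the base class owns the settable window, and the concrete store is picked once at setup time. All names here (`StateStoreSelection`, `StoreWithWindow`, `PerWindowStore`, `MergingStore`, `create`) and the exception message are hypothetical, and the `int` counter stands in for real keyed state; the actual classes in the PR implement `KeyedStateStore`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration; not the classes from the PR.
public class StateStoreSelection {

    // Base class: holds the current window so callers can set it
    // without knowing which concrete store they are talking to.
    abstract static class StoreWithWindow {
        protected String window;

        void setWindow(String window) {
            this.window = window;
        }

        // Increments the named counter and returns its new value.
        abstract int increment(String name);
    }

    // Non-merging windows: state is namespaced by the current window.
    static class PerWindowStore extends StoreWithWindow {
        private final Map<String, Integer> values = new HashMap<>();

        @Override
        int increment(String name) {
            return values.merge(window + "/" + name, 1, Integer::sum);
        }
    }

    // Merging windows: per-window state is rejected on every access.
    static class MergingStore extends StoreWithWindow {
        @Override
        int increment(String name) {
            throw new UnsupportedOperationException(
                "Per-window state is not available with merging windows.");
        }
    }

    // Decide once, at setup time, instead of checking on every access.
    static StoreWithWindow create(boolean mergingAssigner) {
        return mergingAssigner ? new MergingStore() : new PerWindowStore();
    }

    public static void main(String[] args) {
        StoreWithWindow store = create(false);
        store.setWindow("[0,10)");
        store.increment("firings");
        System.out.println(store.increment("firings")); // 2
        store.setWindow("[10,20)");
        System.out.println(store.increment("firings")); // 1
    }
}
```

After `create(...)` returns, the calling code only ever calls `setWindow` and the state accessors, so the `instanceof MergingWindowAssigner` check no longer has to run on each access.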

@Override
public void clear(final W window, final InternalWindowContext context) throws Exception {
ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
final ProcessAllWindowFunction<V, R, W>.Context ctx = wrappedFunction.new Context() {
Contributor

leftover anonymous inner Context. This should also use this.ctx like in process().

Contributor Author

whoops 😱

@@ -92,12 +93,45 @@ public void process(K key, final Context context, Iterable<T> values, Collector<
result = foldFunction.fold(result, val);
}

windowFunction.process(key, windowFunction.new Context() {
ProcessWindowFunction<ACC, R, K, W>.Context ctx = windowFunction.new Context() {
Contributor

This can benefit from a similar refactoring as the internal window functions, i.e. creating an internal context class instead of the anonymous inner classes.

This also holds for FoldApplyProcessAllWindowFunction, ReduceApplyProcessAllWindowFunction and ReduceApplyProcessWindowFunction.

@sjwiesman
Contributor Author

Sorry for the delay, things got crazy at work. Let me know if there are any issues.

@aljoscha
Contributor

Don't worry. 😃 Is it ready for another review pass now?

@sjwiesman
Contributor Author

It looks like I broke one of the Scala side-output tests when I rebased on master. I'm going to push a fix right now, but it won't change any of the code surrounding this PR.

@aljoscha
Contributor

Ok, please ping me when you've pushed the fix. 😃

@sjwiesman
Contributor Author

Pushed the fix. I had to update SideOutputsITCase so that the ProcessAllWindowFunctions have a no-op clear method. All tests passed locally; take a look.

@aljoscha
Contributor

Thanks!

@aljoscha
Contributor

Thanks for implementing this! 😃

I just merged, could you please close this PR?

@sjwiesman
Contributor Author

Done! Thank you for helping me get this feature merged in. This has to be one of the most painless commits I've ever made to an open source project of this size.

@sjwiesman sjwiesman closed this Mar 25, 2017
@aljoscha
Contributor

Hehe, thanks! 😄
