Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-21308] Support delayed message cancellation #241

Closed
wants to merge 5 commits into from

Conversation

igalshilman
Copy link
Contributor

@igalshilman igalshilman commented Jun 23, 2021

This PR adds the ability to cancel delayed messages.

  • see FLINK-21308 for the detailed use case that drives this PR.

High level changes

This PR introduces the following methods in the embedded SDK, and the corresponding remote SDKs (language specific flavors)

/**
   * Invokes another function with an input (associated with a {@code cancellationToken}),
   * identified by the target function's {@link Address}, after a given delay.
   *
   * <p>Providing an id to a message, allows "unsending" this message later. ({@link
   * #cancelDelayedMessage(String)}).
   *
   * @param delay the amount of delay before invoking the target function. Value needs to be &gt;=
   *     0.
   * @param to the target function's address.
   * @param message the input to provide for the delayed invocation.
   * @param cancellationToken the non-empty, non-null, unique token to attach to this message, to be
   *     used for message cancellation. (see {@link #cancelDelayedMessage(String)}.)
   */
  void sendAfter(Duration delay, Address to, Object message, String cancellationToken);

  /**
   * Cancel a delayed message (a message that was send via {@link #sendAfter(Duration, Address,
   * Object, String)}).
   *
   * <p>NOTE: this is a best-effort operation, since the message might have been already delivered.
   * If the message was delivered, this is a no-op operation.
   *
   * @param cancellationToken the id of the message to un-send.
   */
  void cancelDelayedMessage(String cancellationToken);
  • The semantic of the cancellation token is opaque to StateFun, its content and uniqueness is completely user defined.
  • violating uniqueness constraint for two different methods will result in a runtime error.
  • Once the delayed message has been dispatched, the cancellation token is forgotten.

Internal Changes

request reply protocol:

We attach the (optional) cancelltion_token to the delayed invocation in the request-reply protocol.

// DelayedInvocation represents a delayed remote function call with a target address, an argument
// and a delay in milliseconds, after which this message to be sent.
    message DelayedInvocation {
        // an optional cancellation token that can be used to request the "unsending" of a delayed message.
        string cancellation_token = 10;
        // the amount of milliseconds to wait before sending this message
        int64 delay_in_ms = 1;
        // the target address to send this message to
        Address target = 2;
        // the invocation argument
        TypedValue argument = 3;
    }

In addition, a new response message:

// DelayCancellation represents a single delayed-message cancellation request.
message DelayCancellation {
        string cancellation_token = 1;
}

State

We add an additional state handle (delayed-message-index) to keep track of the mapping between a cancellation_token and the absolute timestamp that this message needs to be dispatched at.

These changes are then wired throughout the SDK and the runtime to make the magic happen.

@tzulitai tzulitai self-requested a review July 2, 2021 07:00
Copy link
Contributor

@tzulitai tzulitai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks very good @igalshilman!

I have one minor comment about optimizing (though perhaps overly doing it), depending on how common the scenario I described occurs. What do you think?

Other than that, this looks good to merge!

+ messageId
+ " and timestamp "
+ untilTimestamp
+ ", but a message with the same id exists and with a timestamp "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: excessive "and"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: id --> cancellation token in the error messages.
We seem to refer to these as cancellation tokens on the API surface.

result.getOutgoingDelayCancellationsList()) {
String token = delayCancellation.getCancellationToken();
if (!token.isEmpty()) {
context.cancelDelayedMessage(delayCancellation.getCancellationToken());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume the following scenario:
Within the same FromFunction response from a remote function, there is a delayed message with cancellation token foobar. At the same time, there is also a cancellation token for foobar.

I'm wondering if we want to be diligent in this scenario and eagerly "cancel" out the delayed message, without going through a roundtrip of adding it to the buffer / index state, just to delete it almost imediately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might even be a common case.

Say, for the same invocation batch with a given key, an earlier message triggers a delayed message that is cancellable. Maybe as a timer to purge state etc. in the absence of a follow up event.

However, within that same batch, the follow up event is processed and users immediately wants to cancel the delayed message. We can maybe optimize this scenario?

// so it would be re-enqueued into the delayedMessageBuffer.
delayedMessagesBuffer.clearForTimestamp(triggerTimestamp);
reductions.processEnvelopes();
public void onProcessingTime(InternalTimer<String, VoidNamespace> timer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

} catch (Exception e) {
throw new RuntimeException(
"Error accessing delayed message in state buffer for timestamp: " + timestamp, e);
throw new IllegalStateException("Failed clearing a message with id " + token, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change id in the error message to cancellation token, to be consistent and not confuse users.

@igalshilman
Copy link
Contributor Author

@tzulitai Thanks for the review! I've squashed the previous history into a single commit, as the history wasn't that important. and the last 4 commits are the result of our conversation earlier today.

@tzulitai
Copy link
Contributor

tzulitai commented Jul 8, 2021

Thanks @igalshilman! Merging this!

@tzulitai tzulitai closed this in e1b0c29 Jul 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants