Skip to content

[FLINK-16391][http] Add DelayedMessages and EgressMessages#43

Closed
igalshilman wants to merge 7 commits intoapache:masterfrom
igalshilman:enhence-http-protocol
Closed

[FLINK-16391][http] Add DelayedMessages and EgressMessages#43
igalshilman wants to merge 7 commits intoapache:masterfrom
igalshilman:enhence-http-protocol

Conversation

@igalshilman
Copy link
Copy Markdown
Contributor

This PR adds the following functionality to the RequestReply protocol:
functionality:

  • send messages with a delay
  • send a message to an egress.

Copy link
Copy Markdown
Contributor

@tzulitai tzulitai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @igalshilman, overall this looks good.
Some minor comments to be addresses before this is merged.

// from the egress_identifier.
message EgressMessage {
// The target egress id in the form of <namespace>/<name>
string egress_identifier = 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be consistent with the Address message, where the namespace and name are 2 separate fields?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either that, or we change the Address message to use the Python / YAML way of formatting namespace + name pairs: <namespace>/<name> and do the parsing in the RequestReplyFunction

PersistedTable.of("states", String.class, byte[].class);

private final SingleThreadedLruCache<String, EgressIdentifier<Any>> egressIdentifierCache =
new SingleThreadedLruCache<>(16);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move the 16 to a named class static final variable.

functionUnderTest.invoke(context, Any.getDefaultInstance());

// A message returned from the function
// that asks to put "hello" into the session state.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comments here is not coherent with what is being done in the test.

functionUnderTest.invoke(context, Any.getDefaultInstance());

// A message returned from the function
// that asks to put "hello" into the session state.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here: this comment is not coherent with the code

@tzulitai
Copy link
Copy Markdown
Contributor

tzulitai commented Mar 3, 2020

+1, LGTM now.
Will proceed to merge this ...

@tzulitai tzulitai closed this in 397d6ad Mar 3, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants