Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-1271: Guarantee predictable, deterministic order for operator initialization and finalization #211

Closed

Conversation

vjagadish1989
Copy link
Contributor

Currently, the order of initialization of operators in the Samza high level API is not deterministic. The non-determinism arises from two primary causes:

  • No fixed order of iteration for all subscribed OperatorSpecs for a given MessageStream
  • No fixed order of iteration for all the OperatorImpls in the OperatorImplGraph

We aim to provide the following 2 guarantees in this patch:
For any 2 operators A, B in the graph, if B consumes the output of A:

  • A is initialized before B is initialized
  • A is finalized only after B is finalized

@vjagadish1989
Copy link
Contributor Author

Ping @prateekm , @nickpan47 , @xinyuiscool for reviews!

Copy link
Contributor

@prateekm prateekm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good, thanks!

@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.impl;

import com.google.common.collect.Lists;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Maybe Collections.reverse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lists.reverse is slightly nicer as it does not mutate the passed in list. We appear to be having a guava dependency anyways in samza-core

public void close() {
filterFn.close();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete extra newline.

@@ -71,6 +71,11 @@ private OperatorSpecs() {}
public void init(Config config, TaskContext context) {
mapFn.init(config, context);
}

@Override
public void close() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks.

@@ -26,6 +26,10 @@
*
* <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
*
* <p> Order of finalization: {@link ClosableFunction}s are invoked in the reverse topological order of operators in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Order of closing", "are closed"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prateekm : Fixed in a follow-up commit here : 95bb8e2 since I ended up closing this PR!

@asfgit asfgit closed this in ad8ba96 Jun 5, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants