SAMZA-1067: Physical execution graph and planner for fluent API #75

Closed
wants to merge 9 commits into from

@xinyuiscool (Contributor) commented Mar 3, 2017

Initial commit for the physical graph and plan. The design doc is here: https://issues.apache.org/jira/secure/attachment/12856670/SAMZA-1067.0.pdf.

The commit includes:

  1. Physical ProcessorGraph, where each processor represents a physical execution unit (e.g. a job in Yarn).
  2. A planner that does the following:
    • create ProcessorGraph from StreamGraph. For this phase, the graph only contains a single node (single stage);
    • figure out the partitions of intermediate topics
    • create the topics

Please note that the planner is currently used only in the remote runner. Further changes, refactoring, and cleanup are expected when it is integrated with the local runner.

@prateekm (Contributor) left a comment

Thanks @xinyuiscool, looks pretty good!

I have three general categories of comments here:
  • Readability/Naming: related to naming, code readability or documentation.
  • Assertions: My current understanding of the code, please correct me if I'm wrong. It will also be helpful to document these explicitly via comments so that others can verify/understand them. It would be even better to assert these in unit tests.
  • Issues: General questions and concerns.

Additionally, would be good to make these classes and methods immutable wherever possible, and otherwise document mutations and state updates clearly.

}

/**
* Validate the sinks should have outdegree being 0 and indegree greater than 0
Contributor

There was some discussion about not making sendTo terminal. Let's check with @nickpan47 for more context.

Contributor Author

I will talk to Yi about this.

I think if sendTo is not terminal, then the stream should be in both input and output streams of the StreamGraph. So it will be counted as an intermediate StreamEdge in the processorGraph. So the validation here won't be affected. But need to confirm with @nickpan47 about this.

Contributor

Don't think it's necessary to make them intermediate streams. We can use the MessageStream that's the input to the sendTo as an input to the next operator too.

calculatePartitions(streamGraph, processorGraph, sysAdmins);

// create the streams
createStreams(processorGraph, sysAdmins);
Contributor

Issue: I don't think the Planner should be creating the streams. Probably better for the ExecutionEnvironment/ApplicationRunner or a separate class to do this.

Contributor Author

You're exactly right! The stream creation shouldn't be part of the planner. I need to refactor this into a new class named StreamManager, so it can be used in both the Remote and Local runners. Do you mind if I add the refactoring to the new patch (the ApplicationRunner patch)? I didn't include it here because at that time I didn't have a clear picture of how to break down this part for LocalRunner.

Contributor

Makes sense, we can do it later.

ProcessorGraph processorGraph = new ProcessorGraph(config);
Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
Contributor

Issue: Isn't there a StreamGraph#getIntermediateStreams()? If not, would it be helpful for you if it did?

Contributor Author

Currently there isn't such a method. Adding it would definitely make this part of the logic easier to understand, I think.
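
For illustration, the source/sink/intermediate split could be expressed as a small helper. This is only a sketch under assumptions, not the code in this patch: `StreamSets` and `partition` are hypothetical names, plain strings stand in for `StreamSpec`, and a stream is assumed to be intermediate when it appears as both an input and an output of the StreamGraph.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of the patch); strings stand in for StreamSpec.
final class StreamSets {
  final Set<String> sources;
  final Set<String> sinks;
  final Set<String> intermediates;

  private StreamSets(Set<String> sources, Set<String> sinks, Set<String> intermediates) {
    this.sources = sources;
    this.sinks = sinks;
    this.intermediates = intermediates;
  }

  // Assumption: a stream that is both an input and an output is intermediate.
  static StreamSets partition(Set<String> inStreams, Set<String> outStreams) {
    // intermediates = inStreams ∩ outStreams
    Set<String> intermediates = new HashSet<>(inStreams);
    intermediates.retainAll(outStreams);

    // sources = inStreams \ intermediates
    Set<String> sources = new HashSet<>(inStreams);
    sources.removeAll(intermediates);

    // sinks = outStreams \ intermediates
    Set<String> sinks = new HashSet<>(outStreams);
    sinks.removeAll(intermediates);

    return new StreamSets(sources, sinks, intermediates);
  }
}
```

Copying the inputs before calling `retainAll`/`removeAll` keeps the caller's sets untouched, which fits the request for immutability elsewhere in this review.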

sinkStreams.removeAll(intStreams);

// add sources
sourceStreams.forEach(spec -> processorGraph.addSource(spec, processorId));
Contributor

Issue: It'd be better if ProcessorGraph was immutable, took sources, sinks and intermediate streams as parameters and did the validation itself, either immediately on construction or in a validate() method.

Contributor Author

Since the planner decides which processors to create and how to connect them using the intermediate streams, the construction of the graph is here instead of inside the graph. The graph is immutable to classes outside the package since all the update methods are package private. I couldn't think of a better way to make it immutable. Any ideas?

Contributor

I see. Makes sense to keep it here for when the planner is doing more work and deciding which Processors to create.


/**
* Figure out the number of partitions of intermediate streams
* Package private for testing
Contributor

Readability: I like the convention of adding /* package private */ before the method declaration. E.g.,

/* package private */ void calculatePartitions() {

Applies to other classes as well.

Contributor Author

Fixed. Thanks for the recommendation.

}

if (sortedNodes.size() < pnodes.size()) {
// The remaining nodes have circles
Contributor

Readability: s/circles/cycles

Contributor Author

fixed.

sortedNodes.add(node);
node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
String nid = n.getId();
Long degree = indegree.get(nid) - 1;
Contributor

Readability: Will be good to add a comment about what this whole block is trying to achieve.

Also, why decrement indegree? Are we trying to sort by indegree? If so, it will be clearer to keep an incrementing counter and subtract that instead of mutating the indegree map directly (since then it's not really indegree anymore).

Contributor Author

I added comments about what this algorithm does. Basically it implements an indegree-based approach similar to this: http://www.geeksforgeeks.org/topological-sorting-indegree-based-solution/. It differs from the original approach in that it doesn't need to change the graph itself.
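
As a point of reference, the indegree-based approach (Kahn's algorithm) can be sketched as below. This is an illustrative, self-contained version and not the code in this patch: `TopologicalSort` is a hypothetical name, string ids stand in for `ProcessorNode`, and throwing on a cycle is an assumption about how the leftover nodes are handled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; string ids stand in for ProcessorNode.
final class TopologicalSort {
  // graph maps each node id to the ids of its downstream nodes.
  static List<String> sort(Map<String, List<String>> graph) {
    // Count incoming edges in a separate map so the graph itself is never modified.
    Map<String, Integer> indegree = new HashMap<>();
    graph.keySet().forEach(id -> indegree.put(id, 0));
    graph.values().forEach(targets -> targets.forEach(t -> indegree.merge(t, 1, Integer::sum)));

    // Nodes with no remaining incoming edges are ready to be emitted.
    Deque<String> ready = new ArrayDeque<>();
    indegree.forEach((id, degree) -> {
      if (degree == 0) {
        ready.add(id);
      }
    });

    List<String> sorted = new ArrayList<>();
    while (!ready.isEmpty()) {
      String id = ready.remove();
      sorted.add(id);
      // "Remove" the emitted node by decrementing the count of each downstream node.
      for (String target : graph.getOrDefault(id, Collections.emptyList())) {
        if (indegree.merge(target, -1, Integer::sum) == 0) {
          ready.add(target);
        }
      }
    }

    // Any node never emitted still has incoming edges, so it sits on a cycle.
    if (sorted.size() < indegree.size()) {
      throw new IllegalStateException("graph has a cycle");
    }
    return sorted;
  }
}
```

The decrement is what stands in for deleting the emitted node's outgoing edges, which is why the map is mutated while the graph is left alone.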

Contributor

Yup, the logic looks good. I meant to say that it'll be clearer to keep a running count and subtract that from the indegree, rather than mutating the indegree map directly.

Contributor Author

I'm still not very clear about it. Let's chat.



/**
* Validate the graph
Contributor

Readability: Document what validation entails.

Contributor Author

Added the docs.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigInheritance {
Contributor

Issue: Looks like this is currently only used once. Is this going to be re-used in multiple places?

Readability: Needs documentation for what this is for and what it does. Also, the name doesn't make sense by itself, maybe 'ConfigInheritance[Helper|Util]'?

Contributor Author

Right. It was used in multiple places before, but the code got refactored and now only one place needs it. I got rid of this class.

* under the License.
*/

package org.apache.samza.processorgraph;
Contributor

Naming: Probably org.apache.samza.[execution|runner] or something similar would be more appropriate.

Contributor Author

The reason I put it here is that this class can access some package private functions of ProcessorGraph so the graph cannot be changed after it's created. Since it's tightly coupled with ProcessorGraph, I will keep this here for now.

Contributor

Yeah, keeping them together makes sense. What I meant is that maybe "execution" or "runner" makes more sense as a top level package name than "processorgraph".

Contributor Author

right, execution seems to be a better package name. Thanks for the suggestion!

@prateekm (Contributor) left a comment

Minor documentation related comments and nitpicks, otherwise looks good to me. Thanks for the updates!

@@ -149,6 +149,7 @@ public void init(Config config, TaskContext context) {
* @param sinkFn the sink function
* @param stream the {@link OutputStream} where the message is sent to
* @param <M> type of input message
* @param opId operator ID
Contributor

Documentation: Should be before type parameters (the <M>)

Contributor Author

fixed.

/**
* Create the physical graph from StreamGraph
*/
/* package private for testing */ ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
Contributor

Documentation: Don't need "for testing"

Contributor Author

fixed.

* It contains the topology of execution processors connected with source/sink/intermediate streams.
* High level APIs are transformed into ProcessorGraph for planing, validation and execution.
* Source/sink streams are external streams while intermediate streams are created and managed by Samza.
* Note that intermediate streams can be both the input and output of a ProcessorNode in ProcessorGraph.
Contributor

Documentation: s/can be/is?

Contributor Author

Fixed.


/**
* Returns the processors to be executed in the topological order
* @return list of {@link ProcessorNode}
Contributor

Documentation: "unmodifiable list/set of ", here and other methods.
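
A minimal sketch of the pattern this wording describes, assuming the getters hand out read-only views: `NodeHolder` is a hypothetical stand-in for the graph class, and strings stand in for `ProcessorNode`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a graph class; strings stand in for ProcessorNode.
final class NodeHolder {
  private final List<String> nodes = new ArrayList<>();

  /* package private */ void addNode(String id) {
    nodes.add(id);
  }

  /**
   * Returns the node ids in insertion order.
   * @return unmodifiable list of node ids
   */
  List<String> getNodes() {
    // A read-only view: callers cannot add or remove, but see later additions.
    return Collections.unmodifiableList(nodes);
  }
}
```

Calling `getNodes().add(...)` on such a view throws `UnsupportedOperationException`, so stating "unmodifiable" in the Javadoc tells callers what to expect.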

Contributor Author

Fixed.

Config scopedConfig = fullConfig.subset(configPrefix);

Config[] configPrecedence;
if (INHERIT_ROOT_CONFIGS) {
Contributor

Can probably remove this and the field since this method isn't shared.

Contributor Author

This was a left-over from the early pipeline code. Let me remove it. Thanks for catching it.

*/
/* package private */ ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
// For this phase, we are going to create a processor for the whole dag
String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name
Contributor

Don't we need some combination of Job name and Job Id here? I thought the tuple [Job name, Job Id] identifies a job uniquely?

Contributor Author

Good question. I use the job name for processorId since it's easier to generate multi-stage job names by appending indexes afterwards, like test-1, test-2. I don't expect multiple instances of the same job to exist in the same application, so I didn't include both. I agree that if we want to support this, we need a tuple data structure like ProcessorUid that holds both.
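
If that support is ever needed, such a tuple might look like the value class below. This is a hypothetical sketch: only the name `ProcessorUid` comes from this thread, while the fields, accessors, and the `name-id` string form are assumptions.

```java
import java.util.Objects;

// Hypothetical value class; only the name ProcessorUid appears in the discussion.
final class ProcessorUid {
  private final String jobName;
  private final String jobId;

  ProcessorUid(String jobName, String jobId) {
    this.jobName = Objects.requireNonNull(jobName, "jobName");
    this.jobId = Objects.requireNonNull(jobId, "jobId");
  }

  String getJobName() {
    return jobName;
  }

  String getJobId() {
    return jobId;
  }

  // Two uids are equal only when both the job name and the job id match.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ProcessorUid)) {
      return false;
    }
    ProcessorUid other = (ProcessorUid) o;
    return jobName.equals(other.jobName) && jobId.equals(other.jobId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(jobName, jobId);
  }

  // Assumed display form, e.g. "test-1".
  @Override
  public String toString() {
    return jobName + "-" + jobId;
  }
}
```

Implementing `equals` and `hashCode` over both fields lets the uid serve as a map key in place of the plain job name.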

// set the partitions of a stream to its StreamEdge
streamToMetadata.forEach((stream, data) -> {
int partitions = data.getSystemStreamPartitionMetadata().size();
streamToStreamEdge.get(stream).setPartitions(partitions);
Contributor

nit: setPartitionCount(partitions)

Contributor Author

Fixed.

private final StreamSpec streamSpec;
private final List<ProcessorNode> sourceNodes = new ArrayList<>();
private final List<ProcessorNode> targetNodes = new ArrayList<>();
private final Config config;
Contributor

Appears that the config is no longer accessed (for instance, config does not have getters). Is this for a future use-case?

StreamEdge in the graph does not need the config object when the streamSpec and the enclosing ProcessorGraph already have a reference to the config, right?

The constructor can instead be:
StreamEdge(StreamSpec spec)?

Contributor Author

Good catch! Removed the config in this class.

*/
public class StreamEdge {
private final StreamSpec streamSpec;
private final List<ProcessorNode> sourceNodes = new ArrayList<>();
Contributor

Currently, there is only one ProcessorNode created for the entire application, right?

Contributor Author

That's right!

return targetNodes;
}

int getPartitions() {
Contributor

Document that we return -1 if not assigned yet. Even better, return a final constant and use it consistently for the checks in all places. A couple of places in ExecutionPlanner check for -1.

Contributor Author

Sounds great. I made a final constant for -1 and used it in other places.
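
The shape of that change is roughly the following. This is a sketch under assumptions rather than the committed code: `EdgeSketch`, `PARTITIONS_UNKNOWN`, and `hasPartitionCount` are hypothetical names.

```java
// Hypothetical sketch of an edge that tracks its partition count.
final class EdgeSketch {
  /** Sentinel returned while no partition count has been assigned. */
  /* package private */ static final int PARTITIONS_UNKNOWN = -1;

  private int partitionCount = PARTITIONS_UNKNOWN;

  /**
   * @return the partition count, or {@link #PARTITIONS_UNKNOWN} if not assigned yet
   */
  int getPartitionCount() {
    return partitionCount;
  }

  void setPartitionCount(int partitionCount) {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("partition count must be positive: " + partitionCount);
    }
    this.partitionCount = partitionCount;
  }

  // Callers check this instead of comparing against a literal -1.
  boolean hasPartitionCount() {
    return partitionCount != PARTITIONS_UNKNOWN;
  }
}
```

Keeping the sentinel in one named constant means the checks in the planner and the Javadoc on the getter cannot drift apart.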

}
// assign the partition count
for (StreamEdge edge : joinSpecToStreamEdges.get(join)) {
if (edge.getPartitions() <= 0) {
Contributor

What conditions would cause an edge.getPartitions() == -1 at this point? Will be super-helpful to document this.

Contributor Author

Fixed.

@vjagadish1989 (Contributor) left a comment

Thanks for the contributions!

looks good. approved.

@asfgit asfgit closed this in 1676eab Mar 16, 2017


/**
* The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
Contributor

nit: please use {@link xxx} for the class names in Javadoc.


if (!processorGraph.getIntermediateStreams().isEmpty()) {
// figure out the partitions for internal streams
calculatePartitions(streamGraph, processorGraph, sysAdmins);
Contributor

minor comment: we can wrap streamGraph as a constructor parameter into ProcessorGraph.

shanthoosh pushed a commit to shanthoosh/samza that referenced this pull request Mar 16, 2017
Initial commit for the physical graph and plan. Design is there: https://issues.apache.org/jira/secure/attachment/12856670/SAMZA-1067.0.pdf.

The commit includes:

1) Physical ProcessorGraph, where each processor represents a physical execution unit (e.g. a job in Yarn).
2) A planner does the following:
   - create ProcessorGraph from StreamGraph. For this phase, the graph only contains a single node (single stage);
   - figure out the partitions of intermediate topics
   - create the topics

Please note currently the planner is used in the remote runner for now. Further changes/refactoring/cleanup are expected to be integrated with local runner.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jagadish Venkatraman <jvenkatraman@linkedin.com>

Closes apache#75 from xinyuiscool/SAMZA-1067
prateekm pushed a commit to prateekm/samza that referenced this pull request Mar 17, 2017