Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-332] Refactor RunTimePass #191

Merged
merged 77 commits into from
Feb 7, 2019
Merged

[NEMO-332] Refactor RunTimePass #191

merged 77 commits into from
Feb 7, 2019

Conversation

johnyangk
Copy link
Contributor

@johnyangk johnyangk commented Jan 30, 2019

JIRA: NEMO-332: Refactor RunTimePass

Major changes:

  • Refactors the RunTimePass API
  • Refactors the run-time plan rewriting logic into the NemoPlanRewriter

Minor changes to note:

  • Replaces ResourceSkewedDataProperty with ResourceAntiAffinityProperty (generalizing the directive)

Tests for the changes:

  • Existing tests continue to pass

Closes #191

@johnyangk johnyangk self-assigned this Jan 30, 2019
@johnyangk johnyangk changed the title Refactor RunTimePass [NEMO-332] Refactor RunTimePass Jan 30, 2019
@johnyangk
Copy link
Contributor Author

Please review this PR after #190 is merged, as this PR builds on top of #190. Thanks!

Copy link
Contributor

@jeongyooneo jeongyooneo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I've left some comments.

/**
* DataSkewMetric ExecutionProperty.
* Number of partitions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate on where this 'number of partitions' info used?
Maybe mentioning some use cases(ex. skew handling) would be helpful.

public final class PartitionerProperty extends EdgeExecutionProperty<PartitionerProperty.Value> {
public final class PartitionerProperty
extends EdgeExecutionProperty<Pair<PartitionerProperty.Type, Integer>> {
public static final int AUTO_NUMBER_OF_PARTITIONS = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does 'auto' mean here? How about DESTINATION_PARALLELISM?

*
* @param value value of the new execution property.
* @return the newly created execution property.
* Use the automatic number of partitions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

@@ -23,13 +23,13 @@
/**
* Parallelism ExecutionProperty.
*/
public final class ParallelismProperty extends VertexExecutionProperty<Integer> {
public final class MinParallelismProperty extends VertexExecutionProperty<Integer> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What differs 'MinParallelism' and 'Parallelism'?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added class-level comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think that this property would be meaningful even when it doesn't show the accurate values of the parallelism of each vertex? Why not update the value of the ParallelismProperty instead when the cases described above occur?

@@ -16,28 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common;
package org.apache.nemo.common.ir.vertex.system;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about having basic, optimization(or specialized?) 2-package structure?

  • optimization package for MessageAggregatorVertex and MessageBarrierVertex, as they're used in DAG optimization
  • basic package for StreamVertex, OperatorVertex, ... etc other than IRVertex, since they all extend IRVertex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've explicitly added the SystemIRVertex abstract class to make it clearer that Stream/MessageBarrier/MessageAggregate vertices are provided by Nemo and allowed to be inserted into an IRDAG to optimize execution. (hence the "system" vertices)

@@ -28,14 +28,14 @@
* This transform can be used for merging input data into the {@link OutputCollector}.
* @param <T> input/output type.
*/
public final class RelayTransform<T> implements Transform<T, T> {
public final class StreamTransform<T> implements Transform<T, T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamTransform intuitively sounds like transform used in stream operation(other than batch).
How about ShuffleDataStreamTransform or something like that to keep the nuance of 'relaying promptly'?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this vertex can be decoupled from Shuffle, as it can be inserted regardless of the communication pattern, but I can't think of a better name at the moment. I do think that Stream is better than Relay in capturing the behavior of "immediately forwarding the input element without modifying it".

How about we revisit this later, after we add more system vertices and streaming functionalities?

@@ -33,7 +33,7 @@
* Constructor.
*/
public DedicatedKeyPerElementPartitioner() {
// TODO #288: DedicatedKeyPerElementPartitioner should not always return 0
// TODO #288: DedicatedKeyPerElement should not always return 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DedicatedKeyPerElement type Partitioner should not always return 0?

partitioner = new DedicatedKeyPerElementPartitioner();
break;
case Hash:
final int numOfPartitions = edgeProperties.get(PartitionerProperty.class).get().right();
// If AUTO, use the number of destination parallelism to minimize the number of partitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe copy this comment to the definition of AUTO_NUMBER_OF_PARTITIONS?

@wonook
Copy link
Member

wonook commented Jan 31, 2019

i'll take a look after #190

@johnyangk
Copy link
Contributor Author

Thanks @jeongyooneo, I've addressed the comments.

@johnyangk
Copy link
Contributor Author

Thanks @wonook, I've addressed the comments.

@johnyangk
Copy link
Contributor Author

@jeongyooneo @wonook Ping. 😄

@wonook
Copy link
Member

wonook commented Feb 7, 2019

LGTM on my side. @jeongyooneo anything else you want to take a look?

@jeongyooneo
Copy link
Contributor

Thanks! I'll merge this.

@jeongyooneo jeongyooneo merged commit 8ec2469 into apache:master Feb 7, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants