
[NEMO-7] Intra-TaskGroup pipelining #2

Merged
merged 395 commits into master from 717-TGE, May 10, 2018

Conversation

jeongyooneo
Contributor

@jeongyooneo jeongyooneo commented Mar 5, 2018

JIRA: NEMO-7: Refactor TaskGroupExecutor to a batch/stream-unified execution model

Major changes:

  • Refactored TaskGroupExecutor to a pipelined execution model (thanks to @sanha)
    • Per-Task InputReader/OutputWriter and TaskState management are removed.
    • A per-Task Pipe acts as a local writer to which this Task emits output, and as a local reader from which its children Tasks receive input.
    • Input is deserialized element-wise and trickles down to the end of the TaskGroup DAG recursively.
      • iteratorIdToDstTasksMap maps source input (data from a bounded source or other TaskGroups) to the Task DAG that processes this input.
      • pipeToDstTasksMap maps intra-TaskGroup input to the Task DAG that processes this input.

TaskGroup-wise InputReaders and OutputWriters

Recursive execution of intra-TaskGroup pipelining
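The pipelined model above can be sketched roughly as follows. All class and method names here are illustrative, not Nemo's actual API: each Task writes to a local Pipe, and output trickles down to its children recursively, one element at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical sketch: the per-Task Pipe acts as a local writer for the
// owning Task and a local reader for its children Tasks.
final class Pipe<T> {
  private final Queue<T> buffer = new ArrayDeque<>();
  void emit(final T element) { buffer.add(element); }
  T remove() { return buffer.poll(); }
  boolean isEmpty() { return buffer.isEmpty(); }
}

final class SketchTask {
  final Function<Object, Object> transform;   // the Task's computation
  final Pipe<Object> output = new Pipe<>();   // local writer for this Task
  final List<SketchTask> children = new ArrayList<>();

  SketchTask(final Function<Object, Object> transform) {
    this.transform = transform;
  }

  // Process one element and recursively trickle it down the Task DAG.
  void onElement(final Object element) {
    output.emit(transform.apply(element));
    if (children.isEmpty()) {
      return; // a leaf Task keeps its output for the TaskGroup-wise OutputWriter
    }
    while (!output.isEmpty()) {
      final Object out = output.remove();
      for (final SketchTask child : children) {
        child.onElement(out); // intra-TaskGroup pipelining, element-wise
      }
    }
  }
}
```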

Minor changes to note:

  • None

Tests for the changes:

  • The existing tests are sufficient.

Other comments:

  • None

wynot12 and others added 30 commits August 18, 2017 18:02
Closes #268

#268 reported that importing some of Beam's HDFS-related Maven artifacts causes integration test failures.

However, I checked that they cause no problem and the tests run well.
So we can now use them to handle I/O to/from HDFS.
Resolves #235.

- On the master side, removed the Task state updating, which isn't used anywhere.
- On the master and executor sides, removed the failedTaskIds information, which is not used.
The only Task state that is meaningful to the master is ON_HOLD, so the tasksPutOnHold information is maintained.
* [Vortex-418] Collect Executor Metric in Master

* Flush remaining metrics while closing the metric sender

* rm index increment
This PR addresses #404 by

- implementing a MetadataManager that manages the metadata on the master side
- making RemoteFileMetadata communicate with the master to store / get the metadata

Closes #404
Resolves #417.

This PR:

- Defines formats for Executor side Job/Stage/TaskGroup/Task execution metric
Part of #440

* Adds FileArea and the FileStore#getFileAreas method. These will be used for
  zero-copy FilePartition transfer (#353), as descriptors for file regions.
* Adds HashRange to designate hash ranges, and modifies existing code
  to use it.
This PR:
Resolves #355, #300

- Makes metadata server support the block region reservation.
- Makes GlusterFileStore support block appending write.
(During this process, the store communicates with the metadata server for each block append)
- Refactors FilePartitions and FileMetadatas.
- Revises partition state management for I-File writes.
(This is because an I-File has multiple producer tasks.)
This PR:

- Makes the stage partitioning part into a separate pass.
- Makes stages into an attribute to tag each IR Vertex with.
- Makes the Optimizer completely static
- Makes necessary changes to the tests.

resolves #426
…en compiler and runtime (#429)

This PR:
Resolves #423

- Creates a PubSubEventHandlerWrapper for a global PubSubEventHandler
We can create subscriptions between different events and their corresponding handlers.
- Creates a CompilerEventHandler to handle events on the compiler side
- Creates a CompilerEvent interface
- Creates a DynamicOptimizationEvent
- Creates a RuntimeEventHandler to handle events on the runtime side
- Creates a RuntimeEvent interface
- Creates an UpdatePhysicalPlanEvent
- Refactors Optimizer to become static
- Refactors the dynamic optimization pass to a new separate class, for better modularity.
- Provides an API on the scheduler to be able to update the PhysicalPlan.
- Fixes tests to use the updated structure
Resolves #173
A part of #440

- Defines the data transfer protocol.
- Introduces two frame types: control frames and data frames.
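As a rough illustration of a protocol with two frame types — the actual Nemo frame layout is not specified here, so the one-byte tag plus length-prefixed body below is an assumption:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: each frame carries a one-byte type tag
// (control vs. data) followed by a 4-byte length and the body.
final class FrameCodec {
  static final byte CONTROL = 0;
  static final byte DATA = 1;

  static ByteBuffer encode(final byte type, final byte[] body) {
    final ByteBuffer buf = ByteBuffer.allocate(1 + 4 + body.length);
    buf.put(type);          // frame type tag
    buf.putInt(body.length); // body length prefix
    buf.put(body);
    buf.flip();
    return buf;
  }

  static byte[] decodeBody(final ByteBuffer frame) {
    final byte type = frame.get();     // read (and here ignore) the type tag
    final int length = frame.getInt(); // body length
    final byte[] body = new byte[length];
    frame.get(body);
    return body;
  }
}
```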
This PR adds a command line parameter for JVM heap slack

Resolves #451
This PR:

- Does minor cleanups and fixes that have been lost during review of #429.
- Fixes possible confusion about the position of each handler and event.
This PR improves job state logging by

* removing duplicated logs
* removing messages meaning that job state has been saved to a json file
* adding messages about the number of remaining TaskGroups.
Resolves #450 
A part of #440

With this PR, the vortex runtime can
* Start PartitionTransport server (See PartitionTransport)
* Set up channel pipeline (See ChannelInitializer)
* Expose API to initiate pull/push-based transfer
  (See PartitionTransfer)
Resolves #460 
A part of #440 

ControlMessageToPartitionStreamCodec builds control messages for outgoing requests, and PartitionStreams for incoming requests.
This PR:

- Implements the algorithm required for even data partitioning in DataSkewDynamicOptimizationPass.
- Provides types for metric data
- Provides a minor fix in DataSkewPass for the part where it was tied to Beam.

resolves #390
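One way to realize even data partitioning from collected metrics — a minimal sketch, not necessarily the algorithm in DataSkewDynamicOptimizationPass — is to split the hash-key space into ranges of roughly equal total size:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: given per-hash-key data sizes collected as metrics,
// compute range boundaries so each range holds roughly total/numRanges bytes.
final class SkewPartitioner {
  // Returns boundaries: range i covers keys [boundaries[i], boundaries[i+1]).
  static List<Integer> evenRanges(final long[] keySizes, final int numRanges) {
    long total = 0;
    for (final long s : keySizes) {
      total += s;
    }
    final double ideal = (double) total / numRanges;
    final List<Integer> boundaries = new ArrayList<>();
    boundaries.add(0);
    long acc = 0;
    for (int key = 0; key < keySizes.length; key++) {
      acc += keySizes[key];
      // Close a range once the running total reaches the next ideal cut point.
      if (acc >= ideal * boundaries.size()
          && boundaries.size() < numRanges
          && key + 1 < keySizes.length) {
        boundaries.add(key + 1);
      }
    }
    boundaries.add(keySizes.length);
    return boundaries;
  }
}
```

With a single heavily skewed key, the cut lands right after it, so the heavy key does not drag neighboring keys into the same reducer's range.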
Resolves #448, since current REEF master uses Netty 4.0.23.Final

- Removes dependency: REEF 0.15.0
- Adds dependency: REEF 0.17.0-SNAPSHOT
- Adds bundled Maven local repository to include prebuilt artifacts
- Removes custom Wake JAR file (see #272)
- Makes NativeChannelImplementationSelector default
- Fixes a bug that made PartitionTransport unable to retry
binding if one of the candidate ports is unavailable
This PR:

- Adds the ScheduleGroupNum attribute
- Provides a ScheduleGroupPass for annotating each vertex with the attribute
- Provides tests to make sure it functions as intended

resolves #437
Resolves #465
A part of #440

This PR implements end-to-end stream-based partition transfer API.
Netty EventLoopGroup threads, encoder/decoder threads, and
user threads (PartitionManagerWorker) are involved.
Resolves #265 .

This PR:

- Waits for all requested executors to be returned even when `ContainerManager` is terminated to ensure REEF job completes.
- Resolves the possible root cause for occasional CI build failures.
Resolves #487 .

This PR:

- Adds ScheduleGroup to `Stage` and `PhysicalStage`
- Checks that all vertices in a single `Stage` have identical ScheduleGroupIndex (Integrity check)
Drops PartitionTransferPeer to use PartitionTransfer

Resolves #440 

* Drops PartitionTransferPeer in favor of PartitionTransfer, to transfer data between executors
* Rename {fetch=>pull}, {send=>push} as discussed
This PR:

- Exposes the Block to the upper level and makes it the unit of write (& read in #463)
- Makes PartitionStore unaware of conditions such as writable or hashed (instead, PartitionManagerWorker and the layers above manage them)
- Prepares to support "incremental" reads (#463).
-> FilePartition can now "commit" blocks after a write
- Modularizes the functions in OutputWriter, in preparation for "componentizing" them as DataTransferStrategy
-> OutputWriter can now "commit" partitions
-> OutputWriter can now choose whether writes are synchronized
Resolves #475, #444
Resolves #419, #443.

This PR:

- Collects execution times of Job/Stage/TaskGroup/Task and logs them.
- Job/Stage metrics: collected in the Master at JobStateManager.
- TaskGroup/Task metrics: collected in the Executor at TaskGroupStateManager and sent to the Master. In particular, CONTAINER_FAILURE (among the FAILURE_RECOVERABLE causes) is now propagated to the Master in the form of protobuf control messages.
…p (#496)

Resolves #438 .

This PR:

- Changes `BatchScheduler` to schedule `TaskGroups` according to `ScheduleGroup`
- Changes `BatchSchedulerTest` to match the changes.
This PR handles committed partitions of a TaskGroup that fails while executing
…fferent (#502)

Resolves #481 .

This PR:

- Adds `getAncestors()` to `DAG.java`
- Enables `PendingTaskGroupPriorityQueue` to let lower-priority stages be scheduled if their container type differs from that of the higher-priority stages.
@sanha
Contributor

sanha commented Mar 9, 2018

Unfortunately, I've noticed a critical bug in this change.
When I ran our MapReduce example on 512 GB of Wikimedia data in Amazon EC2, all executors hung at the first TaskGroup of the reduce stage. (This hanging does not seem to occur at our ITCase scale.)
We must resolve this problem before we merge this PR. Let's investigate this issue thoroughly.

@sanha
Contributor

sanha commented Mar 9, 2018

I strongly suspect that the sink vertex creates a temporary file for every element.
After I killed the hung job, HDFS failed to access the output directory.

@bgchun

bgchun commented Mar 9, 2018

@jeongyooneo How do you test your changes in this PR?

@sanha
Contributor

sanha commented Mar 9, 2018

Here's good news!
I confirmed that the bug I reported is fixed as of the last commit.
The experiment works well. Thanks for the hard work!

@jeongyooneo
Contributor Author

jeongyooneo commented Mar 9, 2018

@bgchun I’ve modified TaskGroupExecutorTest to the element-wise version and checked that all ITCases work correctly with this PR.
If you have any additional suggestions, please let me know, and I’ll try them right away!

@jeongyooneo
Contributor Author

Thanks for the review!
I've addressed most of them, except the following two:

  1. Refactoring the metric collection code into a separate class, as requested by @johnyangk: I think we'd better do that in another issue.
    Since more metric types will be added, it'd be better to clean them up once the metric types we need and their usages become concrete.
  2. Checking the TaskGroup-complete event by collecting finished Task ids: there are subtle task-completion timing issues that made me keep this as is.
  • A TaskGroup is complete when all of its Tasks are complete, i.e., each Task has consumed all of its input.
  • As input is processed element-wise, pinpointing the exact moment when the final task finishes is hard. There can be multiple final tasks that process input independently, and each one's finishing point is known only to itself. Thus, we still need to collect those final Task ids to check TaskGroup completion.
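The completion check described above can be sketched as follows (a hypothetical class, simplified from the role TaskGroupStateManager plays; the names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: the TaskGroup is complete only once the ids of all of
// its (possibly multiple, independent) final Tasks have been collected.
final class CompletionTracker {
  private final Set<String> finalTaskIds;
  private final Set<String> completed = new HashSet<>();

  CompletionTracker(final Set<String> finalTaskIds) {
    this.finalTaskIds = finalTaskIds;
  }

  // Called when a Task reports that it has consumed all of its input.
  // Returns true when every final Task has reported completion.
  boolean onTaskComplete(final String taskId) {
    completed.add(taskId);
    return completed.containsAll(finalTaskIds);
  }
}
```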

sanha
sanha previously approved these changes Mar 18, 2018

@sanha sanha left a comment


Thanks for the work @jeongyooneo! I've added some comments but it looks good to me now.
@johnyangk @seojangho please check it if you want.

@@ -35,13 +33,13 @@ public RelayTransform() {
}

@Override
public void prepare(final Context context, final OutputCollector<T> oc) {
this.outputCollector = oc;
public void prepare(final Context context, final OutputCollector<T> p) {

Let's change p to oc again. (for all other changes also)

*/
private final Map<String, List<InputReader>> physicalTaskIdToInputReaderMap;
private final Map<String, List<OutputWriter>> physicalTaskIdToOutputWriterMap;
private final List<InputReader> inputReaders;

Please check this comment again.

private final Map<String, List<TaskDataHandler>> srcIteratorIdToDataHandlersMap;
private final Map<String, List<TaskDataHandler>> iteratorIdToDataHandlersMap;
private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> partitionQueue;
private List<TaskDataHandler> taskDataHandlers;

Do we have to maintain this list together with srcIteratorIdToDataHandlersMap and iteratorIdToDataHandlersMap at the same time?

private final Map<String, List<TaskDataHandler>> iteratorIdToDataHandlersMap;
private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> partitionQueue;
private List<TaskDataHandler> taskDataHandlers;
private Map<OutputCollectorImpl, List<TaskDataHandler>> outputToChildrenDataHandlersMap;

Can't we see this information in TaskDataHandler?

final String physicalTaskId = getPhysicalTaskId(boundedSourceTask.getId());
final Map<String, Object> metric = new HashMap<>();
metricCollector.beginMeasurement(physicalTaskId, metric);
private boolean hasOutputWriter(final Task task) {

Please add comments to all these private methods.

final Object element = outputCollector.remove();

// Pass outputCollectorOwnerTask's output to its children tasks recursively.
if (!childrenDataHandlers.isEmpty()) {

do we have to check this?

} catch (final BlockWriteException ex2) {
taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
LOG.info("{} Execution Failed (Recoverable: output write failure)! Exception: {}",

LOG.error?


Please check this comment again.

} catch (final Exception e) {
taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
Optional.empty(), Optional.empty());
LOG.info("{} Execution Failed! Exception: {}",

ditto


Please check this comment again.

} catch (final InterruptedException e) {
throw new BlockFetchException(e);
// Process element-wise depending on the Task type
if (task instanceof BoundedSourceTask) {

If we pre-build these as Callables in TaskDataHandler, we can avoid doing an instanceof check for every element.
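The suggestion can be illustrated like this (the Task types and method names here are illustrative, not Nemo's actual classes): dispatch on the type once when the handler is built, then reuse the bound handler for every element.

```java
import java.util.function.Consumer;

// Hypothetical sketch: resolve the Task type once, so the hot per-element
// path does not pay for repeated instanceof checks.
final class HandlerSketch {
  interface Task { }

  static final class OperatorTask implements Task {
    int elementsSeen = 0;
    void onData(final Object element) { elementsSeen++; }
  }

  static final class SourceTask implements Task {
    void fetch(final Object element) { /* no-op for the sketch */ }
  }

  // One-time dispatch; the returned handler is invoked per element.
  static Consumer<Object> buildHandler(final Task task) {
    if (task instanceof OperatorTask) {
      return ((OperatorTask) task)::onData;
    } else if (task instanceof SourceTask) {
      return ((SourceTask) task)::fetch;
    }
    throw new IllegalArgumentException("Unsupported Task type");
  }
}
```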

*/
public List<O> collectOutputList() {
return outputList.getAndSet(new ArrayList<>());
public O remove() {

Please check this comment again

@johnyangk
Contributor

Thanks @sanha. I'll make another pass once @jeongyooneo addresses the comments here.

seojangho referenced this pull request in snuspl/incubator-nemo Apr 12, 2018
…pache#165)

* Add getStateMachine()

* TaskGroup getId()

* ExecutionStateManager introduced

* New scheduling policies

* Update states

* Change SchedulingException

* Remove SampleScheduler

* Update ExecutorRepresenter

* Update scheduling policy

* Add ExecutionStateManager

* Add BatchRRScheduler

* Changed interface for Scheduler

* Add TaskStateManager

* change test name

* Add google protobuf: needs to be downgraded

* Checkstyle/compilation resolved

* protobuf files

* Revert States

* Add more exceptions

* Cleanup states (again)

* TaskStateManager

* ExecutorRepresenter to only contain taskGroup Ids

* Change scheduling policy interface

* Add protobuf message as a sample

* Update execution state manager interfaces

* Change an exception

* Scheduler modified

* Add TODOs

* Comments added

* More comments

* Add current states to toString

* Consider multithreading

* ExecutionManagerTest

* Physical DAG bug fixed

* ExecutionStateManager: add print

* RRSchedulerTest

* TaskGroupStateManager fixed

* RRScheduler minor bug fixed

* Scheduler

* SchedulerTest

* Comments addressed 1

* Protobuf compilation changed (w/ proto3)

* Omitted tests

* Scheduler -> BatchScheduler

* Comments addressed #2

* Comments addressed #3

* Add omitted test

* Comments addressed #4

* Reduce log level

* Remove unnecessary
@jeongyooneo
Contributor Author

jeongyooneo commented May 9, 2018

@sanha I've addressed the comments except the following two. These will be addressed via another issue on refactoring TaskDataHandler-based data transfer:

  • Removing the local variables dealing with iterators in TaskGroupExecutor
  • Pre-building the per-Task-type runTask operation as a Callable in TaskDataHandler


@sanha sanha left a comment


I left some minor comments. (I just skimmed the TaskGroupExecutor class because we already talked about it many times and you have a plan to refactor it.) Thanks!
@johnyangk @seojangho please check this PR and approve if you are happy with it.

* Set this OutputCollector as having side input for the given child task.
*
* @param physicalTaskId the id of child task whose side input will be put into this OutputCollector.
*/
public void setAsSideInputFor(final String physicalTaskId) {
sideInputReceivers.add(physicalTaskId);
}

public boolean hasSideInputFor(final String physicalTaskId) {

Comment please :)


/**
* Collect transform.
* @param <T> type of data to collect.
*/
public final class CollectTransform<T> implements Transform<T, T> {
private String filename;
private FileOutputStream fos;

Why do we need to keep these fos and oos as fields? They can be created in close() only.

}

@Override
public void close() {
try {

Please use a try-with-resources clause for these Streams.
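For reference, the suggested pattern looks like this — a minimal sketch of what CollectTransform#close might do; the writeResult helper and its signature are assumptions, not the actual code:

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Hypothetical sketch: try-with-resources closes both streams automatically,
// in reverse order, even if writing throws.
final class CloseSketch {
  static void writeResult(final String filename, final Object result) throws IOException {
    try (FileOutputStream fos = new FileOutputStream(filename);
         ObjectOutputStream oos = new ObjectOutputStream(fos)) {
      oos.writeObject(result); // streams are closed when the try block exits
    }
  }
}
```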

@@ -60,7 +59,7 @@ public void prepare(final Context context, final OutputCollector<O> outputCollec
}

@Override
public void onData(final Iterator<I> elements, final String srcVertexId) {
public void onData(final Object element) {

I element?

@@ -25,10 +25,10 @@
/**
* A data-skew policy with fixed parallelism 5 for tests.
*/
public final class DataSkewPolicyParallelsimFive implements Policy {
public final class DataSkewPolicyParallelismFive implements Policy{

Please revert the space between Policy and bracket.

} catch (final Exception e) {
taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
Optional.empty(), Optional.empty());
LOG.info("{} Execution Failed! Exception: {}",

Please check this comment again.

sideInputRuntimeEdge = edge;
}

public RuntimeEdge getSideInputRuntimeEdge() {

Please add a comment :)

public void setAsSideInputFor(final String physicalTaskId) {
sideInputReceivers.add(physicalTaskId);
}


Please add a comment :)

*
* @param dataToWrite An iterable for the elements to be written.
* * @param element

Please remove the duplicated *

public final class TaskDataHandler {
private static final Logger LOG = LoggerFactory.getLogger(TaskDataHandler.class.getName());

public TaskDataHandler(final Task task) {

Please add comments for the constructor and getters.


@johnyangk johnyangk left a comment


Cancel 'Request changes'.

johnyangk
johnyangk previously approved these changes May 10, 2018
@jeongyooneo
Contributor Author

@sanha Just finished addressing comments!


@sanha sanha left a comment


LGTM. I'll also look forward to your remaining refactoring PR. Thanks for the work!

@sanha sanha merged commit cf041f6 into master May 10, 2018
@sanha sanha deleted the 717-TGE branch May 10, 2018 06:03
Labels: enhancement (New feature or request)