
[FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results #254

Closed
wants to merge 5 commits

Conversation

uce
Contributor

@uce uce commented Dec 8, 2014

FLINK-25: Offer buffer-oriented runtime abstraction

  • The distributed runtime API was originally record-oriented. The output side has previously been refactored to a buffer-oriented API (BufferWriter). With this pull request, the input side offers a similar interface: BufferReader.
  • This enables us to directly work on the serialized data in the future. Previously, this was impossible as buffers were directly deserialized after being handed over to an input channel.
  • Currently, the buffer-oriented readers and writers are always wrapped by record-oriented readers and writers. The deserialization logic of the input channels has essentially moved to the readers.
  • The way of registering I/O has changed. The life-cycle of each task (see AbstractInvokable) involves two methods: registerInputOutput() and invoke(). The I/O setup was coupled with the creation of readers/writers in registerInputOutput(), although the required information is independent of it. Therefore, they are now directly instantiated by the runtime environment and can be accessed via getEnvironment().getReader(int) and getEnvironment().getWriter(int), respectively (see the sketch after this list).
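
As a rough sketch, a task now looks like the following (the RecordReader/RecordWriter wrapper shapes and the MyRecord type are illustrative assumptions; only the AbstractInvokable life-cycle and the getEnvironment().getReader(int)/getWriter(int) accessors are taken from this pull request):

public class ExampleTask extends AbstractInvokable {

    private RecordReader<MyRecord> reader;
    private RecordWriter<MyRecord> writer;

    @Override
    public void registerInputOutput() {
        // The buffer-oriented readers/writers are created by the runtime environment;
        // the task only wraps them with record-oriented readers/writers.
        reader = new RecordReader<MyRecord>(getEnvironment().getReader(0), MyRecord.class);
        writer = new RecordWriter<MyRecord>(getEnvironment().getWriter(0));
    }

    @Override
    public void invoke() throws Exception {
        MyRecord record;
        while ((record = reader.next()) != null) {
            writer.emit(record);
        }
        writer.flush();
    }
}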

FLINK-986: Add initial support for intermediate results

  • This commit introduces the abstraction of intermediate results into the distributed runtime. It is essentially the runtime part of @StephanEwen's rework of the scheduler/execution graph (FLINK-1094), which made the scheduler/execution graph aware of intermediate results. Job graphs used to essentially look like this: Source => Task i => ... => Sink. With Stephan's rework, they already look like this: Task:Intermediate Result <= Task i:Intermediate Result <= ... <= Task:Intermediate Result. Tasks produce intermediate results and attach to them for consumption. These changes logically decoupled tasks and the results they produce in the job graph. At the runtime level, the intermediate results were still tightly coupled to the producing and consuming task. With this pull request, the producing task, the produced intermediate result, and the consuming task are decoupled as well.
  • Previously, the network stack was responsible for packaging buffers and immediately dispatching them to their receivers in a push fashion. The first buffer for a receiver resulted in the scheduling of this receiver. With the new model, we gain much more flexibility in the way we can produce and consume these intermediate results. The current state is feature-equivalent with the current master, i.e. the produced intermediate result is pipelined and not persistent. I ran a few performance comparisons and the performance was in the same ballpark as the current master, although it was (consistently) slightly slower.

Changes

Buffer management/The life of a Buffer

  • The buffers for the network stack are allocated by each task manager at startup time on the heap by the NetworkBufferPool (previously GlobalBufferPool). These network buffers are divided uniformly at runtime among the tasks. The current default for this pool is 2048 buffers (32 KB each), resulting in a total of 64 MB. Intermediate results get their buffers from this buffer pool. Therefore, for any in-memory persistent data set to make sense, we would have to give the network pool a lot more memory. I think that this is currently not desirable as our memory management is static, i.e. memory given to the network pool would be missing at operators like sorting or hashing. In my opinion, adaptive memory management (FLINK-1101) is therefore a requirement before continuing with more runtime intermediate result variants (see below).
  • The buffer management for the pipelined intermediate results has essentially not changed. We need at least a single buffer per outgoing channel (or queue in the terms of the new runtime). When a task is submitted to a task manager, the network buffers are divided as follows: a single buffer pool per produced result (previously one for all produced results) and one pool per consumed result (unchanged).
  • A produced buffer used to take the following high-level path: BufferWriter => ChannelManager => NETWORK xor local InputChannel. Now it is BufferWriter => IntermediateResult. What happens at the intermediate result is flexible, depending on the IntermediateResultType. Early discussions with @StephanEwen suggested the following three dimensions:
    1. persistent/ephemeral,
    2. pipelined/blocking, and
    3. back pressure/no back pressure.

The current state offers an ephemeral-pipelined-back pressure implementation, which matches the behavior we currently have in master. (I have removed code for a persistent-blocking-no back pressure variant for now.)
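
One way to picture this type space in code (a sketch only; the actual IntermediateResultType in this pull request currently exposes just the pipelined variant, and all method names here except isPipelined() are assumptions):

public enum IntermediateResultTypeSketch {

    // The only variant implemented in this pull request.
    EPHEMERAL_PIPELINED_BACK_PRESSURE(false, true, true),

    // Removed for now; a candidate for the next variant to add (see "Next steps").
    PERSISTENT_BLOCKING_NO_BACK_PRESSURE(true, false, false);

    private final boolean persistent;
    private final boolean pipelined;
    private final boolean backPressure;

    IntermediateResultTypeSketch(boolean persistent, boolean pipelined, boolean backPressure) {
        this.persistent = persistent;
        this.pipelined = pipelined;
        this.backPressure = backPressure;
    }

    public boolean isPersistent() { return persistent; }

    public boolean isPipelined() { return pipelined; }

    public boolean hasBackPressure() { return backPressure; }
}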

Intermediate results, intermediate result partitions and intermediate result partition queues

  • As noted above, tasks at the job graph level are associated with intermediate results. When scheduling these graphs, each intermediate result is divided into result partitions (associated with execution vertices). Each parallel sub task (execution vertex) produces one partition of this result (and possibly partitions of multiple intermediate results).
  • At the runtime level, these partitions are further divided into queues depending on the degree of parallelism. For example, a map-reduce program with a degree of parallelism of 2 might look like this at the map side:
                                             +---------+
+-------+               +-------------+  +=> | Queue 1 |
| Map 1 | = produces => | Partition 1 | =|   +---------+
+-------+               +-------------+  +=> | Queue 2 |
                                             +---------+

                                             +---------+
+-------+               +-------------+  +=> | Queue 1 |
| Map 2 | = produces => | Partition 2 | =|   +---------+
+-------+               +-------------+  +=> | Queue 2 |
                                             +---------+

Scheduling of consumers and receiver-initiated sending

  • Depending on the intermediate result type (pipelined vs. blocking), it is necessary to deploy the consumers either when the first buffer is produced or after all buffers have been produced. The receivers then request the respective intermediate result partition queue from the task managers where the partitions were produced. For a data repartitioning as in a map-reduce, each sub task would request the queue matching its subtask number, e.g.:
                                             +---------+                         +----------+
+-------+               +-------------+  +=> | Queue 1 | <=======+=== requests = | Reduce 1 |
| Map 1 | = produces => | Partition 1 | =|   +---------+         |               +----------+
+-------+               +-------------+  +=> | Queue 2 | <==+    |
                                             +---------+    |    |
                                                            |    |
                                             +---------+    |    |
+-------+               +-------------+  +=> | Queue 1 | <==+====+
| Map 2 | = produces => | Partition 2 | =|   +---------+    |                    +----------+
+-------+               +-------------+  +=> | Queue 2 | <==+======== requests = | Reduce 2 |
                                             +---------+                         +----------+

If the partitioning is not known or a task has no known consumers, the partition will consist of a single queue (or as many queues as the default degree of parallelism). When it is consumed, a re-partitioning task needs to be inserted.
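
In code, the consumer side roughly amounts to the following (purely illustrative; the class and method names below are hypothetical and not the actual interfaces of this pull request):

// Executed by the consumer sub task with index subTaskIndex (e.g. Reduce 1 => 0, Reduce 2 => 1).
// For every known producer partition, request the queue that matches this consumer's index.
for (RemotePartition partition : consumedResult.getKnownPartitions()) {
    inputGate.requestQueue(partition.getProducerTaskManager(), partition.getPartitionId(), subTaskIndex);
}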

  • The scheduling of consumers requires notifications to the central job manager, which can then deploy the consumers. For a single produced pipelined intermediate result, this results in one RPC call per produced partition to the job manager (see ConsumerNotificationProtocol). The consumer tasks can then be deployed with the necessary information to request the respective queues, which results in one task deployment RPC call per consumer sub task. The first notification from a task manager results in the deployment of the receiver. It is possible that not all producer task locations are known at deployment time of a consumer task. At the moment, we follow the naive approach of sending a task update RPC call with the location of the producer for every partition. This currently results in a total of N + N * M RPC calls per produced intermediate result (where N is the number of produced partitions and M the number of consumer tasks).
  • For blocking results, this can be reduced to 2*N as we are guaranteed to know all producer locations when all partitions are finished. It should also be possible to further reduce this number to the number of task managers by collecting the notifications per task manager and then sending them to the job manager at once.
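
As a concrete example (illustrative numbers only): with N = 4 produced partitions and M = 4 consumer tasks, the current pipelined scheme amounts to 4 + 4 * 4 = 20 RPC calls per produced intermediate result, whereas the blocking case would need only 2 * 4 = 8.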

More robust shuffle

  • Shuffles were previously managed by a large per-task-manager component called the ChannelManager, which kept track of all channels. This component was huge and responsible for many different things (network buffers, channel lookup, buffer routing, TCP connections). It has been refactored into more localized components, where resource acquisition and release are handled in the same local component, which is in my opinion much easier to follow and understand.
  • In particular, we had issues with the release of resources like channel lookup tables or TCP connections, which have been addressed with this pull request as well. Especially the Netty-based network code was not reliable in the way it propagated errors to the tasks and released all resources. Addressing this was a requirement for adding reliable fault-tolerance mechanisms in the future.

Next steps

The goal is to merge the current state as soon as possible and then start to extend it (I've removed some code that was already in place). I could have done this way earlier and intend not to make that mistake again.

  1. I've only tested on a small cluster (4 nodes) with a low degree of parallelism (32). I have to test further on a larger cluster with a higher degree of parallelism. I also didn't test iterative programs yet. After doing this, I will also post the numbers.
  2. I've already discovered a few problems in corner cases, which I need to fix before we review this.
  3. Some tests are failing and I've disabled a few others, which I will adjust and enable again.

I intend to address these points over the course of this week and work on the branch of this pull request.

After I've addressed the above points, I find it reasonable to work on reducing the number of redundant RPC calls and introducing the persistent-blocking-no back pressure intermediate result partition variant.

@rmetzger
Contributor

rmetzger commented Dec 8, 2014

+1 for merging it asap.

@StephanEwen
Contributor

Very nice description! We should add this to the Internals section in the docs.

decodedMsg = new ErrorResponse();
}
else {
// Exception: unknown message
Contributor

Really throw an exception here? ;-)

Contributor Author

Yes ;)

@StephanEwen
Contributor

This will take a bit to double-check. All in all: nice code!

You serialize Strings in most places by writing their byte length and their bytes: out.writeInt(msg.getBytes().length); and out.writeBytes(msg.getBytes());. In most other places, we used either out.writeUtf8(), or preferably StringUtils.writeNullableString() or StringValue.writeString(). Also, the above code frequently goes through the charset encoding code paths multiple times, which is a bit unnecessary.
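
A minimal sketch of the suggested pattern, assuming the StringValue.writeString(CharSequence, DataOutput) / readString(DataInput) utilities from org.apache.flink.types.StringValue:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.flink.types.StringValue;

class MessageStringSerialization {

    static void writeMessage(String msg, DataOutput out) throws IOException {
        // Encodes the string once instead of calling msg.getBytes() twice
        // (once for the length prefix and once for the payload).
        StringValue.writeString(msg, out);
    }

    static String readMessage(DataInput in) throws IOException {
        return StringValue.readString(in);
    }
}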

The encoding logic in the NettyMessage does a few unnecessary or at least clumsy steps. The reason seems to be that every message must compute its exact size before actually putting its contents into the netty buffer. We can simplify that by allowing the write() call to allocate and return the target buffer by itself. That way, we can also implement code that does not really copy data into netty buffers, but only wraps existing NIO buffers.
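
A rough sketch of that idea (the message subclass and its fields are hypothetical; only the Netty ByteBuf/ByteBufAllocator/Unpooled APIs are real):

import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

abstract class NettyMessageSketch {

    // Each message allocates (or wraps) its own target buffer instead of
    // computing its exact size up front for a shared, pre-sized buffer.
    abstract ByteBuf write(ByteBufAllocator allocator);
}

class BufferResponseSketch extends NettyMessageSketch {

    private final ByteBuffer data; // e.g. the NIO view of a network Buffer

    BufferResponseSketch(ByteBuffer data) {
        this.data = data;
    }

    @Override
    ByteBuf write(ByteBufAllocator allocator) {
        // Only the small header is copied; the payload wraps the existing NIO buffer.
        ByteBuf header = allocator.buffer(4);
        header.writeInt(data.remaining());
        return Unpooled.wrappedBuffer(header, Unpooled.wrappedBuffer(data));
    }
}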

}

public boolean isRecycled() {
return referenceCount.get() > 0;
Contributor

This seems like a bug; it should be return referenceCount.get() <= 0;

Contributor Author

Yeah definitely, see above.

@StephanEwen
Contributor

The IntermediateResultPartitionManager is not safe. To make it safe against concurrent shutdown calls, it needs to make the check for the shutdown flag and the HashMap manipulation atomic. Here, it means it needs a proper lock, not two individually atomic structures.

I think it does not hurt there, as it only happens once per partition at startup and once at cleanup time.
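
A minimal sketch of the suggested locking (field names are assumptions; the point is that the shutdown check and the map manipulation happen under the same lock):

import java.util.HashMap;
import java.util.Map;

class IntermediateResultPartitionManagerSketch<ID, P> {

    private final Object lock = new Object();

    private final Map<ID, P> registeredPartitions = new HashMap<ID, P>();

    private boolean isShutdown;

    void registerPartition(ID partitionId, P partition) {
        synchronized (lock) {
            if (isShutdown) {
                throw new IllegalStateException("Partition manager has been shut down.");
            }
            registeredPartitions.put(partitionId, partition);
        }
    }

    void shutdown() {
        synchronized (lock) {
            isShutdown = true;
            registeredPartitions.clear();
        }
    }
}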

for (int i = 0; i < inGates.size(); i++) {
this.inputGates.get(i).initializeChannels(inGates.get(i));
catch (Throwable t) {
LOG.error(ExceptionUtils.stringifyException(t), t);
Contributor

I think that the stringifyException call is unnecessary and even hinders proper logging. I would remove it, construct a proper error message for the log, and pass the exception as the second parameter.
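
For example (the message text is made up), the call above would become something like:

LOG.error("Error while initializing the input channels of gate " + i + ".", t);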

@StephanEwen
Contributor

Tagging buffers in the memory segment as "data frames" vs "serialized events" seems dangerous. If someone uses the BufferWriter interface directly, they need to adhere to the contract of using the first byte as the tag.
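
A small illustration of that implicit contract (the tag values and the segment accessor are assumptions, not the actual constants of this pull request):

final byte TAG_DATA_FRAME = 0;
final byte TAG_EVENT = 1;

// Every code path writing through the BufferWriter would have to remember this:
buffer.getMemorySegment().put(0, TAG_DATA_FRAME);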

import java.nio.ByteBuffer;

// TODO We have to rethink this. Currently this limits the event size to the page size
public class EventSerializer {
Contributor

The restriction to one buffer may be fine here, when the buffer is not part of the pool anyways and can be of arbitrary size. Does event deserialization of arbitrary size work on the receiver side?

Contributor Author

No, I think this was simply not a good idea to do this way ;) See above. It's also related to your next note...

@StephanEwen
Contributor

I would like to understand more about the BufferFutures. Is there ever a case where you need a buffer, but not immediately, so you can work with a future action and keep on going? If yes, then the futures may be a good idea. If not, they may be a bit of a complicated way to replace a blocking call.

maybeNotifyConsumers(partitionType.isPipelined());
}

public void addToAllPartitions(Buffer buffer) throws IOException {
Contributor

I think this does not work for buffers that contain data frames. The buffers are queued without incrementing the reference counter, so they will be released as soon as the first queue discards them.

In general, I am skeptical if it is a good idea to have the same buffer referenced from various queues. If we start spilling one of the queues, we may not get any memory back, because references are held from other queues.

I think for the time being, broadcasts should really write disjoint results. As a next step, we should create a single queue that is requested by all consumers. That way, we do not need to duplicate data, in memory or spilled, and it eases the reasoning about how much memory we get back when spilling data.

Contributor Author

Yes, this is a bug...

@StephanEwen
Contributor

It seems that we do not need multiple references to buffers in order to guarantee recycling only when the last consumer is done. Why not simplify the logic in the buffer? (It seems a bit broken right now anyway.)

else {
Throwable error = result.getError();
if (error != null) {
throw new IOException(error.getMessage());
Contributor

This should properly chain the root exception; that will probably be crucial for debugging errors.
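
A minimal sketch of the suggested change, keeping the existing message but attaching the cause:

throw new IOException(error.getMessage(), error);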

@StephanEwen
Contributor

The ConsumableOnceInMemoryOnlyPartitionQueue uses a bit of an unorthodox combination of atomics, a blocking queue, and sometimes a lock. I am still digging through the interactions with the netty channels, but I think in its current state this one is not quite safe against concurrent operations (by the task thread that adds / cancels / closes and the netty event loop). It may be simpler, safer, and actually even more efficient to just use a monitor for synchronization and an ArrayDeque for the buffers.
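
A minimal sketch of the monitor + ArrayDeque variant suggested above (the interface of the queue is an assumption; T stands for the network Buffer type):

import java.util.ArrayDeque;

class PipelinedPartitionQueueSketch<T> {

    private final Object lock = new Object();

    private final ArrayDeque<T> buffers = new ArrayDeque<T>();

    private boolean isFinished;

    // Called by the task thread.
    void add(T buffer) {
        synchronized (lock) {
            if (isFinished) {
                throw new IllegalStateException("Queue has already been finished.");
            }
            buffers.add(buffer);
        }
    }

    void finish() {
        synchronized (lock) {
            isFinished = true;
        }
    }

    // Called by the consumer (e.g. from the netty event loop): non-blocking poll.
    T poll() {
        synchronized (lock) {
            return buffers.poll();
        }
    }

    boolean isConsumed() {
        synchronized (lock) {
            return isFinished && buffers.isEmpty();
        }
    }
}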

@StephanEwen
Contributor

Also, we could rename it to PipelinedPartitionQueue. The class name alone busts the 80 char line width limit...

@uce
Contributor Author

uce commented Dec 9, 2014

Thanks for the feedback so far. :-) I replied inline where possible.

NettyMessage: that's a very good idea. I will also use the String serialization utilities you suggested.

IntermediateResultManager: I agree, but I left it as it is since it doesn't hurt yet. I will change it.

BufferFuture: I initially thought that we might use them in a case like that. But not at the moment and therefore it indeed complicates things. I will revert that as well. ;)

Buffer references: we don't use them at the moment, but we will need them in any case for "adaptive" spilling or multiple consumers. It would be OK to remove them for the time being; they would be easy to add back.

Exceptions: I will remove the stringify calls and add proper messages. Maybe we should completely remove them from the other code as well?

PipelinedPartitionQueue: your proposed name is better. I have a simple synchronized version as well, which should work fine for the single consumer case.

Can you have a special look at the partition queue interface? We need some kind of asynchrony (subscribe-notify). Maybe you have another idea about how to expose this.

@uce
Contributor Author

uce commented Dec 9, 2014

PS: Regarding the queue iterator interface: the isConsumed() check is not enough. We need to add a check for discard/release in case a partition is discarded/released while being consumed.

@uce uce force-pushed the flink-986-runtime_intermediate_results branch 2 times, most recently from 55e5d4b to e469296 on December 24, 2014 17:41
@uce
Contributor Author

uce commented Dec 24, 2014

I've squashed all commits and rebased this on the current master.

@uce uce force-pushed the flink-986-runtime_intermediate_results branch from e469296 to ad250bc on January 5, 2015 09:34
@rmetzger
Contributor

rmetzger commented Jan 8, 2015

Are there any open, known issues with this pull request?
I think we should try to merge it asap to avoid too many rebases (like we had with the Akka pull request).

@StephanEwen
Contributor

I am trying this out right now. Hope to be able to +1 it soon...

@uce uce force-pushed the flink-986-runtime_intermediate_results branch from 93a8521 to 70b87d2 on January 9, 2015 13:43
@uce
Contributor Author

uce commented Jan 9, 2015

Just rebased on the current master again. Locally, all tests pass. Let's see what Travis says.

@aalexandrov
Contributor

I am excited!

@StephanEwen
Contributor

Nice.

+1 to merge this now!

@rmetzger
Contributor

rmetzger commented Jan 9, 2015

+1

@asfgit asfgit closed this in d908ca1 Jan 12, 2015
@aalexandrov
Contributor

YES!!! 👏 👏 👏

@uce uce deleted the flink-986-runtime_intermediate_results branch February 23, 2015 09:18
jnh5y pushed a commit to jnh5y/flink that referenced this pull request Dec 18, 2023