Skip to content

Conversation

@zhijiangW
Copy link
Contributor

What is the purpose of the change

Currently in exactly-once mode, the BarrierBuffer would block inputs with barriers until all inputs have received the barrier for a given checkpoint. To avoid back-pressuring the input streams which may cause distributed deadlocks, the BarrierBuffer has to spill the data in disk files to recycle the buffers for blocked channels.

Based on credit-based flow control, every channel has exclusive buffers, so it is no need to spill data for avoiding deadlock. Then we implement a new CheckpointBarrierHandler for only buffering the data for blocked channels for better performance.

And this new CheckpointBarrierHandler can also be configured to use or not in order to rollback the original mode for unexpected risks.

Brief change log

  • Implement the new CreditBasedBarrierBuffer and CreditBasedBufferBlocker for buffering data in blocked channels in exactly-once mode.
  • Define the parameter taskmanager.exactly-once.blocking.data.enabled for enabling the new handler or not.

Verifying this change

This change added tests and can be verified as follows:

  • Added tests for the logic of CreditBasedBarrierBuffer
  • Added tests for the logic of CreditBasedBufferBlocker

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@zhijiangW zhijiangW force-pushed the FLINK-8547 branch 5 times, most recently from 86559e9 to 25396e6 Compare February 5, 2018 10:09
Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes :)

Maybe could you deduplicate the code for BarrierBuffer and CreditBasedBarrierBuffer (modulo the changes that I requested in a comment)?

At least could you deduplicate the tests for them? They are identical and it will cost us in additional maintenance to keep both of them (and it can take long time to completely get rid one of them, especially that we might need to maintain 1.5 branch after 1.6 release and who know when we will drop BarrierBuffer).

Also pleas mark BarrierBuffer as deprecated.

@Deprecated
public static final ConfigOption<Boolean> EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
key("taskmanager.exactly-once.blocking.data.enabled")
.defaultValue(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would like to enable it by default and leave this config option just as a safety net in case of bugs/problems.

btw, shouldn't this be tightly coupled with a credit based flow switch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the default value should be true, but I think it should be changed after the FLINK-7456 is merged to make the credit-based work.


package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: There were some checkstyle failures

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the checkstyle failures are fixed

* all inputs have received the barrier for a given checkpoint.
*
* <p>The BarrierBuffer continues receiving buffers from the blocked channels and buffered them
* internally until the blocks are released. It will not cause deadlocks based on credit-based
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain a little bit more It will not cause deadlocks based on credit-based flow control part in the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

* The pending blocked buffer/event sequences. Must be consumed before requesting further data
* from the input gate.
*/
private final ArrayDeque<BufferOrEventSequence> queuedBuffered;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this queuedBuffered and currentBuffered fields with CreditBasedBufferBlocker? Why can not we just use ArrayDeque<BufferOrEvent> currentBuffers field from CreditBasedBufferBlocker for this? Why do we need this triple level buffering here? In original code it made sense, since instead of CreditBasedBufferBlocker there was a BufferSpiller.

Getting rid of those three fields would vastly simplify this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation keeps the same logic with BarrierBuffer. I am wondering whether it can make sense if only keeping one ArrayDeque<BufferOrEvent> for holding all blocking buffers for different checkpoint ids. Especially for the uncommon case mentioned on line 496 in BarrierBuffer. I will double check that logic and reply to you later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can not directly mix all the blocked buffers for different checkpoint ids into one ArrayDeque. It also needs the BufferOrEventSequence which indicates the blocked buffers for a specific checkpoint id, otherwise we can not know when the blocked buffers are exhausted after reset a specific checkpoint id.

If we want to use only one ArrayDeque for blocking all buffers, we may need to insert extra hints of checkpoint id into this queue for helping when to stop reading blocked buffers from the queue.

For example:
channel1: [cp1,cp2,b1,cp3,b2,b3]
channel2: [cp2]

  1. When reading cp1 first from channel1, [cp2,b1,cp3,b2,b3] are blocked as separate sequence1.
  2. When reading cp2 from channel2, the cp1 is released and begins to read sequence1.
  3. When reading cp2 from seq1, the following buffers will be blocked in new seq2.
  4. When reading cp3 from seq1,the cp2 is released and the seq2 only contains [b1].
  5. The following buffers after cp3 will be blocked in new seq3 which contains[b2,b3].

So every sequence indicates the blocked buffers belonging to different checkpoint id, and they will be read first after this checkpoint id is released.

throw new IllegalConfigurationException(
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
+ " must be positive or -1 (infinite)");
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extract this and the same code from StreamTwoInputProcessor.java into a common method. I think all of the lines upto this.lock = checkNotNull(lock); could be unified. Maybe into some base class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i will consider a proper way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can change the current CheckpointBarrierHandler interface into abstract class and then add a createBarrierHanlder method for extracting the common parts in StreamInputProcessor and StreamTwoInputProcessor. Or we define a new class for the common method. I prefer the first way.
What do you think?

/**
* Tests for {@link CreditBasedBufferBlocker}.
*/
public class BufferBlockerTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename class to CreditBasedBufferBlockerTest

final int maxNumChannels = 1656;

// do multiple blocking / rolling over rounds
for (int round = 0; round < 5; round++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you deduplicate the code of those two unit tests (testSpillAndRollOverSimple and testSpillWhileReading)? It seems like this one is just a one sequence of the next one?

@zhijiangW
Copy link
Contributor Author

@pnowojski , thanks for reviews!

I understand your concerns and I should deduplicate some common utils in these tests. I will do that tomorrow together with other comments!

@zhijiangW zhijiangW force-pushed the FLINK-8547 branch 3 times, most recently from a78dfc9 to 25d6eb1 Compare February 7, 2018 09:08
@zhijiangW
Copy link
Contributor Author

@pnowojski , I have submitted a separate commit to address above comments.

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides deduplicating tests, please deduplicate the BarrierBuffer and CreditBasedBarrierBuffer.
They are also almost 1 to 1 identical classes. The only difference here is bufferBlocker field, where BarrierBuffer uses BufferSpiller and CreditBasedBarrierBuffer uses CreditBasedBufferBlocker. However this also can be easily fixed by extracting a common interface of CreditBasedBufferBlocker and BufferSpiller.

So please:

  1. deduplicate tests as I suggested in a comment
  2. extract common interface from CreditBasedBufferBlocker and BufferSpiller to lets say BufferBlocker class.
  3. Completely remove current CreditBasedBarrierBuffer class (but keep CreditBasedBufferBlocker!) and change BarrierBuffer class to use interface BufferBlocker.
  4. Replace BarrierBuffer constructors with the following ones:
public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) throws IOException {
	this (inputGate, bufferBlocker, -1);
}

public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes) hrows IOException {
	checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);

	this.inputGate = inputGate;
	this.maxBufferedBytes = maxBufferedBytes;
	this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
	this.blockedChannels = new boolean[this.totalNumberOfInputChannels];

	this.bufferBlocker = checkNotNull(bufferBlocker);
	this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
}

In that case, depending on how you want to block input channels, you can inject either BufferSpiller to the BarrierBuffer (old way) or inject CreditBasedBufferBlocker in case of a new non spilling code.

  1. Inject appropriate BufferBlocker implementations in InputProcessorUtil#createCheckpointBarrierHandler:
			if (taskManagerConfig.getBoolean(TaskManagerOptions.EXACTLY_ONCE_BLOCKING_DATA_ENABLED)) {
				barrierHandler = new BarrierBuffer(inputGate, new CreditBasedBufferBlocker(), maxAlign);
			} else {
				barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager), maxAlign);
			}

* Utility class containing common methods for testing
* {@link BufferSpillerTest} and {@link CreditBasedBufferBlockerTest}.
*/
public class BarrierBufferTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not exactly what I had in mind by deduplication of BarrierBufferTest and CreditBasedBarrierBufferTest. Both of those tests are still pretty much copy of one another and those static methods are only a fraction of duplication.

Look for example at the testSingleChannelNoBarriers() they are 99% identical. All of it's code could be moved to BarrierBufferTestBase. BarrierBufferTestBase would only need to define abstract method CheckpointBarrierHandler createBarrierHandler() which would be define differently in BarrierBufferTest and CreditBasedBarrierBufferTest. One minor thing is that BarrierBufferTest would need checkNoTempFilesRemain() added as an @After test hook. Same applies to all of the other tests.

checkpointId, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), channel);
}

public static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using static methods please use inheritance - make BarrierBufferTest and CreditBasedBarrierBufferTest extend BarrierBufferTestBase. Especially that name *Base already suggests that.

@zhijiangW
Copy link
Contributor Author

@pnowojski , thanks for suggestions and I totally agree with that.
That abstraction indeed makes the code simple. I will update the codes ASAP.

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again for the contribution. This one looks almost good to me. Left only couple of NITs.


/**
* Tests for the behavior of the {@link BarrierBuffer}.
* Tests for the behavior of the {@link BarrierBuffer} with {@link BufferSpiller}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Missing period in java doc (build failure).

this.numChannels = numChannels;
}
}
} No newline at end of file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: build failure, missing EOL

* @param newBuffer only works for {@link BufferSpiller} implements currently.
* @return The readable sequence of buffers and events, or 'null', if nothing was added.
*/
BufferOrEventSequence rollOver(boolean newBuffer) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we stick with two methods in the interface? I think more descriptive names will be better compared to parameter here: rollOverWithoutReusingResources() and rollOverReusingResources(), where: rollOverWithoutReusingResources == rollOver(true).

Especially if one implementation doesn't support one of those calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

* Starts a new sequence of buffers and event and returns the current sequence of buffers for reading.
* This method returns {@code null}, if nothing was added since the creation, or the last call to this method.
*
* @param newBuffer only works for {@link BufferSpiller} implements currently.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc in this interface shouldn't mention implementation specific details. On the other hand, this java doc doesn't explain what newBuffer is doing and for this information one must check the BufferSpiller's java doc itself.

Can you add appropriate java doc here, or better add java doc to proposed in the comment below two methods: rollOverWithoutReusingResources() and rollOverReusingResources(). Comment in CachedBufferBlocker.java#rollOverReusingResources should state that it is never reusing resources and is defaulting to CachedBufferBlocker.java#rollOverWithoutReusingResources

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

private static final int PAGE_SIZE = 512;

private static int sizeCounter = 0;
public class BarrierBufferTest extends BarrierBufferTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename the test class name to SpillingBarrierBufferTest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense

@zhijiangW
Copy link
Contributor Author

@pnowojski , I have submitted the updates for above comments.

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one LGTM.

There is one catch - it will conflict with #5423 . Let's wait for credit based PR to be merged, then low latency improvements, then I can rebase this one on top of my changes.

I assume that after rebasing this on top of the credit-based changes, the default value for EXACTLY_ONCE_BLOCKING_DATA_ENABLED should be changed to true, right @zhijiangW ?

@zhijiangW
Copy link
Contributor Author

Thanks for rebasing the conflicts.

Yes, the default value can be changed to true after the credit-based is totally merged. If need any changes on my side after all, pls let me know. :)

@zhijiangW
Copy link
Contributor Author

@pnowojski , I have changed the EXACTLY_ONCE_BLOCKING_DATA_ENABLED as true and squashed the commits.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants