New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment #8416
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@flinkbot attention @zhijiangW |
6f664a3
to
ca96b3f
Compare
@@ -108,6 +112,7 @@ private NetworkEnvironment( | |||
this.taskEventPublisher = taskEventPublisher; | |||
this.isShutdown = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put isShutdown=false
at the end of this constructor?
@@ -128,13 +133,17 @@ public static NetworkEnvironment create( | |||
ResultPartitionFactory resultPartitionFactory = | |||
new ResultPartitionFactory(resultPartitionManager, checkNotNull(ioManager)); | |||
|
|||
SingleInputGateFactory singleInputGateFactory = | |||
new SingleInputGateFactory(config, connectionManager, resultPartitionManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TaskEventDispatcher
could also be included in singleInputGateFactory
?
InputChannelBuilder.newBuilder() | ||
.setupFromNetworkEnvironment(network) | ||
.buildUnknown(inputGate) | ||
.toRemoteInputChannel(InputChannelBuilder.STUB_CONNECTION_ID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not build remote directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to preserve the previous way of building remote but looking at it more, true, it does not seem to be relevant.
.buildUnknown(inputGate) | ||
.toRemoteInputChannel(InputChannelBuilder.STUB_CONNECTION_ID); | ||
IntermediateResultPartitionID resultPartitionId = remoteFromUnknown.getPartitionId().getPartitionId(); | ||
inputGate.setInputChannel(resultPartitionId, remoteFromUnknown); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setInputChannel
could be removed?
|
||
network.setupInputGate(inputGate); | ||
|
||
NetworkBufferPool bufferPool = network.getNetworkBufferPool(); | ||
if (enableCreditBasedFlowControl) { | ||
RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels() | ||
.get(resultPartitionId.getPartitionId()); | ||
RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels().get(resultPartitionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the previous remoteFromUnknown
directly, so IntermediateResultPartitionID resultPartitionId = remoteFromUnknown.getPartitionId().getPartitionId()
could also be removed.
/** Requests default number of memory segments. */ | ||
Collection<MemorySegment> requestMemorySegments() throws IOException; | ||
|
||
Collection<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we only consider the semantics of MemorySegmentProvider
interface, the requestMemorySegments()
is a bit redundant, which could be replaced by requestMemorySegments(int numRequiredBuffers)
. The only reason we keep both methods is that it seems not elegant to maintain networkBuffersPerChannel
in SingleInputGate
.
TBH, I do not think it is worth defining this new interface not very clean to accommodate the implementation. Because requestMemorySegments()
seems blurry for the usage which relies on the default value, and we should also consider if the default value is 0, whether the returned collection is null or empty size.
In addition, requestMemorySegments(int numRequiredBuffers)
is only used for tests ATM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think MemorySegmentProvider
makes sense from point of interface segregation principle. This interface encapsulates segment management concern which is the only thing that remote channel needs atm. NetworkBufferPool
looks like implementing 2 separate concerns: BufferPoolFactory
and MemorySegmentProvider
.
As for requestMemorySegments(int numRequiredBuffers)
, you are right, it is more clear to remove it from the interface and adjust the tests with explicit default value, e.g. 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are indeed in the lack of semantics of requesting/releasing batch of buffers before, and I think it is reasonable to extend these methods.
-
SegmentProvider
is not really needed becauseRemoteInputChannel
still needs construct theBuffer
internally after requestingMemorySegment
. -
We could extend the general
BufferProvider
interface to provide method ofrequestBuffers(int numRequiredBuffers, BufferRecycler)
, andRemoteInputChannel
already implementsBufferRecycler
interface. -
We could also extend the general
BufferRecycler
interface to provide method ofrecycle(Collection<MemorySegment>)
.
To do so we make use of the existing semantics and architecture. But the only concern is we would affect the previous implementations to bring more changes.
@@ -155,8 +155,8 @@ | |||
*/ | |||
private BufferPool bufferPool; | |||
|
|||
/** Global network buffer pool to request and recycle exclusive buffers (only for credit-based). */ | |||
private NetworkBufferPool networkBufferPool; | |||
/** Global global memory segment provider to request and recycle exclusive buffers (only for credit-based). */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
repeated global
@@ -535,7 +510,7 @@ public void requestPartitions() throws IOException, InterruptedException { | |||
throws IOException, InterruptedException { | |||
while (true) { | |||
synchronized (inputChannelsWithData) { | |||
while (inputChannelsWithData.size() == 0) { | |||
while (inputChannelsWithData.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why make this change?
metrics | ||
); | ||
metrics, | ||
memorySegmentProvider); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether to separate line for last );
which keeps the consistent with local
and unknown
cases?
static RemoteInputChannel createRemoteInputChannel( | ||
SingleInputGate inputGate, MemorySegmentProvider memorySegmentProvider) throws Exception { | ||
|
||
return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class), 0, 0, memorySegmentProvider); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to avoid mock PartitionRequestClient
here.
@@ -237,7 +254,8 @@ static RemoteInputChannel createRemoteInputChannel( | |||
SingleInputGate inputGate, | |||
PartitionRequestClient client, | |||
int initialBackoff, | |||
int maxBackoff) throws Exception { | |||
int maxBackoff, | |||
MemorySegmentProvider memorySegmentProvider) throws Exception { | |||
final ConnectionManager connectionManager = mock(ConnectionManager.class); | |||
when(connectionManager.createPartitionRequestClient(any(ConnectionID.class))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are already too many utils here to construct the RemoteInputChannel
, but we still miss the units of {SingleInputGate, PartitionRequestClient, MemorySegmentProvider}
which could avoid pass createRemoteInputChannel(inputGate, client, 0, 0, networkBufferPool)
in CreditBasedPartitionRequestClientHandlerTest
.
Maybe we need a separate hotfix for refactoring these parts to make connectionManager.createPartitionRequestClient
call in separate tests if necessary, then we only need one type to construct RemoteInputChannel
.
@@ -46,6 +48,7 @@ | |||
private int maxBackoff = 0; | |||
private InputChannelMetrics metrics = | |||
new InputChannelMetrics(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we could use InputChannelTestUtils.newUnregisteredInputChannelMetrics
@@ -1010,8 +1032,9 @@ private RemoteInputChannel createRemoteInputChannel( | |||
|
|||
return InputChannelBuilder.newBuilder() | |||
.setConnectionManager(connectionManager) | |||
.setInitialBackoff(initialBackoff) | |||
.setInitialBackoff(initialBackOff) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need change?
private RemoteInputChannel createRemoteInputChannel( | ||
SingleInputGate inputGate, | ||
PartitionRequestClient partitionRequestClient, | ||
int initialBackOff, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lowercase off
to keep consistent with maxBackoff?
throws IOException, InterruptedException { | ||
|
||
return createRemoteInputChannel( | ||
inputGate, mock(PartitionRequestClient.class), 0, 0, memorySegmentProvider); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better avoid mock PartitionRequestClient
.
The same issue for creating RemoteInputChannel
in various units, if we do not need unify the createPartitionRequestClient
with the constructing of RemoteInputChannel
, all these createRemoteInputChannel
could be covered by one InputChannelBuilder
.
checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(), | ||
"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition."); | ||
"Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition."); | ||
|
||
checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this check should be put before resultPartitionBufferPoolFactory.create
@@ -836,6 +833,21 @@ else if (transitionState(current, ExecutionState.FAILED, t)) { | |||
} | |||
} | |||
|
|||
private void setupPartionsAndGates() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not remove this method to use the following static directly?
@@ -262,7 +268,29 @@ public void testExecutionFailsInBlobsMissing() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void testExecutionFailsInNetworkRegistration() throws Exception { | |||
public void testExecutionFailsInNetworkRegistrationFprPartitions() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fpr->For
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the work for these refactoring @azagrebin .
For the commits arrangement I have two concerns:
-
The hotfix of
Introduce NetworkEnvironment.create factory
should be put firstly because the other ones are relying on it. And the motivation for introducing static creation is not for tests purpose. I think it avoids the complex logics of constructing the factories in the constructor which could initialize the vars directly in easy way. -
The hotfix of
Introduce ResultPartitionBufferPoolFactory
andIntroduce InputGateBufferPoolFactory
could be squashed with the last twosetup
commits, because they are only the temporary states, not final ones.
In addition, I have some other concerns for the motivation of introducing related factories:
-
PartitionBufferPoolFactory
: I think it is no need to define this new factory for creatingBufferPool
, because we already haveBufferPoolFactory
for doing this, and theNetworkBufferPool
implementation is just used for creatingBufferPool
for partition/gate. We might extend theBufferPoolFactory#createBufferPool
to achieve the same motivation and makeSingleInputGate/ResultPartition
only sees theBufferPoolFactory
instead ofNetworkBufferPool
. The new introducedPartitionBufferPoolFactory
would make the relationship more complex and difficult to distinguish the semantic scope. In other words, I do not think twoShuffleService
implementations would create the sameSingleInputGate/ResultPartition
in future. So even though theSingleInputGate
sees theNetworkBufferPool
which belongs toNetworkEnvironment
ATM, it still might make sense, or we could break this tie if necessary in future. -
ResultPartitionFactory
: It seems reasonable to add the factory layer betweenShuffleService
andResultPartitionWriter
as one option. The current factory is just wrapping some components inNetworkEnvironment
, but these components are still needed to be maintained separately inNetworkEnvironment
for other usages. So I am not sure the specific advantages from the architecture view to do so except for the form view.
|
||
import java.io.IOException; | ||
|
||
/** factory of {@link BufferPool} for network partitions or channels. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment annotation /** */
is used for variable, and the class should be like following
/**
*
*/
I am not sure of it.
@@ -215,10 +215,9 @@ private void testReleaseMemory(final ResultPartitionType resultPartitionType) th | |||
final int numAllBuffers = 10; | |||
final NetworkEnvironment network = new NetworkEnvironmentBuilder() | |||
.setNumNetworkBuffers(numAllBuffers).build(); | |||
final ResultPartitionConsumableNotifier notifier = new NoOpResultPartitionConsumableNotifier(); | |||
final ResultPartition resultPartition = createPartition(notifier, resultPartitionType, false); | |||
final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the default value 1 is not needed, just because we could not give all the possible units of parameters in PartitionTestUtils
. So it might be better to use ResultPartitionBuilder
directly to delete intermediate PartitionTestUtils
as I mentioned above.
config.isCreditBased(), | ||
config.networkBuffersPerChannel(), | ||
config.floatingNetworkBuffersPerGate(), | ||
numberOfChannels, partitionType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate line for numberOfChannels
and partitionType
?
*/ | ||
public class ResultPartitionBuilder { | ||
|
||
private JobID jobId = new JobID(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be final for these vars.
Thanks for the updates @azagrebin ! It seems much better now. I have other two concerns:
|
…n of SingleInputGate in tests
…n of ResultPartition in tests
…f InputChannels in tests
Thanks for the reviews @zentol @zhijiangW , I addressed the comments |
@@ -115,7 +115,11 @@ public static NetworkEnvironment create( | |||
TaskEventPublisher taskEventPublisher, | |||
MetricGroup metricGroup, | |||
IOManager ioManager) { | |||
NettyConfig nettyConfig = checkNotNull(config).nettyConfig(); | |||
checkNotNull(ioManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the check at L 136 is now reedundant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the change. I have left couple of smaller comments. Mostly it LGTM.
I do not see a problems with removing this one extra lock
, we should be fine without it.
ResultPartitionFactory resultPartitionFactory, | ||
SingleInputGateFactory singleInputGateFactory) { | ||
|
||
NetworkEnvironmentConfiguration config, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you have squashed/amended/fixed ups wrong commits here. Why is this commit changing formatting?
@@ -191,7 +198,7 @@ public void registerTask(Task task) throws IOException { | |||
} | |||
|
|||
for (final ResultPartition partition : producedPartitions) { | |||
setupPartition(partition); | |||
partition.setup(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain more in the commit message of [FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()
what is this commit supposed to achieve?
} | ||
|
||
@VisibleForTesting | ||
public void setupInputGate(SingleInputGate gate) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto, about commit message: please explain more why & what are you trying to achieve.
@@ -327,7 +327,8 @@ public FairnessVerifyingInputGate( | |||
boolean isCreditBased) { | |||
|
|||
super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED, | |||
consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(), isCreditBased); | |||
consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(), | |||
isCreditBased, () -> null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null
?
either:
- if you are not expecting this supplier function to be ever called,
throw new UnsupportedOperationException()
. - if
null
is supposed to be supported, change it toOptional
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer throw new UnsupportedOperationException()
. There is no need for Optional
in production code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case throw new UnsupportedOperationException
is of course better :)
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import javax.annotation.Nonnull; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think you could drop Nonnull
. We should follow an implicit assumption that everything is Nonnull
unless marked differently.
|
||
private int floatingNetworkBuffersPerGate = 1; | ||
|
||
private FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional
instead of Nullable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it related to the previous comment about UnsupportedOperationException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not. This builder is later performing != null
check on this field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I see, done
Thanks for the review @pnowojski, I've addressed comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. @azagrebin LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % address comments
commit should be probably squashed with some previous commit(s)
…nel to assign exclusive segments
…RemoteInputChannel
…o ResultPartitionWriter#setup() Move partition setup from NetworkEnvironment to ResultPartition. This eliminates tie between Task and NetworkEnvironment. Task does not need depend on NetworkEnvironment and can trigger setup from ResultPartitionWriter interface.
…o InputGate#setup() Move input gate setup from NetworkEnvironment to InputGate. This eliminates tie between Task and NetworkEnvironment. Task does not need depend on NetworkEnvironment and can trigger setup from InputGate directly.
What is the purpose of the change
Introduce partition/gate setup to decouple task registration with NetworkEnvironment.
The PR also does some preparation refactoring.
Brief change log
Verifying this change
existing unit tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation