Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12530][network] Move Task.inputGatesById to NetworkEnvironment #8463

Merged
merged 1 commit into from
May 30, 2019

Conversation

azagrebin
Copy link
Contributor

@azagrebin azagrebin commented May 16, 2019

What is the purpose of the change

Task.inputGatesById indexes SingleInputGates by id. The end user of this indexing is NetworkEnvironment for two cases:

  • SingleInputGate triggers producer partition readiness check and
    then the successful result of check is dispatched back to this SingleInputGate by id.
    We can just return a future from TaskActions.triggerPartitionProducerStateCheck.
    SingleInputGate could use the future to react with re-triggering of the
    partition request if the producer is ready. Then inputGatesById is not needed for dispatching.

  • TaskExecutor.updatePartitions uses inputGatesById to dispatch PartitionInfo update to the right SingleInputGate. If inputGatesById is moved to NetworkEnvironment, which should be a better place for gate management, and NetworkEnvironment.updatePartitionInfo is added then
    TaskExecutor.updatePartitions could directly call NetworkEnvironment.updatePartitionInfo.

Additional refactoring:

  • TaskActions.triggerPartitionProducerStateCheck is
    separated into another interface PartitionProducerStateProvider.
    TaskActions is too broad interface used also for other purposes.
    Shuffle API needs only PartitionProducerStateProvider .

  • PartitionProducerStateProvider returns future with the ResponseHandle which contains the producer state and accepts callbacks to cancel or fail consumption as a result of state check.

  • Task.triggerPartitionProducerStateCheck is also refactored into a RemoteChannelStateChecker which becomes internal detail of NetworkEnvironment. RemoteChannelStateChecker accepts ResponseHandle, checks whether producer is ready for consumption or aborts consumption using ResponseHandle.cancelConsumption or ResponseHandle.failConsumption.

Brief change log

  • Change TaskActions.triggerPartitionProducerStateCheck to react on future instead of inputGatesById
  • add NetworkEnviroment.updatePartitionInfo
  • use NetworkEnviroment.updatePartitionInfo in TaskExecutor.updatePartitions instead of Task.inputGatesById
  • move Move Task.inputGatesById to NetworkEnvironment
  • add SingleInputGate.close future and add a callback for it to remove gate from NetworkEnvironment.inputGatesById
  • Move TaskActions.triggerPartitionProducerStateCheck to a
    separate interface PartitionProducerChecker
  • Refactor Task.triggerPartitionProducerStateCheck into RemoteChannelStateChecker

Verifying this change

the change is a refactoring

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented May 16, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit ac96671 (Wed Aug 07 15:49:06 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@azagrebin
Copy link
Contributor Author

@flinkbot attention @zhijiangW

Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for opening this PR @azagrebin. I had some comment concerning the callbacks. Maybe we can do the same with a termination future and a future which is returned when calling triggerPartitionProducerStateCheck.

@@ -110,6 +117,8 @@ public NetworkEnvironment(

this.resultPartitionManager = new ResultPartitionManager();

this.inputGatesById = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which threads do access this structure concurrently?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canceler thread might close SingleInputGate while rpc thread is creating SingleInputGate.

@@ -150,6 +159,11 @@ public NetworkEnvironmentConfiguration getConfiguration() {
return config;
}

@VisibleForTesting
public Map<IntermediateDataSetID, SingleInputGate> getInputGatesById() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose the map or would it be enough to have getInputGate(IntermediateDataSetID)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should actually inject it as dependency to check in tests

* @throws PartitionException the input gate with the id from the partitionInfo is not found
*/
public void updatePartitionInfo(PartitionInfo partitionInfo)
throws IOException, InterruptedException, PartitionException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I'd prefer to put throws on the same line as ). The problem is that it now looks as if the throws line belongs to the body. Just personal taste, though.

@@ -634,7 +633,8 @@ void notifyChannelNonEmpty(InputChannel channel) {
}

void triggerPartitionStateCheck(ResultPartitionID partitionId) {
taskActions.triggerPartitionProducerStateCheck(jobId, consumedResultId, partitionId);
taskActions.triggerPartitionProducerStateCheck(consumedResultId, partitionId,
() -> retriggerPartitionRequest(partitionId.getPartitionId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line breaking

SingleInputGate[] inputGates = network.createInputGates("",
new NoOpTaskActions(), Collections.singletonList(igdd),
new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(),
new SimpleCounter());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line breaks

assertTrue(network.getInputGatesById().containsKey(id));
inputGates[0].close();
assertEquals(0, network.getInputGatesById().size());
assertFalse(network.getInputGatesById().containsKey(id));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two last assertions seem to be redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rewrite a bit the test to make it more generic

IntermediateDataSetID intermediateDataSetId,
ResultPartitionID resultPartitionId);
ResultPartitionID resultPartitionId,
ThrowingRunnable<Exception> producerReadyCallback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding this callback, we could also let this method return a CompletableFuture<Void> which if completed indicates to retrigger the partition request. That way we would not have to pass in the callback which is forwarded to some other place.

IntermediateDataSetID consumedResultId,
final ResultPartitionType consumedPartitionType,
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
Counter numBytesIn,
boolean isCreditBased) {
boolean isCreditBased,
Runnable closeListener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the close listener we could maybe apply a similar trick as with the partition request retriggering. We could for example add a termination future to the SingleInputGate which is completed once the gate gets closed. On this future we can register the removal from the inputGatesById map. WDYT?

IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
SingleInputGate inputGate = inputGatesById.get(intermediateResultPartitionID);
if (inputGate == null) {
throw new PartitionException("No reader with ID " + intermediateResultPartitionID + " was found.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this maybe be an IllegalStateException because this should actually not happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how it was reported before.. but I agree IllegalStateException fits better

log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
}
task.failExternally(e);
} catch (RuntimeException re) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to remove this catch block and instead do the following:

FutureUtils.assertNoException(
  CompletableFuture.runAsync(() -> ..., getRpcService()));

() -> {
try {
networkEnvironment.updatePartitionInfo(partitionInfo);
} catch (IOException | PartitionException | InterruptedException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We changed some previous behavior here but seems not covered by related tests before.
The PartitionException would trigger cancel task before, so the task final state might be canceled. But now we fail the task directly, so the task final state might be failed. Does it need to supplement a unit test?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think before it would send PartitionException back to the JobMaster and which would then fail the Execution. This should have the same effect. Verifying whether this is guarded by a test makes sense, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with the same effect, only concern the unit test. If we ever had the unit test for verifying the task is in CANCELED state after PartitionException, then this change should make the previous test failure because the task would be in FAILED state after change.

Copy link
Contributor Author

@azagrebin azagrebin May 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: never mind, wrong comment answered :)
True, TaskExecutorSubmissionTest.testUpdateTaskInputPartitionsFailure needs to be adjusted.

@@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception {
}
}

@Test
public void checkInputGateRemoveInNetworkEnvironment() throws IOException {
NetworkEnvironment network = createNetworkEnvironment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need network.shutdown() in finally part?

IntermediateDataSetID id = new IntermediateDataSetID();
InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(id,
ResultPartitionType.PIPELINED, 0, channelDescs);
SingleInputGate[] inputGates = network.createInputGates("",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the array case, it seems better to make size more than 1, such as 2? I am not very sure.

@@ -542,6 +543,29 @@ public void testUpdateUnknownInputChannel() throws Exception {
}
}

@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition, do you think we should also cover the current case for closing SingleInputGate and creatingInputGates in different threads?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can rely on ConcurrentHashMap provided in constructor (I changed the approach a bit), then there is no need to test ConcurrentHashMap.

Copy link
Contributor

@zhijiangW zhijiangW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for opening this PR @azagrebin !

The changes look almost good to me, only left some inline comments.

@azagrebin
Copy link
Contributor Author

azagrebin commented May 21, 2019

Thanks for the review @tillrohrmann @zhijiangW !
I have addressed the comments.
I have also rebased it on #8416 because it is about to be merged.

@azagrebin
Copy link
Contributor Author

I have also pushed one more hotfix to separate PartitionProducerChecker interface from TaskActions because it is the only thing which InputGate requires for shuffle service API.

@@ -119,11 +118,21 @@ private NetworkEnvironment(
this.isShutdown = false;
}

public static NetworkEnvironment create(
NetworkEnvironmentConfiguration config,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep the same indentation as the below create method?

inputGates[0].close();
assertEquals(0, network.getInputGatesById().size());
assertFalse(network.getInputGatesById().containsKey(id));
public void checkInputGateRemoveInNetworkEnvironment() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkInputGateRemoveInNetworkEnvironment -> testInputGateRemoveInNetworkEnvironment ?

@@ -86,37 +89,90 @@

private final ResultPartitionManager resultPartitionManager;

private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is more clearly to define ConcurrentHashMap instead of Map here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep Map because NetworkEnviroment does not depend on ConcurrentHashMap methods at the moment.

@zhijiangW
Copy link
Contributor

Thanks for the updates @azagrebin!

I left several minor comments, only one concern of new proposed PartitionProducerChecker which seems to provide the similar function as current PartitionProducerStateChecker. The only difference is the parameter JobID in PartitionProducerStateChecker#requestPartitionProducerState. And the JobID is not used in current process, so we might remove this parameter and then no need to introduce the new interface PartitionProducerChecker. WDYT?

@azagrebin azagrebin force-pushed the FLINK-12530 branch 3 times, most recently from fb57607 to 9ccba19 Compare May 22, 2019 12:22
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating this PR @azagrebin. I had some more comments.

@@ -441,6 +444,7 @@ public void close() throws IOException {
finally {
isReleased = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could remove this field because we have now the closeFuture

() -> {
try {
networkEnvironment.updatePartitionInfo(partitionInfo);
} catch (Throwable t) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not catch Throwable here. If an unchecked exception occurs, it should simply bubble up and cause the component to fail.

() -> task.failExternally(t),
getRpcService().getExecutor()));
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to exchange this block with:

FutureUtils.assertNoException(
	CompletableFuture.runAsync(
		() -> {
			try {
				networkEnvironment.updatePartitionInfo(partitionInfo);
			} catch (IOException | InterruptedException e) {
				log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
				task.failExternally(e);
			}
		},
		getRpcService().getExecutor()));

@@ -1102,11 +1093,13 @@ public void triggerPartitionProducerStateCheck(
} else {
failExternally(throwable);
}
} catch (IOException | InterruptedException e) {
failExternally(e);
} catch (Throwable t) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not catch Throwable because it could be a legitimate Flink problem which should make us to fail the process.

}
},
executor);

return producerReadyFuture;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the following alternative:

final CompletableFuture<Boolean> producerReadyFuture = new CompletableFuture<>();

FutureUtils.assertNoException(
	futurePartitionState.whenCompleteAsync(
		(ExecutionState executionState, Throwable throwable) -> {
			if (executionState != null || throwable instanceof TimeoutException) {
				final boolean producerReady = onPartitionStateUpdate(
					resultPartitionId,
					executionState != null ? executionState : ExecutionState.RUNNING);

				producerReadyFuture.complete(producerReady);
			} else {
				if (throwable instanceof PartitionProducerDisposedException) {
					String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.",
						resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
					LOG.info(msg, throwable);
					cancelExecution();
				} else {
					failExternally(throwable);
				}

				producerReadyFuture.complete(false);
			}
		},
		executor));

return producerReadyFuture;

ResultPartitionID resultPartitionId,
ExecutionState producerState) throws IOException, InterruptedException {
ExecutionState producerState,
CompletableFuture<Void> producerReadyFuture) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing in this future, I would return a boolean indicating whether the producer is ready or not

@@ -1221,53 +1214,46 @@ public void run() {
*/
@VisibleForTesting
void onPartitionStateUpdate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename into isProducerReadyAndHandlePartitionStateUpdate

@@ -97,13 +104,15 @@ private NetworkEnvironment(
NetworkBufferPool networkBufferPool,
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
Map<IntermediateDataSetID, SingleInputGate> inputGatesById,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing the map for storing the input gates into NetworkEnvironment exposes in my opinion a bit too many implementation details of this structure. I basically says that the object needs to store the SingleInputGates in this structure. Otherwise the test will fail. I think it would be better to have a getInputGate(IntermediateDataSetID) and let the way the gates are stored internally be an implementation detail.

@azagrebin azagrebin force-pushed the FLINK-12530 branch 2 times, most recently from a627eb9 to 0f41bbd Compare May 24, 2019 08:32
@azagrebin
Copy link
Contributor Author

Thanks for the further review @tillrohrmann. I have addressed comments.

@azagrebin
Copy link
Contributor Author

azagrebin commented May 24, 2019

Thanks for the review @zhijiangW, I've addressed comments.
I think PartitionProducerStateChecker and PartitionProducerChecker have different levels of abstraction. The state checker is used by Task to query the execution state of the producer from JM, basically their RPC interface. Then task makes further decisions based on it. Shuffle API (remote channel) is basically interested in only whether the partition producer is producing or not at the moment. I think, there is a bit coupling between Task decisions and netty implementation at the moment. We might later further rethink it and maybe move Task logic completely into netty implementation based on just the producer execution state.

@zhijiangW
Copy link
Contributor

Thanks for the explanation @azagrebin !

If not caring about the detail implementation, we should think through the partition checker logic and make clear the scope owner of it. The below sharing is just my personal thought, maybe not very correct:

  • During requesting partition, RemoteInputChannel/InputGate might receive PartitionNotFoundException, so RemoteInputChannel/InputGate should decide how to handle this exception. It could throw this exception to the outside directly to cause task fail. Or it wants to further query partition's state to make the final decision.

  • The checker/query should be targeting the partition's state, not producer's state. If the producer state is FINISHED but the partition state might be RELEASED, then only the partition's state could give the right decision.

  • ShuffleMaster could provide the ability for querying partition's state future, just like ShuffleMaster would communicate with ShuffleService for releasing partition. For simple implementation, we could make use of the RPC between TM/JM for the communication.

If the current partition checker refactor in this PR might not be the final way/direction to go, it is better not to touch it now, since it is not very related to the scope of moving inputGatesById. Or we could forward step by step and keep the current refactor in this PR.

@azagrebin
Copy link
Contributor Author

Thanks for the thoughts @zhijiangW !

True, this state might be not final. Further steps in this topic are probably more related to the partition lifecycle management and out of scope of this PR.

The main motivation of introducing the PartitionProducerChecker now was to separate it from TaskActions. TaskActions is very broad interface used in other components for unrelated purposes. Shuffle API does not need it in the scope of the first refactoring. Introducing PartitionProducerChecker improves Shuffle API decoupling because it reflects only what is needed at the moment. I can move this hotfix into another PR if it feels like this but it is small and has been already reviewed. Also the related code is touched anyway in this PR.

@zhijiangW
Copy link
Contributor

Thanks for the confirmation, @azagrebin !
I am not caring about keeping that hotfix in this PR, no need to submit it separately. :)

Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating this PR @azagrebin. The changes look good to me. I had one last comment about the consumerExecutionState and whether we can move it from the RemoteChannelStateChecker to the Task.

* Result of partition state check, accepts check callbacks.
*/
interface CheckResult {
ExecutionState getConsumerExecutionState();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need the consumer's execution state here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment we use it as a shortcut if consumer is done before the check has been accomplished then we skip the check. Ideally we should use closeFuture in SingleInputGate instead of ConsumerExecutionState for this. Although this shortcut is the best effort because the state change or gate close can happen concurrently right after this check, I suggest we consider it as a separate refactoring as it would change a bit current behaviour: https://issues.apache.org/jira/browse/FLINK-12672

/**
* Runtime identifier of a consumed {@link org.apache.flink.runtime.executiongraph.IntermediateResult}.
*
* <p>In runtime the {@link org.apache.flink.runtime.jobgraph.IntermediateDataSetID} is not enough to uniquely
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: At runtime

ExecutionState consumerState = checkResult.getConsumerExecutionState();
Either<ExecutionState, Throwable> result = checkResult.getProducerExecutionState();
ExecutionState producerState = result.isLeft() ? result.left() : ExecutionState.RUNNING;
return consumerState == ExecutionState.RUNNING &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the consumerState outside of the RemoteChannelStateChecker? If the consumer (==this) is not running, then we should simply ignore the update message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private boolean isProducerConsumerReady(CheckResult checkResult) {
ExecutionState consumerState = checkResult.getConsumerExecutionState();
Either<ExecutionState, Throwable> result = checkResult.getProducerExecutionState();
ExecutionState producerState = result.isLeft() ? result.left() : ExecutionState.RUNNING;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is duplicated here and in abortConsumptionOrIgnoreCheckResult. Would be good to deduplicate it.


checkResult.failConsumption(new IllegalStateException(msg));
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else branch would not be needed if we check the consumer state before calling into the RemoteChannelStateChecker.

this.taskNameWithSubtask = taskNameWithSubtask;
}

public boolean isProducerConsumerReadyOrAbortConsumption(CheckResult checkResult) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super important and out of scope for this PR, but this method either triggers some action (fail or cancel) or returns a decision (trigger new partition check or not). I think it would be more symmetric if this class would not trigger any action but only return a decision what to do:

enum Action {
  FAIL(Throwable cause),
  CANCEL(String msg),
  TRIGGER_PARTITION_CHECK,
  NOOP
}

Then the caller would be responsible for making the action. That way this class would only need access to checkResult.getProducerExecutionState() and not checkResult itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I would suggest we consider this refactoring as another issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@azagrebin azagrebin force-pushed the FLINK-12530 branch 2 times, most recently from fb3d560 to a469b69 Compare May 29, 2019 14:15
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing my comments @azagrebin. LGTM. +1 for merging.

Copy link
Contributor

@zhijiangW zhijiangW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also LGTM on my side!

Task.inputGatesById indexes SingleInputGates by id. The end user of this indexing is NetworkEnvironment for two cases:

- SingleInputGate triggers producer partition readiness check and
  then the successful result of check is dispatched back to this SingleInputGate by id.
  We can just return a future from TaskActions.triggerPartitionProducerStateCheck.
  SingleInputGate could use the future to react with re-triggering of the
  partition request if the producer is ready. Then inputGatesById is not needed for dispatching.

- TaskExecutor.updatePartitions uses inputGatesById to dispatch PartitionInfo update to the right SingleInputGate.
  If inputGatesById is moved to NetworkEnvironment, which should be a better place for gate management,
  and NetworkEnvironment.updatePartitionInfo is added then
  TaskExecutor.updatePartitions could directly call NetworkEnvironment.updatePartitionInfo.

Additional refactoring:

- TaskActions.triggerPartitionProducerStateCheck is
  separated into another interface PartitionProducerStateProvider.
  TaskActions is too broad interface used also for other purposes.
  Shuffle API needs only PartitionProducerStateProvider.

- PartitionProducerStateProvider returns future with the ResponseHandle
  which contains the producer state and accepts callbacks to cancel or fail consumption as a result of state check.

- Task.triggerPartitionProducerStateCheck is also refactored into a RemoteChannelStateChecker
  which becomes internal detail of NetworkEnvironment. RemoteChannelStateChecker accepts ResponseHandle,
  checks whether producer is ready for consumption or aborts consumption
  using ResponseHandle.cancelConsumption or ResponseHandle.failConsumption.
@azagrebin
Copy link
Contributor Author

Thanks for the reviews @tillrohrmann @zhijiangW !
I squashed the commits and adjusted the comments.

@zentol zentol merged commit 809e40d into apache:master May 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants