Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create #8779

Merged
merged 3 commits into the base branch from the source branch on Jun 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -187,10 +187,7 @@ public Collection<ResultPartition> createResultPartitionWriters(
ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
int counter = 0;
for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) {
resultPartitions[counter++] = resultPartitionFactory.create(
ownerContext.getOwnerName(),
ownerContext.getExecutionAttemptID(),
rpdd);
resultPartitions[counter++] = resultPartitionFactory.create(ownerContext.getOwnerName(), rpdd);
}

registerOutputMetrics(config.isNetworkDetailedMetrics(), ownerContext.getOutputGroup(), resultPartitions);
Expand Down
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
Expand Down Expand Up @@ -73,12 +72,11 @@ public ResultPartitionFactory(

public ResultPartition create(
@Nonnull String taskNameWithSubtaskAndId,
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull ResultPartitionDeploymentDescriptor desc) {

return create(
taskNameWithSubtaskAndId,
new ResultPartitionID(desc.getPartitionId(), executionAttemptID),
desc.getShuffleDescriptor().getResultPartitionID(),
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
Expand Down
Expand Up @@ -18,12 +18,15 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

import java.io.Serializable;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Runtime identifier of a produced {@link IntermediateResultPartition}.
*
Expand All @@ -39,13 +42,14 @@ public final class ResultPartitionID implements Serializable {

private final ExecutionAttemptID producerId;

	/**
	 * Creates a partition ID with freshly generated random components.
	 *
	 * <p>Intended for tests only: delegates to the two-argument constructor with a new
	 * {@link IntermediateResultPartitionID} and a new {@link ExecutionAttemptID}.
	 */
	@VisibleForTesting
	public ResultPartitionID() {
		this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
	}

public ResultPartitionID(IntermediateResultPartitionID partitionId, ExecutionAttemptID producerId) {
this.partitionId = partitionId;
this.producerId = producerId;
this.partitionId = checkNotNull(partitionId);
this.producerId = checkNotNull(producerId);
}

public IntermediateResultPartitionID getPartitionId() {
Expand Down
Expand Up @@ -20,13 +20,11 @@

import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.task.IntegerTaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -97,7 +95,7 @@ public void testEncodeDecode() {
}

{
NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID(), random.nextInt());
NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(), random.nextInt(), new InputChannelID(), random.nextInt());
NettyMessage.PartitionRequest actual = encodeAndDecode(expected);

assertEquals(expected.partitionId, actual.partitionId);
Expand All @@ -107,7 +105,7 @@ public void testEncodeDecode() {
}

{
NettyMessage.TaskEventRequest expected = new NettyMessage.TaskEventRequest(new IntegerTaskEvent(random.nextInt()), new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), new InputChannelID());
NettyMessage.TaskEventRequest expected = new NettyMessage.TaskEventRequest(new IntegerTaskEvent(random.nextInt()), new ResultPartitionID(), new InputChannelID());
NettyMessage.TaskEventRequest actual = encodeAndDecode(expected);

assertEquals(expected.event, actual.event);
Expand All @@ -130,7 +128,7 @@ public void testEncodeDecode() {
}

{
NettyMessage.AddCredit expected = new NettyMessage.AddCredit(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(Integer.MAX_VALUE) + 1, new InputChannelID());
NettyMessage.AddCredit expected = new NettyMessage.AddCredit(new ResultPartitionID(), random.nextInt(Integer.MAX_VALUE) + 1, new InputChannelID());
NettyMessage.AddCredit actual = encodeAndDecode(expected);

assertEquals(expected.partitionId, actual.partitionId);
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand Down Expand Up @@ -74,6 +73,6 @@ private static ResultPartition createResultPartition(ShuffleDescriptor.ReleaseTy
releaseType
);

return factory.create("test", new ExecutionAttemptID(), descriptor);
return factory.create("test", descriptor);
}
}
Expand Up @@ -196,7 +196,7 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception {
inputGate.setBufferPool(bufferPool);

// Local
ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
ResultPartitionID localPartitionId = new ResultPartitionID();

InputChannelBuilder.newBuilder()
.setPartitionId(localPartitionId)
Expand All @@ -205,7 +205,7 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception {
.buildLocalAndSetToGate(inputGate);

// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
ResultPartitionID unknownPartitionId = new ResultPartitionID();

InputChannelBuilder.newBuilder()
.setChannelIndex(1)
Expand Down