[FLINK-8590][runtime] Drop addBufferConsumerToAllSubpartitions method
pnowojski committed Feb 19, 2018
1 parent 0af22bf commit eb96d5d
Showing 8 changed files with 105 additions and 112 deletions.
ResultPartitionWriter.java

@@ -47,29 +47,4 @@ public interface ResultPartitionWriter {
	 * resources.
	 */
	void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException;
-
-	/**
-	 * Writes the given buffer to all available target subpartitions.
-	 *
-	 * <p>The buffer is taken over and used for each of the channels.
-	 * It will be recycled afterwards.
-	 *
-	 * <p>This method takes the ownership of the passed {@code buffer} and thus is responsible for releasing it's
-	 * resources.
-	 *
-	 * @param buffer the buffer to write
-	 */
-	default void writeBufferToAllSubpartitions(final Buffer buffer) throws IOException {
-		try {
-			for (int subpartition = 0; subpartition < getNumberOfSubpartitions(); subpartition++) {
-				// retain the buffer so that it can be recycled by each channel of targetPartition
-				buffer.retainBuffer();
-				writeBuffer(buffer, subpartition);
-			}
-		} finally {
-			// we do not need to further retain the eventBuffer
-			// (it will be recycled after the last channel stops using it)
-			buffer.recycleBuffer();
-		}
-	}
 }
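The default method removed here implemented broadcast through reference counting: retain the buffer once per target subpartition, then drop the creator's own reference in a finally block so that the last consumer to finish recycles it. Below is a minimal, self-contained sketch of that discipline; RefCountedBuffer, SubpartitionWriter, and BroadcastSketch are illustrative stand-ins, not Flink classes.

import java.io.IOException;

/** Stand-in for Flink's reference-counted Buffer; purely illustrative. */
final class RefCountedBuffer {
    private int refCount = 1; // the creator starts out holding one reference

    synchronized void retain() {
        refCount++;
    }

    synchronized void recycle() {
        refCount--;
    }

    synchronized boolean isRecycled() {
        return refCount == 0;
    }
}

/** Stand-in for the two ResultPartitionWriter methods the default method used. */
interface SubpartitionWriter {
    void writeBuffer(RefCountedBuffer buffer, int subpartition) throws IOException;

    int getNumberOfSubpartitions();
}

final class BroadcastSketch {
    /**
     * Mirrors the deleted writeBufferToAllSubpartitions(): retain once per target so
     * every subpartition owns a reference it can recycle independently, then drop
     * the creator's reference in finally. The last holder to recycle frees the buffer.
     */
    static void writeToAllSubpartitions(SubpartitionWriter writer, RefCountedBuffer buffer)
            throws IOException {
        try {
            for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) {
                buffer.retain();
                writer.writeBuffer(buffer, i);
            }
        } finally {
            buffer.recycle();
        }
    }
}

The finally block is what makes the ownership contract hold under failure: if writeBuffer throws midway, the creator's reference is still released, and only the subpartitions that actually received the buffer keep references.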
IterationHeadTask.java

@@ -24,15 +24,14 @@
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -95,7 +94,9 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte

	private TypeSerializerFactory<X> solutionTypeSerializer;

-	private ResultPartitionWriter toSync;
+	private RecordWriter<IOReadableWritable> toSync;
+
+	private ResultPartitionID toSyncPartitionId;

	private int feedbackDataInput; // workset or bulk partial solution

@@ -141,7 +142,8 @@ protected void initOutputs() throws Exception {
			throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
		}
		// now, we can instantiate the sync gate
-		this.toSync = getEnvironment().getWriter(syncGateIndex);
+		this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
+		this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
	}

@@ -238,7 +240,7 @@ private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIte
	private SuperstepBarrier initSuperstepBarrier() {
		SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
		TaskEventDispatcher taskEventDispatcher = getEnvironment().getTaskEventDispatcher();
-		ResultPartitionID partitionId = toSync.getPartitionId();
+		ResultPartitionID partitionId = toSyncPartitionId;
		taskEventDispatcher.subscribeToEvent(partitionId, barrier, AllWorkersDoneEvent.class);
		taskEventDispatcher.subscribeToEvent(partitionId, barrier, TerminationEvent.class);
		return barrier;
@@ -452,7 +454,6 @@ private void sendEventToSync(WorkerDoneEvent event) throws IOException, Interrup
		if (log.isInfoEnabled()) {
			log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
		}
-
-		this.toSync.writeBufferToAllSubpartitions(EventSerializer.toBuffer(event));
+		this.toSync.broadcastEvent(event);
	}
 }
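Taken together, these hunks swap hand-rolled buffer broadcasting for a RecordWriter wrapper. A condensed view of the new wiring, assembled only from lines visible in this diff (a fragment, not a standalone program; the partition id is captured separately, presumably because RecordWriter does not expose it the way the raw ResultPartitionWriter did):

// In initOutputs(): wrap the raw writer once; the sync channel only ever carries
// events, hence the plain IOReadableWritable type parameter.
this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();

// In sendEventToSync(): one call replaces EventSerializer.toBuffer(event) plus
// writeBufferToAllSubpartitions(); serialization and per-subpartition delivery
// now happen inside RecordWriter#broadcastEvent.
this.toSync.broadcastEvent(event);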
ResultPartitionTest.java

@@ -21,8 +21,6 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -32,7 +30,6 @@
 import org.junit.Test;

 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -168,35 +165,6 @@ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
		}
	}

-	/**
-	 * Tests that event buffers are properly added and recycled when broadcasting events
-	 * to multiple channels.
-	 */
-	@Test
-	public void testWriteBufferToAllSubpartitionsReferenceCounting() throws Exception {
-		Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
-		ResultPartition partition = new ResultPartition(
-			"TestTask",
-			mock(TaskActions.class),
-			new JobID(),
-			new ResultPartitionID(),
-			ResultPartitionType.PIPELINED,
-			2,
-			2,
-			mock(ResultPartitionManager.class),
-			mock(ResultPartitionConsumableNotifier.class),
-			mock(IOManager.class),
-			false);
-
-		partition.writeBufferToAllSubpartitions(buffer);
-
-		// release the buffers in the partition
-		partition.release();
-
-		assertTrue(buffer.isRecycled());
-	}
-
	@Test
	public void testAddOnPipelinedPartition() throws Exception {
		testAddOnPartition(ResultPartitionType.PIPELINED);
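The deleted test pinned the reference-counting invariant down end to end: after broadcasting an event buffer and releasing the partition, the buffer must be fully recycled. A JUnit-style restatement of the same invariant against the stand-in types from the sketch under the first hunk above (not Flink's real ResultPartition; java.util.List/ArrayList imports assumed):

@Test
public void broadcastThenReleaseRecyclesBuffer() throws Exception {
    RefCountedBuffer buffer = new RefCountedBuffer();
    List<RefCountedBuffer> pending = new ArrayList<>(); // references held by subpartitions

    SubpartitionWriter writer = new SubpartitionWriter() {
        @Override
        public void writeBuffer(RefCountedBuffer b, int subpartition) {
            pending.add(b); // the subpartition takes over one reference
        }

        @Override
        public int getNumberOfSubpartitions() {
            return 2;
        }
    };

    BroadcastSketch.writeToAllSubpartitions(writer, buffer);

    // stands in for partition.release(): each subpartition drops its reference
    for (RefCountedBuffer b : pending) {
        b.recycle();
    }

    assertTrue(buffer.isRecycled());
}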
StreamConfig.java

@@ -413,6 +413,13 @@ public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl
		}
	}

+	public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigsWithSelf(ClassLoader cl) {
+		//TODO: could this logic be moved to the user of #setTransitiveChainedTaskConfigs() ?
+		Map<Integer, StreamConfig> chainedTaskConfigs = getTransitiveChainedTaskConfigs(cl);
+		chainedTaskConfigs.put(getVertexID(), this);
+		return chainedTaskConfigs;
+	}
+
	public void setOperatorID(OperatorID operatorID) {
		this.config.setBytes(OPERATOR_ID, operatorID.getBytes());
	}
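The new helper exists to tidy a call site in OperatorChain (see the hunk below). A before/after view of that call site, taken from the deleted and added lines there; folding the put-into-map step into the accessor means callers cannot forget to include the head vertex's own config:

// Before (as deleted in OperatorChain): the caller adds the head config itself.
Map<Integer, StreamConfig> chainedConfigs =
    configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
chainedConfigs.put(configuration.getVertexID(), configuration);

// After: the helper returns the map with the config's own entry already in place.
Map<Integer, StreamConfig> chainedConfigs =
    configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);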
OperatorChain.java

@@ -27,7 +27,6 @@
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
@@ -44,8 +43,6 @@
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
-import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
@@ -92,16 +89,17 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
	 */
	private StreamStatus streamStatus = StreamStatus.ACTIVE;

-	public OperatorChain(StreamTask<OUT, OP> containingTask) {
+	public OperatorChain(
+			StreamTask<OUT, OP> containingTask,
+			List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters) {

		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
		final StreamConfig configuration = containingTask.getConfiguration();

		headOperator = configuration.getStreamOperator(userCodeClassloader);

		// we read the chained configs, and the order of record writer registrations by output name
-		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
-		chainedConfigs.put(configuration.getVertexID(), configuration);
+		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

		// create the final output stream writers
		// we iterate through all the out edges from this job vertex and create a stream output
@@ -116,11 +114,10 @@ public OperatorChain(StreamTask<OUT, OP> containingTask) {
			StreamEdge outEdge = outEdgesInOrder.get(i);

			RecordWriterOutput<?> streamOutput = createStreamOutput(
+				streamRecordWriters.get(i),
				outEdge,
				chainedConfigs.get(outEdge.getSourceId()),
-				i,
-				containingTask.getEnvironment(),
-				containingTask.getName());
+				containingTask.getEnvironment());

			this.streamOutputs[i] = streamOutput;
			streamOutputMap.put(outEdge, streamOutput);
@@ -380,12 +377,11 @@ private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOp
		return currentOperatorOutput;
	}

-	private <T> RecordWriterOutput<T> createStreamOutput(
+	private RecordWriterOutput<OUT> createStreamOutput(
+			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter,
			StreamEdge edge,
			StreamConfig upStreamConfig,
-			int outputIndex,
-			Environment taskEnvironment,
-			String taskName) {
+			Environment taskEnvironment) {
		OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput

		TypeSerializer outSerializer = null;
@@ -399,26 +395,7 @@ private <T> RecordWriterOutput<T> createStreamOutput(
			outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
		}

-		@SuppressWarnings("unchecked")
-		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
-		LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
-
-		ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
-
-		// we initialize the partitioner here with the number of key groups (aka max. parallelism)
-		if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
-			int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
-			if (0 < numKeyGroups) {
-				((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
-			}
-		}
-
-		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
-			new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
-		output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
-
-		return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
+		return new RecordWriterOutput<>(streamRecordWriter, outSerializer, sideOutputTag, this);
	}

	// ------------------------------------------------------------------------
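The writer-construction logic deleted from createStreamOutput() does not disappear: the new constructor signature implies it now runs in the caller, which hands fully constructed writers to the chain. Below is a hedged reconstruction of that caller-side loop, assembled from the deleted lines above. The hosting class (presumably the StreamTask that owns the chain) is not visible in this diff, and the surrounding variables (environment, configuration, chainedConfigs, userCodeClassloader, plus java.util.List/ArrayList imports) are assumed context; everything else comes from the deleted code.

// Assumed context: environment, configuration, chainedConfigs, userCodeClassloader.
List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters =
    new ArrayList<>();
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);

for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge edge = outEdgesInOrder.get(i);
    StreamConfig upStreamConfig = chainedConfigs.get(edge.getSourceId());

    @SuppressWarnings("unchecked")
    StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();

    ResultPartitionWriter bufferWriter = environment.getWriter(i);

    // initialize the partitioner with the number of key groups (aka max parallelism)
    if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
        int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
        if (0 < numKeyGroups) {
            ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
        }
    }

    StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
        new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
    output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
    streamRecordWriters.add(output);
}

Design-wise, this moves partitioner configuration and metric wiring out of OperatorChain, which now only consumes ready-made writers; presumably part of the motivation is that the chain can then be constructed in tests with stub writers instead of a full network stack.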
