Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public abstract class InputGate implements AutoCloseable {

public abstract int getNumberOfInputChannels();

public abstract String getOwningTaskName();

public abstract boolean isFinished();

public abstract void requestPartitions() throws IOException, InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ public int getNumberOfQueuedBuffers() {
return 0;
}

@Override
public String getOwningTaskName() {
return owningTaskName;
}

public CompletableFuture<Void> getCloseFuture() {
return closeFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,6 @@ public int getNumberOfInputChannels() {
return totalNumberOfInputChannels;
}

@Override
public String getOwningTaskName() {
// all input gates have the same owning task
return inputGates[0].getOwningTaskName();
}

@Override
public boolean isFinished() {
for (InputGate inputGate : inputGates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public int getNumberOfInputChannels() {
return inputGate.getNumberOfInputChannels();
}

@Override
public String getOwningTaskName() {
return inputGate.getOwningTaskName();
}

@Override
public boolean isFinished() {
return inputGate.isFinished();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ else if (current == ExecutionState.CANCELING) {

LOG.info("Registering task at network: {}.", this);

setupPartionsAndGates(consumableNotifyingPartitionWriters, inputGates);
setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);

for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
Expand Down Expand Up @@ -823,7 +823,7 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}

@VisibleForTesting
public static void setupPartionsAndGates(
public static void setupPartitionsAndGates(
ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {

for (ResultPartitionWriter partition : producedPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static List<Boolean> parameters() {
public ExpectedException expectedException = ExpectedException.none();

/**
* Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
* Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
* instances for various types of input and output channels.
*/
@Test
Expand All @@ -87,7 +87,7 @@ public void testRegisterTaskUsesBoundedBuffers() throws Exception {
SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};

Task.setupPartionsAndGates(resultPartitions, inputGates);
Task.setupPartitionsAndGates(resultPartitions, inputGates);

// verify buffer pools for the result partitions
assertEquals(rp1.getNumberOfSubpartitions(), rp1.getBufferPool().getNumberOfRequiredMemorySegments());
Expand Down Expand Up @@ -128,7 +128,7 @@ public void testRegisterTaskUsesBoundedBuffers() throws Exception {
}

/**
* Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
* Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} sets up (un)bounded buffer pool
* instances for various types of input and output channels working with the bare minimum of
* required buffers.
*/
Expand All @@ -148,7 +148,7 @@ public void testRegisterTaskWithLimitedBuffers() throws Exception {
}

/**
* Verifies that {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}} fails if the bare minimum of
* Verifies that {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}} fails if the bare minimum of
* required buffers is not available (we are one buffer short).
*/
@Test
Expand Down Expand Up @@ -208,7 +208,7 @@ private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Excep
createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
}

Task.setupPartionsAndGates(resultPartitions, inputGates);
Task.setupPartitionsAndGates(resultPartitions, inputGates);

// verify buffer pools for the result partitions
assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
Expand Down Expand Up @@ -250,16 +250,16 @@ private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Excep

/**
* Helper to create spy of a {@link SingleInputGate} for use by a {@link Task} inside
* {@link Task#setupPartionsAndGates(ResultPartitionWriter[], InputGate[])}}.
* {@link Task#setupPartitionsAndGates(ResultPartitionWriter[], InputGate[])}}.
*
* @param network
* network enviroment to create buffer pool factory for {@link SingleInputGate}
* network environment to create buffer pool factory for {@link SingleInputGate}
* @param partitionType
* the consumed partition type
* @param numberOfChannels
* the number of input channels
*
* @return input gate with some fake settiFngs
* @return input gate with some fake settings
*/
private SingleInputGate createSingleInputGate(
NetworkEnvironment network, ResultPartitionType partitionType, int numberOfChannels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
Expand Down Expand Up @@ -79,6 +80,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*/
private final long maxBufferedBytes;

private final String taskName;

/**
* The sequence of buffers/events that has been unblocked and must now be consumed before
* requesting further data from the input gate.
Expand Down Expand Up @@ -119,11 +122,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*
* @param inputGate The input gate to draw the buffers and events from.
* @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
*
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
*/
public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) throws IOException {
this (inputGate, bufferBlocker, -1);
@VisibleForTesting
BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) {
this (inputGate, bufferBlocker, -1, "Testing: No task associated");
}

/**
Expand All @@ -136,11 +138,9 @@ public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) throws IO
* @param inputGate The input gate to draw the buffers and events from.
* @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
* @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
*
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
* @param taskName The task name for logging.
*/
public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes)
throws IOException {
BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes, String taskName) {
checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);

this.inputGate = inputGate;
Expand All @@ -150,6 +150,8 @@ public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxB

this.bufferBlocker = checkNotNull(bufferBlocker);
this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();

this.taskName = taskName;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -213,7 +215,7 @@ else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
}

private void completeBufferedSequence() throws IOException {
LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName());
LOG.debug("{}: Finished feeding back buffered data.", taskName);

currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
Expand Down Expand Up @@ -249,7 +251,7 @@ else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint, another started before
LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
inputGate.getOwningTaskName(),
taskName,
barrierId,
currentCheckpointId);

Expand Down Expand Up @@ -283,7 +285,7 @@ else if (barrierId > currentCheckpointId) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
inputGate.getOwningTaskName(),
taskName,
receivedBarrier.getId(),
receivedBarrier.getTimestamp());
}
Expand Down Expand Up @@ -314,9 +316,7 @@ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th
if (barrierId == currentCheckpointId) {
// cancel this alignment
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.",
inputGate.getOwningTaskName(),
barrierId);
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId);
}

releaseBlocksAndResetBarriers();
Expand All @@ -326,7 +326,7 @@ else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the current
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
inputGate.getOwningTaskName(),
taskName,
barrierId,
currentCheckpointId);

Expand Down Expand Up @@ -357,9 +357,7 @@ else if (barrierId > currentCheckpointId) {
latestAlignmentDurationNanos = 0L;

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Checkpoint {} canceled, skipping alignment.",
inputGate.getOwningTaskName(),
barrierId);
LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", taskName, barrierId);
}

notifyAbortOnCancellationBarrier(barrierId);
Expand Down Expand Up @@ -414,7 +412,7 @@ private void checkSizeLimit() throws Exception {
if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
// exceeded our limit - abort this checkpoint
LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
inputGate.getOwningTaskName(),
taskName,
currentCheckpointId,
maxBufferedBytes);

Expand Down Expand Up @@ -458,9 +456,7 @@ private void beginNewAlignment(long checkpointId, int channelIndex) throws IOExc
startOfAlignmentTimestamp = System.nanoTime();

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Starting stream alignment for checkpoint {}.",
inputGate.getOwningTaskName(),
checkpointId);
LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId);
}
}

Expand All @@ -486,9 +482,7 @@ private void onBarrier(int channelIndex) throws IOException {
numBarriersReceived++;

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Received barrier from channel {}.",
inputGate.getOwningTaskName(),
channelIndex);
LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex);
}
}
else {
Expand All @@ -501,8 +495,7 @@ private void onBarrier(int channelIndex) throws IOException {
* Makes sure the just written data is the next to be consumed.
*/
private void releaseBlocksAndResetBarriers() throws IOException {
LOG.debug("{}: End of stream alignment, feeding buffered data back.",
inputGate.getOwningTaskName());
LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);

for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
Expand All @@ -519,8 +512,7 @@ private void releaseBlocksAndResetBarriers() throws IOException {
// uncommon case: buffered data pending
// push back the pending data, if we have any
LOG.debug("{}: Checkpoint skipped via buffered data:" +
"Pushing back current alignment buffers and feeding back new alignment data first.",
inputGate.getOwningTaskName());
"Pushing back current alignment buffers and feeding back new alignment data first.", taskName);

// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
Expand All @@ -534,7 +526,7 @@ private void releaseBlocksAndResetBarriers() throws IOException {

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Size of buffered data: {} bytes",
inputGate.getOwningTaskName(),
taskName,
currentBuffered == null ? 0L : currentBuffered.size());
}

Expand Down Expand Up @@ -577,7 +569,7 @@ public long getAlignmentDurationNanos() {
@Override
public String toString() {
return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
inputGate.getOwningTaskName(),
taskName,
currentCheckpointId,
numBarriersReceived,
numClosedChannels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public static CheckpointBarrierHandler createCheckpointBarrierHandler(
CheckpointingMode checkpointMode,
IOManager ioManager,
InputGate inputGate,
Configuration taskManagerConfig) throws IOException {
Configuration taskManagerConfig,
String taskName) throws IOException {

CheckpointBarrierHandler barrierHandler;
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
Expand All @@ -53,9 +54,17 @@ public static CheckpointBarrierHandler createCheckpointBarrierHandler(
}

if (taskManagerConfig.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
barrierHandler = new BarrierBuffer(
inputGate,
new CachedBufferBlocker(inputGate.getPageSize()),
maxAlign,
taskName);
} else {
barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
barrierHandler = new BarrierBuffer(
inputGate,
new BufferSpiller(ioManager, inputGate.getPageSize()),
maxAlign,
taskName);
}
} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
barrierHandler = new BarrierTracker(inputGate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,18 @@ public StreamInputProcessor(
StreamStatusMaintainer streamStatusMaintainer,
OneInputStreamOperator<IN, ?> streamOperator,
TaskIOMetricGroup metrics,
WatermarkGauge watermarkGauge) throws IOException {
WatermarkGauge watermarkGauge,
String taskName) throws IOException {

InputGate inputGate = InputGateUtil.createInputGate(inputGates);

this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig);
checkpointedTask,
checkpointMode,
ioManager,
inputGate,
taskManagerConfig,
taskName);

this.lock = checkNotNull(lock);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,18 @@ public StreamTwoInputProcessor(
TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
TaskIOMetricGroup metrics,
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge) throws IOException {
WatermarkGauge input2WatermarkGauge,
String taskName) throws IOException {

final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig);
checkpointedTask,
checkpointMode,
ioManager,
inputGate,
taskManagerConfig,
taskName);

this.lock = checkNotNull(lock);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void init() throws Exception {
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge);
inputWatermarkGauge,
getTaskNameWithSubtaskAndId());
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
Expand Down
Loading