Skip to content

Commit

Permalink
[FLINK-12201][network][metrics] Introduce InputGateWithMetrics in Tas…
Browse files Browse the repository at this point in the history
…k to increment numBytesIn metric

Incrementing the numBytesIn metric does not depend on a specific shuffle service, so it can be moved out of the network internals and into Task. Task can wrap the InputGate provided by the ShuffleService in an InputGateWithMetrics, which increments the numBytesIn metric.
  • Loading branch information
zhijiangW committed May 9, 2019
1 parent c954780 commit 309f321
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 39 deletions.
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
Expand Down Expand Up @@ -290,8 +289,7 @@ public SingleInputGate[] createInputGates(
Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
MetricGroup parentGroup,
MetricGroup inputGroup,
MetricGroup buffersGroup,
Counter numBytesInCounter) {
MetricGroup buffersGroup) {
synchronized (lock) {
Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");

Expand All @@ -306,8 +304,7 @@ public SingleInputGate[] createInputGates(
this,
taskEventPublisher,
taskActions,
inputChannelMetrics,
numBytesInCounter);
inputChannelMetrics);
}

registerInputMetrics(inputGroup, buffersGroup, inputGates);
Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;

Expand All @@ -34,6 +35,8 @@ public class BufferOrEvent {

private final AbstractEvent event;

private final int size;

/**
* Indicate availability of further instances for the union input gate.
* This is not needed outside of the input gate unioning logic and cannot
Expand All @@ -43,26 +46,30 @@ public class BufferOrEvent {

private int channelIndex;

BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) {
public BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable, int size) {
this.buffer = checkNotNull(buffer);
this.event = null;
this.channelIndex = channelIndex;
this.moreAvailable = moreAvailable;
this.size = size;
}

BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) {
public BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable, int size) {
this.buffer = null;
this.event = checkNotNull(event);
this.channelIndex = channelIndex;
this.moreAvailable = moreAvailable;
this.size = size;
}

@VisibleForTesting
public BufferOrEvent(Buffer buffer, int channelIndex) {
this(buffer, channelIndex, true);
this(buffer, channelIndex, true, 0);
}

@VisibleForTesting
public BufferOrEvent(AbstractEvent event, int channelIndex) {
this(event, channelIndex, true);
this(event, channelIndex, true, 0);
}

public boolean isBuffer() {
Expand Down Expand Up @@ -103,4 +110,8 @@ public String toString() {
/**
 * Overwrites the {@code moreAvailable} flag that was initially set through the constructor.
 *
 * @param moreAvailable the new value of the flag
 */
public void setMoreAvailable(boolean moreAvailable) {
this.moreAvailable = moreAvailable;
}

/**
 * Returns the size value handed to the constructor. {@code SingleInputGate} passes
 * {@code buffer.getSizeUnsafe()} for it, while the two-argument {@code @VisibleForTesting}
 * constructors pass 0.
 *
 * <p>NOTE(review): presumably a byte count, since {@code InputGateWithMetrics} adds this
 * value to the {@code numBytesIn} counter -- confirm.
 *
 * @return the size recorded for this buffer or event
 */
public int getSize() {
return size;
}
}
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
Expand Down Expand Up @@ -185,8 +184,6 @@ public class SingleInputGate implements InputGate {
/** A timer to retrigger local partition requests. Only initialized if actually needed. */
private Timer retriggerLocalRequestTimer;

private final Counter numBytesIn;

public SingleInputGate(
String owningTaskName,
JobID jobId,
Expand All @@ -195,7 +192,6 @@ public SingleInputGate(
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
Counter numBytesIn,
boolean isCreditBased) {

this.owningTaskName = checkNotNull(owningTaskName);
Expand All @@ -216,8 +212,6 @@ public SingleInputGate(

this.taskActions = checkNotNull(taskActions);

this.numBytesIn = checkNotNull(numBytesIn);

this.isCreditBased = isCreditBased;
}

Expand Down Expand Up @@ -568,9 +562,9 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
}

final Buffer buffer = result.get().buffer();
numBytesIn.inc(buffer.getSizeUnsafe());
final int size = buffer.getSizeUnsafe();
if (buffer.isBuffer()) {
return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable, size));
}
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
Expand All @@ -593,7 +587,7 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
currentChannel.releaseAllResources();
}

return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable, size));
}
}

Expand Down Expand Up @@ -674,8 +668,7 @@ public static SingleInputGate create(
NetworkEnvironment networkEnvironment,
TaskEventPublisher taskEventPublisher,
TaskActions taskActions,
InputChannelMetrics metrics,
Counter numBytesInCounter) {
InputChannelMetrics metrics) {

final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType());
Expand All @@ -688,8 +681,14 @@ public static SingleInputGate create(
final NetworkEnvironmentConfiguration networkConfig = networkEnvironment.getConfiguration();

final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, numBytesInCounter, networkConfig.isCreditBased());
owningTaskName,
jobId,
consumedResultId,
consumedPartitionType,
consumedSubpartitionIndex,
icdd.length,
taskActions,
networkConfig.isCreditBased());

// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
Expand Down
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskmanager;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;

import java.io.IOException;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * An {@link InputGate} decorator that the task places around the gate provided by the
 * shuffle service. Every call is forwarded to the wrapped gate; in addition, the size of
 * each received {@link BufferOrEvent} is added to the {@code numBytesIn} counter, which the
 * task obtains from its {@link TaskIOMetricGroup}.
 */
public class InputGateWithMetrics implements InputGate {

	/** The wrapped gate that performs the actual work. */
	private final InputGate inputGate;

	/** Counter that accumulates the size of everything read through this gate. */
	private final Counter numBytesInCounter;

	/**
	 * Creates a metrics-updating wrapper around the given gate.
	 *
	 * @param inputGate the gate to delegate to, must not be null
	 * @param numBytesInCounter the counter to increment for received data, must not be null
	 */
	InputGateWithMetrics(InputGate inputGate, Counter numBytesInCounter) {
		this.inputGate = checkNotNull(inputGate);
		this.numBytesInCounter = checkNotNull(numBytesInCounter);
	}

	@Override
	public int getNumberOfInputChannels() {
		return inputGate.getNumberOfInputChannels();
	}

	@Override
	public String getOwningTaskName() {
		return inputGate.getOwningTaskName();
	}

	@Override
	public boolean isFinished() {
		return inputGate.isFinished();
	}

	@Override
	public void requestPartitions() throws IOException, InterruptedException {
		inputGate.requestPartitions();
	}

	/**
	 * Fetches the next element from the wrapped gate and counts its size.
	 *
	 * @return whatever the wrapped gate returned
	 */
	@Override
	public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
		return updateMetrics(inputGate.getNextBufferOrEvent());
	}

	/**
	 * Polls the wrapped gate and counts the size of the element, if one was returned.
	 * The polling path must update the counter as well, otherwise data consumed through
	 * it would be missing from {@code numBytesIn}.
	 *
	 * @return whatever the wrapped gate returned
	 */
	@Override
	public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
		return updateMetrics(inputGate.pollNextBufferOrEvent());
	}

	@Override
	public void sendTaskEvent(TaskEvent event) throws IOException {
		inputGate.sendTaskEvent(event);
	}

	@Override
	public void registerListener(InputGateListener listener) {
		inputGate.registerListener(listener);
	}

	@Override
	public int getPageSize() {
		return inputGate.getPageSize();
	}

	/**
	 * Closes the wrapped gate. The wrapper holds no resources of its own, so closing it
	 * only has an effect if the call is forwarded.
	 */
	@Override
	public void close() throws Exception {
		inputGate.close();
	}

	/** Adds the size of the element, if present, to the counter and hands the element back unchanged. */
	private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> bufferOrEvent) {
		bufferOrEvent.ifPresent(received -> numBytesInCounter.inc(received.getSize()));
		return bufferOrEvent;
	}
}
Expand Up @@ -392,8 +392,7 @@ public Task(
inputGateDeploymentDescriptors,
metrics.getIOMetricGroup(),
inputGroup,
buffersGroup,
metrics.getIOMetricGroup().getNumBytesInCounter());
buffersGroup);

this.inputGatesById = new HashMap<>();
for (SingleInputGate inputGate : inputGates) {
Expand Down Expand Up @@ -637,6 +636,11 @@ else if (current == ExecutionState.CANCELING) {

TaskKvStateRegistry kvStateRegistry = kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());

InputGate[] gates = new InputGate[inputGates.length];
for (int i = 0; i < inputGates.length; i++) {
gates[i] = new InputGateWithMetrics(inputGates[i], metrics.getIOMetricGroup().getNumBytesInCounter());
}

Environment env = new RuntimeEnvironment(
jobId,
vertexId,
Expand All @@ -656,7 +660,7 @@ else if (current == ExecutionState.CANCELING) {
inputSplitProvider,
distributedCacheEntries,
producedPartitions,
inputGates,
gates,
taskEventDispatcher,
checkpointResponder,
taskManagerConfig,
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
Expand Down Expand Up @@ -91,7 +90,6 @@ public static SingleInputGate createSingleInputGate(
0,
numberOfChannels,
new NoOpTaskActions(),
new SimpleCounter(),
isCreditBased);
}

Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
Expand Down Expand Up @@ -333,7 +332,7 @@ public FairnessVerifyingInputGate(
boolean isCreditBased) {

super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
consumedSubpartitionIndex, numberOfInputChannels, taskActions, new SimpleCounter(), isCreditBased);
consumedSubpartitionIndex, numberOfInputChannels, taskActions, isCreditBased);

try {
Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
Expand Down Expand Up @@ -460,7 +459,6 @@ public TestLocalInputChannelConsumer(
subpartitionIndex,
numberOfInputChannels,
new NoOpTaskActions(),
new SimpleCounter(),
true);

// Set buffer pool
Expand Down
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
Expand Down Expand Up @@ -363,8 +362,7 @@ public void testRequestBackoffConfiguration() throws Exception {
netEnv,
new TaskEventDispatcher(),
new NoOpTaskActions(),
InputChannelTestUtils.newUnregisteredInputChannelMetrics(),
new SimpleCounter());
InputChannelTestUtils.newUnregisteredInputChannelMetrics());

try {
assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
Expand Down
Expand Up @@ -374,7 +374,7 @@ public BufferOrEvent getNext() throws IOException {
Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE);
buf.setSize(length);

return new BufferOrEvent(buf, channel);
return new BufferOrEvent(buf, channel, true, length);
}
else {
// deserialize event
Expand All @@ -399,7 +399,7 @@ public BufferOrEvent getNext() throws IOException {
AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader());
buffer.limit(oldLimit);

return new BufferOrEvent(evt, channel);
return new BufferOrEvent(evt, channel, true, length);
}
}

Expand Down

0 comments on commit 309f321

Please sign in to comment.