Skip to content

Commit

Permalink
[FLINK-1502] [core] Add basic metric system
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored and StephanEwen committed May 22, 2016
1 parent 8ed3685 commit 003ce18
Show file tree
Hide file tree
Showing 118 changed files with 3,936 additions and 96 deletions.
7 changes: 7 additions & 0 deletions flink-contrib/flink-statebackend-rocksdb/pom.xml
Expand Up @@ -52,6 +52,13 @@ under the License.
<artifactId>rocksdbjni</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.10</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions flink-contrib/flink-storm/pom.xml
Expand Up @@ -48,6 +48,14 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
Expand Down
Expand Up @@ -32,7 +32,10 @@
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.metrics.util.DummyMetricGroup;
import org.apache.flink.metrics.util.DummyTaskMetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormConfig;
Expand Down Expand Up @@ -141,6 +144,7 @@ private void testWrapper(final int numberOfAttributes) throws Exception {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
when(taskContext.getTaskName()).thenReturn("name");
when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup());

final IRichBolt bolt = mock(IRichBolt.class);

Expand Down Expand Up @@ -225,6 +229,7 @@ public void testOpen() throws Exception {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
when(taskContext.getTaskName()).thenReturn("name");
when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup());

final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
declarer.declare(new Fields("dummy"));
Expand Down Expand Up @@ -289,6 +294,7 @@ public void testOpenSink() throws Exception {
final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
when(taskContext.getTaskName()).thenReturn("name");
when(taskContext.getMetricGroup()).thenReturn(new DummyMetricGroup());

final IRichBolt bolt = mock(IRichBolt.class);
BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
Expand Down Expand Up @@ -361,6 +367,7 @@ public Map<String, Object> getComponentConfiguration() {
Environment env = mock(Environment.class);
when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0));
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
when(env.getMetricGroup()).thenReturn(new DummyTaskMetricGroup());

StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getCheckpointLock()).thenReturn(new Object());
Expand Down
6 changes: 6 additions & 0 deletions flink-core/pom.xml
Expand Up @@ -47,6 +47,12 @@ under the License.
<!-- managed version -->
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version>
</dependency>

<!-- Avro is needed for the interoperability with Avro types for serialization -->
<dependency>
<groupId>org.apache.avro</groupId>
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;

/**
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
Expand All @@ -58,6 +59,13 @@ public interface RuntimeContext {
*/
String getTaskName();

/**
* Returns the metric group for this parallel subtask.
*
* @return The metric group for this parallel subtask.
*/
MetricGroup getMetricGroup();

/**
* Gets the parallelism with which the parallel task runs.
*
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -61,17 +62,21 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
private final Map<String, Accumulator<?, ?>> accumulators;

private final DistributedCache distributedCache;

private final MetricGroup metrics;

public AbstractRuntimeUDFContext(TaskInfo taskInfo,
ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
Map<String, Accumulator<?,?>> accumulators,
Map<String, Future<Path>> cpTasks) {
Map<String, Future<Path>> cpTasks,
MetricGroup metrics) {
this.taskInfo = checkNotNull(taskInfo);
this.userCodeClassLoader = userCodeClassLoader;
this.executionConfig = executionConfig;
this.distributedCache = new DistributedCache(checkNotNull(cpTasks));
this.accumulators = checkNotNull(accumulators);
this.metrics = metrics;
}

@Override
Expand All @@ -93,6 +98,11 @@ public int getNumberOfParallelSubtasks() {
public int getIndexOfThisSubtask() {
return taskInfo.getIndexOfThisSubtask();
}

/**
* Returns the metric group that was handed to this context's constructor.
*
* <p>NOTE(review): the constructor assigns the group without a null check (unlike the
* task info, accumulators and cached files), so this may return {@code null} if a
* caller passed none — confirm against the call sites.
*
* @return the metric group held by this context
*/
@Override
public MetricGroup getMetricGroup() {
return metrics;
}

@Override
public int getAttemptNumber() {
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;

/**
* A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
Expand All @@ -42,8 +43,9 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();

public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks);
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
MetricGroup metrics) {
super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics);
}

@Override
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
Expand All @@ -58,9 +59,15 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.JobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.types.Value;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Visitor;

/**
Expand All @@ -86,6 +93,8 @@ public class CollectionExecutor {
private final ExecutionConfig executionConfig;

private int iterationSuperstep;

private JobMetricGroup jobMetricGroup;

// --------------------------------------------------------------------------------------------

Expand All @@ -106,6 +115,14 @@ public CollectionExecutor(ExecutionConfig executionConfig) {

public JobExecutionResult execute(Plan program) throws Exception {
long startTime = System.currentTimeMillis();

JobID jobID = program.getJobId();
if (jobID == null) {
jobID = new JobID();
}
this.jobMetricGroup =
new TaskManagerMetricGroup(new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString())
.addJob(jobID, program.getJobName());
initCache(program.getCachedFiles());
Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks();
for (Operator<?> sink : sinks) {
Expand Down Expand Up @@ -184,9 +201,12 @@ private <IN> void executeDataSink(GenericDataSinkBase<?> sink, int superStep) th
// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
RuntimeUDFContext ctx;

MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedSink.getName());

if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
} else {
ctx = null;
}
Expand All @@ -200,10 +220,13 @@ private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source, in
GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 0, 1, 0);

RuntimeUDFContext ctx;

MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, source.getName());
if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
} else {
ctx = null;
}
Expand All @@ -225,9 +248,11 @@ private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> op
// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx;

MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedOp.getName());
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);

for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
Expand Down Expand Up @@ -265,9 +290,11 @@ private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?,
// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx;

MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedOp.getName());
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);

for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
Expand Down Expand Up @@ -523,8 +550,9 @@ else if (op instanceof GenericDataSourceBase) {
private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {

public IterationRuntimeUDFContext(TaskInfo taskInfo, ClassLoader classloader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(taskInfo, classloader, executionConfig, cpTasks, accumulators);
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
MetricGroup metrics) {
super(taskInfo, classloader, executionConfig, cpTasks, accumulators, metrics);
}

@Override
Expand Down
69 changes: 69 additions & 0 deletions flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics;

import org.apache.flink.annotation.PublicEvolving;

/**
 * A {@link org.apache.flink.metrics.Metric} that keeps a running count which can be moved
 * up or down, either by one or by an arbitrary amount.
 *
 * <p>The count is held in a plain {@code long} field; this class performs no
 * synchronization of its own.
 */
@PublicEvolving
public final class Counter implements Metric {

    /** Running total; a freshly created counter starts at zero. */
    private long count;

    /**
     * Reads the count as it currently stands.
     *
     * @return current count
     */
    public long getCount() {
        return this.count;
    }

    /**
     * Raises the count by exactly one.
     */
    public void inc() {
        this.count += 1L;
    }

    /**
     * Raises the count by an arbitrary amount.
     *
     * @param n value to increment the current count by
     */
    public void inc(long n) {
        this.count = this.count + n;
    }

    /**
     * Lowers the count by exactly one.
     */
    public void dec() {
        this.count -= 1L;
    }

    /**
     * Lowers the count by an arbitrary amount.
     *
     * @param n value to decrement the current count by
     */
    public void dec(long n) {
        this.count = this.count - n;
    }
}
33 changes: 33 additions & 0 deletions flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics;

import org.apache.flink.annotation.PublicEvolving;

/**
* A Gauge is a {@link org.apache.flink.metrics.Metric} that calculates a specific value at a point in time.
*
* <p>Subclasses supply the measurement by implementing {@link #getValue()}; this class
* itself holds no state.
*
* @param <T> the type of the value produced by {@link #getValue()}
*/
@PublicEvolving
public abstract class Gauge<T> implements Metric {
/**
* Calculates and returns the measured value.
*
* <p>This is the only member a concrete gauge has to provide.
*
* @return calculated value
*/
public abstract T getValue();
}

0 comments on commit 003ce18

Please sign in to comment.