Skip to content

Commit

Permalink
[FLINK-1502] [core] Various improvements to the metrics infrastructure code
Browse files Browse the repository at this point in the history

  - Metric groups are generally thread-safe
  - Metric groups are closable. Closed groups do not register metric objects any more.
  - TaskManager's JobMetricsGroup auto disposes when all TaskMetricGroups are closed
  - Maven project with metric reporters renamed to 'flink-metric-reporters'
  - Various code style cleanups
  • Loading branch information
StephanEwen committed May 22, 2016
1 parent 003ce18 commit 707606a
Show file tree
Hide file tree
Showing 45 changed files with 1,051 additions and 344 deletions.
Expand Up @@ -59,15 +59,11 @@
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.metrics.groups.NonRegisteringMetricsGroup;
import org.apache.flink.metrics.groups.JobMetricGroup;
import org.apache.flink.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.types.Value; import org.apache.flink.types.Value;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Visitor; import org.apache.flink.util.Visitor;


/** /**
Expand All @@ -93,8 +89,6 @@ public class CollectionExecutor {
private final ExecutionConfig executionConfig; private final ExecutionConfig executionConfig;


private int iterationSuperstep; private int iterationSuperstep;

private JobMetricGroup jobMetricGroup;


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


Expand All @@ -120,9 +114,7 @@ public JobExecutionResult execute(Plan program) throws Exception {
if (jobID == null) { if (jobID == null) {
jobID = new JobID(); jobID = new JobID();
} }
this.jobMetricGroup =
new TaskManagerMetricGroup(new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString())
.addJob(jobID, program.getJobName());
initCache(program.getCachedFiles()); initCache(program.getCachedFiles());
Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks(); Collection<? extends GenericDataSinkBase<?>> sinks = program.getDataSinks();
for (Operator<?> sink : sinks) { for (Operator<?> sink : sinks) {
Expand Down Expand Up @@ -202,7 +194,7 @@ private <IN> void executeDataSink(GenericDataSinkBase<?> sink, int superStep) th
TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0); TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
RuntimeUDFContext ctx; RuntimeUDFContext ctx;


MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedSink.getName()); MetricGroup metrics = NonRegisteringMetricsGroup.get();


if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) { if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
Expand All @@ -223,7 +215,7 @@ private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source, in


RuntimeUDFContext ctx; RuntimeUDFContext ctx;


MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, source.getName()); MetricGroup metrics = NonRegisteringMetricsGroup.get();
if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) { if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
Expand All @@ -249,7 +241,7 @@ private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> op
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx; RuntimeUDFContext ctx;


MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedOp.getName()); MetricGroup metrics = NonRegisteringMetricsGroup.get();
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
Expand Down Expand Up @@ -291,7 +283,8 @@ private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?,
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx; RuntimeUDFContext ctx;


MetricGroup metrics = this.jobMetricGroup.addTask(new AbstractID(), new AbstractID(), 0, typedOp.getName()); MetricGroup metrics = NonRegisteringMetricsGroup.get();

if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
Expand Down
Expand Up @@ -20,12 +20,13 @@
import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;


/** /**
* A Counter is a {@link org.apache.flink.metrics.Metric} that measures a count. * A Counter is a {@link Metric} that measures a count.
*/ */
@PublicEvolving @PublicEvolving
public final class Counter implements Metric { public final class Counter implements Metric {
private long count = 0;

private long count;

/** /**
* Increment the current count by 1. * Increment the current count by 1.
*/ */
Expand Down
3 changes: 2 additions & 1 deletion flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
Expand Up @@ -20,10 +20,11 @@
import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;


/** /**
* A Gauge is a {@link org.apache.flink.metrics.Metric} that calculates a specific value at a point in time. * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
*/ */
@PublicEvolving @PublicEvolving
public abstract class Gauge<T> implements Metric { public abstract class Gauge<T> implements Metric {

/** /**
* Calculates and returns the measured value. * Calculates and returns the measured value.
* *
Expand Down
50 changes: 34 additions & 16 deletions flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
Expand Up @@ -15,31 +15,49 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */

package org.apache.flink.metrics; package org.apache.flink.metrics;


import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;


/** /**
* A MetricGroup is a named container for {@link org.apache.flink.metrics.Metric}s and * A MetricGroup is a named container for {@link Metric Metrics} and {@link MetricGroup MetricGroups}.
* {@link org.apache.flink.metrics.MetricGroup}s. *
* <p> * <p>Instances of this class can be used to register new metrics with Flink and to create a nested
* Instances of this class can be used to register new metrics with Flink and to create a nested hierarchy based on the * hierarchy based on the group names.
* group names. *
* <p> * <p>A MetricGroup is uniquely identified by its place in the hierarchy and name.
* A MetricGroup is uniquely identified by its place in the hierarchy and name. *
* <p>Metrics groups can be {@link #close() closed}. Upon closing, they de-register all metrics
* from any metrics reporter and any internal maps. Note that even closed metrics groups
* return Counters, Gauges, etc. to the code, to prevent exceptions in the monitored code.
* These metrics simply do not get reported any more, when created on a closed group.
*/ */
@PublicEvolving @PublicEvolving
public interface MetricGroup { public interface MetricGroup {


// ------------------------------------------------------------------------
// Closing
// ------------------------------------------------------------------------

/** /**
* Recursively unregisters all {@link org.apache.flink.metrics.Metric}s contained in this * Marks the group as closed.
* {@link org.apache.flink.metrics.MetricGroup} * Recursively unregisters all {@link Metric Metrics} contained in this group.
*
* <p>Any metrics created after the call to this function will not be registered in
* the {@link MetricRegistry} and not be reported to any reporter (like JMX).
*/ */
void close(); void close();


// ----------------------------------------------------------------------------------------------------------------- /**
// Metrics * Checks whether this MetricGroup has been closed.
// ----------------------------------------------------------------------------------------------------------------- * @return True if the group has been closed, false if the group is still open.
*/
boolean isClosed();

// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------


/** /**
* Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink. * Creates and registers a new {@link org.apache.flink.metrics.Counter} with Flink.
Expand Down Expand Up @@ -77,20 +95,20 @@ public interface MetricGroup {
*/ */
<T> Gauge<T> gauge(String name, Gauge<T> gauge); <T> Gauge<T> gauge(String name, Gauge<T> gauge);


// ----------------------------------------------------------------------------------------------------------------- // ------------------------------------------------------------------------
// Groups // Groups
// ----------------------------------------------------------------------------------------------------------------- // ------------------------------------------------------------------------


/** /**
* Creates a new {@link org.apache.flink.metrics.MetricGroup} and adds it to this group's sub-groups. * Creates a new MetricGroup and adds it to this group's sub-groups.
* *
* @param name name of the group * @param name name of the group
* @return the created group * @return the created group
*/ */
MetricGroup addGroup(int name); MetricGroup addGroup(int name);


/** /**
* Creates a new {@link org.apache.flink.metrics.MetricGroup} and adds it to this group's sub-groups. * Creates a new MetricGroup and adds it to this group's sub-groups.
* *
* @param name name of the group * @param name name of the group
* @return the created group * @return the created group
Expand Down

0 comments on commit 707606a

Please sign in to comment.