Skip to content

Commit

Permalink
[FLINK-456] Basic JM Metric Infrastructure
Browse files Browse the repository at this point in the history
This closes #2146
  • Loading branch information
zentol committed Jul 1, 2016
1 parent a11c1c6 commit a3a9fd1
Show file tree
Hide file tree
Showing 21 changed files with 688 additions and 140 deletions.
Expand Up @@ -52,8 +52,10 @@ public class MetricRegistry {
public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments"; public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments";
public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval"; public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval";


public static final String KEY_METRICS_SCOPE_NAMING_JM = "metrics.scope.jm";
public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm"; public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scope.job"; public static final String KEY_METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job";
public static final String KEY_METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job";
public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task"; public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task";
public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator"; public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";


Expand Down Expand Up @@ -243,16 +245,20 @@ static Configuration createReporterConfig(Configuration config, TimeUnit timeuni
} }


static ScopeFormats createScopeConfig(Configuration config) { static ScopeFormats createScopeConfig(Configuration config) {
String jmFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
String jmJobFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
String tmFormat = config.getString( String tmFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); KEY_METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
String jobFormat = config.getString( String tmJobFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); KEY_METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
String taskFormat = config.getString( String taskFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); KEY_METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
String operatorFormat = config.getString( String operatorFormat = config.getString(
KEY_METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); KEY_METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);


return new ScopeFormats(tmFormat, jobFormat, taskFormat, operatorFormat); return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat;

import javax.annotation.Nullable;
import java.util.Collections;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
 * a specific job, running on the JobManager.
 *
 * <p>This group does not track any sub-components: {@link #subComponents()} always
 * returns an empty collection.
 */
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup {

    /** The metrics group that contains this group */
    private final JobManagerMetricGroup parent;

    /**
     * Creates the group using the JobManager-job scope format obtained from the
     * registry's scope formats.
     *
     * @param registry the registry this group belongs to
     * @param parent the JobManager group that contains this group; must not be null
     * @param jobId the ID of the job represented by this group
     * @param jobName the name of the job, may be null
     */
    public JobManagerJobMetricGroup(
            MetricRegistry registry,
            JobManagerMetricGroup parent,
            JobID jobId,
            @Nullable String jobName) {

        this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName);
    }

    /**
     * Creates the group using an explicitly supplied scope format.
     *
     * @param registry the registry this group belongs to
     * @param parent the JobManager group that contains this group; must not be null
     * @param scopeFormat the format used to compute this group's scope
     * @param jobId the ID of the job represented by this group
     * @param jobName the name of the job, may be null
     */
    public JobManagerJobMetricGroup(
            MetricRegistry registry,
            JobManagerMetricGroup parent,
            JobManagerJobScopeFormat scopeFormat,
            JobID jobId,
            @Nullable String jobName) {

        // Validate the parent before it is handed to the scope format, so that a missing
        // parent is rejected by the explicit check rather than somewhere inside formatScope.
        super(registry, jobId, jobName, scopeFormat.formatScope(checkNotNull(parent), jobId, jobName));

        this.parent = parent;
    }

    /**
     * Returns the JobManager metric group that contains this group.
     */
    public final JobManagerMetricGroup parent() {
        return parent;
    }

    /**
     * Returns an empty collection, as this group holds no sub-components.
     */
    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return Collections.emptyList();
    }
}
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.groups;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat;

import java.util.HashMap;
import java.util.Map;

/**
 * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager.
 *
 * <p>Contains extra logic for adding per-job groups ({@link JobManagerJobMetricGroup})
 * and for removing and closing them again via {@link #removeJob(JobID)}.
 *
 * <p>All accesses to the internal job map are guarded by this object's monitor.
 */
@Internal
public class JobManagerMetricGroup extends ComponentMetricGroup {

    /** Map from job ID to the metric group of that job; guarded by {@code this} */
    private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>();

    /** The hostname this group was created with */
    private final String hostname;

    /**
     * Creates the group using the JobManager scope format obtained from the
     * registry's scope formats.
     *
     * @param registry the registry this group belongs to
     * @param hostname the hostname used when formatting the scope
     */
    public JobManagerMetricGroup(MetricRegistry registry, String hostname) {
        this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname);
    }

    /**
     * Creates the group using an explicitly supplied scope format.
     *
     * @param registry the registry this group belongs to
     * @param scopeFormat the format used to compute this group's scope
     * @param hostname the hostname used when formatting the scope
     */
    public JobManagerMetricGroup(
            MetricRegistry registry,
            JobManagerScopeFormat scopeFormat,
            String hostname) {

        super(registry, scopeFormat.formatScope(hostname));
        this.hostname = hostname;
    }

    /**
     * Returns the hostname this group was created with.
     */
    public String hostname() {
        return hostname;
    }

    // ------------------------------------------------------------------------
    //  job groups
    // ------------------------------------------------------------------------

    /**
     * Gets the metric group for the given job, creating and registering a new one if
     * none is registered yet or if the registered one has already been closed.
     *
     * @param jobId the ID of the job
     * @param jobName the name of the job
     * @return the job's metric group, or {@code null} if this group is already closed
     */
    public JobManagerJobMetricGroup addJob(
            JobID jobId,
            String jobName) {
        synchronized (this) {
            if (isClosed()) {
                return null;
            }

            // get or create a jobs metric group
            JobManagerJobMetricGroup currentJobGroup = jobs.get(jobId);
            if (currentJobGroup == null || currentJobGroup.isClosed()) {
                currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName);
                jobs.put(jobId, currentJobGroup);
            }
            return currentJobGroup;
        }
    }

    /**
     * Removes the metric group of the given job and closes it. Does nothing if the ID is
     * {@code null} or no group is registered for it.
     *
     * @param jobId the ID of the job whose group should be removed
     */
    public void removeJob(JobID jobId) {
        if (jobId == null) {
            return;
        }

        synchronized (this) {
            JobManagerJobMetricGroup containedGroup = jobs.remove(jobId);
            if (containedGroup != null) {
                containedGroup.close();
            }
        }
    }

    /**
     * Returns the number of job metric groups currently registered.
     */
    public int numRegisteredJobMetricGroups() {
        // The map is a plain HashMap that is mutated under this lock in addJob/removeJob,
        // so the read takes the same lock to observe a consistent state.
        synchronized (this) {
            return jobs.size();
        }
    }

    /**
     * Returns the currently registered job metric groups as a view of the internal map's values.
     */
    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return jobs.values();
    }
}

Expand Up @@ -21,66 +21,36 @@
import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat;
import org.apache.flink.util.AbstractID;


import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;


/** /**
* Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to
* a specific job, running on the TaskManager. * a specific job.
*
* <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}).
*/ */
@Internal @Internal
public class JobMetricGroup extends ComponentMetricGroup { public abstract class JobMetricGroup extends ComponentMetricGroup {

/** The metrics group that contains this group */
private final TaskManagerMetricGroup parent;

/** Map from execution attempt ID (task identifier) to task metrics */
private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>();


/** The ID of the job represented by this metrics group */ /** The ID of the job represented by this metrics group */
private final JobID jobId; protected final JobID jobId;


/** The name of the job represented by this metrics group */ /** The name of the job represented by this metrics group */
@Nullable @Nullable
private final String jobName; protected final String jobName;


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


public JobMetricGroup( protected JobMetricGroup(
MetricRegistry registry, MetricRegistry registry,
TaskManagerMetricGroup parent,
JobID jobId, JobID jobId,
@Nullable String jobName) { @Nullable String jobName,
String[] scope) {
super(registry, scope);


this(registry, checkNotNull(parent), registry.getScopeFormats().getJobFormat(), jobId, jobName); this.jobId = jobId;
}

public JobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
TaskManagerJobScopeFormat scopeFormat,
JobID jobId,
@Nullable String jobName) {

super(registry, scopeFormat.formatScope(parent, jobId, jobName));

this.parent = checkNotNull(parent);
this.jobId = checkNotNull(jobId);
this.jobName = jobName; this.jobName = jobName;
} }


public final TaskManagerMetricGroup parent() {
return parent;
}

public JobID jobId() { public JobID jobId() {
return jobId; return jobId;
} }
Expand All @@ -89,53 +59,4 @@ public JobID jobId() {
public String jobName() { public String jobName() {
return jobName; return jobName;
} }

// ------------------------------------------------------------------------
// adding / removing tasks
// ------------------------------------------------------------------------

public TaskMetricGroup addTask(
AbstractID vertexId,
AbstractID executionId,
String taskName,
int subtaskIndex,
int attemptNumber) {

checkNotNull(executionId);

synchronized (this) {
if (!isClosed()) {
TaskMetricGroup task = new TaskMetricGroup(registry, this,
vertexId, executionId, taskName, subtaskIndex, attemptNumber);
tasks.put(executionId, task);
return task;
} else {
return null;
}
}
}

public void removeTaskMetricGroup(AbstractID executionId) {
checkNotNull(executionId);

boolean removeFromParent = false;
synchronized (this) {
if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
// this call removed the last task. close this group.
removeFromParent = true;
close();
}
}

// IMPORTANT: removing from the parent must happen while holding the this group's lock,
// because it would violate the "first parent then subgroup" lock acquisition order
if (removeFromParent) {
parent.removeJobMetricsGroup(jobId, this);
}
}

@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return tasks.values();
}
} }

0 comments on commit a3a9fd1

Please sign in to comment.