Skip to content

Commit

Permalink
Reporter App Monitoring. (#11074)
Browse files Browse the repository at this point in the history
Add a metric to monitor the monitoring app's rate of publishing metrics. Though this isn't perfect, it gives us some insight into whether metric publishing is okay or running into issues.
  • Loading branch information
davinchia committed Mar 13, 2022
1 parent 9cb3d7e commit af6c64d
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,6 @@ public static void toRuntime(final Procedure voidCallable) {
castCheckedToRuntime(voidCallable, RuntimeException::new);
}

/**
* Return a Runnable that logs anonymous function exceptions.
*
* @param voidCallable
* @return
*/
public static Runnable toSwallowExceptionRunnable(final Procedure voidCallable) {
return () -> {
try {
voidCallable.call();
} catch (Exception e) {
log.error("Exception: ", e);
}
};
}

public static void toIllegalState(final Procedure voidCallable) {
castCheckedToRuntime(voidCallable, IllegalStateException::new);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public enum MetricsRegistry {
MetricEmittingApps.WORKER,
"attempt_succeeded_by_release_stage",
"increments when an attempts succeeds. attempts are double counted as this is tagged by release stage."),
EST_NUM_METRICS_EMITTED_BY_REPORTER(
MetricEmittingApps.METRICS_REPORTER,
"est_num_metrics_emitted_by_reporter",
"estimated metrics emitted by the reporter in the last interval. this is estimated since the count is not precise."),
JOB_CANCELLED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_cancelled_by_release_stage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class ReporterApp {

public static Database configDatabase;

public static void main(final String[] args) throws IOException, InterruptedException {
public static void main(final String[] args) throws IOException {
final Configs configs = new EnvConfigs();

DogStatsDMetricSingleton.initialize(MetricEmittingApps.METRICS_REPORTER, new DatadogClientConfiguration(configs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,47 @@

package io.airbyte.metrics.reporter;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.db.instance.jobs.jooq.enums.JobStatus;
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.MetricsRegistry;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;

/**
* This class contains all metrics emitted by the {@link ReporterApp}.
*/
@Slf4j
@AllArgsConstructor
public enum ToEmit {

NUM_PENDING_JOBS(Exceptions.toSwallowExceptionRunnable(() -> {
NUM_PENDING_JOBS(countMetricEmission(() -> {
final var pendingJobs = ReporterApp.configDatabase.query(MetricQueries::numberOfPendingJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_PENDING_JOBS, pendingJobs);
})),
NUM_RUNNING_JOBS(Exceptions.toSwallowExceptionRunnable(() -> {
NUM_RUNNING_JOBS(countMetricEmission(() -> {
final var runningJobs = ReporterApp.configDatabase.query(MetricQueries::numberOfRunningJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_RUNNING_JOBS, runningJobs);
})),
OLDEST_RUNNING_JOB_AGE_SECS(Exceptions.toSwallowExceptionRunnable(() -> {
OLDEST_RUNNING_JOB_AGE_SECS(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::oldestRunningJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, age);
})),
OLDEST_PENDING_JOB_AGE_SECS(Exceptions.toSwallowExceptionRunnable(() -> {
OLDEST_PENDING_JOB_AGE_SECS(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::oldestPendingJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, age);
})),
NUM_ACTIVE_CONN_PER_WORKSPACE(Exceptions.toSwallowExceptionRunnable(() -> {
NUM_ACTIVE_CONN_PER_WORKSPACE(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::numberOfActiveConnPerWorkspace);
for (long count : age) {
DogStatsDMetricSingleton.percentile(MetricsRegistry.NUM_ACTIVE_CONN_PER_WORKSPACE, count);
}
})),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(Exceptions.toSwallowExceptionRunnable(() -> {
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(countMetricEmission(() -> {
final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour);
for (Pair<JobStatus, Double> pair : times) {
DogStatsDMetricSingleton.recordTimeGlobal(
Expand All @@ -59,4 +61,22 @@ public enum ToEmit {
this(toEmit, 15, TimeUnit.SECONDS);
}

/**
* Wrapper callable to handle 1) query exception logging and 2) counting metric emissions so
* reporter app can be monitored too.
*
* @param metricQuery
* @return
*/
private static Runnable countMetricEmission(Procedure metricQuery) {
return () -> {
try {
metricQuery.call();
DogStatsDMetricSingleton.count(MetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
} catch (Exception e) {
log.error("Exception querying database for metric: ", e);
}
};
}

}

0 comments on commit af6c64d

Please sign in to comment.