Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloud Dashboard 1 #10628

Merged
merged 8 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ public ConfigRepository(final ConfigPersistence persistence,
this.database = new ExceptionWrappingDatabase(database);
}

public ExceptionWrappingDatabase getDatabase() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required to run a query against the database.

Copy link
Contributor

@cgardens cgardens Mar 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davinchia this is breaking the whole point of the ConfigRepository abstraction. It also sets an example for everyone else who joins the team that the db can now be accessed from anywhere in code which is a pattern we have worked really hard to avoid.

Is there a reason you can't write your query inside here instead like we do with other queries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mental context at time of writing:

  • I was under the impression we want to move away from this class to DatabaseConfigPersistence.java. Because of this, adding more code here didn't seem like a good idea.
  • The queries were complicated multi-join queries and not the typical CRUD queries present in the ConfigRepository class. It didn't feel it was the right place to put these here.
  • Chris mentioned it's useful to collect all the metric queries in one spot so he can re-use them for analytics if needed. A metric specific spot like this class seemed more sane than adding to the ConfigRepository class which is bloated and quite tough to read at this point. (I think we should starting composing down to table-specific classes.)
  • I feel that the accessing the db directly is 'okay' in special cases. In this case, the metric queries are quite specialised and unlikely to be used by another application. We don't want to use the normal CRUD operations here since it'll result in tens of queries vs 1.

I'm happy to move the queries here if you have feel otherwise.

return database;
}

public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone)
throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardWorkspace workspace = persistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), StandardWorkspace.class);

if (!MoreBooleans.isTruthy(workspace.getTombstone()) || includeTombstone) {
return workspace;
}

throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString());
}

Expand Down
5 changes: 5 additions & 0 deletions airbyte-metrics/lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ plugins {
dependencies {
implementation project(':airbyte-commons')
implementation project(':airbyte-config:models')
implementation project(':airbyte-db:jooq')
implementation project(':airbyte-db:lib')

implementation 'com.datadoghq:java-dogstatsd-client:4.0.0'

testImplementation project(':airbyte-config:persistence')
testImplementation 'org.testcontainers:postgresql:1.15.3'
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public synchronized static void flush() {
* @param amt to adjust.
* @param tags
*/
public static void count(final AirbyteMetricsRegistry metric, final double amt, final String... tags) {
public static void count(final MetricsRegistry metric, final double amt, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, count {} not emitted", metric.metricName);
return;
}

log.info("publishing count, name: {}, value: {}", metric.metricName, amt);
log.info("publishing count, name: {}, value: {}, tags: {}", metric.metricName, amt, tags);
statsDClient.count(metric.metricName, amt, tags);
}
}
Expand All @@ -82,15 +82,15 @@ public static void count(final AirbyteMetricsRegistry metric, final double amt,
* @param val to record.
* @param tags
*/
public static void gauge(final AirbyteMetricsRegistry metric, final double val, final String... tags) {
public static void gauge(final MetricsRegistry metric, final double val, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, gauge {} not emitted", metric.metricName);
return;
}

log.info("publishing gauge, name: {}, value: {}", metric, val);
log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.gauge(metric.metricName, val, tags);
}
}
Expand All @@ -109,15 +109,15 @@ public static void gauge(final AirbyteMetricsRegistry metric, final double val,
* @param val of time to record.
* @param tags
*/
public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final double val, final String... tags) {
public static void recordTimeLocal(final MetricsRegistry metric, final double val, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, histogram {} not emitted", metric.metricName);
return;
}

log.info("recording histogram, name: {}, value: {}", metric.metricName, val);
log.info("recording histogram, name: {}, value: {}, tags: {}", metric.metricName, val, tags);
statsDClient.histogram(metric.metricName, val, tags);
}
}
Expand All @@ -130,28 +130,28 @@ public static void recordTimeLocal(final AirbyteMetricsRegistry metric, final do
* @param val of time to record.
* @param tags
*/
public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final double val, final String... tags) {
public static void recordTimeGlobal(final MetricsRegistry metric, final double val, final String... tags) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, distribution {} not emitted", metric.metricName);
return;
}

log.info("recording distribution, name: {}, value: {}", metric.metricName, val);
log.info("recording distribution, name: {}, value: {}, tags: {}", metric.metricName, val, tags);
statsDClient.distribution(metric.metricName, val, tags);
}
}

/**
* Wrapper of {@link #recordTimeGlobal(AirbyteMetricsRegistry, double, String...)} with a runnable
* for convenience.
* Wrapper of {@link #recordTimeGlobal(MetricsRegistry, double, String...)} with a runnable for
* convenience.
*
* @param metric
* @param runnable to time
* @param tags
*/
public static void recordTimeGlobal(final AirbyteMetricsRegistry metric, final Runnable runnable, final String... tags) {
public static void recordTimeGlobal(final MetricsRegistry metric, final Runnable runnable, final String... tags) {
final long start = System.currentTimeMillis();
runnable.run();
final long end = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
/**
* Interface representing an Airbyte Application to collect metrics for. This interface is present
* as Java doesn't support enum inheritance as of Java 17. We use a shared interface so this
* interface can be used in the {@link AirbyteMetricsRegistry} enum.
* interface can be used in the {@link MetricsRegistry} enum.
*/
public interface MetricEmittingApp {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* Enum containing all applications metrics are emitted for. Used to initialize
* {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, boolean)}.
* {@link DogStatsDMetricSingleton#initialize(MetricEmittingApp, DatadogClientConfiguration)}.
*
* Application Name Conventions:
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;

import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
import java.util.List;
import java.util.UUID;
import org.jooq.DSLContext;

/**
* Keep track of all metric queries.
*/
public class MetricQueries {

public static List<ReleaseStage> jobIdToReleaseStages(final DSLContext ctx, final long jobId) {
final var srcRelStageCol = "src_release_stage";
final var dstRelStageCol = "dst_release_stage";

final var query = String.format("""
SELECT src_def_data.release_stage AS %s,
dest_def_data.release_stage AS %s
FROM connection
INNER JOIN jobs ON connection.id=CAST(jobs.scope AS uuid)
INNER JOIN actor AS dest_data ON connection.destination_id = dest_data.id
INNER JOIN actor_definition AS dest_def_data ON dest_data.actor_definition_id = dest_def_data.id
INNER JOIN actor AS src_data ON connection.source_id = src_data.id
INNER JOIN actor_definition AS src_def_data ON src_data.actor_definition_id = src_def_data.id
WHERE jobs.id = '%d';""", srcRelStageCol, dstRelStageCol, jobId);
Comment on lines +25 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be nice to isolate the "complex" SQL queries in separate resources .sql files down the line


final var res = ctx.fetch(query);
final var stages = res.getValues(srcRelStageCol, ReleaseStage.class);
stages.addAll(res.getValues(dstRelStageCol, ReleaseStage.class));
return stages;
}

public static List<ReleaseStage> srcIdAndDestIdToReleaseStages(final DSLContext ctx, final UUID srcId, final UUID dstId) {
return ctx.select(ACTOR_DEFINITION.RELEASE_STAGE).from(ACTOR).join(ACTOR_DEFINITION).on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID))
.where(ACTOR.ID.eq(srcId))
.or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;

/**
* Keep track of all metric tags.
*/
public class MetricTags {

private static final String RELEASE_STAGE = "release_stage:";

public static String getReleaseStage(final ReleaseStage stage) {
return RELEASE_STAGE + ":" + stage.getLiteral();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,24 @@
* - Add units at name end if applicable. This is especially relevant for time units. versioning
* tactic and present at the end of the metric.
*/
public enum AirbyteMetricsRegistry {
public enum MetricsRegistry {

JOB_CANCELLED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_cancelled_by_release_stage",
"increments when a job is cancelled. jobs are double counted as this is tagged by release stage."),
JOB_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_created_by_release_stage",
"increments when a new job is created. jobs are double counted as this is tagged by release stage."),
JOB_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_failed_by_release_stage",
"increments when a job fails. jobs are double counted as this is tagged by release stage."),
JOB_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
Expand All @@ -39,7 +55,7 @@ public enum AirbyteMetricsRegistry {
public final String metricName;
public final String metricDescription;

AirbyteMetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
MetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
Preconditions.checkNotNull(metricDescription);
Preconditions.checkNotNull(application);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.llib;
package io.airbyte.metrics.lib;

import io.airbyte.metrics.lib.AirbyteMetricsRegistry;
import io.airbyte.metrics.lib.DatadogClientConfiguration;
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricEmittingApps;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -25,7 +21,7 @@ void tearDown() {
public void testPublishTrueNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", false));
DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand All @@ -34,15 +30,15 @@ public void testPublishTrueNoEmitError() {
public void testPublishFalseNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", true));
DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

@Test
@DisplayName("there should be no exception if we attempt to emit metrics without initializing")
public void testNoInitializeNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.gauge(AirbyteMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand Down