Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric and attributes #19943

Merged
merged 2 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void addTagsToTrace(final Map<String, Object> tags, final String t
public static void addTagsToTrace(final Span span, final Map<String, Object> tags, final String tagPrefix) {
if (span != null) {
tags.entrySet().forEach(entry -> {
span.setTag(String.format(TAG_FORMAT, tagPrefix, entry.getKey()), entry.getValue().toString());
span.setTag(formatTag(entry.getKey(), tagPrefix), entry.getValue().toString());
});
}
}
Expand All @@ -83,4 +83,27 @@ public static void addExceptionToTrace(final Span span, final Throwable t) {
}
}

/**
* Formats the tag key using {@link #TAG_FORMAT} provided by this utility, using the default tag
* prefix {@link #TAG_PREFIX}.
*
* @param tagKey The tag key to format.
* @return The formatted tag key.
*/
public static String formatTag(final String tagKey) {
return formatTag(tagKey, TAG_PREFIX);
}

/**
* Formats the tag key using {@link #TAG_FORMAT} provided by this utility with the provided tag
* prefix.
*
* @param tagKey The tag key to format.
* @param tagPrefix The prefix to be added to each custom tag name.
* @return The formatted tag key.
*/
public static String formatTag(final String tagKey, final String tagPrefix) {
return String.format(TAG_FORMAT, tagPrefix, tagKey);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"number of bytes synced during replication"),
REPLICATION_RECORDS_SYNCED(MetricEmittingApps.WORKER,
"replication_records_synced",
"number of records synced during replication");
"number of records synced during replication"),
RESET_REQUEST(MetricEmittingApps.WORKER,
"reset_request",
"number of requested resets");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.metrics.lib.ApmTraceUtils.TAG_FORMAT;
import static io.airbyte.metrics.lib.ApmTraceUtils.TAG_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -29,6 +30,7 @@ class ApmTraceUtilsTest {
private static final String TAG_2 = "tag2";
private static final String VALUE_1 = "foo";
private static final String VALUE_2 = "bar";
private static final String PREFIX = "prefix";
private static final Map<String, Object> TAGS = Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2);

@Before
Expand All @@ -54,15 +56,15 @@ void testAddingTagsWithPrefix() {
final Tracer tracer = mock(Tracer.class);
when(tracer.activeSpan()).thenReturn(span);
GlobalTracerTestUtil.setGlobalTracerUnconditionally(tracer);
final String tagPrefix = "prefix";
final String tagPrefix = PREFIX;
ApmTraceUtils.addTagsToTrace(TAGS, tagPrefix);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, tagPrefix, TAG_1), VALUE_1);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, tagPrefix, TAG_2), VALUE_2);
}

@Test
void testAddingTagsToSpanWithPrefix() {
final String tagPrefix = "prefix";
final String tagPrefix = PREFIX;
final Span span = mock(Span.class);
ApmTraceUtils.addTagsToTrace(span, TAGS, tagPrefix);
verify(span, times(1)).setTag(String.format(TAG_FORMAT, tagPrefix, TAG_1), VALUE_1);
Expand All @@ -75,4 +77,16 @@ void testAddingTagsToNullSpanWithPrefix() {
Assertions.assertDoesNotThrow(() -> ApmTraceUtils.addTagsToTrace(null, TAGS, tagPrefix));
}

@Test
void testFormattingTagKeys() {
final String tagKey1 = "tagKey1";
final String tagPrefix1 = PREFIX;

final String result1 = ApmTraceUtils.formatTag(tagKey1);
assertEquals("airbyte.metadata." + tagKey1, result1);

final String result2 = ApmTraceUtils.formatTag(tagKey1, tagPrefix1);
assertEquals("airbyte." + tagPrefix1 + "." + tagKey1, result2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
Expand Down Expand Up @@ -75,6 +76,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -142,9 +144,14 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final StandardSyncInput syncInput,
@Nullable final String taskQueue) {
final Map<String, Object> traceAttributes =
Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY,
destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage());
ApmTraceUtils
.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY,
destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage()));
.addTagsToTrace(traceAttributes);
if (isResetJob(sourceLauncherConfig.getDockerImage())) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.RESET_REQUEST, 1);
}
final ActivityExecutionContext context = Activity.getExecutionContext();
return temporalUtils.withBackgroundHeartbeat(
() -> {
Expand Down Expand Up @@ -192,7 +199,7 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
Optional.ofNullable(taskQueue));

final ReplicationOutput attemptOutput = temporalAttempt.get();
final StandardSyncOutput standardSyncOutput = reduceReplicationOutput(attemptOutput);
final StandardSyncOutput standardSyncOutput = reduceReplicationOutput(attemptOutput, traceAttributes);

final String standardSyncOutputString = standardSyncOutput.toString();
LOGGER.info("sync summary: {}", standardSyncOutputString);
Expand All @@ -208,12 +215,12 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
() -> context);
}

private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutput output) {
private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutput output, final Map<String, Object> metricAttributes) {
final StandardSyncOutput standardSyncOutput = new StandardSyncOutput();
final StandardSyncSummary syncSummary = new StandardSyncSummary();
final ReplicationAttemptSummary replicationSummary = output.getReplicationAttemptSummary();

traceReplicationSummary(replicationSummary);
traceReplicationSummary(replicationSummary, metricAttributes);

syncSummary.setBytesSynced(replicationSummary.getBytesSynced());
syncSummary.setRecordsSynced(replicationSummary.getRecordsSynced());
Expand All @@ -231,19 +238,22 @@ private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutpu
return standardSyncOutput;
}

private static void traceReplicationSummary(final ReplicationAttemptSummary replicationSummary) {
private static void traceReplicationSummary(final ReplicationAttemptSummary replicationSummary, final Map<String, Object> metricAttributes) {
if (replicationSummary == null) {
return;
}

final MetricAttribute[] attributes = metricAttributes.entrySet().stream()
.map(e -> new MetricAttribute(ApmTraceUtils.formatTag(e.getKey()), e.getValue().toString()))
.collect(Collectors.toSet()).toArray(new MetricAttribute[] {});
final Map<String, Object> tags = new HashMap<>();
if (replicationSummary.getBytesSynced() != null) {
tags.put(REPLICATION_BYTES_SYNCED_KEY, replicationSummary.getBytesSynced());
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_BYTES_SYNCED, replicationSummary.getBytesSynced());
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_BYTES_SYNCED, replicationSummary.getBytesSynced(), attributes);
}
if (replicationSummary.getRecordsSynced() != null) {
tags.put(REPLICATION_RECORDS_SYNCED_KEY, replicationSummary.getRecordsSynced());
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_RECORDS_SYNCED, replicationSummary.getRecordsSynced());
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_RECORDS_SYNCED, replicationSummary.getRecordsSynced(), attributes);
}
if (replicationSummary.getStatus() != null) {
tags.put(REPLICATION_STATUS_KEY, replicationSummary.getStatus().value());
Expand Down Expand Up @@ -272,12 +282,11 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
syncInput.getDestinationResourceRequirements());

// reset jobs use an empty source to induce resetting all data in destination.
final AirbyteSource airbyteSource =
WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage())
? new EmptyAirbyteSource(featureFlags.useStreamCapableState())
: new DefaultAirbyteSource(sourceLauncher,
new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(),
DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER));
final AirbyteSource airbyteSource = isResetJob(sourceLauncherConfig.getDockerImage())
? new EmptyAirbyteSource(featureFlags.useStreamCapableState())
: new DefaultAirbyteSource(sourceLauncher,
new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(),
DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER));
MetricClientFactory.initialize(MetricEmittingApps.WORKER);
final MetricClient metricClient = MetricClientFactory.getMetricClient();
final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());
Expand Down Expand Up @@ -327,4 +336,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
workerConfigs);
}

private boolean isResetJob(final String dockerImage) {
return WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equalsIgnoreCase(dockerImage);
}

}