Metric fixes on multi cloud (#19268)
* fix numpending secs

* fix worker
xiaohansong committed Nov 10, 2022
1 parent c755439 commit bde7fe9
Showing 4 changed files with 20 additions and 20 deletions.
@@ -82,7 +82,7 @@ final class OldestPendingJob extends Emitter {
db.oldestPendingJobAgeSecsByGeography().forEach((geographyType, count) -> client.gauge(
OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS,
count,
- new MetricAttribute(MetricTags.GEOGRAPHY, geographyType.getLiteral())));
+ new MetricAttribute(MetricTags.GEOGRAPHY, geographyType)));
return null;
});
}
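Note: with oldestPendingJobAgeSecsByGeography() now returning String keys (see the repository change below), the emitter tags the gauge with the geography value directly, so the old getLiteral() call goes away. A minimal, self-contained sketch of this consumption pattern, using a hypothetical gauge() helper in place of the real MetricClient:

import java.util.Map;

// Hypothetical stand-in for the metric client; not Airbyte code.
public class GeographyGaugeSketch {

  public static void main(String[] args) {
    final Map<String, Double> oldestPendingAgeSecsByGeography = Map.of("AUTO", 101.0, "EU", 20.0);

    // One gauge sample per geography, tagged with the (string) geography value directly.
    oldestPendingAgeSecsByGeography.forEach((geography, ageSecs) ->
        gauge("oldest_pending_job_age_secs", ageSecs, "geography", geography));
  }

  private static void gauge(final String metric, final double value, final String tagKey, final String tagValue) {
    System.out.printf("%s{%s=%s} = %.1f%n", metric, tagKey, tagValue, value);
  }
}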
@@ -11,7 +11,6 @@
import static org.jooq.impl.DSL.count;
import static org.jooq.impl.SQLDataType.VARCHAR;

- import io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType;
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.AttemptStatus;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
@@ -65,7 +64,7 @@ int numberOfOrphanRunningJobs() {
.fetchOne(0, int.class);
}

- Map<GeographyType, Double> oldestPendingJobAgeSecsByGeography() {
+ Map<String, Double> oldestPendingJobAgeSecsByGeography() {
final var query =
"""
SELECT cast(connection.geography as varchar) AS geography, MAX(EXTRACT(EPOCH FROM (current_timestamp - jobs.created_at))) AS run_duration_seconds
@@ -76,7 +75,7 @@ SELECT cast(connection.geography as varchar) AS geography, MAX(EXTRACT(EPOCH FRO
GROUP BY geography;
""";
final var result = ctx.fetch(query);
- return (Map<GeographyType, Double>) result.intoMap(0, 1);
+ return (Map<String, Double>) result.intoMap(0, 1);
}

Map<String, Double> oldestRunningJobAgeSecsByTaskQueue() {
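Note: jOOQ's Result.intoMap(0, 1) returns a Map<?, ?> keyed by the raw value of column 0, and this query casts connection.geography to varchar, so the keys are Strings at runtime. The previous unchecked cast to Map<GeographyType, Double> compiled, but using a key as the enum failed with a ClassCastException; hence the switch to Map<String, Double>. A minimal sketch of that failure mode (plain Java, no jOOQ or database, with a hypothetical local GeographyType enum):

import java.util.Map;

public class UncheckedCastSketch {

  // Hypothetical stand-in for the generated jOOQ enum.
  enum GeographyType {
    AUTO, EU, US;

    String getLiteral() {
      return name();
    }
  }

  public static void main(String[] args) {
    // What intoMap(0, 1) effectively produces for this query: String keys, Double values.
    final Map<?, ?> fromDb = Map.of("AUTO", 101.0, "EU", 20.0);

    // Compiles with an "unchecked" warning only...
    @SuppressWarnings("unchecked")
    final Map<GeographyType, Double> wrong = (Map<GeographyType, Double>) fromDb;

    // ...and fails here at runtime: the String key cannot be cast to GeographyType.
    wrong.forEach((geography, secs) -> System.out.println(geography.getLiteral() + " -> " + secs));
  }
}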
@@ -9,7 +9,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

- import io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
@@ -40,7 +39,7 @@ void setUp() {

@Test
void TestNumPendingJobs() {
- final var value = Map.of("AUTO", 101, "EU", 20);
+ final var value = Map.of(AUTO_REGION, 101, EU_REGION, 20);
when(repo.numberOfPendingJobsByGeography()).thenReturn(value);

final var emitter = new NumPendingJobs(client, repo);
@@ -49,9 +48,9 @@ void TestNumPendingJobs() {
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
verify(repo).numberOfPendingJobsByGeography();
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 101,
- new MetricAttribute(MetricTags.GEOGRAPHY, "AUTO"));
+ new MetricAttribute(MetricTags.GEOGRAPHY, AUTO_REGION));
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 20,
- new MetricAttribute(MetricTags.GEOGRAPHY, "EU"));
+ new MetricAttribute(MetricTags.GEOGRAPHY, EU_REGION));
verify(client).count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
}

@@ -105,7 +104,7 @@ void TestOldestRunningJob() {

@Test
void TestOldestPendingJob() {
- final var value = Map.of(GeographyType.AUTO, 101.0, GeographyType.EU, 20.0);
+ final var value = Map.of(AUTO_REGION, 101.0, EU_REGION, 20.0);
when(repo.oldestPendingJobAgeSecsByGeography()).thenReturn(value);

final var emitter = new OldestPendingJob(client, repo);
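Note: the AUTO_REGION and EU_REGION constants referenced in these tests are declared elsewhere in the test class and are not part of this diff; presumably they are plain String fixtures along these lines (hypothetical, for illustration only):

// Hypothetical shape of the shared test fixtures; the real declarations sit outside this diff.
class EmitterTestConstants {
  static final String AUTO_REGION = "AUTO";
  static final String EU_REGION = "EU";
}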
@@ -51,7 +51,7 @@ public class TemporalAttemptExecution<INPUT, OUTPUT> implements Supplier<OUTPUT>
private final Supplier<String> workflowIdProvider;
private final AirbyteApiClient airbyteApiClient;
private final String airbyteVersion;
- private final Optional<String> workflowTaskQueue;
+ private final Optional<String> replicationTaskQueue;

public TemporalAttemptExecution(final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
@@ -86,7 +86,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,
final AirbyteApiClient airbyteApiClient,
final String airbyteVersion,
final Supplier<ActivityExecutionContext> activityContext,
- final Optional<String> workflowTaskQueue) {
+ final Optional<String> replicationTaskQueue) {
this(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
@@ -97,7 +97,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,
airbyteApiClient,
() -> activityContext.get().getInfo().getWorkflowId(),
airbyteVersion,
- workflowTaskQueue);
+ replicationTaskQueue);
}

@VisibleForTesting
@@ -112,7 +112,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,
final AirbyteApiClient airbyteApiClient,
final Supplier<String> workflowIdProvider,
final String airbyteVersion,
- final Optional<String> workflowTaskQueue) {
+ final Optional<String> replicationTaskQueue) {
this.jobRunConfig = jobRunConfig;

this.jobRoot = TemporalUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
@@ -124,7 +124,7 @@ public TemporalAttemptExecution(final Path workspaceRoot,

this.airbyteApiClient = airbyteApiClient;
this.airbyteVersion = airbyteVersion;
- this.workflowTaskQueue = workflowTaskQueue;
+ this.replicationTaskQueue = replicationTaskQueue;
}

@Override
@@ -139,8 +137,6 @@ public OUTPUT get() {
}

LOGGER.info("Executing worker wrapper. Airbyte version: {}", airbyteVersion);
- // TODO(Davin): This will eventually run into scaling problems, since it opens a DB connection per
- // workflow. See https://github.com/airbytehq/airbyte/issues/5936.
saveWorkflowIdForCancellation(airbyteApiClient);

final Worker<INPUT, OUTPUT> worker = workerSupplier.get();
@@ -170,14 +168,18 @@ public OUTPUT get() {
private void saveWorkflowIdForCancellation(final AirbyteApiClient airbyteApiClient) throws ApiException {
// If the jobId is not a number, it means the job is a synchronous job. No attempt is created for
// it, and it cannot be cancelled, so do not save the workflowId. See
- // SynchronousSchedulerClient.java
- // for info.
- if (NumberUtils.isCreatable(jobRunConfig.getJobId())) {
+ // SynchronousSchedulerClient.java for info.
+ //
+ // As of this change (Nov 2022), we save the workflowId for cancellation purposes only at the
+ // replication activity level. The only async workflow today is SyncWorkflow, and within the
+ // same workflow the workflowId stays the same, so there is no need to save it
+ // multiple times.
+ if (NumberUtils.isCreatable(jobRunConfig.getJobId()) && replicationTaskQueue.isPresent()) {
final String workflowId = workflowIdProvider.get();
airbyteApiClient.getAttemptApi().setWorkflowInAttempt(new SetWorkflowInAttemptRequestBody()
.jobId(Long.parseLong(jobRunConfig.getJobId()))
.attemptNumber(jobRunConfig.getAttemptId().intValue())
- .processingTaskQueue(workflowTaskQueue.orElse(""))
+ .processingTaskQueue(replicationTaskQueue.get())
.workflowId(workflowId));
}
}
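Note: the rename from workflowTaskQueue to replicationTaskQueue comes with a tighter guard: the workflowId and its processing task queue are only reported when the attempt actually runs a replication activity, i.e. when replicationTaskQueue is present, and synchronous jobs (non-numeric job ids) are still skipped. A small, hypothetical sketch of the guard's decision logic, with made-up job ids and queue names (the real wiring lives in the callers, outside this diff):

import java.util.Optional;
import org.apache.commons.lang3.math.NumberUtils;

// Hypothetical illustration of the new guard; job ids and queue values are made up.
public class WorkflowIdSaveGuardSketch {

  static boolean shouldSaveWorkflowId(final String jobId, final Optional<String> replicationTaskQueue) {
    // Synchronous jobs have non-numeric ids and cannot be cancelled; non-replication
    // activities pass Optional.empty() and are skipped as well.
    return NumberUtils.isCreatable(jobId) && replicationTaskQueue.isPresent();
  }

  public static void main(String[] args) {
    System.out.println(shouldSaveWorkflowId("123", Optional.of("SYNC")));          // true: replication attempt
    System.out.println(shouldSaveWorkflowId("123", Optional.empty()));             // false: non-replication activity
    System.out.println(shouldSaveWorkflowId("get-spec-abc", Optional.of("SYNC"))); // false: synchronous job
  }
}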