diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java index d83e343569328..053a1cc0908c0 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java @@ -6,9 +6,12 @@ import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR; import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION; +import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; +import static org.jooq.impl.SQLDataType.VARCHAR; import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage; +import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType; import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus; import java.util.ArrayList; import java.util.List; @@ -59,7 +62,15 @@ public static int numberOfPendingJobs(final DSLContext ctx) { } public static int numberOfRunningJobs(final DSLContext ctx) { - return ctx.selectCount().from(JOBS).where(JOBS.STATUS.eq(JobStatus.running)).fetchOne(0, int.class); + return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE)) + .where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.eq(StatusType.active))) + .fetchOne(0, int.class); + } + + public static int numberOfOrphanRunningJobs(final DSLContext ctx) { + return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE)) + .where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.ne(StatusType.active))) + .fetchOne(0, int.class); } public static Long oldestPendingJobAgeSecs(final DSLContext ctx) { diff --git a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java index 9c443ed1ee31f..f5793e41bdc6a 100644 --- a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java +++ b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricsQueriesTest.java @@ -177,18 +177,38 @@ void tearDown() throws SQLException { @Test void runningJobsShouldReturnCorrectCount() throws SQLException { + final var srcId = UUID.randomUUID(); + final var dstId = UUID.randomUUID(); + configDb.transaction( + ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE) + .values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source) + .values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination) + .execute()); + final UUID activeConnectionId = UUID.randomUUID(); + final UUID inactiveConnectionId = UUID.randomUUID(); + configDb.transaction( + ctx -> ctx + .insertInto(CONNECTION, CONNECTION.ID, CONNECTION.STATUS, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, + CONNECTION.DESTINATION_ID, CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL) + .values(activeConnectionId, StatusType.active, NamespaceDefinitionType.source, srcId, dstId, "conn", JSONB.valueOf("{}"), true) + .values(inactiveConnectionId, StatusType.inactive, NamespaceDefinitionType.source, srcId, dstId, "conn", JSONB.valueOf("{}"), true) + .execute()); + // non-pending jobs configDb.transaction( - ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.pending).execute()); + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, activeConnectionId.toString(), JobStatus.pending).execute()); configDb.transaction( - ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.failed).execute()); + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, activeConnectionId.toString(), JobStatus.failed).execute()); configDb.transaction( - ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.running).execute()); + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, activeConnectionId.toString(), JobStatus.running).execute()); configDb.transaction( - ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, "", JobStatus.running).execute()); + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, activeConnectionId.toString(), JobStatus.running).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(5L, inactiveConnectionId.toString(), JobStatus.running).execute()); final var res = configDb.query(MetricQueries::numberOfRunningJobs); - assertEquals(2, res); + assertEquals(2, configDb.query(MetricQueries::numberOfRunningJobs)); + assertEquals(1, configDb.query(MetricQueries::numberOfOrphanRunningJobs)); } @Test