Skip to content

Commit

Permalink
Fix the running job counter (#13833)
Browse files Browse the repository at this point in the history
  • Loading branch information
malikdiarra committed Jun 16, 2022
1 parent 96a4d75 commit 417ba6f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static org.jooq.impl.SQLDataType.VARCHAR;

import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -59,7 +62,15 @@ public static int numberOfPendingJobs(final DSLContext ctx) {
}

public static int numberOfRunningJobs(final DSLContext ctx) {
return ctx.selectCount().from(JOBS).where(JOBS.STATUS.eq(JobStatus.running)).fetchOne(0, int.class);
return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
.where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.eq(StatusType.active)))
.fetchOne(0, int.class);
}

public static int numberOfOrphanRunningJobs(final DSLContext ctx) {
return ctx.selectCount().from(JOBS).join(CONNECTION).on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
.where(JOBS.STATUS.eq(JobStatus.running).and(CONNECTION.STATUS.ne(StatusType.active)))
.fetchOne(0, int.class);
}

public static Long oldestPendingJobAgeSecs(final DSLContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,38 @@ void tearDown() throws SQLException {

@Test
void runningJobsShouldReturnCorrectCount() throws SQLException {
final var srcId = UUID.randomUUID();
final var dstId = UUID.randomUUID();
configDb.transaction(
ctx -> ctx.insertInto(ACTOR, ACTOR.ID, ACTOR.WORKSPACE_ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE)
.values(srcId, UUID.randomUUID(), SRC_DEF_ID, "src", JSONB.valueOf("{}"), ActorType.source)
.values(dstId, UUID.randomUUID(), DST_DEF_ID, "dst", JSONB.valueOf("{}"), ActorType.destination)
.execute());
final UUID activeConnectionId = UUID.randomUUID();
final UUID inactiveConnectionId = UUID.randomUUID();
configDb.transaction(
ctx -> ctx
.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.STATUS, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID,
CONNECTION.DESTINATION_ID, CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL)
.values(activeConnectionId, StatusType.active, NamespaceDefinitionType.source, srcId, dstId, "conn", JSONB.valueOf("{}"), true)
.values(inactiveConnectionId, StatusType.inactive, NamespaceDefinitionType.source, srcId, dstId, "conn", JSONB.valueOf("{}"), true)
.execute());

// non-pending jobs
configDb.transaction(
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.pending).execute());
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, activeConnectionId.toString(), JobStatus.pending).execute());
configDb.transaction(
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.failed).execute());
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, activeConnectionId.toString(), JobStatus.failed).execute());
configDb.transaction(
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.running).execute());
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, activeConnectionId.toString(), JobStatus.running).execute());
configDb.transaction(
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, "", JobStatus.running).execute());
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, activeConnectionId.toString(), JobStatus.running).execute());
configDb.transaction(
ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(5L, inactiveConnectionId.toString(), JobStatus.running).execute());

final var res = configDb.query(MetricQueries::numberOfRunningJobs);
assertEquals(2, res);
assertEquals(2, configDb.query(MetricQueries::numberOfRunningJobs));
assertEquals(1, configDb.query(MetricQueries::numberOfOrphanRunningJobs));
}

@Test
Expand Down

0 comments on commit 417ba6f

Please sign in to comment.