Add Auto-Disable Failing Connections feature (#11099)
* Add Disable Failing Connections feature

* Rename and cleanup

* list jobs based off connection id

* Move variables to env config and update unit tests

* Fix env flag name

* Fix missing name changes

* Add comments to unit test

* Address PR comments

* Support multiple config types

* Update unit tests

* Remove the attemptId notion in the connectionManagerWorkflow (#10780)

This is removing the attemptId from the create attempt activity to replace it with the attemptNumber. This will be modified in the workflow in a later commit.

* Revert "Remove the attemptId notion in the connectionManagerWorkflow (#10780)" (#11057)

This reverts commit 99338c8.

* Revert "Revert "Remove the attemptId notion in the connectionManagerWorkflow (#10780)" (#11057)" (#11073)

This reverts commit 892dc7e.

* Revert "Revert "Revert "Remove the attemptId notion in the connectionManagerWorkflow (#10780)" (#11057)" (#11073)" (#11081)

This reverts commit e27bb74.

* Add Disable Failing Connections feature

* Rename and cleanup

* Fix rebase

* only disable if first job is older than max days

* Return boolean for activity

* Return boolean for activity

* Add unit tests for ConnectionManagerWorkflow

* Utilize object output for activity and ignore non success or failed runs

* Utilize object output for activity and ignore non success or failed runs

Co-authored-by: Benoit Moriceau <benoit@airbyte.io>
terencecho and benmoriceau committed Mar 18, 2022
1 parent 672b347 commit f4bb7b2
Showing 15 changed files with 703 additions and 10 deletions.
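Taken together, the commit messages describe the shape of the feature: when the AUTO_DISABLE_FAILING_CONNECTIONS flag is on, the connection manager workflow invokes an auto-disable activity after a job finishes, and that activity returns an object output (rather than a bare boolean) indicating whether the connection was disabled. The following is a minimal, hypothetical sketch of that contract; the type and method names are illustrative and are not the identifiers introduced by this commit.

import java.util.UUID;

// Hypothetical sketch only: illustrates the activity contract implied by the commit
// messages ("Return boolean for activity", then "Utilize object output for activity").
public class AutoDisableSketch {

  // Assumed output wrapper; the commit only says the activity uses an object output.
  record AutoDisableOutput(boolean connectionDisabled) {}

  interface AutoDisableActivity {
    AutoDisableOutput autoDisableFailingConnection(UUID connectionId);
  }

  // In the workflow, the activity call would be gated on the feature flag
  // (see the FeatureFlags changes below).
  static void onJobFinished(final AutoDisableActivity activity,
                            final UUID connectionId,
                            final boolean autoDisableEnabled) {
    if (!autoDisableEnabled) {
      return;
    }
    if (activity.autoDisableFailingConnection(connectionId).connectionDisabled()) {
      // the workflow would stop scheduling further syncs for this connection
    }
  }
}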
1 change: 1 addition & 0 deletions .env
@@ -91,3 +91,4 @@ MAX_DISCOVER_WORKERS=5

### FEATURE FLAGS ###
NEW_SCHEDULER=false
AUTO_DISABLE_FAILING_CONNECTIONS=false
@@ -16,4 +16,11 @@ public boolean usesNewScheduler() {
return Boolean.parseBoolean(System.getenv("NEW_SCHEDULER"));
}

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));

return Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"));
}

}
@@ -12,4 +12,6 @@ public interface FeatureFlags {

boolean usesNewScheduler();

boolean autoDisablesFailingConnections();

}
12 changes: 12 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
@@ -249,6 +249,18 @@ public interface Configs {
*/
Map<String, String> getJobDefaultEnvMap();

/**
* Defines the number of consecutive job failures required before a connection is auto-disabled if
* the AUTO_DISABLE_FAILING_CONNECTIONS flag is set to true.
*/
int getMaxFailedJobsInARowBeforeConnectionDisable();

/**
* Defines the required number of days with only failed jobs before a connection is auto-disabled if
* the AUTO_DISABLE_FAILING_CONNECTIONS flag is set to true.
*/
int getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable();

// Jobs - Kube only
/**
* Define the check job container's minimum CPU request. Defaults to
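Based on the Javadoc above and the commit message "only disable if first job is older than max days", one plausible reading is that a connection is disabled either after too many consecutive failures or after a sustained window of nothing but failures, where the window check additionally requires the connection's first replication job to be older than the configured number of days. A hedged sketch of that combined check, as a free-standing method fragment with illustrative parameter names:

// Sketch only; the inputs besides configs are hypothetical values a caller would
// derive from the JobPersistence queries added further down in this commit.
static boolean shouldAutoDisable(final Configs configs,
                                 final int consecutiveFailedJobs,
                                 final boolean onlyFailedJobsInWindow,
                                 final long firstReplicationJobAgeInDays) {
  // too many failures in a row, per MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE
  final boolean tooManyConsecutiveFailures =
      consecutiveFailedJobs >= configs.getMaxFailedJobsInARowBeforeConnectionDisable();
  // nothing but failures for long enough, per MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE,
  // and the connection's first replication job is itself older than that window
  final boolean failingForTooLong = onlyFailedJobsInWindow
      && firstReplicationJobAgeInDays >= configs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable();
  return tooManyConsecutiveFailures || failingForTooLong;
}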
@@ -121,6 +121,9 @@ public class EnvConfigs implements Configs {
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";

private static final String MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = "MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE";
private static final String MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = "MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE";

// job-type-specific overrides
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
@@ -178,6 +181,9 @@ public class EnvConfigs implements Configs {

public static final int DEFAULT_TEMPORAL_HISTORY_RETENTION_IN_DAYS = 30;

public static final int DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = 100;
public static final int DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = 14;

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
private final LogConfigs logConfigs;
@@ -661,6 +667,16 @@ public Map<String, String> getJobDefaultEnvMap() {
return MoreMaps.merge(jobPrefixedEnvMap, jobSharedEnvMap);
}

@Override
public int getMaxFailedJobsInARowBeforeConnectionDisable() {
return getEnvOrDefault(MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE, DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE);
}

@Override
public int getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable() {
return getEnvOrDefault(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE, DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE);
}

@Override
public String getCheckJobMainContainerCpuRequest() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST, getJobMainContainerCpuRequest());
@@ -385,6 +385,22 @@ public List<Job> listJobsWithStatus(final ConfigType configType, final JobStatus
return listJobsWithStatus(Sets.newHashSet(configType), status);
}

@Override
public List<JobStatus> listJobStatusWithConnection(final UUID connectionId, final Set<ConfigType> configTypes, final Instant jobCreatedAtTimestamp)
throws IOException {
final LocalDateTime timeConvertedIntoLocalDateTime = LocalDateTime.ofInstant(jobCreatedAtTimestamp, ZoneOffset.UTC);

final String JobStatusSelect = "SELECT status FROM jobs ";
return jobDatabase.query(ctx -> ctx
.fetch(JobStatusSelect + "WHERE " +
"scope = ? AND " +
"CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND " +
"created_at >= ? ORDER BY created_at DESC", connectionId.toString(), timeConvertedIntoLocalDateTime))
.stream()
.map(r -> JobStatus.valueOf(r.get("status", String.class).toUpperCase()))
.toList();
}

@Override
public Optional<Job> getLastReplicationJob(final UUID connectionId) throws IOException {
return jobDatabase.query(ctx -> ctx
@@ -400,6 +416,21 @@ public Optional<Job> getLastReplicationJob(final UUID connectionId) throws IOExc
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class))));
}

@Override
public Optional<Job> getFirstReplicationJob(final UUID connectionId) throws IOException {
return jobDatabase.query(ctx -> ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " +
"CAST(jobs.config_type AS VARCHAR) in " + Sqls.toSqlInFragment(Job.REPLICATION_TYPES) + " AND " +
"scope = ? AND " +
"CAST(jobs.status AS VARCHAR) <> ? " +
"ORDER BY jobs.created_at ASC LIMIT 1",
connectionId.toString(),
Sqls.toSqlName(JobStatus.CANCELLED))
.stream()
.findFirst()
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class))));
}

@Override
public Optional<Job> getNextJob() throws IOException {
// rules:
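The new getFirstReplicationJob query above supports the "only disable if first job is older than max days" guard: the creation time of the first non-cancelled replication job bounds how long the connection has been running. A minimal sketch of how a caller might compute that age, assuming Job exposes its creation time in epoch seconds via getCreatedAtInSecond() (the tests below pass epoch seconds as the job's created-at, but the accessor name here is an assumption):

// Sketch only: derives how many days ago the connection's first (non-cancelled)
// replication job was created; the result would be compared against
// Configs#getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable().
static long firstReplicationJobAgeInDays(final JobPersistence jobPersistence,
                                         final UUID connectionId,
                                         final Instant now) throws IOException {
  return jobPersistence.getFirstReplicationJob(connectionId)
      .map(job -> Duration.between(Instant.ofEpochSecond(job.getCreatedAtInSecond()), now).toDays())
      .orElse(0L);
}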
@@ -145,7 +145,6 @@ public interface JobPersistence {
List<Job> listJobs(Set<JobConfig.ConfigType> configTypes, String configId, int limit, int offset) throws IOException;

/**
*
* @param configType The type of job
* @param attemptEndedAtTimestamp The timestamp after which you want the jobs
* @return List of jobs that have attempts after the provided timestamp
@@ -161,8 +160,21 @@ public interface JobPersistence {

List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus status) throws IOException;

/**
* @param connectionId The ID of the connection
* @param configTypes The types of jobs
* @param jobCreatedAtTimestamp The timestamp after which you want the jobs
* @return List of job statuses from a specific connection that have attempts after the provided
* timestamp, sorted by jobs' createdAt in descending order
* @throws IOException
*/
List<JobStatus> listJobStatusWithConnection(UUID connectionId, Set<JobConfig.ConfigType> configTypes, Instant jobCreatedAtTimestamp)
throws IOException;

Optional<Job> getLastReplicationJob(UUID connectionId) throws IOException;

Optional<Job> getFirstReplicationJob(UUID connectionId) throws IOException;

Optional<Job> getNextJob() throws IOException;

/**
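A caller of listJobStatusWithConnection receives the statuses newest-first, which makes counting consecutive failures straightforward; per the commit message "ignore non success or failed runs", statuses other than SUCCEEDED and FAILED would not affect the auto-disable decision. A hedged sketch of such a consumer (the filtering rule is an assumption drawn from that commit message, not code from this commit):

// Sketch only: counts consecutive failed jobs from the newest-first status list,
// treating a success as the end of the streak and skipping any other status.
static int consecutiveFailures(final List<JobStatus> statusesNewestFirst) {
  int failures = 0;
  for (final JobStatus status : statusesNewestFirst) {
    if (status == JobStatus.SUCCEEDED) {
      break; // a successful run ends the failing streak
    }
    if (status == JobStatus.FAILED) {
      failures++;
    }
    // other statuses (cancelled, incomplete, etc.) are ignored here
  }
  return failures;
}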
@@ -89,8 +89,12 @@ class DefaultJobPersistenceTest {
private static final JobConfig SYNC_JOB_CONFIG = new JobConfig()
.withConfigType(ConfigType.SYNC)
.withSync(new JobSyncConfig());
private static PostgreSQLContainer<?> container;

private static final int DEFAULT_MINIMUM_AGE_IN_DAYS = 30;
private static final int DEFAULT_EXCESSIVE_NUMBER_OF_JOBS = 500;
private static final int DEFAULT_MINIMUM_RECENCY_COUNT = 10;

private static PostgreSQLContainer<?> container;
private Database jobDatabase;
private Database configDatabase;
private Supplier<Instant> timeSupplier;
@@ -169,7 +173,8 @@ public void setup() throws Exception {
timeSupplier = mock(Supplier.class);
when(timeSupplier.get()).thenReturn(NOW);

jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);
}

@AfterEach
@@ -337,7 +342,8 @@ void testListJobsWithTimestamp() throws IOException {
final Instant now = Instant.parse("2021-01-01T00:00:00Z");
final Supplier<Instant> timeSupplier = incrementingSecondSupplier(now);

jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);
final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH);
jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);
@@ -402,7 +408,8 @@ void testListAttemptsWithJobInfo() throws IOException {
void testListAttemptsWithJobInfo() throws IOException {
final Instant now = Instant.parse("2021-01-01T00:00:00Z");
final Supplier<Instant> timeSupplier = incrementingSecondSupplier(now);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);

final long job1 = jobPersistence.enqueueJob(SCOPE + "-1", SYNC_JOB_CONFIG).orElseThrow();
final long job2 = jobPersistence.enqueueJob(SCOPE + "-2", SYNC_JOB_CONFIG).orElseThrow();
@@ -793,6 +800,38 @@ public void testGetLastSyncJobForConnectionId() throws IOException {

}

@Nested
@DisplayName("When getting first replication job")
class GetFirstReplicationJob {

@Test
@DisplayName("Should return nothing if no job exists")
public void testGetFirstSyncJobForConnectionIdEmpty() throws IOException {
final Optional<Job> actual = jobPersistence.getFirstReplicationJob(CONNECTION_ID);

assertTrue(actual.isEmpty());
}

@Test
@DisplayName("Should return the first job")
public void testGetFirstSyncJobForConnectionId() throws IOException {
final long jobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH));
final List<AttemptWithJobInfo> attemptsWithJobInfo = jobPersistence.listAttemptsWithJobInfo(SYNC_JOB_CONFIG.getConfigType(), Instant.EPOCH);
final List<Attempt> attempts = Collections.singletonList(attemptsWithJobInfo.get(0).getAttempt());

final Instant afterNow = NOW.plusSeconds(1000);
when(timeSupplier.get()).thenReturn(afterNow);
final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();

final Optional<Job> actual = jobPersistence.getFirstReplicationJob(CONNECTION_ID);
final Job expected = createJob(jobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED, attempts, NOW.getEpochSecond());

assertEquals(Optional.of(expected), actual);
}

}

@Nested
@DisplayName("When getting next job")
class GetNextJob {
@@ -1315,4 +1354,140 @@ private Job addStateToJob(final Job job) throws IOException, SQLException {

}

@Nested
@DisplayName("When listing job statuses with specified connection id and timestamp")
class ListJobStatusWithConnection {

@Test
@DisplayName("Should list only job statuses of specified connection id")
public void testConnectionIdFiltering() throws IOException {
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);

// create a connection with a non-relevant connection id that should be ignored for the duration of
// the test
final long wrongConnectionSyncJobId = jobPersistence.enqueueJob(UUID.randomUUID().toString(), SYNC_JOB_CONFIG).orElseThrow();
final int wrongSyncJobAttemptNumber0 = jobPersistence.createAttempt(wrongConnectionSyncJobId, LOG_PATH);
jobPersistence.failAttempt(wrongConnectionSyncJobId, wrongSyncJobAttemptNumber0);
assertEquals(0, jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH).size());

// create a connection with relevant connection id
final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH);
jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);

// check to see current status of only relevantly scoped job
final List<JobStatus> jobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
assertEquals(jobStatuses.size(), 1);
assertEquals(JobStatus.INCOMPLETE, jobStatuses.get(0));
}

@Test
@DisplayName("Should list jobs statuses filtered by different timestamps")
public void testTimestampFiltering() throws IOException {
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);

// Create and fail initial job
final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH);
jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);
jobPersistence.failJob(syncJobId);

// Check to see current status of all jobs from beginning of time, expecting only 1 job
final List<JobStatus> jobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
assertEquals(jobStatuses.size(), 1);
assertEquals(JobStatus.FAILED, jobStatuses.get(0));

// Edit time supplier to return later time
final Instant timeAfterFirstJob = NOW.plusSeconds(60);
when(timeSupplier.get()).thenReturn(timeAfterFirstJob);

// Create and succeed second job
final long newSyncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int newSyncJobAttemptNumber = jobPersistence.createAttempt(newSyncJobId, LOG_PATH);
jobPersistence.succeedAttempt(newSyncJobId, newSyncJobAttemptNumber);

// Check to see current status of all jobs from beginning of time, expecting both jobs in createdAt
// descending order (most recent first)
final List<JobStatus> allQueryJobStatuses =
jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
assertEquals(2, allQueryJobStatuses.size());
assertEquals(JobStatus.SUCCEEDED, allQueryJobStatuses.get(0));
assertEquals(JobStatus.FAILED, allQueryJobStatuses.get(1));

// Look up jobs with a timestamp after the first job. Expecting only the second job status
final List<JobStatus> timestampFilteredJobStatuses =
jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterFirstJob);
assertEquals(1, timestampFilteredJobStatuses.size());
assertEquals(JobStatus.SUCCEEDED, timestampFilteredJobStatuses.get(0));

// Check to see if timestamp filtering is working by only looking up jobs with timestamp after
// second job. Expecting no job status output
final Instant timeAfterSecondJob = timeAfterFirstJob.plusSeconds(60);
assertEquals(0, jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterSecondJob).size());
}

@Test
@DisplayName("Should list jobs statuses of differing status types")
public void testMultipleJobStatusTypes() throws IOException {
final Supplier<Instant> timeSupplier = incrementingSecondSupplier(NOW);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);

// Create and fail initial job
final long syncJobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int syncJobAttemptNumber1 = jobPersistence.createAttempt(syncJobId1, LOG_PATH);
jobPersistence.failAttempt(syncJobId1, syncJobAttemptNumber1);
jobPersistence.failJob(syncJobId1);

// Create and succeed second job
final long syncJobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int syncJobAttemptNumber2 = jobPersistence.createAttempt(syncJobId2, LOG_PATH);
jobPersistence.succeedAttempt(syncJobId2, syncJobAttemptNumber2);

// Create and cancel third job
final long syncJobId3 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
jobPersistence.createAttempt(syncJobId3, LOG_PATH);
jobPersistence.cancelJob(syncJobId3);

// Check to see current status of all jobs from beginning of time, expecting all jobs in createdAt
// descending order (most recent first)
final List<JobStatus> allJobStatuses =
jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
assertEquals(3, allJobStatuses.size());
assertEquals(JobStatus.CANCELLED, allJobStatuses.get(0));
assertEquals(JobStatus.SUCCEEDED, allJobStatuses.get(1));
assertEquals(JobStatus.FAILED, allJobStatuses.get(2));
}

@Test
@DisplayName("Should list jobs statuses of differing job config types")
public void testMultipleConfigTypes() throws IOException {
final Set<ConfigType> configTypes = Sets.newHashSet(ConfigType.GET_SPEC, ConfigType.CHECK_CONNECTION_DESTINATION);
final Supplier<Instant> timeSupplier = incrementingSecondSupplier(NOW);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);

// pending status
final long failedSpecJobId = jobPersistence.enqueueJob(SCOPE, CHECK_JOB_CONFIG).orElseThrow();
jobPersistence.failJob(failedSpecJobId);

// incomplete status
final long incompleteSpecJobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(incompleteSpecJobId, LOG_PATH);
jobPersistence.failAttempt(incompleteSpecJobId, attemptNumber);

// this job should be ignored since it's not in the configTypes we're querying for
jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();

// expect order to be from most recent to least recent
final List<JobStatus> allJobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, configTypes, Instant.EPOCH);
assertEquals(2, allJobStatuses.size());
assertEquals(JobStatus.INCOMPLETE, allJobStatuses.get(0));
assertEquals(JobStatus.FAILED, allJobStatuses.get(1));
}

}

}