Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Auto-Disable Failing Connections feature #11099

Merged
merged 23 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
22f78d7
Add Disable Failing Connections feature
terencecho Mar 5, 2022
96f7fe3
Rename and cleanup
terencecho Mar 7, 2022
4ffe081
list jobs based off connection id
terencecho Mar 9, 2022
2015a94
Move variables to env config and update unit tests
terencecho Mar 10, 2022
ff0046e
Fix env flag name
terencecho Mar 10, 2022
814d314
Fix missing name changes
terencecho Mar 10, 2022
bacdf92
Add comments to unit test
terencecho Mar 10, 2022
9e0c365
Address PR comments
terencecho Mar 11, 2022
a8028f4
Support multiple config types
terencecho Mar 11, 2022
1be1ad5
Update unit tests
terencecho Mar 13, 2022
63a3cf4
Remove the attemptId notion in the connectionManagerWorkflow (#10780)
benmoriceau Mar 10, 2022
52ab967
Revert "Remove the attemptId notion in the connectionManagerWorkflow …
benmoriceau Mar 11, 2022
e751b0e
Revert "Revert "Remove the attemptId notion in the connectionManagerW…
benmoriceau Mar 11, 2022
bfce0a2
Revert "Revert "Revert "Remove the attemptId notion in the connection…
benmoriceau Mar 11, 2022
cba4117
Add Disable Failing Connections feature
terencecho Mar 5, 2022
0cd1ba0
Rename and cleanup
terencecho Mar 7, 2022
fe7591a
Fix rebase
terencecho Mar 14, 2022
89ddee3
only disable if first job is older than max days
terencecho Mar 15, 2022
f0e824e
Return boolean for activity
terencecho Mar 16, 2022
a4530e8
Return boolean for activity
terencecho Mar 16, 2022
b206a25
Add unit tests for ConnectionManagerWorkflow
terencecho Mar 16, 2022
a3a6f5d
Utilize object output for activity and ignore non success or failed runs
terencecho Mar 17, 2022
36adc9f
Utilize object output for activity and ignore non success or failed runs
terencecho Mar 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,4 @@ MAX_DISCOVER_WORKERS=5

### FEATURE FLAGS ###
NEW_SCHEDULER=false
AUTO_DISABLE_FAILING_CONNECTIONS=false
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ public boolean usesNewScheduler() {
return Boolean.parseBoolean(System.getenv("NEW_SCHEDULER"));
}

@Override
public boolean autoDisablesFailingConnections() {
  // Read and parse the env var once instead of twice (once for logging, once for the
  // return value), so the logged value is guaranteed to match what is returned.
  final boolean autoDisableFailingConnections = Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"));
  log.info("Auto Disable Failing Connections: " + autoDisableFailingConnections);

  return autoDisableFailingConnections;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public interface FeatureFlags {

boolean usesNewScheduler();

boolean autoDisablesFailingConnections();

}
12 changes: 12 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@ public interface Configs {
*/
Map<String, String> getJobDefaultEnvMap();

/**
* Defines the number of consecutive job failures required before a connection is auto-disabled if
* the AUTO_DISABLE_FAILING_CONNECTIONS flag is set to true.
*/
int getMaxFailedJobsInARowBeforeConnectionDisable();

/**
* Defines the required number of days with only failed jobs before a connection is auto-disabled if
* the AUTO_DISABLE_FAILING_CONNECTIONS flag is set to true.
*/
int getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable();

// Jobs - Kube only
/**
* Define the check job container's minimum CPU request. Defaults to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public class EnvConfigs implements Configs {
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";

private static final String MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = "MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE";
private static final String MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = "MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE";

// job-type-specific overrides
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
Expand Down Expand Up @@ -178,6 +181,9 @@ public class EnvConfigs implements Configs {

public static final int DEFAULT_TEMPORAL_HISTORY_RETENTION_IN_DAYS = 30;

public static final int DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = 100;
public static final int DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = 14;

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
private final LogConfigs logConfigs;
Expand Down Expand Up @@ -661,6 +667,16 @@ public Map<String, String> getJobDefaultEnvMap() {
return MoreMaps.merge(jobPrefixedEnvMap, jobSharedEnvMap);
}

@Override
public int getMaxFailedJobsInARowBeforeConnectionDisable() {
  // Consecutive-failure threshold for auto-disabling a connection; falls back to the
  // compiled-in default when the env var is unset.
  final int maxFailedJobsInARow =
      getEnvOrDefault(MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE, DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE);
  return maxFailedJobsInARow;
}

@Override
public int getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable() {
  // Number of days with nothing but failed jobs before a connection is auto-disabled;
  // falls back to the compiled-in default when the env var is unset.
  final int maxDaysOfOnlyFailedJobs =
      getEnvOrDefault(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE, DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE);
  return maxDaysOfOnlyFailedJobs;
}

@Override
public String getCheckJobMainContainerCpuRequest() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST, getJobMainContainerCpuRequest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,22 @@ public List<Job> listJobsWithStatus(final ConfigType configType, final JobStatus
return listJobsWithStatus(Sets.newHashSet(configType), status);
}

@Override
public List<JobStatus> listJobStatusWithConnection(final UUID connectionId, final Set<ConfigType> configTypes, final Instant jobCreatedAtTimestamp)
throws IOException {
final LocalDateTime timeConvertedIntoLocalDateTime = LocalDateTime.ofInstant(jobCreatedAtTimestamp, ZoneOffset.UTC);

final String JobStatusSelect = "SELECT status FROM jobs ";
return jobDatabase.query(ctx -> ctx
.fetch(JobStatusSelect + "WHERE " +
"CAST(scope AS VARCHAR) = ? AND " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to cast as a VARCHAR? scope = '?' should be working (same with the config type).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that was the convention since most of the other queries casted the scope, but I just ran the unit tests without it and had no problems, so I'll remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misspoke earlier, looks like the convention is not to cast the scope. But ConfigType is normally casted.

"CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND " +
"created_at >= ? ORDER BY created_at DESC", connectionId.toString(), timeConvertedIntoLocalDateTime))
.stream()
.map(r -> JobStatus.valueOf(r.get("status", String.class).toUpperCase()))
.toList();
}

@Override
public Optional<Job> getLastReplicationJob(final UUID connectionId) throws IOException {
return jobDatabase.query(ctx -> ctx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public interface JobPersistence {
List<Job> listJobs(Set<JobConfig.ConfigType> configTypes, String configId, int limit, int offset) throws IOException;

/**
*
* @param configType The type of job
* @param attemptEndedAtTimestamp The timestamp after which you want the jobs
* @return List of jobs that have attempts after the provided timestamp
Expand All @@ -161,6 +160,17 @@ public interface JobPersistence {

List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus status) throws IOException;

/**
* @param connectionId The ID of the connection
* @param configTypes The types of jobs
* @param jobCreatedAtTimestamp The timestamp after which you want the jobs
* @return List of job statuses from a specific connection that have attempts after the provided
*         timestamp, sorted by the jobs' createdAt in descending order
* @throws IOException
*/
List<JobStatus> listJobStatusWithConnection(UUID connectionId, Set<JobConfig.ConfigType> configTypes, Instant jobCreatedAtTimestamp)
throws IOException;

Optional<Job> getLastReplicationJob(UUID connectionId) throws IOException;

Optional<Job> getNextJob() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ class DefaultJobPersistenceTest {
private static final JobConfig SYNC_JOB_CONFIG = new JobConfig()
.withConfigType(ConfigType.SYNC)
.withSync(new JobSyncConfig());
private static PostgreSQLContainer<?> container;

private static final int DEFAULT_MINIMUM_AGE_IN_DAYS = 30;
private static final int DEFAULT_EXCESSIVE_NUMBER_OF_JOBS = 500;
private static final int DEFAULT_MINIMUM_RECENCY_COUNT = 10;

private static PostgreSQLContainer<?> container;
private Database jobDatabase;
private Database configDatabase;
private Supplier<Instant> timeSupplier;
Expand Down Expand Up @@ -169,7 +173,8 @@ public void setup() throws Exception {
timeSupplier = mock(Supplier.class);
when(timeSupplier.get()).thenReturn(NOW);

jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);
}

@AfterEach
Expand Down Expand Up @@ -337,7 +342,8 @@ void testListJobsWithTimestamp() throws IOException {
final Instant now = Instant.parse("2021-01-01T00:00:00Z");
final Supplier<Instant> timeSupplier = incrementingSecondSupplier(now);

jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);
final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH);
jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);
Expand Down Expand Up @@ -402,7 +408,8 @@ void testListJobsWithTimestamp() throws IOException {
void testListAttemptsWithJobInfo() throws IOException {
final Instant now = Instant.parse("2021-01-01T00:00:00Z");
final Supplier<Instant> timeSupplier = incrementingSecondSupplier(now);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10);
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);

final long job1 = jobPersistence.enqueueJob(SCOPE + "-1", SYNC_JOB_CONFIG).orElseThrow();
final long job2 = jobPersistence.enqueueJob(SCOPE + "-2", SYNC_JOB_CONFIG).orElseThrow();
Expand Down Expand Up @@ -1315,4 +1322,140 @@ private Job addStateToJob(final Job job) throws IOException, SQLException {

}

@Nested
@DisplayName("When listing job statuses with specified connection id and timestamp")
class ListJobStatusWithConnection {

  @Test
  @DisplayName("Should list only job statuses of specified connection id")
  public void testConnectionIdFiltering() throws IOException {
    jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
        DEFAULT_MINIMUM_RECENCY_COUNT);

    // create a job under a non-relevant connection id that should be ignored for the duration of
    // the test
    final long wrongConnectionSyncJobId = jobPersistence.enqueueJob(UUID.randomUUID().toString(), SYNC_JOB_CONFIG).orElseThrow();
    final int wrongSyncJobAttemptNumber0 = jobPersistence.createAttempt(wrongConnectionSyncJobId, LOG_PATH);
    jobPersistence.failAttempt(wrongConnectionSyncJobId, wrongSyncJobAttemptNumber0);
    assertEquals(0, jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH).size());

    // create a job under the relevant connection id
    final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
    final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH);
    jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);

    // check to see current status of only the relevantly scoped job
    final List<JobStatus> jobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
    // assertEquals takes (expected, actual) — expected value goes first.
    assertEquals(1, jobStatuses.size());
    assertEquals(JobStatus.INCOMPLETE, jobStatuses.get(0));
  }

  @Test
  @DisplayName("Should list jobs statuses filtered by different timestamps")
  public void testTimestampFiltering() throws IOException {
    jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
        DEFAULT_MINIMUM_RECENCY_COUNT);

    // Create and fail initial job
    final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
    final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH);
    jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);
    jobPersistence.failJob(syncJobId);

    // Check to see current status of all jobs from beginning of time, expecting only 1 job
    final List<JobStatus> jobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
    // assertEquals takes (expected, actual) — expected value goes first.
    assertEquals(1, jobStatuses.size());
    assertEquals(JobStatus.FAILED, jobStatuses.get(0));

    // Edit time supplier to return later time
    final Instant timeAfterFirstJob = NOW.plusSeconds(60);
    when(timeSupplier.get()).thenReturn(timeAfterFirstJob);

    // Create and succeed second job
    final long newSyncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
    final int newSyncJobAttemptNumber = jobPersistence.createAttempt(newSyncJobId, LOG_PATH);
    jobPersistence.succeedAttempt(newSyncJobId, newSyncJobAttemptNumber);

    // Check to see current status of all jobs from beginning of time, expecting both jobs in
    // createdAt descending order (most recent first)
    final List<JobStatus> allQueryJobStatuses =
        jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
    assertEquals(2, allQueryJobStatuses.size());
    assertEquals(JobStatus.SUCCEEDED, allQueryJobStatuses.get(0));
    assertEquals(JobStatus.FAILED, allQueryJobStatuses.get(1));

    // Look up jobs with a timestamp after the first job. Expecting only the second job status
    final List<JobStatus> timestampFilteredJobStatuses =
        jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterFirstJob);
    assertEquals(1, timestampFilteredJobStatuses.size());
    assertEquals(JobStatus.SUCCEEDED, timestampFilteredJobStatuses.get(0));

    // Check to see if timestamp filtering is working by only looking up jobs with timestamp after
    // second job. Expecting no job status output
    final Instant timeAfterSecondJob = timeAfterFirstJob.plusSeconds(60);
    assertEquals(0, jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterSecondJob).size());
  }

  @Test
  @DisplayName("Should list jobs statuses of differing status types")
  public void testMultipleJobStatusTypes() throws IOException {
    final Supplier<Instant> timeSupplier = incrementingSecondSupplier(NOW);
    jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
        DEFAULT_MINIMUM_RECENCY_COUNT);

    // Create and fail initial job
    final long syncJobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
    final int syncJobAttemptNumber1 = jobPersistence.createAttempt(syncJobId1, LOG_PATH);
    jobPersistence.failAttempt(syncJobId1, syncJobAttemptNumber1);
    jobPersistence.failJob(syncJobId1);

    // Create and succeed second job
    final long syncJobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
    final int syncJobAttemptNumber2 = jobPersistence.createAttempt(syncJobId2, LOG_PATH);
    jobPersistence.succeedAttempt(syncJobId2, syncJobAttemptNumber2);

    // Create and cancel third job
    final long syncJobId3 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
    jobPersistence.createAttempt(syncJobId3, LOG_PATH);
    jobPersistence.cancelJob(syncJobId3);

    // Check to see current status of all jobs from beginning of time, expecting all jobs in
    // createdAt descending order (most recent first)
    final List<JobStatus> allJobStatuses =
        jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
    assertEquals(3, allJobStatuses.size());
    assertEquals(JobStatus.CANCELLED, allJobStatuses.get(0));
    assertEquals(JobStatus.SUCCEEDED, allJobStatuses.get(1));
    assertEquals(JobStatus.FAILED, allJobStatuses.get(2));
  }

  @Test
  @DisplayName("Should list jobs statuses of differing job config types")
  public void testMultipleConfigTypes() throws IOException {
    final Set<ConfigType> configTypes = Sets.newHashSet(ConfigType.GET_SPEC, ConfigType.CHECK_CONNECTION_DESTINATION);
    final Supplier<Instant> timeSupplier = incrementingSecondSupplier(NOW);
    jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
        DEFAULT_MINIMUM_RECENCY_COUNT);

    // failed status: the job is failed directly without ever creating an attempt
    // (was mislabelled "pending status"; the assertion below expects FAILED)
    final long failedCheckJobId = jobPersistence.enqueueJob(SCOPE, CHECK_JOB_CONFIG).orElseThrow();
    jobPersistence.failJob(failedCheckJobId);

    // incomplete status: an attempt is created and then failed
    final long incompleteSpecJobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
    final int attemptNumber = jobPersistence.createAttempt(incompleteSpecJobId, LOG_PATH);
    jobPersistence.failAttempt(incompleteSpecJobId, attemptNumber);

    // this job should be ignored since it's not in the configTypes we're querying for
    jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();

    // expect order to be from most recent to least recent
    final List<JobStatus> allJobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, configTypes, Instant.EPOCH);
    assertEquals(2, allJobStatuses.size());
    assertEquals(JobStatus.INCOMPLETE, allJobStatuses.get(0));
    assertEquals(JobStatus.FAILED, allJobStatuses.get(1));
  }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivityImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivityImpl;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivityImpl;
import io.airbyte.workers.temporal.scheduling.activities.ConnectionDeletionActivityImpl;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivityImpl;
Expand Down Expand Up @@ -159,6 +160,7 @@ public void start() {

private void registerConnectionManager(final WorkerFactory factory) {
final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements());
final FeatureFlags featureFlags = new EnvVariableFeatureFlags();

final Worker connectionUpdaterWorker =
factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
Expand All @@ -177,7 +179,8 @@ private void registerConnectionManager(final WorkerFactory factory) {
configRepository,
jobCreator),
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
new ConnectionDeletionActivityImpl(connectionHelper));
new ConnectionDeletionActivityImpl(connectionHelper),
new AutoDisableConnectionActivityImpl(configRepository, jobPersistence, featureFlags, configs));
}

private void registerSync(final WorkerFactory factory) {
Expand Down
Loading