diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 373e0427d8382..f7ff30798260c 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -1887,6 +1887,10 @@ components: type: array items: $ref: "#/components/schemas/Notification" + firstCompletedSync: + type: boolean + feedbackDone: + type: boolean WorkspaceUpdate: type: object required: diff --git a/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java b/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java index db6841b47b95f..a0637930edd8f 100644 --- a/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java +++ b/airbyte-config/init/src/main/java/io/airbyte/config/init/YamlSeedConfigPersistence.java @@ -130,6 +130,11 @@ public void writeConfig(final AirbyteConfig configType, final String configI throw new UnsupportedOperationException("The seed config persistence is read only."); } + @Override + public void writeConfigs(final AirbyteConfig configType, final Map configs) { + throw new UnsupportedOperationException("The seed config persistence is read only."); + } + @Override public void deleteConfig(final AirbyteConfig configType, final String configId) { throw new UnsupportedOperationException("The seed config persistence is read only."); diff --git a/airbyte-config/models/src/main/resources/types/StandardWorkspace.yaml b/airbyte-config/models/src/main/resources/types/StandardWorkspace.yaml index 160aec4799a0f..44a44070ae6b7 100644 --- a/airbyte-config/models/src/main/resources/types/StandardWorkspace.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardWorkspace.yaml @@ -43,3 +43,7 @@ properties: type: array items: "$ref": Notification.yaml + firstCompletedSync: + type: boolean + feedbackDone: + type: boolean diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java index 5ef82cc21ee2e..2a9cad477d1c7 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java @@ -23,6 +23,8 @@ public interface ConfigPersistence { void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException; + void writeConfigs(AirbyteConfig configType, Map configs) throws IOException, JsonValidationException; + void deleteConfig(AirbyteConfig configType, String configId) throws ConfigNotFoundException, IOException; void replaceAllConfigs(Map> configs, boolean dryRun) throws IOException; diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 7dda31b4961d3..3d2889ab57235 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -135,6 +135,16 @@ public StandardSourceDefinition getSourceDefinitionFromConnection(final UUID con } } + public StandardWorkspace getStandardWorkspaceFromConnection(final UUID connectionId, final boolean isTombstone) { + try { + final StandardSync sync = getStandardSync(connectionId); + final SourceConnection source = getSourceConnection(sync.getSourceId()); + return getStandardWorkspace(source.getWorkspaceId(), isTombstone); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + public List listStandardSourceDefinitions() throws JsonValidationException, IOException { return persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class); } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 75b0b9267f5f8..57d2dd400f797 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.sql.SQLException; import java.time.OffsetDateTime; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -146,18 +147,32 @@ public List> listConfigsWithMetadata(final AirbyteConf @Override public void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws IOException { - database.transaction(ctx -> { - final boolean isExistingConfig = ctx.fetchExists(select() - .from(AIRBYTE_CONFIGS) - .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))); + final Map configIdToConfig = new HashMap<>() { + + { + put(configId, config); + } + + }; + writeConfigs(configType, configIdToConfig); + } + @Override + public void writeConfigs(final AirbyteConfig configType, final Map configs) throws IOException { + database.transaction(ctx -> { final OffsetDateTime timestamp = OffsetDateTime.now(); + configs.forEach((configId, config) -> { + final boolean isExistingConfig = ctx.fetchExists(select() + .from(AIRBYTE_CONFIGS) + .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))); - if (isExistingConfig) { - updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId); - } else { - insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configType.getIdFieldName()); - } + if (isExistingConfig) { + updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId); + } else { + insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), + configType.getIdFieldName()); + } + }); return null; }); diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java index d370ada77ac4a..e91f335d15eb1 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java @@ -92,6 +92,15 @@ public void writeConfig(final AirbyteConfig configType, final String configI } } + @Override + public void writeConfigs(final AirbyteConfig configType, final Map configs) throws IOException { + synchronized (lock) { + for (final Map.Entry config : configs.entrySet()) { + writeConfigInternal(configType, config.getKey(), config.getValue()); + } + } + } + private void writeConfigs(final AirbyteConfig configType, final Stream configs, final Path rootOverride) { configs.forEach(config -> { final String configId = configType.getId(config); diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java index 3cb572ef481ca..81ac50832e84c 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ValidatingConfigPersistence.java @@ -11,6 +11,7 @@ import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -59,8 +60,25 @@ public List> listConfigsWithMetadata(final AirbyteConf @Override public void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException { - validateJson(Jsons.jsonNode(config), configType); - decoratedPersistence.writeConfig(configType, configId, config); + + final Map configIdToConfig = new HashMap<>() { + + { + put(configId, config); + } + + }; + + writeConfigs(configType, configIdToConfig); + } + + @Override + public void writeConfigs(final AirbyteConfig configType, final Map configs) + throws IOException, JsonValidationException { + for (final Map.Entry config : configs.entrySet()) { + validateJson(Jsons.jsonNode(config.getValue()), configType); + } + decoratedPersistence.writeConfigs(configType, configs); } @Override diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java index 4dcbf918c9e6c..bb0053f52c18f 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java @@ -15,10 +15,12 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSourceDefinition.SourceType; import io.airbyte.db.Database; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.jooq.Record1; @@ -88,6 +90,13 @@ protected static void writeDestination(final ConfigPersistence configPersistence configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination); } + protected static void writeDestinations(final ConfigPersistence configPersistence, final List destinations) + throws Exception { + final Map destinationsByID = destinations.stream() + .collect(Collectors.toMap(destinationDefinition -> destinationDefinition.getDestinationDefinitionId().toString(), Function.identity())); + configPersistence.writeConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationsByID); + } + protected static void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination) throws Exception { configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString()); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index 641c5933eea79..fad4f7bd63975 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -5,9 +5,11 @@ package io.airbyte.config.persistence; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,6 +34,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class ConfigRepositoryTest { @@ -45,7 +49,7 @@ void setup() { configPersistence = mock(ConfigPersistence.class); final var secretPersistence = new MemorySecretPersistence(); configRepository = - new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence)); + spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence))); } @AfterEach @@ -74,6 +78,35 @@ void assertReturnsWorkspace(final StandardWorkspace workspace) throws ConfigNotF assertEquals(workspace, configRepository.getStandardWorkspace(WORKSPACE_ID, true)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testWorkspaceByConnectionId(final boolean isTombstone) throws ConfigNotFoundException, IOException, JsonValidationException { + final StandardWorkspace workspace = new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withTombstone(isTombstone); + + final UUID connectionId = UUID.randomUUID(); + final UUID sourceId = UUID.randomUUID(); + final StandardSync mSync = new StandardSync() + .withSourceId(sourceId); + final SourceConnection mSourceConnection = new SourceConnection() + .withWorkspaceId(WORKSPACE_ID); + final StandardWorkspace mWorkflow = new StandardWorkspace() + .withWorkspaceId(WORKSPACE_ID); + + doReturn(mSync) + .when(configRepository) + .getStandardSync(connectionId); + doReturn(mSourceConnection) + .when(configRepository) + .getSourceConnection(sourceId); + doReturn(mWorkflow) + .when(configRepository) + .getStandardWorkspace(WORKSPACE_ID, isTombstone); + + configRepository.getStandardWorkspaceFromConnection(connectionId, isTombstone); + + verify(configRepository).getStandardWorkspace(WORKSPACE_ID, isTombstone); + } + @Test void testGetConnectionState() throws Exception { final UUID connectionId = UUID.randomUUID(); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java index 4b050b0385af0..853173ce43a91 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java @@ -13,6 +13,7 @@ import static org.mockito.Mockito.spy; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; @@ -53,6 +54,17 @@ void tearDown() throws Exception { database.close(); } + @Test + public void testMultiWriteAndGetConfig() throws Exception { + writeDestinations(configPersistence, Lists.newArrayList(DESTINATION_S3, DESTINATION_SNOWFLAKE)); + assertRecordCount(2); + assertHasDestination(DESTINATION_S3); + assertHasDestination(DESTINATION_SNOWFLAKE); + assertEquals( + List.of(DESTINATION_SNOWFLAKE, DESTINATION_S3), + configPersistence.listConfigs(STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)); + } + @Test public void testWriteAndGetConfig() throws Exception { writeDestination(configPersistence, DESTINATION_S3); @@ -67,10 +79,10 @@ public void testWriteAndGetConfig() throws Exception { @Test public void testListConfigWithMetadata() throws Exception { - Instant now = Instant.now().minus(Duration.ofSeconds(1)); + final Instant now = Instant.now().minus(Duration.ofSeconds(1)); writeDestination(configPersistence, DESTINATION_S3); writeDestination(configPersistence, DESTINATION_SNOWFLAKE); - List> configWithMetadata = configPersistence + final List> configWithMetadata = configPersistence .listConfigsWithMetadata(STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class); assertEquals(2, configWithMetadata.size()); assertEquals("STANDARD_DESTINATION_DEFINITION", configWithMetadata.get(0).getConfigType()); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/FileSystemConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/FileSystemConfigPersistenceTest.java index 7418a844c0469..26604a86a9c62 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/FileSystemConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/FileSystemConfigPersistenceTest.java @@ -15,7 +15,9 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,6 +60,34 @@ void testReadWriteConfig() throws IOException, JsonValidationException, ConfigNo StandardSourceDefinition.class)); } + @Test + void testReadWriteConfigs() throws IOException, JsonValidationException, ConfigNotFoundException { + final Map sourceDefinitionById = new HashMap<>() { + + { + put(UUID_1.toString(), SOURCE_1); + put(UUID_2.toString(), SOURCE_2); + } + + }; + + configPersistence.writeConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionById); + + assertEquals( + SOURCE_1, + configPersistence.getConfig( + ConfigSchema.STANDARD_SOURCE_DEFINITION, + UUID_1.toString(), + StandardSourceDefinition.class)); + + assertEquals( + SOURCE_2, + configPersistence.getConfig( + ConfigSchema.STANDARD_SOURCE_DEFINITION, + UUID_2.toString(), + StandardSourceDefinition.class)); + } + @Test void testListConfigs() throws JsonValidationException, IOException { configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java index c3dd150bef6fd..3cbca7edcd9d8 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ValidatingConfigPersistenceTest.java @@ -20,7 +20,9 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.nio.file.Path; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,7 +60,29 @@ void setUp() { @Test void testWriteConfigSuccess() throws IOException, JsonValidationException { configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1); - verify(decoratedConfigPersistence).writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1); + final Map aggregatedSource = new HashMap<>() { + + { + put(UUID_1.toString(), SOURCE_1); + } + + }; + verify(decoratedConfigPersistence).writeConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, aggregatedSource); + } + + @Test + void testWriteConfigsSuccess() throws IOException, JsonValidationException { + final Map sourceDefinitionById = new HashMap<>() { + + { + put(UUID_1.toString(), SOURCE_1); + put(UUID_2.toString(), SOURCE_2); + } + + }; + + configPersistence.writeConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionById); + verify(decoratedConfigPersistence).writeConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionById); } @Test @@ -70,6 +94,25 @@ void testWriteConfigFailure() throws JsonValidationException { verifyNoInteractions(decoratedConfigPersistence); } + @Test + void testWriteConfigsFailure() throws JsonValidationException { + doThrow(new JsonValidationException("error")).when(schemaValidator).ensure(any(), any()); + + final Map sourceDefinitionById = new HashMap<>() { + + { + put(UUID_1.toString(), SOURCE_1); + put(UUID_2.toString(), SOURCE_2); + } + + }; + + assertThrows(JsonValidationException.class, + () -> configPersistence.writeConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionById)); + + verifyNoInteractions(decoratedConfigPersistence); + } + @Test void testGetConfigSuccess() throws IOException, JsonValidationException, ConfigNotFoundException { when(decoratedConfigPersistence.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), StandardSourceDefinition.class)) diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java index c35a56fc57779..5a720cda2d888 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java @@ -10,8 +10,10 @@ import io.airbyte.commons.enums.Enums; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.JobConfig.ConfigType; +import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory; import io.airbyte.scheduler.app.worker_run.WorkerRun; import io.airbyte.scheduler.models.Job; @@ -22,6 +24,7 @@ import java.nio.file.Path; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import org.slf4j.Logger; @@ -39,6 +42,7 @@ public class JobSubmitter implements Runnable { private final JobNotifier jobNotifier; private final WorkerEnvironment workerEnvironment; private final LogConfigs logConfigs; + private final ConfigRepository configRepository; // See attemptJobSubmit() to understand the need for this Concurrent Set. private final Set runningJobs = Sets.newConcurrentHashSet(); @@ -49,7 +53,8 @@ public JobSubmitter(final ExecutorService threadPool, final JobTracker jobTracker, final JobNotifier jobNotifier, final WorkerEnvironment workerEnvironment, - final LogConfigs logConfigs) { + final LogConfigs logConfigs, + final ConfigRepository configRepository) { this.threadPool = threadPool; this.persistence = persistence; this.temporalWorkerRunFactory = temporalWorkerRunFactory; @@ -57,6 +62,7 @@ public JobSubmitter(final ExecutorService threadPool, this.jobNotifier = jobNotifier; this.workerEnvironment = workerEnvironment; this.logConfigs = logConfigs; + this.configRepository = configRepository; } @Override @@ -80,15 +86,15 @@ public void run() { * Since job submission and job execution happen in two separate thread pools, and job execution is * what removes a job from the submission queue, it is possible for a job to be submitted multiple * times. - * + *

* This synchronised block guarantees only a single thread can utilise the concurrent set to decide * whether a job should be submitted. This job id is added here, and removed in the finish block of * {@link #submitJob(Job)}. - * + *

* Since {@link JobPersistence#getNextJob()} returns the next queued job, this solution cause * head-of-line blocking as the JobSubmitter tries to submit the same job. However, this suggests * the Worker Pool needs more workers and is inevitable when dealing with pending jobs. - * + *

* See https://github.com/airbytehq/airbyte/issues/4378 for more info. */ synchronized private Consumer attemptJobSubmit() { @@ -133,7 +139,16 @@ void submitJob(final Job job) { if (output.getStatus() == io.airbyte.workers.JobStatus.SUCCEEDED) { persistence.succeedAttempt(job.getId(), attemptNumber); + if (job.getConfigType() == ConfigType.SYNC) { + final String connectionId = job.getScope(); + final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(UUID.fromString(connectionId), false); + + if (!workspace.getFirstCompletedSync()) { + workspace.setFirstCompletedSync(true); + configRepository.writeStandardWorkspace(workspace); + } + jobNotifier.successJob(job); } } else { diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 7830b35c0ef9b..85d9ee8512bba 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -57,7 +57,7 @@ * The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch * them. The current implementation uses two thread pools to do so. One pool is responsible for all * job launching operations. The other pool is responsible for clean up operations. - * + *

* Operations can have thread pools under the hood. An important thread pool to note is that the job * submitter thread pool. This pool does the work of submitting jobs to temporal - the size of this * pool determines the number of concurrent jobs that can be run. This is controlled via the @@ -124,7 +124,7 @@ public void start() throws IOException { jobPersistence, temporalWorkerRunFactory, new JobTracker(configRepository, jobPersistence, trackingClient), - jobNotifier, workerEnvironment, logConfigs); + jobNotifier, workerEnvironment, logConfigs, configRepository); final Map mdc = MDC.getCopyOfContextMap(); diff --git a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java index 4da251262da22..74a798afbccd6 100644 --- a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java +++ b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobSubmitterTest.java @@ -9,6 +9,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; @@ -23,11 +24,15 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; +import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfiguration; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.app.worker_run.TemporalWorkerRunFactory; import io.airbyte.scheduler.app.worker_run.WorkerRun; import io.airbyte.scheduler.models.Job; @@ -42,6 +47,7 @@ import java.nio.file.Path; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.BeforeEach; @@ -67,6 +73,7 @@ public class JobSubmitterTest { private JobSubmitter jobSubmitter; private JobTracker jobTracker; private JobNotifier jobNotifier; + private ConfigRepository configRepository; @BeforeEach public void setup() throws IOException { @@ -88,12 +95,17 @@ public void setup() throws IOException { when(persistence.createAttempt(JOB_ID, logPath)).thenReturn(ATTEMPT_NUMBER); jobNotifier = mock(JobNotifier.class); + configRepository = mock(ConfigRepository.class); + jobSubmitter = spy(new JobSubmitter( MoreExecutors.newDirectExecutorService(), persistence, workerRunFactory, jobTracker, - jobNotifier, WorkerEnvironment.DOCKER, LogConfiguration.EMPTY)); + jobNotifier, + WorkerEnvironment.DOCKER, + LogConfiguration.EMPTY, + configRepository)); } @Test @@ -120,6 +132,14 @@ public void testPersistenceNoJob() throws Exception { public void testSuccess() throws Exception { doReturn(SUCCESS_OUTPUT).when(workerRun).call(); + final StandardWorkspace completedSyncWorkspace = new StandardWorkspace() + .withFirstCompletedSync(true); + final StandardWorkspace nonCompletedSyncWorkspace = new StandardWorkspace() + .withFirstCompletedSync(false); + + when(configRepository.listStandardWorkspaces(false)) + .thenReturn(Lists.newArrayList(completedSyncWorkspace, nonCompletedSyncWorkspace)); + jobSubmitter.submitJob(job); final InOrder inOrder = inOrder(persistence, jobSubmitter); @@ -130,6 +150,28 @@ public void testSuccess() throws Exception { inOrder.verifyNoMoreInteractions(); } + @Test + public void testSuccessCompleteWorkspace() throws Exception { + doReturn(SUCCESS_OUTPUT).when(workerRun).call(); + + final StandardWorkspace completedSyncWorkspace = new StandardWorkspace() + .withFirstCompletedSync(true); + final StandardWorkspace nonCompletedSyncWorkspace = new StandardWorkspace() + .withFirstCompletedSync(false); + + when(configRepository.getStandardWorkspaceFromConnection(any(UUID.class), eq(false))) + .thenReturn(nonCompletedSyncWorkspace); + + when(job.getScope()) + .thenReturn(UUID.randomUUID().toString()); + when(job.getConfigType()) + .thenReturn(ConfigType.SYNC); + + jobSubmitter.submitJob(job); + + verify(configRepository).writeStandardWorkspace(nonCompletedSyncWorkspace); + } + @Test public void testFailure() throws Exception { doReturn(FAILED_OUTPUT).when(workerRun).call(); @@ -141,6 +183,7 @@ public void testFailure() throws Exception { inOrder.verify(persistence).failAttempt(JOB_ID, ATTEMPT_NUMBER); verify(jobTracker).trackSync(job, JobState.FAILED); inOrder.verifyNoMoreInteractions(); + verifyNoInteractions(configRepository); } @Test @@ -154,6 +197,7 @@ public void testException() throws Exception { inOrder.verify(persistence).failAttempt(JOB_ID, ATTEMPT_NUMBER); inOrder.verify(jobTracker).trackSync(job, JobState.FAILED); inOrder.verifyNoMoreInteractions(); + verifyNoInteractions(configRepository); } @Test diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index b7b3057f8e3f5..bd714c51a588a 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -6474,12 +6474,14 @@

Return type

Example data

Content-Type: application/json
{
-  "displaySetupWizard" : true,
   "news" : true,
+  "displaySetupWizard" : true,
   "initialSetupComplete" : true,
   "anonymousDataCollection" : true,
   "customerId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
   "name" : "name",
+  "firstCompletedSync" : true,
+  "feedbackDone" : true,
   "email" : "email",
   "slug" : "slug",
   "securityUpdates" : true,
@@ -6599,12 +6601,14 @@ 

Return type

Example data

Content-Type: application/json
{
-  "displaySetupWizard" : true,
   "news" : true,
+  "displaySetupWizard" : true,
   "initialSetupComplete" : true,
   "anonymousDataCollection" : true,
   "customerId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
   "name" : "name",
+  "firstCompletedSync" : true,
+  "feedbackDone" : true,
   "email" : "email",
   "slug" : "slug",
   "securityUpdates" : true,
@@ -6679,12 +6683,14 @@ 

Return type

Example data

Content-Type: application/json
{
-  "displaySetupWizard" : true,
   "news" : true,
+  "displaySetupWizard" : true,
   "initialSetupComplete" : true,
   "anonymousDataCollection" : true,
   "customerId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
   "name" : "name",
+  "firstCompletedSync" : true,
+  "feedbackDone" : true,
   "email" : "email",
   "slug" : "slug",
   "securityUpdates" : true,
@@ -6748,12 +6754,14 @@ 

Example data

Content-Type: application/json
{
   "workspaces" : [ {
-    "displaySetupWizard" : true,
     "news" : true,
+    "displaySetupWizard" : true,
     "initialSetupComplete" : true,
     "anonymousDataCollection" : true,
     "customerId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
     "name" : "name",
+    "firstCompletedSync" : true,
+    "feedbackDone" : true,
     "email" : "email",
     "slug" : "slug",
     "securityUpdates" : true,
@@ -6772,12 +6780,14 @@ 

Example data

} ], "workspaceId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91" }, { - "displaySetupWizard" : true, "news" : true, + "displaySetupWizard" : true, "initialSetupComplete" : true, "anonymousDataCollection" : true, "customerId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91", "name" : "name", + "firstCompletedSync" : true, + "feedbackDone" : true, "email" : "email", "slug" : "slug", "securityUpdates" : true, @@ -6847,12 +6857,14 @@

Return type

Example data

Content-Type: application/json
{
-  "displaySetupWizard" : true,
   "news" : true,
+  "displaySetupWizard" : true,
   "initialSetupComplete" : true,
   "anonymousDataCollection" : true,
   "customerId" : "046b6c7f-0b8a-43b9-b35d-6489e6daee91",
   "name" : "name",
+  "firstCompletedSync" : true,
+  "feedbackDone" : true,
   "email" : "email",
   "slug" : "slug",
   "securityUpdates" : true,
@@ -8077,6 +8089,8 @@ 

WorkspaceRead - news (optional)
securityUpdates (optional)
notifications (optional)
+
firstCompletedSync (optional)
+
feedbackDone (optional)