diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 7621cb4baf936a..ccb3f46e8e69dc 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -297,7 +297,7 @@ private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousRespo .jobInfo(jobConverter.getSynchronousJobRead(response)); if (response.isSuccess()) { - ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput()); + final ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput()); final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog)); @@ -352,7 +352,8 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification( return specRead; } - public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException { + public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) + throws IOException, JsonValidationException, ConfigNotFoundException { return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId()); } @@ -365,19 +366,21 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE return submitCancellationToWorker(jobIdRequestBody.getId()); } - private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) + private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema, + final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()); final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = connectionRead.getSyncCatalog(); - CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), - CatalogConverter.toProtocol(currentAirbyteCatalog)); - boolean containsBreakingChange = containsBreakingChange(diff); - ConnectionUpdate updateObject = + final CatalogDiff diff = + connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), + CatalogConverter.toProtocol(currentAirbyteCatalog)); + final boolean containsBreakingChange = containsBreakingChange(diff); + final ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); - ConnectionStatus connectionStatus; + final ConnectionStatus connectionStatus; if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) { connectionStatus = ConnectionStatus.INACTIVE; } else { @@ -389,7 +392,9 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered } - private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) { + private boolean shouldDisableConnection(final boolean containsBreakingChange, + final NonBreakingChangesPreference preference, + final CatalogDiff diff) { if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } @@ -438,13 +443,16 @@ private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOExcept return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId)); } - private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException { + private JobInfoRead submitManualSyncToWorker(final UUID connectionId) + throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException { + // get standard sync to validate connection id before submitting sync to temporal + configRepository.getStandardSync(connectionId); final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId); return readJobFromResult(manualSyncResult); } - private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, JsonValidationException, ConfigNotFoundException { + private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, IllegalStateException, ConfigNotFoundException { final ManualOperationResult resetConnectionResult = eventRunner.resetConnection( connectionId, configRepository.getAllStreamsForConnection(connectionId), @@ -468,12 +476,12 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio } private boolean containsBreakingChange(final CatalogDiff diff) { - for (StreamTransform streamTransform : diff.getTransforms()) { + for (final StreamTransform streamTransform : diff.getTransforms()) { if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { continue; } - boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking); + final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking); if (anyBreakingFieldTransforms) { return true; } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index f06b4d6a8d695f..21284a7ed0e21d 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -927,7 +927,7 @@ void testEnumConversion() { } @Test - void testSyncConnection() throws IOException { + void testSyncConnection() throws IOException, JsonValidationException, ConfigNotFoundException { final UUID connectionId = UUID.randomUUID(); final long jobId = 123L;