From 1acf0d60b2b7e73a7c9dc4b4e5e9436cccd93fc8 Mon Sep 17 00:00:00 2001 From: terencecho Date: Mon, 5 Dec 2022 15:25:02 -0800 Subject: [PATCH 1/2] Validate connection id before sync --- .../server/handlers/SchedulerHandler.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 7621cb4baf936..2a78fbd9360c4 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -297,7 +297,7 @@ private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousRespo .jobInfo(jobConverter.getSynchronousJobRead(response)); if (response.isSuccess()) { - ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput()); + final ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput()); final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog)); @@ -365,19 +365,19 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE return submitCancellationToWorker(jobIdRequestBody.getId()); } - private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) + private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema, final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()); final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = connectionRead.getSyncCatalog(); - CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), + final CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), CatalogConverter.toProtocol(currentAirbyteCatalog)); - boolean containsBreakingChange = containsBreakingChange(diff); - ConnectionUpdate updateObject = + final boolean containsBreakingChange = containsBreakingChange(diff); + final ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); - ConnectionStatus connectionStatus; + final ConnectionStatus connectionStatus; if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) { connectionStatus = ConnectionStatus.INACTIVE; } else { @@ -389,7 +389,7 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered } - private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) { + private boolean shouldDisableConnection(final boolean containsBreakingChange, final NonBreakingChangesPreference preference, final CatalogDiff diff) { if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } @@ -438,13 +438,15 @@ private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOExcept return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId)); } - private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException { + private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException { + // get standard sync to validate connection id before submitting sync to temporal + configRepository.getStandardSync(connectionId); final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId); return readJobFromResult(manualSyncResult); } - private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, JsonValidationException, ConfigNotFoundException { + private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, IllegalStateException, ConfigNotFoundException { final ManualOperationResult resetConnectionResult = eventRunner.resetConnection( connectionId, configRepository.getAllStreamsForConnection(connectionId), @@ -468,12 +470,12 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio } private boolean containsBreakingChange(final CatalogDiff diff) { - for (StreamTransform streamTransform : diff.getTransforms()) { + for (final StreamTransform streamTransform : diff.getTransforms()) { if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) { continue; } - boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking); + final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking); if (anyBreakingFieldTransforms) { return true; } From fa8bc565e1d3ec824c69e5e3118fa4ced8f4a78a Mon Sep 17 00:00:00 2001 From: terencecho Date: Mon, 5 Dec 2022 15:26:38 -0800 Subject: [PATCH 2/2] Propogate changes --- .../server/handlers/SchedulerHandler.java | 18 ++++++++++++------ .../server/handlers/SchedulerHandlerTest.java | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 2a78fbd9360c4..ccb3f46e8e69d 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -352,7 +352,8 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification( return specRead; } - public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException { + public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) + throws IOException, JsonValidationException, ConfigNotFoundException { return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId()); } @@ -365,15 +366,17 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE return submitCancellationToWorker(jobIdRequestBody.getId()); } - private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema, final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) + private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema, + final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) throws JsonValidationException, ConfigNotFoundException, IOException { final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()); final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = connectionRead.getSyncCatalog(); - final CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), - CatalogConverter.toProtocol(currentAirbyteCatalog)); + final CatalogDiff diff = + connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), + CatalogConverter.toProtocol(currentAirbyteCatalog)); final boolean containsBreakingChange = containsBreakingChange(diff); final ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); @@ -389,7 +392,9 @@ private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead disc } - private boolean shouldDisableConnection(final boolean containsBreakingChange, final NonBreakingChangesPreference preference, final CatalogDiff diff) { + private boolean shouldDisableConnection(final boolean containsBreakingChange, + final NonBreakingChangesPreference preference, + final CatalogDiff diff) { if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } @@ -438,7 +443,8 @@ private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOExcept return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId)); } - private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException { + private JobInfoRead submitManualSyncToWorker(final UUID connectionId) + throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException { // get standard sync to validate connection id before submitting sync to temporal configRepository.getStandardSync(connectionId); final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index f06b4d6a8d695..21284a7ed0e21 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -927,7 +927,7 @@ void testEnumConversion() { } @Test - void testSyncConnection() throws IOException { + void testSyncConnection() throws IOException, JsonValidationException, ConfigNotFoundException { final UUID connectionId = UUID.randomUUID(); final long jobId = 123L;