From 9178442f5ac4c47e49b607f453d3b39acf391098 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 14 Sep 2021 17:33:50 -0500 Subject: [PATCH 1/3] Interface changes to support separating secrets from the config --- airbyte-config/persistence/build.gradle | 1 + .../config/persistence/ConfigRepository.java | 3 ++- .../persistence/WorkspaceHelperTest.java | 10 +++++----- .../io/airbyte/server/ConfigDumpImporter.java | 9 +++++++-- .../server/handlers/DestinationHandler.java | 16 +++++++++------- .../airbyte/server/ConfigDumpImporterTest.java | 5 +++-- .../server/handlers/DestinationHandlerTest.java | 11 +++++------ 7 files changed, 32 insertions(+), 23 deletions(-) diff --git a/airbyte-config/persistence/build.gradle b/airbyte-config/persistence/build.gradle index 40bb4cce193dd..8017112a1a37c 100644 --- a/airbyte-config/persistence/build.gradle +++ b/airbyte-config/persistence/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation project(':airbyte-protocol:models') implementation project(':airbyte-config:init') implementation project(':airbyte-json-validation') + implementation project(':airbyte-protocol:models') testImplementation "org.testcontainers:postgresql:1.15.1" } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 9c9a2f862dc3d..c1ee0d8335b07 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -188,7 +188,8 @@ public DestinationConnection getDestinationConnection(final UUID destinationId) return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class); } - public void writeDestinationConnection(final DestinationConnection destinationConnection) throws JsonValidationException, IOException { + public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification) + throws JsonValidationException, IOException { persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection); } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java index b6fd60f3df6b4..67bdbcc438779 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/WorkspaceHelperTest.java @@ -149,13 +149,13 @@ public void testSource() throws IOException, JsonValidationException { @Test public void testDestination() throws IOException, JsonValidationException { configRepository.writeStandardDestinationDefinition(DEST_DEF); - configRepository.writeDestinationConnection(DEST); + configRepository.writeDestinationConnection(DEST, emptyConnectorSpec); final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID); assertEquals(WORKSPACE_ID, retrievedWorkspace); // check that caching is working - configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID())); + configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()), emptyConnectorSpec); final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID); assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); } @@ -165,7 +165,7 @@ public void testConnection() throws IOException, JsonValidationException { configRepository.writeStandardSource(SOURCE_DEF); configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec); configRepository.writeStandardDestinationDefinition(DEST_DEF); - configRepository.writeDestinationConnection(DEST); + configRepository.writeDestinationConnection(DEST, emptyConnectorSpec); // set up connection configRepository.writeStandardSync(CONNECTION); @@ -181,7 +181,7 @@ public void testConnection() throws IOException, JsonValidationException { // check that caching is working final UUID newWorkspace = UUID.randomUUID(); configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace), emptyConnectorSpec); - configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace)); + configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace), emptyConnectorSpec); final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID); assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate); } @@ -205,7 +205,7 @@ public void testConnectionAndJobs() throws IOException, JsonValidationException configRepository.writeStandardSource(SOURCE_DEF); configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec); configRepository.writeStandardDestinationDefinition(DEST_DEF); - configRepository.writeDestinationConnection(DEST); + configRepository.writeDestinationConnection(DEST, emptyConnectorSpec); configRepository.writeStandardSync(CONNECTION); // test jobs diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index a30165a1ae109..b91f3a1297fe3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -55,6 +55,7 @@ import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.IdNotFoundKnownException; +import io.airbyte.server.handlers.DestinationHandler; import io.airbyte.server.handlers.SourceHandler; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; @@ -440,15 +441,19 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b return destinationConnection; }, (destinationConnection) -> { + final ConnectorSpecification spec; // make sure connector definition exists try { - if (configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()) == null) { + StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition( + destinationConnection.getDestinationDefinitionId()); + if (destinationDefinition == null) { return; } + spec = DestinationHandler.getSpec(specFetcher, destinationDefinition); } catch (ConfigNotFoundException e) { return; } - configRepository.writeDestinationConnection(destinationConnection); + configRepository.writeDestinationConnection(destinationConnection, spec); })); case STANDARD_SYNC -> standardSyncs = configs; case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace( diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index 8b1dad257be2d..e2a1cf73261b3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -205,11 +205,14 @@ private void validateDestination(final ConnectorSpecification spec, final JsonNo validator.ensure(spec.getConnectionSpecification(), configuration); } - private ConnectorSpecification getSpec(UUID destinationDefinitionId) + public ConnectorSpecification getSpec(UUID destinationDefinitionId) throws JsonValidationException, IOException, ConfigNotFoundException { - final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destinationDefinitionId); - final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()); - return specFetcher.execute(imageName); + return getSpec(specFetcher, configRepository.getStandardDestinationDefinition(destinationDefinitionId)); + } + + public static ConnectorSpecification getSpec(SpecFetcher specFetcher, StandardDestinationDefinition destinationDef) + throws JsonValidationException, IOException, ConfigNotFoundException { + return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag())); } private void persistDestinationConnection(final String name, @@ -218,7 +221,7 @@ private void persistDestinationConnection(final String name, final UUID destinationId, final JsonNode configurationJson, final boolean tombstone) - throws JsonValidationException, IOException { + throws JsonValidationException, IOException, ConfigNotFoundException { final DestinationConnection destinationConnection = new DestinationConnection() .withName(name) .withDestinationDefinitionId(destinationDefinitionId) @@ -226,8 +229,7 @@ private void persistDestinationConnection(final String name, .withDestinationId(destinationId) .withConfiguration(configurationJson) .withTombstone(tombstone); - - configRepository.writeDestinationConnection(destinationConnection); + configRepository.writeDestinationConnection(destinationConnection, getSpec(destinationDefinitionId)); } private DestinationRead buildDestinationRead(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException { diff --git a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java b/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java index ccd968b3edeb7..73c0bfae022a9 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java @@ -190,7 +190,8 @@ public void testImportIntoWorkspaceWithConflicts() throws JsonValidationExceptio Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId).withSourceId(not(eq(sourceConnection.getSourceId()))), eq(emptyConnectorSpec)); verify(configRepository).writeDestinationConnection( - Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId())))); + Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))), + eq(emptyConnectorSpec)); verify(configRepository) .writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId).withOperationId(not(eq(operation.getOperationId())))); verify(configRepository).writeStandardSync(Jsons.clone(connection).withConnectionId(not(eq(connection.getConnectionId())))); @@ -241,7 +242,7 @@ public void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationExcep verify(configRepository).writeSourceConnection( Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec); - verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId)); + verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec); verify(configRepository).writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId)); verify(configRepository).writeStandardSync(connection); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java index 837b3c49f8cbf..fb1c3673366b4 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java @@ -71,7 +71,6 @@ class DestinationHandlerTest { private ConfigRepository configRepository; private StandardDestinationDefinition standardDestinationDefinition; private DestinationDefinitionSpecificationRead destinationDefinitionSpecificationRead; - private DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody; private DestinationConnection destinationConnection; private DestinationHandler destinationHandler; private ConnectionsHandler connectionsHandler; @@ -104,8 +103,8 @@ void setUp() throws IOException { imageName = DockerUtils.getTaggedImageName(standardDestinationDefinition.getDockerRepository(), standardDestinationDefinition.getDockerImageTag()); - destinationDefinitionIdRequestBody = - new DestinationDefinitionIdRequestBody().destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId()); + DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody().destinationDefinitionId( + standardDestinationDefinition.getDestinationDefinitionId()); connectorSpecification = ConnectorSpecificationHelpers.generateConnectorSpecification(); @@ -154,7 +153,7 @@ void testCreateDestination() throws JsonValidationException, ConfigNotFoundExcep assertEquals(expectedDestinationRead, actualDestinationRead); verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), destinationConnection.getConfiguration()); - verify(configRepository).writeDestinationConnection(destinationConnection); + verify(configRepository).writeDestinationConnection(destinationConnection, connectorSpecification); verify(secretsProcessor) .maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification()); } @@ -181,7 +180,7 @@ void testDeleteDestination() throws JsonValidationException, ConfigNotFoundExcep destinationHandler.deleteDestination(destinationId); - verify(configRepository).writeDestinationConnection(expectedDestinationConnection); + verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification); verify(connectionsHandler).listConnectionsForWorkspace(workspaceIdRequestBody); verify(connectionsHandler).deleteConnection(connectionRead); } @@ -225,7 +224,7 @@ void testUpdateDestination() throws JsonValidationException, ConfigNotFoundExcep assertEquals(expectedDestinationRead, actualDestinationRead); verify(secretsProcessor).maskSecrets(newConfiguration, destinationDefinitionSpecificationRead.getConnectionSpecification()); - verify(configRepository).writeDestinationConnection(expectedDestinationConnection); + verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification); verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), newConfiguration); } From abce77d39cab87339fa9e23af52c75ff84fe0aac Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Wed, 15 Sep 2021 11:24:29 -0500 Subject: [PATCH 2/3] Cleanup from PR comments --- airbyte-config/persistence/build.gradle | 1 - .../io/airbyte/config/persistence/ConfigRepository.java | 4 ++++ .../main/java/io/airbyte/server/ConfigDumpImporter.java | 8 ++------ 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/airbyte-config/persistence/build.gradle b/airbyte-config/persistence/build.gradle index 8017112a1a37c..40bb4cce193dd 100644 --- a/airbyte-config/persistence/build.gradle +++ b/airbyte-config/persistence/build.gradle @@ -7,7 +7,6 @@ dependencies { implementation project(':airbyte-protocol:models') implementation project(':airbyte-config:init') implementation project(':airbyte-json-validation') - implementation project(':airbyte-protocol:models') testImplementation "org.testcontainers:postgresql:1.15.1" } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index c1ee0d8335b07..5f60c4ebe6118 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -190,6 +190,10 @@ public DestinationConnection getDestinationConnection(final UUID destinationId) public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification) throws JsonValidationException, IOException { + // actual validation is only for sanity checking + final JsonSchemaValidator validator = new JsonSchemaValidator(); + validator.ensure(connectorSpecification.getConnectionSpecification(), destinationConnection.getConfiguration()); + persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index b91f3a1297fe3..a466a3b2cd0da 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -415,7 +415,6 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b return sourceConnection; }, (sourceConnection) -> { - final ConnectorSpecification spec; // make sure connector definition exists try { final StandardSourceDefinition sourceDefinition = @@ -423,11 +422,10 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b if (sourceDefinition == null) { return; } - spec = SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition); + configRepository.writeSourceConnection(sourceConnection, SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition)); } catch (ConfigNotFoundException e) { return; } - configRepository.writeSourceConnection(sourceConnection, spec); })); case STANDARD_DESTINATION_DEFINITION -> importDestinationDefinitionIntoWorkspace(configs); case DESTINATION_CONNECTION -> destinationIdMap.putAll(importIntoWorkspace( @@ -441,7 +439,6 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b return destinationConnection; }, (destinationConnection) -> { - final ConnectorSpecification spec; // make sure connector definition exists try { StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition( @@ -449,11 +446,10 @@ private void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b if (destinationDefinition == null) { return; } - spec = DestinationHandler.getSpec(specFetcher, destinationDefinition); + configRepository.writeDestinationConnection(destinationConnection, DestinationHandler.getSpec(specFetcher, destinationDefinition)); } catch (ConfigNotFoundException e) { return; } - configRepository.writeDestinationConnection(destinationConnection, spec); })); case STANDARD_SYNC -> standardSyncs = configs; case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace( From 3e68aa36edce3db56560f0f30c922c7d50d79828 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Wed, 15 Sep 2021 11:30:23 -0500 Subject: [PATCH 3/3] Whitespace --- .../integrations/destination/keen/KeenTimestampService.java | 6 +++--- .../destination/s3/S3DestinationAcceptanceTest.java | 4 ++-- .../src/main/java/io/airbyte/server/ConfigDumpImporter.java | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java index 3b1146ec42da6..3c33a93dc4cfd 100644 --- a/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java +++ b/airbyte-integrations/connectors/destination-keen/src/main/java/io/airbyte/integrations/destination/keen/KeenTimestampService.java @@ -75,9 +75,9 @@ public KeenTimestampService(ConfiguredAirbyteCatalog catalog, boolean timestampI /** * Tries to inject keen.timestamp field to the given message data. If the stream contains cursor * field, it's value is tried to be parsed to timestamp. If this procedure fails, stream is removed - * from timestamp-parsable stream map, so parsing is not tried for future messages in the same stream. - * If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and whole data - * is returned. Otherwise, keen.timestamp is set to emittedAt value + * from timestamp-parsable stream map, so parsing is not tried for future messages in the same + * stream. If parsing succeeds, keen.timestamp field is put as a JSON node to the message data and + * whole data is returned. Otherwise, keen.timestamp is set to emittedAt value * * @param message AirbyteRecordMessage containing record data * @return Record data together with keen.timestamp field diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java index 9f0bae2af0fa7..20a92927dfa61 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java @@ -24,6 +24,8 @@ package io.airbyte.integrations.destination.s3; +import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER; + import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -53,8 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER; - /** * When adding a new S3 destination acceptance test, extend this class and do the following: *
  • Implement {@link #getFormatConfig} that returns a {@link S3FormatConfig}
  • diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index a466a3b2cd0da..3039b399d55e8 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -49,7 +49,6 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; -import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper;