Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interface changes to support separating secrets from the config #6065

Merged
merged 4 commits into from Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-config/persistence/build.gradle
Expand Up @@ -7,6 +7,7 @@ dependencies {
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-config:init')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:models')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
implementation project(':airbyte-protocol:models')

duplicated from line 7

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


testImplementation "org.testcontainers:postgresql:1.15.1"
}
Expand Up @@ -188,7 +188,8 @@ public DestinationConnection getDestinationConnection(final UUID destinationId)
return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class);
}

public void writeDestinationConnection(final DestinationConnection destinationConnection) throws JsonValidationException, IOException {
public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification)
throws JsonValidationException, IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the validation sanity check, similar to the source connection? Otherwise it's hard to verify that we're getting the correct value passed here in tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection);
}

Expand Down
Expand Up @@ -149,13 +149,13 @@ public void testSource() throws IOException, JsonValidationException {
@Test
public void testDestination() throws IOException, JsonValidationException {
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);

final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspace);

// check that caching is working
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()));
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(UUID.randomUUID()), emptyConnectorSpec);
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
}
Expand All @@ -165,7 +165,7 @@ public void testConnection() throws IOException, JsonValidationException {
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);

// set up connection
configRepository.writeStandardSync(CONNECTION);
Expand All @@ -181,7 +181,7 @@ public void testConnection() throws IOException, JsonValidationException {
// check that caching is working
final UUID newWorkspace = UUID.randomUUID();
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace), emptyConnectorSpec);
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace));
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace), emptyConnectorSpec);
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
}
Expand All @@ -205,7 +205,7 @@ public void testConnectionAndJobs() throws IOException, JsonValidationException
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeDestinationConnection(DEST, emptyConnectorSpec);
configRepository.writeStandardSync(CONNECTION);

// test jobs
Expand Down
Expand Up @@ -55,6 +55,7 @@
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -440,15 +441,19 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
return destinationConnection;
},
(destinationConnection) -> {
final ConnectorSpecification spec;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is error prone. What if the code in the try changes and there is a path that leads to an undefined spec?

You could change the structure to be:

              try {
                StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(
                    destinationConnection.getDestinationDefinitionId());
                if (destinationDefinition == null) {
                  return;
                }
                final ConnectorSpecification spec = DestinationHandler.getSpec(specFetcher, destinationDefinition);
                configRepository.writeDestinationConnection(destinationConnection, spec);
              } catch (ConfigNotFoundException e) {
                return;
              }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted to bring the writeDestinationConnection inside the try, and similar for the source connection in related code.

// make sure connector definition exists
try {
if (configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()) == null) {
StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(
destinationConnection.getDestinationDefinitionId());
if (destinationDefinition == null) {
return;
}
spec = DestinationHandler.getSpec(specFetcher, destinationDefinition);
} catch (ConfigNotFoundException e) {
return;
}
configRepository.writeDestinationConnection(destinationConnection);
configRepository.writeDestinationConnection(destinationConnection, spec);
}));
case STANDARD_SYNC -> standardSyncs = configs;
case STANDARD_SYNC_OPERATION -> operationIdMap.putAll(importIntoWorkspace(
Expand Down
Expand Up @@ -205,11 +205,14 @@ private void validateDestination(final ConnectorSpecification spec, final JsonNo
validator.ensure(spec.getConnectionSpecification(), configuration);
}

private ConnectorSpecification getSpec(UUID destinationDefinitionId)
public ConnectorSpecification getSpec(UUID destinationDefinitionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
return specFetcher.execute(imageName);
return getSpec(specFetcher, configRepository.getStandardDestinationDefinition(destinationDefinitionId));
}

public static ConnectorSpecification getSpec(SpecFetcher specFetcher, StandardDestinationDefinition destinationDef)
throws JsonValidationException, IOException, ConfigNotFoundException {
return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()));
}

private void persistDestinationConnection(final String name,
Expand All @@ -218,16 +221,15 @@ private void persistDestinationConnection(final String name,
final UUID destinationId,
final JsonNode configurationJson,
final boolean tombstone)
throws JsonValidationException, IOException {
throws JsonValidationException, IOException, ConfigNotFoundException {
final DestinationConnection destinationConnection = new DestinationConnection()
.withName(name)
.withDestinationDefinitionId(destinationDefinitionId)
.withWorkspaceId(workspaceId)
.withDestinationId(destinationId)
.withConfiguration(configurationJson)
.withTombstone(tombstone);

configRepository.writeDestinationConnection(destinationConnection);
configRepository.writeDestinationConnection(destinationConnection, getSpec(destinationDefinitionId));
}

private DestinationRead buildDestinationRead(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand Down
Expand Up @@ -190,7 +190,8 @@ public void testImportIntoWorkspaceWithConflicts() throws JsonValidationExceptio
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId).withSourceId(not(eq(sourceConnection.getSourceId()))),
eq(emptyConnectorSpec));
verify(configRepository).writeDestinationConnection(
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))));
Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId).withDestinationId(not(eq(destinationConnection.getDestinationId()))),
eq(emptyConnectorSpec));
verify(configRepository)
.writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId).withOperationId(not(eq(operation.getOperationId()))));
verify(configRepository).writeStandardSync(Jsons.clone(connection).withConnectionId(not(eq(connection.getConnectionId()))));
Expand Down Expand Up @@ -241,7 +242,7 @@ public void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationExcep
verify(configRepository).writeSourceConnection(
Jsons.clone(sourceConnection).withWorkspaceId(newWorkspaceId),
emptyConnectorSpec);
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId));
verify(configRepository).writeDestinationConnection(Jsons.clone(destinationConnection).withWorkspaceId(newWorkspaceId), emptyConnectorSpec);
verify(configRepository).writeStandardSyncOperation(Jsons.clone(operation).withWorkspaceId(newWorkspaceId));
verify(configRepository).writeStandardSync(connection);
}
Expand Down
Expand Up @@ -71,7 +71,6 @@ class DestinationHandlerTest {
private ConfigRepository configRepository;
private StandardDestinationDefinition standardDestinationDefinition;
private DestinationDefinitionSpecificationRead destinationDefinitionSpecificationRead;
private DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody;
private DestinationConnection destinationConnection;
private DestinationHandler destinationHandler;
private ConnectionsHandler connectionsHandler;
Expand Down Expand Up @@ -104,8 +103,8 @@ void setUp() throws IOException {
imageName =
DockerUtils.getTaggedImageName(standardDestinationDefinition.getDockerRepository(), standardDestinationDefinition.getDockerImageTag());

destinationDefinitionIdRequestBody =
new DestinationDefinitionIdRequestBody().destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId());
DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody = new DestinationDefinitionIdRequestBody().destinationDefinitionId(
standardDestinationDefinition.getDestinationDefinitionId());

connectorSpecification = ConnectorSpecificationHelpers.generateConnectorSpecification();

Expand Down Expand Up @@ -154,7 +153,7 @@ void testCreateDestination() throws JsonValidationException, ConfigNotFoundExcep
assertEquals(expectedDestinationRead, actualDestinationRead);

verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), destinationConnection.getConfiguration());
verify(configRepository).writeDestinationConnection(destinationConnection);
verify(configRepository).writeDestinationConnection(destinationConnection, connectorSpecification);
verify(secretsProcessor)
.maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification());
}
Expand All @@ -181,7 +180,7 @@ void testDeleteDestination() throws JsonValidationException, ConfigNotFoundExcep

destinationHandler.deleteDestination(destinationId);

verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
verify(connectionsHandler).listConnectionsForWorkspace(workspaceIdRequestBody);
verify(connectionsHandler).deleteConnection(connectionRead);
}
Expand Down Expand Up @@ -225,7 +224,7 @@ void testUpdateDestination() throws JsonValidationException, ConfigNotFoundExcep
assertEquals(expectedDestinationRead, actualDestinationRead);

verify(secretsProcessor).maskSecrets(newConfiguration, destinationDefinitionSpecificationRead.getConnectionSpecification());
verify(configRepository).writeDestinationConnection(expectedDestinationConnection);
verify(configRepository).writeDestinationConnection(expectedDestinationConnection, connectorSpecification);
verify(validator).ensure(destinationDefinitionSpecificationRead.getConnectionSpecification(), newConfiguration);
}

Expand Down