Skip to content

Commit

Permalink
Save specs to source/dest definitions on create and update (#7367)
Browse files Browse the repository at this point in the history
* store spec in db

* update tests

* run gw format

* add TODOs

* add lmossman to TODOs

* run gw format

* remove redundant DockerImageValidator

* run gw format
  • Loading branch information
lmossman committed Oct 28, 2021
1 parent 10889d8 commit d525f1f
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
import io.airbyte.server.handlers.WebBackendDestinationsHandler;
import io.airbyte.server.handlers.WebBackendSourcesHandler;
import io.airbyte.server.handlers.WorkspacesHandler;
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down Expand Up @@ -170,12 +169,11 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobNotifier,
temporalService,
new OAuthConfigSupplier(configRepository, false, trackingClient));
final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient);
final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, synchronousSchedulerClient);
connectionsHandler = new ConnectionsHandler(configRepository, workspaceHelper, trackingClient);
operationsHandler = new OperationsHandler(configRepository);
destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient);
destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, synchronousSchedulerClient);
destinationHandler = new DestinationHandler(configRepository, schemaValidator, specFetcher, connectionsHandler);
sourceHandler = new SourceHandler(configRepository, schemaValidator, specFetcher, connectionsHandler);
workspacesHandler = new WorkspacesHandler(configRepository, connectionsHandler, destinationHandler, sourceHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
import io.airbyte.api.model.DestinationDefinitionRead;
import io.airbyte.api.model.DestinationDefinitionReadList;
import io.airbyte.api.model.DestinationDefinitionUpdate;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.server.services.AirbyteGithubStore;
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.net.URI;
Expand All @@ -33,26 +36,22 @@ public class DestinationDefinitionsHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(DestinationDefinitionsHandler.class);

private final DockerImageValidator imageValidator;
private final ConfigRepository configRepository;
private final Supplier<UUID> uuidSupplier;
private final CachingSynchronousSchedulerClient schedulerSynchronousClient;
private final AirbyteGithubStore githubStore;

public DestinationDefinitionsHandler(final ConfigRepository configRepository,
final DockerImageValidator imageValidator,
final CachingSynchronousSchedulerClient schedulerSynchronousClient) {
this(configRepository, imageValidator, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production());
this(configRepository, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production());
}

@VisibleForTesting
public DestinationDefinitionsHandler(final ConfigRepository configRepository,
final DockerImageValidator imageValidator,
final Supplier<UUID> uuidSupplier,
final CachingSynchronousSchedulerClient schedulerSynchronousClient,
final AirbyteGithubStore githubStore) {
this.configRepository = configRepository;
this.imageValidator = imageValidator;
this.uuidSupplier = uuidSupplier;
this.schedulerSynchronousClient = schedulerSynchronousClient;
this.githubStore = githubStore;
Expand Down Expand Up @@ -104,7 +103,8 @@ public DestinationDefinitionRead getDestinationDefinition(final DestinationDefin

public DestinationDefinitionRead createDestinationDefinition(final DestinationDefinitionCreate destinationDefinitionCreate)
throws JsonValidationException, IOException {
imageValidator.assertValidIntegrationImage(destinationDefinitionCreate.getDockerRepository(),
final ConnectorSpecification spec = getSpecForImage(
destinationDefinitionCreate.getDockerRepository(),
destinationDefinitionCreate.getDockerImageTag());

final UUID id = uuidSupplier.get();
Expand All @@ -114,7 +114,8 @@ public DestinationDefinitionRead createDestinationDefinition(final DestinationDe
.withDockerImageTag(destinationDefinitionCreate.getDockerImageTag())
.withDocumentationUrl(destinationDefinitionCreate.getDocumentationUrl().toString())
.withName(destinationDefinitionCreate.getName())
.withIcon(destinationDefinitionCreate.getIcon());
.withIcon(destinationDefinitionCreate.getIcon())
.withSpec(spec);

configRepository.writeStandardDestinationDefinition(destinationDefinition);

Expand All @@ -125,23 +126,35 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardDestinationDefinition currentDestination = configRepository
.getStandardDestinationDefinition(destinationDefinitionUpdate.getDestinationDefinitionId());
imageValidator.assertValidIntegrationImage(currentDestination.getDockerRepository(),
destinationDefinitionUpdate.getDockerImageTag());

final boolean imageTagHasChanged = !currentDestination.getDockerImageTag().equals(destinationDefinitionUpdate.getDockerImageTag());
// TODO (lmossman): remove null spec condition when the spec field becomes required on the
// definition struct
final ConnectorSpecification spec = (imageTagHasChanged || currentDestination.getSpec() == null)
? getSpecForImage(currentDestination.getDockerRepository(), destinationDefinitionUpdate.getDockerImageTag())
: currentDestination.getSpec();

final StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
.withDestinationDefinitionId(currentDestination.getDestinationDefinitionId())
.withDockerImageTag(destinationDefinitionUpdate.getDockerImageTag())
.withDockerRepository(currentDestination.getDockerRepository())
.withName(currentDestination.getName())
.withDocumentationUrl(currentDestination.getDocumentationUrl())
.withIcon(currentDestination.getIcon());
.withIcon(currentDestination.getIcon())
.withSpec(spec);

configRepository.writeStandardDestinationDefinition(newDestination);
// we want to re-fetch the spec for updated definitions.
schedulerSynchronousClient.resetCache();
return buildDestinationDefinitionRead(newDestination);
}

private ConnectorSpecification getSpecForImage(final String dockerRepository, final String imageTag) throws IOException {
final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag);
final SynchronousResponse<ConnectorSpecification> getSpecResponse = schedulerSynchronousClient.createGetSpecJob(imageName);
return SpecFetcher.getSpecFromJob(getSpecResponse);
}

public static String loadIcon(final String name) {
try {
return name == null ? null : MoreResources.readResource("icons/" + name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
import io.airbyte.api.model.SourceDefinitionRead;
import io.airbyte.api.model.SourceDefinitionReadList;
import io.airbyte.api.model.SourceDefinitionUpdate;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.server.services.AirbyteGithubStore;
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.net.URI;
Expand All @@ -29,28 +32,24 @@

public class SourceDefinitionsHandler {

private final DockerImageValidator imageValidator;
private final ConfigRepository configRepository;
private final Supplier<UUID> uuidSupplier;
private final AirbyteGithubStore githubStore;
private final CachingSynchronousSchedulerClient schedulerSynchronousClient;

public SourceDefinitionsHandler(
final ConfigRepository configRepository,
final DockerImageValidator imageValidator,
final CachingSynchronousSchedulerClient schedulerSynchronousClient) {
this(configRepository, imageValidator, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production());
this(configRepository, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production());
}

public SourceDefinitionsHandler(
final ConfigRepository configRepository,
final DockerImageValidator imageValidator,
final Supplier<UUID> uuidSupplier,
final CachingSynchronousSchedulerClient schedulerSynchronousClient,
final AirbyteGithubStore githubStore) {
this.configRepository = configRepository;
this.uuidSupplier = uuidSupplier;
this.imageValidator = imageValidator;
this.schedulerSynchronousClient = schedulerSynchronousClient;
this.githubStore = githubStore;
}
Expand Down Expand Up @@ -100,7 +99,7 @@ public SourceDefinitionRead getSourceDefinition(final SourceDefinitionIdRequestB

public SourceDefinitionRead createSourceDefinition(final SourceDefinitionCreate sourceDefinitionCreate)
throws JsonValidationException, IOException {
imageValidator.assertValidIntegrationImage(sourceDefinitionCreate.getDockerRepository(), sourceDefinitionCreate.getDockerImageTag());
final ConnectorSpecification spec = getSpecForImage(sourceDefinitionCreate.getDockerRepository(), sourceDefinitionCreate.getDockerImageTag());

final UUID id = uuidSupplier.get();
final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
Expand All @@ -109,7 +108,8 @@ public SourceDefinitionRead createSourceDefinition(final SourceDefinitionCreate
.withDockerImageTag(sourceDefinitionCreate.getDockerImageTag())
.withDocumentationUrl(sourceDefinitionCreate.getDocumentationUrl().toString())
.withName(sourceDefinitionCreate.getName())
.withIcon(sourceDefinitionCreate.getIcon());
.withIcon(sourceDefinitionCreate.getIcon())
.withSpec(spec);

configRepository.writeStandardSourceDefinition(sourceDefinition);

Expand All @@ -120,22 +120,35 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardSourceDefinition currentSourceDefinition =
configRepository.getStandardSourceDefinition(sourceDefinitionUpdate.getSourceDefinitionId());
imageValidator.assertValidIntegrationImage(currentSourceDefinition.getDockerRepository(), sourceDefinitionUpdate.getDockerImageTag());

final boolean imageTagHasChanged = !currentSourceDefinition.getDockerImageTag().equals(sourceDefinitionUpdate.getDockerImageTag());
// TODO (lmossman): remove null spec condition when the spec field becomes required on the
// definition struct
final ConnectorSpecification spec = (imageTagHasChanged || currentSourceDefinition.getSpec() == null)
? getSpecForImage(currentSourceDefinition.getDockerRepository(), sourceDefinitionUpdate.getDockerImageTag())
: currentSourceDefinition.getSpec();

final StandardSourceDefinition newSource = new StandardSourceDefinition()
.withSourceDefinitionId(currentSourceDefinition.getSourceDefinitionId())
.withDockerImageTag(sourceDefinitionUpdate.getDockerImageTag())
.withDockerRepository(currentSourceDefinition.getDockerRepository())
.withDocumentationUrl(currentSourceDefinition.getDocumentationUrl())
.withName(currentSourceDefinition.getName())
.withIcon(currentSourceDefinition.getIcon());
.withIcon(currentSourceDefinition.getIcon())
.withSpec(spec);

configRepository.writeStandardSourceDefinition(newSource);
// we want to re-fetch the spec for updated definitions.
schedulerSynchronousClient.resetCache();
return buildSourceDefinitionRead(newSource);
}

private ConnectorSpecification getSpecForImage(final String dockerRepository, final String imageTag) throws IOException {
final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag);
final SynchronousResponse<ConnectorSpecification> getSpecResponse = schedulerSynchronousClient.createGetSpecJob(imageName);
return SpecFetcher.getSpecFromJob(getSpecResponse);
}

public static String loadIcon(final String name) {
try {
return name == null ? null : MoreResources.readResource("icons/" + name);
Expand Down

This file was deleted.

0 comments on commit d525f1f

Please sign in to comment.