Skip to content

Commit

Permalink
fetch specs from definitions directly (#7293)
Browse files Browse the repository at this point in the history
* try fetching specs from definitions first

* refactor specFetcher and update tests

* run gradle format

* format again

* fix comment formatting

* fix test

* merge comment lines into single line

* move duplicate job metadata mocking logic to shared static method

* add todo

* formatting

* use local var and clone

* run gw format

* add todo

* skip spec fetcher in docker image validator and update todos

* run gw format
  • Loading branch information
lmossman committed Oct 25, 2021
1 parent 6ead334 commit 5d2b5dc
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,16 +81,7 @@ public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String

if (cachedSpecOptional.isPresent()) {
LOGGER.debug("Spec bucket cache: Cache hit.");
final long now = Instant.now().toEpochMilli();
final SynchronousJobMetadata mockMetadata = new SynchronousJobMetadata(
UUID.randomUUID(),
ConfigType.GET_SPEC,
null,
now,
now,
true,
null);
return new SynchronousResponse<>(cachedSpecOptional.get(), mockMetadata);
return new SynchronousResponse<>(cachedSpecOptional.get(), SynchronousJobMetadata.mock(ConfigType.GET_SPEC));
} else {
LOGGER.debug("Spec bucket cache: Cache miss.");
return client.createGetSpecJob(dockerImage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.workers.temporal.JobMetadata;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -114,4 +115,20 @@ public String toString() {
'}';
}

public static SynchronousJobMetadata mock(final ConfigType configType) {
final long now = Instant.now().toEpochMilli();
final UUID configId = null;
final boolean succeeded = true;
final Path logPath = null;

return new SynchronousJobMetadata(
UUID.randomUUID(),
configType,
configId,
now,
now,
succeeded,
logPath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);

// required before migration
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.execute(dockerImage)));
// TODO: remove this specFetcherFn logic once file migrations are deprecated
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage)));

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.server.converters;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -42,8 +41,7 @@ public SourceConnection source(final UUID sourceId, final String sourceName, fin
persistedSource.setName(sourceName);
// get spec
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(persistedSource.getSourceDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
final ConnectorSpecification spec = specFetcher.execute(imageName);
final ConnectorSpecification spec = specFetcher.getSpec(sourceDefinition);
// copy any necessary secrets from the current source to the incoming updated source
final JsonNode updatedConfiguration = secretsProcessor.copySecrets(
persistedSource.getConfiguration(),
Expand All @@ -61,8 +59,7 @@ public DestinationConnection destination(final UUID destinationId, final String
// get spec
final StandardDestinationDefinition destinationDefinition = configRepository
.getStandardDestinationDefinition(persistedDestination.getDestinationDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(destinationDefinition.getDockerRepository(), destinationDefinition.getDockerImageTag());
final ConnectorSpecification spec = specFetcher.execute(imageName);
final ConnectorSpecification spec = specFetcher.getSpec(destinationDefinition);
// copy any necessary secrets from the current destination to the incoming updated destination
final JsonNode updatedConfiguration = secretsProcessor.copySecrets(
persistedDestination.getConfiguration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,78 @@
package io.airbyte.server.converters;

import com.google.common.base.Preconditions;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpecFetcher {

private static final Logger LOGGER = LoggerFactory.getLogger(SpecFetcher.class);

private final SynchronousSchedulerClient schedulerJobClient;

public SpecFetcher(final SynchronousSchedulerClient schedulerJobClient) {
this.schedulerJobClient = schedulerJobClient;
}

public ConnectorSpecification execute(final String dockerImage) throws IOException {
// TODO: remove this once file migrations are deprecated, as that is the only time this function is
// used
@Deprecated
public ConnectorSpecification getSpec(final String dockerImage) throws IOException {
return getSpecFromJob(schedulerJobClient.createGetSpecJob(dockerImage));
}

private static ConnectorSpecification getSpecFromJob(final SynchronousResponse<ConnectorSpecification> response) {
public ConnectorSpecification getSpec(final StandardSourceDefinition sourceDefinition) throws IOException {
return getSpecFromJob(getSpecJobResponse(sourceDefinition));
}

public ConnectorSpecification getSpec(final StandardDestinationDefinition destinationDefinition) throws IOException {
return getSpecFromJob(getSpecJobResponse(destinationDefinition));
}

// TODO: remove this method once the spec is a required field on the StandardSourceDefinition struct
public SynchronousResponse<ConnectorSpecification> getSpecJobResponse(final StandardSourceDefinition sourceDefinition) throws IOException {
LOGGER.debug("Spec Fetcher: Getting spec for Source Definition.");
final ConnectorSpecification spec = sourceDefinition.getSpec();

if (spec != null) {
LOGGER.debug("Spec Fetcher: Spec found in Source Definition.");
return new SynchronousResponse<>(spec, SynchronousJobMetadata.mock(ConfigType.GET_SPEC));
}

LOGGER.debug("Spec Fetcher: Spec not found in Source Definition, fetching with scheduler job instead.");
final String dockerImageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
return schedulerJobClient.createGetSpecJob(dockerImageName);
}

// TODO: remove this method once the spec is a required field on the StandardDestinationDefinition
// struct
public SynchronousResponse<ConnectorSpecification> getSpecJobResponse(final StandardDestinationDefinition destinationDefinition)
throws IOException {
LOGGER.debug("Spec Fetcher: Getting spec for Destination Definition.");
final ConnectorSpecification spec = destinationDefinition.getSpec();

if (spec != null) {
LOGGER.debug("Spec Fetcher: Spec found in Destination Definition.");
return new SynchronousResponse<>(spec, SynchronousJobMetadata.mock(ConfigType.GET_SPEC));
}

LOGGER.debug("Spec Fetcher: Spec not found in Destination Definition, fetching with scheduler job instead.");
final String dockerImageName = DockerUtils.getTaggedImageName(
destinationDefinition.getDockerRepository(),
destinationDefinition.getDockerImageTag());
return schedulerJobClient.createGetSpecJob(dockerImageName);
}

public static ConnectorSpecification getSpecFromJob(final SynchronousResponse<ConnectorSpecification> response) {
Preconditions.checkState(response.isSuccess(), "Get Spec job failed.");
Preconditions.checkNotNull(response.getOutput(), "Get Spec job return null spec");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.api.model.DestinationSearch;
import io.airbyte.api.model.DestinationUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
Expand Down Expand Up @@ -213,7 +212,7 @@ public ConnectorSpecification getSpec(final UUID destinationDefinitionId)

public static ConnectorSpecification getSpec(final SpecFetcher specFetcher, final StandardDestinationDefinition destinationDef)
throws JsonValidationException, IOException, ConfigNotFoundException {
return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()));
return specFetcher.getSpec(destinationDef);
}

private void persistDestinationConnection(final String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,17 @@ public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdReque
public CheckConnectionRead checkSourceConnectionFromSourceCreate(final SourceCoreConfig sourceConfig)
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceConfig.getSourceDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());

final var partialConfig = configRepository.statefulSplitEphemeralSecrets(
sourceConfig.getConnectionConfiguration(),
specFetcher.execute(imageName));
specFetcher.getSpec(sourceDef));

// todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are
// technically declared as required.
final SourceConnection source = new SourceConnection()
.withSourceDefinitionId(sourceConfig.getSourceDefinitionId())
.withConfiguration(partialConfig);

final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createSourceCheckConnectionJob(source, imageName));
}

Expand Down Expand Up @@ -177,18 +176,17 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationId(final Des
public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final DestinationCoreConfig destinationConfig)
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardDestinationDefinition destDef = configRepository.getStandardDestinationDefinition(destinationConfig.getDestinationDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag());

final var partialConfig = configRepository.statefulSplitEphemeralSecrets(
destinationConfig.getConnectionConfiguration(),
specFetcher.execute(imageName));
specFetcher.getSpec(destDef));

// todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are
// technically declared as required.
final DestinationConnection destination = new DestinationConnection()
.withDestinationDefinitionId(destinationConfig.getDestinationDefinitionId())
.withConfiguration(partialConfig);

final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
}

Expand Down Expand Up @@ -244,8 +242,7 @@ public SourceDefinitionSpecificationRead getSourceDefinitionSpecification(final
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID sourceDefinitionId = sourceDefinitionIdRequestBody.getSourceDefinitionId();
final StandardSourceDefinition source = configRepository.getStandardSourceDefinition(sourceDefinitionId);
final String imageName = DockerUtils.getTaggedImageName(source.getDockerRepository(), source.getDockerImageTag());
final SynchronousResponse<ConnectorSpecification> response = getConnectorSpecification(imageName);
final SynchronousResponse<ConnectorSpecification> response = specFetcher.getSpecJobResponse(source);
final ConnectorSpecification spec = response.getOutput();
final SourceDefinitionSpecificationRead specRead = new SourceDefinitionSpecificationRead()
.jobInfo(JobConverter.getSynchronousJobRead(response))
Expand All @@ -265,8 +262,7 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(final
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID destinationDefinitionId = destinationDefinitionIdRequestBody.getDestinationDefinitionId();
final StandardDestinationDefinition destination = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag());
final SynchronousResponse<ConnectorSpecification> response = getConnectorSpecification(imageName);
final SynchronousResponse<ConnectorSpecification> response = specFetcher.getSpecJobResponse(destination);
final ConnectorSpecification spec = response.getOutput();

final DestinationDefinitionSpecificationRead specRead = new DestinationDefinitionSpecificationRead()
Expand All @@ -286,10 +282,6 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(final
return specRead;
}

public SynchronousResponse<ConnectorSpecification> getConnectorSpecification(final String dockerImage) throws IOException {
return synchronousSchedulerClient.createGetSpecJob(dockerImage);
}

public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID connectionId = connectionIdRequestBody.getConnectionId();
Expand Down Expand Up @@ -378,9 +370,9 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
}

private void cancelTemporalWorkflowIfPresent(final long jobId) throws IOException {
final var latestAttemptId = jobPersistence.getJob(jobId).getAttempts().size() - 1; // attempts ids are monotonically increasing starting from 0
// and
// specific to a job id, allowing us to do this.
// attempts ids are monotonically increasing starting from 0 and specific to a job id, allowing us
// to do this.
final var latestAttemptId = jobPersistence.getJob(jobId).getAttempts().size() - 1;
final var workflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, latestAttemptId);

if (workflowId.isPresent()) {
Expand Down Expand Up @@ -416,15 +408,13 @@ private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<Sta
private ConnectorSpecification getSpecFromSourceDefinitionId(final UUID sourceDefId)
throws IOException, JsonValidationException, ConfigNotFoundException {
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceDefId);
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
return specFetcher.execute(imageName);
return specFetcher.getSpec(sourceDef);
}

private ConnectorSpecification getSpecFromDestinationDefinitionId(final UUID destDefId)
throws IOException, JsonValidationException, ConfigNotFoundException {
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destDefId);
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
return specFetcher.execute(imageName);
return specFetcher.getSpec(destinationDef);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.airbyte.api.model.SourceSearch;
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSourceDefinition;
Expand Down Expand Up @@ -206,9 +205,7 @@ private SourceRead buildSourceRead(final UUID sourceId)
// read configuration from db
final StandardSourceDefinition sourceDef = configRepository
.getSourceDefinitionFromSource(sourceId);
final String imageName = DockerUtils
.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
final ConnectorSpecification spec = specFetcher.execute(imageName);
final ConnectorSpecification spec = specFetcher.getSpec(sourceDef);
return buildSourceRead(sourceId, spec);
}

Expand Down Expand Up @@ -243,9 +240,7 @@ private ConnectorSpecification getSpecFromSourceDefinitionId(final UUID sourceDe

public static ConnectorSpecification getSpecFromSourceDefinitionId(final SpecFetcher specFetcher, final StandardSourceDefinition sourceDefinition)
throws IOException, ConfigNotFoundException {
final String imageName = DockerUtils
.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
return specFetcher.execute(imageName);
return specFetcher.getSpec(sourceDefinition);
}

private void persistSourceConnection(final String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,19 @@

package io.airbyte.server.validators;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.BadObjectSchemaKnownException;

public class DockerImageValidator {

private final SpecFetcher specFetcher;
private final SynchronousSchedulerClient schedulerClient;

public DockerImageValidator(final SynchronousSchedulerClient schedulerJobClient) {
this(new SpecFetcher(schedulerJobClient));
}

@VisibleForTesting
DockerImageValidator(final SpecFetcher specFetcher) {
this.specFetcher = specFetcher;
this.schedulerClient = schedulerJobClient;
}

/**
Expand All @@ -32,7 +28,8 @@ public void assertValidIntegrationImage(final String dockerRepository, final Str
// job on the provided image.
final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag);
try {
specFetcher.execute(imageName);
final SynchronousResponse<ConnectorSpecification> getSpecResponse = schedulerClient.createGetSpecJob(imageName);
SpecFetcher.getSpecFromJob(getSpecResponse);
} catch (final Exception e) {
throw new BadObjectSchemaKnownException(
String.format("Encountered an issue while validating input docker image (%s): %s", imageName, e.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
specFetcher = mock(SpecFetcher.class);
emptyConnectorSpec = mock(ConnectorSpecification.class);
when(emptyConnectorSpec.getConnectionSpecification()).thenReturn(Jsons.emptyObject());
when(specFetcher.execute(any())).thenReturn(emptyConnectorSpec);
when(specFetcher.getSpec(any(StandardSourceDefinition.class))).thenReturn(emptyConnectorSpec);
when(specFetcher.getSpec(any(StandardDestinationDefinition.class))).thenReturn(emptyConnectorSpec);

configDumpImporter =
new ConfigDumpImporter(configRepository, jobPersistence, workspaceHelper, mock(JsonSchemaValidator.class), specFetcher, true);
Expand Down

0 comments on commit 5d2b5dc

Please sign in to comment.