Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate connection id before sync #20107

Merged
merged 3 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousRespo
.jobInfo(jobConverter.getSynchronousJobRead(response));

if (response.isSuccess()) {
ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput());
final ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput());
final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(),
io.airbyte.protocol.models.AirbyteCatalog.class);
sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog));
Expand Down Expand Up @@ -352,7 +352,8 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(
return specRead;
}

public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException {
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId());
}

Expand All @@ -365,19 +366,21 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
return submitCancellationToWorker(jobIdRequestBody.getId());
}

private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema,
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);
final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
ConnectionStatus connectionStatus;
final ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
Expand All @@ -389,7 +392,9 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered

}

private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) {
private boolean shouldDisableConnection(final boolean containsBreakingChange,
final NonBreakingChangesPreference preference,
final CatalogDiff diff) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return false;
}
Expand Down Expand Up @@ -438,13 +443,16 @@ private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOExcept
return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}

private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException {
private JobInfoRead submitManualSyncToWorker(final UUID connectionId)
throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException {
// get standard sync to validate connection id before submitting sync to temporal
configRepository.getStandardSync(connectionId);
final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId);

return readJobFromResult(manualSyncResult);
}

private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, JsonValidationException, ConfigNotFoundException {
private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, IllegalStateException, ConfigNotFoundException {
final ManualOperationResult resetConnectionResult = eventRunner.resetConnection(
connectionId,
configRepository.getAllStreamsForConnection(connectionId),
Expand All @@ -468,12 +476,12 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio
}

private boolean containsBreakingChange(final CatalogDiff diff) {
for (StreamTransform streamTransform : diff.getTransforms()) {
for (final StreamTransform streamTransform : diff.getTransforms()) {
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
continue;
}

boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
if (anyBreakingFieldTransforms) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ void testEnumConversion() {
}

@Test
void testSyncConnection() throws IOException {
void testSyncConnection() throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID connectionId = UUID.randomUUID();

final long jobId = 123L;
Expand Down