Skip to content

Commit

Permalink
Validate connection id before sync (#20107)
Browse files Browse the repository at this point in the history
* Validate connection id before sync

* Propogate changes
  • Loading branch information
terencecho committed Dec 6, 2022
1 parent ea3fb89 commit 0e62836
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private SourceDiscoverSchemaRead retrieveDiscoveredSchema(final SynchronousRespo
.jobInfo(jobConverter.getSynchronousJobRead(response));

if (response.isSuccess()) {
ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput());
final ActorCatalog catalog = configRepository.getActorCatalogById(response.getOutput());
final AirbyteCatalog persistenceCatalog = Jsons.object(catalog.getCatalog(),
io.airbyte.protocol.models.AirbyteCatalog.class);
sourceDiscoverSchemaRead.catalog(CatalogConverter.toApi(persistenceCatalog));
Expand Down Expand Up @@ -352,7 +352,8 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(
return specRead;
}

public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException {
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId());
}

Expand All @@ -365,19 +366,21 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
return submitCancellationToWorker(jobIdRequestBody.getId());
}

private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema,
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);
final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
ConnectionStatus connectionStatus;
final ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
Expand All @@ -389,7 +392,9 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered

}

private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) {
private boolean shouldDisableConnection(final boolean containsBreakingChange,
final NonBreakingChangesPreference preference,
final CatalogDiff diff) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return false;
}
Expand Down Expand Up @@ -438,13 +443,16 @@ private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOExcept
return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}

private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException {
private JobInfoRead submitManualSyncToWorker(final UUID connectionId)
throws IOException, IllegalStateException, JsonValidationException, ConfigNotFoundException {
// get standard sync to validate connection id before submitting sync to temporal
configRepository.getStandardSync(connectionId);
final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId);

return readJobFromResult(manualSyncResult);
}

private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, JsonValidationException, ConfigNotFoundException {
private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, IllegalStateException, ConfigNotFoundException {
final ManualOperationResult resetConnectionResult = eventRunner.resetConnection(
connectionId,
configRepository.getAllStreamsForConnection(connectionId),
Expand All @@ -468,12 +476,12 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio
}

private boolean containsBreakingChange(final CatalogDiff diff) {
for (StreamTransform streamTransform : diff.getTransforms()) {
for (final StreamTransform streamTransform : diff.getTransforms()) {
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
continue;
}

boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
if (anyBreakingFieldTransforms) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ void testEnumConversion() {
}

@Test
void testSyncConnection() throws IOException {
void testSyncConnection() throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID connectionId = UUID.randomUUID();

final long jobId = 123L;
Expand Down

0 comments on commit 0e62836

Please sign in to comment.