Skip to content

Commit

Permalink
API changes to support configuring column selection (#20259)
Browse files Browse the repository at this point in the history
* database migration to add column for field selection info

* add field selection info to standard sync persistence

* fix around persistence of field selection info

* API changes to support configuring column selection

* style and testing improvements around column selection api impl

* acceptance test fix for field selection api changes
  • Loading branch information
mfsiega-airbyte committed Dec 8, 2022
1 parent 0e6e02f commit 70ee6b0
Show file tree
Hide file tree
Showing 10 changed files with 537 additions and 36 deletions.
18 changes: 18 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3744,7 +3744,25 @@ components:
description: Alias name to the stream to be used in the destination
type: string
selected:
description: If this is true, the stream is selected with all of its properties.
type: boolean
fieldSelectionEnabled:
description: Whether field selection should be enabled. If this is true, only the properties in `selectedFields` will be included.
type: boolean
selectedFields:
description: Paths to the fields that will be included in the configured catalog. This must be set if `fieldSelectionEnabled` is set. An empty list indicates that no properties will be included.
type: array
items:
$ref: "#/components/schemas/SelectedFieldInfo"
SelectedFieldInfo:
type: object
# TODO(mfsiega-airbyte): point to thorough documentation on nested fields and paths.
description: Path to a field/column/property in a stream to be selected. For example, if the field to be selected is a database column called "foo", this will be ["foo"]. Use multiple path elements for nested schemas.
properties:
fieldPath:
type: array
items:
type: string
DataType:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar
.namespaceDefinition(Enums.convertTo(standardSync.getNamespaceDefinition(), io.airbyte.api.model.generated.NamespaceDefinitionType.class))
.namespaceFormat(standardSync.getNamespaceFormat())
.prefix(standardSync.getPrefix())
.syncCatalog(CatalogConverter.toApi(standardSync.getCatalog()))
.syncCatalog(CatalogConverter.toApi(standardSync.getCatalog(), standardSync.getFieldSelectionData()))
.sourceCatalogId(standardSync.getSourceCatalogId())
.breakingChange(standardSync.getBreakingChange())
.geography(Enums.convertTo(standardSync.getGeography(), Geography.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FieldSelectionData;
import io.airbyte.config.Geography;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
Expand Down Expand Up @@ -157,8 +158,10 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
// TODO Undesirable behavior: sending a null configured catalog should not be valid?
if (connectionCreate.getSyncCatalog() != null) {
standardSync.withCatalog(CatalogConverter.toProtocol(connectionCreate.getSyncCatalog()));
standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog()));
} else {
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
standardSync.withFieldSelectionData(new FieldSelectionData());
}

if (connectionCreate.getSchedule() != null && connectionCreate.getScheduleType() != null) {
Expand Down Expand Up @@ -313,6 +316,7 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn

if (patch.getSyncCatalog() != null) {
sync.setCatalog(CatalogConverter.toProtocol(patch.getSyncCatalog()));
sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog()));
}

if (patch.getName() != null) {
Expand Down Expand Up @@ -440,7 +444,8 @@ public ConnectionRead getConnection(final UUID connectionId)
return buildConnectionRead(connectionId);
}

public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog, final ConfiguredAirbyteCatalog configuredCatalog) {
public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog, final ConfiguredAirbyteCatalog configuredCatalog)
throws JsonValidationException {
return new CatalogDiff().transforms(CatalogHelpers.getCatalogDiff(
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)),
CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog)), configuredCatalog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.api.model.generated.OperationReadList;
import io.airbyte.api.model.generated.OperationUpdate;
import io.airbyte.api.model.generated.SchemaChange;
import io.airbyte.api.model.generated.SelectedFieldInfo;
import io.airbyte.api.model.generated.SourceDiscoverSchemaRead;
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
Expand Down Expand Up @@ -433,6 +434,17 @@ protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog o

outputStreamConfig.setAliasName(originalStreamConfig.getAliasName());
outputStreamConfig.setSelected(originalStream.getConfig().getSelected());

outputStreamConfig.setFieldSelectionEnabled(originalStreamConfig.getFieldSelectionEnabled());
if (outputStreamConfig.getFieldSelectionEnabled()) {
// TODO(mfsiega-airbyte): support nested fields.
// If field selection is enabled, populate the selected columns.
final List<String> selectedFields = new ArrayList<>();
originalStream.getStream().getJsonSchema().findValue("properties").fieldNames().forEachRemaining((name) -> selectedFields.add(name));
outputStreamConfig.setSelectedFields(
selectedFields.stream().map((fieldName) -> new SelectedFieldInfo().addFieldPathItem(fieldName)).collect(Collectors.toList()));
}

} else {
outputStreamConfig = discoveredStream.getConfig();
outputStreamConfig.setSelected(false);
Expand Down Expand Up @@ -465,24 +477,24 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
throws ConfigNotFoundException, IOException, JsonValidationException {

final UUID connectionId = webBackendConnectionPatch.getConnectionId();
ConnectionRead connectionRead = connectionsHandler.getConnection(connectionId);
final ConnectionRead originalConnectionRead = connectionsHandler.getConnection(connectionId);

// before doing any updates, fetch the existing catalog so that it can be diffed
// with the final catalog to determine which streams might need to be reset.
final ConfiguredAirbyteCatalog oldConfiguredCatalog =
configRepository.getConfiguredCatalogForConnection(connectionId);

final List<UUID> newAndExistingOperationIds = createOrUpdateOperations(connectionRead, webBackendConnectionPatch);
final List<UUID> newAndExistingOperationIds = createOrUpdateOperations(originalConnectionRead, webBackendConnectionPatch);

// pass in operationIds because the patch object doesn't include operationIds that were just created
// above.
final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds);

// persist the update and set the connectionRead to the updated form.
connectionRead = connectionsHandler.updateConnection(connectionPatch);
final ConnectionRead updatedConnectionRead = connectionsHandler.updateConnection(connectionPatch);

// detect if any streams need to be reset based on the patch and initial catalog, if so, reset them
resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead);
resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, updatedConnectionRead, originalConnectionRead);
/*
* This catalog represents the full catalog that was used to create the configured catalog. It will
* have all streams that were present at the time. It will have no configuration set.
Expand All @@ -491,12 +503,12 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
.getConnectionAirbyteCatalog(connectionId);
if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// Update the Catalog returned to include all streams, including disabled ones
final AirbyteCatalog syncCatalog = updateSchemaWithDiscovery(connectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get());
connectionRead.setSyncCatalog(syncCatalog);
final AirbyteCatalog syncCatalog = updateSchemaWithDiscovery(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get());
updatedConnectionRead.setSyncCatalog(syncCatalog);
}

final Optional<UUID> currentSourceCatalogId = Optional.ofNullable(connectionRead.getSourceCatalogId());
return buildWebBackendConnectionRead(connectionRead, currentSourceCatalogId);
final Optional<UUID> currentSourceCatalogId = Optional.ofNullable(updatedConnectionRead.getSourceCatalogId());
return buildWebBackendConnectionRead(updatedConnectionRead, currentSourceCatalogId);
}

/**
Expand All @@ -505,13 +517,15 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
*/
private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch,
final ConfiguredAirbyteCatalog oldConfiguredCatalog,
final ConnectionRead updatedConnectionRead)
final ConnectionRead updatedConnectionRead,
final ConnectionRead oldConnectionRead)
throws IOException, JsonValidationException, ConfigNotFoundException {

final UUID connectionId = webBackendConnectionPatch.getConnectionId();
final Boolean skipReset = webBackendConnectionPatch.getSkipReset() != null ? webBackendConnectionPatch.getSkipReset() : false;
if (!skipReset) {
final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(oldConfiguredCatalog);
final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(oldConfiguredCatalog,
CatalogConverter.getFieldSelectionData(oldConnectionRead.getSyncCatalog()));
final AirbyteCatalog upToDateAirbyteCatalog = updatedConnectionRead.getSyncCatalog();
final CatalogDiff catalogDiff =
connectionsHandler.getDiff(apiExistingCatalog, upToDateAirbyteCatalog, CatalogConverter.toProtocol(upToDateAirbyteCatalog));
Expand Down
Loading

0 comments on commit 70ee6b0

Please sign in to comment.