diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index ae9909300922..4883fb4acc39 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3744,7 +3744,25 @@ components: description: Alias name to the stream to be used in the destination type: string selected: + description: If this is true, the stream is selected with all of its properties. type: boolean + fieldSelectionEnabled: + description: Whether field selection should be enabled. If this is true, only the properties in `selectedFields` will be included. + type: boolean + selectedFields: + description: Paths to the fields that will be included in the configured catalog. This must be set if `fieldSelectedEnabled` is set. An empty list indicates that no properties will be included. + type: array + items: + $ref: "#/components/schemas/SelectedFieldInfo" + SelectedFieldInfo: + type: object + # TODO(mfsiega-airbyte): point to thorough documentation on nested fields and paths. + description: Path to a field/column/property in a stream to be selected. For example, if the field to be selected is a database column called "foo", this will be ["foo"]. Use multiple path elements for nested schemas. + properties: + fieldPath: + type: array + items: + type: string DataType: type: string enum: diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java index aa91c5e9b71e..701ef8445045 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java @@ -92,7 +92,7 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar .namespaceDefinition(Enums.convertTo(standardSync.getNamespaceDefinition(), io.airbyte.api.model.generated.NamespaceDefinitionType.class)) .namespaceFormat(standardSync.getNamespaceFormat()) .prefix(standardSync.getPrefix()) - .syncCatalog(CatalogConverter.toApi(standardSync.getCatalog())) + .syncCatalog(CatalogConverter.toApi(standardSync.getCatalog(), standardSync.getFieldSelectionData())) .sourceCatalogId(standardSync.getSourceCatalogId()) .breakingChange(standardSync.getBreakingChange()) .geography(Enums.convertTo(standardSync.getGeography(), Geography.class)) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 8e3a9473bef1..cf15edd7bf79 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -29,6 +29,7 @@ import io.airbyte.config.ActorCatalog; import io.airbyte.config.BasicSchedule; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.FieldSelectionData; import io.airbyte.config.Geography; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; @@ -157,8 +158,10 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) // TODO Undesirable behavior: sending a null configured catalog should not be valid? if (connectionCreate.getSyncCatalog() != null) { standardSync.withCatalog(CatalogConverter.toProtocol(connectionCreate.getSyncCatalog())); + standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog())); } else { standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList())); + standardSync.withFieldSelectionData(new FieldSelectionData()); } if (connectionCreate.getSchedule() != null && connectionCreate.getScheduleType() != null) { @@ -313,6 +316,7 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn if (patch.getSyncCatalog() != null) { sync.setCatalog(CatalogConverter.toProtocol(patch.getSyncCatalog())); + sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog())); } if (patch.getName() != null) { @@ -440,7 +444,8 @@ public ConnectionRead getConnection(final UUID connectionId) return buildConnectionRead(connectionId); } - public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog, final ConfiguredAirbyteCatalog configuredCatalog) { + public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog, final ConfiguredAirbyteCatalog configuredCatalog) + throws JsonValidationException { return new CatalogDiff().transforms(CatalogHelpers.getCatalogDiff( CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(oldCatalog)), CatalogHelpers.configuredCatalogToCatalog(CatalogConverter.toProtocolKeepAllStreams(newCatalog)), configuredCatalog) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index fa4822a6da67..716d39e5bfc1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -25,6 +25,7 @@ import io.airbyte.api.model.generated.OperationReadList; import io.airbyte.api.model.generated.OperationUpdate; import io.airbyte.api.model.generated.SchemaChange; +import io.airbyte.api.model.generated.SelectedFieldInfo; import io.airbyte.api.model.generated.SourceDiscoverSchemaRead; import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; @@ -433,6 +434,17 @@ protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog o outputStreamConfig.setAliasName(originalStreamConfig.getAliasName()); outputStreamConfig.setSelected(originalStream.getConfig().getSelected()); + + outputStreamConfig.setFieldSelectionEnabled(originalStreamConfig.getFieldSelectionEnabled()); + if (outputStreamConfig.getFieldSelectionEnabled()) { + // TODO(mfsiega-airbyte): support nested fields. + // If field selection is enabled, populate the selected columns. + final List selectedFields = new ArrayList<>(); + originalStream.getStream().getJsonSchema().findValue("properties").fieldNames().forEachRemaining((name) -> selectedFields.add(name)); + outputStreamConfig.setSelectedFields( + selectedFields.stream().map((fieldName) -> new SelectedFieldInfo().addFieldPathItem(fieldName)).collect(Collectors.toList())); + } + } else { outputStreamConfig = discoveredStream.getConfig(); outputStreamConfig.setSelected(false); @@ -465,24 +477,24 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne throws ConfigNotFoundException, IOException, JsonValidationException { final UUID connectionId = webBackendConnectionPatch.getConnectionId(); - ConnectionRead connectionRead = connectionsHandler.getConnection(connectionId); + final ConnectionRead originalConnectionRead = connectionsHandler.getConnection(connectionId); // before doing any updates, fetch the existing catalog so that it can be diffed // with the final catalog to determine which streams might need to be reset. final ConfiguredAirbyteCatalog oldConfiguredCatalog = configRepository.getConfiguredCatalogForConnection(connectionId); - final List newAndExistingOperationIds = createOrUpdateOperations(connectionRead, webBackendConnectionPatch); + final List newAndExistingOperationIds = createOrUpdateOperations(originalConnectionRead, webBackendConnectionPatch); // pass in operationIds because the patch object doesn't include operationIds that were just created // above. final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds); // persist the update and set the connectionRead to the updated form. - connectionRead = connectionsHandler.updateConnection(connectionPatch); + final ConnectionRead updatedConnectionRead = connectionsHandler.updateConnection(connectionPatch); // detect if any streams need to be reset based on the patch and initial catalog, if so, reset them - resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead); + resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, updatedConnectionRead, originalConnectionRead); /* * This catalog represents the full catalog that was used to create the configured catalog. It will * have all streams that were present at the time. It will have no configuration set. @@ -491,12 +503,12 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne .getConnectionAirbyteCatalog(connectionId); if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // Update the Catalog returned to include all streams, including disabled ones - final AirbyteCatalog syncCatalog = updateSchemaWithDiscovery(connectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get()); - connectionRead.setSyncCatalog(syncCatalog); + final AirbyteCatalog syncCatalog = updateSchemaWithDiscovery(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get()); + updatedConnectionRead.setSyncCatalog(syncCatalog); } - final Optional currentSourceCatalogId = Optional.ofNullable(connectionRead.getSourceCatalogId()); - return buildWebBackendConnectionRead(connectionRead, currentSourceCatalogId); + final Optional currentSourceCatalogId = Optional.ofNullable(updatedConnectionRead.getSourceCatalogId()); + return buildWebBackendConnectionRead(updatedConnectionRead, currentSourceCatalogId); } /** @@ -505,13 +517,15 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne */ private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch, final ConfiguredAirbyteCatalog oldConfiguredCatalog, - final ConnectionRead updatedConnectionRead) + final ConnectionRead updatedConnectionRead, + final ConnectionRead oldConnectionRead) throws IOException, JsonValidationException, ConfigNotFoundException { final UUID connectionId = webBackendConnectionPatch.getConnectionId(); final Boolean skipReset = webBackendConnectionPatch.getSkipReset() != null ? webBackendConnectionPatch.getSkipReset() : false; if (!skipReset) { - final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(oldConfiguredCatalog); + final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(oldConfiguredCatalog, + CatalogConverter.getFieldSelectionData(oldConnectionRead.getSyncCatalog())); final AirbyteCatalog upToDateAirbyteCatalog = updatedConnectionRead.getSyncCatalog(); final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, upToDateAirbyteCatalog, CatalogConverter.toProtocol(upToDateAirbyteCatalog)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java index 174fec49e50e..6face8f6f35e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java @@ -4,21 +4,35 @@ package io.airbyte.server.handlers.helpers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.api.model.generated.AirbyteCatalog; import io.airbyte.api.model.generated.AirbyteStream; +import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; +import io.airbyte.api.model.generated.AirbyteStreamConfiguration; +import io.airbyte.api.model.generated.SelectedFieldInfo; +import io.airbyte.api.model.generated.StreamDescriptor; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.text.Names; +import io.airbyte.config.FieldSelectionData; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.validation.json.JsonValidationException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Convert classes between io.airbyte.protocol.models and io.airbyte.api.model.generated */ public class CatalogConverter { + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogConverter.class); + private static io.airbyte.api.model.generated.AirbyteStream toApi(final io.airbyte.protocol.models.AirbyteStream stream) { return new io.airbyte.api.model.generated.AirbyteStream() .name(stream.getName()) @@ -30,7 +44,36 @@ private static io.airbyte.api.model.generated.AirbyteStream toApi(final io.airby .namespace(stream.getNamespace()); } - private static io.airbyte.protocol.models.AirbyteStream toProtocol(final io.airbyte.api.model.generated.AirbyteStream stream) { + @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") + private static io.airbyte.protocol.models.AirbyteStream toProtocol(final AirbyteStream stream, AirbyteStreamConfiguration config) + throws JsonValidationException { + if (config.getFieldSelectionEnabled() != null && config.getFieldSelectionEnabled()) { + // Validate the selected field paths. + if (config.getSelectedFields() == null) { + throw new JsonValidationException("Requested field selection but no selected fields provided"); + } + final JsonNode properties = stream.getJsonSchema().findValue("properties"); + if (properties == null || !properties.isObject()) { + throw new JsonValidationException("Requested field selection but no properties node found"); + } + for (final var selectedFieldInfo : config.getSelectedFields()) { + if (selectedFieldInfo.getFieldPath() == null || selectedFieldInfo.getFieldPath().isEmpty()) { + throw new JsonValidationException("Selected field path cannot be empty"); + } + if (selectedFieldInfo.getFieldPath().size() > 1) { + // TODO(mfsiega-airbyte): support nested fields. + throw new UnsupportedOperationException("Nested field selection not supported"); + } + } + // Only include the selected fields. + List selectedFieldNames = config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toList()); + for (final String selectedFieldName : selectedFieldNames) { + if (!properties.has(selectedFieldName)) { + throw new JsonValidationException(String.format("Requested selected field %s not found in JSON schema", selectedFieldName)); + } + } + ((ObjectNode) properties).retain(selectedFieldNames); + } return new io.airbyte.protocol.models.AirbyteStream() .withName(stream.getName()) .withJsonSchema(stream.getJsonSchema()) @@ -67,10 +110,13 @@ private static io.airbyte.api.model.generated.AirbyteStreamConfiguration generat return result; } - public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final io.airbyte.protocol.models.ConfiguredAirbyteCatalog catalog) { + public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final ConfiguredAirbyteCatalog catalog, FieldSelectionData fieldSelectionData) { final List streams = catalog.getStreams() .stream() .map(configuredStream -> { + final var streamDescriptor = new StreamDescriptor() + .name(configuredStream.getStream().getName()) + .namespace(configuredStream.getStream().getNamespace()); final io.airbyte.api.model.generated.AirbyteStreamConfiguration configuration = new io.airbyte.api.model.generated.AirbyteStreamConfiguration() .syncMode(Enums.convertTo(configuredStream.getSyncMode(), io.airbyte.api.model.generated.SyncMode.class)) @@ -79,7 +125,15 @@ public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final io.airby Enums.convertTo(configuredStream.getDestinationSyncMode(), io.airbyte.api.model.generated.DestinationSyncMode.class)) .primaryKey(configuredStream.getPrimaryKey()) .aliasName(Names.toAlphanumericAndUnderscore(configuredStream.getStream().getName())) - .selected(true); + .selected(true) + .fieldSelectionEnabled(getStreamHasFieldSelectionEnabled(fieldSelectionData, streamDescriptor)); + if (configuration.getFieldSelectionEnabled()) { + final List selectedColumns = new ArrayList<>(); + // TODO(mfsiega-airbyte): support nested fields here. + configuredStream.getStream().getJsonSchema().findValue("properties").fieldNames().forEachRemaining((name) -> selectedColumns.add(name)); + configuration.setSelectedFields( + selectedColumns.stream().map((fieldName) -> new SelectedFieldInfo().addFieldPathItem(fieldName)).collect(Collectors.toList())); + } return new io.airbyte.api.model.generated.AirbyteStreamAndConfiguration() .stream(toApi(configuredStream.getStream())) .config(configuration); @@ -88,14 +142,26 @@ public static io.airbyte.api.model.generated.AirbyteCatalog toApi(final io.airby return new io.airbyte.api.model.generated.AirbyteCatalog().streams(streams); } + private static Boolean getStreamHasFieldSelectionEnabled(FieldSelectionData fieldSelectionData, StreamDescriptor streamDescriptor) { + if (fieldSelectionData == null + || fieldSelectionData.getAdditionalProperties().get(streamDescriptorToStringForFieldSelection(streamDescriptor)) == null) { + return false; + } + + return fieldSelectionData.getAdditionalProperties().get(streamDescriptorToStringForFieldSelection(streamDescriptor)); + } + /** * Converts the API catalog model into a protocol catalog. Note: returns all streams, regardless of - * selected status. See {@link CatalogConverter#toProtocol(AirbyteStream)} for context. + * selected status. See + * {@link CatalogConverter#toProtocol(AirbyteStream, AirbyteStreamConfiguration)} for context. * * @param catalog api catalog * @return protocol catalog */ - public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeepAllStreams(final io.airbyte.api.model.generated.AirbyteCatalog catalog) { + public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeepAllStreams( + final io.airbyte.api.model.generated.AirbyteCatalog catalog) + throws JsonValidationException { final AirbyteCatalog clone = Jsons.clone(catalog); clone.getStreams().forEach(stream -> stream.getConfig().setSelected(true)); return toProtocol(clone); @@ -111,20 +177,62 @@ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeep * @param catalog api catalog * @return protocol catalog */ - public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocol(final io.airbyte.api.model.generated.AirbyteCatalog catalog) { + public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocol(final io.airbyte.api.model.generated.AirbyteCatalog catalog) + throws JsonValidationException { + final ArrayList errors = new ArrayList<>(); final List streams = catalog.getStreams() .stream() .filter(s -> s.getConfig().getSelected()) - .map(s -> new io.airbyte.protocol.models.ConfiguredAirbyteStream() - .withStream(toProtocol(s.getStream())) - .withSyncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.protocol.models.SyncMode.class)) - .withCursorField(s.getConfig().getCursorField()) - .withDestinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(), - io.airbyte.protocol.models.DestinationSyncMode.class)) - .withPrimaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList()))) + .map(s -> { + try { + return new io.airbyte.protocol.models.ConfiguredAirbyteStream() + .withStream(toProtocol(s.getStream(), s.getConfig())) + .withSyncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.protocol.models.SyncMode.class)) + .withCursorField(s.getConfig().getCursorField()) + .withDestinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(), + io.airbyte.protocol.models.DestinationSyncMode.class)) + .withPrimaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList())); + } catch (JsonValidationException e) { + LOGGER.error("Error parsing catalog: {}", e); + errors.add(e); + return null; + } + }) .collect(Collectors.toList()); + if (!errors.isEmpty()) { + throw errors.get(0); + } return new io.airbyte.protocol.models.ConfiguredAirbyteCatalog() .withStreams(streams); } + /** + * Generate the map from StreamDescriptor to indicator of whether field selection is enabled for + * that stream. + * + * @param syncCatalog the catalog + * @return the map as a FieldSelectionData object + */ + public static FieldSelectionData getFieldSelectionData(final AirbyteCatalog syncCatalog) { + if (syncCatalog == null) { + return null; + } + final var fieldSelectionData = new FieldSelectionData(); + for (final AirbyteStreamAndConfiguration streamAndConfig : syncCatalog.getStreams()) { + final var streamDescriptor = new StreamDescriptor() + .name(streamAndConfig.getStream().getName()) + .namespace(streamAndConfig.getStream().getNamespace()); + final boolean fieldSelectionEnabled = + streamAndConfig.getConfig().getFieldSelectionEnabled() == null ? false : streamAndConfig.getConfig().getFieldSelectionEnabled(); + fieldSelectionData.setAdditionalProperty(streamDescriptorToStringForFieldSelection(streamDescriptor), fieldSelectionEnabled); + } + return fieldSelectionData; + } + + // Return a string representation of a stream descriptor that's convenient to use as a key for the + // field selection data. + private static String streamDescriptorToStringForFieldSelection(final StreamDescriptor streamDescriptor) { + return String.format("%s/%s", streamDescriptor.getNamespace(), streamDescriptor.getName()); + } + } diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java index 1641a5e7dec3..b10cd31bbe3b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java @@ -4,25 +4,32 @@ package io.airbyte.server.converters; +import static io.airbyte.server.helpers.ConnectionHelpers.FIELD_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.api.model.generated.SelectedFieldInfo; import io.airbyte.commons.enums.Enums; import io.airbyte.config.DataType; +import io.airbyte.config.FieldSelectionData; import io.airbyte.server.handlers.helpers.CatalogConverter; import io.airbyte.server.helpers.ConnectionHelpers; +import io.airbyte.validation.json.JsonValidationException; import org.junit.jupiter.api.Test; class CatalogConverterTest { @Test - void testConvertToProtocol() { + void testConvertToProtocol() throws JsonValidationException { assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toProtocol(ConnectionHelpers.generateBasicApiCatalog())); } @Test void testConvertToAPI() { - assertEquals(ConnectionHelpers.generateBasicApiCatalog(), CatalogConverter.toApi(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog())); + assertEquals(ConnectionHelpers.generateBasicApiCatalog(), CatalogConverter.toApi(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), + new FieldSelectionData())); } @Test @@ -31,4 +38,51 @@ void testEnumConversion() { assertTrue(Enums.isCompatible(io.airbyte.config.SyncMode.class, io.airbyte.api.model.generated.SyncMode.class)); } + @Test + void testConvertToProtocolColumnSelectionValidation() { + assertThrows(JsonValidationException.class, () -> { + // fieldSelectionEnabled=true but selectedFields=null. + final var catalog = ConnectionHelpers.generateBasicApiCatalog(); + catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).selectedFields(null); + CatalogConverter.toProtocol(catalog); + }); + + assertThrows(JsonValidationException.class, () -> { + // JSON schema has no `properties` node. + final var catalog = ConnectionHelpers.generateBasicApiCatalog(); + ((ObjectNode) catalog.getStreams().get(0).getStream().getJsonSchema()).remove("properties"); + catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo")); + CatalogConverter.toProtocol(catalog); + }); + + assertThrows(JsonValidationException.class, () -> { + // SelectedFieldInfo with empty path. + final var catalog = ConnectionHelpers.generateBasicApiCatalog(); + catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo()); + CatalogConverter.toProtocol(catalog); + }); + + assertThrows(UnsupportedOperationException.class, () -> { + // SelectedFieldInfo with nested field path. + final var catalog = ConnectionHelpers.generateBasicApiCatalog(); + catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true) + .addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo").addFieldPathItem("bar")); + CatalogConverter.toProtocol(catalog); + }); + + assertThrows(JsonValidationException.class, () -> { + // SelectedFieldInfo with empty path. + final var catalog = ConnectionHelpers.generateBasicApiCatalog(); + catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo")); + CatalogConverter.toProtocol(catalog); + }); + } + + @Test + void testConvertToProtocolFieldSelection() throws JsonValidationException { + final var catalog = ConnectionHelpers.generateApiCatalogWithTwoFields(); + catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)); + assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toProtocol(catalog)); + } + } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 9e48179153ac..aef4ed143c37 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -4,6 +4,7 @@ package io.airbyte.server.handlers; +import static io.airbyte.server.helpers.ConnectionHelpers.FIELD_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -34,6 +35,7 @@ import io.airbyte.api.model.generated.DestinationSyncMode; import io.airbyte.api.model.generated.NamespaceDefinitionType; import io.airbyte.api.model.generated.ResourceRequirements; +import io.airbyte.api.model.generated.SelectedFieldInfo; import io.airbyte.api.model.generated.SourceSearch; import io.airbyte.api.model.generated.StreamDescriptor; import io.airbyte.api.model.generated.SyncMode; @@ -45,6 +47,7 @@ import io.airbyte.config.Cron; import io.airbyte.config.DataType; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.FieldSelectionData; import io.airbyte.config.Geography; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.Schedule; @@ -143,6 +146,7 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio .withPrefix(PRESTO_TO_HUDI_PREFIX) .withStatus(StandardSync.Status.ACTIVE) .withCatalog(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog()) + .withFieldSelectionData(new FieldSelectionData().withAdditionalProperty("null/users-data0", false)) .withSourceId(sourceId) .withDestinationId(destinationId) .withOperationIds(List.of(operationId)) @@ -307,6 +311,48 @@ void testCreateConnectionUsesDefaultGeographyFromWorkspace() throws JsonValidati verify(configRepository).writeStandardSync(standardSync); } + @Test + void testCreateConnectionWithSelectedFields() throws IOException, JsonValidationException, ConfigNotFoundException { + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withDefaultGeography(Geography.AUTO); + when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace); + + final AirbyteCatalog catalog = ConnectionHelpers.generateApiCatalogWithTwoFields(); + // Only select one of the two fields. + catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true) + .selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME))); + + final ConnectionCreate connectionCreate = new ConnectionCreate() + .sourceId(standardSync.getSourceId()) + .destinationId(standardSync.getDestinationId()) + .operationIds(standardSync.getOperationIds()) + .name(PRESTO_TO_HUDI) + .namespaceDefinition(NamespaceDefinitionType.SOURCE) + .namespaceFormat(null) + .prefix(PRESTO_TO_HUDI_PREFIX) + .status(ConnectionStatus.ACTIVE) + .schedule(ConnectionHelpers.generateBasicConnectionSchedule()) + .syncCatalog(catalog) + .resourceRequirements(new io.airbyte.api.model.generated.ResourceRequirements() + .cpuRequest(standardSync.getResourceRequirements().getCpuRequest()) + .cpuLimit(standardSync.getResourceRequirements().getCpuLimit()) + .memoryRequest(standardSync.getResourceRequirements().getMemoryRequest()) + .memoryLimit(standardSync.getResourceRequirements().getMemoryLimit())) + .sourceCatalogId(standardSync.getSourceCatalogId()) + .geography(ApiPojoConverters.toApiGeography(standardSync.getGeography())); + + final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate); + + final ConnectionRead expectedConnectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); + + assertEquals(expectedConnectionRead, actualConnectionRead); + + standardSync.withFieldSelectionData(new FieldSelectionData().withAdditionalProperty("null/users-data0", true)); + + verify(configRepository).writeStandardSync(standardSync); + } + @Test void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() { @@ -515,7 +561,8 @@ void testUpdateConnectionPatchAddingNewStream() throws Exception { .syncCatalog(catalogForUpdate); final StandardSync expectedPersistedSync = Jsons.clone(standardSync) - .withCatalog(expectedPersistedCatalog); + .withCatalog(expectedPersistedCatalog) + .withFieldSelectionData(CatalogConverter.getFieldSelectionData(catalogForUpdate)); when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); @@ -552,7 +599,41 @@ void testUpdateConnectionPatchEditExistingStreamWhileAddingNewStream() throws Ex .syncCatalog(catalogForUpdate); final StandardSync expectedPersistedSync = Jsons.clone(standardSync) - .withCatalog(expectedPersistedCatalog); + .withCatalog(expectedPersistedCatalog) + .withFieldSelectionData(CatalogConverter.getFieldSelectionData(catalogForUpdate)); + + when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); + + final ConnectionRead actualConnectionRead = connectionsHandler.updateConnection(connectionUpdate); + + assertEquals(expectedRead, actualConnectionRead); + verify(configRepository).writeStandardSync(expectedPersistedSync); + verify(eventRunner).update(connectionUpdate.getConnectionId()); + } + + @Test + void testUpdateConnectionPatchColumnSelection() throws Exception { + // The connection initially has a catalog with one stream, and two fields in that stream. + standardSync.setCatalog(ConnectionHelpers.generateAirbyteCatalogWithTwoFields()); + + // Send an update that only selects one of the fields. + final AirbyteCatalog catalogForUpdate = ConnectionHelpers.generateApiCatalogWithTwoFields(); + catalogForUpdate.getStreams().get(0).getConfig().fieldSelectionEnabled(true) + .selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME))); + + // Expect one column in the final persisted catalog + final ConfiguredAirbyteCatalog expectedPersistedCatalog = ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(); + + final ConnectionUpdate connectionUpdate = new ConnectionUpdate() + .connectionId(standardSync.getConnectionId()) + .syncCatalog(catalogForUpdate); + + final ConnectionRead expectedRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync) + .syncCatalog(catalogForUpdate); + + final StandardSync expectedPersistedSync = Jsons.clone(standardSync) + .withCatalog(expectedPersistedCatalog) + .withFieldSelectionData(CatalogConverter.getFieldSelectionData(catalogForUpdate)); when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); @@ -602,6 +683,7 @@ void testUpdateConnectionPatchingSeveralFieldsAndReplaceAStream() throws JsonVal .withSchedule(null) .withManual(true) .withCatalog(expectedPersistedCatalog) + .withFieldSelectionData(CatalogConverter.getFieldSelectionData(catalogForUpdate)) .withResourceRequirements(ApiPojoConverters.resourceRequirementsToInternal(resourceRequirements)) .withSourceCatalogId(newSourceCatalogId) .withOperationIds(List.of(operationId, otherOperationId)) @@ -642,7 +724,7 @@ void testValidateConnectionUpdateOperationInDifferentWorkspace() throws JsonVali final ConnectionUpdate connectionUpdate = new ConnectionUpdate() .connectionId(standardSync.getConnectionId()) .operationIds(Collections.singletonList(operationId)) - .syncCatalog(CatalogConverter.toApi(standardSync.getCatalog())); + .syncCatalog(CatalogConverter.toApi(standardSync.getCatalog(), standardSync.getFieldSelectionData())); assertThrows(IllegalArgumentException.class, () -> connectionsHandler.updateConnection(connectionUpdate)); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 908a5749e72f..53c199c19892 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -4,6 +4,8 @@ package io.airbyte.server.helpers; +import static io.airbyte.server.handlers.helpers.CatalogConverter.toApi; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import io.airbyte.api.model.generated.AirbyteCatalog; @@ -42,7 +44,6 @@ import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.server.converters.ApiPojoConverters; -import io.airbyte.server.handlers.helpers.CatalogConverter; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -52,7 +53,7 @@ public class ConnectionHelpers { private static final String STREAM_NAME_BASE = "users-data"; private static final String STREAM_NAME = STREAM_NAME_BASE + "0"; - private static final String FIELD_NAME = "id"; + public static final String FIELD_NAME = "id"; private static final String BASIC_SCHEDULE_TIME_UNIT = "days"; private static final long BASIC_SCHEDULE_UNITS = 1L; private static final String BASIC_SCHEDULE_DATA_TIME_UNITS = "days"; @@ -237,7 +238,7 @@ public static ConnectionRead connectionReadFromStandardSync(final StandardSync s ApiPojoConverters.populateConnectionReadSchedule(standardSync, connectionRead); if (standardSync.getCatalog() != null) { - connectionRead.syncCatalog(CatalogConverter.toApi(standardSync.getCatalog())); + connectionRead.syncCatalog(toApi(standardSync.getCatalog(), standardSync.getFieldSelectionData())); } if (standardSync.getResourceRequirements() != null) { connectionRead.resourceRequirements(new io.airbyte.api.model.generated.ResourceRequirements() @@ -280,10 +281,29 @@ public static JsonNode generateBasicJsonSchema() { return CatalogHelpers.fieldsToJsonSchema(Field.of(FIELD_NAME, JsonSchemaType.STRING)); } + public static JsonNode generateJsonSchemaWithTwoFields() { + return CatalogHelpers.fieldsToJsonSchema( + Field.of(FIELD_NAME, JsonSchemaType.STRING), + Field.of(FIELD_NAME + "2", JsonSchemaType.STRING)); + } + public static ConfiguredAirbyteCatalog generateBasicConfiguredAirbyteCatalog() { return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(generateBasicConfiguredStream(null))); } + public static ConfiguredAirbyteCatalog generateAirbyteCatalogWithTwoFields() { + return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(new ConfiguredAirbyteStream() + .withStream( + new io.airbyte.protocol.models.AirbyteStream() + .withName(STREAM_NAME) + .withJsonSchema(generateJsonSchemaWithTwoFields()) + .withDefaultCursorField(Lists.newArrayList(FIELD_NAME)) + .withSourceDefinedCursor(false) + .withSourceDefinedPrimaryKey(Collections.emptyList()) + .withSupportedSyncModes( + List.of(io.airbyte.protocol.models.SyncMode.FULL_REFRESH, io.airbyte.protocol.models.SyncMode.INCREMENTAL))))); + } + public static ConfiguredAirbyteCatalog generateMultipleStreamsConfiguredAirbyteCatalog(final int streamsCount) { final List configuredStreams = new ArrayList<>(); for (int i = 0; i < streamsCount; i++) { @@ -314,6 +334,17 @@ public static AirbyteCatalog generateBasicApiCatalog() { .config(generateBasicApiStreamConfig(null)))); } + /** + * Generates an API catalog that has two fields, both selected. + * + * @return AirbyteCatalog + */ + public static AirbyteCatalog generateApiCatalogWithTwoFields() { + return new AirbyteCatalog().streams(Lists.newArrayList(new AirbyteStreamAndConfiguration() + .stream(generateApiStreamWithTwoFields()) + .config(generateBasicApiStreamConfig(null)))); + } + public static AirbyteCatalog generateMultipleStreamsApiCatalog(final int streamsCount) { final List streamAndConfigurations = new ArrayList<>(); for (int i = 0; i < streamsCount; i++) { @@ -331,7 +362,8 @@ private static AirbyteStreamConfiguration generateBasicApiStreamConfig(final Str .destinationSyncMode(io.airbyte.api.model.generated.DestinationSyncMode.APPEND) .primaryKey(Collections.emptyList()) .aliasName(Names.toAlphanumericAndUnderscore(nameSuffix == null ? STREAM_NAME : STREAM_NAME_BASE + nameSuffix)) - .selected(true); + .selected(true) + .fieldSelectionEnabled(false); } private static AirbyteStream generateBasicApiStream(final String nameSuffix) { @@ -344,4 +376,14 @@ private static AirbyteStream generateBasicApiStream(final String nameSuffix) { .supportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); } + private static AirbyteStream generateApiStreamWithTwoFields() { + return new AirbyteStream() + .name(STREAM_NAME) + .jsonSchema(generateJsonSchemaWithTwoFields()) + .defaultCursorField(Lists.newArrayList(FIELD_NAME)) + .sourceDefinedCursor(false) + .sourceDefinedPrimaryKey(Collections.emptyList()) + .supportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); + } + } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index aaa219ed07d1..69ed5d19de84 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -354,7 +354,7 @@ void testCreateConnection() throws ApiException { final String name = "test-connection-" + UUID.randomUUID(); final SyncMode syncMode = SyncMode.FULL_REFRESH; final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); + catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode).setFieldSelectionEnabled(false)); final ConnectionRead createdConnection = testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.BASIC, BASIC_SCHEDULE_DATA); diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index c16f41ebcf94..0f5b3cfee5bb 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -548,6 +548,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -563,6 +569,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -713,6 +725,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -728,6 +746,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -834,6 +858,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -849,6 +879,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -898,6 +934,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -913,6 +955,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1020,6 +1068,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1035,6 +1089,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1084,6 +1144,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1099,6 +1165,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1393,6 +1465,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1408,6 +1486,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1457,6 +1541,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1472,6 +1562,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1765,6 +1861,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -1780,6 +1882,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -5781,6 +5889,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -5796,6 +5910,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -6241,6 +6361,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -6256,6 +6382,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -8326,6 +8458,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -8341,6 +8479,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -8562,6 +8706,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -8577,6 +8727,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -9046,6 +9202,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -9061,6 +9223,12 @@

Example data

}, "config" : { "aliasName" : "aliasName", + "fieldSelectionEnabled" : true, + "selectedFields" : [ { + "fieldPath" : [ "fieldPath", "fieldPath" ] + }, { + "fieldPath" : [ "fieldPath", "fieldPath" ] + } ], "cursorField" : [ "cursorField", "cursorField" ], "selected" : true, "primaryKey" : [ [ "primaryKey", "primaryKey" ], [ "primaryKey", "primaryKey" ] ] @@ -9963,6 +10131,7 @@

Table of Contents

  • ResourceRequirements -
  • SaveStatsRequestBody -
  • SchemaChange -
  • +
  • SelectedFieldInfo -
  • SetInstancewideDestinationOauthParamsRequestBody -
  • SetInstancewideSourceOauthParamsRequestBody -
  • SetWorkflowInAttemptRequestBody -
  • @@ -10073,7 +10242,9 @@

    AirbyteStreamConfiguration
    destinationSyncMode
    primaryKey (optional)
    array[array[String]] Paths to the fields that will be used as primary key. This field is REQUIRED if destination_sync_mode is *_dedup. Otherwise it is ignored.
    aliasName (optional)
    String Alias name to the stream to be used in the destination
    -
    selected (optional)
    +
    selected (optional)
    Boolean If this is true, the stream is selected with all of its properties.
    +
    fieldSelectionEnabled (optional)
    Boolean Whether field selection should be enabled. If this is true, only the properties in selectedFields will be included.
    +
    selectedFields (optional)
    array[SelectedFieldInfo] Paths to the fields that will be included in the configured catalog. This must be set if fieldSelectedEnabled is set. An empty list indicates that no properties will be included.
    @@ -11148,6 +11319,13 @@

    SchemaChange -

    +
    +

    SelectedFieldInfo - Up

    +
    Path to a field/column/property in a stream to be selected. For example, if the field to be selected is a database column called "foo", this will be ["foo"]. Use multiple path elements for nested schemas.
    +
    +
    fieldPath (optional)
    +
    +