diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java index 6face8f6f35ee..a16ea2b6e6552 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,18 @@ private static io.airbyte.protocol.models.AirbyteStream toProtocol(final Airbyte } } // Only include the selected fields. - List selectedFieldNames = config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toList()); + final Set selectedFieldNames = + config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toSet()); + // TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields because we + // don't support filtering nested fields yet. + if (!selectedFieldNames.contains(config.getCursorField().get(0))) { + throw new JsonValidationException("Cursor field cannot be de-selected"); + } + for (final List primaryKeyComponent : config.getPrimaryKey()) { + if (!selectedFieldNames.contains(primaryKeyComponent.get(0))) { + throw new JsonValidationException("Primary key field cannot be de-selected"); + } + } for (final String selectedFieldName : selectedFieldNames) { if (!properties.has(selectedFieldName)) { throw new JsonValidationException(String.format("Requested selected field %s not found in JSON schema", selectedFieldName)); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index aef4ed143c373..5a23f9f128fab 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -5,6 +5,7 @@ package io.airbyte.server.handlers; import static io.airbyte.server.helpers.ConnectionHelpers.FIELD_NAME; +import static io.airbyte.server.helpers.ConnectionHelpers.SECOND_FIELD_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -353,6 +354,47 @@ void testCreateConnectionWithSelectedFields() throws IOException, JsonValidation verify(configRepository).writeStandardSync(standardSync); } + @Test + void testFieldSelectionRemoveCursorFails() throws JsonValidationException, ConfigNotFoundException, IOException { + // Test that if we try to de-select a field that's being used for the cursor, the request will fail. + // The connection initially has a catalog with one stream, and two fields in that stream. + standardSync.setCatalog(ConnectionHelpers.generateAirbyteCatalogWithTwoFields()); + + // Send an update that sets a cursor but de-selects that field. + final AirbyteCatalog catalogForUpdate = ConnectionHelpers.generateApiCatalogWithTwoFields(); + catalogForUpdate.getStreams().get(0).getConfig() + .fieldSelectionEnabled(true) + .selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME))) + .cursorField(List.of(SECOND_FIELD_NAME)); + + final ConnectionUpdate connectionUpdate = new ConnectionUpdate() + .connectionId(standardSync.getConnectionId()) + .syncCatalog(catalogForUpdate); + + assertThrows(JsonValidationException.class, () -> connectionsHandler.updateConnection(connectionUpdate)); + } + + @Test + void testFieldSelectionRemovePrimaryKeyFails() throws JsonValidationException, ConfigNotFoundException, IOException { + // Test that if we try to de-select a field that's being used for the primary key, the request will + // fail. + // The connection initially has a catalog with one stream, and two fields in that stream. + standardSync.setCatalog(ConnectionHelpers.generateAirbyteCatalogWithTwoFields()); + + // Send an update that sets a primary key but deselects that field. + final AirbyteCatalog catalogForUpdate = ConnectionHelpers.generateApiCatalogWithTwoFields(); + catalogForUpdate.getStreams().get(0).getConfig() + .fieldSelectionEnabled(true) + .selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME))) + .primaryKey(List.of(List.of(SECOND_FIELD_NAME))); + + final ConnectionUpdate connectionUpdate = new ConnectionUpdate() + .connectionId(standardSync.getConnectionId()) + .syncCatalog(catalogForUpdate); + + assertThrows(JsonValidationException.class, () -> connectionsHandler.updateConnection(connectionUpdate)); + } + @Test void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() { diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 53c199c198927..223135d04669c 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -54,6 +54,8 @@ public class ConnectionHelpers { private static final String STREAM_NAME_BASE = "users-data"; private static final String STREAM_NAME = STREAM_NAME_BASE + "0"; public static final String FIELD_NAME = "id"; + + public static final String SECOND_FIELD_NAME = "id2"; private static final String BASIC_SCHEDULE_TIME_UNIT = "days"; private static final long BASIC_SCHEDULE_UNITS = 1L; private static final String BASIC_SCHEDULE_DATA_TIME_UNITS = "days"; @@ -284,7 +286,7 @@ public static JsonNode generateBasicJsonSchema() { public static JsonNode generateJsonSchemaWithTwoFields() { return CatalogHelpers.fieldsToJsonSchema( Field.of(FIELD_NAME, JsonSchemaType.STRING), - Field.of(FIELD_NAME + "2", JsonSchemaType.STRING)); + Field.of(SECOND_FIELD_NAME, JsonSchemaType.STRING)); } public static ConfiguredAirbyteCatalog generateBasicConfiguredAirbyteCatalog() {