From 6130a5479527d453fb8274affa0743bbd25246a7 Mon Sep 17 00:00:00 2001 From: Michael Siega <109092231+mfsiega-airbyte@users.noreply.github.com> Date: Wed, 4 Jan 2023 00:30:35 +0100 Subject: [PATCH] pass workspace id to sync workflow and use it to selectively enable field selection (#20589) * pass workspace id to sync workflow and use it to selectively enable field selection * fix tests around workspace id in job creation * make sure field selection environment variables get passed through properly * clean up handling around field selection flags * debug logging for field selection * properly handle empty field selection feature flag * fix pmd * actually fix pmd --- .../temporal/sync/OrchestratorConstants.java | 4 +++- .../general/DefaultReplicationWorker.java | 1 + .../process/AirbyteIntegrationLauncher.java | 4 +++- .../AirbyteIntegrationLauncherTest.java | 4 +++- .../features/EnvVariableFeatureFlags.java | 7 ++++++ .../commons/features/FeatureFlags.java | 13 ++++++++++ .../main/java/io/airbyte/config/Configs.java | 4 ++++ .../java/io/airbyte/config/EnvConfigs.java | 12 ++++++++++ .../types/JobResetConnectionConfig.yaml | 4 ++++ .../main/resources/types/JobSyncConfig.yaml | 4 ++++ .../resources/types/StandardSyncInput.yaml | 4 ++++ .../ReplicationJobOrchestrator.java | 21 +++++++++++++++- .../persistence/job/DefaultJobCreator.java | 6 +++-- .../airbyte/persistence/job/JobCreator.java | 6 +++-- .../job/factory/DefaultSyncJobFactory.java | 3 ++- .../job/DefaultJobCreatorTest.java | 24 ++++++++++++------- .../factory/DefaultSyncJobFactoryTest.java | 7 +++--- ...ontainerOrchestratorConfigBeanFactory.java | 2 ++ .../activities/GenerateInputActivityImpl.java | 6 +++-- .../sync/ReplicationActivityImpl.java | 24 ++++++++++++++++++- docker-compose.yaml | 2 ++ 21 files changed, 138 insertions(+), 24 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java 
b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index 61fccbad4e1e3..ef8d7452d136c 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -68,7 +68,9 @@ public class OrchestratorConstants { EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY, EnvConfigs.STATE_STORAGE_S3_REGION, EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA)) + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, + EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, + EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES)) .build(); public static final String INIT_FILE_ENV_MAP = "envMap.json"; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 1d8034479239a..4b669bd9e71e9 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -158,6 +158,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path .stream() .collect(Collectors.toMap(s -> s.getStream().getNamespace() + "." 
+ s.getStream().getName(), s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode())))); + LOGGER.debug("field selection enabled: {}", fieldSelectionEnabled); final WorkerSourceConfig sourceConfig = WorkerUtils.syncToWorkerSourceConfig(syncInput); ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot)); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index b28db1742ec5e..43393e2b03ad9 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -214,7 +214,9 @@ private Map getWorkerMetadata() { WorkerEnvConstants.WORKER_JOB_ID, jobId, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt), EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema())); + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()), + EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()), + EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 06d0fd0d59354..8057446fa48b5 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -54,7 +54,9 @@ class AirbyteIntegrationLauncherTest { 
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema())); + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()), + EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(new EnvVariableFeatureFlags().applyFieldSelection()), + EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, new EnvVariableFeatureFlags().fieldSelectionWorkspaces()); private WorkerConfigs workerConfigs; @Mock diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index d6d57ce885c51..2038fd8934590 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -18,6 +18,8 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION"; public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; + public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; + @Override public boolean autoDisablesFailingConnections() { log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"))); @@ -55,6 +57,11 @@ public boolean applyFieldSelection() { return getEnvOrDefault(APPLY_FIELD_SELECTION, false, Boolean::parseBoolean); } + @Override + public String fieldSelectionWorkspaces() { + return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg); + } + // TODO: refactor in order to use the same method than the ones in 
EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 3633f9bb4ed7a..aa1858acda1ec 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -22,6 +22,19 @@ public interface FeatureFlags { boolean needStateValidation(); + /** + * Return true if field selection should be applied. See also fieldSelectionWorkspaces. + * + * @return whether field selection should be applied + */ boolean applyFieldSelection(); + /** + * Get the workspaces allow-listed for field selection. This should take precedence over + * applyFieldSelection. + * + * @return a comma-separated list of workspace ids where field selection should be enabled. 
+ */ + String fieldSelectionWorkspaces(); + } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 1d4d03139e1e6..956d67b9bc4eb 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -740,6 +740,10 @@ public interface Configs { boolean getAutoDetectSchema(); + boolean getApplyFieldSelection(); + + String getFieldSelectionWorkspaces(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index ca175dd8c39fd..c183336f620cd 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -217,6 +217,8 @@ public class EnvConfigs implements Configs { private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5; private static final String DEFAULT_NETWORK = "host"; private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA"; + private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; + private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), @@ -1123,6 +1125,16 @@ public boolean getAutoDetectSchema() { return getEnvOrDefault(AUTO_DETECT_SCHEMA, false); } + @Override + public boolean getApplyFieldSelection() { + return getEnvOrDefault(APPLY_FIELD_SELECTION, false); + } + + @Override + public String getFieldSelectionWorkspaces() { + return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, ""); + } + @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, 
"5")); diff --git a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml index 3bba82085306b..462a8ab1229d4 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml @@ -58,3 +58,7 @@ properties: isDestinationCustomConnector: description: determine if the running image of the destination is a custom connector. type: boolean + workspaceId: + description: The id of the workspace associated with the sync + type: string + format: uuid diff --git a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml index 1862f17f884c8..652996a9b5c05 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml @@ -77,3 +77,7 @@ properties: isDestinationCustomConnector: description: determine if the destination running image is a custom connector. 
type: boolean + workspaceId: + description: The id of the workspace associated with the sync + type: string + format: uuid diff --git a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml index ea84cecce3d49..2e0b943b91044 100644 --- a/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/StandardSyncInput.yaml @@ -56,3 +56,7 @@ properties: description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container type: object "$ref": ResourceRequirements.yaml + workspaceId: + description: The id of the workspace associated with this sync + type: string + format: uuid diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index cd5ed8d7694a9..3aac414423023 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -45,8 +45,11 @@ import io.airbyte.workers.sync.ReplicationLauncherWorker; import java.lang.invoke.MethodHandles; import java.nio.file.Path; +import java.util.HashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +151,7 @@ public Optional runJob() throws Exception { new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new 
RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), - metricReporter, featureFlags.applyFieldSelection()); + metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId())); log.info("Running replication worker..."); final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(), @@ -165,4 +168,37 @@ private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, fin : new DefaultAirbyteStreamFactory(mdcScope); } + /** + * Decides whether field selection is applied to this sync. A workspace listed in the + * comma-separated FIELD_SELECTION_WORKSPACES flag always gets field selection; any other + * workspace falls back to the global APPLY_FIELD_SELECTION flag. + * + * @param featureFlags source of the workspace allow-list and of the global flag + * @param workspaceId id of the workspace associated with the sync; may be null + * @return true if field selection should be applied + */ + private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) { + final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); + final Set<UUID> workspaceIds = new HashSet<>(); + /* The env-backed flag defaults to "" and "".split(",") yields [""], which UUID.fromString rejects. */ + if (!workspaceIdsString.isEmpty()) { + for (final String id : workspaceIdsString.split(",")) { + try { + workspaceIds.add(UUID.fromString(id)); + } catch (final IllegalArgumentException e) { + /* A malformed entry must not abort the sync; skip it and keep the valid ids. */ + log.warn("Malformed workspace id for field selection: {}", id); + } + } + } + for (final UUID workspace : workspaceIds) { + log.info("field selection workspace: {}", workspace); + } + if (workspaceId != null && workspaceIds.contains(workspaceId)) { + return true; + } + + return featureFlags.applyFieldSelection(); + } + } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java index 408953197783b..e09056525a282 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java @@ -60,7 +60,8 @@ public Optional createSyncJob(final SourceConnection source, final List standardSyncOperations, @Nullable final JsonNode webhookOperationConfigs, final StandardSourceDefinition sourceDefinition, - final StandardDestinationDefinition destinationDefinition) + final StandardDestinationDefinition destinationDefinition, + final UUID workspaceId) throws IOException { // reusing this isn't going to quite work.
@@ -96,7 +97,8 @@ public Optional createSyncJob(final SourceConnection source, .withSourceResourceRequirements(mergedSrcResourceReq) .withDestinationResourceRequirements(mergedDstResourceReq) .withIsSourceCustomConnector(sourceDefinition.getCustom()) - .withIsDestinationCustomConnector(destinationDefinition.getCustom()); + .withIsDestinationCustomConnector(destinationDefinition.getCustom()) + .withWorkspaceId(workspaceId); getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java index 92d285e699a8e..d3905878d4209 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobCreator.java @@ -16,17 +16,18 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.UUID; import javax.annotation.Nullable; public interface JobCreator { /** - * * @param source db model representing where data comes from * @param destination db model representing where data goes * @param standardSync sync options * @param sourceDockerImage docker image to use for the source * @param destinationDockerImage docker image to use for the destination + * @param workspaceId the id of the workspace associated with the sync * @return the new job if no other conflicting job was running, otherwise empty * @throws IOException if something wrong happens */ @@ -40,7 +41,8 @@ Optional createSyncJob(SourceConnection source, List standardSyncOperations, @Nullable JsonNode webhookOperationConfigs, StandardSourceDefinition sourceDefinition, - StandardDestinationDefinition destinationDefinition) + StandardDestinationDefinition destinationDefinition, + UUID workspaceId) throws IOException; /** diff --git
a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java index 116114e061cc2..e7a69eb64533e 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactory.java @@ -94,7 +94,8 @@ public Long create(final UUID connectionId) { standardSyncOperations, workspace.getWebhookOperationConfigs(), sourceDefinition, - destinationDefinition) + destinationDefinition, + workspace.getWorkspaceId()) .orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already.")); } catch (final IOException | JsonValidationException | ConfigNotFoundException e) { diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java index 1804f3226a67c..7311ade09c974 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java @@ -75,6 +75,7 @@ class DefaultJobCreatorTest { private static final StandardSourceDefinition STANDARD_SOURCE_DEFINITION; private static final StandardDestinationDefinition STANDARD_DESTINATION_DEFINITION; private static final long JOB_ID = 12L; + private static final UUID WORKSPACE_ID = UUID.randomUUID(); private JobPersistence jobPersistence; private StatePersistence statePersistence; @@ -190,7 +191,8 @@ void testCreateSyncJob() throws IOException { .withDestinationResourceRequirements(workerResourceRequirements) 
.withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig jobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) @@ -210,7 +212,7 @@ void testCreateSyncJob() throws IOException { List.of(STANDARD_SYNC_OPERATION), PERSISTED_WEBHOOK_CONFIGS, STANDARD_SOURCE_DEFINITION, - STANDARD_DESTINATION_DEFINITION).orElseThrow(); + STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID).orElseThrow(); assertEquals(JOB_ID, jobId); } @@ -247,7 +249,7 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION).isEmpty()); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).isEmpty()); } @Test @@ -262,7 +264,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -280,7 +282,8 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { .withSourceResourceRequirements(workerResourceRequirements) .withDestinationResourceRequirements(workerResourceRequirements) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig expectedJobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) @@ -310,7 +313,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException { DESTINATION_PROTOCOL_VERSION, List.of(STANDARD_SYNC_OPERATION), null, - 
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION); + STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -328,7 +331,8 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException { .withSourceResourceRequirements(standardSyncResourceRequirements) .withDestinationResourceRequirements(standardSyncResourceRequirements) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig expectedJobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) @@ -364,7 +368,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { null, new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)), new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of( - new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements))))); + new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))), + WORKSPACE_ID); final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) @@ -382,7 +387,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { .withSourceResourceRequirements(sourceResourceRequirements) .withDestinationResourceRequirements(destResourceRequirements) .withIsSourceCustomConnector(false) - .withIsDestinationCustomConnector(false); + .withIsDestinationCustomConnector(false) + .withWorkspaceId(WORKSPACE_ID); final JobConfig expectedJobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) diff --git 
a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java index 120f9e4b984f5..5c4e439e55421 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/factory/DefaultSyncJobFactoryTest.java @@ -44,6 +44,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo final UUID destinationId = UUID.randomUUID(); final UUID operationId = UUID.randomUUID(); final UUID workspaceWebhookConfigId = UUID.randomUUID(); + final UUID workspaceId = UUID.randomUUID(); final String workspaceWebhookName = "test-webhook-name"; final JsonNode persistedWebhookConfigs = Jsons.deserialize( String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}", @@ -87,7 +88,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo when( jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage, dstProtocolVersion, operations, - persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition)) + persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition, workspaceId)) .thenReturn(Optional.of(jobId)); when(configRepository.getStandardSourceDefinition(sourceDefinitionId)) .thenReturn(standardSourceDefinition); @@ -96,7 +97,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo .thenReturn(standardDestinationDefinition); when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(true))).thenReturn( - new StandardWorkspace().withWebhookOperationConfigs(persistedWebhookConfigs)); + new 
StandardWorkspace().withWorkspaceId(workspaceId).withWebhookOperationConfigs(persistedWebhookConfigs)); final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class), workspaceHelper); final long actualJobId = factory.create(connectionId); @@ -105,7 +106,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo verify(jobCreator) .createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage, dstProtocolVersion, operations, persistedWebhookConfigs, - standardSourceDefinition, standardDestinationDefinition); + standardSourceDefinition, standardDestinationDefinition, workspaceId); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index d9259005b3cef..f8841b007414d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -77,6 +77,8 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics); environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState())); environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema())); + environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection())); + environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); if (System.getenv(DD_ENV_ENV_VAR) != null) { 
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 2a550a62eb366..bf73a08c2c6ce 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -79,7 +79,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withResourceRequirements(resetConnection.getResourceRequirements()) .withState(resetConnection.getState()) .withIsSourceCustomConnector(resetConnection.getIsSourceCustomConnector()) - .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()); + .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()) + .withWorkspaceId(resetConnection.getWorkspaceId()); } else { throw new IllegalStateException( String.format("Unexpected config type %s for job %d. 
The only supported config types for this activity are (%s)", @@ -139,7 +140,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withState(config.getState()) .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) - .withDestinationResourceRequirements(config.getDestinationResourceRequirements()); + .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) + .withWorkspaceId(config.getWorkspaceId()); return new GeneratedJobInput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 17a501ff25c93..238fd7da2bc58 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -72,8 +72,10 @@ import jakarta.inject.Singleton; import java.nio.file.Path; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -304,7 +306,7 @@ private CheckedSupplier, Exception> new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), - metricReporter, false); + metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId())); }; } @@ -342,4 +344,24 @@ private boolean isResetJob(final String dockerImage) { return WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equalsIgnoreCase(dockerImage); } + private boolean enableFieldSelection(final FeatureFlags 
featureFlags, final UUID workspaceId) { + final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); + final Set workspaceIds = new HashSet<>(); + LOGGER.debug("Field selection enabled for {}", workspaceIdsString); + if (!workspaceIdsString.isEmpty()) { + for (final String id : workspaceIdsString.split(",")) { + try { + workspaceIds.add(UUID.fromString(id)); + } catch (IllegalArgumentException e) { + LOGGER.warn("Malformed workspace id for field selection: {}", id); + } + } + } + if (workspaceId != null && workspaceIds.contains(workspaceId)) { + return true; + } + + return featureFlags.applyFieldSelection(); + } + } diff --git a/docker-compose.yaml b/docker-compose.yaml index 46fbab29492cb..095f67bc94c67 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -108,6 +108,8 @@ services: - AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA} - USE_STREAM_CAPABLE_STATE=${USE_STREAM_CAPABLE_STATE} - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} + - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} + - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} volumes: - /var/run/docker.sock:/var/run/docker.sock - workspace:${WORKSPACE_ROOT}