Skip to content

Commit

Permalink
pass workspace id to sync workflow and use it to selectively enable f…
Browse files Browse the repository at this point in the history
…ield selection (#20589)

* pass workspace id to sync workflow and use it to selectively enable field selection

* fix tests around workspace id in job creation

* make sure field selection environment variables get passed through properly

* clean up handling around field selection flags

* debug logging for field selection

* properly handle empty field selection feature flag

* fix pmd

* actually fix pmd
  • Loading branch information
mfsiega-airbyte committed Jan 3, 2023
1 parent 3035dc0 commit 6130a54
Show file tree
Hide file tree
Showing 21 changed files with 138 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public class OrchestratorConstants {
EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY,
EnvConfigs.STATE_STORAGE_S3_REGION,
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE,
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA))
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA,
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION,
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES))
.build();

public static final String INIT_FILE_ENV_MAP = "envMap.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
.stream()
.collect(Collectors.toMap(s -> s.getStream().getNamespace() + "." + s.getStream().getName(),
s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode()))));
LOGGER.debug("field selection enabled: {}", fieldSelectionEnabled);
final WorkerSourceConfig sourceConfig = WorkerUtils.syncToWorkerSourceConfig(syncInput);

ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ private Map<String, String> getWorkerMetadata() {
WorkerEnvConstants.WORKER_JOB_ID, jobId,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()));
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class AirbyteIntegrationLauncherTest {
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()));
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(new EnvVariableFeatureFlags().applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, new EnvVariableFeatureFlags().fieldSelectionWorkspaces());

private WorkerConfigs workerConfigs;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";
public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";

public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
Expand Down Expand Up @@ -55,6 +57,11 @@ public boolean applyFieldSelection() {
return getEnvOrDefault(APPLY_FIELD_SELECTION, false, Boolean::parseBoolean);
}

@Override
public String fieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg);
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
public <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
final String value = System.getenv(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ public interface FeatureFlags {

boolean needStateValidation();

/**
* Return true if field selection should be applied. See also fieldSelectionWorkspaces.
*
* @return whether field selection should be applied
*/
boolean applyFieldSelection();

/**
* Get the workspaces allow-listed for field selection. This should take precedence over
* applyFieldSelection.
*
* @return a comma-separated list of workspace ids where field selection should be enabled.
*/
String fieldSelectionWorkspaces();

}
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,10 @@ public interface Configs {

boolean getAutoDetectSchema();

boolean getApplyFieldSelection();

String getFieldSelectionWorkspaces();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
Expand Down Expand Up @@ -1123,6 +1125,16 @@ public boolean getAutoDetectSchema() {
return getEnvOrDefault(AUTO_DETECT_SCHEMA, false);
}

@Override
public boolean getApplyFieldSelection() {
return getEnvOrDefault(APPLY_FIELD_SELECTION, false);
}

@Override
public String getFieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "");
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ properties:
isDestinationCustomConnector:
description: determine if the running image of the destination is a custom connector.
type: boolean
workspaceId:
description: The id of the workspace associated with the sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ properties:
isDestinationCustomConnector:
description: determine if the destination running image is a custom connector.
type: boolean
workspaceId:
description: The id of the workspace associated with the sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ properties:
description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container
type: object
"$ref": ResourceRequirements.yaml
workspaceId:
description: The id of the workspace associated with this sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -148,7 +151,7 @@ public Optional<String> runJob() throws Exception {
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter, featureFlags.applyFieldSelection());
metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId()));

log.info("Running replication worker...");
final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(),
Expand All @@ -165,4 +168,20 @@ private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, fin
: new DefaultAirbyteStreamFactory(mdcScope);
}

private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) {
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
final Set<UUID> workspaceIds = new HashSet<>();
for (final String id : workspaceIdsString.split(",")) {
workspaceIds.add(UUID.fromString(id));
}
for (final UUID workspace : workspaceIds) {
log.info("field selection workspace: {}", workspace);
}
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
return true;
}

return featureFlags.applyFieldSelection();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final JsonNode webhookOperationConfigs,
final StandardSourceDefinition sourceDefinition,
final StandardDestinationDefinition destinationDefinition)
final StandardDestinationDefinition destinationDefinition,
final UUID workspaceId)
throws IOException {
// reusing this isn't going to quite work.

Expand Down Expand Up @@ -96,7 +97,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withSourceResourceRequirements(mergedSrcResourceReq)
.withDestinationResourceRequirements(mergedDstResourceReq)
.withIsSourceCustomConnector(sourceDefinition.getCustom())
.withIsDestinationCustomConnector(destinationDefinition.getCustom());
.withIsDestinationCustomConnector(destinationDefinition.getCustom())
.withWorkspaceId(workspaceId);

getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;

public interface JobCreator {

/**
*
* @param source db model representing where data comes from
* @param destination db model representing where data goes
* @param standardSync sync options
* @param sourceDockerImage docker image to use for the source
* @param destinationDockerImage docker image to use for the destination
* @param workspaceId
* @return the new job if no other conflicting job was running, otherwise empty
* @throws IOException if something wrong happens
*/
Expand All @@ -40,7 +41,8 @@ Optional<Long> createSyncJob(SourceConnection source,
List<StandardSyncOperation> standardSyncOperations,
@Nullable JsonNode webhookOperationConfigs,
StandardSourceDefinition sourceDefinition,
StandardDestinationDefinition destinationDefinition)
StandardDestinationDefinition destinationDefinition,
UUID workspaceId)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public Long create(final UUID connectionId) {
standardSyncOperations,
workspace.getWebhookOperationConfigs(),
sourceDefinition,
destinationDefinition)
destinationDefinition,
workspace.getWorkspaceId())
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));

} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class DefaultJobCreatorTest {
private static final StandardSourceDefinition STANDARD_SOURCE_DEFINITION;
private static final StandardDestinationDefinition STANDARD_DESTINATION_DEFINITION;
private static final long JOB_ID = 12L;
private static final UUID WORKSPACE_ID = UUID.randomUUID();

private JobPersistence jobPersistence;
private StatePersistence statePersistence;
Expand Down Expand Up @@ -190,7 +191,8 @@ void testCreateSyncJob() throws IOException {
.withDestinationResourceRequirements(workerResourceRequirements)
.withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig jobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand All @@ -210,7 +212,7 @@ void testCreateSyncJob() throws IOException {
List.of(STANDARD_SYNC_OPERATION),
PERSISTED_WEBHOOK_CONFIGS,
STANDARD_SOURCE_DEFINITION,
STANDARD_DESTINATION_DEFINITION).orElseThrow();
STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID).orElseThrow();
assertEquals(JOB_ID, jobId);
}

Expand Down Expand Up @@ -247,7 +249,7 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION).isEmpty());
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, UUID.randomUUID()).isEmpty());
}

@Test
Expand All @@ -262,7 +264,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION);
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand All @@ -280,7 +282,8 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException {
.withSourceResourceRequirements(workerResourceRequirements)
.withDestinationResourceRequirements(workerResourceRequirements)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig expectedJobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down Expand Up @@ -310,7 +313,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException {
DESTINATION_PROTOCOL_VERSION,
List.of(STANDARD_SYNC_OPERATION),
null,
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION);
STANDARD_SOURCE_DEFINITION, STANDARD_DESTINATION_DEFINITION, WORKSPACE_ID);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand All @@ -328,7 +331,8 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException {
.withSourceResourceRequirements(standardSyncResourceRequirements)
.withDestinationResourceRequirements(standardSyncResourceRequirements)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig expectedJobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down Expand Up @@ -364,7 +368,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
null,
new StandardSourceDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements)),
new StandardDestinationDefinition().withResourceRequirements(new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))));
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))),
WORKSPACE_ID);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
Expand All @@ -382,7 +387,8 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
.withSourceResourceRequirements(sourceResourceRequirements)
.withDestinationResourceRequirements(destResourceRequirements)
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(false);
.withIsDestinationCustomConnector(false)
.withWorkspaceId(WORKSPACE_ID);

final JobConfig expectedJobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
final UUID destinationId = UUID.randomUUID();
final UUID operationId = UUID.randomUUID();
final UUID workspaceWebhookConfigId = UUID.randomUUID();
final UUID workspaceId = UUID.randomUUID();
final String workspaceWebhookName = "test-webhook-name";
final JsonNode persistedWebhookConfigs = Jsons.deserialize(
String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}",
Expand Down Expand Up @@ -87,7 +88,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
when(
jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage,
dstProtocolVersion, operations,
persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition))
persistedWebhookConfigs, standardSourceDefinition, standardDestinationDefinition, workspaceId))
.thenReturn(Optional.of(jobId));
when(configRepository.getStandardSourceDefinition(sourceDefinitionId))
.thenReturn(standardSourceDefinition);
Expand All @@ -96,7 +97,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
.thenReturn(standardDestinationDefinition);

when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(true))).thenReturn(
new StandardWorkspace().withWebhookOperationConfigs(persistedWebhookConfigs));
new StandardWorkspace().withWorkspaceId(workspaceId).withWebhookOperationConfigs(persistedWebhookConfigs));

final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class), workspaceHelper);
final long actualJobId = factory.create(connectionId);
Expand All @@ -105,7 +106,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
verify(jobCreator)
.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, srcProtocolVersion, dstDockerImage, dstProtocolVersion,
operations, persistedWebhookConfigs,
standardSourceDefinition, standardDestinationDefinition);
standardSourceDefinition, standardDestinationDefinition, workspaceId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics);
environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState()));
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection()));
environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts);

if (System.getenv(DD_ENV_ENV_VAR) != null) {
Expand Down
Loading

0 comments on commit 6130a54

Please sign in to comment.