From 44a85a987b763d537204e33dd5481f0a8a0ae09e Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Thu, 9 Feb 2023 11:00:57 -0800 Subject: [PATCH] fix: inject oauth params when generating sync input (#22635) * fix: inject oauth params when generating sync inpt * make it final * rm oauth injection from job creator factory * keep it * keep it 2 * test that we're actually using the oauth values * fmt --- .../activities/GenerateInputActivityImpl.java | 19 +++++++-- .../activities/GenerateInputActivityTest.java | 41 ++++++++++++++----- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index a70607fef72bd..d206ef694f4c2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -8,6 +8,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.generated.AttemptApi; @@ -39,6 +40,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.factory.OAuthConfigSupplier; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; @@ -63,6 +65,7 @@ public class GenerateInputActivityImpl implements GenerateInputActivity { private final AttemptApi attemptApi; private final StateApi stateApi; private final FeatureFlags featureFlags; + private final OAuthConfigSupplier oAuthConfigSupplier; private static final Logger LOGGER = LoggerFactory.getLogger(GenerateInputActivity.class); @@ -70,12 +73,14 @@ public GenerateInputActivityImpl(final JobPersistence jobPersistence, final ConfigRepository configRepository, final StateApi stateApi, final AttemptApi attemptApi, - final FeatureFlags featureFlags) { + final FeatureFlags featureFlags, + final OAuthConfigSupplier oAuthConfigSupplier) { this.jobPersistence = jobPersistence; this.configRepository = configRepository; this.stateApi = stateApi; this.attemptApi = attemptApi; this.featureFlags = featureFlags; + this.oAuthConfigSupplier = oAuthConfigSupplier; } private Optional getCurrentConnectionState(final UUID connectionId) { @@ -122,7 +127,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { if (ConfigType.SYNC.equals(jobConfigType)) { config = job.getConfig().getSync(); final SourceConnection source = configRepository.getSourceConnection(standardSync.getSourceId()); - attemptSyncConfig.setSourceConfiguration(source.getConfiguration()); + final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters( + source.getSourceDefinitionId(), + source.getWorkspaceId(), + source.getConfiguration()); + attemptSyncConfig.setSourceConfiguration(sourceConfiguration); } else if (ConfigType.RESET_CONNECTION.equals(jobConfigType)) { final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection(); final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration(); @@ -156,7 +165,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt); final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); - attemptSyncConfig.setDestinationConfiguration(destination.getConfiguration()); + final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters( + destination.getDestinationDefinitionId(), + destination.getWorkspaceId(), + destination.getConfiguration()); + attemptSyncConfig.setDestinationConfiguration(destinationConfiguration); final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromSource(standardSync.getSourceId()); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java index e5d35152e13d8..fe31d26d590b3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java @@ -35,6 +35,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.factory.OAuthConfigSupplier; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; @@ -58,15 +59,21 @@ class GenerateInputActivityTest { static private JobPersistence jobPersistence; static private ConfigRepository configRepository; static private GenerateInputActivityImpl generateInputActivity; + static private OAuthConfigSupplier oAuthConfigSupplier; static private Job job; static private final JsonNode SOURCE_CONFIGURATION = Jsons.jsonNode(Map.of("source_key", "source_value")); + static private final JsonNode SOURCE_CONFIG_WITH_OAUTH = Jsons.jsonNode(Map.of("source_key", "source_value", "oauth", "oauth_value")); static private final JsonNode DESTINATION_CONFIGURATION = Jsons.jsonNode(Map.of("destination_key", "destination_value")); + static private final JsonNode DESTINATION_CONFIG_WITH_OAUTH = + Jsons.jsonNode(Map.of("destination_key", "destination_value", "oauth", "oauth_value")); static private final State STATE = new State().withState(Jsons.jsonNode(Map.of("state_key", "state_value"))); + static private final UUID WORKSPACE_ID = UUID.randomUUID(); static private final long JOB_ID = 1; static private final int ATTEMPT_ID = 1; static private final UUID SOURCE_ID = UUID.randomUUID(); + static private final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID(); static private final UUID DESTINATION_ID = UUID.randomUUID(); static private final UUID CONNECTION_ID = UUID.randomUUID(); @@ -75,24 +82,26 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio final StateApi stateApi = mock(StateApi.class); final FeatureFlags featureFlags = mock(FeatureFlags.class); + oAuthConfigSupplier = mock(OAuthConfigSupplier.class); attemptApi = mock(AttemptApi.class); jobPersistence = mock(JobPersistence.class); configRepository = mock(ConfigRepository.class); - generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags); + generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags, oAuthConfigSupplier); job = mock(Job.class); when(jobPersistence.getJob(JOB_ID)).thenReturn(job); - final UUID destinationDefinitionId = UUID.randomUUID(); - final DestinationConnection destinationConnection = new DestinationConnection() .withDestinationId(DESTINATION_ID) - .withDestinationDefinitionId(destinationDefinitionId) + .withWorkspaceId(WORKSPACE_ID) + .withDestinationDefinitionId(DESTINATION_DEFINITION_ID) .withConfiguration(DESTINATION_CONFIGURATION); when(configRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destinationConnection); - when(configRepository.getStandardDestinationDefinition(destinationDefinitionId)).thenReturn(mock(StandardDestinationDefinition.class)); + when(configRepository.getStandardDestinationDefinition(DESTINATION_DEFINITION_ID)).thenReturn(mock(StandardDestinationDefinition.class)); when(configRepository.getSourceDefinitionFromSource(SOURCE_ID)).thenReturn(mock(StandardSourceDefinition.class)); + when(oAuthConfigSupplier.injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION)) + .thenReturn(DESTINATION_CONFIG_WITH_OAUTH); final StandardSync standardSync = new StandardSync() .withSourceId(SOURCE_ID) @@ -109,10 +118,15 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundException, IOException, ApiException { final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID); + final UUID sourceDefinitionId = UUID.randomUUID(); final SourceConnection sourceConnection = new SourceConnection() .withSourceId(SOURCE_ID) + .withSourceDefinitionId(sourceDefinitionId) + .withWorkspaceId(WORKSPACE_ID) .withConfiguration(SOURCE_CONFIGURATION); when(configRepository.getSourceConnection(SOURCE_ID)).thenReturn(sourceConnection); + when(oAuthConfigSupplier.injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION)) + .thenReturn(SOURCE_CONFIG_WITH_OAUTH); final JobSyncConfig jobSyncConfig = new JobSyncConfig() .withWorkspaceId(UUID.randomUUID()) @@ -131,8 +145,8 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx .withWorkspaceId(jobSyncConfig.getWorkspaceId()) .withSourceId(SOURCE_ID) .withDestinationId(DESTINATION_ID) - .withSourceConfiguration(SOURCE_CONFIGURATION) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH) + .withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH) .withState(STATE) .withCatalog(jobSyncConfig.getConfiguredAirbyteCatalog()) .withWorkspaceId(jobSyncConfig.getWorkspaceId()); @@ -161,10 +175,13 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx assertEquals(expectedGeneratedJobInput, generatedJobInput); final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig() - .withSourceConfiguration(SOURCE_CONFIGURATION) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH) + .withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH) .withState(STATE); + verify(oAuthConfigSupplier).injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION); + verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION); + verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody() .jobId(JOB_ID) .attemptNumber(ATTEMPT_ID) @@ -192,7 +209,7 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException { .withSourceId(SOURCE_ID) .withDestinationId(DESTINATION_ID) .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH) .withState(STATE) .withCatalog(jobResetConfig.getConfiguredAirbyteCatalog()) .withWorkspaceId(jobResetConfig.getWorkspaceId()); @@ -222,9 +239,11 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException { final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig() .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) + .withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH) .withState(STATE); + verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION); + verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody() .jobId(JOB_ID) .attemptNumber(ATTEMPT_ID)