Skip to content

Commit

Permalink
OSS changes to support acceptance test on data plane (#20853)
Browse files Browse the repository at this point in the history
* addlog

* fix applicaion.yml

* remove logging

* var name for boolean

* test setup

* test

* more fix for testing

* self review

* remove unrelated changes

* remove unwanted cdk changes

* more clean ups
  • Loading branch information
xiaohansong committed Jan 3, 2023
1 parent b776a2d commit 4eca4a4
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.airbyte.api.client.model.generated.DestinationIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationRead;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.Geography;
import io.airbyte.api.client.model.generated.JobConfigType;
import io.airbyte.api.client.model.generated.JobDebugInfoRead;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
Expand Down Expand Up @@ -490,6 +491,18 @@ public ConnectionRead createConnection(final String name,
final ConnectionScheduleType scheduleType,
final ConnectionScheduleData scheduleData)
throws ApiException {
return createConnectionWithGeography(name, sourceId, destinationId, operationIds, catalog, scheduleType, scheduleData, Geography.AUTO);
}

public ConnectionRead createConnectionWithGeography(final String name,
final UUID sourceId,
final UUID destinationId,
final List<UUID> operationIds,
final AirbyteCatalog catalog,
final ConnectionScheduleType scheduleType,
final ConnectionScheduleData scheduleData,
final Geography geography)
throws ApiException {
final ConnectionRead connection = apiClient.getConnectionApi().createConnection(
new ConnectionCreate()
.status(ConnectionStatus.ACTIVE)
Expand All @@ -502,7 +515,8 @@ public ConnectionRead createConnection(final String name,
.name(name)
.namespaceDefinition(NamespaceDefinitionType.CUSTOMFORMAT)
.namespaceFormat(OUTPUT_NAMESPACE)
.prefix(OUTPUT_STREAM_PREFIX));
.prefix(OUTPUT_STREAM_PREFIX)
.geography(geography));
connectionIds.add(connection.getConnectionId());
return connection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,12 @@ public HttpClient httpClient() {

@Singleton
@Named("internalApiScheme")
public String internalApiScheme(final Environment environment) {
public String internalApiScheme(@Value("${airbyte.acceptance.test.enabled}") final boolean isInTestMode, final Environment environment) {
// control plane workers communicate with the Airbyte API within their internal network, so https
// isn't needed
if (isInTestMode) {
return "http";
}
return environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE) ? "http" : "https";
}

Expand All @@ -97,8 +100,9 @@ public String internalApiAuthToken(
@Value("${airbyte.control.plane.auth-endpoint}") final String controlPlaneAuthEndpoint,
@Value("${airbyte.data.plane.service-account.email}") final String dataPlaneServiceAccountEmail,
@Value("${airbyte.data.plane.service-account.credentials-path}") final String dataPlaneServiceAccountCredentialsPath,
@Value("${airbyte.acceptance.test.enabled}") final boolean isInTestMode,
final Environment environment) {
if (environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE)) {
if (isInTestMode || environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE)) {
// control plane workers communicate with the Airbyte API within their internal network, so a signed
// JWT isn't needed
return airbyteApiAuthHeaderValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.config.persistence.split_secrets.GoogleSecretManagerPersistence;
import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
Expand Down Expand Up @@ -68,6 +69,15 @@ public SecretPersistence vaultSecretPersistence(@Value("${airbyte.secret.store.v
}

@Singleton
@Requires(property = "airbyte.acceptance.test.enabled",
value = "true")
public SecretsHydrator noOpSecretsHydrator() {
return new NoOpSecretsHydrator();
}

@Singleton
@Requires(property = "airbyte.acceptance.test.enabled",
value = "false")
public SecretsHydrator secretsHydrator(@Named("secretPersistence") final SecretPersistence secretPersistence) {
return new RealSecretsHydrator(secretPersistence);
}
Expand Down
3 changes: 3 additions & 0 deletions airbyte-workers/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ airbyte:
max-attempts: ${ACTIVITY_MAX_ATTEMPT:5}
max-delay: ${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS:600}
max-timeout: ${ACTIVITY_MAX_TIMEOUT_SECOND:120}
acceptance:
test:
enabled: ${ACCEPTANCE_TEST_ENABLED:false}
cloud:
storage:
logs:
Expand Down

0 comments on commit 4eca4a4

Please sign in to comment.