Discover worker starts to use API to write schema result (#21875)
* api changes for writing discover catalog

* api changes

* format

* worker change 1

* change return type of the API to return catalogId

* worker to call api

* typo

* 🎉 Source GoogleSheets -  migrated SAT to strictness level (#21399)

* migrated SAT to strictness level

* fixed expected records

* revert file from another source

* changed extension to txt

* changed extension to txt

* 🐛Destination-Bigquery: Added an explicit error message if sync fails due to a config issue (#21144)

* [19998] Destination-Bigquery: Added an explicit error message if sync fails due to a config issue

* ci-connector-ops: split workflows(#21474)

* CI:  nightly build alpha sources and destinations (#21562)

* Revert "Change main class in strict-encrypt destination and bump versions on both destinations to keep them in sync (#21509)" (#21567)

This reverts commit 1d202d1.

* Fixes webhook updating logic (#21519)

* ci_credentials: disable tooling test run by tox (#21580)

* disable tox

* rename steps

* revert changes on experimental workflow

* do not install tox

* Revert "CI:  nightly build alpha sources and destinations (#21562)" (#21589)

This reverts commit 61f88f3.

* Security update of default docker images (#21407)

Because there are a lot of CVEs in those releases.

Co-authored-by: Topher Lubaway <asimplechris@gmail.com>

* 📝 add docs for how to add normalization (#21563)

* add docs

* add schema link

* update based on feedback

* 🪟 🚦  E2E tests: clean up matchers (#20887)

* improve serviceTypeDropdownOption selector

* add test ids to PathPopout component(s)

* add unique id's to table dropdowns

* extend submitButtonClick to support optional click options

* update dropdown(pathPopout) matchers

* add test-id to Overlay component

* remove redundant function brackets

* revert changes onSubmit button click

* fix dropDown overlay issue

* move all duplicated intercepters to beforeEach

* add test id's to Connections, Sources and Destinations tables

* add table helper functions

* update source page actions

* intercepter fixes

* update createTestConnection function with optional replication settings

* remove extra Connection name check

* replace "cypress-postgres" with "pg-promise" npm package

* update cypress config

* Revert "update createTestConnection function with optional replication settings"

This reverts commit 8e47c78.

* Revert "remove extra Connection name check"

This reverts commit dfb19c7.

* replace openSourceDestinationFromGrid with specific selector

* replace openSourceDestinationFromGrid with specific selector

* turn on test

* add test-id's

* fix selectors

* update test

* update test snapshots

* fix lost data-testid after resolve merge conflicts

* remove extra check

* move clickOnCellInTable helper to common.ts file

* remove empty line and comments

* fix dropdownType

* replace partial string check with exact

* extract interceptors and waiters to separate file

* fix selector for predefined PK

* fix selector

* add comment regarding dropdown

* 🪟 🎨 [Free connectors] Update modal copy (#21600)

* move start/end time options out of optional block (#21541)

* lingering fix

* reflecting api changes

* test fix

* worker to call api to do discover work

* recovered deleted html

* self review

* more converters refactor

* fix connector test

* fix test

* fix

* fix integration test

* add unit test for converter

* static fix

* api client needs to have a timeout in case a request never gets a response

---------

Co-authored-by: midavadim <midavadim@yahoo.com>
Co-authored-by: Eugene <etsybaev@gmail.com>
Co-authored-by: Augustin <augustin@airbyte.io>
Co-authored-by: Greg Solovyev <grishick@users.noreply.github.com>
Co-authored-by: Yatsuk Bogdan <yatsukbogdan@gmail.com>
Co-authored-by: Hervé Commowick <github@herve.commowick.fr>
Co-authored-by: Topher Lubaway <asimplechris@gmail.com>
Co-authored-by: Pedro S. Lopez <pedroslopez@me.com>
Co-authored-by: Vladimir <volodymyr.s.petrov@globallogic.com>
Co-authored-by: Joey Marshment-Howell <josephkmh@users.noreply.github.com>
Co-authored-by: Lake Mossman <lake@airbyte.io>
12 people committed Feb 6, 2023
1 parent 0cef7b0 commit 36698ce
Showing 9 changed files with 225 additions and 44 deletions.
@@ -27,6 +27,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.security.interfaces.RSAPrivateKey;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -53,6 +54,8 @@ public ApiClient apiClient(
.setPort(parsePort(airbyteApiHost))
.setBasePath("/api")
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
.setConnectTimeout(Duration.ofSeconds(30))
.setReadTimeout(Duration.ofSeconds(30))
.setRequestInterceptor(builder -> {
builder.setHeader("User-Agent", "WorkerApp");
// internalApiAuthToken is in BeanProvider because we want to create a new token each
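The two timeout settings added in this hunk bound different phases of a call: connecting, and waiting for the response. As a rough illustration only, the sketch below shows the same two bounds expressed directly with the JDK's `java.net.http` API. The class name `TimeoutExample`, its helper methods, and the example URL are hypothetical, and the mapping from the generated `ApiClient` setters to these JDK calls is an assumption rather than something this commit shows.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

final class TimeoutExample {

  // Hypothetical default that mirrors the 30-second values in the hunk above.
  static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);

  // Bounds how long the client waits while establishing a connection.
  static HttpClient newClient(final Duration connectTimeout) {
    return HttpClient.newBuilder()
        .version(HttpClient.Version.HTTP_1_1)
        .connectTimeout(connectTimeout)
        .build();
  }

  // Bounds how long a single request waits for its response; when the limit
  // is exceeded, sending the request fails with HttpTimeoutException.
  static HttpRequest newRequest(final URI uri, final Duration readTimeout) {
    return HttpRequest.newBuilder(uri)
        .timeout(readTimeout)
        .GET()
        .build();
  }

  public static void main(final String[] args) {
    final HttpClient client = newClient(DEFAULT_TIMEOUT);
    // Placeholder address, not taken from the commit.
    final HttpRequest request = newRequest(URI.create("http://localhost:8001/api"), DEFAULT_TIMEOUT);
    System.out.println("connect timeout: " + client.connectTimeout().orElseThrow());
    System.out.println("request timeout: " + request.timeout().orElseThrow());
  }

}
```

Without a request-level timeout, a call to a server that accepts the connection but never answers can block the worker indefinitely, which is the situation the commit message describes.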
@@ -11,13 +11,15 @@

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.model.generated.DiscoverCatalogResult;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
@@ -26,6 +28,7 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.CatalogClientConverters;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
@@ -43,29 +46,28 @@
public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDiscoverCatalogWorker.class);

private final ConfigRepository configRepository;
private static final String WRITE_DISCOVER_CATALOG_LOGS_TAG = "call to write discover schema result";

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final ConnectorConfigUpdater connectorConfigUpdater;

private final AirbyteApiClient airbyteApiClient;
private volatile Process process;

public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater,
final AirbyteStreamFactory streamFactory) {
this.configRepository = configRepository;
this.airbyteApiClient = airbyteApiClient;
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.connectorConfigUpdater = connectorConfigUpdater;
}

public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater) {
this(configRepository, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
this(airbyteApiClient, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@@ -108,14 +110,11 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
}

if (catalog.isPresent()) {
final UUID catalogId =
configRepository.writeActorCatalogFetchEvent(catalog.get(),
// NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
// it, so we check again here.
discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()),
discoverSchemaInput.getConnectorVersion(),
discoverSchemaInput.getConfigHash());
jobOutput.setDiscoverCatalogId(catalogId);
final DiscoverCatalogResult result =
AirbyteApiClient.retryWithJitter(() -> airbyteApiClient.getSourceApi()
.writeDiscoverCatalogResult(buildSourceDiscoverSchemaWriteRequestBody(discoverSchemaInput, catalog.get())),
WRITE_DISCOVER_CATALOG_LOGS_TAG);
jobOutput.setDiscoverCatalogId(result.getCatalogId());
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process);
}
@@ -129,6 +128,19 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
}
}

private SourceDiscoverSchemaWriteRequestBody buildSourceDiscoverSchemaWriteRequestBody(final StandardDiscoverCatalogInput discoverSchemaInput,
final AirbyteCatalog catalog) {
return new SourceDiscoverSchemaWriteRequestBody().catalog(
CatalogClientConverters.toAirbyteCatalogClientApi(catalog)).sourceId(
// NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
// it, so we check again here.
discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()))
.connectorVersion(
discoverSchemaInput.getConnectorVersion())
.configurationHash(
discoverSchemaInput.getConfigHash());
}

private Map<String, Object> generateTraceTags(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) {
final Map<String, Object> tags = new HashMap<>();
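The worker now writes the discovered catalog through `AirbyteApiClient.retryWithJitter(...)` instead of calling the repository directly. That helper's implementation is not part of this diff, so the following is only a minimal sketch of the general retry-with-jitter idea under assumed parameters. `RetrySketch`, its signature, and the delay values are hypothetical and should not be read as Airbyte's actual behavior.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

final class RetrySketch {

  /**
   * Calls {@code call} up to {@code maxTries} times. Between failed attempts it
   * sleeps for {@code baseDelayMs} plus a random jitter in [0, maxJitterMs].
   * All parameters are illustrative assumptions.
   */
  static <T> T retryWithJitter(final Callable<T> call,
                               final String description,
                               final int maxTries,
                               final long baseDelayMs,
                               final long maxJitterMs) {
    if (maxTries < 1 || baseDelayMs < 0 || maxJitterMs < 0) {
      throw new IllegalArgumentException("maxTries must be >= 1 and delays must be >= 0");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      try {
        return call.call();
      } catch (final Exception e) {
        // Remember the failure so it can be attached if every attempt fails.
        last = e;
      }
      if (attempt < maxTries) {
        // Random jitter spreads out retries from many workers hitting the same API.
        final long delay = baseDelayMs + ThreadLocalRandom.current().nextLong(maxJitterMs + 1);
        try {
          Thread.sleep(delay);
        } catch (final InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while retrying " + description, e);
        }
      }
    }
    throw new IllegalStateException("Giving up on " + description + " after " + maxTries + " tries", last);
  }

  public static void main(final String[] args) {
    final int[] calls = {0};
    final String result = retryWithJitter(() -> {
      if (++calls[0] < 3) {
        throw new RuntimeException("transient failure");
      }
      return "catalog-id";
    }, "call to write discover schema result", 5, 10, 10);
    System.out.println(result + " after " + calls[0] + " calls");
  }

}
```

The description argument plays the same role as `WRITE_DISCOVER_CATALOG_LOGS_TAG` in the hunk above: it identifies the call in the final error message.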

@@ -0,0 +1,66 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteStream;
import java.util.stream.Collectors;

/**
* Utilities to convert the Catalog protocol model to the Catalog API client model. This class is
* similar to the existing logic in CatalogConverter.java, but the code can't be shared because the
* protocol model is converted to two different API models. If the logic changes in either place,
* the other one must be updated too.
*/
public class CatalogClientConverters {

  /**
   * Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog.
   */
  public static io.airbyte.api.client.model.generated.AirbyteCatalog toAirbyteCatalogClientApi(
      final io.airbyte.protocol.models.AirbyteCatalog catalog) {
    return new io.airbyte.api.client.model.generated.AirbyteCatalog()
        .streams(catalog.getStreams()
            .stream()
            .map(stream -> toAirbyteStreamClientApi(stream))
            .map(s -> new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration()
                .stream(s)
                .config(generateDefaultConfiguration(s)))
            .collect(Collectors.toList()));
  }

  private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration(
      final io.airbyte.api.client.model.generated.AirbyteStream stream) {
    final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration result =
        new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
            .aliasName(Names.toAlphanumericAndUnderscore(stream.getName()))
            .cursorField(stream.getDefaultCursorField())
            .destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
            .primaryKey(stream.getSourceDefinedPrimaryKey())
            .selected(true);
    if (stream.getSupportedSyncModes().size() > 0) {
      result.setSyncMode(Enums.convertTo(stream.getSupportedSyncModes().get(0),
          io.airbyte.api.client.model.generated.SyncMode.class));
    } else {
      result.setSyncMode(io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL);
    }
    return result;
  }

  private static io.airbyte.api.client.model.generated.AirbyteStream toAirbyteStreamClientApi(
      final AirbyteStream stream) {
    return new io.airbyte.api.client.model.generated.AirbyteStream()
        .name(stream.getName())
        .jsonSchema(stream.getJsonSchema())
        .supportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(),
            io.airbyte.api.client.model.generated.SyncMode.class))
        .sourceDefinedCursor(stream.getSourceDefinedCursor())
        .defaultCursorField(stream.getDefaultCursorField())
        .sourceDefinedPrimaryKey(stream.getSourceDefinedPrimaryKey())
        .namespace(stream.getNamespace());
  }

}
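The converter above relies on two ideas: mapping an enum constant from the protocol model to the same-named constant in the API client model, and picking the first supported sync mode as the default, with `INCREMENTAL` as the fallback when the list is empty. The sketch below isolates those two ideas with stand-in types. `SyncModeDefaults`, `ProtocolSyncMode`, `ClientSyncMode`, and `convertByName` are hypothetical names, and the strict name match is an assumption that may differ from what Airbyte's `Enums.convertTo` does.

```java
import java.util.List;

final class SyncModeDefaults {

  // Hypothetical stand-ins for the protocol and API client enums.
  enum ProtocolSyncMode { FULL_REFRESH, INCREMENTAL }

  enum ClientSyncMode { FULL_REFRESH, INCREMENTAL }

  // Maps a constant to the constant with the same name in another enum type.
  // Throws IllegalArgumentException if the target enum has no such constant.
  static <A extends Enum<A>, B extends Enum<B>> B convertByName(final A value, final Class<B> target) {
    return value == null ? null : Enum.valueOf(target, value.name());
  }

  // Mirrors the defaulting rule in the diff: first supported mode, else INCREMENTAL.
  static ClientSyncMode defaultSyncMode(final List<ProtocolSyncMode> supported) {
    if (supported == null || supported.isEmpty()) {
      return ClientSyncMode.INCREMENTAL;
    }
    return convertByName(supported.get(0), ClientSyncMode.class);
  }

  public static void main(final String[] args) {
    System.out.println(defaultSyncMode(List.of(ProtocolSyncMode.FULL_REFRESH, ProtocolSyncMode.INCREMENTAL)));
    System.out.println(defaultSyncMode(List.of()));
  }

}
```

Because the two enum types are unrelated at compile time, a constant added to only one of them surfaces as a runtime error in this sketch, which is one reason the Javadoc asks for both converters to be kept in step.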
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.Lists;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;

class CatalogClientConvertersTest {

  public static final String ID_FIELD_NAME = "id";
  private static final String STREAM_NAME = "users-data";
  private static final AirbyteStream STREAM = new AirbyteStream()
      .withName(STREAM_NAME)
      .withJsonSchema(
          CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING)))
      .withDefaultCursorField(Lists.newArrayList(ID_FIELD_NAME))
      .withSourceDefinedCursor(false)
      .withSourceDefinedPrimaryKey(Collections.emptyList())
      .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));

  private static final io.airbyte.api.client.model.generated.AirbyteStream CLIENT_STREAM =
      new io.airbyte.api.client.model.generated.AirbyteStream()
          .name(STREAM_NAME)
          .jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING)))
          .defaultCursorField(Lists.newArrayList(ID_FIELD_NAME))
          .sourceDefinedCursor(false)
          .sourceDefinedPrimaryKey(Collections.emptyList())
          .supportedSyncModes(List.of(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
              io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL));
  private static final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration CLIENT_DEFAULT_STREAM_CONFIGURATION =
      new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
          .syncMode(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH)
          .cursorField(Lists.newArrayList(ID_FIELD_NAME))
          .destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
          .primaryKey(Collections.emptyList())
          .aliasName(Names.toAlphanumericAndUnderscore(STREAM_NAME))
          .selected(true);

  private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
      Lists.newArrayList(STREAM));

  private static final io.airbyte.api.client.model.generated.AirbyteCatalog EXPECTED_CLIENT_CATALOG =
      new io.airbyte.api.client.model.generated.AirbyteCatalog()
          .streams(Lists.newArrayList(
              new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration()
                  .stream(CLIENT_STREAM)
                  .config(CLIENT_DEFAULT_STREAM_CONFIGURATION)));

  @Test
  void testConvertToClientAPI() {
    assertEquals(EXPECTED_CLIENT_CATALOG,
        CatalogClientConverters.toAirbyteCatalogClientApi(BASIC_MODEL_CATALOG));
  }

}
@@ -13,6 +13,7 @@ import org.jsoup.Jsoup;

dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-api')
implementation project(':airbyte-commons-worker')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
@@ -6,11 +6,14 @@

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.EnvConfigs;
@@ -21,7 +24,6 @@
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.State;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
@@ -112,7 +114,10 @@ public abstract class AbstractSourceConnectorTest {

private WorkerConfigs workerConfigs;

private ConfigRepository mConfigRepository;
private AirbyteApiClient mAirbyteApiClient;

private SourceApi mSourceApi;

private ConnectorConfigUpdater mConnectorConfigUpdater;

// This has to be using the protocol version of the platform in order to capture the arg
@@ -123,6 +128,9 @@ protected AirbyteCatalog getLastPersistedCatalog() {
return convertProtocolObject(lastPersistedCatalog.getValue(), AirbyteCatalog.class);
}

private final ArgumentCaptor<SourceDiscoverSchemaWriteRequestBody> discoverWriteRequest =
ArgumentCaptor.forClass(SourceDiscoverSchemaWriteRequestBody.class);

@BeforeEach
public void setUpInternal() throws Exception {
final Path testDir = Path.of("/tmp/airbyte_tests/");
@@ -133,7 +141,9 @@ public void setUpInternal() throws Exception {
environment = new TestDestinationEnv(localRoot);
setupEnvironment(environment);
workerConfigs = new WorkerConfigs(new EnvConfigs());
mConfigRepository = mock(ConfigRepository.class);
mAirbyteApiClient = mock(AirbyteApiClient.class);
mSourceApi = mock(SourceApi.class);
when(mAirbyteApiClient.getSourceApi()).thenReturn(mSourceApi);
mConnectorConfigUpdater = mock(ConnectorConfigUpdater.class);
processFactory = new DockerProcessFactory(
workerConfigs,
@@ -182,13 +192,13 @@ protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exce

protected UUID runDiscover() throws Exception {
final UUID toReturn = new DefaultDiscoverCatalogWorker(
mConfigRepository,
mAirbyteApiClient,
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false,
new EnvVariableFeatureFlags()),
mConnectorConfigUpdater)
.run(new StandardDiscoverCatalogInput().withSourceId(SOURCE_ID.toString()).withConnectionConfiguration(getConfig()), jobRoot)
.getDiscoverCatalogId();
verify(mConfigRepository).writeActorCatalogFetchEvent(lastPersistedCatalog.capture(), any(), any(), any());
verify(mSourceApi).writeDiscoverCatalogResult(discoverWriteRequest.capture());
return toReturn;
}

1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
@@ -95,6 +95,7 @@ dependencies {

testImplementation project(':airbyte-commons-docker')
testImplementation project(':airbyte-test-utils')
testImplementation project(':airbyte-api')

integrationTestJavaImplementation project(':airbyte-workers')
integrationTestJavaImplementation libs.bundles.micronaut.test
@@ -137,7 +137,7 @@ private CheckedSupplier<Worker<StandardDiscoverCatalogInput, ConnectorJobOutput>
Optional.empty());
final ConnectorConfigUpdater connectorConfigUpdater =
new ConnectorConfigUpdater(airbyteApiClient.getSourceApi(), airbyteApiClient.getDestinationApi());
return new DefaultDiscoverCatalogWorker(configRepository, integrationLauncher, connectorConfigUpdater, streamFactory);
return new DefaultDiscoverCatalogWorker(airbyteApiClient, integrationLauncher, connectorConfigUpdater, streamFactory);
};
}
