Merge branch 'master' into leti/remove-oauth-position-ff
* master:
  Discover worker starts to use API to write schema result (#21875)
  🪟 🎉 Connector Builder Landing Page (#22122)
  Fix pnpm cache path (#22418)
  Add additional shorter setup guides (#22318)
  Source Amazon Ads: fix reports stream records primary keys (#21677)
  Connector acceptance test: Fix discovered catalog caching for different configs (#22301)
  🪟🐛 Make modal scrollable (#21973)
  only compute diff if the schema discovery actually succeeded (#22377)
  Source Klaviyo: fix schema (#22071)
  🪟 🔧 Switch to `pnpm` for package managing (#22053)
  Source Sentry: turn on default availability strategy (#22303)
  Source freshdesk: deduplicate table names (#22164)
letiescanciano committed Feb 6, 2023
2 parents 9597444 + 36698ce commit e022bcb
Showing 113 changed files with 17,947 additions and 58,452 deletions.
9 changes: 9 additions & 0 deletions .github/actions/cache-build-artifacts/action.yml
@@ -31,6 +31,15 @@ runs:
restore-keys: |
${{ inputs.cache-key }}-npm-${{ runner.os }}-
+ - name: pnpm Caching
+   uses: actions/cache@v3
+   with:
+     path: |
+       ~/.local/share/pnpm/store
+     key: ${{ inputs.cache-key }}-pnpm-${{ runner.os }}-${{ hashFiles('**/pnpm-lock.yaml') }}
+     restore-keys: |
+       ${{ inputs.cache-key }}-pnpm-${{ runner.os }}-
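(For reference, actions/cache semantics: an exact hit on key restores the store and skips the save step, while restore-keys performs a prefix match against the most recent cache, so an older pnpm store is still restored and then re-saved under the new pnpm-lock.yaml hash.)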
# this intentionally does not use restore-keys so we don't mess with gradle caching
- name: Gradle and Python Caching
uses: actions/cache@v3
@@ -126,6 +126,7 @@ definitions:
- stream_slicers
properties:
type:
+ type: string
enum: [CartesianProductStreamSlicer]
stream_slicers:
type: array
@@ -746,6 +747,7 @@ definitions:
- type
properties:
type:
+ type: string
enum: [JsonFileSchemaLoader, JsonSchema] # TODO As part of Beta, remove JsonSchema and update connectors to use JsonFileSchemaLoader
file_path:
type: string
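Both declarative-schema hunks apply the same fix: the type discriminator properties previously declared only an enum, and now also declare type: string, making explicit that the discriminator value must be a string.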
@@ -269,7 +269,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
isCustomConnector);
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId, sourceDef);

- if (discoverSchemaRequestBody.getConnectionId() != null) {
+ if (persistedCatalogId.isSuccess() && discoverSchemaRequestBody.getConnectionId() != null) {
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody);
}
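This guard is the "only compute diff if the schema discovery actually succeeded" change from the commit list: when the discover job fails there is no fresh catalog to compare against, so skipping the diff avoids reporting a spurious breaking change or disabling connections based on a failed discovery.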
@@ -977,6 +977,40 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOExce
assertEquals(ConnectionStatus.INACTIVE, connectionUpdateValues.get(2).getStatus());
}

@Test
void testDiscoverSchemaFromSourceIdWithConnectionUpdateNonSuccessResponse() throws IOException, JsonValidationException, ConfigNotFoundException {
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId())
.connectionId(UUID.randomUUID());

// Mock the source definition.
when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
.thenReturn(new StandardSourceDefinition()
.withDockerRepository(SOURCE_DOCKER_REPO)
.withDockerImageTag(SOURCE_DOCKER_TAG)
.withProtocolVersion(SOURCE_PROTOCOL_VERSION)
.withSourceDefinitionId(source.getSourceDefinitionId()));
// Mock the source itself.
when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
// Mock the Discover job results.
final SynchronousResponse<UUID> discoverResponse = (SynchronousResponse<UUID>) jobResponse;
final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
when(discoverResponse.isSuccess()).thenReturn(false);
when(discoverResponse.getMetadata()).thenReturn(metadata);
when(metadata.isSucceeded()).thenReturn(false);
when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION),
false))
.thenReturn(discoverResponse);

final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request);

assertNull(actual.getCatalog());
assertNotNull(actual.getJobInfo());
assertFalse(actual.getJobInfo().getSucceeded());
verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION),
false);
}
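The new test pins down that behavior: with the discover job mocked to fail, the handler returns job info with succeeded == false and a null catalog, and the diff/connection-update path is never reached.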

@Test
void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException {
final SourceConnection source = new SourceConnection()
@@ -27,6 +27,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.security.interfaces.RSAPrivateKey;
+ import java.time.Duration;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -53,6 +54,8 @@ public ApiClient apiClient(
.setPort(parsePort(airbyteApiHost))
.setBasePath("/api")
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
+ .setConnectTimeout(Duration.ofSeconds(30))
+ .setReadTimeout(Duration.ofSeconds(30))
.setRequestInterceptor(builder -> {
builder.setHeader("User-Agent", "WorkerApp");
// internalApiAuthToken is in BeanProvider because we want to create a new token each
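The two new settings presumably map onto the JDK's java.net.http timeout model: a connect timeout is a property of the client (time allowed to establish the connection), while a read timeout bounds the wait for a response and is applied per request. A minimal standalone sketch of those JDK semantics, using a placeholder URL rather than Airbyte's real endpoint:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public final class TimeoutSketch {

  public static void main(final String[] args) throws Exception {
    // Connect timeout belongs to the client: how long to wait for the
    // connection to be established.
    final HttpClient client = HttpClient.newBuilder()
        .version(HttpClient.Version.HTTP_1_1)
        .connectTimeout(Duration.ofSeconds(30))
        .build();

    // The request timeout bounds how long we wait for the response; it plays
    // the role many API clients call a "read timeout".
    final HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8001/api/v1/health")) // placeholder URL
        .timeout(Duration.ofSeconds(30))
        .GET()
        .build();

    final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
  }

}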
@@ -11,13 +11,15 @@

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
+ import io.airbyte.api.client.AirbyteApiClient;
+ import io.airbyte.api.client.model.generated.DiscoverCatalogResult;
+ import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardDiscoverCatalogInput;
- import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
@@ -26,6 +28,7 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
+ import io.airbyte.workers.helper.CatalogClientConverters;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
@@ -43,29 +46,28 @@
public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDiscoverCatalogWorker.class);

- private final ConfigRepository configRepository;
+ private static final String WRITE_DISCOVER_CATALOG_LOGS_TAG = "call to write discover schema result";

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final ConnectorConfigUpdater connectorConfigUpdater;

+ private final AirbyteApiClient airbyteApiClient;
private volatile Process process;

- public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
+ public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater,
final AirbyteStreamFactory streamFactory) {
- this.configRepository = configRepository;
+ this.airbyteApiClient = airbyteApiClient;
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.connectorConfigUpdater = connectorConfigUpdater;
}

- public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
+ public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
final IntegrationLauncher integrationLauncher,
final ConnectorConfigUpdater connectorConfigUpdater) {
- this(configRepository, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
+ this(airbyteApiClient, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
}
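Note the dependency swap in both constructors: the worker now receives an AirbyteApiClient instead of a ConfigRepository, so discovered catalogs are written through the internal API rather than directly to the config database, per the "Discover worker starts to use API to write schema result" commit above.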

@Trace(operationName = WORKER_OPERATION_NAME)
@@ -108,14 +110,11 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
}

if (catalog.isPresent()) {
- final UUID catalogId =
-     configRepository.writeActorCatalogFetchEvent(catalog.get(),
-         // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
-         // it, so we check again here.
-         discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()),
-         discoverSchemaInput.getConnectorVersion(),
-         discoverSchemaInput.getConfigHash());
- jobOutput.setDiscoverCatalogId(catalogId);
+ final DiscoverCatalogResult result =
+     AirbyteApiClient.retryWithJitter(() -> airbyteApiClient.getSourceApi()
+         .writeDiscoverCatalogResult(buildSourceDiscoverSchemaWriteRequestBody(discoverSchemaInput, catalog.get())),
+         WRITE_DISCOVER_CATALOG_LOGS_TAG);
+ jobOutput.setDiscoverCatalogId(result.getCatalogId());
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process);
}
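AirbyteApiClient.retryWithJitter wraps the write call with retries; its implementation is not part of this diff, but the name suggests the usual pattern of exponential backoff plus random jitter so that many workers recovering from a shared failure don't retry in lockstep. A rough sketch of such a helper, with illustrative (not Airbyte's actual) attempt counts and delays:

import java.util.Random;
import java.util.function.Supplier;

// Illustrative only: the real AirbyteApiClient.retryWithJitter is not shown in
// this diff, and the attempt limit and delays here are assumptions.
public final class RetryWithJitterSketch {

  private static final Random RANDOM = new Random();
  private static final int MAX_ATTEMPTS = 4;

  public static <T> T retryWithJitter(final Supplier<T> call, final String desc) {
    for (int attempt = 1;; attempt++) {
      try {
        return call.get();
      } catch (final Exception e) {
        if (attempt >= MAX_ATTEMPTS) {
          throw new IllegalStateException("Failed " + desc + " after " + attempt + " attempts", e);
        }
        try {
          // Exponential backoff plus random jitter, so concurrent workers do
          // not hammer the API in lockstep after a shared failure.
          Thread.sleep((1000L << attempt) + RANDOM.nextInt(1000));
        } catch (final InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(ie);
        }
      }
    }
  }

}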
@@ -129,6 +128,19 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
}
}

private SourceDiscoverSchemaWriteRequestBody buildSourceDiscoverSchemaWriteRequestBody(final StandardDiscoverCatalogInput discoverSchemaInput,
final AirbyteCatalog catalog) {
return new SourceDiscoverSchemaWriteRequestBody().catalog(
CatalogClientConverters.toAirbyteCatalogClientApi(catalog)).sourceId(
// NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
// it, so we check again here.
discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()))
.connectorVersion(
discoverSchemaInput.getConnectorVersion())
.configurationHash(
discoverSchemaInput.getConfigHash());
}
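The builder carries over the old null-check on sourceId (required in the OpenAPI spec but not enforced by the generated code) along with the connector version and config hash, so the server can persist the same fetch-event metadata that configRepository.writeActorCatalogFetchEvent used to write directly.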

private Map<String, Object> generateTraceTags(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) {
final Map<String, Object> tags = new HashMap<>();

@@ -0,0 +1,66 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteStream;
import java.util.stream.Collectors;

/**
 * Utilities to convert a protocol Catalog into the Catalog API client model. This class is similar
 * to the existing logic in CatalogConverter.java, but the code can't be shared because the protocol
 * model is converted into two different API models. As a result, whenever the conversion logic
 * changes in one place, the other must be updated to match.
 */
public class CatalogClientConverters {

/**
* Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog.
*/
public static io.airbyte.api.client.model.generated.AirbyteCatalog toAirbyteCatalogClientApi(
final io.airbyte.protocol.models.AirbyteCatalog catalog) {
return new io.airbyte.api.client.model.generated.AirbyteCatalog()
.streams(catalog.getStreams()
.stream()
.map(stream -> toAirbyteStreamClientApi(stream))
.map(s -> new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration()
.stream(s)
.config(generateDefaultConfiguration(s)))
.collect(Collectors.toList()));
}

private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration(
final io.airbyte.api.client.model.generated.AirbyteStream stream) {
final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration result =
new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
.aliasName(Names.toAlphanumericAndUnderscore(stream.getName()))
.cursorField(stream.getDefaultCursorField())
.destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
.primaryKey(stream.getSourceDefinedPrimaryKey())
.selected(true);
if (stream.getSupportedSyncModes().size() > 0) {
result.setSyncMode(Enums.convertTo(stream.getSupportedSyncModes().get(0),
io.airbyte.api.client.model.generated.SyncMode.class));
} else {
result.setSyncMode(io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL);
}
return result;
}

private static io.airbyte.api.client.model.generated.AirbyteStream toAirbyteStreamClientApi(
final AirbyteStream stream) {
return new io.airbyte.api.client.model.generated.AirbyteStream()
.name(stream.getName())
.jsonSchema(stream.getJsonSchema())
.supportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(),
io.airbyte.api.client.model.generated.SyncMode.class))
.sourceDefinedCursor(stream.getSourceDefinedCursor())
.defaultCursorField(stream.getDefaultCursorField())
.sourceDefinedPrimaryKey(stream.getSourceDefinedPrimaryKey())
.namespace(stream.getNamespace());
}

}
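Worth noting: generateDefaultConfiguration synthesizes a client-side stream configuration from the stream itself: append destination sync mode, the stream's default cursor field and source-defined primary key, an alphanumeric alias name, and the first supported sync mode, falling back to INCREMENTAL when the source declares none.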
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.Lists;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;

class CatalogClientConvertersTest {

public static final String ID_FIELD_NAME = "id";
private static final String STREAM_NAME = "users-data";
private static final AirbyteStream STREAM = new AirbyteStream()
.withName(STREAM_NAME)
.withJsonSchema(
CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING)))
.withDefaultCursorField(Lists.newArrayList(ID_FIELD_NAME))
.withSourceDefinedCursor(false)
.withSourceDefinedPrimaryKey(Collections.emptyList())
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));

private static final io.airbyte.api.client.model.generated.AirbyteStream CLIENT_STREAM =
new io.airbyte.api.client.model.generated.AirbyteStream()
.name(STREAM_NAME)
.jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING)))
.defaultCursorField(Lists.newArrayList(ID_FIELD_NAME))
.sourceDefinedCursor(false)
.sourceDefinedPrimaryKey(Collections.emptyList())
.supportedSyncModes(List.of(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL));
private static final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration CLIENT_DEFAULT_STREAM_CONFIGURATION =
new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
.syncMode(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH)
.cursorField(Lists.newArrayList(ID_FIELD_NAME))
.destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
.primaryKey(Collections.emptyList())
.aliasName(Names.toAlphanumericAndUnderscore(STREAM_NAME))
.selected(true);

private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
Lists.newArrayList(STREAM));

private static final io.airbyte.api.client.model.generated.AirbyteCatalog EXPECTED_CLIENT_CATALOG =
new io.airbyte.api.client.model.generated.AirbyteCatalog()
.streams(Lists.newArrayList(
new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration()
.stream(CLIENT_STREAM)
.config(CLIENT_DEFAULT_STREAM_CONFIGURATION)));

@Test
void testConvertToClientAPI() {
assertEquals(EXPECTED_CLIENT_CATALOG,
CatalogClientConverters.toAirbyteCatalogClientApi(BASIC_MODEL_CATALOG));
}

}
@@ -61,7 +61,7 @@
- name: Amazon Ads
sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
dockerRepository: airbyte/source-amazon-ads
- dockerImageTag: 0.1.29
+ dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/amazon-ads
icon: amazonads.svg
sourceType: api
@@ -560,7 +560,7 @@
- name: Freshdesk
sourceDefinitionId: ec4b9503-13cb-48ab-a4ab-6ade4be46567
dockerRepository: airbyte/source-freshdesk
- dockerImageTag: 2.0.1
+ dockerImageTag: 3.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/freshdesk
icon: freshdesk.svg
sourceType: api
@@ -900,7 +900,7 @@
- name: Klaviyo
sourceDefinitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
dockerRepository: airbyte/source-klaviyo
- dockerImageTag: 0.1.11
+ dockerImageTag: 0.1.12
documentationUrl: https://docs.airbyte.com/integrations/sources/klaviyo
icon: klaviyo.svg
sourceType: api
@@ -2051,7 +2051,7 @@
- sourceDefinitionId: cdaf146a-9b75-49fd-9dd2-9d64a0bb4781
name: Sentry
dockerRepository: airbyte/source-sentry
- dockerImageTag: 0.1.10
+ dockerImageTag: 0.1.11
documentationUrl: https://docs.airbyte.com/integrations/sources/sentry
icon: sentry.svg
sourceType: api