Skip to content

Commit

Permalink
Merge branch 'master' into source-amazon-ads-migrate-to-low-code
Browse files Browse the repository at this point in the history
  • Loading branch information
askarpets committed Mar 5, 2024
2 parents 85dae3f + ef98194 commit b278d61
Show file tree
Hide file tree
Showing 698 changed files with 5,914 additions and 44,936 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/format_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
format-check:
# IMPORTANT: This name must match the require check name on the branch protection settings
name: "Check for formatting errors"
runs-on: ubuntu-latest
runs-on: tooling-test-small
steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-java-cdk-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ on:
required: false

concurrency:
group: publish-airbyte-cdk
group: publish-java-cdk
cancel-in-progress: false

env:
Expand Down
2 changes: 1 addition & 1 deletion LICENSE_SHORT
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Copyright (c) 2023 Airbyte, Inc., all rights reserved.
Copyright (c) 2024 Airbyte, Inc., all rights reserved.
8 changes: 6 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,13 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.9 | 2024-03-01 | [\#35720](https://github.com/airbytehq/airbyte/pull/35720) | various improvements for tests TestDataHolder |
| 0.23.13 | 2024-03-04 | [\#35774](https://github.com/airbytehq/airbyte/pull/35774) | minor changes to the CDK test fixtures. |
| 0.23.12 | 2024-03-01 | [\#35767](https://github.com/airbytehq/airbyte/pull/35767) | introducing a timeout for java tests. |
| 0.23.11 | 2024-03-01 | [\#35313](https://github.com/airbytehq/airbyte/pull/35313) | Preserve timezone offset in CSV writer for destinations |
| 0.23.10 | 2024-03-01 | [\#35303](https://github.com/airbytehq/airbyte/pull/35303) | Migration framework with DestinationState for softReset |
| 0.23.9 | 2024-02-29 | [\#35720](https://github.com/airbytehq/airbyte/pull/35720) | various improvements for tests TestDataHolder |
| 0.23.8 | 2024-02-28 | [\#35529](https://github.com/airbytehq/airbyte/pull/35529) | Refactor on state iterators |
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Extract typereduper migrations to separte method |
| 0.23.7 | 2024-02-28 | [\#35376](https://github.com/airbytehq/airbyte/pull/35376) | Extract typereduper migrations to separte method |
| 0.23.6 | 2024-02-26 | [\#35647](https://github.com/airbytehq/airbyte/pull/35647) | Add a getNamespace into TestDataHolder |
| 0.23.5 | 2024-02-26 | [\#35512](https://github.com/airbytehq/airbyte/pull/35512) | Remove @DisplayName from all CDK tests. |
| 0.23.4 | 2024-02-26 | [\#35507](https://github.com/airbytehq/airbyte/pull/35507) | Add more logs into TestDatabase. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.util

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
Expand All @@ -6,10 +10,10 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
* For streams in [catalog] which do not have a namespace specified, explicitly set their namespace
* to the [defaultNamespace]
*/
fun addDefaultNamespaceToStreams(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String?) {
if (defaultNamespace == null) {
return
}
fun addDefaultNamespaceToStreams(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String?) {
if (defaultNamespace == null) {
return
}
// TODO: This logic exists in all V2 destinations.
// This is sad that if we forget to add this, there will be a null pointer during parseCatalog
for (catalogStream in catalog.streams) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.23.9
version=0.23.13
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Timeout.ThreadMode;
import org.junit.jupiter.api.extension.DynamicTestInvocationContext;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.InvocationInterceptor;
Expand All @@ -33,9 +41,12 @@
*/
public class LoggingInvocationInterceptor implements InvocationInterceptor {

private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5);
private static final Logger LOGGER = LoggerFactory.getLogger(LoggingInvocationInterceptor.class);

private static final class LoggingInvocationInterceptorHandler implements InvocationHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(LoggingInvocationInterceptor.class);
private static final Map<Integer, ExecutorService> executorByThread = new ConcurrentHashMap<>();

private static final Pattern methodPattern = Pattern.compile("intercept(.*)Method");

Expand Down Expand Up @@ -65,16 +76,24 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
} else {
logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName);
}
LOGGER.info("Junit starting {}", logLineSuffix);
Instant start = Instant.now();
try {
Instant start = Instant.now();
Object retVal = invocation.proceed();
final Object retVal;
Duration timeout = getTimeout(invocationContext);
if (timeout != null) {
LOGGER.info("Junit starting {} with timeout of {}", logLineSuffix, DurationFormatUtils.formatDurationWords(timeout.toMillis(), true, true));
retVal = Assertions.assertTimeoutPreemptively(timeout, invocation::proceed);
} else {
LOGGER.warn("Junit starting {} with no timeout", logLineSuffix);
retVal = invocation.proceed();
}
long elapsedMs = Duration.between(start, Instant.now()).toMillis();
LOGGER.info("Junit completed {} in {} ms", logLineSuffix, elapsedMs);
LOGGER.info("Junit completed {} in {}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true));
return retVal;
} catch (Throwable t) {
long elapsedMs = Duration.between(start, Instant.now()).toMillis();
boolean belowCurrentCall = false;
List<String> stackToDisplay = new LinkedList<String>();
List<String> stackToDisplay = new LinkedList<>();
for (String stackString : ExceptionUtils.getStackFrames(t)) {
if (stackString.startsWith("\tat ")) {
if (!belowCurrentCall && stackString.contains(LoggingInvocationInterceptor.class.getCanonicalName())) {
Expand All @@ -88,11 +107,32 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}
}
String stackTrace = StringUtils.join(stackToDisplay, "\n ");
LOGGER.warn("Junit exception throw during {}:\n{}", logLineSuffix, stackTrace);
LOGGER.error("Junit exception throw during {} after {}:\n{}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true),
stackTrace);
throw t;
}
}

private static Duration getTimeout(ReflectiveInvocationContext<Method> invocationContext) {
Duration timeout = DEFAULT_TIMEOUT;
if (invocationContext.getExecutable()instanceof Method m) {
Timeout timeoutAnnotation = m.getAnnotation(Timeout.class);
if (timeoutAnnotation == null) {
timeoutAnnotation = invocationContext.getTargetClass().getAnnotation(Timeout.class);
}
if (timeoutAnnotation != null) {
if (timeoutAnnotation.threadMode() == ThreadMode.SAME_THREAD) {
return null;
}
timeout = Duration.ofMillis(timeoutAnnotation.unit().toMillis(timeoutAnnotation.value()));
}
}
if (timeout.compareTo(Duration.ofHours(1)) > 0) {
return DEFAULT_TIMEOUT;
}
return timeout;
}

}

private final InvocationInterceptor proxy = (InvocationInterceptor) Proxy.newProxyInstance(
Expand Down Expand Up @@ -145,6 +185,10 @@ public void interceptTestMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
ExtensionContext extensionContext)
throws Throwable {
if (!Modifier.isPublic(invocationContext.getExecutable().getModifiers())) {
LOGGER.warn("Junit method {}.{} is not declared as public", invocationContext.getExecutable().getDeclaringClass().getCanonicalName(),
invocationContext.getExecutable().getName());
}
proxy.interceptTestMethod(invocation, invocationContext, extensionContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public abstract class ContainerFactory<C extends GenericContainer<?>> {

private record ContainerKey<C extends GenericContainer<?>> (Class<? extends ContainerFactory> clazz,
DockerImageName imageName,
List<? extends NamedContainerModifier<C>> methods) {};
List<? extends NamedContainerModifier<C>> methods) {}

;

private static class ContainerOrException {

Expand Down Expand Up @@ -70,7 +72,7 @@ GenericContainer<?> container() {

}

private final ConcurrentMap<ContainerKey<C>, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
private static final ConcurrentMap<ContainerKey<?>, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();
private static final AtomicInteger containerId = new AtomicInteger(0);

private final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName,
Expand Down Expand Up @@ -112,7 +114,7 @@ public final C shared(String imageName, List<? extends NamedContainerModifier<C>
// Container creation can be exceedingly slow.
// Furthermore, we need to handle exceptions raised during container creation.
ContainerOrException containerOrError = SHARED_CONTAINERS.computeIfAbsent(containerKey,
key -> new ContainerOrException(() -> createAndStartContainer(key.imageName(), key.methods())));
key -> new ContainerOrException(() -> createAndStartContainer(key.imageName(), ((ContainerKey<C>) key).methods())));
// Instead, the container creation (if applicable) is deferred to here.
return (C) containerOrError.container();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public T with(String fmtSql, Object... fmtArgs) {
* object. This typically entails at least a CREATE DATABASE and a CREATE USER. Also Initializes the
* {@link DataSource} and {@link DSLContext} owned by this object.
*/
final public T initialized() {
public T initialized() {
inContainerBootstrapCmd().forEach(this::execInContainer);
this.dataSource = DataSourceFactory.create(
getUserName(),
Expand Down Expand Up @@ -193,7 +193,7 @@ protected void execSQL(final Stream<String> sql) {
try {
getDatabase().query(ctx -> {
sql.forEach(statement -> {
LOGGER.debug("{}", statement);
LOGGER.info("executing SQL statement {}", statement);
ctx.execute(statement);
});
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.integrations.destination.jdbc;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.cdk.integrations.util.ConfiguredCatalogUtilKt.addDefaultNamespaceToStreams;

Expand All @@ -17,7 +18,6 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
Expand All @@ -37,6 +37,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand All @@ -45,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import javax.sql.DataSource;
Expand Down Expand Up @@ -93,7 +95,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
attemptTableOperations(outputSchema, database, namingResolver, sqlOperations, false);
if (TypingAndDedupingFlag.isDestinationV2()) {
final var v2RawSchema = namingResolver.getIdentifier(TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false);
destinationSpecificTableOperations(database);
}
Expand Down Expand Up @@ -252,7 +254,9 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database);
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema);

/**
* "database" key at root of the config json, for any other variants in config, override this
Expand Down Expand Up @@ -309,21 +313,23 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
*/
private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) {
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE);
final ParsedCatalog parsedCatalog = rawNamespaceOverride
.map(override -> new CatalogParser(sqlGenerator, override))
.orElse(new CatalogParser(sqlGenerator))
.parseCatalog(catalog);
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
} else {
typerDeduper =
new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
}
return typerDeduper;
}
Expand Down
Loading

0 comments on commit b278d61

Please sign in to comment.