Skip to content

Commit

Permalink
implementation?
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Feb 20, 2024
1 parent 271b36c commit fa52885
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -250,7 +251,7 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected abstract JdbcDestinationHandler<?> getDestinationHandler(final String databaseName, final JdbcDatabase database);
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName, final JdbcDatabase database);

/**
* "database" key at root of the config json, for any other variants in config, override this
Expand Down Expand Up @@ -294,7 +295,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler<?> destinationHandler = getDestinationHandler(databaseName, database);
final DestinationHandler<? extends MinimumDestinationState> destinationHandler = getDestinationHandler(databaseName, database);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,34 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.stream.Collectors.toList
import java.util.concurrent.ExecutorService
import java.util.stream.Collectors.toMap

class DV2OperationUtils {
companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(DV2OperationUtils::class.java)

@JvmStatic
fun <DestinationState> executeRawTableMigrations(
initialStateFutures: List<DestinationInitialState<DestinationState>>,
executorService: ExecutorService,
initialStates: List<DestinationInitialState<DestinationState>>,
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler<DestinationState>,
v1V2Migrator: DestinationV1V2Migrator,
v2TableMigrator: V2TableMigrator,
migrations: List<Migration<DestinationState>>
): List<Optional<Exception>> {
val migrationFutures: List<CompletableFuture<Optional<Exception>>> = initialStateFutures.stream()
): Map<StreamId, Migration.MigrationResult<DestinationState>> {
data class MigrationResultWithException(
val streamId: StreamId,
val state: DestinationState,
val softReset: Boolean,
val exception: Optional<Exception>
)

// Execute migrations for each stream in parallel
val migrationFutures: List<CompletableFuture<MigrationResultWithException>> = initialStates
.map { initialState ->
CompletableFuture.supplyAsync {
CompletableFuture.supplyAsync({
try {
var currentState = initialState
// Migrate the Raw Tables if this is the first v2 sync after a v1 sync
Expand All @@ -40,20 +50,42 @@ class DV2OperationUtils {
softReset = softReset or migrationResult.softReset
}
}
return@supplyAsync Optional.empty<Exception>()
return@supplyAsync MigrationResultWithException(
initialState.streamConfig.id(),
currentState.destinationState,
softReset,
Optional.empty<Exception>())
} catch (e: Exception) {
// Catch exception + extract as Optional because we're invoking this class from java
// and it's just easier this way.
// TODO just... do kotlin things instead
LOGGER.error("Exception occurred while preparing tables for stream " + initialState.streamConfig.id.originalName, e)
return@supplyAsync Optional.of<Exception>(e)
// If we fail to execute any migration, we don't trigger any state change or soft reset.
// The next sync will reattempt ALL migrations.
// This is OK, because migrations will check if they are necessary before running.
// We _could_ return MigrationResultWithException(currentState, softReset, Optional.of<Exception>(e))
// but then we'd need logic to actually commit the updated state + trigger soft resets
// after triggering an exception, and that's a lot of complexity.
return@supplyAsync MigrationResultWithException(
initialState.streamConfig.id(),
initialState.destinationState,
false,
Optional.of<Exception>(e))
}
}
}.collect(toList())
}, executorService)
}

// Check if any of the migrations threw an exception
val exceptionFutures: List<CompletableFuture<Optional<Exception>>> = migrationFutures.map { future -> future.thenApply { it.exception } }
CompletableFuture.allOf(*exceptionFutures.toTypedArray()).join()
FutureUtils.reduceExceptions(exceptionFutures, "The following exceptions were thrown attempting to handle raw table migrations:\n")

CompletableFuture.allOf(*migrationFutures.toTypedArray()).join()
FutureUtils.reduceExceptions(migrationFutures, "The following exceptions were thrown attempting to handle raw table migrations:\n")
return migrationFutures.map(CompletableFuture<Optional<Exception>>::get)
// If all migrations were successful, return the updated states + softResets needed.
return migrationFutures.stream()
.map { it.get() }
.collect(toMap(
{ it.streamId },
{ Migration.MigrationResult(it.state, it.softReset) }))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
package io.airbyte.integrations.base.destination.typing_deduping;

import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
import static io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.returnOrLogAndThrowFirst;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.countOfTypingDedupingThreads;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
import static java.util.Collections.singleton;
import static java.util.stream.Collectors.toMap;

import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.commons.concurrency.CompletableFutures;
import io.airbyte.commons.functional.Either;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.HashSet;
Expand All @@ -23,7 +21,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -51,7 +48,7 @@
* Note that #prepareTables() initializes some internal state. The other methods will throw an
* exception if that method was not called.
*/
public class DefaultTyperDeduper<DestinationState> implements TyperDeduper {
public class DefaultTyperDeduper<DestinationState extends MinimumDestinationState<DestinationState>> implements TyperDeduper {

private static final Logger LOGGER = LoggerFactory.getLogger(TyperDeduper.class);

Expand Down Expand Up @@ -80,11 +77,8 @@ public class DefaultTyperDeduper<DestinationState> implements TyperDeduper {
private final Map<StreamId, Lock> internalTdLocks;

private final ExecutorService executorService;

public record TableSetupResult<DestinationState>(
StreamId streamId,
DestinationState updatedState,
Optional<Exception> error) {}
private Map<StreamId, Migration.MigrationResult<DestinationState>> migrationResults;
private List<DestinationInitialState<DestinationState>> initialStates;

public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
final DestinationHandler<DestinationState> destinationHandler,
Expand Down Expand Up @@ -123,13 +117,28 @@ public void prepareSchemasAndRawTables() throws Exception {
// Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas
// until prepareFinalTables... but it doesn't really matter.
DV2OperationUtils.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
DV2OperationUtils.executeRawTableMigrations(
destinationHandler.gatherInitialState(parsedCatalog.streams()),
initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams());
migrationResults = DV2OperationUtils.executeRawTableMigrations(

Check failure on line 121 in airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java

View workflow job for this annotation

GitHub Actions / Gradle Check

[Task :airbyte-cdk:java:airbyte-cdk:dependencies:test] method executeRawTableMigrations in class DV2OperationUtils cannot be applied to given types; migrationResults = DV2OperationUtils.executeRawTableMigrations( ^ required: ExecutorService,List<DestinationInitialState<DestinationState#1>>,SqlGenerator,DestinationHandler<DestinationState#1>,DestinationV1V2Migrator,V2TableMigrator,List<? extends Migration<DestinationState#1>>
initialStates,
sqlGenerator,
destinationHandler,
v1V2Migrator,
v2TableMigrator,
migrations);

// Commit our destination states immediately.
// Technically, migrations aren't done until we execute the soft reset.
// However, our state contains a needsSoftReset flag, so we can commit that we already executed the migration
// and even if we fail to run the soft reset in this sync, future syncs will see the soft reset flag
// and finish it for us.
destinationHandler.commitDestinationStates(migrationResults.entrySet().stream().collect(toMap(
Map.Entry::getKey,
e -> {
Migration.MigrationResult<DestinationState> migrationResult = e.getValue();
DestinationState state = migrationResult.getUpdatedDestinationState();
return state.withSoftReset(state.needsSoftReset() || migrationResult.getSoftReset());
}
)));
}

@Override
Expand All @@ -140,26 +149,30 @@ public void prepareFinalTables() throws Exception {
overwriteStreamsWithTmpTable = ConcurrentHashMap.newKeySet();
LOGGER.info("Preparing tables");

final List<DestinationInitialState<DestinationState>> initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams());
final List<Either<? extends Exception, TableSetupResult<DestinationState>>> prepareTablesFutureResult = CompletableFutures.allOf(
initialStates.stream().map(this::prepareTablesFuture).toList()).toCompletableFuture().join();

Map<StreamId, DestinationState> updatedStates = prepareTablesFutureResult.stream()
// TODO handle errors (i.e. getLeft)
.collect(toMap(s -> s.getRight().streamId, s -> s.getRight().updatedState));
destinationHandler.commitDestinationStates(updatedStates);

returnOrLogAndThrowFirst("The following exceptions were thrown attempting to prepare tables:\n", prepareTablesFutureResult);
final List<CompletableFuture<Optional<Exception>>> prepareTableFutures = initialStates.stream()
.map(initialState -> prepareTablesFuture(
initialState,
migrationResults.get(initialState.streamConfig().id()).getSoftReset())
).toList();
CompletableFuture.allOf(prepareTableFutures.toArray(new CompletableFuture[0])).join();
FutureUtils.reduceExceptions(prepareTableFutures, "The following exceptions were thrown attempting to prepare tables:\n");

destinationHandler.commitDestinationStates(migrationResults.entrySet().stream().collect(toMap(
Map.Entry::getKey,
// If we get here, then we've executed all soft resets.
e -> e.getValue().getUpdatedDestinationState().withSoftReset(false)
)));
}

private CompletionStage<TableSetupResult<DestinationState>> prepareTablesFuture(final DestinationInitialState<DestinationState> initialState) {
private CompletableFuture<Optional<Exception>> prepareTablesFuture(
final DestinationInitialState<DestinationState> initialState,
final boolean migrationTriggeredSoftReset) {
// For each stream, make sure that its corresponding final table exists.
// Also, for OVERWRITE streams, decide if we're writing directly to the final table, or into an
// _airbyte_tmp table.
return CompletableFuture.supplyAsync(() -> {
final var stream = initialState.streamConfig();
try {
boolean softReset = false;
if (initialState.isFinalTablePresent()) {
LOGGER.info("Final Table exists for stream {}", stream.id().finalName());
// The table already exists. Decide whether we're writing to it directly, or using a tmp table.
Expand All @@ -175,22 +188,18 @@ private CompletionStage<TableSetupResult<DestinationState>> prepareTablesFuture(
LOGGER.info("Final Table for stream {} is empty and matches the expected v2 format, writing to table directly",
stream.id().finalName());
}

} else if (initialState.isSchemaMismatch()) {
// We're loading data directly into the existing table. Make sure it has the right schema.
softReset = true;
} else if (initialState.isSchemaMismatch() || initialState.destinationState().needsSoftReset() || migrationTriggeredSoftReset) {
// We're loading data directly into the existing table.
// Make sure it has the right schema.
// Also, if a raw table migration wants us to do a soft reset, do that here.
TypeAndDedupeTransaction.executeSoftReset(sqlGenerator, destinationHandler, stream);
}
} else {
LOGGER.info("Final Table does not exist for stream {}, creating.", stream.id().finalName());
// The table doesn't exist. Create it. Don't force.
destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false));
}

if (softReset) {
// TODO also fetch soft reset from migration results
TypeAndDedupeTransaction.executeSoftReset(sqlGenerator, destinationHandler, stream);
}

initialRawTableStateByStream.put(stream.id(), initialState.initialRawTableState());

streamsWithSuccessfulSetup.add(Pair.of(stream.id().originalNamespace(), stream.id().originalName()));
Expand All @@ -204,10 +213,10 @@ private CompletionStage<TableSetupResult<DestinationState>> prepareTablesFuture(
// immediately acquire the lock.
internalTdLocks.put(stream.id(), new ReentrantLock());

return new TableSetupResult<>(stream.id(), initialState.destinationState(), Optional.empty());
return Optional.empty();
} catch (final Exception e) {
LOGGER.error("Exception occurred while preparing tables for stream " + stream.id().originalName(), e);
return new TableSetupResult<>(stream.id(), initialState.destinationState(), Optional.of(e));
return Optional.of(e);
}
}, this.executorService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.countOfTypingDedupingThreads;
import static java.util.stream.Collectors.toMap;

import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -25,17 +28,17 @@
* json->string migrations in the raw tables.
*/
@Slf4j
public class NoOpTyperDeduperWithV1V2Migrations implements TyperDeduper {
public class NoOpTyperDeduperWithV1V2Migrations<DestinationState> implements TyperDeduper {

private final DestinationV1V2Migrator v1V2Migrator;
private final V2TableMigrator v2TableMigrator;
private final ExecutorService executorService;
private final ParsedCatalog parsedCatalog;
private final SqlGenerator sqlGenerator;
private final DestinationHandler<?> destinationHandler;
private final DestinationHandler<DestinationState> destinationHandler;

public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
final DestinationHandler<?> destinationHandler,
final DestinationHandler<DestinationState> destinationHandler,
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator v1V2Migrator,
final V2TableMigrator v2TableMigrator,
Expand All @@ -50,10 +53,23 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
}

@Override
public void prepareSchemasAndRawTables() {
public void prepareSchemasAndRawTables() throws Exception {
DV2OperationUtils.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
// TODO
DV2OperationUtils.executeRawTableMigrations(null, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, Collections.emptyList());
List<DestinationInitialState<DestinationState>> initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams());
Map<StreamId, Migration.MigrationResult<DestinationState>> migrationResults = DV2OperationUtils.executeRawTableMigrations(

Check failure on line 59 in airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java

View workflow job for this annotation

GitHub Actions / Gradle Check

[Task :airbyte-cdk:java:airbyte-cdk:dependencies:test] method executeRawTableMigrations in class DV2OperationUtils cannot be applied to given types; Map<StreamId, Migration.MigrationResult<DestinationState>> migrationResults = DV2OperationUtils.executeRawTableMigrations( ^ required: ExecutorService,List<DestinationInitialState<DestinationState#1>>,SqlGenerator,DestinationHandler<DestinationState#1>,DestinationV1V2Migrator,V2TableMigrator,List<? extends Migration<DestinationState#1>>

Check failure on line 59 in airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java

View workflow job for this annotation

GitHub Actions / Gradle Check

[Task :airbyte-cdk:java:airbyte-cdk:dependencies:test] incompatible types: cannot infer type-variable(s) DestinationState#1 Map<StreamId, Migration.MigrationResult<DestinationState>> migrationResults = DV2OperationUtils.executeRawTableMigrations( ^ (argument mismatch; List<DestinationInitialState<DestinationState#2>> cannot be converted to ExecutorService)
initialStates,
sqlGenerator,
destinationHandler,
v1V2Migrator,
v2TableMigrator,
Collections.emptyList());

// Commit the updated destination states.
// We don't need to trigger any soft resets, because we don't have any final tables.
destinationHandler.commitDestinationStates(migrationResults.entrySet().stream().collect(toMap(
Map.Entry::getKey,
e -> e.getValue().getUpdatedDestinationState()
)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.airbyte.integrations.base.destination.typing_deduping.migrators

/**
 * Minimum contract for a destination state object: it exposes a "needs soft reset"
 * flag that can be read ([needsSoftReset]) and replaced ([withSoftReset]).
 *
 * The type parameter should be the subclass itself. For example,
 * `data class MyState(...) : MinimumDestinationState<MyState>`.
 * We need this so that [withSoftReset] can return the correct type.
 */
interface MinimumDestinationState<T: MinimumDestinationState<T>> {
/** Returns the current value of this state's soft-reset flag. */
fun needsSoftReset(): Boolean
/**
 * Returns a state of the concrete type [T] whose soft-reset flag is [needsSoftReset].
 *
 * NOTE(review): presumably implementations return a modified copy rather than
 * mutating the receiver (e.g. via a data class `copy`) - confirm against implementers.
 */
fun withSoftReset(needsSoftReset: Boolean): T
}
Loading

0 comments on commit fa52885

Please sign in to comment.