Skip to content

Commit

Permalink
correct migration logic
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Feb 21, 2024
1 parent 837418f commit be41e16
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class DV2OperationUtils {

for (migration in migrations) {
if (migration.requireMigration(currentState.destinationState)) {
val updatedState: DestinationState = migration.migrateIfNecessary(initialState.streamConfig)
val updatedState: DestinationState = migration.migrateIfNecessary(initialState.streamConfig, currentState.destinationState)
// We don't _need_ to do this, but might as well hedge against migrations accidentally
// setting softReset=false.
val needsSoftReset = updatedState.needsSoftReset() || currentState.destinationState.needsSoftReset()
currentState = currentState.withDestinationState(updatedState.withSoftReset(needsSoftReset))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ interface Migration<DestinationState: MinimumDestinationState<DestinationState>>
* [requireMigration] returned true. Implementations of this method MUST check against the database
* to confirm the the migration is still necessary, in case a previous migration ran, but failed
* to update the state.
*
* Migrations MUST NOT set the `needsSoftReset` flag to false, but they MAY set it to true.
*/
fun migrateIfNecessary(stream: StreamConfig): DestinationState
fun migrateIfNecessary(stream: StreamConfig, state: DestinationState): DestinationState

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ public class DefaultTyperDeduperTest {
null,
null);

private record MockState(boolean needsSoftReset, boolean isMigrated) implements MinimumDestinationState<MockState> {
private record MockState(
boolean needsSoftReset,
boolean softResetMigrationCompleted,
boolean nonSoftResetMigrationCompleted) implements MinimumDestinationState<MockState> {
@Override
public MockState withSoftReset(boolean needsSoftReset) {
return new MockState(needsSoftReset, this.isMigrated);
return new MockState(needsSoftReset, this.softResetMigrationCompleted, this.nonSoftResetMigrationCompleted);
}
}

Expand All @@ -79,28 +82,31 @@ public MockState withSoftReset(boolean needsSoftReset) {
private final Migration<MockState> MIGRATION_REQUIRING_SOFT_RESET = new Migration<>() {
@Override
public boolean requireMigration(MockState mockState) {
return !mockState.isMigrated;
return !mockState.softResetMigrationCompleted;
}

@SneakyThrows
@NotNull
@Override
public MockState migrateIfNecessary(@NotNull StreamConfig stream) {
public MockState migrateIfNecessary(@NotNull StreamConfig stream, MockState state) {
destinationHandler.execute(Sql.of("MIGRATE " + stream.id().rawTableId("")));
return new MockState(true, true);
return new MockState(true, true, state.nonSoftResetMigrationCompleted);
}
};

private final Migration<MockState> MIGRATION_NOT_REQUIRING_SOFT_RESET = new Migration<>() {
@Override
public boolean requireMigration(MockState mockState) {
return true;
return !mockState.nonSoftResetMigrationCompleted;
}

@NotNull
@Override
public MockState migrateIfNecessary(@NotNull StreamConfig stream) {
return new MockState(false, true);
public MockState migrateIfNecessary(@NotNull StreamConfig stream, MockState state) {
return new MockState(
state.needsSoftReset,
state.softResetMigrationCompleted,
true);
}
};

Expand All @@ -112,7 +118,7 @@ public boolean requireMigration(MockState mockState) {

@NotNull
@Override
public MockState migrateIfNecessary(@NotNull StreamConfig stream) {
public MockState migrateIfNecessary(@NotNull StreamConfig stream, MockState state) {
throw new IllegalStateException("This migration should never execute.");
}
};
Expand All @@ -123,15 +129,15 @@ void setup() throws Exception {
destinationHandler = mock(DestinationHandler.class);

DestinationInitialState<MockState> overwriteNsState = mock(DestinationInitialState.class);
when(overwriteNsState.destinationState()).thenReturn(new MockState(false, false));
when(overwriteNsState.destinationState()).thenReturn(new MockState(false, false, true));
when(overwriteNsState.streamConfig()).thenReturn(OVERWRITE_STREAM_CONFIG);

DestinationInitialState<MockState> appendNsState = mock(DestinationInitialState.class);
when(appendNsState.destinationState()).thenReturn(new MockState(false, false));
when(appendNsState.destinationState()).thenReturn(new MockState(false, false, true));
when(appendNsState.streamConfig()).thenReturn(APPEND_STREAM_CONFIG);

DestinationInitialState<MockState> dedupeNsState = mock(DestinationInitialState.class);
when(dedupeNsState.destinationState()).thenReturn(new MockState(false, false));
when(dedupeNsState.destinationState()).thenReturn(new MockState(false, false, true));
when(dedupeNsState.streamConfig()).thenReturn(DEDUPE_STREAM_CONFIG);

initialStates = List.of(overwriteNsState, appendNsState, dedupeNsState);
Expand All @@ -140,9 +146,9 @@ void setup() throws Exception {
initialStates.forEach(initialState -> when(initialState.initialRawTableState()).thenReturn(new InitialRawTableState(true, Optional.empty())));

updatedStates = new HashMap<>();
updatedStates.put(OVERWRITE_STREAM_CONFIG.id(), new MockState(false, false));
updatedStates.put(APPEND_STREAM_CONFIG.id(), new MockState(false, false));
updatedStates.put(DEDUPE_STREAM_CONFIG.id(), new MockState(false, false));
updatedStates.put(OVERWRITE_STREAM_CONFIG.id(), new MockState(false, false, true));
updatedStates.put(APPEND_STREAM_CONFIG.id(), new MockState(false, false, true));
updatedStates.put(DEDUPE_STREAM_CONFIG.id(), new MockState(false, false, true));

migrator = new NoOpDestinationV1V2Migrator();

Expand Down Expand Up @@ -420,31 +426,31 @@ void multipleSoftResets() throws Exception {
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
true,
false,
new MockState(false, false)),
new MockState(false, false, true)),
new DestinationInitialState<>(
APPEND_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
true,
false,
new MockState(false, false)),
new MockState(false, false, true)),
new DestinationInitialState<>(
DEDUPE_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
true,
false,
new MockState(false, false))
new MockState(false, false, true))
));

typerDeduper.prepareSchemasAndRawTables();
verify(destinationHandler).execute(Sql.of("MIGRATE airbyte_internal.overwrite_stream"));
verify(destinationHandler).execute(Sql.of("MIGRATE airbyte_internal.append_stream"));
verify(destinationHandler).execute(Sql.of("MIGRATE airbyte_internal.dedup_stream"));
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(true, true),
APPEND_STREAM_CONFIG.id(), new MockState(true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(true, true)
OVERWRITE_STREAM_CONFIG.id(), new MockState(true, true, true),
APPEND_STREAM_CONFIG.id(), new MockState(true, true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(true, true, true)
));
verify(destinationHandler).gatherInitialState(any());
verify(destinationHandler).execute(separately("CREATE SCHEMA airbyte_internal", "CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns"));
Expand All @@ -467,9 +473,9 @@ void multipleSoftResets() throws Exception {

// And we should commit the states. Note that we now set needsSoftReset=false.
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true)
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true, true)
));

verifyNoMoreInteractions(destinationHandler);
Expand Down Expand Up @@ -497,28 +503,28 @@ void noopMigrations() throws Exception {
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(false, true)),
new MockState(false, true, true)),
new DestinationInitialState<>(
APPEND_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(false, true)),
new MockState(false, true, true)),
new DestinationInitialState<>(
DEDUPE_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(false, true))
new MockState(false, true, true))
));

typerDeduper.prepareSchemasAndRawTables();
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true)
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true, true)
));
verify(destinationHandler).gatherInitialState(any());
verify(destinationHandler).execute(separately("CREATE SCHEMA airbyte_internal", "CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns"));
Expand All @@ -532,9 +538,9 @@ void noopMigrations() throws Exception {

// And we should commit the states.
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true)
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true, true)
));

verifyNoMoreInteractions(destinationHandler);
Expand Down Expand Up @@ -562,31 +568,31 @@ void migrationsMixedResults() throws Exception {
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(false, false)),
new MockState(false, false, false)),
new DestinationInitialState<>(
APPEND_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(false, false)),
new MockState(false, false, false)),
new DestinationInitialState<>(
DEDUPE_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(false, false))
new MockState(false, false, false))
));

typerDeduper.prepareSchemasAndRawTables();
verify(destinationHandler).execute(Sql.of("MIGRATE airbyte_internal.overwrite_stream"));
verify(destinationHandler).execute(Sql.of("MIGRATE airbyte_internal.append_stream"));
verify(destinationHandler).execute(Sql.of("MIGRATE airbyte_internal.dedup_stream"));
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(true, true),
APPEND_STREAM_CONFIG.id(), new MockState(true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(true, true)
OVERWRITE_STREAM_CONFIG.id(), new MockState(true, true, true),
APPEND_STREAM_CONFIG.id(), new MockState(true, true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(true, true, true)
));
verify(destinationHandler).gatherInitialState(any());
verify(destinationHandler).execute(separately("CREATE SCHEMA airbyte_internal", "CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns"));
Expand All @@ -609,9 +615,9 @@ void migrationsMixedResults() throws Exception {

// And we should commit the states.
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true)
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, true, true),
APPEND_STREAM_CONFIG.id(), new MockState(false, true, true),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, true, true)
));

verifyNoMoreInteractions(destinationHandler);
Expand All @@ -632,30 +638,30 @@ void previousSyncSoftReset() throws Exception {
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(true, false)),
new MockState(true, false, false)),
new DestinationInitialState<>(
APPEND_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(true, false)),
new MockState(true, false, false)),
new DestinationInitialState<>(
DEDUPE_STREAM_CONFIG,
true,
new InitialRawTableState(true, Optional.of(Instant.ofEpochMilli(42))),
false,
false,
new MockState(true, false))
new MockState(true, false, false))
));

typerDeduper.prepareSchemasAndRawTables();
// Even though we didn't do anything, we still commit the destination states.
// This is technically unnecessary, but it's a single extra call and it's simpler to just do it.
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(true, false),
APPEND_STREAM_CONFIG.id(), new MockState(true, false),
DEDUPE_STREAM_CONFIG.id(), new MockState(true, false)
OVERWRITE_STREAM_CONFIG.id(), new MockState(true, false, false),
APPEND_STREAM_CONFIG.id(), new MockState(true, false, false),
DEDUPE_STREAM_CONFIG.id(), new MockState(true, false, false)
));
verify(destinationHandler).gatherInitialState(any());
verify(destinationHandler).execute(separately("CREATE SCHEMA airbyte_internal", "CREATE SCHEMA overwrite_ns", "CREATE SCHEMA append_ns", "CREATE SCHEMA dedup_ns"));
Expand All @@ -678,9 +684,9 @@ void previousSyncSoftReset() throws Exception {

// And we should commit the states. Note that we now set needsSoftReset=false.
verify(destinationHandler).commitDestinationStates(Map.of(
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, false),
APPEND_STREAM_CONFIG.id(), new MockState(false, false),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, false)
OVERWRITE_STREAM_CONFIG.id(), new MockState(false, false, false),
APPEND_STREAM_CONFIG.id(), new MockState(false, false, false),
DEDUPE_STREAM_CONFIG.id(), new MockState(false, false, false)
));

verifyNoMoreInteractions(destinationHandler);
Expand Down

0 comments on commit be41e16

Please sign in to comment.