From b22a4399c865ace2006f85258b3ea87fa56424fa Mon Sep 17 00:00:00 2001 From: Anne <102554163+alovew@users.noreply.github.com> Date: Fri, 7 Oct 2022 13:28:11 -0700 Subject: [PATCH] Add connection migrations for schema changes (#17651) --- .../airbyte/bootloader/BootloaderAppTest.java | 2 +- ...__AddSchemaChangeColumnsToConnections.java | 90 +++++++++++++++++++ .../configs_database/schema_dump.txt | 3 + 3 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_40_11_002__AddSchemaChangeColumnsToConnections.java diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index f649b8a956019..7ca66e367b2ca 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -138,7 +138,7 @@ void testBootloaderAppBlankDb() throws Exception { val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); // this line should change with every new migration // to show that you meant to make a new migration to the prod database - assertEquals("0.40.11.001", configsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.40.11.002", configsMigrator.getLatestMigration().getVersion().getVersion()); val jobsPersistence = new DefaultJobPersistence(jobDatabase); assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get()); diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_40_11_002__AddSchemaChangeColumnsToConnections.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_40_11_002__AddSchemaChangeColumnsToConnections.java new file mode 100644 index 0000000000000..d0e443b38215f --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_40_11_002__AddSchemaChangeColumnsToConnections.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import java.util.Arrays; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.EnumType; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_40_11_002__AddSchemaChangeColumnsToConnections extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_11_002__AddSchemaChangeColumnsToConnections.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + final DSLContext ctx = DSL.using(context.getConnection()); + + addNonBreakingChangePreferenceEnumTypes(ctx); + + addNotifySchemaChanges(ctx); + addNonBreakingChangePreference(ctx); + addBreakingChange(ctx); + } + + private static void addNonBreakingChangePreferenceEnumTypes(final DSLContext ctx) { + ctx.createType(NonBreakingChangePreferenceType.NAME) + .asEnum(Arrays.stream(NonBreakingChangePreferenceType.values()).map(NonBreakingChangePreferenceType::getLiteral).toList()) + .execute(); + } + + private static void addNotifySchemaChanges(final DSLContext ctx) { + ctx.alterTable("connection") + .addColumnIfNotExists(DSL.field( + "notify_schema_changes", + SQLDataType.BOOLEAN.nullable(false).defaultValue(true))) + .execute(); + } + + private static void addNonBreakingChangePreference(final DSLContext ctx) { + ctx.alterTable("connection") + .addColumnIfNotExists(DSL.field( + "non_breaking_change_preference", + SQLDataType.VARCHAR.asEnumDataType(NonBreakingChangePreferenceType.class).nullable(false) + .defaultValue(NonBreakingChangePreferenceType.IGNORE))) + .execute(); + + } + + private static void addBreakingChange(final DSLContext ctx) { + ctx.alterTable("connection") + .addColumnIfNotExists(DSL.field( + "breaking_change", + SQLDataType.BOOLEAN.nullable(false).defaultValue(false))) + .execute(); + } + + public enum NonBreakingChangePreferenceType implements EnumType { + + IGNORE("ignore"), + DISABLE("disable"); + + private final String literal; + public static final String NAME = "non_breaking_change_preference_type"; + + NonBreakingChangePreferenceType(final String literal) { + this.literal = literal; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public String getLiteral() { + return literal; + } + + } + +} diff --git a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt index 307a091f7966c..2c9a2ea0ec52c 100644 --- a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt @@ -105,6 +105,9 @@ create table "public"."connection"( "schedule_type" schedule_type null, "schedule_data" jsonb null, "geography" geography_type not null default null, + "notify_schema_changes" bool not null default true, + "non_breaking_change_preference" varchar(7) not null default '''ignore''::character varying', + "breaking_change" bool not null default false, constraint "connection_pkey" primary key ("id") );