Skip to content

Commit

Permalink
Add connection migrations for schema changes (#17651)
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Oct 7, 2022
1 parent 33f0cc6 commit b22a439
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
Expand Up @@ -138,7 +138,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.40.11.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.11.002", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
Expand Down
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import java.util.Arrays;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.EnumType;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_40_11_002__AddSchemaChangeColumnsToConnections extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_11_002__AddSchemaChangeColumnsToConnections.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

final DSLContext ctx = DSL.using(context.getConnection());

addNonBreakingChangePreferenceEnumTypes(ctx);

addNotifySchemaChanges(ctx);
addNonBreakingChangePreference(ctx);
addBreakingChange(ctx);
}

private static void addNonBreakingChangePreferenceEnumTypes(final DSLContext ctx) {
ctx.createType(NonBreakingChangePreferenceType.NAME)
.asEnum(Arrays.stream(NonBreakingChangePreferenceType.values()).map(NonBreakingChangePreferenceType::getLiteral).toList())
.execute();
}

private static void addNotifySchemaChanges(final DSLContext ctx) {
ctx.alterTable("connection")
.addColumnIfNotExists(DSL.field(
"notify_schema_changes",
SQLDataType.BOOLEAN.nullable(false).defaultValue(true)))
.execute();
}

private static void addNonBreakingChangePreference(final DSLContext ctx) {
ctx.alterTable("connection")
.addColumnIfNotExists(DSL.field(
"non_breaking_change_preference",
SQLDataType.VARCHAR.asEnumDataType(NonBreakingChangePreferenceType.class).nullable(false)
.defaultValue(NonBreakingChangePreferenceType.IGNORE)))
.execute();

}

private static void addBreakingChange(final DSLContext ctx) {
ctx.alterTable("connection")
.addColumnIfNotExists(DSL.field(
"breaking_change",
SQLDataType.BOOLEAN.nullable(false).defaultValue(false)))
.execute();
}

public enum NonBreakingChangePreferenceType implements EnumType {

IGNORE("ignore"),
DISABLE("disable");

private final String literal;
public static final String NAME = "non_breaking_change_preference_type";

NonBreakingChangePreferenceType(final String literal) {
this.literal = literal;
}

@Override
public String getName() {
return NAME;
}

@Override
public String getLiteral() {
return literal;
}

}

}
Expand Up @@ -105,6 +105,9 @@ create table "public"."connection"(
"schedule_type" schedule_type null,
"schedule_data" jsonb null,
"geography" geography_type not null default null,
"notify_schema_changes" bool not null default true,
"non_breaking_change_preference" varchar(7) not null default '''ignore''::character varying',
"breaking_change" bool not null default false,
constraint "connection_pkey"
primary key ("id")
);
Expand Down

0 comments on commit b22a439

Please sign in to comment.