Skip to content

Commit

Permalink
Track protocol version support range (#17366)
Browse files Browse the repository at this point in the history
* Add Airbyte Protocol Range configs

* Refactor metadata read/write

* Add ProtocolVersion Min/Max get/set to JobsPersistence

* Store the supported protocol version range in airbyte_metadata

* Use defaults in EnvConfigs instead of .env
  • Loading branch information
gosusnp committed Sep 29, 2022
1 parent 7271bca commit 4974855
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 38 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ WEBAPP_URL=http://localhost:8000/
# Although not present as an env var, required for webapp configuration.
API_URL=/api/v1/


### JOBS ###
# Relevant to scaling.
SYNC_JOB_MAX_ATTEMPTS=3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.commons.lang.CloseableShutdownHook;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardWorkspace;
Expand Down Expand Up @@ -153,6 +154,11 @@ public void load() throws Exception {
final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion();
assertNonBreakingMigration(jobPersistence, currAirbyteVersion);

final Version airbyteProtocolVersionMax = configs.getAirbyteProtocolVersionMax();
final Version airbyteProtocolVersionMin = configs.getAirbyteProtocolVersionMin();
// TODO ProtocolVersion validation should happen here
trackProtocolVersion(airbyteProtocolVersionMin, airbyteProtocolVersionMax);

// TODO Will be converted to an injected singleton during DI migration
final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
Expand Down Expand Up @@ -299,6 +305,12 @@ private static void assertNonBreakingMigration(final JobPersistence jobPersisten
}
}

private void trackProtocolVersion(final Version airbyteProtocolVersionMin, final Version airbyteProtocolVersionMax) throws IOException {
jobPersistence.setAirbyteProtocolVersionMin(airbyteProtocolVersionMin);
jobPersistence.setAirbyteProtocolVersionMax(airbyteProtocolVersionMax);
LOGGER.info("AirbyteProtocol version support range [{}:{}]", airbyteProtocolVersionMin.serialize(), airbyteProtocolVersionMax.serialize());
}

static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) {
// means there was no previous version so upgrade even needs to happen. always legal.
if (airbyteDatabaseVersion == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.Configs;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardWorkspace;
Expand Down Expand Up @@ -60,6 +61,8 @@ class BootloaderAppTest {
private DataSource configsDataSource;
private DataSource jobsDataSource;
private static final String DOCKER = "docker";
private static final String PROTOCOL_VERSION_123 = "1.2.3";
private static final String PROTOCOL_VERSION_124 = "1.2.4";
private static final String VERSION_0330_ALPHA = "0.33.0-alpha";
private static final String VERSION_0320_ALPHA = "0.32.0-alpha";
private static final String VERSION_0321_ALPHA = "0.32.1-alpha";
Expand Down Expand Up @@ -99,6 +102,8 @@ void testBootloaderAppBlankDb() throws Exception {
when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA));
when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_124));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
Expand Down Expand Up @@ -137,6 +142,8 @@ void testBootloaderAppBlankDb() throws Exception {

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get());
assertEquals(new Version(PROTOCOL_VERSION_124), jobsPersistence.getAirbyteProtocolVersionMax().get());

assertNotEquals(Optional.empty(), jobsPersistence.getDeployment().get());
}
Expand All @@ -152,6 +159,8 @@ void testBootloaderAppRunSecretMigration() throws Exception {
when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA));
when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
Expand Down Expand Up @@ -295,6 +304,8 @@ void testPostLoadExecutionExecutes() throws Exception {
when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA));
when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ public class AirbyteProtocolVersion {

public final static Version DEFAULT_AIRBYTE_PROTOCOL_VERSION = new Version("0.2.0");

public final static String AIRBYTE_PROTOCOL_VERSION_MAX_KEY_NAME = "airbyte_protocol_version_max";
public final static String AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME = "airbyte_protocol_version_min";

public static Version getWithDefault(final String version) {
if (version == null || version.isEmpty() || version.isBlank()) {
return DEFAULT_AIRBYTE_PROTOCOL_VERSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.config;

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import java.net.URI;
Expand Down Expand Up @@ -43,6 +44,16 @@ public interface Configs {
*/
AirbyteVersion getAirbyteVersion();

/**
* Defines the max supported Airbyte Protocol Version
*/
Version getAirbyteProtocolVersionMax();

/**
* Defines the min supported Airbyte Protocol Version
*/
Version getAirbyteProtocolVersionMin();

String getAirbyteVersionOrWarning();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
Expand Down Expand Up @@ -41,6 +42,8 @@ public class EnvConfigs implements Configs {
// env variable names
public static final String AIRBYTE_ROLE = "AIRBYTE_ROLE";
public static final String AIRBYTE_VERSION = "AIRBYTE_VERSION";
public static final String AIRBYTE_PROTOCOL_VERSION_MAX = "AIRBYTE_PROTOCOL_VERSION_MAX";
public static final String AIRBYTE_PROTOCOL_VERSION_MIN = "AIRBYTE_PROTOCOL_VERSION_MIN";
public static final String INTERNAL_API_HOST = "INTERNAL_API_HOST";
public static final String AIRBYTE_API_AUTH_HEADER_NAME = "AIRBYTE_API_AUTH_HEADER_NAME";
public static final String AIRBYTE_API_AUTH_HEADER_VALUE = "AIRBYTE_API_AUTH_HEADER_VALUE";
Expand Down Expand Up @@ -286,6 +289,16 @@ public AirbyteVersion getAirbyteVersion() {
return new AirbyteVersion(getEnsureEnv(AIRBYTE_VERSION));
}

@Override
public Version getAirbyteProtocolVersionMax() {
return new Version(getEnvOrDefault(AIRBYTE_PROTOCOL_VERSION_MAX, "0.3.0"));
}

@Override
public Version getAirbyteProtocolVersionMin() {
return new Version(getEnvOrDefault(AIRBYTE_PROTOCOL_VERSION_MIN, "0.0.0"));
}

@Override
public String getAirbyteVersionOrWarning() {
return Optional.ofNullable(getEnv(AIRBYTE_VERSION)).orElse("version not set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobConfig;
Expand Down Expand Up @@ -705,65 +707,35 @@ private static long getEpoch(final Record record, final String fieldName) {

@Override
public boolean isSecretMigrated() throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(SECRET_MIGRATION_STATUS))
.fetch());

return result.stream().count() == 1;
return getMetadata(SECRET_MIGRATION_STATUS).count() == 1;
}

@Override
public void setSecretMigrationDone() throws IOException {
jobDatabase.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
METADATA_KEY_COL,
METADATA_VAL_COL,
SECRET_MIGRATION_STATUS,
true,
METADATA_KEY_COL,
METADATA_VAL_COL,
true)));
setMetadata(SECRET_MIGRATION_STATUS, "true");
}

private final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration";

@Override
public boolean isSchedulerMigrated() throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(SCHEDULER_MIGRATION_STATUS))
.fetch());

return result.stream().count() == 1;
return getMetadata(SCHEDULER_MIGRATION_STATUS).count() == 1;
}

@Override
public void setSchedulerMigrationDone() throws IOException {
jobDatabase.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
METADATA_KEY_COL,
METADATA_VAL_COL,
SCHEDULER_MIGRATION_STATUS,
true,
METADATA_KEY_COL,
METADATA_VAL_COL,
true)));
setMetadata(SCHEDULER_MIGRATION_STATUS, "true");
}

@Override
public Optional<String> getVersion() throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME))
.fetch());
return result.stream().findFirst().map(r -> r.getValue(METADATA_VAL_COL, String.class));
return getMetadata(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME).findFirst();
}

@Override
public void setVersion(final String airbyteVersion) throws IOException {
// This is not using setMetadata due to the extra (<timestamp>s_init_db, airbyteVersion) that is
// added to the metadata table
jobDatabase.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s(%s, %s) VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
Expand All @@ -776,6 +748,45 @@ public void setVersion(final String airbyteVersion) throws IOException {
METADATA_KEY_COL,
METADATA_VAL_COL,
airbyteVersion)));

}

@Override
public Optional<Version> getAirbyteProtocolVersionMax() throws IOException {
return getMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MAX_KEY_NAME).findFirst().map(Version::new);
}

@Override
public void setAirbyteProtocolVersionMax(final Version version) throws IOException {
setMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MAX_KEY_NAME, version.serialize());
}

@Override
public Optional<Version> getAirbyteProtocolVersionMin() throws IOException {
return getMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME).findFirst().map(Version::new);
}

@Override
public void setAirbyteProtocolVersionMin(final Version version) throws IOException {
setMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME, version.serialize());
}

private Stream<String> getMetadata(final String keyName) throws IOException {
return jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(keyName))
.fetch()).stream().map(r -> r.getValue(METADATA_VAL_COL, String.class));
}

private void setMetadata(final String keyName, final String value) throws IOException {
jobDatabase.query(ctx -> ctx
.insertInto(DSL.table(AIRBYTE_METADATA_TABLE))
.columns(DSL.field(METADATA_KEY_COL), DSL.field(METADATA_VAL_COL))
.values(keyName, value)
.onConflict(DSL.field(METADATA_KEY_COL))
.doUpdate()
.set(DSL.field(METADATA_VAL_COL), value)
.execute());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.persistence.job;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
Expand Down Expand Up @@ -235,6 +236,26 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
*/
void setVersion(String airbyteVersion) throws IOException;

/**
* Get the max supported Airbyte Protocol Version
*/
Optional<Version> getAirbyteProtocolVersionMax() throws IOException;

/**
* Set the max supported Airbyte Protocol Version
*/
void setAirbyteProtocolVersionMax(Version version) throws IOException;

/**
* Get the min supported Airbyte Protocol Version
*/
Optional<Version> getAirbyteProtocolVersionMin() throws IOException;

/**
* Set the min supported Airbyte Protocol Version
*/
void setAirbyteProtocolVersionMin(Version version) throws IOException;

/**
* Returns a deployment UUID.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
Expand Down Expand Up @@ -580,6 +581,36 @@ void testSchedulerMigrationMetadata() throws IOException {
assertTrue(isMigrated);
}

@Test
void testAirbyteProtocolVersionMaxMetadata() throws IOException {
assertTrue(jobPersistence.getAirbyteProtocolVersionMax().isEmpty());

final Version maxVersion1 = new Version("0.1.0");
jobPersistence.setAirbyteProtocolVersionMax(maxVersion1);
final Optional<Version> maxVersion1read = jobPersistence.getAirbyteProtocolVersionMax();
assertEquals(maxVersion1, maxVersion1read.orElseThrow());

final Version maxVersion2 = new Version("1.2.1");
jobPersistence.setAirbyteProtocolVersionMax(maxVersion2);
final Optional<Version> maxVersion2read = jobPersistence.getAirbyteProtocolVersionMax();
assertEquals(maxVersion2, maxVersion2read.orElseThrow());
}

@Test
void testAirbyteProtocolVersionMinMetadata() throws IOException {
assertTrue(jobPersistence.getAirbyteProtocolVersionMin().isEmpty());

final Version minVersion1 = new Version("1.1.0");
jobPersistence.setAirbyteProtocolVersionMin(minVersion1);
final Optional<Version> minVersion1read = jobPersistence.getAirbyteProtocolVersionMin();
assertEquals(minVersion1, minVersion1read.orElseThrow());

final Version minVersion2 = new Version("3.0.1");
jobPersistence.setAirbyteProtocolVersionMin(minVersion2);
final Optional<Version> minVersion2read = jobPersistence.getAirbyteProtocolVersionMin();
assertEquals(minVersion2, minVersion2read.orElseThrow());
}

private long createJobAt(final Instant created_at) throws IOException {
when(timeSupplier.get()).thenReturn(created_at);
return jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
Expand Down

0 comments on commit 4974855

Please sign in to comment.