Skip to content

Commit

Permalink
Merge branch 'master' into teal/connection-styling-system
Browse files Browse the repository at this point in the history
  • Loading branch information
krishnaglick committed Sep 29, 2022
2 parents 6059c4f + da5df45 commit e091e54
Show file tree
Hide file tree
Showing 17 changed files with 172 additions and 54 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ WEBAPP_URL=http://localhost:8000/
# Although not present as an env var, required for webapp configuration.
API_URL=/api/v1/


### JOBS ###
# Relevant to scaling.
SYNC_JOB_MAX_ATTEMPTS=3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.commons.lang.CloseableShutdownHook;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardWorkspace;
Expand Down Expand Up @@ -153,6 +154,11 @@ public void load() throws Exception {
final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion();
assertNonBreakingMigration(jobPersistence, currAirbyteVersion);

final Version airbyteProtocolVersionMax = configs.getAirbyteProtocolVersionMax();
final Version airbyteProtocolVersionMin = configs.getAirbyteProtocolVersionMin();
// TODO ProtocolVersion validation should happen here
trackProtocolVersion(airbyteProtocolVersionMin, airbyteProtocolVersionMax);

// TODO Will be converted to an injected singleton during DI migration
final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
Expand Down Expand Up @@ -299,6 +305,12 @@ private static void assertNonBreakingMigration(final JobPersistence jobPersisten
}
}

private void trackProtocolVersion(final Version airbyteProtocolVersionMin, final Version airbyteProtocolVersionMax) throws IOException {
jobPersistence.setAirbyteProtocolVersionMin(airbyteProtocolVersionMin);
jobPersistence.setAirbyteProtocolVersionMax(airbyteProtocolVersionMax);
LOGGER.info("AirbyteProtocol version support range [{}:{}]", airbyteProtocolVersionMin.serialize(), airbyteProtocolVersionMax.serialize());
}

static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) {
// means there was no previous version so upgrade even needs to happen. always legal.
if (airbyteDatabaseVersion == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.Configs;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardWorkspace;
Expand Down Expand Up @@ -60,6 +61,8 @@ class BootloaderAppTest {
private DataSource configsDataSource;
private DataSource jobsDataSource;
private static final String DOCKER = "docker";
private static final String PROTOCOL_VERSION_123 = "1.2.3";
private static final String PROTOCOL_VERSION_124 = "1.2.4";
private static final String VERSION_0330_ALPHA = "0.33.0-alpha";
private static final String VERSION_0320_ALPHA = "0.32.0-alpha";
private static final String VERSION_0321_ALPHA = "0.32.1-alpha";
Expand Down Expand Up @@ -99,6 +102,8 @@ void testBootloaderAppBlankDb() throws Exception {
when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA));
when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_124));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
Expand Down Expand Up @@ -137,6 +142,8 @@ void testBootloaderAppBlankDb() throws Exception {

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get());
assertEquals(new Version(PROTOCOL_VERSION_124), jobsPersistence.getAirbyteProtocolVersionMax().get());

assertNotEquals(Optional.empty(), jobsPersistence.getDeployment().get());
}
Expand All @@ -152,6 +159,8 @@ void testBootloaderAppRunSecretMigration() throws Exception {
when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA));
when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
Expand Down Expand Up @@ -295,6 +304,8 @@ void testPostLoadExecutionExecutes() throws Exception {
when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(VERSION_0330_ALPHA));
when(mockedConfigs.getAirbyteProtocolVersionMin()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.getAirbyteProtocolVersionMax()).thenReturn(new Version(PROTOCOL_VERSION_123));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ public class AirbyteProtocolVersion {

public final static Version DEFAULT_AIRBYTE_PROTOCOL_VERSION = new Version("0.2.0");

public final static String AIRBYTE_PROTOCOL_VERSION_MAX_KEY_NAME = "airbyte_protocol_version_max";
public final static String AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME = "airbyte_protocol_version_min";

public static Version getWithDefault(final String version) {
if (version == null || version.isEmpty() || version.isBlank()) {
return DEFAULT_AIRBYTE_PROTOCOL_VERSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.config;

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import java.net.URI;
Expand Down Expand Up @@ -43,6 +44,16 @@ public interface Configs {
*/
AirbyteVersion getAirbyteVersion();

/**
* Defines the max supported Airbyte Protocol Version
*/
Version getAirbyteProtocolVersionMax();

/**
* Defines the min supported Airbyte Protocol Version
*/
Version getAirbyteProtocolVersionMin();

String getAirbyteVersionOrWarning();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
Expand Down Expand Up @@ -41,6 +42,8 @@ public class EnvConfigs implements Configs {
// env variable names
public static final String AIRBYTE_ROLE = "AIRBYTE_ROLE";
public static final String AIRBYTE_VERSION = "AIRBYTE_VERSION";
public static final String AIRBYTE_PROTOCOL_VERSION_MAX = "AIRBYTE_PROTOCOL_VERSION_MAX";
public static final String AIRBYTE_PROTOCOL_VERSION_MIN = "AIRBYTE_PROTOCOL_VERSION_MIN";
public static final String INTERNAL_API_HOST = "INTERNAL_API_HOST";
public static final String AIRBYTE_API_AUTH_HEADER_NAME = "AIRBYTE_API_AUTH_HEADER_NAME";
public static final String AIRBYTE_API_AUTH_HEADER_VALUE = "AIRBYTE_API_AUTH_HEADER_VALUE";
Expand Down Expand Up @@ -286,6 +289,16 @@ public AirbyteVersion getAirbyteVersion() {
return new AirbyteVersion(getEnsureEnv(AIRBYTE_VERSION));
}

@Override
public Version getAirbyteProtocolVersionMax() {
return new Version(getEnvOrDefault(AIRBYTE_PROTOCOL_VERSION_MAX, "0.3.0"));
}

@Override
public Version getAirbyteProtocolVersionMin() {
return new Version(getEnvOrDefault(AIRBYTE_PROTOCOL_VERSION_MIN, "0.0.0"));
}

@Override
public String getAirbyteVersionOrWarning() {
return Optional.ofNullable(getEnv(AIRBYTE_VERSION)).orElse("version not set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
documentationUrl: https://docs.airbyte.io/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
releaseStage: alpha
releaseStage: generally_available
- name: AWS CloudTrail
sourceDefinitionId: 6ff047c0-f5d5-4ce5-8c81-204a830fa7e1
dockerRepository: airbyte/source-aws-cloudtrail
Expand Down Expand Up @@ -678,7 +678,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.15
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
8 changes: 5 additions & 3 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6965,7 +6965,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.15"
- dockerImage: "airbyte/source-mysql:1.0.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -7013,8 +7013,10 @@
jdbc_url_params:
description: "Additional properties to pass to the JDBC URL string when\
\ connecting to the database formatted as 'key=value' pairs separated\
\ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)."
title: "JDBC URL Params"
\ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). For\
\ more information read about <a href=\"https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html\"\
>JDBC URL parameters</a>."
title: "JDBC URL Parameters (Advanced)"
type: "string"
order: 5
ssl:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.15

LABEL io.airbyte.version=1.0.0

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
"order": 4
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). For more information read about <a href=\"https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html\">JDBC URL parameters</a>.",
"title": "JDBC URL Parameters (Advanced)",
"type": "string",
"order": 5
},
Expand Down
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.15
LABEL io.airbyte.version=1.0.0

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
"order": 4
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). For more information read about <a href=\"https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html\">JDBC URL parameters</a>.",
"title": "JDBC URL Parameters (Advanced)",
"type": "string",
"order": 5
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobConfig;
Expand Down Expand Up @@ -705,65 +707,35 @@ private static long getEpoch(final Record record, final String fieldName) {

@Override
public boolean isSecretMigrated() throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(SECRET_MIGRATION_STATUS))
.fetch());

return result.stream().count() == 1;
return getMetadata(SECRET_MIGRATION_STATUS).count() == 1;
}

@Override
public void setSecretMigrationDone() throws IOException {
jobDatabase.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
METADATA_KEY_COL,
METADATA_VAL_COL,
SECRET_MIGRATION_STATUS,
true,
METADATA_KEY_COL,
METADATA_VAL_COL,
true)));
setMetadata(SECRET_MIGRATION_STATUS, "true");
}

private final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration";

@Override
public boolean isSchedulerMigrated() throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(SCHEDULER_MIGRATION_STATUS))
.fetch());

return result.stream().count() == 1;
return getMetadata(SCHEDULER_MIGRATION_STATUS).count() == 1;
}

@Override
public void setSchedulerMigrationDone() throws IOException {
jobDatabase.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
METADATA_KEY_COL,
METADATA_VAL_COL,
SCHEDULER_MIGRATION_STATUS,
true,
METADATA_KEY_COL,
METADATA_VAL_COL,
true)));
setMetadata(SCHEDULER_MIGRATION_STATUS, "true");
}

@Override
public Optional<String> getVersion() throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME))
.fetch());
return result.stream().findFirst().map(r -> r.getValue(METADATA_VAL_COL, String.class));
return getMetadata(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME).findFirst();
}

@Override
public void setVersion(final String airbyteVersion) throws IOException {
// This is not using setMetadata due to the extra (<timestamp>s_init_db, airbyteVersion) that is
// added to the metadata table
jobDatabase.query(ctx -> ctx.execute(String.format(
"INSERT INTO %s(%s, %s) VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
AIRBYTE_METADATA_TABLE,
Expand All @@ -776,6 +748,45 @@ public void setVersion(final String airbyteVersion) throws IOException {
METADATA_KEY_COL,
METADATA_VAL_COL,
airbyteVersion)));

}

@Override
public Optional<Version> getAirbyteProtocolVersionMax() throws IOException {
return getMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MAX_KEY_NAME).findFirst().map(Version::new);
}

@Override
public void setAirbyteProtocolVersionMax(final Version version) throws IOException {
setMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MAX_KEY_NAME, version.serialize());
}

@Override
public Optional<Version> getAirbyteProtocolVersionMin() throws IOException {
return getMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME).findFirst().map(Version::new);
}

@Override
public void setAirbyteProtocolVersionMin(final Version version) throws IOException {
setMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME, version.serialize());
}

private Stream<String> getMetadata(final String keyName) throws IOException {
return jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
.where(DSL.field(METADATA_KEY_COL).eq(keyName))
.fetch()).stream().map(r -> r.getValue(METADATA_VAL_COL, String.class));
}

private void setMetadata(final String keyName, final String value) throws IOException {
jobDatabase.query(ctx -> ctx
.insertInto(DSL.table(AIRBYTE_METADATA_TABLE))
.columns(DSL.field(METADATA_KEY_COL), DSL.field(METADATA_VAL_COL))
.values(keyName, value)
.onConflict(DSL.field(METADATA_KEY_COL))
.doUpdate()
.set(DSL.field(METADATA_VAL_COL), value)
.execute());
}

@Override
Expand Down

0 comments on commit e091e54

Please sign in to comment.