Skip to content

Commit

Permalink
🐞 Fix db config persistence unique constraint conflict (#5846)
Browse files Browse the repository at this point in the history
* Resolve merge conflict when there are duplicated connectors

* Add warning message

* Prevent insertion conflict

* Format code

* Log duplicated docker repo name

* Do nothing for duplicated insertion

* Log unexpected insertion & update count

* Use internal method

* Format code

* Fix unit test
  • Loading branch information
tuliren committed Sep 4, 2021
1 parent d03ec50 commit 6e35dc5
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigSchemaMigrationSupport;
Expand All @@ -50,6 +51,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
Expand Down Expand Up @@ -121,8 +123,6 @@ public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) throws

@Override
public <T> void writeConfig(AirbyteConfig configType, String configId, T config) throws IOException {
LOGGER.info("Upserting {} record {}", configType, configId);

database.transaction(ctx -> {
boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
Expand All @@ -131,27 +131,9 @@ public <T> void writeConfig(AirbyteConfig configType, String configId, T config)
OffsetDateTime timestamp = OffsetDateTime.now();

if (isExistingConfig) {
int updateCount = ctx.update(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.execute();
if (updateCount != 0 && updateCount != 1) {
LOGGER.warn("{} config {} has been updated; updated record count: {}", configType, configId, updateCount);
}

return null;
}

int insertionCount = ctx.insertInto(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_ID, configId)
.set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType.name())
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(AIRBYTE_CONFIGS.CREATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.execute();
if (insertionCount != 1) {
LOGGER.warn("{} config {} has been inserted; insertion record count: {}", configType, configId, insertionCount);
updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId);
} else {
insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configType.getIdFieldName());
}

return null;
Expand Down Expand Up @@ -215,20 +197,25 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
* @return the number of inserted records for convenience, which is always 1.
*/
@VisibleForTesting
int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String idFieldName) {
int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, @Nullable String idFieldName) {
String configId = idFieldName == null
? UUID.randomUUID().toString()
: configJson.get(idFieldName).asText();
LOGGER.info("Inserting {} record {}", configType, configId);

ctx.insertInto(AIRBYTE_CONFIGS)
int insertionCount = ctx.insertInto(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_ID, configId)
.set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson)))
.set(AIRBYTE_CONFIGS.CREATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.onConflict(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID)
.doNothing()
.execute();
return 1;
if (insertionCount != 1) {
LOGGER.warn("{} config {} already exists (insertion record count: {})", configType, configId, insertionCount);
}
return insertionCount;
}

/**
Expand All @@ -238,11 +225,15 @@ int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configTy
int updateConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String configId) {
LOGGER.info("Updating {} record {}", configType, configId);

return ctx.update(AIRBYTE_CONFIGS)
int updateCount = ctx.update(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson)))
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.execute();
if (updateCount != 1) {
LOGGER.warn("{} config {} is not updated (updated record count: {})", configType, configId, updateCount);
}
return updateCount;
}

@VisibleForTesting
Expand All @@ -268,12 +259,14 @@ void copyConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence
LOGGER.info("Config database data loading completed with {} records", insertionCount);
}

private static class ConnectorInfo {
static class ConnectorInfo {

private final String connectorDefinitionId;
private final String dockerImageTag;
final String dockerRepository;
final String connectorDefinitionId;
final String dockerImageTag;

private ConnectorInfo(String connectorDefinitionId, String dockerImageTag) {
private ConnectorInfo(String dockerRepository, String connectorDefinitionId, String dockerImageTag) {
this.dockerRepository = dockerRepository;
this.connectorDefinitionId = connectorDefinitionId;
this.dockerImageTag = dockerImageTag;
}
Expand Down Expand Up @@ -334,7 +327,8 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
AirbyteConfig configType,
List<T> latestDefinitions,
Set<String> connectorRepositoriesInUse,
Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap) {
Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap)
throws IOException {
int newCount = 0;
int updatedCount = 0;
for (T latestDefinition : latestDefinitions) {
Expand Down Expand Up @@ -364,7 +358,8 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
* repository name instead of definition id because connectors can be added manually by
* users, and are not always the same as those in the seed.
*/
private Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ctx) {
@VisibleForTesting
Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ctx) {
Field<String> repoField = field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR).as("repository");
Field<String> versionField = field("config_blob ->> 'dockerImageTag'", SQLDataType.VARCHAR).as("version");
return ctx.select(AIRBYTE_CONFIGS.CONFIG_ID, repoField, versionField)
Expand All @@ -373,7 +368,20 @@ private Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ct
.fetch().stream()
.collect(Collectors.toMap(
row -> row.getValue(repoField),
row -> new ConnectorInfo(row.getValue(AIRBYTE_CONFIGS.CONFIG_ID), row.getValue(versionField))));
row -> new ConnectorInfo(row.getValue(repoField), row.getValue(AIRBYTE_CONFIGS.CONFIG_ID), row.getValue(versionField)),
// when there are duplicated connector definitions, return the latest one
(c1, c2) -> {
AirbyteVersion v1 = new AirbyteVersion(c1.dockerImageTag);
AirbyteVersion v2 = new AirbyteVersion(c2.dockerImageTag);
LOGGER.warn("Duplicated connector version found for {}: {} ({}) vs {} ({})",
c1.dockerRepository, c1.dockerImageTag, c1.connectorDefinitionId, c2.dockerImageTag, c2.connectorDefinitionId);
int comparison = v1.patchVersionCompareTo(v2);
if (comparison >= 0) {
return c1;
} else {
return c2;
}
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ public void testNoUpdateForUsedConnector() throws Exception {

// create a sync to mark the destination as used
StandardSync s3Sync = new StandardSync()
.withConnectionId(UUID.randomUUID())
.withSourceId(SOURCE_GITHUB.getSourceDefinitionId())
.withDestinationId(destinationS3V2.getDestinationDefinitionId());
configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID.randomUUID().toString(), s3Sync);
configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, s3Sync.getConnectionId().toString(), s3Sync);

configPersistence.loadData(seedPersistence);
// s3 destination is not updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.DatabaseConfigPersistence.ConnectorInfo;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -120,4 +124,98 @@ public void testDumpConfigs() throws Exception {
assertSameConfigDump(expected, actual);
}

/**
 * Two source definitions that share a docker repository must collapse into a single map entry,
 * and that entry must describe the definition with the newer image tag.
 */
@Test
public void testGetConnectorRepositoryToInfoMap() throws Exception {
  final String sharedRepository = "airbyte/duplicated-connector";
  final String olderTag = "0.1.10";
  final String newerTag = "0.2.0";

  // same repository, distinct definition ids, distinct versions
  final StandardSourceDefinition olderDefinition = new StandardSourceDefinition()
      .withSourceDefinitionId(UUID.randomUUID())
      .withDockerRepository(sharedRepository)
      .withDockerImageTag(olderTag);
  final StandardSourceDefinition newerDefinition = new StandardSourceDefinition()
      .withSourceDefinitionId(UUID.randomUUID())
      .withDockerRepository(sharedRepository)
      .withDockerImageTag(newerTag);

  writeSource(configPersistence, olderDefinition);
  writeSource(configPersistence, newerDefinition);

  final Map<String, ConnectorInfo> infoByRepository =
      database.query(dslContext -> configPersistence.getConnectorRepositoryToInfoMap(dslContext));

  // of the duplicated definitions, only the latest version is expected to survive the merge
  final ConnectorInfo retained = infoByRepository.get(sharedRepository);
  assertEquals(newerTag, retained.dockerImageTag);
}

/**
 * Inserting a record with a fresh id reports one inserted row; inserting again with the same
 * id reports zero rows and leaves the stored records as they were.
 */
@Test
public void testInsertConfigRecord() throws Exception {
  final OffsetDateTime timestamp = OffsetDateTime.now();
  final UUID sharedDefinitionId = UUID.randomUUID();
  final String repository = "airbyte/test-connector";
  final String configTypeName = ConfigSchema.STANDARD_SOURCE_DEFINITION.name();
  final String idFieldName = ConfigSchema.STANDARD_SOURCE_DEFINITION.getIdFieldName();

  // first attempt: no record with this id exists yet, so one row is inserted
  final StandardSourceDefinition original = new StandardSourceDefinition()
      .withSourceDefinitionId(sharedDefinitionId)
      .withDockerRepository(repository)
      .withDockerImageTag("0.1.2");
  final int firstInsertCount = database.query(dslContext -> configPersistence.insertConfigRecord(
      dslContext,
      timestamp,
      configTypeName,
      Jsons.jsonNode(original),
      idFieldName));
  assertEquals(1, firstInsertCount);

  // an unrelated source is written as a control that must stay untouched
  writeSource(configPersistence, SOURCE_GITHUB);
  assertRecordCount(2);
  assertHasSource(original);
  assertHasSource(SOURCE_GITHUB);

  // second attempt: same definition id, different tag; the insertion is ignored
  final StandardSourceDefinition conflicting = new StandardSourceDefinition()
      .withSourceDefinitionId(sharedDefinitionId)
      .withDockerRepository(repository)
      .withDockerImageTag("0.1.5");
  final int secondInsertCount = database.query(dslContext -> configPersistence.insertConfigRecord(
      dslContext,
      timestamp,
      configTypeName,
      Jsons.jsonNode(conflicting),
      idFieldName));
  assertEquals(0, secondInsertCount);
  assertRecordCount(2);
  assertHasSource(original);
  assertHasSource(SOURCE_GITHUB);
}

/**
 * Updating an existing record must replace its stored config and report exactly one updated
 * row, while an unrelated record stays untouched.
 */
@Test
public void testUpdateConfigRecord() throws Exception {
  OffsetDateTime timestamp = OffsetDateTime.now();
  UUID definitionId = UUID.randomUUID();
  String connectorRepository = "airbyte/test-connector";

  StandardSourceDefinition oldSource = new StandardSourceDefinition()
      .withSourceDefinitionId(definitionId)
      .withDockerRepository(connectorRepository)
      .withDockerImageTag("0.3.5");
  writeSource(configPersistence, oldSource);
  // write an irrelevant source to make sure that it is not changed
  writeSource(configPersistence, SOURCE_GITHUB);
  assertRecordCount(2);
  assertHasSource(oldSource);
  assertHasSource(SOURCE_GITHUB);

  // The replacement must differ from the stored definition; with an identical tag the
  // assertions below would also pass if the update silently did nothing.
  StandardSourceDefinition newSource = new StandardSourceDefinition()
      .withSourceDefinitionId(definitionId)
      .withDockerRepository(connectorRepository)
      .withDockerImageTag("0.3.6");
  int updateCount = database.query(ctx -> configPersistence.updateConfigRecord(
      ctx,
      timestamp,
      ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
      Jsons.jsonNode(newSource),
      definitionId.toString()));
  // exactly one existing row matches the (config type, config id) pair
  assertEquals(1, updateCount);
  assertRecordCount(2);
  // NOTE(review): presumably assertHasSource compares the full stored definition — confirm
  assertHasSource(newSource);
  assertHasSource(SOURCE_GITHUB);
}

}

0 comments on commit 6e35dc5

Please sign in to comment.