Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persistence function for discovered schema #10326

Merged
merged 10 commits into from
Feb 17, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.28.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down Expand Up @@ -112,7 +112,7 @@ void testIsLegalUpgradePredicate() {

@Test
void testPostLoadExecutionExecutes() throws Exception {
var testTriggered = new AtomicBoolean();
final var testTriggered = new AtomicBoolean();

val container = new PostgreSQLContainer<>("postgres:13-alpine")
.withDatabaseName("public")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public enum ConfigSchema implements AirbyteConfig {

STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),

ACTOR_CATALOG("ActorCatalog.yaml", ActorCatalog.class),
ACTOR_CATALOG_FETCH_EVENT("ActorCatalogFetchEvent.yaml", ActorCatalogFetchEvent.class),

// worker
STANDARD_SYNC_INPUT("StandardSyncInput.yaml", StandardSyncInput.class),
NORMALIZATION_INPUT("NormalizationInput.yaml", NormalizationInput.class),
Expand Down
20 changes: 20 additions & 0 deletions airbyte-config/models/src/main/resources/types/ActorCatalog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
title: ActorCatalog
description: Catalog of an actor.
type: object
additionalProperties: false
required:
- id
- catalog
- catalogHash
properties:
id:
type: string
format: uuid
catalog:
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
catalogHash:
type: string
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
title: ActorCatalogFetchEvent
description: Link actor to their actual catalog
type: object
additionalProperties: false
required:
- id
- actorCatalogId
- actorId
- configHash
- connectorVersion
properties:
id:
type: string
format: uuid
actorId:
type: string
format: uuid
actorCatalogId:
type: string
format: uuid
configHash:
type: string
connectorVersion:
type: string
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
package io.airbyte.config.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
Expand All @@ -25,6 +30,7 @@
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -551,6 +557,69 @@ public void updateConnectionState(final UUID connectionId, final State state) th
}
}

public Optional<ActorCatalog> getSourceCatalog(final UUID sourceId,
final String configurationHash,
final String connectorVersion)
throws JsonValidationException, IOException {
for (final ActorCatalogFetchEvent event : listActorCatalogFetchEvents()) {
if (event.getConnectorVersion().equals(connectorVersion)
&& event.getConfigHash().equals(configurationHash)
&& event.getActorId().equals(sourceId)) {
return getCatalogById(event.getActorCatalogId());
}
}
return Optional.empty();
}

public List<ActorCatalogFetchEvent> listActorCatalogFetchEvents()
throws JsonValidationException, IOException {
final List<ActorCatalogFetchEvent> actorCatalogFetchEvents = new ArrayList<>();

for (final ActorCatalogFetchEvent event : persistence.listConfigs(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
ActorCatalogFetchEvent.class)) {
actorCatalogFetchEvents.add(event);
}
return actorCatalogFetchEvents;
}

public Optional<ActorCatalog> getCatalogById(final UUID catalogId)
throws IOException {
try {
return Optional.of(persistence.getConfig(ConfigSchema.ACTOR_CATALOG, catalogId.toString(),
ActorCatalog.class));
} catch (final ConfigNotFoundException e) {
return Optional.empty();
} catch (final JsonValidationException e) {
throw new IllegalStateException(e);
}
}

public void writeCatalog(final AirbyteCatalog catalog,
final UUID sourceId,
final String configurationHash,
final String connectorVersion)
throws JsonValidationException, IOException {
final HashFunction hashFunction = Hashing.murmur3_32_fixed();
final String configHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes(
Charsets.UTF_8)).toString();
final ActorCatalog actorCatalog = new ActorCatalog()
.withCatalog(Jsons.jsonNode(catalog))
.withId(UUID.randomUUID())
.withCatalogHash(configHash);
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG,
actorCatalog.getId().toString(),
actorCatalog);
final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent()
.withActorCatalogId(actorCatalog.getId())
.withId(UUID.randomUUID())
.withConfigHash(configurationHash)
.withConnectorVersion(connectorVersion)
.withActorId(sourceId);
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
actorCatalogFetchEvent.getId().toString(),
actorCatalogFetchEvent);
}

/**
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
* string/jsonnode into the AirbyteConfig, Stream&lt;Object&lt;AirbyteConfig.getClassName()&gt;&gt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
Expand All @@ -23,6 +25,8 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
Expand Down Expand Up @@ -114,6 +118,10 @@ public <T> T getConfig(final AirbyteConfig configType, final String configId, fi
return (T) getStandardSync(configId);
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
return (T) getStandardSyncState(configId);
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
return (T) getActorCatalog(configId);
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
return (T) getActorCatalogFetchEvent(configId);
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -181,6 +189,18 @@ private StandardSyncState getStandardSyncState(final String configId) throws IOE
return result.get(0).getConfig();
}

private ActorCatalog getActorCatalog(final String configId) throws IOException, ConfigNotFoundException {
final List<ConfigWithMetadata<ActorCatalog>> result = listActorCatalogWithMetadata(Optional.of(UUID.fromString(configId)));
validate(configId, result, ConfigSchema.ACTOR_CATALOG);
return result.get(0).getConfig();
}

private ActorCatalogFetchEvent getActorCatalogFetchEvent(final String configId) throws IOException, ConfigNotFoundException {
final List<ConfigWithMetadata<ActorCatalogFetchEvent>> result = listActorCatalogFetchEventWithMetadata(Optional.of(UUID.fromString(configId)));
validate(configId, result, ConfigSchema.ACTOR_CATALOG_FETCH_EVENT);
return result.get(0).getConfig();
}

private List<UUID> connectionOperationIds(final UUID connectionId) throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select(asterisk())
.from(CONNECTION_OPERATION)
Expand Down Expand Up @@ -243,6 +263,10 @@ public <T> ConfigWithMetadata<T> getConfigWithMetadata(final AirbyteConfig confi
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardSyncWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listStandardSyncStateWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogFetchEventWithMetadata(configIdOpt), configType);
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -271,6 +295,10 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf
listStandardSyncWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
listStandardSyncStateWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
listActorCatalogWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
listActorCatalogFetchEventWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -672,6 +700,72 @@ private StandardSyncState buildStandardSyncState(final Record record) {
.withState(Jsons.deserialize(record.get(STATE.STATE_).data(), State.class));
}

private List<ConfigWithMetadata<ActorCatalog>> listActorCatalogWithMetadata() throws IOException {
return listActorCatalogWithMetadata(Optional.empty());
}

private List<ConfigWithMetadata<ActorCatalog>> listActorCatalogWithMetadata(final Optional<UUID> configId) throws IOException {
final Result<Record> result = database.query(ctx -> {
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(ACTOR_CATALOG);
if (configId.isPresent()) {
return query.where(ACTOR_CATALOG.ID.eq(configId.get())).fetch();
}
return query.fetch();
});
final List<ConfigWithMetadata<ActorCatalog>> actorCatalogs = new ArrayList<>();
for (final Record record : result) {
final ActorCatalog actorCatalog = buildActorCatalog(record);
actorCatalogs.add(new ConfigWithMetadata<>(
record.get(ACTOR_CATALOG.ID).toString(),
ConfigSchema.ACTOR_CATALOG.name(),
record.get(ACTOR_CATALOG.CREATED_AT).toInstant(),
record.get(ACTOR_CATALOG.MODIFIED_AT).toInstant(),
actorCatalog));
}
return actorCatalogs;
}

private ActorCatalog buildActorCatalog(final Record record) {
return new ActorCatalog()
.withId(record.get(ACTOR_CATALOG.ID))
.withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString()))
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
}

private List<ConfigWithMetadata<ActorCatalogFetchEvent>> listActorCatalogFetchEventWithMetadata() throws IOException {
return listActorCatalogFetchEventWithMetadata(Optional.empty());
}

private List<ConfigWithMetadata<ActorCatalogFetchEvent>> listActorCatalogFetchEventWithMetadata(final Optional<UUID> configId) throws IOException {
final Result<Record> result = database.query(ctx -> {
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(ACTOR_CATALOG_FETCH_EVENT);
if (configId.isPresent()) {
return query.where(ACTOR_CATALOG_FETCH_EVENT.ID.eq(configId.get())).fetch();
}
return query.fetch();
});
final List<ConfigWithMetadata<ActorCatalogFetchEvent>> actorCatalogFetchEvents = new ArrayList<>();
for (final Record record : result) {
final ActorCatalogFetchEvent actorCatalogFetchEvent = buildActorCatalogFetchEvent(record);
actorCatalogFetchEvents.add(new ConfigWithMetadata<>(
record.get(ACTOR_CATALOG_FETCH_EVENT.ID).toString(),
ConfigSchema.ACTOR_CATALOG_FETCH_EVENT.name(),
record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT).toInstant(),
record.get(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT).toInstant(),
actorCatalogFetchEvent));
}
return actorCatalogFetchEvents;
}

private ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
return new ActorCatalogFetchEvent()
.withId(record.get(ACTOR_CATALOG_FETCH_EVENT.ID))
.withActorCatalogId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID))
.withConfigHash(record.get(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH))
.withConnectorVersion(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION))
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID));
}

@Override
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException {
if (configType == ConfigSchema.STANDARD_WORKSPACE) {
Expand All @@ -694,6 +788,10 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
writeStandardSync(Collections.singletonList((StandardSync) config));
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
writeStandardSyncState(Collections.singletonList((StandardSyncState) config));
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
writeActorCatalog(Collections.singletonList((ActorCatalog) config));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
writeActorCatalogFetchEvent(Collections.singletonList((ActorCatalogFetchEvent) config));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -1200,6 +1298,60 @@ private void writeStandardSyncState(final List<StandardSyncState> configs, final
});
}

private void writeActorCatalog(final List<ActorCatalog> configs) throws IOException {
database.transaction(ctx -> {
writeActorCatalog(configs, ctx);
return null;
});
}

private void writeActorCatalog(final List<ActorCatalog> configs, final DSLContext ctx) {
final OffsetDateTime timestamp = OffsetDateTime.now();
configs.forEach((actorCatalog) -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(ACTOR_CATALOG)
.where(ACTOR_CATALOG.ID.eq(actorCatalog.getId())));

if (isExistingConfig) {} else {
malikdiarra marked this conversation as resolved.
Show resolved Hide resolved
ctx.insertInto(ACTOR_CATALOG)
.set(ACTOR_CATALOG.ID, actorCatalog.getId())
.set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(actorCatalog.getCatalog())))
.set(ACTOR_CATALOG.CATALOG_HASH, actorCatalog.getCatalogHash())
.set(ACTOR_CATALOG.CREATED_AT, timestamp)
.set(ACTOR_CATALOG.MODIFIED_AT, timestamp)
.execute();
}
});
}

private void writeActorCatalogFetchEvent(final List<ActorCatalogFetchEvent> configs) throws IOException {
database.transaction(ctx -> {
writeActorCatalogFetchEvent(configs, ctx);
return null;
});
}

private void writeActorCatalogFetchEvent(final List<ActorCatalogFetchEvent> configs, final DSLContext ctx) {
final OffsetDateTime timestamp = OffsetDateTime.now();
configs.forEach((actorCatalogFetchEvent) -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(ACTOR_CATALOG_FETCH_EVENT)
.where(ACTOR_CATALOG_FETCH_EVENT.ID.eq(actorCatalogFetchEvent.getId())));

if (isExistingConfig) {} else {
malikdiarra marked this conversation as resolved.
Show resolved Hide resolved
ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
.set(ACTOR_CATALOG_FETCH_EVENT.ID, actorCatalogFetchEvent.getId())
.set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, actorCatalogFetchEvent.getConfigHash())
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, actorCatalogFetchEvent.getActorCatalogId())
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorCatalogFetchEvent.getActorId())
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, actorCatalogFetchEvent.getConnectorVersion())
.set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp)
.set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp)
.execute();
}
});
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) throws IOException, JsonValidationException {
if (configType == ConfigSchema.STANDARD_WORKSPACE) {
Expand All @@ -1222,6 +1374,10 @@ public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T
writeStandardSync(configs.values().stream().map(c -> (StandardSync) c).collect(Collectors.toList()));
} else if (configType == ConfigSchema.STANDARD_SYNC_STATE) {
writeStandardSyncState(configs.values().stream().map(c -> (StandardSyncState) c).collect(Collectors.toList()));
} else if (configType == ConfigSchema.ACTOR_CATALOG) {
writeActorCatalog(configs.values().stream().map(c -> (ActorCatalog) c).collect(Collectors.toList()));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
writeActorCatalogFetchEvent(configs.values().stream().map(c -> (ActorCatalogFetchEvent) c).collect(Collectors.toList()));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down
Loading