Skip to content

Commit

Permalink
Add persistence function for discovered schema (#10326)
Browse files Browse the repository at this point in the history
- Add functions to persist/edit/delete ACTOR_CATALOG and ACTOR_CATALOG_FETCH_EVENT in ConfigPersistence
- Add high level operation in ConfigRepository
  • Loading branch information
malikdiarra committed Feb 17, 2022
1 parent a56c4fb commit fe1eb8d
Show file tree
Hide file tree
Showing 11 changed files with 609 additions and 3 deletions.
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.28.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down Expand Up @@ -112,7 +112,7 @@ void testIsLegalUpgradePredicate() {

@Test
void testPostLoadExecutionExecutes() throws Exception {
var testTriggered = new AtomicBoolean();
final var testTriggered = new AtomicBoolean();

val container = new PostgreSQLContainer<>("postgres:13-alpine")
.withDatabaseName("public")
Expand Down
Expand Up @@ -60,6 +60,9 @@ public enum ConfigSchema implements AirbyteConfig {

STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),

ACTOR_CATALOG("ActorCatalog.yaml", ActorCatalog.class),
ACTOR_CATALOG_FETCH_EVENT("ActorCatalogFetchEvent.yaml", ActorCatalogFetchEvent.class),

// worker
STANDARD_SYNC_INPUT("StandardSyncInput.yaml", StandardSyncInput.class),
NORMALIZATION_INPUT("NormalizationInput.yaml", NormalizationInput.class),
Expand Down
20 changes: 20 additions & 0 deletions airbyte-config/models/src/main/resources/types/ActorCatalog.yaml
@@ -0,0 +1,20 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
title: ActorCatalog
description: Catalog of an actor.
type: object
additionalProperties: false
required:
- id
- catalog
- catalogHash
properties:
id:
type: string
format: uuid
catalog:
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
catalogHash:
type: string
@@ -0,0 +1,27 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml
title: ActorCatalogFetchEvent
description: Link actor to their actual catalog
type: object
additionalProperties: false
required:
- id
- actorCatalogId
- actorId
- configHash
- connectorVersion
properties:
id:
type: string
format: uuid
actorId:
type: string
format: uuid
actorCatalogId:
type: string
format: uuid
configHash:
type: string
connectorVersion:
type: string
Expand Up @@ -5,9 +5,14 @@
package io.airbyte.config.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
Expand All @@ -25,6 +30,7 @@
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -551,6 +557,95 @@ public void updateConnectionState(final UUID connectionId, final State state) th
}
}

public Optional<ActorCatalog> getSourceCatalog(final UUID sourceId,
final String configurationHash,
final String connectorVersion)
throws JsonValidationException, IOException {
for (final ActorCatalogFetchEvent event : listActorCatalogFetchEvents()) {
if (event.getConnectorVersion().equals(connectorVersion)
&& event.getConfigHash().equals(configurationHash)
&& event.getActorId().equals(sourceId)) {
return getCatalogById(event.getActorCatalogId());
}
}
return Optional.empty();
}

public List<ActorCatalogFetchEvent> listActorCatalogFetchEvents()
throws JsonValidationException, IOException {
final List<ActorCatalogFetchEvent> actorCatalogFetchEvents = new ArrayList<>();

for (final ActorCatalogFetchEvent event : persistence.listConfigs(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
ActorCatalogFetchEvent.class)) {
actorCatalogFetchEvents.add(event);
}
return actorCatalogFetchEvents;
}

public Optional<ActorCatalog> getCatalogById(final UUID catalogId)
throws IOException {
try {
return Optional.of(persistence.getConfig(ConfigSchema.ACTOR_CATALOG, catalogId.toString(),
ActorCatalog.class));
} catch (final ConfigNotFoundException e) {
return Optional.empty();
} catch (final JsonValidationException e) {
throw new IllegalStateException(e);
}
}

public Optional<ActorCatalog> findExistingCatalog(final ActorCatalog actorCatalog)
throws JsonValidationException, IOException {
for (final ActorCatalog fetchedCatalog : listActorCatalogs()) {
if (actorCatalog.getCatalogHash().equals(fetchedCatalog.getCatalogHash())) {
return Optional.of(fetchedCatalog);
}
}
return Optional.empty();
}

public List<ActorCatalog> listActorCatalogs()
throws JsonValidationException, IOException {
final List<ActorCatalog> actorCatalogs = new ArrayList<>();

for (final ActorCatalog event : persistence.listConfigs(ConfigSchema.ACTOR_CATALOG,
ActorCatalog.class)) {
actorCatalogs.add(event);
}
return actorCatalogs;
}

public void writeCatalog(final AirbyteCatalog catalog,
final UUID sourceId,
final String configurationHash,
final String connectorVersion)
throws JsonValidationException, IOException {
final HashFunction hashFunction = Hashing.murmur3_32_fixed();
final String catalogHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes(
Charsets.UTF_8)).toString();
ActorCatalog actorCatalog = new ActorCatalog()
.withCatalog(Jsons.jsonNode(catalog))
.withId(UUID.randomUUID())
.withCatalogHash(catalogHash);
final Optional<ActorCatalog> existingCatalog = findExistingCatalog(actorCatalog);
if (existingCatalog.isPresent()) {
actorCatalog = existingCatalog.get();
} else {
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG,
actorCatalog.getId().toString(),
actorCatalog);
}
final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent()
.withActorCatalogId(actorCatalog.getId())
.withId(UUID.randomUUID())
.withConfigHash(configurationHash)
.withConnectorVersion(connectorVersion)
.withActorId(sourceId);
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
actorCatalogFetchEvent.getId().toString(),
actorCatalogFetchEvent);
}

/**
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
* string/jsonnode into the AirbyteConfig, Stream&lt;Object&lt;AirbyteConfig.getClassName()&gt;&gt;
Expand Down

0 comments on commit fe1eb8d

Please sign in to comment.