diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index 8045a5f6dd0da..1234256f023f4 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception { mockedConfigs.getConfigDatabaseUrl()) .getAndInitialize(); val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName()); - assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.35.28.001", configsMigrator.getLatestMigration().getVersion().getVersion()); val jobsPersistence = new DefaultJobPersistence(jobDatabase); assertEquals(version, jobsPersistence.getVersion().get()); @@ -112,7 +112,7 @@ void testIsLegalUpgradePredicate() { @Test void testPostLoadExecutionExecutes() throws Exception { - var testTriggered = new AtomicBoolean(); + final var testTriggered = new AtomicBoolean(); val container = new PostgreSQLContainer<>("postgres:13-alpine") .withDatabaseName("public") diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java index 2a11f656ebe89..4d2296a0cbde6 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java @@ -60,6 +60,9 @@ public enum ConfigSchema implements AirbyteConfig { STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class), + ACTOR_CATALOG("ActorCatalog.yaml", ActorCatalog.class), + ACTOR_CATALOG_FETCH_EVENT("ActorCatalogFetchEvent.yaml", ActorCatalogFetchEvent.class), + // worker STANDARD_SYNC_INPUT("StandardSyncInput.yaml", StandardSyncInput.class), NORMALIZATION_INPUT("NormalizationInput.yaml", NormalizationInput.class), diff --git a/airbyte-config/models/src/main/resources/types/ActorCatalog.yaml b/airbyte-config/models/src/main/resources/types/ActorCatalog.yaml new file mode 100644 index 0000000000000..a925ee891cdf6 --- /dev/null +++ b/airbyte-config/models/src/main/resources/types/ActorCatalog.yaml @@ -0,0 +1,20 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml +title: ActorCatalog +description: Catalog of an actor. +type: object +additionalProperties: false +required: + - id + - catalog + - catalogHash +properties: + id: + type: string + format: uuid + catalog: + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + catalogHash: + type: string diff --git a/airbyte-config/models/src/main/resources/types/ActorCatalogFetchEvent.yaml b/airbyte-config/models/src/main/resources/types/ActorCatalogFetchEvent.yaml new file mode 100644 index 0000000000000..bd2b5206c62b4 --- /dev/null +++ b/airbyte-config/models/src/main/resources/types/ActorCatalogFetchEvent.yaml @@ -0,0 +1,27 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptFailureSummary.yaml +title: ActorCatalogFetchEvent +description: Link actor to their actual catalog +type: object +additionalProperties: false +required: + - id + - actorCatalogId + - actorId + - configHash + - connectorVersion +properties: + id: + type: string + format: uuid + actorId: + type: string + format: uuid + actorCatalogId: + type: string + format: uuid + configHash: + type: string + connectorVersion: + type: string diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 0517291930579..e8cbe0417bc95 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -5,9 +5,14 @@ package io.airbyte.config.persistence; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Charsets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.lang.MoreBooleans; +import io.airbyte.config.ActorCatalog; +import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; @@ -25,6 +30,7 @@ import io.airbyte.config.persistence.split_secrets.SecretsHelpers; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.config.persistence.split_secrets.SplitSecretConfig; +import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; @@ -551,6 +557,95 @@ public void updateConnectionState(final UUID connectionId, final State state) th } } + public Optional getSourceCatalog(final UUID sourceId, + final String configurationHash, + final String connectorVersion) + throws JsonValidationException, IOException { + for (final ActorCatalogFetchEvent event : listActorCatalogFetchEvents()) { + if (event.getConnectorVersion().equals(connectorVersion) + && event.getConfigHash().equals(configurationHash) + && event.getActorId().equals(sourceId)) { + return getCatalogById(event.getActorCatalogId()); + } + } + return Optional.empty(); + } + + public List listActorCatalogFetchEvents() + throws JsonValidationException, IOException { + final List actorCatalogFetchEvents = new ArrayList<>(); + + for (final ActorCatalogFetchEvent event : persistence.listConfigs(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, + ActorCatalogFetchEvent.class)) { + actorCatalogFetchEvents.add(event); + } + return actorCatalogFetchEvents; + } + + public Optional getCatalogById(final UUID catalogId) + throws IOException { + try { + return Optional.of(persistence.getConfig(ConfigSchema.ACTOR_CATALOG, catalogId.toString(), + ActorCatalog.class)); + } catch (final ConfigNotFoundException e) { + return Optional.empty(); + } catch (final JsonValidationException e) { + throw new IllegalStateException(e); + } + } + + public Optional findExistingCatalog(final ActorCatalog actorCatalog) + throws JsonValidationException, IOException { + for (final ActorCatalog fetchedCatalog : listActorCatalogs()) { + if (actorCatalog.getCatalogHash().equals(fetchedCatalog.getCatalogHash())) { + return Optional.of(fetchedCatalog); + } + } + return Optional.empty(); + } + + public List listActorCatalogs() + throws JsonValidationException, IOException { + final List actorCatalogs = new ArrayList<>(); + + for (final ActorCatalog event : persistence.listConfigs(ConfigSchema.ACTOR_CATALOG, + ActorCatalog.class)) { + actorCatalogs.add(event); + } + return actorCatalogs; + } + + public void writeCatalog(final AirbyteCatalog catalog, + final UUID sourceId, + final String configurationHash, + final String connectorVersion) + throws JsonValidationException, IOException { + final HashFunction hashFunction = Hashing.murmur3_32_fixed(); + final String catalogHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes( + Charsets.UTF_8)).toString(); + ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(catalog)) + .withId(UUID.randomUUID()) + .withCatalogHash(catalogHash); + final Optional existingCatalog = findExistingCatalog(actorCatalog); + if (existingCatalog.isPresent()) { + actorCatalog = existingCatalog.get(); + } else { + persistence.writeConfig(ConfigSchema.ACTOR_CATALOG, + actorCatalog.getId().toString(), + actorCatalog); + } + final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent() + .withActorCatalogId(actorCatalog.getId()) + .withId(UUID.randomUUID()) + .withConfigHash(configurationHash) + .withConnectorVersion(connectorVersion) + .withActorId(sourceId); + persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, + actorCatalogFetchEvent.getId().toString(), + actorCatalogFetchEvent); + } + /** * Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the * string/jsonnode into the AirbyteConfig, Stream<Object<AirbyteConfig.getClassName()>> diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index b761358eae482..f761909bf9c88 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -5,6 +5,8 @@ package io.airbyte.config.persistence; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER; import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION; @@ -23,6 +25,8 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.MoreIterators; import io.airbyte.commons.version.AirbyteVersion; +import io.airbyte.config.ActorCatalog; +import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; import io.airbyte.config.ConfigWithMetadata; @@ -114,6 +118,10 @@ public T getConfig(final AirbyteConfig configType, final String configId, fi return (T) getStandardSync(configId); } else if (configType == ConfigSchema.STANDARD_SYNC_STATE) { return (T) getStandardSyncState(configId); + } else if (configType == ConfigSchema.ACTOR_CATALOG) { + return (T) getActorCatalog(configId); + } else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) { + return (T) getActorCatalogFetchEvent(configId); } else { throw new IllegalArgumentException("Unknown Config Type " + configType); } @@ -181,6 +189,18 @@ private StandardSyncState getStandardSyncState(final String configId) throws IOE return result.get(0).getConfig(); } + private ActorCatalog getActorCatalog(final String configId) throws IOException, ConfigNotFoundException { + final List> result = listActorCatalogWithMetadata(Optional.of(UUID.fromString(configId))); + validate(configId, result, ConfigSchema.ACTOR_CATALOG); + return result.get(0).getConfig(); + } + + private ActorCatalogFetchEvent getActorCatalogFetchEvent(final String configId) throws IOException, ConfigNotFoundException { + final List> result = listActorCatalogFetchEventWithMetadata(Optional.of(UUID.fromString(configId))); + validate(configId, result, ConfigSchema.ACTOR_CATALOG_FETCH_EVENT); + return result.get(0).getConfig(); + } + private List connectionOperationIds(final UUID connectionId) throws IOException { final Result result = database.query(ctx -> ctx.select(asterisk()) .from(CONNECTION_OPERATION) @@ -243,6 +263,10 @@ public ConfigWithMetadata getConfigWithMetadata(final AirbyteConfig confi return (ConfigWithMetadata) validateAndReturn(configId, listStandardSyncWithMetadata(configIdOpt), configType); } else if (configType == ConfigSchema.STANDARD_SYNC_STATE) { return (ConfigWithMetadata) validateAndReturn(configId, listStandardSyncStateWithMetadata(configIdOpt), configType); + } else if (configType == ConfigSchema.ACTOR_CATALOG) { + return (ConfigWithMetadata) validateAndReturn(configId, listActorCatalogWithMetadata(configIdOpt), configType); + } else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) { + return (ConfigWithMetadata) validateAndReturn(configId, listActorCatalogFetchEventWithMetadata(configIdOpt), configType); } else { throw new IllegalArgumentException("Unknown Config Type " + configType); } @@ -271,6 +295,10 @@ public List> listConfigsWithMetadata(final AirbyteConf listStandardSyncWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata) c)); } else if (configType == ConfigSchema.STANDARD_SYNC_STATE) { listStandardSyncStateWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata) c)); + } else if (configType == ConfigSchema.ACTOR_CATALOG) { + listActorCatalogWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata) c)); + } else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) { + listActorCatalogFetchEventWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata) c)); } else { throw new IllegalArgumentException("Unknown Config Type " + configType); } @@ -672,6 +700,72 @@ private StandardSyncState buildStandardSyncState(final Record record) { .withState(Jsons.deserialize(record.get(STATE.STATE_).data(), State.class)); } + private List> listActorCatalogWithMetadata() throws IOException { + return listActorCatalogWithMetadata(Optional.empty()); + } + + private List> listActorCatalogWithMetadata(final Optional configId) throws IOException { + final Result result = database.query(ctx -> { + final SelectJoinStep query = ctx.select(asterisk()).from(ACTOR_CATALOG); + if (configId.isPresent()) { + return query.where(ACTOR_CATALOG.ID.eq(configId.get())).fetch(); + } + return query.fetch(); + }); + final List> actorCatalogs = new ArrayList<>(); + for (final Record record : result) { + final ActorCatalog actorCatalog = buildActorCatalog(record); + actorCatalogs.add(new ConfigWithMetadata<>( + record.get(ACTOR_CATALOG.ID).toString(), + ConfigSchema.ACTOR_CATALOG.name(), + record.get(ACTOR_CATALOG.CREATED_AT).toInstant(), + record.get(ACTOR_CATALOG.MODIFIED_AT).toInstant(), + actorCatalog)); + } + return actorCatalogs; + } + + private ActorCatalog buildActorCatalog(final Record record) { + return new ActorCatalog() + .withId(record.get(ACTOR_CATALOG.ID)) + .withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString())) + .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)); + } + + private List> listActorCatalogFetchEventWithMetadata() throws IOException { + return listActorCatalogFetchEventWithMetadata(Optional.empty()); + } + + private List> listActorCatalogFetchEventWithMetadata(final Optional configId) throws IOException { + final Result result = database.query(ctx -> { + final SelectJoinStep query = ctx.select(asterisk()).from(ACTOR_CATALOG_FETCH_EVENT); + if (configId.isPresent()) { + return query.where(ACTOR_CATALOG_FETCH_EVENT.ID.eq(configId.get())).fetch(); + } + return query.fetch(); + }); + final List> actorCatalogFetchEvents = new ArrayList<>(); + for (final Record record : result) { + final ActorCatalogFetchEvent actorCatalogFetchEvent = buildActorCatalogFetchEvent(record); + actorCatalogFetchEvents.add(new ConfigWithMetadata<>( + record.get(ACTOR_CATALOG_FETCH_EVENT.ID).toString(), + ConfigSchema.ACTOR_CATALOG_FETCH_EVENT.name(), + record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT).toInstant(), + record.get(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT).toInstant(), + actorCatalogFetchEvent)); + } + return actorCatalogFetchEvents; + } + + private ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) { + return new ActorCatalogFetchEvent() + .withId(record.get(ACTOR_CATALOG_FETCH_EVENT.ID)) + .withActorCatalogId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID)) + .withConfigHash(record.get(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH)) + .withConnectorVersion(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION)) + .withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID)); + } + @Override public void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException { if (configType == ConfigSchema.STANDARD_WORKSPACE) { @@ -694,6 +788,10 @@ public void writeConfig(final AirbyteConfig configType, final String configI writeStandardSync(Collections.singletonList((StandardSync) config)); } else if (configType == ConfigSchema.STANDARD_SYNC_STATE) { writeStandardSyncState(Collections.singletonList((StandardSyncState) config)); + } else if (configType == ConfigSchema.ACTOR_CATALOG) { + writeActorCatalog(Collections.singletonList((ActorCatalog) config)); + } else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) { + writeActorCatalogFetchEvent(Collections.singletonList((ActorCatalogFetchEvent) config)); } else { throw new IllegalArgumentException("Unknown Config Type " + configType); } @@ -1200,6 +1298,76 @@ private void writeStandardSyncState(final List configs, final }); } + private void writeActorCatalog(final List configs) throws IOException { + database.transaction(ctx -> { + writeActorCatalog(configs, ctx); + return null; + }); + } + + private void writeActorCatalog(final List configs, final DSLContext ctx) { + final OffsetDateTime timestamp = OffsetDateTime.now(); + configs.forEach((actorCatalog) -> { + final boolean isExistingConfig = ctx.fetchExists(select() + .from(ACTOR_CATALOG) + .where(ACTOR_CATALOG.ID.eq(actorCatalog.getId()))); + + if (isExistingConfig) { + ctx.update(ACTOR_CATALOG) + .set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(actorCatalog.getCatalog()))) + .set(ACTOR_CATALOG.CATALOG_HASH, actorCatalog.getCatalogHash()) + .set(ACTOR_CATALOG.MODIFIED_AT, timestamp) + .where(ACTOR_CATALOG.ID.eq(actorCatalog.getId())) + .execute(); + } else { + ctx.insertInto(ACTOR_CATALOG) + .set(ACTOR_CATALOG.ID, actorCatalog.getId()) + .set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(actorCatalog.getCatalog()))) + .set(ACTOR_CATALOG.CATALOG_HASH, actorCatalog.getCatalogHash()) + .set(ACTOR_CATALOG.CREATED_AT, timestamp) + .set(ACTOR_CATALOG.MODIFIED_AT, timestamp) + .execute(); + } + }); + } + + private void writeActorCatalogFetchEvent(final List configs) throws IOException { + database.transaction(ctx -> { + writeActorCatalogFetchEvent(configs, ctx); + return null; + }); + } + + private void writeActorCatalogFetchEvent(final List configs, final DSLContext ctx) { + final OffsetDateTime timestamp = OffsetDateTime.now(); + configs.forEach((actorCatalogFetchEvent) -> { + final boolean isExistingConfig = ctx.fetchExists(select() + .from(ACTOR_CATALOG_FETCH_EVENT) + .where(ACTOR_CATALOG_FETCH_EVENT.ID.eq(actorCatalogFetchEvent.getId()))); + + if (isExistingConfig) { + ctx.update(ACTOR_CATALOG_FETCH_EVENT) + .set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, actorCatalogFetchEvent.getConfigHash()) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, actorCatalogFetchEvent.getActorCatalogId()) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorCatalogFetchEvent.getActorId()) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, actorCatalogFetchEvent.getConnectorVersion()) + .set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp) + .where(ACTOR_CATALOG_FETCH_EVENT.ID.eq(actorCatalogFetchEvent.getId())) + .execute(); + } else { + ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT) + .set(ACTOR_CATALOG_FETCH_EVENT.ID, actorCatalogFetchEvent.getId()) + .set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, actorCatalogFetchEvent.getConfigHash()) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, actorCatalogFetchEvent.getActorCatalogId()) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorCatalogFetchEvent.getActorId()) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, actorCatalogFetchEvent.getConnectorVersion()) + .set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp) + .set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp) + .execute(); + } + }); + } + @Override public void writeConfigs(final AirbyteConfig configType, final Map configs) throws IOException, JsonValidationException { if (configType == ConfigSchema.STANDARD_WORKSPACE) { @@ -1222,6 +1390,10 @@ public void writeConfigs(final AirbyteConfig configType, final Map (StandardSync) c).collect(Collectors.toList())); } else if (configType == ConfigSchema.STANDARD_SYNC_STATE) { writeStandardSyncState(configs.values().stream().map(c -> (StandardSyncState) c).collect(Collectors.toList())); + } else if (configType == ConfigSchema.ACTOR_CATALOG) { + writeActorCatalog(configs.values().stream().map(c -> (ActorCatalog) c).collect(Collectors.toList())); + } else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) { + writeActorCatalogFetchEvent(configs.values().stream().map(c -> (ActorCatalogFetchEvent) c).collect(Collectors.toList())); } else { throw new IllegalArgumentException("Unknown Config Type " + configType); } @@ -1249,6 +1421,10 @@ public void deleteConfig(final AirbyteConfig configType, final String configId) deleteStandardSync(configId); } else if (configType == ConfigSchema.STANDARD_SYNC_STATE) { deleteConfig(STATE, STATE.CONNECTION_ID, UUID.fromString(configId)); + } else if (configType == ConfigSchema.ACTOR_CATALOG) { + deleteConfig(ACTOR_CATALOG, ACTOR_CATALOG.ID, UUID.fromString(configId)); + } else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) { + deleteConfig(ACTOR_CATALOG_FETCH_EVENT, ACTOR_CATALOG_FETCH_EVENT.ID, UUID.fromString(configId)); } else { throw new IllegalArgumentException("Unknown Config Type " + configType); } @@ -1304,6 +1480,8 @@ public void replaceAllConfigs(final Map> configs, final ctx.truncate(CONNECTION).restartIdentity().cascade().execute(); ctx.truncate(CONNECTION_OPERATION).restartIdentity().cascade().execute(); ctx.truncate(STATE).restartIdentity().cascade().execute(); + ctx.truncate(ACTOR_CATALOG).restartIdentity().cascade().execute(); + ctx.truncate(ACTOR_CATALOG_FETCH_EVENT).restartIdentity().cascade().execute(); if (configs.containsKey(ConfigSchema.STANDARD_WORKSPACE)) { configs.get(ConfigSchema.STANDARD_WORKSPACE).map(c -> (StandardWorkspace) c) @@ -1383,6 +1561,22 @@ public void replaceAllConfigs(final Map> configs, final LOGGER.warn(ConfigSchema.STANDARD_SYNC_STATE + " not found"); } + if (configs.containsKey(ConfigSchema.ACTOR_CATALOG)) { + configs.get(ConfigSchema.ACTOR_CATALOG).map(c -> (ActorCatalog) c) + .forEach(c -> writeActorCatalog(Collections.singletonList(c), ctx)); + originalConfigs.remove(ConfigSchema.ACTOR_CATALOG); + } else { + LOGGER.warn(ConfigSchema.ACTOR_CATALOG + " not found"); + } + + if (configs.containsKey(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT)) { + configs.get(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT).map(c -> (ActorCatalogFetchEvent) c) + .forEach(c -> writeActorCatalogFetchEvent(Collections.singletonList(c), ctx)); + originalConfigs.remove(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT); + } else { + LOGGER.warn(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT + " not found"); + } + if (!originalConfigs.isEmpty()) { originalConfigs.forEach(c -> LOGGER.warn("Unknown Config " + c + " ignored")); } @@ -1479,6 +1673,22 @@ public Map> dumpConfigs() throws IOException { .map(ConfigWithMetadata::getConfig) .map(Jsons::jsonNode)); } + final List> actorCatalogWithMetadata = listActorCatalogWithMetadata(); + if (!standardSyncStateWithMetadata.isEmpty()) { + result.put(ConfigSchema.ACTOR_CATALOG.name(), + standardSyncStateWithMetadata + .stream() + .map(ConfigWithMetadata::getConfig) + .map(Jsons::jsonNode)); + } + final List> actorCatalogFetchEventWithMetadata = listActorCatalogFetchEventWithMetadata(); + if (!standardSyncStateWithMetadata.isEmpty()) { + result.put(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT.name(), + standardSyncStateWithMetadata + .stream() + .map(ConfigWithMetadata::getConfig) + .map(Jsons::jsonNode)); + } return result; } diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java index 12352c65123f1..99ed6b445969e 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java @@ -10,6 +10,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; +import io.airbyte.config.ActorCatalog; +import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -62,22 +64,25 @@ public void test() throws JsonValidationException, IOException, ConfigNotFoundEx standardSyncOperation(); standardSync(); standardSyncState(); + standardActorCatalog(); deletion(); } private void deletion() throws ConfigNotFoundException, IOException, JsonValidationException { - // Deleting the workspace should delete everything except for definitions + // Deleting the workspace should delete everything except for definitions and catalogs configPersistence.deleteConfig(ConfigSchema.STANDARD_WORKSPACE, MockData.standardWorkspace().getWorkspaceId().toString()); assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC_STATE, StandardSyncState.class).isEmpty()); assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class).isEmpty()); assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class).isEmpty()); assertTrue(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, SourceConnection.class).isEmpty()); assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_WORKSPACE, StandardWorkspace.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, ActorCatalogFetchEvent.class).isEmpty()); assertFalse(configPersistence.listConfigs(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.class).isEmpty()); assertFalse(configPersistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class).isEmpty()); assertFalse(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class).isEmpty()); assertFalse(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class).isEmpty()); + assertFalse(configPersistence.listConfigs(ConfigSchema.ACTOR_CATALOG, ActorCatalog.class).isEmpty()); for (final SourceOAuthParameter sourceOAuthParameter : MockData.sourceOauthParameters()) { configPersistence.deleteConfig(ConfigSchema.SOURCE_OAUTH_PARAM, sourceOAuthParameter.getOauthParameterId().toString()); @@ -99,6 +104,12 @@ private void deletion() throws ConfigNotFoundException, IOException, JsonValidat .deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, standardDestinationDefinition.getDestinationDefinitionId().toString()); } assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class).isEmpty()); + + for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) { + configPersistence + .deleteConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString()); + } + assertTrue(configPersistence.listConfigs(ConfigSchema.ACTOR_CATALOG, ActorCatalog.class).isEmpty()); } private void standardSyncState() throws JsonValidationException, IOException, ConfigNotFoundException { @@ -259,4 +270,31 @@ private void standardWorkspace() throws JsonValidationException, IOException, Co assertTrue(standardWorkspaces.contains(MockData.standardWorkspace())); } + public void standardActorCatalog() throws JsonValidationException, IOException, ConfigNotFoundException { + + for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) { + configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog); + final ActorCatalog retrievedActorCatalog = configPersistence.getConfig( + ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), ActorCatalog.class); + assertEquals(actorCatalog, retrievedActorCatalog); + } ; + final List actorCatalogs = configPersistence + .listConfigs(ConfigSchema.ACTOR_CATALOG, ActorCatalog.class); + assertEquals(MockData.actorCatalogs().size(), actorCatalogs.size()); + assertThat(MockData.actorCatalogs()).hasSameElementsAs(actorCatalogs); + + for (final ActorCatalogFetchEvent actorCatalogFetchEvent : MockData.actorCatalogFetchEvents()) { + configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, + actorCatalogFetchEvent.getId().toString(), actorCatalogFetchEvent); + final ActorCatalogFetchEvent retrievedActorCatalogFetchEvent = configPersistence.getConfig( + ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, actorCatalogFetchEvent.getId().toString(), + ActorCatalogFetchEvent.class); + assertEquals(actorCatalogFetchEvent, retrievedActorCatalogFetchEvent); + } + final List actorCatalogFetchEvents = configPersistence + .listConfigs(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, ActorCatalogFetchEvent.class); + assertEquals(MockData.actorCatalogFetchEvents().size(), actorCatalogFetchEvents.size()); + assertThat(MockData.actorCatalogFetchEvents()).hasSameElementsAs(actorCatalogFetchEvents); + } + } diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/MockData.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/MockData.java index 33d8fb2533099..c7aba39916029 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/MockData.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/MockData.java @@ -6,6 +6,8 @@ import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ActorCatalog; +import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; @@ -68,6 +70,12 @@ public class MockData { private static final UUID SOURCE_OAUTH_PARAMETER_ID_2 = UUID.randomUUID(); private static final UUID DESTINATION_OAUTH_PARAMETER_ID_1 = UUID.randomUUID(); private static final UUID DESTINATION_OAUTH_PARAMETER_ID_2 = UUID.randomUUID(); + private static final UUID ACTOR_CATALOG_ID_1 = UUID.randomUUID(); + private static final UUID ACTOR_CATALOG_ID_2 = UUID.randomUUID(); + private static final UUID ACTOR_CATALOG_ID_3 = UUID.randomUUID(); + private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_1 = UUID.randomUUID(); + private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_2 = UUID.randomUUID(); + private static final Instant NOW = Instant.parse("2021-12-15T20:30:40.00Z"); public static StandardWorkspace standardWorkspace() { @@ -341,6 +349,38 @@ public static List standardSyncStates() { return Arrays.asList(standardSyncState1, standardSyncState2, standardSyncState3, standardSyncState4); } + public static List actorCatalogs() { + final ActorCatalog actorCatalog1 = new ActorCatalog() + .withId(ACTOR_CATALOG_ID_1) + .withCatalog(Jsons.deserialize("{}")) + .withCatalogHash("TESTHASH"); + final ActorCatalog actorCatalog2 = new ActorCatalog() + .withId(ACTOR_CATALOG_ID_2) + .withCatalog(Jsons.deserialize("{}")) + .withCatalogHash("12345"); + final ActorCatalog actorCatalog3 = new ActorCatalog() + .withId(ACTOR_CATALOG_ID_3) + .withCatalog(Jsons.deserialize("{}")) + .withCatalogHash("SomeOtherHash"); + return Arrays.asList(actorCatalog1, actorCatalog2, actorCatalog3); + } + + public static List actorCatalogFetchEvents() { + final ActorCatalogFetchEvent actorCatalogFetchEvent1 = new ActorCatalogFetchEvent() + .withId(ACTOR_CATALOG_FETCH_EVENT_ID_1) + .withActorCatalogId(ACTOR_CATALOG_ID_1) + .withActorId(SOURCE_ID_1) + .withConfigHash("CONFIG_HASH") + .withConnectorVersion("1.0.0"); + final ActorCatalogFetchEvent actorCatalogFetchEvent2 = new ActorCatalogFetchEvent() + .withId(ACTOR_CATALOG_FETCH_EVENT_ID_2) + .withActorCatalogId(ACTOR_CATALOG_ID_2) + .withActorId(SOURCE_ID_2) + .withConfigHash("1394") + .withConnectorVersion("1.2.0"); + return Arrays.asList(actorCatalogFetchEvent1); + } + public static Instant now() { return NOW; } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_28_001__AddActorCatalogMetadataColumns.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_28_001__AddActorCatalogMetadataColumns.java new file mode 100644 index 0000000000000..2c9fe64a0daa6 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_28_001__AddActorCatalogMetadataColumns.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import static org.jooq.impl.DSL.currentOffsetDateTime; + +import com.google.common.annotations.VisibleForTesting; +import java.time.OffsetDateTime; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_35_28_001__AddActorCatalogMetadataColumns extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger( + V0_35_28_001__AddActorCatalogMetadataColumns.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + final DSLContext ctx = DSL.using(context.getConnection()); + migrate(ctx); + } + + @VisibleForTesting + public static void migrate(final DSLContext ctx) { + final Field createdAt = + DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime())); + final Field modifiedAt = + DSL.field("modified_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime())); + ctx.alterTable("actor_catalog") + .addIfNotExists(modifiedAt).execute(); + ctx.alterTable("actor_catalog_fetch_event") + .addIfNotExists(createdAt).execute(); + ctx.alterTable("actor_catalog_fetch_event") + .addIfNotExists(modifiedAt).execute(); + } + +} diff --git a/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt b/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt index d7f92c4cfa4aa..5512808ea0d59 100644 --- a/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt +++ b/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt @@ -20,6 +20,7 @@ create table "public"."actor_catalog"( "catalog" jsonb not null, "catalog_hash" varchar(32) not null, "created_at" timestamptz(35) not null, + "modified_at" timestamptz(35) not null default null, constraint "actor_catalog_pkey" primary key ("id") ); @@ -29,6 +30,8 @@ create table "public"."actor_catalog_fetch_event"( "actor_id" uuid not null, "config_hash" varchar(32) not null, "actor_version" varchar(256) not null, + "created_at" timestamptz(35) not null default null, + "modified_at" timestamptz(35) not null default null, constraint "actor_catalog_fetch_event_pkey" primary key ("id") ); diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_28_001__AddActorCatalogMetadataColumnsTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_28_001__AddActorCatalogMetadataColumnsTest.java new file mode 100644 index 0000000000000..7038326b98d4f --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_28_001__AddActorCatalogMetadataColumnsTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; +import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType; +import java.io.IOException; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.UUID; +import org.jooq.DSLContext; +import org.jooq.JSONB; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class V0_35_28_001__AddActorCatalogMetadataColumnsTest extends AbstractConfigsDatabaseTest { + + @Test + public void test() throws SQLException, IOException { + + final Database database = getDatabase(); + final DSLContext context = DSL.using(database.getDataSource().getConnection()); + V0_32_8_001__AirbyteConfigDatabaseDenormalization.migrate(context); + V0_35_26_001__PersistDiscoveredCatalog.migrate(context); + V0_35_28_001__AddActorCatalogMetadataColumns.migrate(context); + assertCanInsertSchemaDataWithMetadata(context); + } + + private void assertCanInsertSchemaDataWithMetadata(final DSLContext ctx) { + Assertions.assertDoesNotThrow(() -> { + final UUID catalogId = UUID.randomUUID(); + final UUID actorId = UUID.randomUUID(); + final UUID actorDefinitionId = UUID.randomUUID(); + final UUID workspaceId = UUID.randomUUID(); + + ctx.insertInto(DSL.table("workspace")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("slug"), + DSL.field("initial_setup_complete")) + .values( + workspaceId, + "base workspace", + "base_workspace", + true) + .execute(); + ctx.insertInto(DSL.table("actor_definition")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("docker_repository"), + DSL.field("docker_image_tag"), + DSL.field("actor_type"), + DSL.field("spec")) + .values( + actorDefinitionId, + "Jenkins", + "farosai/airbyte-jenkins-source", + "0.1.23", + ActorType.source, + JSONB.valueOf("{}")) + .execute(); + ctx.insertInto(DSL.table("actor")) + .columns( + DSL.field("id"), + DSL.field("workspace_id"), + DSL.field("actor_definition_id"), + DSL.field("name"), + DSL.field("configuration"), + DSL.field("actor_type"), + DSL.field("created_at"), + DSL.field("updated_at")) + .values( + actorId, + workspaceId, + actorDefinitionId, + "JenkinsConnection", + JSONB.valueOf("{}"), + ActorType.source, + OffsetDateTime.now(), + OffsetDateTime.now()) + .execute(); + ctx.insertInto(DSL.table("actor_catalog")) + .columns( + DSL.field("id"), + DSL.field("catalog"), + DSL.field("catalog_hash"), + DSL.field("created_at"), + DSL.field("modified_at")) + .values( + catalogId, + JSONB.valueOf("{}"), + "", + OffsetDateTime.now(), + OffsetDateTime.now()) + .execute(); + ctx.insertInto(DSL.table("actor_catalog_fetch_event")) + .columns( + DSL.field("id"), + DSL.field("actor_catalog_id"), + DSL.field("actor_id"), + DSL.field("config_hash"), + DSL.field("actor_version"), + DSL.field("created_at"), + DSL.field("modified_at")) + .values( + UUID.randomUUID(), + catalogId, + actorId, + "HASHVALUE", + "2.0.1", + OffsetDateTime.now(), + OffsetDateTime.now()) + .execute(); + }); + } + +}