diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/functional/CheckedBiFunction.java b/airbyte-commons/src/main/java/io/airbyte/commons/functional/CheckedBiFunction.java new file mode 100644 index 0000000000000..84b76329d31b9 --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/functional/CheckedBiFunction.java @@ -0,0 +1,31 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.commons.functional; + +public interface CheckedBiFunction { + + Result apply(First first, Second second) throws E; + +} diff --git a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py index 30a1c508a6487..e983ad64c6fdc 100644 --- a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py +++ b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py @@ -137,6 +137,10 @@ class Config: None, description="If the source defines the primary key, paths to the fields that will be used as a primary key. If not provided by the source, the end user will have to specify the primary key themselves.", ) + namespace: Optional[str] = Field( + None, + description="Optional Source-defined namespace. Currently only used by JDBC destinations to determine what schema to write to. Airbyte streams from the same sources should have the same namespace.", + ) class ConfiguredAirbyteStream(BaseModel): diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DataArgumentsProvider.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DataArgumentsProvider.java new file mode 100644 index 0000000000000..d11abd64087dd --- /dev/null +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DataArgumentsProvider.java @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, 
sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.standardtest.destination; + +import java.util.stream.Stream; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; + +/** + * Class encapsulating all arguments required for Standard Destination Tests. + * + * All files defined here can be found in src/main/resources of this package. + */ +public class DataArgumentsProvider implements ArgumentsProvider { + + public static final CatalogMessageTestConfigPair EXCHANGE_RATE_CONFIG = + new CatalogMessageTestConfigPair("exchange_rate_catalog.json", "exchange_rate_messages.txt"); + public static final CatalogMessageTestConfigPair EDGE_CASE_CONFIG = + new CatalogMessageTestConfigPair("edge_case_catalog.json", "edge_case_messages.txt"); + + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + Arguments.of(EXCHANGE_RATE_CONFIG.messageFile, EXCHANGE_RATE_CONFIG.catalogFile), + Arguments.of(EDGE_CASE_CONFIG.messageFile, EDGE_CASE_CONFIG.catalogFile) + // todo - need to use the new protocol to capture this. 
+ // Arguments.of("stripe_messages.txt", "stripe_schema.json") + ); + + } + + public static class CatalogMessageTestConfigPair { + + final String catalogFile; + final String messageFile; + + public CatalogMessageTestConfigPair(String catalogFile, String messageFile) { + this.catalogFile = catalogFile; + this.messageFile = messageFile; + } + + } + +} diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java index d8f269786a712..16eb8d6f58993 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java @@ -33,7 +33,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.resources.MoreResources; @@ -71,14 +70,10 @@ import java.util.Map.Entry; import java.util.UUID; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,19 +122,29 @@ public abstract class TestDestination { * * @param testEnv - information about the 
test environment. * @param streamName - name of the stream for which we are retrieving records. + * @param namespace - the destination namespace records are located in. Null if not applicable. + * Usually a JDBC schema. * @return All of the records in the destination at the time this method is invoked. * @throws Exception - can throw any exception, test framework will handle. */ - protected abstract List retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception; + protected abstract List retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception; /** - * Override to return true to if the destination implements basic normalization and it should be - * tested here. + * Returns a destination's default schema. The default implementation assumes this corresponds to + * the configuration's 'schema' field, as this is how most of our destinations implement this. + * Destinations are free to appropriately override this. The return value is used to assert + * correctness. * - * @return - a boolean. + * If not applicable, Destinations are free to ignore this. + * + * @param config - integration-specific configuration returned by {@link #getConfig()}. + * @return the default schema, if applicatble. */ - protected boolean implementsBasicNormalization() { - return false; + protected String getDefaultSchema(JsonNode config) throws Exception { + if (config.get("schema") == null) { + return null; + } + return config.get("schema").asText(); } /** @@ -159,17 +164,29 @@ protected boolean implementsIncremental() throws WorkerException { } /** - * Same idea as {@link #retrieveRecords(TestDestinationEnv, String)}. Except this method should pull - * records from the table that contains the normalized records and convert them back into the data - * as it would appear in an {@link AirbyteRecordMessage}. Only need to override this method if - * {@link #implementsBasicNormalization} returns true. 
+ * Override to return true to if the destination implements basic normalization and it should be + * tested here. + * + * @return - a boolean. + */ + protected boolean implementsBasicNormalization() { + return false; + } + + /** + * Same idea as {@link #retrieveRecords(TestDestinationEnv, String, String)}. Except this method + * should pull records from the table that contains the normalized records and convert them back + * into the data as it would appear in an {@link AirbyteRecordMessage}. Only need to override this + * method if {@link #implementsBasicNormalization} returns true. * * @param testEnv - information about the test environment. * @param streamName - name of the stream for which we are retrieving records. + * @param namespace - the destination namespace records are located in. Null if not applicable. + * Usually a JDBC schema. * @return All of the records in the destination at the time this method is invoked. * @throws Exception - can throw any exception, test framework will handle. */ - protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception { + protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { throw new IllegalStateException("Not implemented"); } @@ -245,20 +262,6 @@ public void testCheckConnectionInvalidCredentials() throws Exception { assertEquals(Status.FAILED, runCheck(getFailCheckConfig()).getStatus()); } - private static class DataArgumentsProvider implements ArgumentsProvider { - - @Override - public Stream provideArguments(ExtensionContext context) { - return Stream.of( - Arguments.of("exchange_rate_messages.txt", "exchange_rate_catalog.json"), - Arguments.of("edge_case_messages.txt", "edge_case_catalog.json") - // todo - need to use the new protocol to capture this. - // Arguments.of("stripe_messages.txt", "stripe_schema.json") - ); - } - - } - /** * Verify that the integration successfully writes records. 
Tests a wide variety of messages and * schemas (aspirationally, anyway). @@ -270,9 +273,38 @@ public void testSync(String messagesFilename, String catalogFilename) throws Exc final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); final List messages = MoreResources.readResource(messagesFilename).lines() .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); - runSync(getConfig(), messages, configuredCatalog); - assertSameMessages(messages, retrieveRecordsForCatalog(catalog)); + final JsonNode config = getConfig(); + final String defaultSchema = getDefaultSchema(config); + runSync(config, messages, configuredCatalog); + retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema); + } + + /** + * Verify that the integration overwrites the first sync with the second sync. + */ + @Test + public void testSecondSync() throws Exception { + final AirbyteCatalog catalog = + Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class); + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); + final List firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines() + .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); + final JsonNode config = getConfig(); + runSync(config, firstSyncMessages, configuredCatalog); + + final List secondSyncMessages = Lists.newArrayList(new AirbyteMessage() + .withRecord(new AirbyteRecordMessage() + .withStream(catalog.getStreams().get(0).getName()) + .withData(Jsons.jsonNode(ImmutableMap.builder() + .put("date", "2020-03-31T00:00:00Z") + .put("HKD", 10) + .put("NZD", 700) + .build())))); + + runSync(config, secondSyncMessages, configuredCatalog); + final String defaultSchema = getDefaultSchema(config); + 
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema); } /** @@ -286,20 +318,18 @@ public void testIncrementalSync() throws Exception { return; } - testIncrementalSync("exchange_rate_messages.txt", "exchange_rate_catalog.json"); - } - - public void testIncrementalSync(String messagesFilename, String catalogFilename) throws Exception { final AirbyteCatalog catalog = - Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class); + Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class); final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); configuredCatalog.getStreams().forEach(s -> { s.withSyncMode(SyncMode.INCREMENTAL); s.withDestinationSyncMode(DestinationSyncMode.APPEND); }); - final List firstSyncMessages = MoreResources.readResource(messagesFilename).lines() + + final List firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines() .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); - runSync(getConfig(), firstSyncMessages, configuredCatalog); + final JsonNode config = getConfig(); + runSync(config, firstSyncMessages, configuredCatalog); final List secondSyncMessages = Lists.newArrayList(new AirbyteMessage() .withRecord(new AirbyteRecordMessage() @@ -309,11 +339,14 @@ public void testIncrementalSync(String messagesFilename, String catalogFilename) .put("HKD", 10) .put("NZD", 700) .build())))); - runSync(getConfig(), secondSyncMessages, configuredCatalog); + runSync(config, secondSyncMessages, configuredCatalog); + final List expectedMessagesAfterSecondSync = new ArrayList<>(); expectedMessagesAfterSecondSync.addAll(firstSyncMessages); expectedMessagesAfterSecondSync.addAll(secondSyncMessages); - assertSameMessages(expectedMessagesAfterSecondSync, retrieveRecordsForCatalog(catalog)); + + final String 
defaultSchema = getDefaultSchema(config); + retrieveRawRecordsAndAssertSameMessages(catalog, expectedMessagesAfterSecondSync, defaultSchema); } /** @@ -331,36 +364,30 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); final List messages = MoreResources.readResource(messagesFilename).lines() .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); - runSync(getConfigWithBasicNormalization(), messages, configuredCatalog); - LOGGER.info("Comparing retrieveRecordsForCatalog for {} and {}", messagesFilename, catalogFilename); - assertSameMessages(messages, retrieveRecordsForCatalog(catalog)); - LOGGER.info("Comparing retrieveNormalizedRecordsForCatalog for {} and {}", messagesFilename, catalogFilename); - assertSameMessages(messages, retrieveNormalizedRecordsForCatalog(catalog), true); + final JsonNode config = getConfigWithBasicNormalization(); + runSync(config, messages, configuredCatalog); + + String defaultSchema = getDefaultSchema(config); + final List actualMessages = retrieveNormalizedRecords(catalog, defaultSchema); + assertSameMessages(messages, actualMessages, true); } - /** - * Verify that the integration overwrites the first sync with the second sync. 
- */ @Test - public void testSecondSync() throws Exception { + void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception { final AirbyteCatalog catalog = - Jsons.deserialize(MoreResources.readResource("exchange_rate_catalog.json"), AirbyteCatalog.class); + Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class); + final String namespace = "sourcenamespace"; + catalog.getStreams().forEach(stream -> stream.setNamespace(namespace)); final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog); - final List firstSyncMessages = MoreResources.readResource("exchange_rate_messages.txt").lines() + + final List messages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines() .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList()); - runSync(getConfig(), firstSyncMessages, configuredCatalog); - final List secondSyncMessages = Lists.newArrayList(new AirbyteMessage() - .withRecord(new AirbyteRecordMessage() - .withStream(catalog.getStreams().get(0).getName()) - .withData(Jsons.jsonNode(ImmutableMap.builder() - .put("date", "2020-03-31T00:00:00Z") - .put("HKD", 10) - .put("NZD", 700) - .build())))); - runSync(getConfig(), secondSyncMessages, configuredCatalog); - assertSameMessages(secondSyncMessages, retrieveRecordsForCatalog(catalog)); + final JsonNode config = getConfig(); + final String defaultSchema = getDefaultSchema(config); + runSync(config, messages, configuredCatalog); + retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema); } private ConnectorSpecification runSpec() throws WorkerException { @@ -403,34 +430,20 @@ private void runSync(JsonNode config, List messages, ConfiguredA runner.close(); } - private List retrieveNormalizedRecordsForCatalog(AirbyteCatalog catalog) throws Exception { - return retrieveRecordsForCatalog(streamName -> 
retrieveNormalizedRecords(testEnv, streamName), catalog); - } - - private List retrieveRecordsForCatalog(AirbyteCatalog catalog) throws Exception { - return retrieveRecordsForCatalog(streamName -> retrieveRecords(testEnv, streamName), catalog); - } - - private List retrieveRecordsForCatalog(CheckedFunction, Exception> retriever, AirbyteCatalog catalog) - throws Exception { + private void retrieveRawRecordsAndAssertSameMessages(AirbyteCatalog catalog, List messages, String defaultSchema) throws Exception { final List actualMessages = new ArrayList<>(); - final List streamNames = catalog.getStreams() - .stream() - .map(AirbyteStream::getName) - .collect(Collectors.toList()); - for (final String streamName : streamNames) { - actualMessages.addAll(retriever.apply(streamName) + for (final AirbyteStream stream : catalog.getStreams()) { + final String streamName = stream.getName(); + final String schema = stream.getNamespace() != null ? stream.getNamespace() : defaultSchema; + List msgList = retrieveRecords(testEnv, streamName, schema) .stream() .map(data -> new AirbyteRecordMessage().withStream(streamName).withData(data)) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + actualMessages.addAll(msgList); } - return actualMessages; - } - - private void assertSameMessages(List expected, List actual) { - assertSameMessages(expected, actual, false); + assertSameMessages(messages, actualMessages, false); } // ignores emitted at. 
@@ -483,6 +496,21 @@ private void assertSameData(List expected, List actual) { } } + private List retrieveNormalizedRecords(AirbyteCatalog catalog, String defaultSchema) throws Exception { + final List actualMessages = new ArrayList<>(); + + for (final AirbyteStream stream : catalog.getStreams()) { + final String streamName = stream.getName(); + + List msgList = retrieveNormalizedRecords(testEnv, streamName, defaultSchema) + .stream() + .map(data -> new AirbyteRecordMessage().withStream(streamName).withData(data)) + .collect(Collectors.toList()); + actualMessages.addAll(msgList); + } + return actualMessages; + } + /** * Same as {@link #pruneMutate(JsonNode)}, except does a defensive copy and returns a new json node * object instead of mutating in place. diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index f9ebff351a707..63a2d8783b203 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -208,20 +208,15 @@ private static Job waitForQuery(Job queryJob) { public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog) { final BigQuery bigquery = getBigQuery(config); Map writeConfigs = new HashMap<>(); - final String datasetId = config.get(CONFIG_DATASET_ID).asText(); Set schemaSet = new HashSet<>(); // create tmp tables if not exist for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final String streamName = stream.getStream().getName(); - final String schemaName = namingResolver.getIdentifier(datasetId); + final String schemaName = 
getSchema(config, stream); final String tableName = namingResolver.getRawTableName(streamName); final String tmpTableName = namingResolver.getTmpTableName(streamName); - if (!schemaSet.contains(schemaName)) { - createSchemaTable(bigquery, schemaName); - schemaSet.add(schemaName); - } - createTable(bigquery, schemaName, tmpTableName); + createSchemaAndTableIfNeeded(bigquery, schemaSet, schemaName, tmpTableName); // https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration @@ -233,8 +228,7 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCata final TableDataWriteChannel writer = bigquery.writer(JobId.of(UUID.randomUUID().toString()), writeChannelConfiguration); final WriteDisposition syncMode = getWriteDisposition(stream.getDestinationSyncMode()); - writeConfigs.put(stream.getStream().getName(), - new WriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), writer, syncMode)); + writeConfigs.put(streamName, new WriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), writer, syncMode)); } // write to tmp tables @@ -242,6 +236,24 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCata return new RecordConsumer(bigquery, writeConfigs, catalog); } + private static String getSchema(JsonNode config, ConfiguredAirbyteStream stream) { + final String defaultSchema = config.get(CONFIG_DATASET_ID).asText(); + final String srcNamespace = stream.getStream().getNamespace(); + if (srcNamespace == null) { + return defaultSchema; + } + + return srcNamespace; + } + + private void createSchemaAndTableIfNeeded(BigQuery bigquery, Set schemaSet, String schemaName, String tmpTableName) { + if (!schemaSet.contains(schemaName)) { + createSchemaTable(bigquery, schemaName); + schemaSet.add(schemaName); + } + createTable(bigquery, schemaName, tmpTableName); + 
} + private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode) { if (syncMode == null) { throw new IllegalStateException("Undefined destination sync mode"); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json index c78db949bee98..8249fe1be10fd 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/resources/spec.json @@ -16,8 +16,8 @@ }, "dataset_id": { "type": "string", - "description": "The BigQuery dataset id that will house replicated tables.", - "title": "Dataset ID" + "description": "Default BigQuery Dataset ID tables are replicated to if the source does not specify a namespace.", + "title": "Default Dataset ID" }, "credentials_json": { "type": "string", diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardTest.java index bb2a154226d70..530018f3597b9 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardTest.java @@ -98,14 +98,20 @@ protected boolean implementsBasicNormalization() { } @Override - protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception { + protected String getDefaultSchema(JsonNode config) { + return config.get(CONFIG_DATASET_ID).asText(); + } + + @Override + protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, 
String namespace) throws Exception { String tableName = namingResolver.getIdentifier(streamName); - return retrieveRecordsFromTable(testEnv, tableName); + String schema = namingResolver.getIdentifier(namespace); + return retrieveRecordsFromTable(tableName, schema); } @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { - return retrieveRecordsFromTable(env, namingResolver.getRawTableName(streamName)) + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace)) .stream() .map(node -> node.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()) .map(Jsons::deserialize) @@ -120,11 +126,11 @@ protected List resolveIdentifier(String identifier) { return result; } - private List retrieveRecordsFromTable(TestDestinationEnv env, String tableName) throws InterruptedException { + private List retrieveRecordsFromTable(String tableName, String schema) throws InterruptedException { final QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder( - String.format("SELECT * FROM `%s`.`%s` order by %s asc;", dataset.getDatasetId().getDataset(), tableName, + String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .setUseLegacySql(false).build(); diff --git a/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java b/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java index 87fa3afd88e5d..0752fdb07a317 100644 --- a/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java +++ 
b/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java @@ -74,7 +74,7 @@ protected JsonNode getFailCheckConfig() { public void testCheckConnectionInvalidCredentials() {} @Override - protected List retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception { + protected List retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { final List allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList()); final Optional streamOutput = allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName))) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index b605241167c53..2afda06a3c157 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -31,12 +31,13 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.destination.NamingConventionTransformer; -import io.airbyte.integrations.destination.WriteConfig; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.OnCloseFunction; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.OnStartFunction; import 
io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.RecordWriter; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import java.time.Instant; import java.util.List; @@ -77,27 +78,47 @@ public static AirbyteMessageConsumer create(JdbcDatabase database, private static List createWriteConfigs(NamingConventionTransformer namingResolver, JsonNode config, ConfiguredAirbyteCatalog catalog) { Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema."); final Instant now = Instant.now(); + return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now)).collect(Collectors.toList()); + } + + private static Function toWriteConfig(NamingConventionTransformer namingResolver, + JsonNode config, + Instant now) { + return stream -> { + Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode"); + final AirbyteStream abStream = stream.getStream(); - return catalog.getStreams().stream().map(stream -> { - final String streamName = stream.getStream().getName(); - final String schemaName = namingResolver.getIdentifier(config.get("schema").asText()); + final String defaultSchemaName = namingResolver.getIdentifier(config.get("schema").asText()); + final String outputSchema = getOutputSchema(abStream, defaultSchemaName); + + final String streamName = abStream.getName(); final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName)); final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName); final DestinationSyncMode syncMode = stream.getDestinationSyncMode(); - if (syncMode == null) { - throw new IllegalStateException("Undefined destination sync mode"); - } - return new WriteConfig(streamName, schemaName, 
tmpTableName, tableName, syncMode); - }).collect(Collectors.toList()); + + return new WriteConfig(streamName, outputSchema, tmpTableName, tableName, syncMode); + }; + } + + /** + * Defer to the {@link AirbyteStream}'s namespace. If this is not set, use the destination's default + * schema. This namespace is source-provided, and can be potentially empty. + */ + private static String getOutputSchema(AirbyteStream stream, String defaultDestSchema) { + final String sourceSchema = stream.getNamespace(); + if (sourceSchema != null) { + return sourceSchema; + } + return defaultDestSchema; } private static OnStartFunction onStartFunction(JdbcDatabase database, SqlOperations sqlOperations, List writeConfigs) { return () -> { LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size()); for (final WriteConfig writeConfig : writeConfigs) { - final String schemaName = writeConfig.getOutputNamespaceName(); + final String schemaName = writeConfig.getOutputSchemaName(); final String tmpTableName = writeConfig.getTmpTableName(); - LOGGER.info("Preparing tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), + LOGGER.info("Preparing tmp table in destination started for stream {}. 
schema: {}, tmp table name: {}", writeConfig.getStreamName(), schemaName, tmpTableName); sqlOperations.createSchemaIfNotExists(database, schemaName); @@ -121,7 +142,7 @@ private static RecordWriter recordWriterFunction(JdbcDatabase database, } final WriteConfig writeConfig = streamNameToWriteConfig.get(streamName); - sqlOperations.insertRecords(database, recordStream, writeConfig.getOutputNamespaceName(), writeConfig.getTmpTableName()); + sqlOperations.insertRecords(database, recordStream, writeConfig.getOutputSchemaName(), writeConfig.getTmpTableName()); }; } @@ -132,7 +153,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati final StringBuilder queries = new StringBuilder(); LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size()); for (WriteConfig writeConfig : writeConfigs) { - final String schemaName = writeConfig.getOutputNamespaceName(); + final String schemaName = writeConfig.getOutputSchemaName(); final String srcTableName = writeConfig.getTmpTableName(); final String dstTableName = writeConfig.getOutputTableName(); LOGGER.info("Finalizing stream {}. schema {}, tmp table {}, final table {}", writeConfig.getStreamName(), schemaName, srcTableName, @@ -155,7 +176,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati // clean up LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size()); for (WriteConfig writeConfig : writeConfigs) { - final String schemaName = writeConfig.getOutputNamespaceName(); + final String schemaName = writeConfig.getOutputSchemaName(); final String tmpTableName = writeConfig.getTmpTableName(); LOGGER.info("Cleaning tmp table in destination started for stream {}. 
schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName, tmpTableName); diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/WriteConfig.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/WriteConfig.java similarity index 80% rename from airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/WriteConfig.java rename to airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/WriteConfig.java index 20b8f3e819883..5dd5724188bc5 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/WriteConfig.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/WriteConfig.java @@ -22,21 +22,24 @@ * SOFTWARE. */ -package io.airbyte.integrations.destination; +package io.airbyte.integrations.destination.jdbc; import io.airbyte.protocol.models.DestinationSyncMode; +/** + * Write configuration POJO for all destinations extending {@link AbstractJdbcDestination}. 
+ */ public class WriteConfig { private final String streamName; - private final String outputNamespaceName; + private final String outputSchemaName; private final String tmpTableName; private final String outputTableName; private final DestinationSyncMode syncMode; - public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, DestinationSyncMode syncMode) { + public WriteConfig(String streamName, String outputSchemaName, String tmpTableName, String outputTableName, DestinationSyncMode syncMode) { this.streamName = streamName; - this.outputNamespaceName = outputNamespaceName; + this.outputSchemaName = outputSchemaName; this.tmpTableName = tmpTableName; this.outputTableName = outputTableName; this.syncMode = syncMode; @@ -50,8 +53,8 @@ public String getTmpTableName() { return tmpTableName; } - public String getOutputNamespaceName() { - return outputNamespaceName; + public String getOutputSchemaName() { + return outputSchemaName; } public String getOutputTableName() { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-jdbc/src/main/resources/spec.json index 2cac7c81005dc..5f644c102be10 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/resources/spec.json @@ -23,10 +23,11 @@ "type": "string" }, "schema": { - "description": "Unless specifically configured, the usual value for this field is \"public\".", + "description": "The default schema tables are written to if the source does not specify a namespace. 
The usual value for this field is \"public\".", "type": "string", "examples": ["public"], - "default": "public" + "default": "public", + "title": "Default Schema" } } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test-integration/java/io/airbyte/integrations/destination/jdbc/JdbcIntegrationTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test-integration/java/io/airbyte/integrations/destination/jdbc/JdbcIntegrationTest.java index 1f4f2d14a156c..b68085430e764 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/test-integration/java/io/airbyte/integrations/destination/jdbc/JdbcIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/test-integration/java/io/airbyte/integrations/destination/jdbc/JdbcIntegrationTest.java @@ -78,8 +78,8 @@ protected JsonNode getFailCheckConfig() { } @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName)) + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace)) .stream() .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); @@ -91,14 +91,14 @@ protected boolean implementsBasicNormalization() { } @Override - protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName) + protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { String tableName = namingResolver.getIdentifier(streamName); if (!tableName.startsWith("\"")) { // Currently, Normalization always quote tables identifiers tableName = "\"" + tableName + "\""; } - return retrieveRecordsFromTable(tableName); + return retrieveRecordsFromTable(tableName, 
namingResolver.getIdentifier(namespace)); } @Override @@ -114,11 +114,11 @@ protected List resolveIdentifier(String identifier) { return result; } - private List retrieveRecordsFromTable(String tableName) throws SQLException { + private List retrieveRecordsFromTable(String tableName, String schema) throws SQLException { return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(), db.getJdbcUrl()).query( ctx -> ctx - .fetch(String.format("SELECT * FROM %s ORDER BY %s ASC;", tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .stream() .map(r -> r.formatJSON(JSON_FORMAT)) .map(Jsons::deserialize) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/JdbcDestinationTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/JdbcDestinationTest.java deleted file mode 100644 index f5ca788d624fb..0000000000000 --- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/JdbcDestinationTest.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.destination.jdbc; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.db.Database; -import io.airbyte.db.Databases; -import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.destination.NamingConventionTransformer; -import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import 
io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.protocol.models.DestinationSyncMode; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; -import io.airbyte.protocol.models.SyncMode; -import java.io.IOException; -import java.sql.SQLException; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import org.jooq.JSONFormat; -import org.jooq.JSONFormat.RecordFormat; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.PostgreSQLContainer; - -class JdbcDestinationTest { - - private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); - private static final Instant NOW = Instant.now(); - private static final String USERS_STREAM_NAME = "users"; - private static final String TASKS_STREAM_NAME = "tasks-list"; - private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build())) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build())) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_TASKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", 
"announce the game.").build())) - .withEmittedAt(NOW.toEpochMilli())); - // also used for testing quote escaping - private static final AirbyteMessage MESSAGE_TASKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "ship some 'code'.").build())) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) - .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); - - private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING), - Field.of("id", JsonSchemaPrimitive.STRING)), - CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); - - private static final NamingConventionTransformer NAMING_TRANSFORMER = new ExtendedNameTransformer(); - - private PostgreSQLContainer container; - private JsonNode config; - private Database database; - - @BeforeEach - void setup() { - container = new PostgreSQLContainer<>("postgres:13-alpine"); - container.start(); - - config = createConfig("public"); - - database = Databases.createPostgresDatabase( - config.get("username").asText(), - config.get("password").asText(), - config.get("jdbc_url").asText()); - } - - @AfterEach - void tearDown() throws Exception { - database.close(); - container.close(); - } - - @Test - void testSpec() throws IOException { - final ConnectorSpecification actual = new JdbcDestination().spec(); - final String resourceString = MoreResources.readResource("spec.json"); - final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class); - - 
assertEquals(expected, actual); - } - - @Test - void testCheckSuccess() { - final AirbyteConnectionStatus actual = new JdbcDestination().check(config); - final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - assertEquals(expected, actual); - } - - @Test - void testCheckFailure() { - ((ObjectNode) config).put("password", "fake"); - final AirbyteConnectionStatus actual = new JdbcDestination().check(config); - assertEquals(Status.FAILED, actual.getStatus()); - assertTrue(actual.getMessage().startsWith("Could not connect with provided configuration.")); - } - - @Test - void testWriteSuccess() throws Exception { - final JdbcDestination destination = new JdbcDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, CATALOG); - - consumer.start(); - consumer.accept(MESSAGE_USERS1); - consumer.accept(MESSAGE_TASKS1); - consumer.accept(MESSAGE_USERS2); - consumer.accept(MESSAGE_TASKS2); - consumer.accept(MESSAGE_STATE); - consumer.close(); - - Set usersActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME)); - final Set expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData()); - assertEquals(expectedUsersJson, usersActual); - - Set tasksActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME)); - final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); - assertEquals(expectedTasksJson, tasksActual); - - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - } - - @Test - void testWriteIncremental() throws Exception { - final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG); - catalog.getStreams().forEach(stream -> { - stream.withSyncMode(SyncMode.INCREMENTAL); - 
stream.withDestinationSyncMode(DestinationSyncMode.APPEND); - }); - - final JdbcDestination destination = new JdbcDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog); - - consumer.start(); - consumer.accept(MESSAGE_USERS1); - consumer.accept(MESSAGE_TASKS1); - consumer.accept(MESSAGE_USERS2); - consumer.accept(MESSAGE_TASKS2); - consumer.accept(MESSAGE_STATE); - consumer.close(); - - final AirbyteMessageConsumer consumer2 = destination.getConsumer(config, catalog); - - final AirbyteMessage messageUser3 = new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "michael").put("id", "87").build())) - .withEmittedAt(NOW.toEpochMilli())); - consumer2.start(); - consumer2.accept(messageUser3); - consumer2.close(); - - Set usersActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME)); - final Set expectedUsersJson = Sets.newHashSet( - MESSAGE_USERS1.getRecord().getData(), - MESSAGE_USERS2.getRecord().getData(), - messageUser3.getRecord().getData()); - assertEquals(expectedUsersJson, usersActual); - - Set tasksActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME)); - final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); - assertEquals(expectedTasksJson, tasksActual); - - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - } - - @Test - void testWriteNewSchema() throws Exception { - JsonNode newConfig = createConfig("new_schema"); - final JdbcDestination destination = new JdbcDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(newConfig, CATALOG); - - consumer.start(); - consumer.accept(MESSAGE_USERS1); - consumer.accept(MESSAGE_TASKS1); - 
consumer.accept(MESSAGE_USERS2); - consumer.accept(MESSAGE_TASKS2); - consumer.accept(MESSAGE_STATE); - consumer.close(); - - final String schemaName = NAMING_TRANSFORMER.getIdentifier("new_schema"); - String streamName = schemaName + "." + NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME); - Set usersActual = recordRetriever(streamName); - final Set expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData()); - assertEquals(expectedUsersJson, usersActual); - - streamName = schemaName + "." + NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME); - Set tasksActual = recordRetriever(streamName); - final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); - assertEquals(expectedTasksJson, tasksActual); - - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - - assertThrows(RuntimeException.class, () -> recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME))); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - void testWriteFailure() throws Exception { - // hack to force an exception to be thrown from within the consumer. 
- final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1); - doThrow(new RuntimeException()).when(spiedMessage).getRecord(); - - final JdbcDestination destination = new JdbcDestination(); - final AirbyteMessageConsumer consumer = spy(destination.getConsumer(config, CATALOG)); - - assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); - consumer.start(); - consumer.accept(MESSAGE_USERS2); - consumer.close(); - - final List tableNames = CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(s -> NAMING_TRANSFORMER.getRawTableName(s.getName())) - .collect(Collectors.toList()); - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - // assert that no tables were created. - assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); - } - - private List fetchNamesOfTablesInDb() throws SQLException { - return database.query( - ctx -> ctx.fetch("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")) - .stream() - .map(record -> (String) record.get("table_name")).collect(Collectors.toList()); - } - - private void assertTmpTablesNotPresent(List tableNames) throws SQLException { - Set tmpTableNamePrefixes = tableNames.stream().map(name -> name + "_\\d+").collect(Collectors.toSet()); - assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tmpTableNamePrefixes.stream().anyMatch(tableName::matches))); - } - - private Set recordRetriever(String streamName) throws Exception { - return database.query(ctx -> ctx - .fetch(String.format("SELECT * FROM %s ORDER BY %s ASC;", streamName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .stream() - .peek(record -> { - // ensure emitted_at is not in the future - OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC); - OffsetDateTime emitted_at = 
record.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, OffsetDateTime.class); - - assertTrue(now.toEpochSecond() >= emitted_at.toEpochSecond()); - }) - .map(r -> r.formatJSON(JSON_FORMAT)) - .map(Jsons::deserialize) - .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) - .collect(Collectors.toSet())); - } - - private JsonNode createConfig(String schemaName) { - return Jsons.jsonNode(ImmutableMap.builder() - .put("username", container.getUsername()) - .put("password", container.getPassword()) - .put("schema", schemaName) - .put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s", - container.getHost(), - container.getFirstMappedPort(), - container.getDatabaseName())) - .build()); - } - -} diff --git a/airbyte-integrations/connectors/destination-local-json/src/test-integration/java/io/airbyte/integrations/destination/local_json/LocalJsonDestinationIntegrationTest.java b/airbyte-integrations/connectors/destination-local-json/src/test-integration/java/io/airbyte/integrations/destination/local_json/LocalJsonDestinationIntegrationTest.java index a2100d0768f9f..140e397fd335c 100644 --- a/airbyte-integrations/connectors/destination-local-json/src/test-integration/java/io/airbyte/integrations/destination/local_json/LocalJsonDestinationIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-local-json/src/test-integration/java/io/airbyte/integrations/destination/local_json/LocalJsonDestinationIntegrationTest.java @@ -70,7 +70,7 @@ protected JsonNode getFailCheckConfig() { public void testCheckConnectionInvalidCredentials() {} @Override - protected List retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception { + protected List retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { final List allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList()); final Optional streamOutput = allOutputs.stream() .filter(path -> 
path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName))) diff --git a/airbyte-integrations/connectors/destination-meilisearch/src/test-integration/java/io/airbyte/integrations/destination/meilisearch/MeiliSearchStandardTest.java b/airbyte-integrations/connectors/destination-meilisearch/src/test-integration/java/io/airbyte/integrations/destination/meilisearch/MeiliSearchStandardTest.java index 58e39f78abd0f..b02f1dcf49aad 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/src/test-integration/java/io/airbyte/integrations/destination/meilisearch/MeiliSearchStandardTest.java +++ b/airbyte-integrations/connectors/destination-meilisearch/src/test-integration/java/io/airbyte/integrations/destination/meilisearch/MeiliSearchStandardTest.java @@ -93,7 +93,7 @@ protected JsonNode getFailCheckConfig() { } @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { final Index index = meiliSearchClient.index(Names.toAlphanumericAndUnderscore(streamName)); final String responseString = index.getDocuments(); final JsonNode response = Jsons.deserialize(responseString); diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json index f83cd637476a0..10142d5a67d65 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json @@ -32,8 +32,8 @@ "order": 2 }, "schema": { - "title": "Schema", - "description": "Unless specifically configured, the usual value for this field is \"public\".", + "title": "Default Schema", + "description": "The default schema tables are written to if the source does not specify a namespace. 
The usual value for this field is \"public\".", "type": "string", "examples": ["public"], "default": "public", diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java index d5bc22c1b653f..857fb69672078 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresIntegrationTest.java @@ -76,8 +76,8 @@ protected JsonNode getFailCheckConfig() { } @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName)) + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); @@ -89,7 +89,7 @@ protected boolean implementsBasicNormalization() { } @Override - protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName) + protected List retrieveNormalizedRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { String tableName = namingResolver.getIdentifier(streamName); // Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't @@ -98,7 +98,7 @@ protected List retrieveNormalizedRecords(TestDestinationEnv env, Strin // // Currently, Normalization always quote tables identifiers // //tableName = "\"" + tableName + 
"\""; // } - return retrieveRecordsFromTable(tableName); + return retrieveRecordsFromTable(tableName, namespace); } @Override @@ -114,11 +114,11 @@ protected List resolveIdentifier(String identifier) { return result; } - private List retrieveRecordsFromTable(String tableName) throws SQLException { + private List retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException { return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(), db.getJdbcUrl()).query( ctx -> ctx - .fetch(String.format("SELECT * FROM %s ORDER BY %s ASC;", tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .stream() .map(r -> r.formatJSON(JSON_FORMAT)) .map(Jsons::deserialize) diff --git a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java deleted file mode 100644 index 8dc8325e478ea..0000000000000 --- a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java +++ /dev/null @@ -1,333 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions 
of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.destination.postgres; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.db.Database; -import io.airbyte.db.Databases; -import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.NamingConventionTransformer; -import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; 
-import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.protocol.models.DestinationSyncMode; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; -import io.airbyte.protocol.models.SyncMode; -import java.io.IOException; -import java.sql.SQLException; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import org.jooq.JSONFormat; -import org.jooq.JSONFormat.RecordFormat; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.PostgreSQLContainer; - -class PostgresDestinationTest { - - private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); - private static final Instant NOW = Instant.now(); - private static final String USERS_STREAM_NAME = "users"; - private static final String TASKS_STREAM_NAME = "tasks-list"; - private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build())) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build())) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_TASKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "announce the game.").build())) - 
.withEmittedAt(NOW.toEpochMilli())); - // also used for testing quote escaping - private static final AirbyteMessage MESSAGE_TASKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "ship some 'code'.").build())) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE) - .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build()))); - - private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING), - Field.of("id", JsonSchemaPrimitive.STRING)), - CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING)))); - - private static final NamingConventionTransformer NAMING_TRANSFORMER = new PostgresSQLNameTransformer(); - - private PostgreSQLContainer container; - private JsonNode config; - private Database database; - - @BeforeEach - void setup() { - container = new PostgreSQLContainer<>("postgres:13-alpine"); - container.start(); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put("host", container.getHost()) - .put("username", container.getUsername()) - .put("password", container.getPassword()) - .put("schema", "public") - .put("port", container.getFirstMappedPort()) - .put("database", container.getDatabaseName()) - .build()); - - database = Databases.createPostgresDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl()); - } - - @AfterEach - void tearDown() throws Exception { - database.close(); - container.close(); - } - - @Test - void testSpec() throws IOException { - final ConnectorSpecification actual = new 
PostgresDestination().spec(); - final String resourceString = MoreResources.readResource("spec.json"); - final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class); - - assertEquals(expected, actual); - } - - @Test - void testCheckSuccess() { - final AirbyteConnectionStatus actual = new PostgresDestination().check(config); - final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - assertEquals(expected, actual); - } - - @Test - void testCheckFailure() { - ((ObjectNode) config).put("password", "fake"); - final AirbyteConnectionStatus actual = new PostgresDestination().check(config); - assertEquals(Status.FAILED, actual.getStatus()); - assertTrue(actual.getMessage().startsWith("Could not connect with provided configuration.")); - } - - @Test - void testWriteSuccess() throws Exception { - final PostgresDestination destination = new PostgresDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, CATALOG); - - consumer.start(); - consumer.accept(MESSAGE_USERS1); - consumer.accept(MESSAGE_TASKS1); - consumer.accept(MESSAGE_USERS2); - consumer.accept(MESSAGE_TASKS2); - consumer.accept(MESSAGE_STATE); - consumer.close(); - - Set usersActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME)); - final Set expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData()); - assertEquals(expectedUsersJson, usersActual); - - Set tasksActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME)); - final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); - assertEquals(expectedTasksJson, tasksActual); - - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - } - - @Test - void 
testWriteIncremental() throws Exception { - final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG); - catalog.getStreams().forEach(stream -> { - stream.withSyncMode(SyncMode.INCREMENTAL); - stream.withDestinationSyncMode(DestinationSyncMode.APPEND); - }); - - final PostgresDestination destination = new PostgresDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog); - - consumer.start(); - consumer.accept(MESSAGE_USERS1); - consumer.accept(MESSAGE_TASKS1); - consumer.accept(MESSAGE_USERS2); - consumer.accept(MESSAGE_TASKS2); - consumer.accept(MESSAGE_STATE); - consumer.close(); - - final AirbyteMessageConsumer consumer2 = destination.getConsumer(config, catalog); - - final AirbyteMessage messageUser3 = new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "michael").put("id", "87").build())) - .withEmittedAt(NOW.toEpochMilli())); - consumer2.start(); - consumer2.accept(messageUser3); - consumer2.close(); - - Set usersActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME)); - final Set expectedUsersJson = Sets.newHashSet( - MESSAGE_USERS1.getRecord().getData(), - MESSAGE_USERS2.getRecord().getData(), - messageUser3.getRecord().getData()); - assertEquals(expectedUsersJson, usersActual); - - Set tasksActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME)); - final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); - assertEquals(expectedTasksJson, tasksActual); - - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - } - - @Test - void testWriteNewSchema() throws Exception { - JsonNode newConfig = Jsons.jsonNode(ImmutableMap.builder() - .put("host", container.getHost()) - 
.put("username", container.getUsername()) - .put("password", container.getPassword()) - .put("schema", "new_schema") - .put("port", container.getFirstMappedPort()) - .put("database", container.getDatabaseName()) - .build()); - final PostgresDestination destination = new PostgresDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(newConfig, CATALOG); - - consumer.start(); - consumer.accept(MESSAGE_USERS1); - consumer.accept(MESSAGE_TASKS1); - consumer.accept(MESSAGE_USERS2); - consumer.accept(MESSAGE_TASKS2); - consumer.accept(MESSAGE_STATE); - consumer.close(); - - final String schemaName = NAMING_TRANSFORMER.getIdentifier("new_schema"); - String streamName = schemaName + "." + NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME); - Set usersActual = recordRetriever(streamName); - final Set expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData()); - assertEquals(expectedUsersJson, usersActual); - - streamName = schemaName + "." + NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME); - Set tasksActual = recordRetriever(streamName); - final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); - assertEquals(expectedTasksJson, tasksActual); - - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - - assertThrows(RuntimeException.class, () -> recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME))); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - void testWriteFailure() throws Exception { - // hack to force an exception to be thrown from within the consumer. 
- final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1); - doThrow(new RuntimeException()).when(spiedMessage).getRecord(); - - final PostgresDestination destination = new PostgresDestination(); - final AirbyteMessageConsumer consumer = spy(destination.getConsumer(config, CATALOG)); - - consumer.start(); - assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); - consumer.accept(MESSAGE_USERS2); - consumer.close(); - - final List tableNames = CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(s -> NAMING_TRANSFORMER.getRawTableName(s.getName())) - .collect(Collectors.toList()); - assertTmpTablesNotPresent(CATALOG.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - // assert that no tables were created. - assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); - } - - private List fetchNamesOfTablesInDb() throws SQLException { - return database.query( - ctx -> ctx.fetch("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")) - .stream() - .map(record -> (String) record.get("table_name")).collect(Collectors.toList()); - } - - private void assertTmpTablesNotPresent(List tableNames) throws SQLException { - Set tmpTableNamePrefixes = tableNames.stream().map(name -> name + "_\\d+").collect(Collectors.toSet()); - assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tmpTableNamePrefixes.stream().anyMatch(tableName::matches))); - } - - private Set recordRetriever(String streamName) throws Exception { - return database.query(ctx -> ctx - .fetch(String.format("SELECT * FROM %s ORDER BY %s ASC;", streamName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .stream() - .peek(record -> { - // ensure emitted_at is not in the future - OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC); - OffsetDateTime emitted_at 
= record.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, OffsetDateTime.class); - - assertTrue(now.toEpochSecond() >= emitted_at.toEpochSecond()); - }) - .map(r -> r.formatJSON(JSON_FORMAT)) - .map(Jsons::deserialize) - .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) - .collect(Collectors.toSet())); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopier.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopier.java index 0aabc440aff3e..b56c65fb34801 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopier.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopier.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; -import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; import java.util.List; @@ -187,10 +186,11 @@ private void closeS3WriteStreamAndUpload() throws IOException { LOGGER.info("All data for {} stream uploaded.", streamName); } - private void createTmpTableAndCopyS3FileInto() throws SQLException { - LOGGER.info("Preparing tmp table in destination for stream {}. 
tmp table name: {}.", streamName, tmpTableName); + private void createTmpTableAndCopyS3FileInto() throws Exception { + REDSHIFT_SQL_OPS.createSchemaIfNotExists(redshiftDb, schemaName); + LOGGER.info("Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.", streamName, schemaName, tmpTableName); REDSHIFT_SQL_OPS.createTableIfNotExists(redshiftDb, schemaName, tmpTableName); - LOGGER.info("Starting copy to tmp table {} in destination for stream {} .", tmpTableName, streamName); + LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName); REDSHIFT_SQL_OPS.copyS3CsvFileIntoTable(redshiftDb, getFullS3Path(s3BucketName, stagingFolder, streamName), schemaName, tmpTableName, s3KeyId, s3Key, s3Region); @@ -203,11 +203,11 @@ private String mergeIntoDestTableIncrementalOrFullRefreshQuery() throws Exceptio REDSHIFT_SQL_OPS.createTableIfNotExists(redshiftDb, schemaName, destTableName); LOGGER.info("Tmp table {} in destination prepared.", tmpTableName); - LOGGER.info("Preparing to merge tmp table {} to dest table {} in destination.", tmpTableName, destTableName); + LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {} in destination.", tmpTableName, destTableName, schemaName); var queries = new StringBuilder(); if (destSyncMode.equals(DestinationSyncMode.OVERWRITE)) { queries.append(REDSHIFT_SQL_OPS.truncateTableQuery(schemaName, destTableName)); - LOGGER.info("Destination OVERWRITE mode detected. Dest table {} truncated.", destTableName); + LOGGER.info("Destination OVERWRITE mode detected. 
Dest table: {}, schema: {}, truncated.", destTableName, schemaName); } queries.append(REDSHIFT_SQL_OPS.copyTableQuery(schemaName, tmpTableName, destTableName)); return queries.toString(); diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestination.java index 98aead21245a0..20a8a64e64e0b 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestination.java @@ -36,7 +36,6 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; -import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; @@ -62,7 +61,7 @@ */ public class RedshiftCopyDestination { - private static final StandardNameTransformer namingResolver = new StandardNameTransformer(); + private static final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftCopyDestination.class); public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog) { @@ -75,7 +74,7 @@ public AirbyteConnectionStatus check(JsonNode config) { var s3Config = new S3Config(config); attemptWriteAndDeleteS3Object(s3Config, outputTableName); - var outputSchema = namingResolver.getIdentifier(config.get("schema").asText()); + var outputSchema = 
namingResolver.convertStreamName(config.get("schema").asText()); JdbcDatabase database = getRedshift(config); AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, new RedshiftSqlOperations()); @@ -128,8 +127,8 @@ static String extractRegionFromRedshiftUrl(String url) { static class RedshiftCopyDestinationConsumer extends FailureTrackingAirbyteMessageConsumer { private final ConfiguredAirbyteCatalog catalog; + private final String defaultSchema; private final JdbcDatabase redshiftDb; - private final String schema; private final S3Config s3Config; private final AmazonS3 s3Client; private final Map streamNameToCopier; @@ -137,7 +136,7 @@ static class RedshiftCopyDestinationConsumer extends FailureTrackingAirbyteMessa public RedshiftCopyDestinationConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog) { this.catalog = catalog; this.redshiftDb = getRedshift(config); - this.schema = config.get("schema").asText(); + this.defaultSchema = config.get("schema").asText(); this.s3Config = new S3Config(config); this.s3Client = getAmazonS3(s3Config); this.streamNameToCopier = new HashMap<>(); @@ -146,15 +145,17 @@ public RedshiftCopyDestinationConsumer(JsonNode config, ConfiguredAirbyteCatalog @Override protected void startTracked() throws Exception { var stagingFolder = UUID.randomUUID().toString(); - for (var stream : catalog.getStreams()) { - var streamName = stream.getStream().getName(); - var syncMode = stream.getDestinationSyncMode(); - if (stream.getDestinationSyncMode() == null) { + for (var configuredStream : catalog.getStreams()) { + if (configuredStream.getDestinationSyncMode() == null) { throw new IllegalStateException("Undefined destination sync mode."); } + var stream = configuredStream.getStream(); + var streamName = stream.getName(); + var syncMode = configuredStream.getDestinationSyncMode(); + var schema = + stream.getNamespace() != null ? 
namingResolver.convertStreamName(stream.getNamespace()) : namingResolver.convertStreamName(defaultSchema); var copier = new RedshiftCopier(s3Config.bucketName, stagingFolder, syncMode, schema, streamName, s3Client, redshiftDb, s3Config.accessKeyId, s3Config.secretAccessKey, s3Config.region); - streamNameToCopier.put(streamName, copier); } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index a3a0b9475e3ce..11d6d5ccca9de 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -11,7 +11,8 @@ "properties": { "host": { "description": "Host Endpoint of the Redshift Cluster (must include the cluster-id, region and end with .redshift.amazonaws.com)", - "type": "string" + "type": "string", + "title": "Host" }, "port": { "description": "Port of the database.", @@ -19,26 +20,31 @@ "minimum": 0, "maximum": 65536, "default": 5439, - "examples": ["5439"] + "examples": ["5439"], + "title": "Port" }, "username": { "description": "Username to use to access the database.", - "type": "string" + "type": "string", + "title": "Username" }, "password": { "description": "Password associated with the username.", "type": "string", - "airbyte_secret": true + "airbyte_secret": true, + "title": "Password" }, "database": { "description": "Name of the database.", - "type": "string" + "type": "string", + "title": "Database" }, "schema": { - "description": "Unless specifically configured, the usual value for this field is \"public\".", + "description": "The default schema tables are written to if the source does not specify a namespace. 
Unless specifically configured, the usual value for this field is \"public\".", "type": "string", "examples": ["public"], - "default": "public" + "default": "public", + "title": "Default Schema" }, "basic_normalization": { "type": "boolean", diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyIntegrationTest.java index edcf4cdb8e118..db446b4d4a6f3 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyIntegrationTest.java @@ -31,7 +31,6 @@ import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.standardtest.destination.TestDestination; import java.nio.file.Path; import java.sql.SQLException; @@ -53,7 +52,7 @@ public class RedshiftCopyIntegrationTest extends TestDestination { private JsonNode baseConfig; // config which refers to the schema that the test is being run in. 
private JsonNode config; - private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); + private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); @Override protected String getImageName() { @@ -77,8 +76,8 @@ protected JsonNode getFailCheckConfig() { } @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName)) + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() .map(j -> Jsons.deserialize(j.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); @@ -90,13 +89,13 @@ protected boolean implementsBasicNormalization() { } @Override - protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception { + protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { String tableName = namingResolver.getIdentifier(streamName); if (!tableName.startsWith("\"")) { // Currently, Normalization always quote tables identifiers tableName = "\"" + tableName + "\""; } - return retrieveRecordsFromTable(tableName); + return retrieveRecordsFromTable(tableName, namespace); } @Override @@ -112,8 +111,7 @@ protected List resolveIdentifier(String identifier) { return result; } - private List retrieveRecordsFromTable(String tableName) throws SQLException { - final String schemaName = config.get("schema").asText(); + private List retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException { return getDatabase().query( ctx -> ctx .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) diff --git 
a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertIntegrationTest.java index 13967a655baa7..62d922f28be87 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertIntegrationTest.java @@ -80,8 +80,8 @@ protected JsonNode getFailCheckConfig() { } @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName)) + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() .map(j -> Jsons.deserialize(j.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); @@ -93,13 +93,13 @@ protected boolean implementsBasicNormalization() { } @Override - protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception { + protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { String tableName = namingResolver.getIdentifier(streamName); if (!tableName.startsWith("\"")) { // Currently, Normalization always quote tables identifiers tableName = "\"" + tableName + "\""; } - return retrieveRecordsFromTable(tableName); + return retrieveRecordsFromTable(tableName, namespace); } @Override @@ -115,8 +115,7 @@ protected List resolveIdentifier(String identifier) { return result; } - private List 
retrieveRecordsFromTable(String tableName) throws SQLException { - final String schemaName = config.get("schema").asText(); + private List retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException { return getDatabase().query( ctx -> ctx .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index a160c1587202b..cecbb43c66217 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json @@ -21,37 +21,44 @@ "host": { "description": "Host domain of the snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com).", "examples": ["accountname.us-east-2.aws.snowflakecomputing.com"], - "type": "string" + "type": "string", + "title": "Host" }, "role": { "description": "The role you created for Airbyte to access Snowflake.", "examples": ["AIRBYTE_ROLE"], - "type": "string" + "type": "string", + "title": "Role" }, "warehouse": { "description": "The warehouse you created for Airbyte to sync data into.", "examples": ["AIRBYTE_WAREHOUSE"], - "type": "string" + "type": "string", + "title": "Warehouse" }, "database": { "description": "The database you created for Airbyte to sync data into.", "examples": ["AIRBYTE_DATABASE"], - "type": "string" + "type": "string", + "title": "Database" }, "schema": { - "description": "The Snowflake schema you created for Airbyte to sync data into.", + "description": "The default Snowflake schema tables are written to if the source does not specify a namespace.", "examples": ["AIRBYTE_SCHEMA"], - "type": "string" + "type": "string", + "title": "Default Schema" }, "username": { "description": "The username 
you created to allow Airbyte to access the database.", "examples": ["AIRBYTE_USER"], - "type": "string" + "type": "string", + "title": "Username" }, "password": { "description": "Password associated with the username.", "type": "string", - "airbyte_secret": true + "airbyte_secret": true, + "title": "Password" }, "basic_normalization": { "type": "boolean", diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeIntegrationTest.java index dbf1826c293de..3889dff8b94b6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeIntegrationTest.java @@ -69,8 +69,8 @@ protected JsonNode getFailCheckConfig() { } @Override - protected List retrieveRecords(TestDestinationEnv env, String streamName) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName)) + protected List retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace)) .stream() .map(j -> Jsons.deserialize(j.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()).asText())) .collect(Collectors.toList()); @@ -82,15 +82,16 @@ protected boolean implementsBasicNormalization() { } @Override - protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception { + protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { String tableName 
= namingResolver.getIdentifier(streamName); + String schema = namingResolver.getIdentifier(namespace); // Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't // use quoted names // if (!tableName.startsWith("\"")) { // // Currently, Normalization always quote tables identifiers // tableName = "\"" + tableName + "\""; // } - return retrieveRecordsFromTable(tableName); + return retrieveRecordsFromTable(tableName, schema); } @Override @@ -106,10 +107,10 @@ protected List resolveIdentifier(String identifier) { return result; } - private List retrieveRecordsFromTable(String tableName) throws SQLException, InterruptedException { + private List retrieveRecordsFromTable(String tableName, String schema) throws SQLException, InterruptedException { return SnowflakeDatabase.getDatabase(getConfig()).bufferedResultSetQuery( connection -> connection.createStatement() - .executeQuery(String.format("SELECT * FROM %s ORDER BY %s ASC;", tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)), + .executeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)), JdbcUtils::rowToJson); } diff --git a/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index 55bec6609495c..567cfcf75c3e2 100644 --- a/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -151,6 +151,9 @@ definitions: type: array items: type: string + namespace: + description: Optional Source-defined namespace. Currently only used by JDBC destinations to determine what schema to write to. Airbyte streams from the same sources should have the same namespace. 
+ type: string ConfiguredAirbyteCatalog: description: Airbyte stream schema catalog type: object diff --git a/airbyte-webapp/src/locales/en.json b/airbyte-webapp/src/locales/en.json index 16c07cadf5a39..591d024fc5b24 100644 --- a/airbyte-webapp/src/locales/en.json +++ b/airbyte-webapp/src/locales/en.json @@ -76,8 +76,8 @@ "form.edit": "Edit", "form.done": "Done", "form.examples": "(e.g. {examples})", - "form.prefix": "Namespace Prefix", - "form.prefix.message": "Where to replicate your data to in your destination", + "form.prefix": "Table Prefix", + "form.prefix.message": "Prefix prepended to all destination tables created as part of this connection. Useful for identifying tables on a per-connection basis.", "form.prefix.placeholder": "/configured-setting", "form.nameSearch": "Search name", "form.sourceConnector": "Source connector",