diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index 951c565ca8e2d..8b8f28a7a6ca0 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 2.4.1 + dockerImageTag: 2.4.2 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index bbaa1f3687dab..4d7d96db0bac6 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -12,7 +12,6 @@ import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.cdk.db.factory.DataSourceFactory; import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase; @@ -30,7 +29,6 @@ import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator; -import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer; import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption; import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType; import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig; @@ -225,14 +223,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN : new NoEncryption(); final JsonNode s3Options = findS3Options(config); final S3DestinationConfig s3Config = getS3DestinationConfig(s3Options); - final int numberOfFileBuffers = getNumberOfFileBuffers(s3Options); - if (numberOfFileBuffers > FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER) { - LOGGER.warn(""" - Increasing the number of file buffers past {} can lead to increased performance but - leads to increased memory usage. If the number of file buffers exceeds the number - of streams {} this will create more buffers than necessary, leading to nonexistent gains - """, FileBuffer.SOFT_CAP_CONCURRENT_STREAM_IN_BUFFER, catalog.getStreams().size()); - } final String defaultNamespace = config.get("schema").asText(); for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { @@ -287,26 +277,6 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN .createAsync(); } - /** - * Retrieves user configured file buffer amount so as long it doesn't exceed the maximum number of - * file buffers and sets the minimum number to the default - *

- * NOTE: If Out Of Memory Exceptions (OOME) occur, this can be a likely cause as this hard limit has - * not been thoroughly load tested across all instance sizes - * - * @param config user configurations - * @return number of file buffers if configured otherwise default - */ - @VisibleForTesting - public int getNumberOfFileBuffers(final JsonNode config) { - int numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER; - if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) { - numOfFileBuffers = Math.min(config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER); - } - // Only allows for values 10 <= numOfFileBuffers <= 50 - return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER); - } - private boolean isPurgeStagingData(final JsonNode config) { return !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean(); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 49f5c7676ddcb..b68874c9fe552 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -223,15 +223,6 @@ } ], "order": 7 - }, - "file_buffer_count": { - "title": "File Buffer Count", - "type": "integer", - "minimum": 10, - "maximum": 50, - "default": 10, - "description": "Number of file buffers allocated for writing data. Increasing this number is beneficial for connections using Change Data Capture (CDC) and up to the number of streams within a connection. Increasing the number of file buffers past the maximum number of streams has deteriorating effects", - "examples": ["10"] } } }, diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftFileBufferTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftFileBufferTest.java deleted file mode 100644 index bbeab71e6be0c..0000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftFileBufferTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import java.nio.file.Path; -import org.junit.jupiter.api.Test; - -public class RedshiftFileBufferTest { - - private final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json"))); - private final RedshiftStagingS3Destination destination = new RedshiftStagingS3Destination(); - - @Test - public void testGetFileBufferDefault() { - assertEquals(destination.getNumberOfFileBuffers(config), FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER); - } - - @Test - public void testGetFileBufferMaxLimited() { - ((ObjectNode) config).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 100); - assertEquals(destination.getNumberOfFileBuffers(config), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER); - } - - @Test - public void testGetMinimumFileBufferCount() { - ((ObjectNode) config).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 1); - // User cannot set number of file counts below the default file buffer count, which is existing - // behavior - assertEquals(destination.getNumberOfFileBuffers(config), FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER); - } - -} diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 699c536a691d8..839d7351f78b8 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -69,11 +69,6 @@ Optional parameters: `bucketPath/namespace/streamName/syncDate_epochMillis_randomUuid.csv` containing three columns (`ab_id`, `data`, `emitted_at`). Normally these files are deleted after the `COPY` command completes; if you want to keep them for other purposes, set `purge_staging_data` to `false`. -- **File Buffer Count** - - Number of file buffers allocated for writing data. Increasing this number is beneficial for - connections using Change Data Capture (CDC) and up to the number of streams within a connection. - Increasing the number of file buffers past the maximum number of streams has deteriorating - effects. NOTE: S3 staging does not use the SSH Tunnel option for copying data, if configured. SSH Tunnel supports the SQL connection only. S3 is secured through public HTTPS access only. Subsequent typing @@ -247,6 +242,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | | :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 2.4.2 | 2024-04-05 | [\#36365](https://github.com/airbytehq/airbyte/pull/36365) | Remove unused config option | | 2.4.1 | 2024-04-04 | [#36846](https://github.com/airbytehq/airbyte/pull/36846) | Remove duplicate S3 Region | | 2.4.0 | 2024-03-21 | [\#36589](https://github.com/airbytehq/airbyte/pull/36589) | Adapt to Kotlin cdk 0.28.19 | | 2.3.2 | 2024-03-21 | [\#36374](https://github.com/airbytehq/airbyte/pull/36374) | Supress Jooq DataAccessException error message in logs |