Skip to content

Commit

Permalink
Destination Postgres: Fix casing for raw table in T+D query (#34630)
Browse files Browse the repository at this point in the history
## What
* Fixes: #34632 

## How
* Change the raw table naming convention to always use lowercase, since Postgres folds an unquoted identifier to lowercase when the table is created.
This still preserves the mixed-case identifiers in Final tables.
  • Loading branch information
gisripa committed Jan 30, 2024
1 parent acd26ac commit 1ad7155
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 6 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.16.2 | 2024-01-29 | [\#34630](https://github.com/airbytehq/airbyte/pull/34630) | expose NamingTransformer to sub-classes in destinations JdbcSqlGenerator. |
| 0.16.1 | 2024-01-29 | [\#34533](https://github.com/airbytehq/airbyte/pull/34533) | Add a safe method to execute DatabaseMetadata's Resultset returning queries. |
| 0.16.0 | 2024-01-26 | [\#34573](https://github.com/airbytehq/airbyte/pull/34573) | Untangle Debezium harness dependencies. |
| 0.15.2 | 2024-01-25 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Improve airbyte-api build performance. |
Expand Down
@@ -1 +1 @@
version=0.16.1
version=0.16.2
Expand Up @@ -72,7 +72,7 @@ public abstract class JdbcSqlGenerator implements SqlGenerator<TableDefinition>
private static final String TYPING_CTE_ALIAS = "intermediate_data";
private static final String NUMBERED_ROWS_CTE_ALIAS = "numbered_rows";

private final NamingConventionTransformer namingTransformer;
protected final NamingConventionTransformer namingTransformer;
protected final ColumnId cdcDeletedAtColumn;

public JdbcSqlGenerator(final NamingConventionTransformer namingTransformer) {
Expand Down
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.13.1'
cdkVersionRequired = '0.16.2'
features = [
'db-sources', // required for tests
'db-destinations'
Expand Down
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.0
dockerImageTag: 0.6.1
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.14.0'
cdkVersionRequired = '0.16.2'
features = [
'db-sources', // required for tests
'db-destinations',
Expand Down
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.0
dockerImageTag: 0.6.1
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.ArrayList;
Expand Down Expand Up @@ -64,6 +65,23 @@ public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer)
super(namingTransformer);
}

/**
 * Builds the {@link StreamId} for a stream, forcing the raw namespace and the raw table name to
 * lowercase while leaving the other components as produced by the naming transformer.
 *
 * <p>There is a mismatch between the convention used by the create table query in SqlOperations
 * and the one used here. For postgres specifically, when a create table is issued without a quoted
 * identifier, the identifier is converted to lowercase. To stay consistent when the raw table is
 * queried in the T+D query, the raw identifiers are lowercased here as well.
 *
 * @param namespace the stream's original namespace
 * @param name the stream's original name
 * @param rawNamespaceOverride the namespace to use for the raw table
 * @return a {@link StreamId} whose raw namespace and raw table name are lowercase
 */
@Override
public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) {
  // TODO: This logic should be unified across Raw and final table operations in a single class
  // operating on a StreamId.
  // Locale.ROOT keeps the lowercasing independent of the JVM default locale; with e.g. a Turkish
  // default locale, "I" would otherwise become a dotless "ı" and the identifier would no longer
  // match the table that was actually created.
  return new StreamId(
      namingTransformer.getNamespace(namespace),
      namingTransformer.convertStreamName(name),
      namingTransformer.getNamespace(rawNamespaceOverride).toLowerCase(java.util.Locale.ROOT),
      namingTransformer.convertStreamName(StreamId.concatenateRawTableName(namespace, name)).toLowerCase(java.util.Locale.ROOT),
      namespace,
      name);
}

@Override
protected DataType<?> getStructType() {
return JSONB_TYPE;
Expand Down
Expand Up @@ -12,9 +12,17 @@
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.List;
import javax.sql.DataSource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest {

Expand Down Expand Up @@ -72,4 +80,31 @@ protected JdbcCompatibleSourceOperations<?> getSourceOperations() {
return new PostgresSqlGeneratorIntegrationTest.PostgresSourceOperations();
}

/**
 * Runs a single full-refresh / overwrite sync for a stream whose name carries a mixed-case prefix
 * and verifies the raw and final records against the sync1 fixtures.
 */
@Test
public void testMixedCasedSchema() throws Exception {
  // Give the stream a mixed-case name for the duration of this test.
  streamName = "MixedCaseSchema" + streamName;

  final AirbyteStream mixedCaseStream = new AirbyteStream()
      .withNamespace(streamNamespace)
      .withName(streamName)
      .withJsonSchema(SCHEMA);
  final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream()
      .withSyncMode(SyncMode.FULL_REFRESH)
      .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
      .withStream(mixedCaseStream);
  final ConfiguredAirbyteCatalog mixedCaseCatalog =
      new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream));

  // First sync
  final List<AirbyteMessage> firstSyncMessages = readMessages("dat/sync1_messages.jsonl");
  runSync(mixedCaseCatalog, firstSyncMessages);

  // Compare what landed in the destination with the expected fixtures.
  final List<JsonNode> rawExpected = readRecords("dat/sync1_expectedrecords_raw.jsonl");
  final List<JsonNode> finalExpected = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
  verifySyncResult(rawExpected, finalExpected, disableFinalTableComparison());
}

/**
 * Dumps the raw table records for the given stream, looking the raw table up by the lowercased
 * stream name.
 *
 * @param streamNamespace namespace of the stream, passed through unchanged
 * @param streamName name of the stream; lowercased before delegating to the parent implementation
 * @return the records returned by the parent implementation for the lowercased stream name
 * @throws Exception if the parent implementation fails
 */
@Override
protected List<JsonNode> dumpRawTableRecords(final String streamNamespace, final String streamName) throws Exception {
  // Use Locale.ROOT so the lowercasing does not depend on the JVM default locale
  // (e.g. Turkish "I" -> dotless "ı"), which would make the lookup miss the raw table.
  return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase(java.util.Locale.ROOT));
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/postgres.md
Expand Up @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 0.6.1 | 2024-01-29 | [34630](https://github.com/airbytehq/airbyte/pull/34630) | CDK Upgrade; Use lowercase raw table in T+D queries. |
| 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec |
| 0.5.5 | 2024-01-18 | [34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.1; Add indexes in raw table for query optimization |
| 0.5.4 | 2024-01-11 | [34177](https://github.com/airbytehq/airbyte/pull/34177) | Add code for DV2 beta (no user-visible changes) |
Expand Down

0 comments on commit 1ad7155

Please sign in to comment.