destination-redshift: Fix StackOverflowError with eager rendering of nested jooq function call sql. (#33877)

Signed-off-by: Gireesh Sreepathi <gisripa@gmail.com>
gisripa committed Jan 3, 2024
1 parent 48431ba commit 0cebfbf
Showing 5 changed files with 366 additions and 14 deletions.
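
For context on the fix: jooq represents SQL as a tree of QueryPart objects and renders that tree recursively, so an ARRAY_CONCAT call nested once per column yields a render-time call stack roughly as deep as the column count, which is what overflowed for very wide streams. The change renders each intermediate function call to plain SQL before wrapping it in the next one, so the tree handed to the renderer stays one level deep. Below is a minimal standalone sketch of the difference; the class name, column names, dialect, and SQLDataType.OTHER (standing in for the connector's SUPER type helper) are illustrative assumptions, not part of the commit.

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.function;

import java.util.List;
import java.util.stream.IntStream;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

public class NestedRenderSketch {

  public static void main(String[] args) {
    final DSLContext ctx = DSL.using(SQLDialect.DEFAULT);
    // Stand-ins for the per-column array expressions the connector concatenates.
    final List<Field<?>> arrays = IntStream.range(0, 5000)
        .<Field<?>>mapToObj(i -> field("\"col_" + i + "\""))
        .toList();

    // Old approach: build one deeply nested QueryPart and render it once at the end.
    // Rendering recurses once per nesting level, so thousands of levels can overflow the stack.
    Field<?> nested = arrays.get(0);
    for (int i = 1; i < arrays.size(); i++) {
      nested = function("ARRAY_CONCAT", SQLDataType.OTHER, nested, arrays.get(i));
    }
    // ctx.render(nested); // can throw StackOverflowError for large column counts

    // New approach: render each intermediate call to SQL text immediately, so the
    // QueryPart passed into the next iteration is always a flat, pre-rendered field.
    Field<?> flat = arrays.get(0);
    for (int i = 1; i < arrays.size(); i++) {
      flat = field(ctx.renderNamedOrInlinedParams(function("ARRAY_CONCAT", SQLDataType.OTHER, flat, arrays.get(i))));
    }
    System.out.println(ctx.render(flat).length()); // renders without deep recursion
  }

}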
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-dockerImageTag: 0.7.8
+dockerImageTag: 0.7.9
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
@@ -152,7 +152,8 @@ private Field<String> jsonSerialize(final Field<?> field) {
}

/**
- * Redshift ARRAY_CONCAT supports only 2 arrays, recursively build ARRAY_CONCAT for n arrays.
+ * Redshift ARRAY_CONCAT supports only 2 arrays. Iteratively nest ARRAY_CONCAT calls to support
+ * more than 2 arrays.
*
* @param arrays the SUPER-typed array fields to concatenate
* @return a field wrapping the nested ARRAY_CONCAT expression
@@ -162,16 +163,14 @@ Field<?> arrayConcatStmt(List<Field<?>> arrays) {
return field("ARRAY()"); // Return an empty string if the list is empty
}

- // Base case: if there's only one element, return it
- if (arrays.size() == 1) {
- return arrays.get(0);
+ Field<?> result = arrays.get(0);
+ String renderedSql = getDslContext().render(result);
+ for (int i = 1; i < arrays.size(); i++) {
+ // We lose some nice indentation but that's ok. QueryParts are intentionally rendered
+ // here to avoid a deep stack during function sql rendering.
+ result = field(getDslContext().renderNamedOrInlinedParams(function("ARRAY_CONCAT", getSuperType(), result, arrays.get(i))));
}

- // Recursive case: construct ARRAY_CONCAT function call
- Field<?> lastValue = arrays.get(arrays.size() - 1);
- Field<?> recursiveCall = arrayConcatStmt(arrays.subList(0, arrays.size() - 1));
-
- return function("ARRAY_CONCAT", getSuperType(), recursiveCall, lastValue);
+ return result;
}

Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
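
To make the new behavior concrete, here is a hypothetical test-style sketch, not part of the commit: it assumes it lives in the generator's package (so it can call the package-private arrayConcatStmt directly), and the class name, method name, and field names a, b, c are illustrative only.

package io.airbyte.integrations.destination.redshift.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer;
import java.util.List;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Test;

// Hypothetical sketch, not part of the commit: placed in the generator's package so it
// can call the package-private arrayConcatStmt directly.
public class ArrayConcatShapeSketch {

  @Test
  public void buildsLeftNestedArrayConcat() {
    final RedshiftSqlGenerator generator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer());
    final List<Field<?>> fields = List.of(DSL.field("\"a\""), DSL.field("\"b\""), DSL.field("\"c\""));
    final Field<?> concat = generator.arrayConcatStmt(fields);
    // Same left-nested shape the old recursive version produced:
    //   ARRAY_CONCAT(ARRAY_CONCAT("a", "b"), "c")
    // but each inner call was already rendered to plain SQL text before being wrapped again.
    assertTrue(concat.toString().contains("ARRAY_CONCAT"));
  }

}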
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import org.jooq.DSLContext;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class RedshiftSqlGeneratorTest {

private static final Random RANDOM = new Random();

private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {

// Override only for tests to print formatted SQL. The actual implementation should use unformatted
// to save bytes.
@Override
protected DSLContext getDslContext() {
return DSL.using(getDialect(), new Settings().withRenderFormatted(true));
}

};

private StreamId streamId;

private StreamConfig incrementalDedupStream;

private StreamConfig incrementalAppendStream;

@BeforeEach
public void setup() {
streamId = new StreamId("test_schema", "users_final", "test_schema", "users_raw", "test_schema", "users_final");
final ColumnId id1 = redshiftSqlGenerator.buildColumnId("id1");
final ColumnId id2 = redshiftSqlGenerator.buildColumnId("id2");
List<ColumnId> primaryKey = List.of(id1, id2);
ColumnId cursor = redshiftSqlGenerator.buildColumnId("updated_at");

LinkedHashMap<ColumnId, AirbyteType> columns = new LinkedHashMap<>();
columns.put(id1, AirbyteProtocolType.INTEGER);
columns.put(id2, AirbyteProtocolType.INTEGER);
columns.put(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);
columns.put(redshiftSqlGenerator.buildColumnId("struct"), new Struct(new LinkedHashMap<>()));
columns.put(redshiftSqlGenerator.buildColumnId("array"), new Array(AirbyteProtocolType.UNKNOWN));
columns.put(redshiftSqlGenerator.buildColumnId("string"), AirbyteProtocolType.STRING);
columns.put(redshiftSqlGenerator.buildColumnId("number"), AirbyteProtocolType.NUMBER);
columns.put(redshiftSqlGenerator.buildColumnId("integer"), AirbyteProtocolType.INTEGER);
columns.put(redshiftSqlGenerator.buildColumnId("boolean"), AirbyteProtocolType.BOOLEAN);
columns.put(redshiftSqlGenerator.buildColumnId("timestamp_with_timezone"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);
columns.put(redshiftSqlGenerator.buildColumnId("timestamp_without_timezone"), AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE);
columns.put(redshiftSqlGenerator.buildColumnId("time_with_timezone"), AirbyteProtocolType.TIME_WITH_TIMEZONE);
columns.put(redshiftSqlGenerator.buildColumnId("time_without_timezone"), AirbyteProtocolType.TIME_WITHOUT_TIMEZONE);
columns.put(redshiftSqlGenerator.buildColumnId("date"), AirbyteProtocolType.DATE);
columns.put(redshiftSqlGenerator.buildColumnId("unknown"), AirbyteProtocolType.UNKNOWN);
columns.put(redshiftSqlGenerator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);
incrementalDedupStream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
columns);
incrementalAppendStream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
columns);
}

@Test
public void testTypingAndDeduping() throws IOException {
String expectedSql = MoreResources.readResource("typing_deduping_with_cdc.sql");
String generatedSql =
redshiftSqlGenerator.updateTable(incrementalDedupStream, "unittest", Optional.of(Instant.parse("2023-02-15T18:35:24.00Z")), false);
List<String> expectedSqlLines = Arrays.stream(expectedSql.split("\n")).map(String::trim).toList();
List<String> generatedSqlLines = Arrays.stream(generatedSql.split("\n")).map(String::trim).toList();
System.out.println(generatedSql);
assertEquals(expectedSqlLines, generatedSqlLines);
}

@Test
public void test2000ColumnSql() {
final ColumnId id1 = redshiftSqlGenerator.buildColumnId("id1");
final ColumnId id2 = redshiftSqlGenerator.buildColumnId("id2");
List<ColumnId> primaryKey = List.of(id1, id2);
ColumnId cursor = redshiftSqlGenerator.buildColumnId("updated_at");

LinkedHashMap<ColumnId, AirbyteType> columns = new LinkedHashMap<>();
columns.put(id1, AirbyteProtocolType.INTEGER);
columns.put(id2, AirbyteProtocolType.INTEGER);
columns.put(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);

for (int i = 0; i < 2000; i++) {
final String columnName = RANDOM
.ints('a', 'z' + 1)
.limit(15)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
columns.put(redshiftSqlGenerator.buildColumnId(columnName), AirbyteProtocolType.STRING);
}
String generatedSql = redshiftSqlGenerator.updateTable(new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
columns), "unittest", Optional.of(Instant.parse("2023-02-15T18:35:24.00Z")), false);
// This should not throw an exception.
assertFalse(generatedSql.isEmpty());
}

}
