Skip to content

Commit

Permalink
Destination redshift: escape string literal at runtime (#34544)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 26, 2024
1 parent 96b9f5a commit 0f7f77f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.1.2
dockerImageTag: 2.1.3
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.jooq.InsertValuesStep4;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.conf.ParamType;
import org.jooq.impl.DefaultDataType;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -133,11 +134,17 @@ protected void insertRecordsInternalV2(final JdbcDatabase database,
for (final PartialAirbyteMessage record : batch) {
insert = insert.values(
val(UUID.randomUUID().toString()),
function("JSON_PARSE", String.class, val(record.getSerialized())),
function("JSON_PARSE", String.class, val(escapeStringLiteral(record.getSerialized()))),
val(Instant.ofEpochMilli(record.getRecord().getEmittedAt()).atOffset(ZoneOffset.UTC)),
val((OffsetDateTime) null));
}
insert.execute();
// Intentionally don't use insert.execute().
// jooq will try to use a parameterized query if there are not too many query params.
// This means that for small queries, we would need to not-escape backslashes,
// but for large queries we _would_ need to escape them.
// Instead, force jooq to always inline params.
// This allows us to always escape backslashes.
connection.createStatement().execute(insert.getSQL(ParamType.INLINED));
LOGGER.info("Executed batch size: {}, {}, {}", batch.size(), schemaName, tableName);
}
});
Expand All @@ -147,4 +154,15 @@ protected void insertRecordsInternalV2(final JdbcDatabase database,
}
}

public static String escapeStringLiteral(final String str) {
if (str == null) {
return null;
} else {
// jooq handles most things
// but we need to manually escape backslashes because postgres and redshift have
// different backslash handling.
return str.replace("\\", "\\\\");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.redshift.typing_deduping;

import static io.airbyte.cdk.db.jdbc.DateTimeConverter.putJavaSQLTime;
import static io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations.escapeStringLiteral;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -205,14 +206,4 @@ public void testCreateTableIncremental() throws Exception {
// TODO assert on table clustering, etc.
}

private static String escapeStringLiteral(final String str) {
if (str == null) {
return null;
} else {
// jooq handles most things
// but we need to manually escape backslashes for some reason
return str.replace("\\", "\\\\");
}
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.3 | 2024-01-26 | [34544](https://github.com/airbytehq/airbyte/pull/34544) | Proper string-escaping in raw tables |
| 2.1.2 | 2024-01-24 | [34451](https://github.com/airbytehq/airbyte/pull/34451) | Improve logging for unparseable input |
| 2.1.1 | 2024-01-24 | [34458](https://github.com/airbytehq/airbyte/pull/34458) | Improve error reporting |
| 2.1.0 | 2024-01-24 | [34467](https://github.com/airbytehq/airbyte/pull/34467) | Upgrade CDK to 0.14.0 |
Expand Down

0 comments on commit 0f7f77f

Please sign in to comment.