🎉 Destination redshift: reenable T+D optimizations; bigquery+snowflake: bump to latest CDK version (#33704)
edgao committed Dec 21, 2023
1 parent 29e7d13 commit 1d76fd2
Showing 14 changed files with 63 additions and 72 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -164,6 +164,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.10.0 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | JdbcDestinationHandler now properly implements `getInitialRawTableState`; reenable SqlGenerator test |
| 0.9.0 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation, exclude the T&D module from the CDK |
| 0.8.0 | 2023-12-18 | [\#33506](https://github.com/airbytehq/airbyte/pull/33506) | Improve async destination shutdown logic; more JDBC async migration work; improve DAT test schema handling |
| 0.7.9 | 2023-12-18 | [\#33549](https://github.com/airbytehq/airbyte/pull/33549) | Improve MongoDB logging. |
@@ -1 +1 @@
-version=0.9.0
+version=0.10.0
@@ -15,10 +15,16 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLType;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.LinkedHashMap;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
+import org.jooq.impl.DSL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -59,7 +65,49 @@ public boolean isFinalTableEmpty(final StreamId id) throws Exception {

@Override
public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
-    return new InitialRawTableState(true, Optional.empty());
+    final ResultSet tables = jdbcDatabase.getMetaData().getTables(
+        databaseName,
+        id.rawNamespace(),
+        id.rawName(),
+        null);
+    if (!tables.next()) {
+      // There's no raw table at all, so there are no unprocessed raw records, and this sync
+      // should not filter raw records by timestamp.
+      return new InitialRawTableState(false, Optional.empty());
+    }
+    // Use two explicit queries because COALESCE might not short-circuit evaluation.
+    // This first query tries to find the oldest raw record with loaded_at = NULL.
+    // unsafeQuery requires us to explicitly close the Stream, which is inconvenient,
+    // but it's also the only method in the JdbcDatabase interface that returns non-string/int types.
+    try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
+        conn -> conn.prepareStatement(
+            DSL.select(DSL.field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
+                .from(DSL.name(id.rawNamespace(), id.rawName()))
+                .where(DSL.condition("_airbyte_loaded_at IS NULL"))
+                .getSQL()),
+        record -> record.getTimestamp("min_timestamp"))) {
+      // Filter for non-null values in case the query returned NULL (i.e. no unloaded records).
+      final Optional<Timestamp> minUnloadedTimestamp = timestampStream.filter(Objects::nonNull).findFirst();
+      if (minUnloadedTimestamp.isPresent()) {
+        // Decrement by 1 second since timestamp precision varies between databases.
+        final Optional<Instant> ts = minUnloadedTimestamp
+            .map(Timestamp::toInstant)
+            .map(i -> i.minus(1, ChronoUnit.SECONDS));
+        return new InitialRawTableState(true, ts);
+      }
+    }
+    // If there are no unloaded raw records, then we can safely skip all existing raw records.
+    // This second query just finds the newest raw record.
+    try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
+        conn -> conn.prepareStatement(
+            DSL.select(DSL.field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
+                .from(DSL.name(id.rawNamespace(), id.rawName()))
+                .getSQL()),
+        record -> record.getTimestamp("min_timestamp"))) {
+      // Filter for non-null values in case the query returned NULL (i.e. no raw records at all).
+      final Optional<Timestamp> maxTimestamp = timestampStream.filter(Objects::nonNull).findFirst();
+      return new InitialRawTableState(false, maxTimestamp.map(Timestamp::toInstant));
+    }
}

@Override
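For reference, a minimal standalone sketch of the SQL that the first jOOQ builder in getInitialRawTableState renders via getSQL(). This is not part of the commit: the class name and the "airbyte_internal" / "users_raw" names standing in for id.rawNamespace() / id.rawName() are hypothetical, and exact quoting varies by jOOQ dialect and settings.

import org.jooq.impl.DSL;

public class MinTimestampSqlExample {

  public static void main(final String[] args) {
    // Build the "oldest unloaded raw record" query the same way getInitialRawTableState does.
    final String sql = DSL.select(DSL.field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
        .from(DSL.name("airbyte_internal", "users_raw"))
        .where(DSL.condition("_airbyte_loaded_at IS NULL"))
        .getSQL();
    // Prints roughly (quoting depends on the dialect):
    //   select MIN(_airbyte_extracted_at) "min_timestamp"
    //   from "airbyte_internal"."users_raw"
    //   where (_airbyte_loaded_at IS NULL)
    System.out.println(sql);
  }

}

Running two short queries instead of a single COALESCE'd one keeps the cheap NULL-scan separate from the full-table MAX, since, as the comment in the diff notes, COALESCE might not short-circuit evaluation.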
@@ -32,7 +32,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
@@ -427,7 +426,6 @@ public void allTypes() throws Exception {
* timestamp.
*/
@Test
-  @Disabled
public void minTimestampBehavesCorrectly() throws Exception {
// When the raw table doesn't exist, there are no unprocessed records and no timestamp
assertEquals(new DestinationHandler.InitialRawTableState(false, Optional.empty()), destinationHandler.getInitialRawTableState(streamId));
@@ -525,11 +523,8 @@ public void handlePreexistingRecords() throws Exception {
final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(streamId);
assertAll(
() -> assertTrue(tableState.hasUnprocessedRecords(),
"After writing some raw records, we should recognize that there are unprocessed records")
// Needs to be implemented in JDBC
// () -> assertTrue(tableState.maxProcessedTimestamp().isPresent(), "After writing some raw records,
// the min timestamp should be present.")
);
"After writing some raw records, we should recognize that there are unprocessed records"),
() -> assertTrue(tableState.maxProcessedTimestamp().isPresent(), "After writing some raw records, the min timestamp should be present."));

TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, incrementalDedupStream, tableState.maxProcessedTimestamp(), "");

@@ -549,9 +544,7 @@ public void handleNoPreexistingRecords() throws Exception {
createRawTable(streamId);
final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(streamId);
assertAll(
-        // Commenting out because this needs to be implemented in JDBC
-        // () -> assertFalse(tableState.hasUnprocessedRecords(), "With an empty raw table, we should
-        // recognize that there are no unprocessed records"),
+        () -> assertFalse(tableState.hasUnprocessedRecords(), "With an empty raw table, we should recognize that there are no unprocessed records"),
() -> assertEquals(Optional.empty(), tableState.maxProcessedTimestamp(), "With an empty raw table, the min timestamp should be empty"));

createFinalTable(incrementalDedupStream, "");
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
-  cdkVersionRequired = '0.9.0'
+  cdkVersionRequired = '0.10.0'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.3.24
+  dockerImageTag: 2.3.25
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
-  cdkVersionRequired = '0.9.0'
+  cdkVersionRequired = '0.10.0'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-  dockerImageTag: 0.7.5
+  dockerImageTag: 0.7.6
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
@@ -8,25 +8,10 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
-import java.sql.ResultSet;
-import java.time.Instant;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.util.List;
-import java.util.Optional;
-import org.jooq.impl.DSL;

public class RedshiftDestinationHandler extends JdbcDestinationHandler {

-  // Redshift doesn't seem to let you actually specify HH:MM TZ offsets, so we have
-  // build our own formatter rather than just use Instant.parse
-  private static final DateTimeFormatter TIMESTAMPTZ_FORMAT = new DateTimeFormatterBuilder()
-      .append(DateTimeFormatter.ISO_LOCAL_DATE)
-      .appendLiteral(' ')
-      .append(DateTimeFormatter.ISO_LOCAL_TIME)
-      .append(DateTimeFormatter.ofPattern("X"))
-      .toFormatter();

public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
}
@@ -54,43 +39,4 @@ public boolean isFinalTableEmpty(final StreamId id) throws Exception {
return query.isEmpty();
}

-  public Optional<Instant> getMinTimestampForSync(final StreamId id) throws Exception {
-    final ResultSet tables = jdbcDatabase.getMetaData().getTables(
-        databaseName,
-        id.rawNamespace(),
-        id.rawName(),
-        null);
-    if (!tables.next()) {
-      return Optional.empty();
-    }
-    // Redshift timestamps have microsecond precision, but it's basically impossible to work with that.
-    // Decrement by 1 second instead.
-    // And use two explicit queries because docs don't specify whether COALESCE
-    // short-circuits evaluation.
-    // This first query tries to find the oldest raw record with loaded_at = NULL
-    Optional<String> minUnloadedTimestamp = Optional.ofNullable(jdbcDatabase.queryStrings(
-        conn -> conn.createStatement().executeQuery(
-            DSL.select(DSL.field("MIN(_airbyte_extracted_at) - INTERVAL '1 second'").as("min_timestamp"))
-                .from(DSL.name(id.rawNamespace(), id.rawName()))
-                .where(DSL.condition("_airbyte_loaded_at IS NULL"))
-                .getSQL()),
-        // The query will always return exactly one record, so use .get(0)
-        record -> record.getString("min_timestamp")).get(0));
-    if (minUnloadedTimestamp.isEmpty()) {
-      // If there are no unloaded raw records, then we can safely skip all existing raw records.
-      // This second query just finds the newest raw record.
-      minUnloadedTimestamp = Optional.ofNullable(jdbcDatabase.queryStrings(
-          conn -> conn.createStatement().executeQuery(
-              DSL.select(DSL.field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
-                  .from(DSL.name(id.rawNamespace(), id.rawName()))
-                  .getSQL()),
-          record -> record.getString("min_timestamp")).get(0));
-    }
-    return minUnloadedTimestamp.map(RedshiftDestinationHandler::parseInstant);
-  }
-
-  private static Instant parseInstant(final String ts) {
-    return TIMESTAMPTZ_FORMAT.parse(ts, Instant::from);
-  }
-
}
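With getMinTimestampForSync and its custom timestamp parsing removed, RedshiftDestinationHandler falls back to the getInitialRawTableState implementation that JdbcDestinationHandler gains earlier in this commit. A minimal sketch of the resulting call path: the class name, the "dev" database name, and the fetchInitialState wrapper are hypothetical, and the DestinationHandler import path is assumed to match the typing_deduping package used above.

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;

public class RedshiftInitialStateExample {

  // Returns (hasUnprocessedRecords, maxProcessedTimestamp) for one stream's raw table,
  // via the inherited JDBC implementation rather than a Redshift-specific query.
  // Assumes this class lives in the same package as RedshiftDestinationHandler.
  static DestinationHandler.InitialRawTableState fetchInitialState(final JdbcDatabase database, final StreamId streamId) throws Exception {
    final RedshiftDestinationHandler handler = new RedshiftDestinationHandler("dev", database);
    return handler.getInitialRawTableState(streamId);
  }

}

This is also why the 1-second safety decrement now lives in the shared JDBC code (via ChronoUnit.SECONDS) instead of in Redshift SQL: the raw-table probe no longer depends on Redshift's timestamptz formatting.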
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
-  cdkVersionRequired = '0.9.0'
+  cdkVersionRequired = '0.10.0'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.4.16
+  dockerImageTag: 3.4.17
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
@@ -142,6 +142,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 2.3.25 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) |
| 2.3.24 | 2023-12-20 | [\#33697](https://github.com/airbytehq/airbyte/pull/33697) | Stop creating unnecessary tmp tables |
| 2.3.23 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation |
| 2.3.22 | 2023-12-14 | [\#33451](https://github.com/airbytehq/airbyte/pull/33451) | Remove old spec option |
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
@@ -156,6 +156,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.7.6 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Only run T+D on a stream if it had any records during the sync |
| 0.7.5 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation |
| 0.7.4 | 2023-12-13 | [\#33369](https://github.com/airbytehq/airbyte/pull/33369) | Use jdbc common sql implementation |
| 0.7.3 | 2023-12-12 | [\#33367](https://github.com/airbytehq/airbyte/pull/33367) | DV2: fix migration logic |
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
@@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 3.4.17 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) |
| 3.4.16 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation |
| 3.4.15 | 2023-12-13 | [\#33232](https://github.com/airbytehq/airbyte/pull/33232) | Only run typing+deduping for a stream if the stream had any records |
| 3.4.14 | 2023-12-08 | [\#33263](https://github.com/airbytehq/airbyte/pull/33263) | Adopt java CDK version 0.7.0 |