🎉 Destination redshift: reenable T+D optimizations; bigquery+snowflake: bump to latest CDK version #33704

Merged · 7 commits · Dec 21, 2023

Changes from all commits
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -164,6 +164,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.10.0 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | JdbcDestinationHandler now properly implements `getInitialRawTableState`; reenable SqlGenerator test |
| 0.9.0 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation, exclude the T&D module from the CDK |
| 0.8.0 | 2023-12-18 | [\#33506](https://github.com/airbytehq/airbyte/pull/33506) | Improve async destination shutdown logic; more JDBC async migration work; improve DAT test schema handling |
| 0.7.9 | 2023-12-18 | [\#33549](https://github.com/airbytehq/airbyte/pull/33549) | Improve MongoDB logging. |
@@ -1 +1 @@
version=0.9.0
version=0.10.0
@@ -15,10 +15,16 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLType;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -59,7 +65,49 @@ public boolean isFinalTableEmpty(final StreamId id) throws Exception {

@Override
public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
return new InitialRawTableState(true, Optional.empty());
final ResultSet tables = jdbcDatabase.getMetaData().getTables(
databaseName,
id.rawNamespace(),
id.rawName(),
null);
if (!tables.next()) {
// There's no raw table at all. Therefore there are no unprocessed raw records, and this sync
// should not filter raw records by timestamp.
return new InitialRawTableState(false, Optional.empty());
}
// Use two explicit queries because COALESCE might not short-circuit evaluation.
// This first query tries to find the oldest raw record with loaded_at = NULL.
// Unsafe query requires us to explicitly close the Stream, which is inconvenient,
// but it's also the only method in the JdbcDatabase interface to return non-string/int types
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
DSL.select(DSL.field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
.from(DSL.name(id.rawNamespace(), id.rawName()))
.where(DSL.condition("_airbyte_loaded_at IS NULL"))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
// Filter for nonNull values in case the query returned NULL (i.e. no unloaded records).
final Optional<Timestamp> minUnloadedTimestamp = timestampStream.filter(Objects::nonNull).findFirst();
if (minUnloadedTimestamp.isPresent()) {
// Decrement by 1 second since timestamp precision varies between databases.
final Optional<Instant> ts = minUnloadedTimestamp
.map(Timestamp::toInstant)
.map(i -> i.minus(1, ChronoUnit.SECONDS));
Contributor Author
This is slightly different from the original Redshift implementation: instead of doing a timestamp subtraction within Redshift, we just query the raw timestamp and decrement it in Java code. Timestamp math can be dialect-specific, so it's probably not a good idea to have it in a base JDBC implementation.
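To make that concrete, here is a minimal, self-contained Java sketch of the decrement described above: take the value returned by the MIN(_airbyte_extracted_at) query and subtract one second with java.time rather than in SQL. The toSyncCursor helper name and the hard-coded timestamp are illustrative only, not part of the CDK.

import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

public class MinTimestampSketch {

  // Hypothetical helper: converts the queried timestamp (null when there are no unloaded
  // records) into the sync cursor, minus a one-second buffer because timestamp precision
  // varies between databases.
  static Optional<Instant> toSyncCursor(final Timestamp minUnloadedTimestamp) {
    return Optional.ofNullable(minUnloadedTimestamp)
        .map(Timestamp::toInstant)
        .map(instant -> instant.minus(1, ChronoUnit.SECONDS));
  }

  public static void main(final String[] args) {
    final Timestamp queried = Timestamp.from(Instant.parse("2023-12-20T12:00:00Z"));
    // Prints Optional[2023-12-20T11:59:59Z]
    System.out.println(toSyncCursor(queried));
  }
}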

return new InitialRawTableState(true, ts);
}
}
// If there are no unloaded raw records, then we can safely skip all existing raw records.
// This second query just finds the newest raw record.
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
DSL.select(DSL.field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
.from(DSL.name(id.rawNamespace(), id.rawName()))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at all).
final Optional<Timestamp> minUnloadedTimestamp = timestampStream.filter(Objects::nonNull).findFirst();
return new InitialRawTableState(false, minUnloadedTimestamp.map(Timestamp::toInstant));
}
}

@Override
@@ -32,7 +32,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
@@ -427,7 +426,6 @@ public void allTypes() throws Exception {
* timestamp.
*/
@Test
@Disabled
public void minTimestampBehavesCorrectly() throws Exception {
// When the raw table doesn't exist, there are no unprocessed records and no timestamp
assertEquals(new DestinationHandler.InitialRawTableState(false, Optional.empty()), destinationHandler.getInitialRawTableState(streamId));
@@ -525,11 +523,8 @@ public void handlePreexistingRecords() throws Exception {
final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(streamId);
assertAll(
() -> assertTrue(tableState.hasUnprocessedRecords(),
"After writing some raw records, we should recognize that there are unprocessed records")
// Needs to be implemented in JDBC
// () -> assertTrue(tableState.maxProcessedTimestamp().isPresent(), "After writing some raw records,
// the min timestamp should be present.")
);
"After writing some raw records, we should recognize that there are unprocessed records"),
() -> assertTrue(tableState.maxProcessedTimestamp().isPresent(), "After writing some raw records, the min timestamp should be present."));

TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, incrementalDedupStream, tableState.maxProcessedTimestamp(), "");

@@ -549,9 +544,7 @@ public void handleNoPreexistingRecords() throws Exception {
createRawTable(streamId);
final DestinationHandler.InitialRawTableState tableState = destinationHandler.getInitialRawTableState(streamId);
assertAll(
// Commenting out because this needs to be implemented in JDBC
// () -> assertFalse(tableState.hasUnprocessedRecords(), "With an empty raw table, we should
// recognize that there are no unprocessed records"),
() -> assertFalse(tableState.hasUnprocessedRecords(), "With an empty raw table, we should recognize that there are no unprocessed records"),
() -> assertEquals(Optional.empty(), tableState.maxProcessedTimestamp(), "With an empty raw table, the min timestamp should be empty"));

createFinalTable(incrementalDedupStream, "");
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.9.0'
cdkVersionRequired = '0.10.0'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.3.24
dockerImageTag: 2.3.25
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.9.0'
cdkVersionRequired = '0.10.0'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.7.5
dockerImageTag: 0.7.6
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
@@ -8,25 +8,10 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.sql.ResultSet;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.List;
import java.util.Optional;
import org.jooq.impl.DSL;

public class RedshiftDestinationHandler extends JdbcDestinationHandler {

// Redshift doesn't seem to let you actually specify HH:MM TZ offsets, so we have
// build our own formatter rather than just use Instant.parse
private static final DateTimeFormatter TIMESTAMPTZ_FORMAT = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.append(DateTimeFormatter.ofPattern("X"))
.toFormatter();

public RedshiftDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
}
@@ -54,43 +39,4 @@ public boolean isFinalTableEmpty(final StreamId id) throws Exception {
return query.isEmpty();
}

public Optional<Instant> getMinTimestampForSync(final StreamId id) throws Exception {
final ResultSet tables = jdbcDatabase.getMetaData().getTables(
databaseName,
id.rawNamespace(),
id.rawName(),
null);
if (!tables.next()) {
return Optional.empty();
}
// Redshift timestamps have microsecond precision, but it's basically impossible to work with that.
// Decrement by 1 second instead.
// And use two explicit queries because docs don't specify whether COALESCE
// short-circuits evaluation.
// This first query tries to find the oldest raw record with loaded_at = NULL
Optional<String> minUnloadedTimestamp = Optional.ofNullable(jdbcDatabase.queryStrings(
conn -> conn.createStatement().executeQuery(
DSL.select(DSL.field("MIN(_airbyte_extracted_at) - INTERVAL '1 second'").as("min_timestamp"))
.from(DSL.name(id.rawNamespace(), id.rawName()))
.where(DSL.condition("_airbyte_loaded_at IS NULL"))
.getSQL()),
// The query will always return exactly one record, so use .get(0)
record -> record.getString("min_timestamp")).get(0));
if (minUnloadedTimestamp.isEmpty()) {
// If there are no unloaded raw records, then we can safely skip all existing raw records.
// This second query just finds the newest raw record.
minUnloadedTimestamp = Optional.ofNullable(jdbcDatabase.queryStrings(
conn -> conn.createStatement().executeQuery(
DSL.select(DSL.field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
.from(DSL.name(id.rawNamespace(), id.rawName()))
.getSQL()),
record -> record.getString("min_timestamp")).get(0));
}
return minUnloadedTimestamp.map(RedshiftDestinationHandler::parseInstant);
}

private static Instant parseInstant(final String ts) {
return TIMESTAMPTZ_FORMAT.parse(ts, Instant::from);
}

}
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.9.0'
cdkVersionRequired = '0.10.0'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.4.16
dockerImageTag: 3.4.17
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
@@ -142,6 +142,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.3.25 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) |
| 2.3.24 | 2023-12-20 | [\#33697](https://github.com/airbytehq/airbyte/pull/33697) | Stop creating unnecessary tmp tables |
| 2.3.23 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation |
| 2.3.22 | 2023-12-14 | [\#33451](https://github.com/airbytehq/airbyte/pull/33451) | Remove old spec option |
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
@@ -156,6 +156,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.7.6 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Only run T+D on a stream if it had any records during the sync |
| 0.7.5 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation |
| 0.7.4 | 2023-12-13 | [\#33369](https://github.com/airbytehq/airbyte/pull/33369) | Use jdbc common sql implementation |
| 0.7.3 | 2023-12-12 | [\#33367](https://github.com/airbytehq/airbyte/pull/33367) | DV2: fix migration logic |
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
@@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.17 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | Update to java CDK 0.10.0 (no changes) |
| 3.4.16 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation |
| 3.4.15 | 2023-12-13 | [\#33232](https://github.com/airbytehq/airbyte/pull/33232) | Only run typing+deduping for a stream if the stream had any records |
| 3.4.14 | 2023-12-08 | [\#33263](https://github.com/airbytehq/airbyte/pull/33263) | Adopt java CDK version 0.7.0 |