Destination Redshift - additional check method check, fix s3 file deletion #34186

Merged — 37 commits from joseph.bell/redshift-check-updates into master, Jan 16, 2024

Changes from all commits (37 commits)
409c357
Actually Delete S3 Objects
jbfbell Jan 10, 2024
557ad12
add svv table check
jbfbell Jan 11, 2024
8110a34
docs update
jbfbell Jan 11, 2024
0a4f94c
typos and docs
jbfbell Jan 11, 2024
95002d5
version updates
jbfbell Jan 11, 2024
98c6a29
formatting
jbfbell Jan 11, 2024
9b2b243
docs: Mention that DBT Worker not supported on kubernetes (#34087)
sitaramshelke Jan 11, 2024
b82d983
🧹 Destination Redshift: clean up DAT classes (#34134)
cynthiaxyin Jan 11, 2024
99c35d8
🎉 Airbyte CDK (File-based CDK): Stop the sync if the record could not…
bazarnov Jan 11, 2024
19142a3
🤖 Bump patch version of Python CDK
bazarnov Jan 11, 2024
f061bca
Docs: Updated grammar and formatting for clarity and consistency (#34…
kekiss Jan 11, 2024
cf66abb
✨Source Google Ads: Add possibility to sync all connected accounts (#…
tolik0 Jan 11, 2024
3c9c1b1
Merge branch 'master' into joseph.bell/redshift-check-updates
jbfbell Jan 11, 2024
9aa2ee0
build gradle setting
jbfbell Jan 11, 2024
1052b89
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Jan 11, 2024
1fc244c
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Jan 11, 2024
4f20b6e
DV2 TypingDedupingTest: read container stdout in real time (#34173)
edgao Jan 12, 2024
1a9b3ff
airbyte-ci: fix format (#34199)
alafanechere Jan 12, 2024
0e61a45
Source Slack: Convert to airbyte-lib (#34098)
Jan 12, 2024
ee19c20
Source Freshdesk: Convert to airbyte-lib (#34101)
Jan 12, 2024
7a41b25
Source GCS: Fix unstructured format (#34158)
Jan 12, 2024
93e1748
🐛 Source Google Ads: Disable raising error for not enabled accounts (…
tolik0 Jan 12, 2024
038c825
Add S3 IAM roles + ALB ingress definition (#33944)
Hesperide Jan 12, 2024
2345a24
AirbyteLib: add SQLCaches for DuckDB and Postgres (includes Ruff+Mypy…
aaronsteers Jan 12, 2024
b54b197
Vectara Destination: Add info box (#34159)
Jan 12, 2024
8b35263
CI: Fix linting issue (#34224)
bnchrch Jan 12, 2024
95876bd
Destination Redshift: Use cdk for TD dependency (#34194)
gisripa Jan 12, 2024
07be323
Destination Bigquery: Clean up dependencies with TD/CDK (#34226)
gisripa Jan 12, 2024
59b70e2
Destination Snowflake: Cleanup dependencies and upgrade CDK (#34227)
gisripa Jan 12, 2024
e082c3e
version updates
jbfbell Jan 11, 2024
ea989df
actually limit stuff
jbfbell Jan 12, 2024
48d56b5
remove getStageName method
jbfbell Jan 12, 2024
0719b69
merge conflicts
jbfbell Jan 12, 2024
18d0f9f
formatting
jbfbell Jan 13, 2024
2c6438d
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Jan 16, 2024
89ba758
use newly published cdk
jbfbell Jan 16, 2024
9a705bb
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Jan 16, 2024
airbyte-cdk/java/airbyte-cdk/README.md (1 addition, 0 deletions)

@@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method |
| 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 |
| 0.11.5 | 2024-01-10 | [\#34119](https://github.com/airbytehq/airbyte/pull/34119) | Remove wal2json support for postgres+debezium. |
| 0.11.4 | 2024-01-09 | [\#33305](https://github.com/airbytehq/airbyte/pull/33305) | Source stats in incremental syncs |

@@ -1 +1 @@
version=0.12.0
version=0.12.1

@@ -92,6 +92,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
final var v2RawSchema = namingResolver.getIdentifier(TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false);
destinationSpecificTableOperations(database);
}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final ConnectionErrorException ex) {
@@ -114,6 +115,15 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}
}

/**
 * Specific databases may have additional checks unique to them that they need to perform;
 * override this method to add those checks.
 *
 * @param database the database to run checks against
 * @throws Exception if any destination-specific check fails
 */
protected void destinationSpecificTableOperations(final JdbcDatabase database) throws Exception {}

/**
 * This method is deprecated. It verifies table creation, but not the right to insert into a
 * newly created table. Use attemptTableOperations with the attemptInsert argument instead.
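To illustrate the new hook, here is a minimal sketch of an override; the query and view name are assumptions for illustration only (the Redshift-specific check appears further down in this diff):

```java
@Override
protected void destinationSpecificTableOperations(final JdbcDatabase database) throws Exception {
  // Fail the connection check early if a system view the connector relies on is unreadable.
  // Illustrative only — not this PR's actual check.
  database.execute("SELECT 1 FROM some_required_system_view LIMIT 1;");
}
```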

@@ -13,6 +13,7 @@
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;

@@ -22,6 +23,16 @@
@Slf4j
public class GeneralStagingFunctions {

// Using a random UUID here as a placeholder for the moment. It avoids mixing data in the
// staging area between different syncs (especially syncs that manipulate streams with
// similar names).
// If we replaced the random connection id with the actual connection_id, we would gain the
// opportunity to reuse data that was uploaded to the stage in a previous attempt but failed
// to load into the warehouse (e.g. because the sync was interrupted). It would also allow
// other programs/scripts to load (or reload backups?) into the connection's staging area,
// to be picked up at the next sync.
public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();

public static OnStartFunction onStartFunction(final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
@@ -34,7 +45,6 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
final String schema = writeConfig.getOutputSchemaName();
final String stream = writeConfig.getStreamName();
final String dstTableName = writeConfig.getOutputTableName();
final String stageName = stagingOperations.getStageName(schema, dstTableName);
final String stagingPath =
stagingOperations.getStagingPath(SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schema, stream, writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
@@ -44,7 +54,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,

stagingOperations.createSchemaIfNotExists(database, schema);
stagingOperations.createTableIfNotExists(database, schema, dstTableName);
stagingOperations.createStageIfNotExists(database, stageName);
stagingOperations.createStageIfNotExists();

/*
* When we're in OVERWRITE, clear out the table at the start of a sync, this is an expected side
@@ -68,7 +78,6 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
* upload was unsuccessful
*/
public static void copyIntoTableFromStage(final JdbcDatabase database,
final String stageName,
final String stagingPath,
final List<String> stagedFiles,
final String tableName,
@@ -83,7 +92,7 @@ public static void copyIntoTableFromStage(final JdbcDatabase database,
final Lock rawTableInsertLock = typerDeduper.getRawTableInsertLock(streamNamespace, streamName);
rawTableInsertLock.lock();
try {
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
stagingOperations.copyIntoTableFromStage(database, stagingPath, stagedFiles,
tableName, schemaName);
} finally {
rawTableInsertLock.unlock();
@@ -96,8 +105,6 @@ public static void copyIntoTableFromStage(final JdbcDatabase database,
typerDeduperValve.updateTimeAndIncreaseInterval(streamId);
}
} catch (final Exception e) {
stagingOperations.cleanUpStage(database, stageName, stagedFiles);
log.info("Cleaning stage path {}", stagingPath);
throw new RuntimeException("Failed to upload data from stage " + stagingPath, e);
}
}
@@ -124,10 +131,15 @@ public static OnCloseFunction onCloseFunction(final JdbcDatabase database,
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
if (purgeStagingData) {
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagePath = stagingOperations.getStagingPath(
RANDOM_CONNECTION_ID,
schemaName,
writeConfig.getStreamName(),
writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
log.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName,
stageName);
stagingOperations.dropStageIfExists(database, stageName);
stagePath);
stagingOperations.dropStageIfExists(database, stagePath);
}
}
typerDeduper.commitFinalTables();
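This is where the "fix s3 file deletion" part of the PR lands: the error path no longer calls cleanUpStage, and on close the full staging path (not a stage name) is handed to dropStageIfExists. As a hedged sketch of what dropping a stage means for an S3-backed destination — the client, bucket, and method below are assumptions, not this PR's code:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;

// Sketch: deleting every object under the staging-path prefix actually removes the
// staged files, rather than only logging that cleanup happened.
static void dropStageIfExists(final AmazonS3 s3Client, final String bucketName, final String stagingPath) {
  ObjectListing listing = s3Client.listObjects(bucketName, stagingPath);
  while (true) {
    for (final S3ObjectSummary summary : listing.getObjectSummaries()) {
      s3Client.deleteObject(bucketName, summary.getKey());
    }
    if (!listing.isTruncated()) {
      break; // no more pages of results
    }
    listing = s3Client.listNextBatchOfObjects(listing); // page through >1000 objects
  }
}
```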

@@ -81,15 +81,14 @@

final WriteConfig writeConfig = pairToWriteConfig.get(pair);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagingPath =
stagingOperations.getStagingPath(
SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(),
writeConfig.getOutputTableName(), writeConfig.getWriteDatetime());
try (writer) {
writer.flush();
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stageName, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
schemaName,
stagingOperations,
writeConfig.getNamespace(),

@@ -18,15 +18,6 @@
*/
public interface StagingOperations extends SqlOperations {

/**
* Returns the staging environment's name
*
* @param namespace Name of schema
* @param streamName Name of the stream
* @return Fully qualified name of the staging environment
*/
String getStageName(String namespace, String streamName);

/**
* @param outputTableName The name of the table this staging file will be loaded into (typically a
* raw table). Not all destinations use the table name in the staging path (e.g. Snowflake
@@ -37,48 +28,36 @@ public interface StagingOperations extends SqlOperations {
/**
* Create a staging folder where temporary files are uploaded before being loaded into the final destination
*/
void createStageIfNotExists(JdbcDatabase database, String stageName) throws Exception;
void createStageIfNotExists() throws Exception;

/**
* Upload the data file into the stage area.
*
* @param database database used for syncing
* @param recordsData records stored in in-memory buffer
* @param schemaName name of schema
* @param stageName name of the staging area folder
* @param stagingPath path of staging folder to data files
* @return the name of the file that was uploaded.
*/
String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stageName, String stagingPath)
String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stagingPath)
throws Exception;

/**
* Load the data stored in the stage area into a temporary table in the destination
*
* @param database database interface
* @param stageName name of staging area folder
* @param stagingPath path to staging files
* @param stagedFiles collection of staged files
* @param tableName name of table to write staging files to
* @param schemaName name of schema
*/
void copyIntoTableFromStage(JdbcDatabase database,
String stageName,
String stagingPath,
List<String> stagedFiles,
String tableName,
String schemaName)
throws Exception;

/**
* Remove files that were just staged
*
* @param database database used for syncing
* @param stageName name of staging area folder
* @param stagedFiles collection of the staging files to remove
*/
void cleanUpStage(JdbcDatabase database, String stageName, List<String> stagedFiles) throws Exception;

/**
* Delete the stage area and all staged files that were in it
*
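Stitched together from the signatures above, the staging write path now reads roughly like this (a sketch — variable names are assumptions; see the concrete call sites elsewhere in this diff):

```java
// The staging path — not a named stage — is now the only handle for staged data.
stagingOperations.createStageIfNotExists();
final String stagedFile =
    stagingOperations.uploadRecordsToStage(database, recordsData, schemaName, stagingPath);
stagingOperations.copyIntoTableFromStage(database, stagingPath, List.of(stagedFile), tableName, schemaName);
// On close, when purging is enabled, cleanup keys off the same path:
stagingOperations.dropStageIfExists(database, stagingPath);
```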

@@ -31,7 +31,7 @@ protected BlobStorageOperations() {
*
* @return the name of the file that was uploaded.
*/
public abstract String uploadRecordsToBucket(SerializableBuffer recordsData, String namespace, String streamName, String objectPath)
public abstract String uploadRecordsToBucket(SerializableBuffer recordsData, String namespace, String objectPath)
throws Exception;

/**

@@ -128,7 +128,6 @@ private FlushBufferFunction flushBufferFunction(final BlobStorageOperations stor
writeConfig.addStoredFile(storageOperations.uploadRecordsToBucket(
writer,
writeConfig.getNamespace(),
writeConfig.getStreamName(),
writeConfig.getFullOutputPath()));
} catch (final Exception e) {
LOGGER.error("Failed to flush and upload buffer to storage:", e);

@@ -120,7 +120,6 @@ protected boolean doesBucketExist(final String bucket) {
@Override
public String uploadRecordsToBucket(final SerializableBuffer recordsData,
final String namespace,
final String streamName,
final String objectPath) {
final List<Exception> exceptionsThrown = new ArrayList<>();
while (exceptionsThrown.size() < UPLOAD_RETRY_LIMIT) {

@@ -103,19 +103,17 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag

final WriteConfig writeConfig = streamDescToWriteConfig.get(decs);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagingPath =
stagingOperations.getStagingPath(
StagingConsumerFactory.RANDOM_CONNECTION_ID,
GeneralStagingFunctions.RANDOM_CONNECTION_ID,
schemaName,
writeConfig.getStreamName(),
writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
try {
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(
database,
stageName,
stagingPath,
List.of(stagedFile),
writeConfig.getOutputTableName(),

@@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.joda.time.DateTime;
@@ -49,16 +48,7 @@ public class StagingConsumerFactory extends SerialStagingConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(StagingConsumerFactory.class);

// using a random string here as a placeholder for the moment.
// This would avoid mixing data in the staging area between different syncs (especially if they
// manipulate streams with similar names)
// if we replaced the random connection id by the actual connection_id, we'd gain the opportunity to
// leverage data that was uploaded to stage
// in a previous attempt but failed to load to the warehouse for some reason (interrupted?) instead.
// This would also allow other programs/scripts
// to load (or reload backups?) in the connection's staging area to be loaded at the next sync.
private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC);
public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();

public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,

@@ -4,12 +4,11 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.12.0'
cdkVersionRequired = '0.12.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}

//remove once upgrading the CDK version to 0.4.x or later
java {
compileJava {
options.compilerArgs.remove("-Werror")

@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.7.14
dockerImageTag: 0.7.15
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift

@@ -22,6 +22,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

@@ -60,6 +61,11 @@ public DataSource getDataSource(final JsonNode config) {
Duration.ofMinutes(2));
}

@Override
protected void destinationSpecificTableOperations(final JdbcDatabase database) throws Exception {
RedshiftUtil.checkSvvTableAccess(database);
}

@Override
public JdbcDatabase getDatabase(final DataSource dataSource) {
return new DefaultJdbcDatabase(dataSource);

@@ -51,6 +51,7 @@
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;

@@ -103,7 +104,8 @@ public AirbyteConnectionStatus check(final JsonNode config) {
try {
final JdbcDatabase database = new DefaultJdbcDatabase(dataSource);
final String outputSchema = super.getNamingResolver().getIdentifier(config.get(JdbcUtils.SCHEMA_KEY).asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations);
attemptTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations, false);
RedshiftUtil.checkSvvTableAccess(database);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final ConnectionErrorException e) {
final String message = getErrorMessage(e.getStateCode(), e.getErrorCode(), e.getExceptionMessage(), e);
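The body of RedshiftUtil.checkSvvTableAccess is not part of this diff; as a hedged sketch based on the "add svv table check" commit, it plausibly amounts to something like:

```java
// Assumed sketch — not code from this PR. Verifies at check time that the configured
// user can read Redshift's SVV_TABLE_INFO system view, which later sync stages query;
// without access, the failure would otherwise surface mid-sync.
public static void checkSvvTableAccess(final JdbcDatabase database) throws Exception {
  database.execute("SELECT 1 FROM SVV_TABLE_INFO LIMIT 1;");
}
```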