Skip to content

Commit

Permalink
Partial CDK revert of #34186 (#34461)
Browse files Browse the repository at this point in the history
## What
* Resurrect removed signatures to fix compilation errors in the Snowflake connector.
* Partial revert of #34186
  • Loading branch information
gisripa committed Jan 24, 2024
1 parent b2ec58e commit 852cb59
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 10 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.14.0 | 2024-01-23 | [\#34461](https://github.com/airbytehq/airbyte/pull/34461) | Revert non-backward-compatible signature changes from 0.13.1 |
| 0.13.3 | 2024-01-23 | [\#34077](https://github.com/airbytehq/airbyte/pull/34077) | Denote if destinations fully support Destinations V2 |
| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector |
| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.13.3
version=0.14.0
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
final String schema = writeConfig.getOutputSchemaName();
final String stream = writeConfig.getStreamName();
final String dstTableName = writeConfig.getOutputTableName();
final String stageName = stagingOperations.getStageName(schema, dstTableName);
final String stagingPath =
stagingOperations.getStagingPath(SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schema, stream, writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
Expand All @@ -54,7 +55,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,

stagingOperations.createSchemaIfNotExists(database, schema);
stagingOperations.createTableIfNotExists(database, schema, dstTableName);
stagingOperations.createStageIfNotExists();
stagingOperations.createStageIfNotExists(database, stageName);

/*
* When we're in OVERWRITE, clear out the table at the start of a sync, this is an expected side
Expand All @@ -78,6 +79,7 @@ public static OnStartFunction onStartFunction(final JdbcDatabase database,
* upload was unsuccessful
*/
public static void copyIntoTableFromStage(final JdbcDatabase database,
final String stageName,
final String stagingPath,
final List<String> stagedFiles,
final String tableName,
Expand All @@ -92,7 +94,7 @@ public static void copyIntoTableFromStage(final JdbcDatabase database,
final Lock rawTableInsertLock = typerDeduper.getRawTableInsertLock(streamNamespace, streamName);
rawTableInsertLock.lock();
try {
stagingOperations.copyIntoTableFromStage(database, stagingPath, stagedFiles,
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
tableName, schemaName);
} finally {
rawTableInsertLock.unlock();
Expand Down Expand Up @@ -131,6 +133,7 @@ public static OnCloseFunction onCloseFunction(final JdbcDatabase database,
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
if (purgeStagingData) {
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagePath = stagingOperations.getStagingPath(
RANDOM_CONNECTION_ID,
schemaName,
Expand All @@ -139,7 +142,9 @@ public static OnCloseFunction onCloseFunction(final JdbcDatabase database,
writeConfig.getWriteDatetime());
log.info("Cleaning stage in destination started for stream {}. schema {}, stage: {}", writeConfig.getStreamName(), schemaName,
stagePath);
stagingOperations.dropStageIfExists(database, stagePath);
// TODO: This is another weird manifestation of Redshift vs Snowflake each using one or the other
// of the stageName/stagingPath variables.
stagingOperations.dropStageIfExists(database, stageName, stagePath);
}
}
typerDeduper.commitFinalTables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ public static FlushBufferFunction function(

final WriteConfig writeConfig = pairToWriteConfig.get(pair);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagingPath =
stagingOperations.getStagingPath(
SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(),
writeConfig.getOutputTableName(), writeConfig.getWriteDatetime());
try (writer) {
writer.flush();
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(database, stageName, stagingPath, List.of(stagedFile), writeConfig.getOutputTableName(),
schemaName,
stagingOperations,
writeConfig.getNamespace(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
* Staging operations focuses on the SQL queries that are needed to successfully move data into a
* staging environment like GCS or S3. In general, staging refers to the use of object storage for
* the purpose of efficiently uploading bulk data to destinations.
*
* TODO: This interface is shared between the Snowflake and Redshift connectors, whose staging
* mechanisms use different wire protocols. Make the interface more generic and add sub-interfaces
* to support BlobStorageOperations or JDBC-based staging operations.
*/
public interface StagingOperations extends SqlOperations {

Expand All @@ -25,10 +29,19 @@ public interface StagingOperations extends SqlOperations {
*/
String getStagingPath(UUID connectionId, String namespace, String streamName, String outputTableName, DateTime writeDatetime);

/**
* Returns the staging environment's name
*
* @param namespace Name of schema
* @param streamName Name of the stream
* @return Fully qualified name of the staging environment
*/
String getStageName(String namespace, String streamName);

/**
* Create a staging folder where to upload temporary files before loading into the final destination
*/
void createStageIfNotExists() throws Exception;
void createStageIfNotExists(JdbcDatabase database, String stageName) throws Exception;

/**
* Upload the data file into the stage area.
Expand All @@ -39,7 +52,7 @@ public interface StagingOperations extends SqlOperations {
* @param stagingPath path of staging folder to data files
* @return the name of the file that was uploaded.
*/
String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stagingPath)
String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsData, String schemaName, String stageName, String stagingPath)
throws Exception;

/**
Expand All @@ -52,6 +65,7 @@ String uploadRecordsToStage(JdbcDatabase database, SerializableBuffer recordsDat
* @param schemaName name of schema
*/
void copyIntoTableFromStage(JdbcDatabase database,
String stageName,
String stagingPath,
List<String> stagedFiles,
String tableName,
Expand All @@ -64,6 +78,6 @@ void copyIntoTableFromStage(JdbcDatabase database,
* @param database database used for syncing
* @param stageName Name of the staging area used to store files
*/
void dropStageIfExists(JdbcDatabase database, String stageName) throws Exception;
void dropStageIfExists(JdbcDatabase database, String stageName, String stagingPath) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag

final WriteConfig writeConfig = streamDescToWriteConfig.get(decs);
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagingPath =
stagingOperations.getStagingPath(
GeneralStagingFunctions.RANDOM_CONNECTION_ID,
Expand All @@ -111,9 +112,10 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag
writeConfig.getOutputTableName(),
writeConfig.getWriteDatetime());
try {
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stagingPath);
final String stagedFile = stagingOperations.uploadRecordsToStage(database, writer, schemaName, stageName, stagingPath);
GeneralStagingFunctions.copyIntoTableFromStage(
database,
stageName,
stagingPath,
List.of(stagedFile),
writeConfig.getOutputTableName(),
Expand Down

0 comments on commit 852cb59

Please sign in to comment.