Revert "JDBC connections to support checkpointing (#24604)"
This reverts commit 73900fa.
ryankfu committed Apr 5, 2023
1 parent 73900fa commit 5a835c2
Showing 13 changed files with 149 additions and 155 deletions.
@@ -16,5 +16,5 @@ ENV APPLICATION destination-clickhouse-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-clickhouse-strict-encrypt
@@ -16,5 +16,5 @@ ENV APPLICATION destination-clickhouse

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-clickhouse
@@ -35,15 +35,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Strategy:
* <p>1. Create a final table for each stream
* <p>2. Accumulate records in a buffer. One buffer per stream
 * <p>3. As records accumulate, write them in batches to the database. We set a minimum number of records
 * before writing to avoid wasteful record-wise writes. In the case of slow syncs this will be
* superseded with a periodic record flush from {@link BufferedStreamConsumer#periodicBufferFlush()}
* <p>4. Once all records have been written to buffer, flush the buffer and write any remaining records
* to the database (regardless of how few are left)
*/
// Strategy:
// 1. Create a temporary table for each stream
// 2. Accumulate records in a buffer. One buffer per stream.
// 3. As records accumulate, write them in batches to the database. We set a minimum number of records
// before writing to avoid wasteful record-wise writes.
// 4. Once all records have been written to buffer, flush the buffer and write any remaining records
// to the database (regardless of how few are left).
// 5. In a single transaction, delete the target tables if they exist and rename the temp tables to
// the final table name.
public class JdbcBufferedConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);
@@ -116,52 +116,24 @@ private static String getOutputSchema(final AirbyteStream stream,
: namingResolver.getNamespace(defaultDestSchema);
}

/**
* Sets up destination storage through:
* <p>1. Creates Schema (if not exists)
 * <p>2. Creates airbyte_raw table (if not exists)
 * <p>3. (Optional) Truncates the table if the sync mode is OVERWRITE
*
* @param database JDBC database to connect to
 * @param sqlOperations interface for executing SQL queries
* @param writeConfigs settings for each stream
* @return
*/
private static OnStartFunction onStartFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
return () -> {
LOGGER.info("Preparing raw tables in destination started for {} streams", writeConfigs.size());
final List<String> queryList = new ArrayList<>();
LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
final String dstTableName = writeConfig.getOutputTableName();
LOGGER.info("Preparing raw table in destination started for stream {}. schema: {}, table name: {}",
writeConfig.getStreamName(),
schemaName,
dstTableName);
final String tmpTableName = writeConfig.getTmpTableName();
LOGGER.info("Preparing tmp table in destination started for stream {}. schema: {}, tmp table name: {}", writeConfig.getStreamName(),
schemaName, tmpTableName);

sqlOperations.createSchemaIfNotExists(database, schemaName);
sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(database, schemaName, dstTableName));
case APPEND, APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
sqlOperations.createTableIfNotExists(database, schemaName, tmpTableName);
}
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Preparing raw tables in destination completed.");
LOGGER.info("Preparing tables in destination completed.");
};
}
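
The strategy comments and the onStartFunction above describe a three-callback lifecycle: prepare destination storage, batch-write records, finalize. Below is a minimal, self-contained sketch of that call order. The interfaces and the runSync driver are simplified stand-ins, not Airbyte's real BufferedStreamConsumer API (whose constructor and flush scheduling are not shown in this diff); it only illustrates the sequence in which the callbacks produced by this factory are invoked.

```java
import java.util.List;
import java.util.Map;

// Minimal sketch of the consumer lifecycle described in the strategy comments above.
// The interfaces and driver loop are stand-ins for Airbyte's BufferedStreamConsumer wiring.
public class ConsumerLifecycleSketch {

  interface OnStart { void call() throws Exception; }                 // create schemas + tmp tables
  interface Writer { void write(String stream, List<String> records) throws Exception; } // batch insert into tmp table
  interface OnClose { void call(boolean hasFailed) throws Exception; } // move tmp -> final, drop tmp

  static void runSync(final OnStart onStart,
                      final Writer writer,
                      final OnClose onClose,
                      final Map<String, List<String>> bufferedRecordsByStream) throws Exception {
    boolean hasFailed = false;
    onStart.call();                                     // step 1: prepare destination storage
    try {
      for (final var entry : bufferedRecordsByStream.entrySet()) {
        writer.write(entry.getKey(), entry.getValue()); // steps 2-4: flush accumulated buffers in batches
      }
    } catch (final Exception e) {
      hasFailed = true;
      throw e;
    } finally {
      onClose.call(hasFailed);                          // step 5: finalize (or skip the copy on failure) and clean up
    }
  }

  public static void main(String[] args) throws Exception {
    runSync(
        () -> System.out.println("onStart: create schema + tmp tables"),
        (stream, records) -> System.out.printf("insert %d records into tmp table for %s%n", records.size(), stream),
        hasFailed -> System.out.println("onClose: hasFailed=" + hasFailed + ", swap tmp -> final, drop tmp"),
        Map.of("users", List.of("{\"id\":1}", "{\"id\":2}")));
  }
}
```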

/**
* Writes {@link AirbyteRecordMessage} to JDBC database's airbyte_raw table
*
* @param database JDBC database to connect to
 * @param sqlOperations interface for executing SQL queries
* @param writeConfigs settings for each stream
* @param catalog catalog of all streams to sync
* @return
*/
private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs,
@@ -176,26 +148,54 @@ private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Jdb
}

final WriteConfig writeConfig = pairToWriteConfig.get(pair);
sqlOperations.insertRecords(database, records, writeConfig.getOutputSchemaName(), writeConfig.getOutputTableName());
sqlOperations.insertRecords(database, records, writeConfig.getOutputSchemaName(), writeConfig.getTmpTableName());
};
}
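
The record writer above receives records that have already been batched; the per-stream accumulation happens in the consumer that wraps it. The sketch below, which assumes a simple size threshold and uses hypothetical class and field names, illustrates the buffering behaviour the strategy comments describe. A real consumer also flushes on a timer, per the periodicBufferFlush reference above, and at end of sync.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative sketch (not Airbyte's BufferedStreamConsumer): records accumulate per stream and
// are handed to the record writer in batches once a minimum batch size is reached, so slow
// trickles of records are not written one row at a time.
public class PerStreamBuffer {

  private final int minBatchSize;
  private final BiConsumer<String, List<String>> recordWriter;   // e.g. insertRecords into the stream's tmp table
  private final Map<String, List<String>> buffers = new HashMap<>();

  public PerStreamBuffer(final int minBatchSize, final BiConsumer<String, List<String>> recordWriter) {
    this.minBatchSize = minBatchSize;
    this.recordWriter = recordWriter;
  }

  public void accept(final String stream, final String record) {
    final List<String> buffer = buffers.computeIfAbsent(stream, k -> new ArrayList<>());
    buffer.add(record);
    if (buffer.size() >= minBatchSize) {
      flush(stream);                      // batch write once the threshold is reached
    }
  }

  public void flushAll() {                // called at end of sync (and, in the real consumer, periodically)
    buffers.keySet().forEach(this::flush);
  }

  private void flush(final String stream) {
    final List<String> buffer = buffers.get(stream);
    if (buffer == null || buffer.isEmpty()) {
      return;
    }
    recordWriter.accept(stream, new ArrayList<>(buffer));
    buffer.clear();
  }
}
```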

/**
 * Closes the connection to the JDBC database and performs other teardown steps
*
* @param database JDBC database to connect to
* @param sqlOperations interface used to execute SQL queries
* @param writeConfigs settings for each stream
* @return
*/
private static OnCloseFunction onCloseFunction(final JdbcDatabase database,
final SqlOperations sqlOperations,
final List<WriteConfig> writeConfigs) {
return (hasFailed) -> {
// copy data
if (!hasFailed) {
final List<String> queryList = new ArrayList<>();
sqlOperations.onDestinationCloseOperations(database, writeConfigs);
LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
final String srcTableName = writeConfig.getTmpTableName();
final String dstTableName = writeConfig.getOutputTableName();
LOGGER.info("Finalizing stream {}. schema {}, tmp table {}, final table {}", writeConfig.getStreamName(), schemaName, srcTableName,
dstTableName);

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(database, schemaName, dstTableName));
case APPEND -> {}
case APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
queryList.add(sqlOperations.insertTableQuery(database, schemaName, srcTableName, dstTableName));
}

LOGGER.info("Executing finalization of tables.");
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Finalizing tables in destination completed.");
}
};
// clean up
LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size());
for (final WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
final String tmpTableName = writeConfig.getTmpTableName();
LOGGER.info("Cleaning tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName,
tmpTableName);

sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
}
LOGGER.info("Cleaning tmp tables in destination completed.");
}

;
}
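
For a single stream in OVERWRITE mode, the transaction assembled above typically reduces to a truncate of the final table followed by an insert-select from the tmp table, with the tmp table dropped afterwards. The sketch below shows representative SQL only; the actual statements come from each destination's SqlOperations implementation (truncateTableQuery / insertTableQuery), and the schema and table names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of what the finalization transaction built in onCloseFunction amounts to for one
// stream in OVERWRITE mode. Identifiers are illustrative, not the real generated SQL.
public class FinalizationSqlSketch {

  static List<String> finalizationQueries(final String schema, final String tmpTable, final String finalTable) {
    final List<String> queries = new ArrayList<>();
    // OVERWRITE sync mode: clear the existing final table before copying
    queries.add(String.format("TRUNCATE TABLE %s.%s;", schema, finalTable));
    // Copy everything accumulated in the tmp table into the final table
    queries.add(String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;", schema, finalTable, schema, tmpTable));
    return queries;                      // executed together via sqlOperations.executeTransaction(...)
  }

  public static void main(String[] args) {
    finalizationQueries("public", "_airbyte_tmp_abc_users", "_airbyte_raw_users")
        .forEach(System.out::println);
    // After the transaction, the cleanup loop above drops the tmp table regardless of success.
    System.out.println("DROP TABLE IF EXISTS public._airbyte_tmp_abc_users;  -- cleanup step");
  }
}
```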

private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) {
@@ -188,7 +188,6 @@ CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception>
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
// TODO: (ryankfu) move this block of code that executes before the lambda to #onStartFunction
final Set<WriteConfig> conflictingStreams = new HashSet<>();
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = new HashMap<>();
for (final WriteConfig config : writeConfigs) {
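
The loop opened above (truncated in this diff view) builds a map from (namespace, stream name) pairs to write configs and tracks duplicate pairs in conflictingStreams, since two configured streams resolving to the same pair would write to the same destination table. A minimal sketch of that pattern follows; the record types and the error handling are stand-ins for Airbyte's WriteConfig and AirbyteStreamNameNamespacePair classes, not the actual implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of indexing write configs by (namespace, stream name) and detecting duplicates.
public class ConflictingStreamsSketch {

  record StreamPair(String namespace, String name) {}
  record WriteConfig(StreamPair pair, String outputTable) {}

  static Map<StreamPair, WriteConfig> indexByPair(final List<WriteConfig> writeConfigs) {
    final Map<StreamPair, WriteConfig> pairToWriteConfig = new HashMap<>();
    final Set<StreamPair> conflictingStreams = new HashSet<>();
    for (final WriteConfig config : writeConfigs) {
      if (pairToWriteConfig.containsKey(config.pair())) {
        conflictingStreams.add(config.pair());   // same namespace + name seen twice
      }
      pairToWriteConfig.put(config.pair(), config);
    }
    if (!conflictingStreams.isEmpty()) {
      // Illustrative error handling; the real factory reports conflicting streams its own way.
      throw new IllegalStateException("Detected streams with the same name and namespace: " + conflictingStreams);
    }
    return pairToWriteConfig;
  }
}
```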
@@ -16,5 +16,5 @@ ENV APPLICATION destination-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.23
LABEL io.airbyte.version=0.1.22
LABEL io.airbyte.name=airbyte/destination-mssql-strict-encrypt
@@ -16,5 +16,5 @@ ENV APPLICATION destination-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.23
LABEL io.airbyte.version=0.1.22
LABEL io.airbyte.name=airbyte/destination-mssql
@@ -16,5 +16,5 @@ ENV APPLICATION destination-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.27
LABEL io.airbyte.version=0.3.26
LABEL io.airbyte.name=airbyte/destination-postgres-strict-encrypt
@@ -16,5 +16,5 @@ ENV APPLICATION destination-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.27
LABEL io.airbyte.version=0.3.26
LABEL io.airbyte.name=airbyte/destination-postgres
@@ -14,5 +14,5 @@ ENV APPLICATION destination-tidb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-tidb
29 changes: 14 additions & 15 deletions docs/integrations/destinations/clickhouse.md
@@ -3,12 +3,12 @@

## Features

| Feature | Supported?\(Yes/No\) | Notes |
|:------------------------------|:---------------------|:------|
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Deduped History | Yes | |
| Namespaces | Yes | |
| Feature | Supported?\(Yes/No\) | Notes |
| :--- | :--- | :--- |
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Deduped History | Yes | |
| Namespaces | Yes | |

#### Output Schema

@@ -80,17 +80,16 @@ Therefore, Airbyte ClickHouse destination will create tables and schemas using t

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:--------------------------------------------------------------------|
| 0.2.3 | 2023-04-04 | [\#24604](https://github.com/airbytehq/airbyte/pull/24604) | Support for destination checkpointing |
| 0.2.2 | 2023-02-21 | [\#21509](https://github.com/airbytehq/airbyte/pull/21509) | Compatibility update with security patch for strict encrypt version |
| 0.2.1 | 2022-12-06 | [\#19573](https://github.com/airbytehq/airbyte/pull/19573) | Update dbt version to 1.3.1 |
| 0.2.0 | 2022-09-27 | [\#16970](https://github.com/airbytehq/airbyte/pull/16970) | Remove TCP port from spec parameters |
| 0.1.12 | 2022-09-08 | [\#16444](https://github.com/airbytehq/airbyte/pull/16444) | Added custom jdbc params field |
| 0.2.2 | 2023-02-21 | [21509](https://github.com/airbytehq/airbyte/pull/21509) | Compatibility update with security patch for strict encrypt version |
| 0.2.1 | 2022-12-06 | [19573](https://github.com/airbytehq/airbyte/pull/19573) | Update dbt version to 1.3.1 |
| 0.2.0 | 2022-09-27 | [16970](https://github.com/airbytehq/airbyte/pull/16970) | Remove TCP port from spec parameters |
| 0.1.12 | 2022-09-08 | [16444](https://github.com/airbytehq/airbyte/pull/16444) | Added custom jdbc params field |
| 0.1.10 | 2022-07-05 | [\#13639](https://github.com/airbytehq/airbyte/pull/13639) | Change JDBC ClickHouse version into 0.3.2-patch9 |
| 0.1.8 | 2022-07-05 | [\#13516](https://github.com/airbytehq/airbyte/pull/13516) | Added JDBC default parameter socket timeout |
| 0.1.7 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors |
| 0.1.6 | 2022-05-17 | [\#12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.1.5 | 2022-04-06 | [\#11729](https://github.com/airbytehq/airbyte/pull/11729) | Bump mina-sshd from 2.7.0 to 2.8.0 |
| 0.1.4 | 2022-02-25 | [\#10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling |
| 0.1.3 | 2022-02-14 | [\#10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.6 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.1.5 | 2022-04-06 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Bump mina-sshd from 2.7.0 to 2.8.0 |
| 0.1.4 | 2022-02-25 | [10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling |
| 0.1.3 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.1 | 2021-12-21 | [\#8982](https://github.com/airbytehq/airbyte/pull/8982) | Set isSchemaRequired to false |
| 0.1.0 | 2021-11-04 | [\#7620](https://github.com/airbytehq/airbyte/pull/7620) | Add ClickHouse destination |