convert destination-snowflake to Kotlin CDK
stephane-airbyte committed Apr 30, 2024
1 parent e0f9c29 commit d7785fd
Showing 30 changed files with 247 additions and 207 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.31.1 | 2024-04-29 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake. |
| 0.31.0 | 2024-04-26 | [\#37584](https://github.com/airbytehq/airbyte/pull/37584) | Update S3 destination deps to exclude zookeeper and hadoop-yarn-common |
| 0.30.11 | 2024-04-25 | [\#36899](https://github.com/airbytehq/airbyte/pull/36899) | changes for bigQuery destination. |
| 0.30.10 | 2024-04-24 | [\#37541](https://github.com/airbytehq/airbyte/pull/37541) | remove excessive logging |
@@ -1 +1 @@
version=0.31.0
version=0.31.1
@@ -204,7 +204,7 @@ abstract class JdbcSqlOperations : SqlOperations {
}
}

fun dropTableIfExistsQuery(schemaName: String?, tableName: String?): String {
open fun dropTableIfExistsQuery(schemaName: String?, tableName: String?): String {
return String.format("DROP TABLE IF EXISTS %s.%s;\n", schemaName, tableName)
}
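The only change in this hunk is the `open` modifier: Kotlin members are final by default, so without it a connector could not override the generated SQL. A minimal standalone sketch of what this enables, using hypothetical class names rather than the real `JdbcSqlOperations` hierarchy:

```kotlin
// Hypothetical classes for illustration only.
open class BaseSqlOps {
    open fun dropTableIfExistsQuery(schemaName: String?, tableName: String?): String =
        String.format("DROP TABLE IF EXISTS %s.%s;\n", schemaName, tableName)
}

class QuotingSqlOps : BaseSqlOps() {
    // A dialect that needs quoted, case-sensitive identifiers can now swap in its own SQL.
    override fun dropTableIfExistsQuery(schemaName: String?, tableName: String?): String =
        String.format("DROP TABLE IF EXISTS \"%s\".\"%s\";\n", schemaName, tableName)
}
```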

@@ -356,7 +356,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_META]!!.type
}

private fun existingSchemaMatchesStreamConfig(
open protected fun existingSchemaMatchesStreamConfig(
stream: StreamConfig?,
existingTable: TableDefinition
): Boolean {
@@ -479,9 +479,9 @@ abstract class JdbcDestinationHandler<DestinationState>(

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDestinationHandler::class.java)
private const val DESTINATION_STATE_TABLE_NAME = "_airbyte_destination_state"
private const val DESTINATION_STATE_TABLE_COLUMN_NAME = "name"
private const val DESTINATION_STATE_TABLE_COLUMN_NAMESPACE = "namespace"
protected const val DESTINATION_STATE_TABLE_NAME = "_airbyte_destination_state"
protected const val DESTINATION_STATE_TABLE_COLUMN_NAME = "name"
protected const val DESTINATION_STATE_TABLE_COLUMN_NAMESPACE = "namespace"
private const val DESTINATION_STATE_TABLE_COLUMN_STATE = "destination_state"
private const val DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT = "updated_at"

@@ -542,6 +542,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
return Optional.of(TableDefinition(retrievedColumnDefns))
}

@JvmStatic
fun fromIsNullableIsoString(isNullable: String?): Boolean {
return "YES".equals(isNullable, ignoreCase = true)
}
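Alongside the constants switching from `private` to `protected` (so a subclass can reference the destination-state table and column names), the new `@JvmStatic` annotation only matters for Java callers: a companion-object function without it is reachable from Java only through the `Companion` object. A self-contained sketch of the difference, using a hypothetical `Handler` class rather than the CDK class:

```kotlin
class Handler {
    companion object {
        // With @JvmStatic the compiler also emits a real static method on Handler,
        // so Java code can call Handler.fromIsNullableIsoString("YES") directly
        // instead of Handler.Companion.fromIsNullableIsoString("YES").
        @JvmStatic
        fun fromIsNullableIsoString(isNullable: String?): Boolean =
            "YES".equals(isNullable, ignoreCase = true)
    }
}

fun main() {
    // Kotlin call sites are unaffected either way.
    println(Handler.fromIsNullableIsoString("yes")) // true
}
```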
@@ -1469,7 +1469,7 @@ abstract class DestinationAcceptanceTest {
}

/** Whether the destination should be tested against different namespaces. */
protected open fun supportNamespaceTest(): Boolean {
open protected fun supportNamespaceTest(): Boolean {
return false
}

@@ -61,7 +61,7 @@ class LocalAirbyteDestination(private val dest: Destination) : AirbyteDestinatio
return isClosed
}

override val exitValue = 0
override var exitValue = 0

override fun attemptRead(): Optional<io.airbyte.protocol.models.AirbyteMessage> {
return Optional.empty()
@@ -72,7 +72,7 @@ interface AirbyteDestination : CheckedConsumer<AirbyteMessage, Exception>, AutoC
* @return exit code of the destination process
* @throws IllegalStateException if the destination process has not exited
*/
abstract val exitValue: Int
val exitValue: Int

/**
* Attempts to read an AirbyteMessage from the Destination.
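These two `exitValue` hunks are related: the interface drops the redundant `abstract` modifier (an interface property without a body is abstract already), and the local test implementation widens `val` to `var` so the exit code can be updated later. Kotlin allows overriding a read-only `val` with a mutable `var`, but not the reverse. A minimal sketch with hypothetical stand-in names:

```kotlin
interface Destinationish {
    val exitValue: Int // abstract by default inside an interface
}

class LocalDestinationish : Destinationish {
    // Overriding a val with a var is legal and adds a setter, so the
    // implementation can change the value after construction.
    override var exitValue = 0
}

fun main() {
    val d = LocalDestinationish()
    d.exitValue = 1
    println(d.exitValue) // 1
}
```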
@@ -171,7 +171,7 @@ abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> : Destination
* @return whether it exists and is in the correct format
*/
@Throws(Exception::class)
protected fun doesValidV1RawTableExist(namespace: String?, tableName: String?): Boolean {
protected open fun doesValidV1RawTableExist(namespace: String?, tableName: String?): Boolean {
val existingV1RawTable = getTableIfExists(namespace, tableName)
return existingV1RawTable.isPresent &&
doesV1RawTableMatchExpectedSchema(existingV1RawTable.get())
@@ -80,7 +80,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
* Subclasses should override this method if they need to make changes to the stream ID. For
* example, you could upcase the final table name here.
*/
protected fun buildStreamId(
open protected fun buildStreamId(
namespace: String,
finalTableName: String,
rawTableName: String
@@ -149,7 +149,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
/** Identical to [BaseTypingDedupingTest.getRawMetadataColumnNames]. */
get() = HashMap()

protected val finalMetadataColumnNames: Map<String, String>
open protected val finalMetadataColumnNames: Map<String, String>
/** Identical to [BaseTypingDedupingTest.getFinalMetadataColumnNames]. */
get() = HashMap()

@@ -728,7 +728,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
*/
@Test
@Throws(Exception::class)
fun ignoreOldRawRecords() {
open fun ignoreOldRawRecords() {
createRawTable(streamId)
createFinalTable(incrementalAppendStream, "")
insertRawTableRecords(
@@ -1519,7 +1519,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
executeSoftReset(generator, destinationHandler, incrementalAppendStream)
}

protected fun migrationAssertions(v1RawRecords: List<JsonNode>, v2RawRecords: List<JsonNode>) {
protected open fun migrationAssertions(
v1RawRecords: List<JsonNode>,
v2RawRecords: List<JsonNode>
) {
val v2RecordMap =
v2RawRecords
.stream()
@@ -1570,7 +1573,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
protected fun dumpV1RawTableRecords(streamId: StreamId): List<JsonNode> {
open protected fun dumpV1RawTableRecords(streamId: StreamId): List<JsonNode> {
return dumpRawTableRecords(streamId)
}

@@ -152,7 +152,7 @@ abstract class BaseTypingDedupingTest {
/** Conceptually identical to [.getFinalMetadataColumnNames], but for the raw table. */
get() = HashMap()

val finalMetadataColumnNames: Map<String, String>
open val finalMetadataColumnNames: Map<String, String>
/**
* If the destination connector uses a nonstandard schema for the final table, override this
* method. For example, destination-snowflake upcases all column names in the final tables.
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.27.7'
cdkVersionRequired = '0.31.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -1,4 +1,4 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where Snowflake will fail to login using config credentials
testExecutionConcurrency=4
testExecutionConcurrency=1
JunitMethodExecutionTimeout=15 m
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.7.0
dockerImageTag: 3.7.1
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
@@ -197,7 +197,7 @@ private static String getAccessTokenUsingRefreshToken(final String hostName,
}

public static JdbcDatabase getDatabase(final DataSource dataSource) {
return new DefaultJdbcDatabase(dataSource);
return new DefaultJdbcDatabase(dataSource, new SnowflakeSourceOperations());
}

private static Runnable getRefreshTokenTask(final HikariDataSource dataSource) {
@@ -10,6 +10,7 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
@@ -132,7 +133,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface");
}

@@ -209,7 +210,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper,
parsedCatalog,
defaultNamespace,
true)
DestinationColumns.V2_WITHOUT_META)
.setBufferMemoryLimit(Optional.of(getSnowflakeBufferMemoryLimit()))
.setOptimalBatchSizeBytes(
// The per stream size limit is following recommendations from:
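In the consumer setup above, the bare `true` flag becomes `DestinationColumns.V2_WITHOUT_META`, trading an opaque boolean for a named column-set selector. An illustrative Kotlin sketch of the idea; the enum body shown here is an assumption for illustration, not the CDK's actual declaration:

```kotlin
// Assumed shape of the selector; only V2_WITHOUT_META is confirmed by this diff.
enum class DestinationColumns {
    LEGACY,          // legacy raw columns: _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data
    V2_WITH_META,    // v2 raw columns including _airbyte_meta
    V2_WITHOUT_META, // v2 raw columns, no _airbyte_meta in the staged data
}

// Call sites now state which column scheme they stage instead of passing `true`,
// e.g. StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META).
```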
@@ -8,12 +8,26 @@
import static io.airbyte.cdk.db.jdbc.DateTimeConverter.putJavaSQLTime;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.DataTypeUtils;
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
import io.airbyte.commons.json.Jsons;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

public class SnowflakeTestSourceOperations extends JdbcSourceOperations {
public class SnowflakeSourceOperations extends JdbcSourceOperations {

private static final DateTimeFormatter SNOWFLAKE_TIMESTAMPTZ_FORMATTER = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalStart()
.appendLiteral(' ')
.append(DateTimeFormatter.ofPattern("XX"))
.toFormatter();

@Override
public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
@@ -45,4 +59,12 @@ protected void putTime(final ObjectNode node,
putJavaSQLTime(node, columnName, resultSet, index);
}

@Override
protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
throws SQLException {
final String timestampAsString = resultSet.getString(index);
OffsetDateTime timestampWithOffset = OffsetDateTime.parse(timestampAsString, SNOWFLAKE_TIMESTAMPTZ_FORMATTER);
node.put(columnName, timestampWithOffset.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER));
}

}
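A Kotlin sketch of what the new formatter and the `putTimestampWithTimezone` override accept, assuming Snowflake returns `TIMESTAMP_TZ` values as strings like "2024-04-30 12:34:56.789 +0000" (space-separated date, time, and offset rather than ISO-8601's `T`), which the default ISO parser would reject:

```kotlin
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder

// Same construction as the Java code above: ISO local date, a space, ISO local
// time, then an optional space plus a +HHmm/Z offset.
val snowflakeTimestampTzFormatter: DateTimeFormatter = DateTimeFormatterBuilder()
    .parseCaseInsensitive()
    .append(DateTimeFormatter.ISO_LOCAL_DATE)
    .appendLiteral(' ')
    .append(DateTimeFormatter.ISO_LOCAL_TIME)
    .optionalStart()
    .appendLiteral(' ')
    .append(DateTimeFormatter.ofPattern("XX"))
    .toFormatter()

fun main() {
    // Assumed example value, for illustration only.
    val parsed = OffsetDateTime.parse("2024-04-30 12:34:56.789 +0000", snowflakeTimestampTzFormatter)
    println(parsed) // 2024-04-30T12:34:56.789Z
}
```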
@@ -8,7 +8,7 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.DestinationConfig;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils;
@@ -37,10 +37,10 @@ public class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOper
@Override
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
try {
if (!schemaSet.contains(schemaName) && !isSchemaExists(database, schemaName)) {
if (!getSchemaSet().contains(schemaName) && !isSchemaExists(database, schemaName)) {
// 1s1t is assuming a lowercase airbyte_internal schema name, so we need to quote it
database.execute(String.format("CREATE SCHEMA IF NOT EXISTS \"%s\";", schemaName));
schemaSet.add(schemaName);
getSchemaSet().add(schemaName);
}
} catch (final Exception e) {
throw checkForKnownConfigExceptions(e).orElseThrow(() -> e);
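The switch from the `schemaSet` field to `getSchemaSet()` above is a Kotlin-interop consequence: the base class is now Kotlin, and a Kotlin property is exposed to Java callers through its generated getter rather than as a directly accessible field. A small sketch of the interop rule, with a hypothetical class name and an assumed property declaration:

```kotlin
// Kotlin base class (sketch): the property compiles to a private backing field
// plus a getSchemaSet() accessor visible to Java subclasses.
open class KotlinSqlOps {
    protected val schemaSet: MutableSet<String> = HashSet()
}

// A Java subclass therefore writes getSchemaSet().contains(...) and
// getSchemaSet().add(...), which is the change made in SnowflakeSqlOperations above.
```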
@@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.snowflake;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns;
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
@@ -18,14 +19,15 @@ public abstract class SnowflakeSqlStagingOperations extends SnowflakeSqlOperatio
/**
* This method is used in Check connection method to make sure that user has the Write permission
*/
@SuppressWarnings("deprecation")
protected void attemptWriteToStage(final String outputSchema,
final String stageName,
final JdbcDatabase database)
throws Exception {

final CsvSerializedBuffer csvSerializedBuffer = new CsvSerializedBuffer(
new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
new StagingDatabaseCsvSheetGenerator(true),
new StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META),
true);

// create a dummy stream\records that will bed used to test uploading