Skip to content

Commit

Permalink
minor changes for destination-snowflake
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed May 2, 2024
1 parent a3f9263 commit 652c43b
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Stream
import java.util.stream.StreamSupport
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/** Database object for interacting with a JDBC connection. */
abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSourceOperations<*>?) :
Expand Down Expand Up @@ -211,6 +213,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
abstract fun <T> executeMetadataQuery(query: Function<DatabaseMetaData?, T>): T

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDatabase::class.java)
/**
* Map records returned in a result set. It is an "unsafe" stream because the stream must be
* manually closed. Otherwise, there will be a database connection leak.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,29 @@ abstract class JdbcDestinationHandler<DestinationState>(
return actualColumns == intendedColumns
}

protected open fun getDeleteStatesSql(
    destinationStates: Map<StreamId, DestinationState>
): String {
    // One (name = ? AND namespace = ?) predicate per stream, OR-ed together.
    // falseCondition() is the neutral element, so an empty key set renders a
    // DELETE whose WHERE clause matches nothing.
    val matchesAnyStream: Condition =
        destinationStates.keys.fold(DSL.falseCondition()) { combined, streamId ->
            val matchesThisStream =
                field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME))
                    .eq(streamId.originalName)
                    .and(
                        field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE))
                            .eq(streamId.originalNamespace)
                    )
            combined.or(matchesThisStream)
        }
    // Render the DELETE with all bind values inlined so it can be shipped as raw SQL.
    return dslContext
        .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)))
        .where(matchesAnyStream)
        .getSQL(ParamType.INLINED)
}

@Throws(Exception::class)
override fun commitDestinationStates(destinationStates: Map<StreamId, DestinationState>) {
try {
Expand All @@ -408,25 +431,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
}

// Delete all state records where the stream name+namespace match one of our states
val deleteStates =
dslContext
.deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)))
.where(
destinationStates.keys
.stream()
.map { streamId: StreamId ->
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME))
.eq(streamId.originalName)
.and(
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE))
.eq(streamId.originalNamespace)
)
}
.reduce(DSL.falseCondition()) { obj: Condition, arg2: Condition? ->
obj.or(arg2)
}
)
.getSQL(ParamType.INLINED)
var deleteStates = getDeleteStatesSql(destinationStates)

// Reinsert all of our states
var insertStatesStep =
Expand Down Expand Up @@ -461,12 +466,17 @@ abstract class JdbcDestinationHandler<DestinationState>(
}
val insertStates = insertStatesStep.getSQL(ParamType.INLINED)

jdbcDatabase.executeWithinTransaction(listOf(deleteStates, insertStates))
executeWithinTransaction(listOf(deleteStates, insertStates))
} catch (e: Exception) {
LOGGER.warn("Failed to commit destination states", e)
}
}

/**
 * Executes [statements] as a single transactional unit.
 *
 * Declared `open` so dialect-specific destination handlers can override the
 * transaction wrapping with their own SQL syntax; the default implementation
 * delegates to the generic JDBC helper on [jdbcDatabase].
 *
 * @param statements the SQL statements to run, in order
 * @throws Exception if executing the statements fails
 */
@Throws(Exception::class)
protected open fun executeWithinTransaction(statements: List<String>) {
    jdbcDatabase.executeWithinTransaction(statements)
}

/**
* Convert to the TYPE_NAME retrieved from [java.sql.DatabaseMetaData.getColumns]
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where Snowflake will fail to login using config credentials
testExecutionConcurrency=1
testExecutionConcurrency=-1
JunitMethodExecutionTimeout=15 m
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,22 @@
import static io.airbyte.integrations.destination.snowflake.SnowflakeDestination.SCHEDULED_EXECUTOR_SERVICE;

import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner;
import net.snowflake.client.core.SFStatement;
import net.snowflake.client.jdbc.SnowflakeSQLException;

public class SnowflakeDestinationRunner {

public static void main(final String[] args) throws Exception {
IntegrationRunner.addOrphanedThreadFilter((Thread t) -> {
for (StackTraceElement stackTraceElement : IntegrationRunner.getThreadCreationStack(t).getStack()) {
if (stackTraceElement.getClassName().equals(SFStatement.class.getCanonicalName()) && stackTraceElement.getMethodName().equals("close")) {
return false;
}
}
return true;
});
AirbyteExceptionHandler.addThrowableForDeinterpolation(SnowflakeSQLException.class);
AdaptiveDestinationRunner.baseOnEnv()
.withOssDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,10 @@ protected void putTimestampWithTimezone(final ObjectNode node, final String colu
node.put(columnName, timestampWithOffset.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER));
}

// Serializes a timestamp-without-timezone column as an ISO-8601 string with
// microsecond precision (kept for backward compatibility with existing output).
// NOTE(review): getTimestamp(index) can return null for SQL NULL — confirm callers
// only invoke this for non-null columns.
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
    node.put(columnName, DataTypeUtils.toISO8601StringWithMicroseconds(resultSet.getTimestamp(index).toInstant()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
Expand Down Expand Up @@ -355,4 +356,30 @@ private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
};
}

/**
 * Wraps the base DELETE-states statement in an {@code IF (EXISTS (...)) THEN ... END IF}
 * guard so the DELETE is only issued when there are matching rows.
 *
 * @param destinationStates the per-stream states whose rows would be deleted
 * @return Snowflake Scripting SQL that conditionally performs the delete
 */
protected String getDeleteStatesSql(Map<StreamId, ? extends SnowflakeState> destinationStates) {
    // only doing the DELETE where there's rows to delete allows us to avoid taking a lock on the table
    // when there's nothing to delete
    // This is particularly relevant in the context of tests, where many instances of the snowflake
    // destination could be run in parallel
    String deleteStatesSql = super.getDeleteStatesSql(destinationStates);
    StringBuilder sql = new StringBuilder();
    // sql.append("BEGIN\n");
    // NOTE(review): the EXISTS probe is built by textually rewriting the DELETE into a SELECT.
    // This assumes the superclass/jOOQ renders the keyword exactly as lowercase "delete from"
    // (case-sensitive String.replace) — confirm against the configured jOOQ render settings.
    sql.append(" IF (EXISTS (").append(deleteStatesSql.replace("delete from", "SELECT 1 FROM ")).append(")) THEN\n");
    sql.append("    ").append(deleteStatesSql).append(";\n");
    sql.append("  END IF\n");
    // sql.append("END;\n");
    return sql.toString();
}

/**
 * Executes the given SQL statements atomically against Snowflake.
 *
 * <p>The statements are folded into a single anonymous Snowflake Scripting block
 * ({@code BEGIN ... END;}) that opens a transaction, runs each statement in order,
 * and commits — overriding the generic JDBC transaction handling of the base class.
 *
 * @param statements SQL statements to run inside one transaction, in order
 * @throws SQLException if executing the composed block fails
 */
@Override
protected void executeWithinTransaction(final List<String> statements) throws SQLException {
    final StringBuilder sb = new StringBuilder();
    sb.append("BEGIN\n");
    sb.append("  BEGIN TRANSACTION;\n    ");
    // JDK String.join produces the same output as commons-lang3 StringUtils.join here.
    sb.append(String.join(";\n    ", statements));
    sb.append(";\n  COMMIT;\n");
    sb.append("END;");
    // Parameterized logging defers string construction until the level is enabled.
    LOGGER.info("executing SQL:{}", sb);
    getJdbcDatabase().execute(sb.toString());
}

}

0 comments on commit 652c43b

Please sign in to comment.