implement redshift refreshes
edgao committed Jun 27, 2024
1 parent af94f4d commit 9429b1b
Showing 4 changed files with 72 additions and 20 deletions.
build.gradle
@@ -4,9 +4,9 @@ plugins {
}

airbyteJavaConnector {
- cdkVersionRequired = '0.38.3'
+ cdkVersionRequired = '0.40.3'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
- useLocalCdk = false
+ useLocalCdk = true
}

java {
RedshiftDestination.kt
@@ -184,6 +184,12 @@ class RedshiftDestination : BaseConnector(), Destination {
hasUnprocessedRecords = true,
maxProcessedTimestamp = Optional.empty(),
),
+ initialTempRawTableStatus =
+ InitialRawTableStatus(
+ rawTableExists = false,
+ hasUnprocessedRecords = true,
+ maxProcessedTimestamp = Optional.empty(),
+ ),
isSchemaMismatch = true,
isFinalTableEmpty = true,
destinationState =
RedshiftStagingStorageOperation.kt
@@ -19,7 +19,6 @@ import io.airbyte.integrations.destination.redshift.manifest.Entry
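The added initialTempRawTableStatus mirrors the existing raw-table status, but for the temp raw table that refreshes stage into; this hunk only extends RedshiftDestination's hard-coded dummy status with the same conservative defaults. A hedged reading of the three fields (interpreted from the field names, not stated in this diff):

// rawTableExists = false: no leftover temp raw table from a prior attempt
// hasUnprocessedRecords = true: conservatively assume anything staged is still unprocessed
// maxProcessedTimestamp = Optional.empty(): nothing from the temp table has been typed/deduped yet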
import io.airbyte.integrations.destination.redshift.manifest.Manifest
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator
- import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.time.ZoneOffset
@@ -41,17 +40,59 @@ class RedshiftStagingStorageOperation(
private val writeDatetime: ZonedDateTime = Instant.now().atZone(ZoneOffset.UTC)
private val objectMapper = ObjectMapper()

- override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
+ override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
// create raw table
- destinationHandler.execute(Sql.of(createRawTableQuery(streamId)))
- if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
- destinationHandler.execute(Sql.of(truncateRawTableQuery(streamId)))
+ destinationHandler.execute(Sql.of(createRawTableQuery(streamId, suffix)))
+ if (replace) {
+ destinationHandler.execute(Sql.of(truncateRawTableQuery(streamId, suffix)))
}
// create bucket for staging files
s3StorageOperations.createBucketIfNotExists()
}
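The sync-mode enum is gone: callers now pass an explicit table suffix plus a replace flag, which is what lets refreshes stage into a separate temp raw table. A hedged usage sketch (the suffix literal and variable names are assumptions, not taken from this diff):

// Regular sync: operate directly on the real raw table, keep existing records.
storageOperation.prepareStage(streamId, suffix = "", replace = false)

// Truncate refresh: build a parallel temp raw table, wiping any stale attempt.
storageOperation.prepareStage(streamId, suffix = "_airbyte_tmp", replace = true)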

- override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
+ override fun overwriteStage(streamId: StreamId, suffix: String) {
+ destinationHandler.execute(
+ Sql.transactionally(
+ """DROP TABLE IF EXISTS "${streamId.rawNamespace}"."${streamId.rawName}" """,
+ """ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName}$suffix" RENAME TO "${streamId.rawName}" """
+ )
+ )
+ }
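overwriteStage is the commit point of a truncate refresh: dropping the real raw table and renaming the temp table into its place happen in one transaction, so readers see either the old data or the complete new generation, never a partial mix. For illustration, with an assumed namespace airbyte_internal and raw table users_raw, the rendered statements would be roughly:

// DROP TABLE IF EXISTS "airbyte_internal"."users_raw"
// ALTER TABLE "airbyte_internal"."users_raw_airbyte_tmp" RENAME TO "users_raw"

Note that RENAME TO takes a bare table name rather than a schema-qualified one; the renamed table stays in the same schema.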

+ override fun transferFromTempStage(streamId: StreamId, suffix: String) {
+ // TODO are there edge cases where this doesn't work? (migrations, etc.)
+ // ALTER TABLE ... APPEND can't run inside transactions, so run these statements separately
+ destinationHandler.execute(
+ Sql.separately(
+ // ALTER TABLE ... APPEND is an efficient way to move records from one table to
+ // another.
+ // https://docs.aws.amazon.com/redshift/latest/dg/r_ALTER_TABLE_APPEND.html
+ """
+ ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName}"
+ APPEND FROM "${streamId.rawNamespace}"."${streamId.rawName}$suffix"
+ """.trimIndent(),
+ """DROP TABLE IF EXISTS "${streamId.rawNamespace}"."${streamId.rawName}$suffix" """,
+ )
+ )
+ }
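ALTER TABLE ... APPEND moves the source table's storage blocks into the target instead of copying rows, which is why it is preferred over INSERT INTO ... SELECT here; the trade-off is that it cannot run inside a transaction, hence Sql.separately. With the same assumed identifiers as above, the rendered statements would be roughly:

// ALTER TABLE "airbyte_internal"."users_raw"
// APPEND FROM "airbyte_internal"."users_raw_airbyte_tmp"
// DROP TABLE IF EXISTS "airbyte_internal"."users_raw_airbyte_tmp"

Because APPEND leaves the source table empty rather than duplicating rows, the same records cannot be moved twice even if the DROP is retried.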

+ override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
+ val generation =
+ destinationHandler.query(
+ """SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID} FROM "${streamId.rawNamespace}"."${streamId.rawName}$suffix" LIMIT 1"""
+ )
+ if (generation.isEmpty()) {
+ return null
+ }
+
+ return generation.first()[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID].asLong()
+ }
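Sampling a single row is enough on the assumption that a temp stage only ever holds one generation, and returning null distinguishes "nothing staged" from "staged data of some generation". A hedged sketch of the resume-or-reset decision this enables (the real logic lives in the CDK's stream operation; variable names here are assumptions):

val stagedGeneration = storageOperation.getStageGeneration(streamId, suffix)
if (stagedGeneration != null && stagedGeneration != currentGenerationId) {
    // Leftover data from an older refresh attempt: rebuild the temp stage from scratch.
    storageOperation.prepareStage(streamId, suffix, replace = true)
}
// Otherwise keep the temp stage and continue appending into it.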

+ override fun writeToStage(
+ streamConfig: StreamConfig,
+ suffix: String,
+ data: SerializableBuffer
+ ) {
val streamId = streamConfig.id
val objectPath: String = getStagingPath(streamId)
log.info {
@@ -61,13 +102,19 @@
s3StorageOperations.uploadRecordsToBucket(data, streamId.rawNamespace, objectPath)

log.info {
"Starting copy to target table from stage: ${streamId.rawName} in destination from stage: $objectPath/$filename."
"Starting copy to target table from stage: ${streamId.rawName}$suffix in destination from stage: $objectPath/$filename."
}
val manifestContents = createManifest(listOf(filename), objectPath)
val manifestPath = putManifest(manifestContents, objectPath)
- executeCopy(manifestPath, destinationHandler, streamId.rawNamespace, streamId.rawName)
+ executeCopy(
+ manifestPath,
+ destinationHandler,
+ streamId.rawNamespace,
+ streamId.rawName,
+ suffix
+ )
log.info {
"Copy to target table ${streamId.rawNamespace}.${streamId.rawName} in destination complete."
"Copy to target table ${streamId.rawNamespace}.${streamId.rawName}$suffix in destination complete."
}
}
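An empty suffix reproduces the pre-refresh behavior, so the same writeToStage path serves both plain syncs and refreshes. A hedged sketch of the two call shapes (the suffix literal and buffer variable are assumptions):

storageOperation.writeToStage(streamConfig, suffix = "", data = buffer)              // COPY into "<rawName>"
storageOperation.writeToStage(streamConfig, suffix = "_airbyte_tmp", data = buffer)  // COPY into "<rawName>_airbyte_tmp"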

@@ -172,6 +219,7 @@ class RedshiftStagingStorageOperation(
destinationHandler: RedshiftDestinationHandler,
schemaName: String,
tableName: String,
+ suffix: String,
) {
val accessKeyId =
s3Config.s3CredentialConfig!!.s3CredentialsProvider.credentials.awsAccessKeyId
@@ -180,7 +228,7 @@

val copyQuery =
"""
- COPY $schemaName.$tableName FROM '${getFullS3Path(s3Config.bucketName!!, manifestPath)}'
+ COPY $schemaName.$tableName$suffix FROM '${getFullS3Path(s3Config.bucketName!!, manifestPath)}'
CREDENTIALS 'aws_access_key_id=$accessKeyId;aws_secret_access_key=$secretAccessKey'
CSV GZIP
REGION '${s3Config.bucketRegion}' TIMEFORMAT 'auto'
@@ -195,9 +243,9 @@
companion object {
private val nameTransformer = RedshiftSQLNameTransformer()

- private fun createRawTableQuery(streamId: StreamId): String {
+ private fun createRawTableQuery(streamId: StreamId, suffix: String): String {
return """
- CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}" (
+ CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}$suffix" (
${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID} VARCHAR(36),
${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT} TIMESTAMPTZ DEFAULT GETDATE(),
${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} TIMESTAMPTZ,
Expand All @@ -208,12 +256,8 @@ class RedshiftStagingStorageOperation(
""".trimIndent()
}

- private fun truncateRawTableQuery(streamId: StreamId): String {
- return String.format(
- """TRUNCATE TABLE "%s"."%s";""",
- streamId.rawNamespace,
- streamId.rawName
- )
+ private fun truncateRawTableQuery(streamId: StreamId, suffix: String): String {
+ return """TRUNCATE TABLE "${streamId.rawNamespace}"."${streamId.rawName}$suffix" """
}

private fun getFullS3Path(s3BucketName: String, s3StagingFile: String): String {
RedshiftDestinationHandler.kt
@@ -155,6 +155,8 @@ class RedshiftDestinationHandler(
)
}

+ fun query(sql: String): List<JsonNode> = jdbcDatabase.queryJsons(sql)

private fun toJdbcTypeName(airbyteProtocolType: AirbyteProtocolType): String {
return when (airbyteProtocolType) {
AirbyteProtocolType.STRING -> "varchar"
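The new query helper gives storage operations a way to read rows back as JSON; getStageGeneration above is its first caller. A hedged usage sketch, assuming JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID resolves to _airbyte_generation_id and reusing the illustrative table names from earlier:

val rows: List<JsonNode> =
    destinationHandler.query(
        """SELECT _airbyte_generation_id FROM "airbyte_internal"."users_raw_airbyte_tmp" LIMIT 1"""
    )
val generation: Long? = rows.firstOrNull()?.get("_airbyte_generation_id")?.asLong()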
