Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ testng = "7.11.0"
log4j = "2.25.2"
wiremock = "3.13.1"
jnanoid = "2.0.0"
awssdk = "2.36.1"
awssdk = "2.36.2"
gcs = "26.70.0"
system-stubs = "2.1.8"
fastcsv = "4.1.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,10 @@ class CSVReader
logger: KLogger,
totalFailures: AtomicLong,
) {
if (b.failures?.isNotEmpty() == true) {
if (b.failures?.isNotEmpty == true) {
for (f in b.failures.entrySet()) {
logger.warn { "Failed batch reason: ${f.value.failureReason}" }
logger.debug(f.value.failureReason) { "Full stack trace:" }
totalFailures.getAndAdd(
f.value.failedAssets.size
.toLong(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ class AssetTransformer(

/** {@inheritDoc} */
override fun includeRow(inputRow: Map<String, String>): Boolean {
// Rows will be limited by the extract, so everything extracted should be imported
return true
// Rows will be limited by the extract, so everything (non-empty) extracted should be imported
return inputRow.values.any { it.trim().isNotBlank() }
}

private fun mapAsset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ class LineageTransformer(
/** {@inheritDoc} */
override fun mapRow(inputRow: Map<String, String>): List<List<String>> {
val name = inputRow[XFORM_NAME] ?: ""
val sourceQN = AssetTransformer.getAssetQN(ctx, inputRow, AssetTransformer.SOURCE_PREFIX, logger, qnMap)
val sourceType = inputRow[AssetTransformer.SOURCE_TYPE] ?: ""
val targetType = inputRow[AssetTransformer.TARGET_TYPE] ?: ""
val sourceQN = AssetTransformer.getAssetQN(ctx, inputRow, AssetTransformer.SOURCE_PREFIX, logger, qnMap)
val targetQN = AssetTransformer.getAssetQN(ctx, inputRow, AssetTransformer.TARGET_PREFIX, logger, qnMap)
val source =
if (sourceQN.isNotBlank() && sourceType.isNotBlank()) {
FieldSerde.getRefByQualifiedName(sourceType, sourceQN)
} else {
logger.warn { "Unable to translate source into a valid asset reference: $sourceType::$name" }
null
}
val targetQN = AssetTransformer.getAssetQN(ctx, inputRow, AssetTransformer.TARGET_PREFIX, logger, qnMap)
val targetType = inputRow[AssetTransformer.TARGET_TYPE] ?: ""
val target =
if (targetQN.isNotBlank() && targetType.isNotBlank()) {
FieldSerde.getRefByQualifiedName(targetType, targetQN)
Expand Down Expand Up @@ -108,7 +108,7 @@ class LineageTransformer(

/** {@inheritDoc} */
override fun includeRow(inputRow: Map<String, String>): Boolean {
// Rows will be limited by the extract, so everything extracted should be imported
return true
// Rows will be limited by the extract, so everything (non-empty) extracted should be imported
return inputRow.values.any { it.trim().isNotBlank() }
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Source Type,Source Connector,Source Connection,Source Identity,Source Name,Target Type,Target Connector,Target Connection,Target Identity,Target Name,Transformation Connector,Transformation Connection,Transformation Identity,Transformation Name,sql,certificateStatus,announcementType,announcementTitle,announcementMessage
,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,
Table,matillion,{CONNECTION},db1/schema1/source_table,source_table,View,matillion,{CONNECTION},db2/schema2/target_view,target_view,matillion,{CONNECTION},xform_123,source_table > target_view,select * from db1.schema1.source_table,DRAFT,information,Testing lineage builder,Only a test...
Loading