Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.iceberg.spark.source

import scala.collection.JavaConverters._

import org.apache.commons.lang3.reflect.FieldUtils
import org.apache.iceberg.Table
import org.apache.iceberg.types.TypeUtil

object AuronIcebergSourceUtil {
Expand All @@ -37,8 +39,23 @@ object AuronIcebergSourceUtil {
expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap
}

def expectedFieldIdsForChangelogScan(scan: AnyRef): Map[String, Int] = {
val expectedSchema =
FieldUtils.readField(scan, "expectedSchema", true).asInstanceOf[org.apache.iceberg.Schema]
expectedSchema.columns().asScala.map(field => field.name() -> field.fieldId()).toMap
}

def detectRenameOrDrop(scan: AnyRef): RenameOrDrop = {
val table = asBatchQueryScan(scan).table()
detectRenameOrDrop(table)
}

def detectRenameOrDropForChangelogScan(scan: AnyRef): RenameOrDrop = {
val table = FieldUtils.readField(scan, "table", true).asInstanceOf[Table]
detectRenameOrDrop(table)
}

private def detectRenameOrDrop(table: Table): RenameOrDrop = {
val currentFields = collectFieldIdToName(table.schema())

table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ object IcebergScanSupport extends Logging {
}
val (fileSchema, partitionSchema) = schemas.get

val fieldIdsByName =
try {
AuronIcebergSourceUtil.expectedFieldIdsForChangelogScan(scan.asInstanceOf[AnyRef])
} catch {
case NonFatal(t) =>
logWarning(s"Failed to inspect Iceberg changelog field ids for $scan.", t)
return None
}

val renameOrDrop =
try {
AuronIcebergSourceUtil.detectRenameOrDropForChangelogScan(scan.asInstanceOf[AnyRef])
} catch {
case NonFatal(t) =>
logWarning(s"Failed to inspect Iceberg changelog schema history for $scan.", t)
return None
}
assert(!renameOrDrop.nested, "Nested Iceberg rename or drop is not supported.")

val missingFieldIds =
fileSchema.fields.filterNot(field => fieldIdsByName.contains(field.name)).map(_.name)
assert(
missingFieldIds.isEmpty,
s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}")

val partitions = inputPartitions(exec)
if (partitions.isEmpty) {
return Some(
Expand All @@ -221,7 +246,7 @@ object IcebergScanSupport extends Logging {
fileSchema,
partitionSchema,
Seq.empty,
Map.empty))
fieldIdsByName))
}

val icebergPartitions = partitions.flatMap(icebergPartition)
Expand Down Expand Up @@ -256,7 +281,9 @@ object IcebergScanSupport extends Logging {
}

val format = formats.headOption.getOrElse(FileFormat.PARQUET)
if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
val supportedFormat =
format == FileFormat.PARQUET || (format == FileFormat.ORC && !renameOrDrop.topLevel)
if (!supportedFormat) {
return None
}

Expand All @@ -270,7 +297,7 @@ object IcebergScanSupport extends Logging {
fileSchema,
partitionSchema,
pruningPredicates,
Map.empty))
fieldIdsByName))
}

private def supportedSchemas(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,65 @@ class AuronIcebergIntegrationSuite
}
}

test("iceberg changelog scan reads renamed columns by field id") {
withTable("local.db.t_changelog_rename") {
withTempView("t_changelog_rename_changes") {
sql("""
|create table local.db.t_changelog_rename (id int, old_name string)
|using iceberg
|tblproperties ('format-version' = '2')
|""".stripMargin)
sql("insert into local.db.t_changelog_rename values (0, 'initial')")
val startSnapshotId = currentSnapshotId("local.db.t_changelog_rename")
sql("insert into local.db.t_changelog_rename values (1, 'before')")
sql("alter table local.db.t_changelog_rename rename column old_name to new_name")
sql("insert into local.db.t_changelog_rename values (2, 'after')")
val endSnapshotId = currentSnapshotId("local.db.t_changelog_rename")
createChangelogView(
"local.db.t_changelog_rename",
"t_changelog_rename_changes",
startSnapshotId,
endSnapshotId)

checkSparkAnswerAndOperator("""
|select id, new_name, _change_type, _change_ordinal, _commit_snapshot_id
|from t_changelog_rename_changes
|order by id
|""".stripMargin)
}
}
}

test("iceberg changelog scan does not reuse dropped field id for an added column") {
withTable("local.db.t_changelog_drop_add") {
withTempView("t_changelog_drop_add_changes") {
sql("""
|create table local.db.t_changelog_drop_add (id int, value string)
|using iceberg
|tblproperties ('format-version' = '2')
|""".stripMargin)
sql("insert into local.db.t_changelog_drop_add values (0, 'initial')")
val startSnapshotId = currentSnapshotId("local.db.t_changelog_drop_add")
sql("insert into local.db.t_changelog_drop_add values (1, 'old')")
sql("alter table local.db.t_changelog_drop_add drop column value")
sql("alter table local.db.t_changelog_drop_add add column value string")
sql("insert into local.db.t_changelog_drop_add values (2, 'new')")
val endSnapshotId = currentSnapshotId("local.db.t_changelog_drop_add")
createChangelogView(
"local.db.t_changelog_drop_add",
"t_changelog_drop_add_changes",
startSnapshotId,
endSnapshotId)

checkSparkAnswerAndOperator("""
|select id, value, _change_type, _change_ordinal, _commit_snapshot_id
|from t_changelog_drop_add_changes
|order by id
|""".stripMargin)
}
}
}

test("iceberg changelog scan falls back when delete changes exist") {
withTable("local.db.t_changelog_delete") {
withTempView("t_changelog_delete_changes") {
Expand Down
Loading