
Manually retrying commits can result in partially applied changes #9227

Closed
jasonf20 opened this issue Dec 5, 2023 · 0 comments · Fixed by #9230

jasonf20 commented Dec 5, 2023

Apache Iceberg version

1.4.2 (latest release)

Query engine

Athena

Please describe the bug 🐞

Since PR #6335, FastAppend and the subclasses of MergingSnapshotProducer skip newly added data files during manual commit retries (calling commit again inside a try/catch in user code).

This happens because cleanUncommittedAppends sets the cached manifest list to an empty list instead of null. On retry, when newDataFilesAsManifests is called, the non-null cache causes the manifest-writing logic to be skipped, so no data files are returned.
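To make the mechanism concrete, here is a minimal standalone sketch of the caching pattern as I understand it. SnapshotProducerSketch and all of its members are hypothetical names standing in for the real Iceberg internals, not the actual classes:

object CacheSketch {

  // Hypothetical stand-in for FastAppend / MergingSnapshotProducer state.
  final class SnapshotProducerSketch {
    private val newFiles = List("data-file-1.parquet")
    // null means "manifests not yet written".
    private var cachedManifests: List[String] = null

    private def newDataFilesAsManifests(): List[String] = {
      // The write is skipped whenever the cache is non-null -- even when it is an empty list.
      if (cachedManifests == null && newFiles.nonEmpty) {
        cachedManifests = newFiles.map(f => s"manifest-for-$f")
      }
      Option(cachedManifests).getOrElse(Nil)
    }

    // Mirrors the behavior after PR #6335: the cache is reset to an empty
    // list instead of null, so a manual retry sees a "valid" empty cache.
    private def cleanUncommittedAppends(): Unit = {
      cachedManifests = List.empty
    }

    def commit(failCommit: Boolean): Unit = {
      println(s"committing manifests: ${newDataFilesAsManifests()}")
      if (failCommit) {
        cleanUncommittedAppends()
        throw new RuntimeException("simulated commit failure")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val producer = new SnapshotProducerSketch
    try producer.commit(failCommit = true) // committing manifests: List(manifest-for-data-file-1.parquet)
    catch {
      case _: RuntimeException =>
        producer.commit(failCommit = false) // committing manifests: List() -- the file is silently dropped
    }
  }
}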

The result can be partially applied changes when a user manually retries commits. For example, the following code produces a rewrite that applies the deletes but does not add the new file:

import java.util
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.iceberg.aws.glue.GlueCatalog
import org.apache.iceberg.catalog._
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.data.parquet.GenericParquetWriter
import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.types.Types
import org.apache.iceberg.{DataFile, PartitionSpec, Schema, Table, data}

object TestRewriteCommits {

  def main(args: Array[String]): Unit = {
    val catalog = new GlueCatalog()
    catalog.initialize("iceberg", Map.empty[String, String].asJava)

    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.StringType.get())
    )
    val tableName = "temp4"
    val tableId = TableIdentifier.of("prod_iceberg", tableName)
    val basePath = s"s3://s3-bucket-path/ice/tables/${tableName}/"

    val tableProperties: util.Map[String, String] = Map(
      "format-version" -> "2",
      "commit.retry.num-retries" -> "0" // turn off retries for more control during testing
    ).asJava
    if (!catalog.tableExists(tableId)) {
      catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), basePath, tableProperties)
    }

    val table = catalog.loadTable(tableId)

    val addedFiles = (1 to 2).map(i => {
      val file: DataFile = writeFile(basePath, table)
      val append = table.newAppend()
      append.appendFile(file)
      append.commit()
      file
    })

    val transaction = table.newTransaction()
    val rewrite = transaction.newRewrite()
    addedFiles.foreach(rewrite.deleteFile)
    rewrite.addFile(writeFile(basePath, table))
    rewrite.commit()
    try {
      // Make sure this commit fails (I forced the failure by breaking at glue.updateTable(updateTableRequest.build()) and changing the table from Athena).
      transaction.commitTransaction()
    } catch {
      case _: Throwable =>
        // This retry will run successfully, but the result will not contain the data file added during the rewrite.
        transaction.commitTransaction()
    }
  }

  private def writeFile(basePath: String, table: Table) = {
    val writer = Parquet.writeData(
        table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet"))
      .forTable(table)
      .overwrite(true)
      .createWriterFunc(GenericParquetWriter.buildWriter)
      .build[data.Record]()
    writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava)
    writer.close()
    writer.toDataFile
  }
}

I think this can be fixed either by setting the cached value back to null, as it was before, or by forbidding calling commit more than once.
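Sketched below are both options, reusing the hypothetical names from the sketch above (this is not the actual fix in #9230); either option alone would close the gap:

object FixSketch {

  final class SnapshotProducerSketch {
    private val newFiles = List("data-file-1.parquet")
    private var cachedManifests: List[String] = null
    private var committed = false

    private def newDataFilesAsManifests(): List[String] = {
      if (cachedManifests == null && newFiles.nonEmpty) {
        cachedManifests = newFiles.map(f => s"manifest-for-$f")
      }
      Option(cachedManifests).getOrElse(Nil)
    }

    // Option 1: reset the cache to null (its pre-#6335 value) so a retry
    // recomputes the manifests instead of trusting a "valid" empty cache.
    private def cleanUncommittedAppends(): Unit = {
      cachedManifests = null
    }

    // Option 2: reject a second commit outright instead of silently
    // committing a partial change set.
    def commit(): Unit = {
      require(!committed, "commit() may only be called once")
      committed = true
      println(s"committing manifests: ${newDataFilesAsManifests()}")
    }
  }
}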

alexjo2144 added a commit to alexjo2144/trino that referenced this issue Dec 18, 2023
This reverts commit bb66918.

Iceberg 1.4.x contains a silent correctness issue when concurrently
committing writes to a table.

See: apache/iceberg#9227
nastra added this to the Iceberg 1.4.3 milestone Dec 19, 2023