From 479ea790b7c1a4bdda620af06a488f88ae1c7929 Mon Sep 17 00:00:00 2001 From: Jem Bishop Date: Mon, 17 Nov 2025 11:08:36 +0000 Subject: [PATCH 1/3] fix: add version_id to Glue update_table for optimistic locking - Fetch current VersionId from Glue table before commit - Pass version_id to update_table() to enable proper optimistic locking - Prevents concurrent commits from silently overwriting each other - Fixes orphaned files issue when multiple writers commit simultaneously --- crates/catalog/glue/src/catalog.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 4514f2d7ab..613b76dfea 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -764,6 +764,18 @@ impl Catalog for GlueCatalog { let current_table = self.load_table(&table_ident).await?; let current_metadata_location = current_table.metadata_location_result()?.to_string(); + // Get current VersionId for optimistic locking + let get_table_builder = self + .client + .0 + .get_table() + .database_name(&table_namespace) + .name(table_ident.name()); + let get_table_builder = with_catalog_id!(get_table_builder, self.config); + let glue_table = get_table_builder.send().await.map_err(from_aws_sdk_error)?; + let current_version_id = glue_table.table() + .and_then(|t| t.version_id.clone()); + let staged_table = commit.apply(current_table)?; let staged_metadata_location = staged_table.metadata_location_result()?; @@ -773,8 +785,8 @@ impl Catalog for GlueCatalog { .write_to(staged_table.file_io(), staged_metadata_location) .await?; - // Persist staged table to Glue - let builder = self + // Persist staged table to Glue with optimistic locking + let mut builder = self .client .0 .update_table() @@ -787,6 +799,12 @@ impl Catalog for GlueCatalog { staged_table.metadata().properties(), Some(current_metadata_location), )?); + + // Add VersionId for optimistic locking + if let Some(version_id) = current_version_id { + builder = builder.version_id(version_id); + } + let builder = with_catalog_id!(builder, self.config); let _ = builder.send().await.map_err(|e| { let error = e.into_service_error(); From e2693d63954bfbffc299e4df522444e2d44ee435 Mon Sep 17 00:00:00 2001 From: Jem Bishop Date: Mon, 17 Nov 2025 11:16:12 +0000 Subject: [PATCH 2/3] fix: fetch version_id atomically with metadata for proper optimistic locking --- crates/catalog/glue/src/catalog.rs | 31 ++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 613b76dfea..d021704ca5 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -761,10 +761,10 @@ impl Catalog for GlueCatalog { async fn update_table(&self, commit: TableCommit) -> Result { let table_ident = commit.identifier().clone(); let table_namespace = validate_namespace(table_ident.namespace())?; - let current_table = self.load_table(&table_ident).await?; - let current_metadata_location = current_table.metadata_location_result()?.to_string(); - // Get current VersionId for optimistic locking + // Make a single get_table call to fetch both metadata location and version_id atomically + // This ensures optimistic locking works correctly: we use the version_id that was current + // when we read the metadata, not the version_id at commit time let get_table_builder = self .client .0 @@ -772,9 +772,28 @@ impl Catalog for GlueCatalog { .database_name(&table_namespace) .name(table_ident.name()); let get_table_builder = with_catalog_id!(get_table_builder, self.config); - let glue_table = get_table_builder.send().await.map_err(from_aws_sdk_error)?; - let current_version_id = glue_table.table() - .and_then(|t| t.version_id.clone()); + let glue_table_output = get_table_builder.send().await.map_err(from_aws_sdk_error)?; + + let glue_table = glue_table_output.table().ok_or_else(|| { + Error::new( + ErrorKind::TableNotFound, + format!("Table {table_ident} not found"), + ) + })?; + + // Extract version_id for optimistic locking + let current_version_id = glue_table.version_id.clone(); + + // Extract metadata location and load the table + let current_metadata_location = get_metadata_location(&glue_table.parameters)?; + let metadata = TableMetadata::read_from(&self.file_io, ¤t_metadata_location).await?; + + let current_table = Table::builder() + .file_io(self.file_io()) + .metadata_location(current_metadata_location.clone()) + .metadata(metadata) + .identifier(table_ident.clone()) + .build()?; let staged_table = commit.apply(current_table)?; let staged_metadata_location = staged_table.metadata_location_result()?; From 50c9710a75aa61e49d545423349ffef912b0383b Mon Sep 17 00:00:00 2001 From: jem Date: Wed, 19 Nov 2025 10:43:15 +0000 Subject: [PATCH 3/3] fix: refactor load_table to use load_table_with_version_id for proper OCC --- crates/catalog/glue/src/catalog.rs | 129 ++++++++++++++--------------- 1 file changed, 61 insertions(+), 68 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index d021704ca5..dce287ed6e 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -196,6 +196,62 @@ impl GlueCatalog { pub fn file_io(&self) -> FileIO { self.file_io.clone() } + + /// Loads a table from the Glue Catalog along with its version_id for optimistic locking. + /// + /// # Returns + /// A `Result` wrapping a tuple of (`Table`, `Option`) where the String is the version_id + /// from Glue that should be used for optimistic concurrency control when updating the table. + /// + /// # Errors + /// This function may return an error in several scenarios, including: + /// - Failure to validate the namespace. + /// - Failure to retrieve the table from the Glue Catalog. + /// - Absence of metadata location information in the table's properties. + /// - Issues reading or deserializing the table's metadata file. + async fn load_table_with_version_id( + &self, + table: &TableIdent, + ) -> Result<(Table, Option)> { + let db_name = validate_namespace(table.namespace())?; + let table_name = table.name(); + + let builder = self + .client + .0 + .get_table() + .database_name(&db_name) + .name(table_name); + let builder = with_catalog_id!(builder, self.config); + + let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?; + + let glue_table = glue_table_output.table().ok_or_else(|| { + Error::new( + ErrorKind::TableNotFound, + format!( + "Table object for database: {db_name} and table: {table_name} does not exist" + ), + ) + })?; + + let version_id = glue_table.version_id.clone(); + let metadata_location = get_metadata_location(&glue_table.parameters)?; + + let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; + + let table = Table::builder() + .file_io(self.file_io()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(TableIdent::new( + NamespaceIdent::new(db_name), + table_name.to_owned(), + )) + .build()?; + + Ok((table, version_id)) + } } #[async_trait] @@ -514,42 +570,8 @@ impl Catalog for GlueCatalog { /// - Absence of metadata location information in the table's properties. /// - Issues reading or deserializing the table's metadata file. async fn load_table(&self, table: &TableIdent) -> Result
{ - let db_name = validate_namespace(table.namespace())?; - let table_name = table.name(); - - let builder = self - .client - .0 - .get_table() - .database_name(&db_name) - .name(table_name); - let builder = with_catalog_id!(builder, self.config); - - let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?; - - match glue_table_output.table() { - None => Err(Error::new( - ErrorKind::TableNotFound, - format!( - "Table object for database: {db_name} and table: {table_name} does not exist" - ), - )), - Some(table) => { - let metadata_location = get_metadata_location(&table.parameters)?; - - let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; - - Table::builder() - .file_io(self.file_io()) - .metadata_location(metadata_location) - .metadata(metadata) - .identifier(TableIdent::new( - NamespaceIdent::new(db_name), - table_name.to_owned(), - )) - .build() - } - } + let (table, _) = self.load_table_with_version_id(table).await?; + Ok(table) } /// Asynchronously drops a table from the database. @@ -762,38 +784,9 @@ impl Catalog for GlueCatalog { let table_ident = commit.identifier().clone(); let table_namespace = validate_namespace(table_ident.namespace())?; - // Make a single get_table call to fetch both metadata location and version_id atomically - // This ensures optimistic locking works correctly: we use the version_id that was current - // when we read the metadata, not the version_id at commit time - let get_table_builder = self - .client - .0 - .get_table() - .database_name(&table_namespace) - .name(table_ident.name()); - let get_table_builder = with_catalog_id!(get_table_builder, self.config); - let glue_table_output = get_table_builder.send().await.map_err(from_aws_sdk_error)?; - - let glue_table = glue_table_output.table().ok_or_else(|| { - Error::new( - ErrorKind::TableNotFound, - format!("Table {table_ident} not found"), - ) - })?; - - // Extract version_id for optimistic locking - let current_version_id = glue_table.version_id.clone(); - - // Extract metadata location and load the table - let current_metadata_location = get_metadata_location(&glue_table.parameters)?; - let metadata = TableMetadata::read_from(&self.file_io, ¤t_metadata_location).await?; - - let current_table = Table::builder() - .file_io(self.file_io()) - .metadata_location(current_metadata_location.clone()) - .metadata(metadata) - .identifier(table_ident.clone()) - .build()?; + let (current_table, current_version_id) = + self.load_table_with_version_id(&table_ident).await?; + let current_metadata_location = current_table.metadata_location_result()?.to_string(); let staged_table = commit.apply(current_table)?; let staged_metadata_location = staged_table.metadata_location_result()?;