diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs
index 4514f2d7ab..dce287ed6e 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -196,6 +196,62 @@ impl GlueCatalog {
     pub fn file_io(&self) -> FileIO {
         self.file_io.clone()
     }
+
+    /// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
+    /// from Glue that should be used for optimistic concurrency control when updating the table.
+    ///
+    /// # Errors
+    /// This function may return an error in several scenarios, including:
+    /// - Failure to validate the namespace.
+    /// - Failure to retrieve the table from the Glue Catalog.
+    /// - Absence of metadata location information in the table's properties.
+    /// - Issues reading or deserializing the table's metadata file.
+    async fn load_table_with_version_id(
+        &self,
+        table: &TableIdent,
+    ) -> Result<(Table, Option<String>)> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name();
+
+        let builder = self
+            .client
+            .0
+            .get_table()
+            .database_name(&db_name)
+            .name(table_name);
+        let builder = with_catalog_id!(builder, self.config);
+
+        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
+
+        let glue_table = glue_table_output.table().ok_or_else(|| {
+            Error::new(
+                ErrorKind::TableNotFound,
+                format!(
+                    "Table object for database: {db_name} and table: {table_name} does not exist"
+                ),
+            )
+        })?;
+
+        let version_id = glue_table.version_id.clone();
+        let metadata_location = get_metadata_location(&glue_table.parameters)?;
+
+        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
+
+        let table = Table::builder()
+            .file_io(self.file_io())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(TableIdent::new(
+                NamespaceIdent::new(db_name),
+                table_name.to_owned(),
+            ))
+            .build()?;
+
+        Ok((table, version_id))
+    }
 }
 
 #[async_trait]
@@ -514,42 +570,8 @@ impl Catalog for GlueCatalog {
     /// - Absence of metadata location information in the table's properties.
     /// - Issues reading or deserializing the table's metadata file.
     async fn load_table(&self, table: &TableIdent) -> Result<Table> {
-        let db_name = validate_namespace(table.namespace())?;
-        let table_name = table.name();
-
-        let builder = self
-            .client
-            .0
-            .get_table()
-            .database_name(&db_name)
-            .name(table_name);
-        let builder = with_catalog_id!(builder, self.config);
-
-        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
-
-        match glue_table_output.table() {
-            None => Err(Error::new(
-                ErrorKind::TableNotFound,
-                format!(
-                    "Table object for database: {db_name} and table: {table_name} does not exist"
-                ),
-            )),
-            Some(table) => {
-                let metadata_location = get_metadata_location(&table.parameters)?;
-
-                let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
-
-                Table::builder()
-                    .file_io(self.file_io())
-                    .metadata_location(metadata_location)
-                    .metadata(metadata)
-                    .identifier(TableIdent::new(
-                        NamespaceIdent::new(db_name),
-                        table_name.to_owned(),
-                    ))
-                    .build()
-            }
-        }
+        let (table, _) = self.load_table_with_version_id(table).await?;
+        Ok(table)
     }
 
     /// Asynchronously drops a table from the database.
@@ -761,7 +783,9 @@ impl Catalog for GlueCatalog {
     async fn update_table(&self, commit: TableCommit) -> Result<Table> {
         let table_ident = commit.identifier().clone();
         let table_namespace = validate_namespace(table_ident.namespace())?;
-        let current_table = self.load_table(&table_ident).await?;
+
+        let (current_table, current_version_id) =
+            self.load_table_with_version_id(&table_ident).await?;
         let current_metadata_location = current_table.metadata_location_result()?.to_string();
 
         let staged_table = commit.apply(current_table)?;
 
@@ -773,8 +797,8 @@ impl Catalog for GlueCatalog {
             .write_to(staged_table.file_io(), staged_metadata_location)
             .await?;
 
-        // Persist staged table to Glue
-        let builder = self
+        // Persist staged table to Glue with optimistic locking
+        let mut builder = self
             .client
             .0
             .update_table()
@@ -787,6 +811,12 @@ impl Catalog for GlueCatalog {
                 staged_table.metadata().properties(),
                 Some(current_metadata_location),
             )?);
+
+        // Add VersionId for optimistic locking
+        if let Some(version_id) = current_version_id {
+            builder = builder.version_id(version_id);
+        }
+
         let builder = with_catalog_id!(builder, self.config);
         let _ = builder.send().await.map_err(|e| {
             let error = e.into_service_error();