Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 69 additions & 39 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,62 @@ impl GlueCatalog {
pub fn file_io(&self) -> FileIO {
self.file_io.clone()
}

/// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
///
/// # Returns
/// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
/// from Glue that should be used for optimistic concurrency control when updating the table.
///
/// # Errors
/// This function may return an error in several scenarios, including:
/// - Failure to validate the namespace.
/// - Failure to retrieve the table from the Glue Catalog.
/// - Absence of metadata location information in the table's properties.
/// - Issues reading or deserializing the table's metadata file.
async fn load_table_with_version_id(
&self,
table: &TableIdent,
) -> Result<(Table, Option<String>)> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name();

let builder = self
.client
.0
.get_table()
.database_name(&db_name)
.name(table_name);
let builder = with_catalog_id!(builder, self.config);

let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;

let glue_table = glue_table_output.table().ok_or_else(|| {
Error::new(
ErrorKind::TableNotFound,
format!(
"Table object for database: {db_name} and table: {table_name} does not exist"
),
)
})?;

let version_id = glue_table.version_id.clone();
let metadata_location = get_metadata_location(&glue_table.parameters)?;

let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;

let table = Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
.build()?;

Ok((table, version_id))
}
}

#[async_trait]
Expand Down Expand Up @@ -514,42 +570,8 @@ impl Catalog for GlueCatalog {
/// - Absence of metadata location information in the table's properties.
/// - Issues reading or deserializing the table's metadata file.
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let db_name = validate_namespace(table.namespace())?;
let table_name = table.name();

let builder = self
.client
.0
.get_table()
.database_name(&db_name)
.name(table_name);
let builder = with_catalog_id!(builder, self.config);

let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;

match glue_table_output.table() {
None => Err(Error::new(
ErrorKind::TableNotFound,
format!(
"Table object for database: {db_name} and table: {table_name} does not exist"
),
)),
Some(table) => {
let metadata_location = get_metadata_location(&table.parameters)?;

let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;

Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
.build()
}
}
let (table, _) = self.load_table_with_version_id(table).await?;
Ok(table)
}

/// Asynchronously drops a table from the database.
Expand Down Expand Up @@ -761,7 +783,9 @@ impl Catalog for GlueCatalog {
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
let table_ident = commit.identifier().clone();
let table_namespace = validate_namespace(table_ident.namespace())?;
let current_table = self.load_table(&table_ident).await?;

let (current_table, current_version_id) =
self.load_table_with_version_id(&table_ident).await?;
let current_metadata_location = current_table.metadata_location_result()?.to_string();

let staged_table = commit.apply(current_table)?;
Expand All @@ -773,8 +797,8 @@ impl Catalog for GlueCatalog {
.write_to(staged_table.file_io(), staged_metadata_location)
.await?;

// Persist staged table to Glue
let builder = self
// Persist staged table to Glue with optimistic locking
let mut builder = self
.client
.0
.update_table()
Expand All @@ -787,6 +811,12 @@ impl Catalog for GlueCatalog {
staged_table.metadata().properties(),
Some(current_metadata_location),
)?);

// Add VersionId for optimistic locking
if let Some(version_id) = current_version_id {
builder = builder.version_id(version_id);
}

let builder = with_catalog_id!(builder, self.config);
let _ = builder.send().await.map_err(|e| {
let error = e.into_service_error();
Expand Down
Loading