Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 119 additions & 7 deletions crates/iceberg/src/transaction/sort_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
#[derive(Debug, PartialEq, Eq, Clone)]
struct PendingSortField {
name: String,
transform: Transform,
direction: SortDirection,
null_order: NullOrder,
}
Expand All @@ -46,7 +47,7 @@ impl PendingSortField {

Ok(SortField::builder()
.source_id(field_id)
.transform(Transform::Identity)
.transform(self.transform)
.direction(self.direction)
.null_order(self.null_order)
.build())
Expand All @@ -65,24 +66,58 @@ impl ReplaceSortOrderAction {
}
}

/// Adds a field for sorting in ascending order.
/// Adds a field for sorting in ascending order, sorting by the column's raw value
/// (an identity transform). To sort by a transform of the column instead (e.g.
/// `bucket[N]`, `year`, `truncate[W]`), use [`Self::asc_with_transform`].
pub fn asc(self, name: &str, null_order: NullOrder) -> Self {
self.add_sort_field(name, SortDirection::Ascending, null_order)
self.asc_with_transform(name, Transform::Identity, null_order)
}

/// Adds a field for sorting in descending order.
/// Adds a field for sorting in descending order, sorting by the column's raw value
/// (an identity transform). To sort by a transform of the column instead (e.g.
/// `bucket[N]`, `year`, `truncate[W]`), use [`Self::desc_with_transform`].
pub fn desc(self, name: &str, null_order: NullOrder) -> Self {
self.add_sort_field(name, SortDirection::Descending, null_order)
self.desc_with_transform(name, Transform::Identity, null_order)
}

/// Adds a field for sorting in ascending order by a transform of the column's value
/// (e.g. `Transform::Bucket(16)`, `Transform::Year`, `Transform::Truncate(4)`).
///
/// Whether the transform is valid for the column's type is checked at commit time,
/// once the table schema is available (mirroring Java's `SortOrder.Builder.build()`).
pub fn asc_with_transform(
self,
name: &str,
transform: Transform,
null_order: NullOrder,
) -> Self {
self.add_sort_field(name, transform, SortDirection::Ascending, null_order)
}

/// Adds a field for sorting in descending order by a transform of the column's value
/// (e.g. `Transform::Bucket(16)`, `Transform::Year`, `Transform::Truncate(4)`).
///
/// Whether the transform is valid for the column's type is checked at commit time,
/// once the table schema is available (mirroring Java's `SortOrder.Builder.build()`).
pub fn desc_with_transform(
self,
name: &str,
transform: Transform,
null_order: NullOrder,
) -> Self {
self.add_sort_field(name, transform, SortDirection::Descending, null_order)
}

fn add_sort_field(
mut self,
name: &str,
transform: Transform,
sort_direction: SortDirection,
null_order: NullOrder,
) -> Self {
self.pending_sort_fields.push(PendingSortField {
name: name.to_string(),
transform,
direction: sort_direction,
null_order,
});
Expand Down Expand Up @@ -135,10 +170,11 @@ impl TransactionAction for ReplaceSortOrderAction {
mod tests {
use as_any::Downcast;

use crate::spec::{NullOrder, SortDirection};
use crate::ErrorKind;
use crate::spec::{NullOrder, SortDirection, Transform};
use crate::transaction::sort_order::{PendingSortField, ReplaceSortOrderAction};
use crate::transaction::tests::make_v2_table;
use crate::transaction::{ApplyTransactionAction, Transaction};
use crate::transaction::{ApplyTransactionAction, Transaction, TransactionAction};

#[test]
fn test_replace_sort_order() {
Expand All @@ -159,14 +195,90 @@ mod tests {
assert_eq!(replace_sort_order.pending_sort_fields, vec![
PendingSortField {
name: String::from("x"),
transform: Transform::Identity,
direction: SortDirection::Ascending,
null_order: NullOrder::First,
},
PendingSortField {
name: String::from("y"),
transform: Transform::Identity,
direction: SortDirection::Descending,
null_order: NullOrder::Last,
}
]);
}

#[test]
fn test_replace_sort_order_with_transform() {
let table = make_v2_table();
let tx = Transaction::new(&table);
let replace_sort_order = tx.replace_sort_order();

let tx = replace_sort_order
.asc_with_transform("x", Transform::Bucket(16), NullOrder::First)
.desc_with_transform("y", Transform::Truncate(4), NullOrder::Last)
.apply(tx)
.unwrap();

let replace_sort_order = (*tx.actions[0])
.downcast_ref::<ReplaceSortOrderAction>()
.unwrap();

assert_eq!(replace_sort_order.pending_sort_fields, vec![
PendingSortField {
name: String::from("x"),
transform: Transform::Bucket(16),
direction: SortDirection::Ascending,
null_order: NullOrder::First,
},
PendingSortField {
name: String::from("y"),
transform: Transform::Truncate(4),
direction: SortDirection::Descending,
null_order: NullOrder::Last,
}
]);
}

#[tokio::test]
async fn test_replace_sort_order_with_transform_commits() {
use std::sync::Arc;

use crate::TableUpdate;

let table = make_v2_table();
let action = Arc::new(ReplaceSortOrderAction::new().asc_with_transform(
"x",
Transform::Bucket(16),
NullOrder::First,
));

let mut action_commit = TransactionAction::commit(action, &table).await.unwrap();
let updates = action_commit.take_updates();

let sort_order = match &updates[0] {
TableUpdate::AddSortOrder { sort_order } => sort_order,
other => panic!("expected AddSortOrder, got {other:?}"),
};
assert_eq!(sort_order.fields[0].transform, Transform::Bucket(16));
}

#[tokio::test]
async fn test_replace_sort_order_rejects_incompatible_transform() {
use std::sync::Arc;

let table = make_v2_table();
// `x` is a `long` column; `year` only accepts date/timestamp types.
let action = Arc::new(ReplaceSortOrderAction::new().asc_with_transform(
"x",
Transform::Year,
NullOrder::First,
));

let err = match TransactionAction::commit(action, &table).await {
Err(e) => e,
Ok(_) => panic!("year transform on a long column should be rejected"),
};
assert_eq!(err.kind(), ErrorKind::Unexpected);
}
}
Loading