Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/iceberg/src/arrow/record_batch_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ mod test {
let projected_iceberg_field_ids = [1, 2, 3];

let mut transformer =
RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids)
.build();

let file_schema = Arc::new(ArrowSchema::new(vec![
simple_field("id", DataType::Int32, false, "1"),
Expand Down
38 changes: 35 additions & 3 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ use url::Url;
use super::storage::Storage;
use crate::{Error, ErrorKind, Result};

/// Write mode for output files.
///
/// Controls whether a writer is opened in append mode when writing to
/// storage.
///
/// Note: this is separate from OpenDAL's internal write strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WriteMode {
/// Standard (non-append) write mode. This is the default variant.
#[default]
Standard,
/// Append mode (required for some storage backends like AZDLS).
Append,
}

/// FileIO implementation, used to manipulate files in underlying storage.
///
/// # Note
Expand Down Expand Up @@ -144,6 +157,7 @@ impl FileIO {
let (op, relative_path) = self.inner.create_operator(&path)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();

Ok(InputFile {
op,
path,
Expand All @@ -160,10 +174,22 @@ impl FileIO {
let (op, relative_path) = self.inner.create_operator(&path)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();

// ADLS requires append mode for writes
#[cfg(feature = "storage-azdls")]
let write_mode = if matches!(self.inner.as_ref(), Storage::Azdls { .. }) {
WriteMode::Append
} else {
WriteMode::Standard
};
#[cfg(not(feature = "storage-azdls"))]
let write_mode = WriteMode::Standard;

Ok(OutputFile {
op,
path,
relative_path_pos,
write_mode,
})
}
}
Expand Down Expand Up @@ -409,6 +435,8 @@ pub struct OutputFile {
path: String,
// Relative path of file to uri, starts at [`relative_path_pos`]
relative_path_pos: usize,
// Write mode for the file (required for some storage backends like AZDLS)
write_mode: WriteMode,
}

impl OutputFile {
Expand Down Expand Up @@ -456,9 +484,13 @@ impl OutputFile {
///
/// For one-time writing, use [`Self::write`] instead.
pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
Ok(Box::new(
self.op.writer(&self.path[self.relative_path_pos..]).await?,
))
// NOTE(review): the `Ok(Box::new(..))` expression above looks like the
// pre-change body kept by the diff view; the statements below appear to be
// its replacement — confirm against the merged file.
//
// The writer is opened on the part of `path` that starts at
// `relative_path_pos`, and the append flag is set only when this file's
// `write_mode` is `WriteMode::Append`.
let writer = self
.op
.writer_with(&self.path[self.relative_path_pos..])
.append(self.write_mode == WriteMode::Append)
.await?;

Ok(Box::new(writer))
}
}

Expand Down
Loading