Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion crates/paimon/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl FileIO {
/// FIXME: how to handle large dir? Better to return a stream instead?
pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
let (op, relative_path) = self.storage.create(path)?;
let base_path = &path[..path.len() - relative_path.len()];
// Opendal list() expects directory path to end with `/`.
// use normalize_root to make sure it end with `/`.
let list_path = normalize_root(relative_path);
Expand All @@ -137,7 +138,7 @@ impl FileIO {
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
path: entry.path().to_string(),
path: format!("{base_path}{}", entry.path()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, format!("{base_path}{}", entry.path()) just add leading / to entry.path()?
Considering openal will always call normalize_path to remove leading /, do we real need to add eading / to entry.path()? Could you just keep entry.path()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@luoyuxia Thanks for the comment. entry.path() in OpenDAL is relative to operator root, while FileStatus.path here is expected to preserve the original FileIO path prefix (e.g., memory:/ or
file:/) for consistency with get_status and follow-up FileIO operations. normalize_path applies to Operator input paths, not to Entry::path() output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. It sounds reasonable to me to return the full path with scheme prefix.

But it doesn't seem to be necessary now for reusing the storage operator. Maybe you could create a separate PR for reusing the storage operator part in which we can revist it further it.

last_modified: meta.last_modified(),
});
}
Expand Down Expand Up @@ -381,6 +382,7 @@ impl OutputFile {

#[cfg(test)]
mod file_action_test {
use std::collections::BTreeSet;
use std::fs;

use super::*;
Expand Down Expand Up @@ -461,6 +463,39 @@ mod file_action_test {
file_io.delete_file(dst).await.unwrap();
}

async fn common_test_list_status_paths(file_io: &FileIO, dir_path: &str) {
if let Some(local_dir) = dir_path.strip_prefix("file:/") {
let _ = fs::remove_dir_all(local_dir);
}

file_io.mkdirs(dir_path).await.unwrap();

let file_a = format!("{dir_path}a.txt");
let file_b = format!("{dir_path}b.txt");
for file in [&file_a, &file_b] {
file_io
.new_output(file)
.unwrap()
.write(Bytes::from("test data"))
.await
.unwrap();
}

let statuses = file_io.list_status(dir_path).await.unwrap();
assert_eq!(statuses.len(), 2);

let expected_paths: BTreeSet<String> =
[file_a.clone(), file_b.clone()].into_iter().collect();
let actual_paths: BTreeSet<String> =
statuses.iter().map(|status| status.path.clone()).collect();
assert_eq!(
actual_paths, expected_paths,
"list_status should return exact entry paths"
);

file_io.delete_dir(dir_path).await.unwrap();
}

#[tokio::test]
async fn test_delete_file_memory() {
let file_io = setup_memory_file_io();
Expand Down Expand Up @@ -501,6 +536,12 @@ mod file_action_test {
)
.await;
}

#[tokio::test]
async fn test_list_status_fs_should_return_entry_paths() {
let file_io = setup_fs_file_io();
common_test_list_status_paths(&file_io, "file:/tmp/test_list_status_paths_fs/").await;
}
}

#[cfg(test)]
Expand Down
Loading