Skip to content

Commit

Permalink
fix: Metakeys are not propagated with the blocking operators (#3116)
Browse files Browse the repository at this point in the history
* Add test

Signed-off-by: Xuanwo <github@xuanwo.io>

* Implement metakey for blocking lister

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Sep 18, 2023
1 parent 3fdfa56 commit fbe8543
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 14 deletions.
46 changes: 37 additions & 9 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ impl Stream for Lister {
///
/// Users can construct Lister by `blocking_lister`.
pub struct BlockingLister {
pager: oio::BlockingPager,
acc: FusedAccessor,
/// required_metakey is the metakey required by users.
required_metakey: FlagSet<Metakey>,

pager: Option<oio::BlockingPager>,
buf: VecDeque<oio::Entry>,
}

Expand All @@ -153,11 +157,17 @@ unsafe impl Sync for BlockingLister {}

impl BlockingLister {
/// Create a new lister.
pub(crate) fn new(pager: oio::BlockingPager) -> Self {
Self {
pager,
buf: VecDeque::default(),
}
pub(crate) fn create(acc: FusedAccessor, path: &str, args: OpList) -> Result<Self> {
let required_metakey = args.metakey();
let (_, pager) = acc.blocking_list(path, args)?;

Ok(Self {
acc,
required_metakey,

buf: VecDeque::new(),
pager: Some(pager),
})
}
}

Expand All @@ -167,15 +177,33 @@ impl Iterator for BlockingLister {

fn next(&mut self) -> Option<Self::Item> {
if let Some(oe) = self.buf.pop_front() {
return Some(Ok(oe.into_entry()));
let (path, metadata) = oe.into_entry().into_parts();
// TODO: we can optimize this by checking the provided metakey provided by services.
if metadata.contains_bit(self.required_metakey) {
return Some(Ok(Entry::new(path, metadata)));
}

let metadata = match self.acc.blocking_stat(&path, OpStat::default()) {
Ok(rp) => rp.into_metadata(),
Err(err) => return Some(Err(err)),
};
return Some(Ok(Entry::new(path, metadata)));
}

self.buf = match self.pager.next() {
let pager = match self.pager.as_mut() {
Some(pager) => pager,
None => return None,
};

self.buf = match pager.next() {
// Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
//
// However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
Ok(Some(entries)) => entries.into(),
Ok(None) => return None,
Ok(None) => {
self.pager = None;
return None;
}
Err(err) => return Some(Err(err)),
};

Expand Down
7 changes: 2 additions & 5 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,8 +941,7 @@ impl BlockingOperator {
.with_context("path", &path));
}

let (_, pager) = inner.blocking_list(&path, args)?;
let lister = BlockingLister::new(pager);
let lister = BlockingLister::create(inner, &path, args)?;

lister.collect()
},
Expand Down Expand Up @@ -1116,9 +1115,7 @@ impl BlockingOperator {
.with_context("path", &path));
}

let (_, pager) = inner.blocking_list(&path, args)?;

Ok(BlockingLister::new(pager))
BlockingLister::create(inner, &path, args)
},
))
}
Expand Down
91 changes: 91 additions & 0 deletions core/tests/behavior/blocking_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub fn behavior_blocking_list_tests(op: &Operator) -> Vec<Trial> {
blocking_trials!(
op,
test_blocking_list_dir,
test_blocking_list_dir_with_metakey,
test_blocking_list_dir_with_metakey_complete,
test_blocking_list_non_exist_dir,
test_blocking_scan,
test_blocking_remove_all
Expand Down Expand Up @@ -67,6 +69,95 @@ pub fn test_blocking_list_dir(op: BlockingOperator) -> Result<()> {
Ok(())
}

/// List dir with metakey
pub fn test_blocking_list_dir_with_metakey(op: BlockingOperator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();
let path = format!("{parent}/{}", uuid::Uuid::new_v4());
debug!("Generate a random file: {}", &path);
let (content, size) = gen_bytes();

op.write(&path, content).expect("write must succeed");

let mut obs = op
.lister_with(&format!("{parent}/"))
.metakey(
Metakey::Mode
| Metakey::CacheControl
| Metakey::ContentDisposition
| Metakey::ContentLength
| Metakey::ContentMd5
| Metakey::ContentRange
| Metakey::ContentType
| Metakey::Etag
| Metakey::LastModified
| Metakey::Version,
)
.call()?;
let mut found = false;
while let Some(de) = obs.next().transpose()? {
let meta = de.metadata();
if de.path() == path {
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(meta.content_length(), size as u64);

// We don't care about the value, we just to check there is no panic.
let _ = meta.cache_control();
let _ = meta.content_disposition();
let _ = meta.content_md5();
let _ = meta.content_range();
let _ = meta.content_type();
let _ = meta.etag();
let _ = meta.last_modified();
let _ = meta.version();

found = true
}
}
assert!(found, "file should be found in list");

op.delete(&path).expect("delete must succeed");
Ok(())
}

/// List dir with metakey complete
pub fn test_blocking_list_dir_with_metakey_complete(op: BlockingOperator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();
let path = format!("{parent}/{}", uuid::Uuid::new_v4());
debug!("Generate a random file: {}", &path);
let (content, size) = gen_bytes();

op.write(&path, content).expect("write must succeed");

let mut obs = op
.lister_with(&format!("{parent}/"))
.metakey(Metakey::Complete)
.call()?;
let mut found = false;
while let Some(de) = obs.next().transpose()? {
let meta = de.metadata();
if de.path() == path {
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(meta.content_length(), size as u64);

// We don't care about the value, we just to check there is no panic.
let _ = meta.cache_control();
let _ = meta.content_disposition();
let _ = meta.content_md5();
let _ = meta.content_range();
let _ = meta.content_type();
let _ = meta.etag();
let _ = meta.last_modified();
let _ = meta.version();

found = true
}
}
assert!(found, "file should be found in list");

op.delete(&path).expect("delete must succeed");
Ok(())
}

/// List non exist dir should return nothing.
pub fn test_blocking_list_non_exist_dir(op: BlockingOperator) -> Result<()> {
let dir = format!("{}/", uuid::Uuid::new_v4());
Expand Down

0 comments on commit fbe8543

Please sign in to comment.