Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions core/src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl Accessor for HttpBackend {
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.http_get(path, args.range()).await?;
let resp = self
.http_get(path, args.range(), args.if_none_match())
.await?;

let status = resp.status();

Expand All @@ -276,13 +278,13 @@ impl Accessor for HttpBackend {
}
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
// Stat root always returns a DIR.
if path == "/" {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}

let resp = self.http_head(path).await?;
let resp = self.http_head(path, args.if_none_match()).await?;

let status = resp.status();

Expand All @@ -299,13 +301,22 @@ impl Accessor for HttpBackend {
}

impl HttpBackend {
async fn http_get(&self, path: &str, range: BytesRange) -> Result<Response<IncomingAsyncBody>> {
async fn http_get(
&self,
path: &str,
range: BytesRange,
if_none_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_rooted_abs_path(&self.root, path);

let url = format!("{}{}", self.endpoint, percent_encode_path(&p));

let mut req = Request::get(&url);

if let Some(if_none_match) = if_none_match {
req = req.header(http::header::IF_NONE_MATCH, if_none_match);
}

if let Some(auth) = &self.authorization {
req = req.header(header::AUTHORIZATION, auth.clone())
}
Expand All @@ -321,13 +332,21 @@ impl HttpBackend {
self.client.send(req).await
}

async fn http_head(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
async fn http_head(
&self,
path: &str,
if_none_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_rooted_abs_path(&self.root, path);

let url = format!("{}{}", self.endpoint, percent_encode_path(&p));

let mut req = Request::head(&url);

if let Some(if_none_match) = if_none_match {
req = req.header(http::header::IF_NONE_MATCH, if_none_match);
}

if let Some(auth) = &self.authorization {
req = req.header(header::AUTHORIZATION, auth.clone())
}
Expand All @@ -345,6 +364,7 @@ mod tests {
use anyhow::Result;
use wiremock::matchers::basic_auth;
use wiremock::matchers::bearer_token;
use wiremock::matchers::headers;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::Mock;
Expand Down Expand Up @@ -461,4 +481,58 @@ mod tests {
assert_eq!(bs.content_length(), 128);
Ok(())
}

#[tokio::test]
async fn test_read_with() -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();

let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/hello"))
.and(headers("if-none-match", vec!["*"]))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-length", "13")
.set_body_string("Hello, World!"),
)
.mount(&mock_server)
.await;

let mut builder = HttpBuilder::default();
builder.endpoint(&mock_server.uri());
builder.root("/");
let op = Operator::new(builder)?.finish();

let match_bs = op
.read_with("hello", OpRead::new().with_if_none_match("*"))
.await?;
assert_eq!(match_bs, b"Hello, World!");

Ok(())
}

#[tokio::test]
async fn test_stat_with() -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();

let mock_server = MockServer::start().await;
Mock::given(method("HEAD"))
.and(path("/hello"))
.and(headers("if-none-match", vec!["*"]))
.respond_with(ResponseTemplate::new(200).insert_header("content-length", "128"))
.mount(&mock_server)
.await;

let mut builder = HttpBuilder::default();
builder.endpoint(&mock_server.uri());
builder.root("/");
let op = Operator::new(builder)?.finish();
let bs = op
.stat_with("hello", OpStat::new().with_if_none_match("*"))
.await?;

assert_eq!(bs.mode(), EntryMode::FILE);
assert_eq!(bs.content_length(), 128);
Ok(())
}
}
96 changes: 92 additions & 4 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,45 @@ impl Operator {
/// # }
/// ```
pub async fn stat(&self, path: &str) -> Result<Metadata> {
self.stat_with(path, OpStat::new()).await
}

/// Get current path's metadata **without cache** directly with extra options.
///
/// # Notes
///
/// Use `stat` if you:
///
/// - Want detect the outside changes of path.
/// - Don't want to read from cached metadata.
///
/// You may want to use `metadata` if you are working with entries
/// returned by [`Lister`]. It's highly possible that metadata
/// you want has already been cached.
///
/// # Examples
///
/// ```
/// # use anyhow::Result;
/// # use futures::io;
/// # use opendal::Operator;
/// # use opendal::ops::OpStat;
/// use opendal::ErrorKind;
/// #
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// if let Err(e) = op.stat_with("test", OpStat::new()).await {
/// if e.kind() == ErrorKind::NotFound {
/// println!("file not exist")
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub async fn stat_with(&self, path: &str, args: OpStat) -> Result<Metadata> {
let path = normalize_path(path);

let rp = self.inner().stat(&path, OpStat::new()).await?;
let rp = self.inner().stat(&path, args).await?;
let meta = rp.into_metadata();

Ok(meta)
Expand Down Expand Up @@ -382,6 +418,28 @@ impl Operator {
self.range_read(path, ..).await
}

/// Read the whole path into a bytes with extra options.
///
/// This function will allocate a new bytes internally. For more precise memory control or
/// reading data lazily, please use [`Operator::reader`]
///
/// # Examples
///
/// ```
/// # use std::io::Result;
/// # use opendal::Operator;
/// # use opendal::ops::OpRead;
/// # use futures::TryStreamExt;
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let bs = op.read_with("path/to/file", OpRead::new()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn read_with(&self, path: &str, args: OpRead) -> Result<Vec<u8>> {
self.range_read_with(path, .., args).await
}

/// Read the specified range of path into a bytes.
///
/// This function will allocate a new bytes internally. For more precise memory control or
Expand All @@ -396,6 +454,7 @@ impl Operator {
/// ```
/// # use std::io::Result;
/// # use opendal::Operator;
/// # use opendal::ops::OpRead;
/// # use futures::TryStreamExt;
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
Expand All @@ -404,6 +463,37 @@ impl Operator {
/// # }
/// ```
pub async fn range_read(&self, path: &str, range: impl RangeBounds<u64>) -> Result<Vec<u8>> {
self.range_read_with(path, range, OpRead::new()).await
}

/// Read the specified range of path into a bytes with extra options..
///
/// This function will allocate a new bytes internally. For more precise memory control or
/// reading data lazily, please use [`Operator::range_reader`]
///
/// # Notes
///
/// - The returning content's length may be smaller than the range specified.
///
/// # Examples
///
/// ```
/// # use std::io::Result;
/// # use opendal::Operator;
/// # use opendal::ops::OpRead;
/// # use futures::TryStreamExt;
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let bs = op.range_read_with("path/to/file", 1024..2048, OpRead::new()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn range_read_with(
&self,
path: &str,
range: impl RangeBounds<u64>,
args: OpRead,
) -> Result<Vec<u8>> {
let path = normalize_path(path);

if !validate_path(&path, EntryMode::FILE) {
Expand All @@ -417,9 +507,7 @@ impl Operator {

let br = BytesRange::from(range);

let op = OpRead::new().with_range(br);

let (rp, mut s) = self.inner().read(&path, op).await?;
let (rp, mut s) = self.inner().read(&path, args.with_range(br)).await?;

let length = rp.into_metadata().content_length() as usize;
let mut buffer = Vec::with_capacity(length);
Expand Down