Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions core/core/src/types/read/buffer_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,20 @@ impl oio::Read for StreamingReader {
}
}

/// Payload handed to one concurrent chunked-read task.
///
/// Each task is responsible for fetching exactly one byte `range` of the
/// target file through the shared read context.
struct ChunkedReadInput {
    /// Shared context describing the file being read (accessor, path, options).
    ctx: Arc<ReadContext>,
    /// The byte range this task will fetch.
    range: BytesRange,
}

/// ChunkedReader will read the file in chunks.
///
/// ChunkedReader is good for concurrent read and optimized for throughput.
pub struct ChunkedReader {
generator: ReadGenerator,
tasks: ConcurrentTasks<oio::Reader, Buffer>,
ctx: Arc<ReadContext>,
offset: u64,
remaining: Option<u64>,
tasks: ConcurrentTasks<ChunkedReadInput, Buffer>,
done: bool,
}

Expand All @@ -91,29 +99,65 @@ impl ChunkedReader {
ctx.accessor().info().executor(),
ctx.options().concurrent(),
ctx.options().prefetch(),
|mut r: oio::Reader| {
Box::pin(async {
match r.read_all().await {
Ok(buf) => (r, Ok(buf)),
Err(err) => (r, Err(err)),
|input: ChunkedReadInput| {
Box::pin(async move {
let args = input.ctx.args().clone().with_range(input.range);
let result = async {
let (_, mut r) = input.ctx.accessor().read(input.ctx.path(), args).await?;
r.read_all().await
}
.await;
(input, result)
})
},
);
let generator = ReadGenerator::new(ctx, range.offset(), range.size());
Self {
generator,
ctx,
offset: range.offset(),
remaining: range.size(),
tasks,
done: false,
}
}

/// Produce the byte range for the next chunk and advance the cursor.
///
/// Returns `None` once the whole requested range has been handed out.
/// When the total size is unknown (`remaining == None`), a single
/// unbounded range is emitted and the reader is marked exhausted.
fn next_range(&mut self) -> Option<BytesRange> {
    if self.remaining == Some(0) {
        return None;
    }

    let offset = self.offset;
    let size = if let Some(remaining) = self.remaining {
        // Step by the configured chunk size, or take everything left
        // in one go when no chunk size is set.
        let step = match self.ctx.options().chunk() {
            Some(chunk) => remaining.min(chunk as u64),
            None => remaining,
        };
        self.offset += step;
        self.remaining = Some(remaining - step);
        Some(step)
    } else {
        // Unknown total size: emit one unbounded range, then stop.
        self.remaining = Some(0);
        None
    };

    Some(BytesRange::new(offset, size))
}
}

impl oio::Read for ChunkedReader {
async fn read(&mut self) -> Result<Buffer> {
while self.tasks.has_remaining() && !self.done {
if let Some(r) = self.generator.next_reader().await? {
self.tasks.execute(r).await?;
if let Some(range) = self.next_range() {
self.tasks
.execute(ChunkedReadInput {
ctx: self.ctx.clone(),
range,
})
.await?;
} else {
self.done = true;
break;
Expand Down
94 changes: 94 additions & 0 deletions core/layers/concurrent-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ impl<R: oio::Delete, P: Send + Sync + 'static + Unpin> oio::Delete
mod tests {
use super::*;
use opendal_core::Operator;
use opendal_core::OperatorBuilder;
use opendal_core::services;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -397,6 +398,99 @@ mod tests {
);
}

// Regression test: a chunked, concurrent read must complete even when the
// HTTP concurrency limit is smaller than the reader's `concurrent` setting,
// i.e. the HTTP permits must be released and recycled without deadlocking.
#[tokio::test]
async fn concurrent_chunked_read_with_http_limit() {
    use opendal_core::raw::*;

    // Fake HTTP client that echoes the request body back as the response body.
    struct EchoFetcher;

    impl HttpFetch for EchoFetcher {
        async fn fetch(&self, req: http::Request<Buffer>) -> Result<http::Response<HttpBody>> {
            let data = req.into_body();
            let len = data.len() as u64;
            // Single-item body stream with a known content length.
            let body =
                HttpBody::new(Box::pin(stream::once(async move { Ok(data) })), Some(len));
            Ok(http::Response::builder()
                .status(http::StatusCode::OK)
                .body(body)
                .unwrap())
        }
    }

    // Minimal accessor serving reads from an in-memory buffer, routing each
    // requested range through the (fake) HTTP client so the layer's HTTP
    // semaphore is actually exercised.
    #[derive(Clone, Debug)]
    struct HttpBackend {
        info: Arc<AccessorInfo>,
        content: Buffer,
    }

    impl Access for HttpBackend {
        type Reader = HttpBody;
        type Writer = ();
        type Lister = ();
        type Deleter = ();

        fn info(&self) -> Arc<AccessorInfo> {
            self.info.clone()
        }

        async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
            // Slice the requested range out of the in-memory content.
            let range = args.range();
            let start = range.offset() as usize;
            let data = match range.size() {
                Some(sz) => self.content.slice(start..start + sz as usize),
                None => self.content.slice(start..),
            };
            // Round-trip through the HTTP client registered on AccessorInfo.
            let req = http::Request::get("http://fake").body(data).unwrap();
            let resp = self.info.http_client().fetch(req).await?;
            Ok((RpRead::default(), resp.into_body()))
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(
                Metadata::new(EntryMode::FILE).with_content_length(self.content.len() as u64),
            ))
        }

        // Write/delete/list are not exercised by this test.
        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Err(Error::new(ErrorKind::Unsupported, "not needed"))
        }
        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Err(Error::new(ErrorKind::Unsupported, "not needed"))
        }
        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Err(Error::new(ErrorKind::Unsupported, "not needed"))
        }
    }

    let content = Buffer::from(vec![0u8; 4096]);
    let info = Arc::new(AccessorInfo::default());
    info.update_http_client(|_| HttpClient::with(EchoFetcher));

    // HTTP permits (2) deliberately fewer than reader concurrency (4).
    let op = OperatorBuilder::new(HttpBackend {
        info,
        content: content.clone(),
    })
    .layer(ConcurrentLimitLayer::new(1024).with_http_concurrent_limit(2))
    .finish();

    // chunk=256 ⇒ 16 HTTP requests, concurrent=4, but only 2 HTTP permits.
    // The timeout turns a deadlock into a test failure instead of a hang.
    let result = timeout(Duration::from_secs(5), async {
        op.reader_with("test")
            .chunk(256)
            .concurrent(4)
            .await
            .expect("reader must build")
            .read(..)
            .await
    })
    .await;

    let buf = result
        .expect("read must not deadlock (timeout)")
        .expect("read must succeed");
    // The reassembled chunks must reproduce the original content exactly.
    assert_eq!(buf.to_bytes(), content.to_bytes());
}

#[tokio::test]
async fn http_semaphore_holds_until_body_dropped() {
struct DummyFetcher;
Expand Down
Loading