feat: enable concurrent write
WenyXu committed Jan 22, 2024
1 parent 2bf4b08 commit f97c0eb
Showing 11 changed files with 48 additions and 26 deletions.
6 changes: 3 additions & 3 deletions src/common/datasource/src/buffered_writer.rs
@@ -47,7 +47,7 @@ pub trait ArrowWriterCloser {
 impl<
     T: AsyncWrite + Send + Unpin,
     U: DfRecordBatchEncoder + ArrowWriterCloser,
-    F: FnMut(String) -> Fut,
+    F: Fn(String) -> Fut,
     Fut: Future<Output = Result<T>>,
 > LazyBufferedWriter<T, U, F>
 {
@@ -75,7 +75,7 @@ impl<
 impl<
     T: AsyncWrite + Send + Unpin,
     U: DfRecordBatchEncoder,
-    F: FnMut(String) -> Fut,
+    F: Fn(String) -> Fut,
     Fut: Future<Output = Result<T>>,
 > LazyBufferedWriter<T, U, F>
 {
@@ -149,7 +149,7 @@ impl<
         if let Some(ref mut writer) = self.writer {
             Ok(writer)
         } else {
-            let writer = (self.writer_factory)(self.path.clone()).await?;
+            let writer = (self.writer_factory)(self.path.to_string()).await?;
             Ok(self.writer.insert(writer))
         }
     }
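Note on the `FnMut` → `Fn` change: the parquet writer later in this commit builds the factory as a plain `move` closure that clones the object store on each call, so the weaker `Fn` bound suffices and the factory no longer needs exclusive access. A minimal sketch of the lazy-initialization pattern these bounds serve (toy types, not the crate's actual definitions):

```rust
use std::future::Future;

/// Toy stand-in for `LazyBufferedWriter`; the real type also carries a
/// buffer, an encoder, and a flush threshold.
struct LazyWriter<T, F> {
    writer: Option<T>,
    path: String,
    factory: F, // `Fn`, so it can be invoked through a shared reference
}

impl<T, F, Fut> LazyWriter<T, F>
where
    F: Fn(String) -> Fut,
    Fut: Future<Output = std::io::Result<T>>,
{
    /// Open the underlying writer on first use, mirroring the
    /// `if let Some(ref mut writer)` branch in the hunk above.
    async fn get_or_init(&mut self) -> std::io::Result<&mut T> {
        if self.writer.is_none() {
            let writer = (self.factory)(self.path.to_string()).await?;
            self.writer = Some(writer);
        }
        Ok(self.writer.as_mut().expect("just initialized"))
    }
}
```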
4 changes: 3 additions & 1 deletion src/common/datasource/src/file_format.rs
@@ -193,13 +193,15 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrent: usize,
     encoder_factory: U,
 ) -> Result<usize> {
     let buffer = SharedBuffer::with_capacity(threshold);
     let encoder = encoder_factory(buffer.clone());
     let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
         store
-            .writer(&path)
+            .writer_with(&path)
+            .concurrent(concurrent)
             .await
             .context(error::WriteObjectSnafu { path })
     });
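`stream_to_file` now threads the new `concurrent` argument into OpenDAL's builder-style `writer_with`, which caps how many multipart-upload parts may be in flight at once. A rough caller-side sketch, assuming the `ObjectStore` wrapper exposes the OpenDAL operator API exactly as used above (the path, payload, and `anyhow` error type are placeholders):

```rust
use futures::AsyncWriteExt;
use object_store::ObjectStore;

/// Sketch: upload a payload with up to 8 concurrently uploaded parts.
async fn write_concurrently(store: &ObjectStore, data: &[u8]) -> anyhow::Result<()> {
    let mut writer = store
        .writer_with("demo/output.bin") // placeholder path
        .concurrent(8)                  // parts uploaded in parallel
        .await?;
    writer.write_all(data).await?; // the writer implements `AsyncWrite`
    writer.close().await?;
    Ok(())
}
```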
3 changes: 2 additions & 1 deletion src/common/datasource/src/file_format/csv.rs
@@ -193,8 +193,9 @@ pub async fn stream_to_csv(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrent: usize,
 ) -> Result<usize> {
-    stream_to_file(stream, store, path, threshold, |buffer| {
+    stream_to_file(stream, store, path, threshold, concurrent, |buffer| {
         csv::Writer::new(buffer)
     })
     .await
3 changes: 2 additions & 1 deletion src/common/datasource/src/file_format/json.rs
@@ -152,8 +152,9 @@ pub async fn stream_to_json(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrent: usize,
 ) -> Result<usize> {
-    stream_to_file(stream, store, path, threshold, |buffer| {
+    stream_to_file(stream, store, path, threshold, concurrent, |buffer| {
         json::LineDelimitedWriter::new(buffer)
     })
     .await
41 changes: 22 additions & 19 deletions src/common/datasource/src/file_format/parquet.rs
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::future::Future;
-use std::pin::Pin;
 use std::result;
 use std::sync::Arc;
@@ -31,7 +29,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::future::BoxFuture;
 use futures::StreamExt;
-use object_store::{ObjectStore, Reader};
+use object_store::{ObjectStore, Reader, Writer};
 use parquet::basic::{Compression, ZstdLevel};
 use parquet::file::properties::WriterProperties;
 use snafu::ResultExt;
@@ -171,22 +169,33 @@ pub struct BufferedWriter {
 type InnerBufferedWriter = LazyBufferedWriter<
     object_store::Writer,
     ArrowWriter<SharedBuffer>,
-    Box<
-        dyn FnMut(
-                String,
-            )
-                -> Pin<Box<dyn Future<Output = error::Result<object_store::Writer>> + Send>>
-            + Send,
-    >,
+    impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
 >;

 impl BufferedWriter {
+    fn make_write_factory(
+        store: ObjectStore,
+        concurrent: usize,
+    ) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
+        move |path| {
+            let store = store.clone();
+            Box::pin(async move {
+                store
+                    .writer_with(&path)
+                    .concurrent(concurrent)
+                    .await
+                    .context(error::WriteObjectSnafu { path })
+            })
+        }
+    }
+
     pub async fn try_new(
         path: String,
         store: ObjectStore,
         arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
         buffer_threshold: usize,
+        concurrent: usize,
     ) -> error::Result<Self> {
         let buffer = SharedBuffer::with_capacity(buffer_threshold);

@@ -199,15 +208,7 @@ impl BufferedWriter {
                 buffer,
                 arrow_writer,
                 &path,
-                Box::new(move |path| {
-                    let store = store.clone();
-                    Box::pin(async move {
-                        store
-                            .writer(&path)
-                            .await
-                            .context(error::WriteObjectSnafu { path })
-                    })
-                }),
+                Self::make_write_factory(store, concurrent),
             ),
         })
     }
@@ -236,6 +237,7 @@ pub async fn stream_to_parquet(
     store: ObjectStore,
     path: &str,
     threshold: usize,
+    concurrent: usize,
 ) -> Result<usize> {
     let write_props = WriterProperties::builder()
         .set_compression(Compression::ZSTD(ZstdLevel::default()))
@@ -247,6 +249,7 @@
         schema,
         Some(write_props),
         threshold,
+        concurrent,
     )
     .await?;
     let mut rows_written = 0;
1 change: 1 addition & 0 deletions src/common/datasource/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 #![feature(assert_matches)]
+#![feature(type_alias_impl_trait)]

 pub mod buffered_writer;
 pub mod compression;
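This feature gate is needed because `parquet.rs` above names an opaque closure type inside a type alias (`impl Fn(String) -> BoxFuture<'static, Result<Writer>>`), which stable Rust did not yet accept at the time. A toy illustration of `type_alias_impl_trait`, unrelated to the crate's real types:

```rust
#![feature(type_alias_impl_trait)]

// The alias names an opaque closure type; the compiler infers the concrete
// type from the "defining use" in `make_counter`.
type Counter = impl FnMut() -> u32;

fn make_counter() -> Counter {
    let mut n = 0;
    move || {
        n += 1;
        n
    }
}

fn main() {
    let mut next = make_counter();
    assert_eq!(next(), 1);
    assert_eq!(next(), 2);
}
```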
2 changes: 2 additions & 0 deletions src/common/datasource/src/test_util.rs
@@ -113,6 +113,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
         tmp_store.clone(),
         &output_path,
         threshold(size),
+        8
     )
     .await
     .is_ok());
@@ -150,6 +151,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
         tmp_store.clone(),
         &output_path,
         threshold(size),
+        8
     )
     .await
     .is_ok());
3 changes: 2 additions & 1 deletion src/mito2/src/cache/write_cache.rs
@@ -30,7 +30,7 @@ use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::IndexerBuilder;
 use crate::sst::parquet::writer::ParquetWriter;
 use crate::sst::parquet::{SstInfo, WriteOptions};
-use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
+use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENT};

 /// A cache for uploading files to remote object stores.
 ///
@@ -180,6 +180,7 @@ impl WriteCache {
         let mut writer = remote_store
             .writer_with(upload_path)
             .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
+            .concurrent(DEFAULT_WRITE_CONCURRENT)
             .await
             .context(error::OpenDalSnafu)?;
3 changes: 3 additions & 0 deletions src/mito2/src/sst.rs
@@ -25,3 +25,6 @@ pub(crate) mod version;

 /// Default write buffer size; it should be greater than the minimum multipart upload part size of S3 (5 MB).
 pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
+
+/// Default number of concurrent writes; this only takes effect on object store backends (e.g., S3).
+pub const DEFAULT_WRITE_CONCURRENT: usize = 8;
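Together with `DEFAULT_WRITE_BUFFER_SIZE` above, this constant also bounds per-writer memory. A back-of-envelope check, under the assumption (not guaranteed by the crate) that each in-flight part buffers up to one full part:

```rust
// Hypothetical worst case: part_size × concurrency bytes buffered per writer.
const WRITE_BUFFER_SIZE_BYTES: u64 = 8 * 1024 * 1024; // DEFAULT_WRITE_BUFFER_SIZE (8 MB)
const WRITE_CONCURRENT: u64 = 8; // DEFAULT_WRITE_CONCURRENT

fn main() {
    let in_flight = WRITE_BUFFER_SIZE_BYTES * WRITE_CONCURRENT;
    assert_eq!(in_flight, 64 * 1024 * 1024); // ≈ 64 MB upper bound per writer
}
```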
2 changes: 2 additions & 0 deletions src/mito2/src/sst/parquet/writer.rs
@@ -35,6 +35,7 @@ use crate::sst::index::Indexer;
 use crate::sst::parquet::format::WriteFormat;
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
+use crate::sst::DEFAULT_WRITE_CONCURRENT;

 /// Parquet SST writer.
 pub struct ParquetWriter {
@@ -90,6 +91,7 @@ impl ParquetWriter {
             write_format.arrow_schema(),
             Some(writer_props),
             opts.write_buffer_size.as_bytes() as usize,
+            DEFAULT_WRITE_CONCURRENT,
         )
         .await
         .context(WriteBufferSnafu)?;
6 changes: 6 additions & 0 deletions src/operator/src/statement/copy_table_to.rs
@@ -43,6 +43,9 @@ use crate::statement::StatementExecutor;
 /// Buffer size to flush data to object stores.
 const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);

+/// Default number of concurrent writes; this only takes effect on object store backends (e.g., S3).
+const WRITE_CONCURRENT: usize = 8;
+
 impl StatementExecutor {
     async fn stream_to_file(
         &self,
@@ -59,6 +62,7 @@ impl StatementExecutor {
                 object_store,
                 path,
                 threshold,
+                WRITE_CONCURRENT,
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
@@ -67,6 +71,7 @@
                 object_store,
                 path,
                 threshold,
+                WRITE_CONCURRENT,
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
@@ -75,6 +80,7 @@
                 object_store,
                 path,
                 threshold,
+                WRITE_CONCURRENT,
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
