diff --git a/Cargo.toml b/Cargo.toml index df8c29594..fdda392ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,9 @@ members = [ lto = true codegen-units = 1 +[patch.crates-io] +arrow2 = { git = "https://github.com/blaze-init/arrow2.git", branch = "shuffle_ipc" } + # If you want to try your local work of arrow-df instead of directly from github, # uncomment the patch below and adjust the paths accordingly. # [patch.'https://github.com/blaze-init/arrow-datafusion.git'] diff --git a/datafusion-ext/src/lib.rs b/datafusion-ext/src/lib.rs index f7eec6b36..8082293f5 100644 --- a/datafusion-ext/src/lib.rs +++ b/datafusion-ext/src/lib.rs @@ -17,5 +17,6 @@ #![allow(unused_imports)] +pub mod shuffle_reader; pub mod shuffle_writer; pub mod task_runner; diff --git a/datafusion-ext/src/shuffle_reader.rs b/datafusion-ext/src/shuffle_reader.rs new file mode 100644 index 000000000..6cbb91424 --- /dev/null +++ b/datafusion-ext/src/shuffle_reader.rs @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::error::{ArrowError, Result as ArrowResult}; +use datafusion::error::{DataFusionError, Result}; +use std::any::Any; +use std::cell::RefCell; +use std::fmt::{Debug, Formatter}; +use std::fs::{self, File, Metadata}; +use std::io::{BufReader, Cursor, Read, Seek, SeekFrom}; + +use arrow::datatypes::SchemaRef; +use arrow::io::ipc::read::{ + read_file_segment_metadata, FileMetadata as IPCMeta, FileReader as IPCReader, + FileReader, +}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use datafusion::physical_plan::common::AbortOnDropSingle; +use datafusion::physical_plan::{ + ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, +}; +use futures::{Stream, StreamExt}; +use log::info; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::ReceiverStream; + +pub trait ReadSeek: Read + Seek {} + +impl ReadSeek for std::fs::File {} + +impl ReadSeek for std::io::BufReader {} + +impl> ReadSeek for std::io::Cursor {} + +pub type BatchIter = Box> + Send + Sync>; + +#[derive(Debug)] +pub enum Content { + File(String), + // FIXME: raw pointer from JNI to avoid copy + Mem(Vec), +} + +#[derive(Debug)] +pub struct Block { + pub content: Content, + pub offset: usize, + pub length: usize, +} + +#[derive(Debug, Clone)] +pub struct ShuffleReaderExec { + pub tx: Arc>>>>, + pub schema: SchemaRef, +} + +impl ShuffleReaderExec { + pub fn new(schema: SchemaRef) -> Self { + Self { + tx: Arc::new(Mutex::new(None)), + schema, + } + } + + // execute(i) first to set up block sending channel, get sender here to insert shuffle blocks + pub fn tx(&self) -> Sender> { + let tx = self.tx.lock().unwrap(); + tx.clone().unwrap() + } +} + +#[async_trait] +impl ExecutionPlan for ShuffleReaderExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + &self, + _children: Vec>, + ) -> Result> { + Err(DataFusionError::Plan( + "Blaze ShuffleReaderExec does not support with_new_children()".to_owned(), + )) + } + + async fn execute(&self, partition: usize) -> Result { + info!("ShuffleReader for partition {}", partition); + let (tx, rx) = tokio::sync::mpsc::channel(2); + let mut stx = self.tx.lock().unwrap(); + *stx = Some(tx); + Ok(Box::pin(ShuffleReaderStream::new(rx, self.schema.clone()))) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +struct ShuffleReaderStream { + inner: ReceiverStream>, + schema: SchemaRef, + batch_iter: BatchIter, +} + +impl ShuffleReaderStream { + fn new(rx: Receiver>, schema: SchemaRef) -> Self { + Self { + inner: ReceiverStream::new(rx), + schema, + batch_iter: Box::new(std::iter::empty()), + } + } +} + +impl Stream for ShuffleReaderStream { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let item = self.batch_iter.next(); + match item { + None => { + let poll = self.inner.poll_next_unpin(cx); + match poll { + Poll::Ready(Some(Ok(block))) => { + let start = block.offset; + let end = block.offset + block.length; + match block.content { + Content::File(f) => { + let f = BufReader::new(File::open(f).unwrap()); + let r = MultiIPCReader::new(f, start, end).unwrap(); + self.as_mut().batch_iter = Box::new(r); + } + Content::Mem(m) => { + let c = Cursor::new(m); + let r = MultiIPCReader::new(c, start, end).unwrap(); + self.as_mut().batch_iter = Box::new(r); + } + } + self.poll_next(cx) + } + + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Err(e))) => { + Poll::Ready(Some(Err(e.into_arrow_external_error()))) + } + } + } + Some(b) => Poll::Ready(Some(b)), + } + } +} + +impl RecordBatchStream for ShuffleReaderStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +fn read_meta(reader: &mut R, end: usize) -> Result<(usize, IPCMeta)> { + let ipc_end = end - 8; + reader.seek(SeekFrom::Start(ipc_end as u64))?; + let mut meta_buf = [0; 8]; + reader.read_exact(&mut meta_buf)?; + let ipc_length = i64::from_le_bytes(meta_buf) as usize; + let ipc_start = ipc_end - ipc_length; + + let ipc_meta = read_file_segment_metadata(reader, ipc_start as u64, ipc_end as u64) + .map_err(DataFusionError::ArrowError)?; + Ok((ipc_start, ipc_meta)) +} + +pub struct MultiIPCReader { + start: usize, + current_start: usize, + reader: Option>, +} + +impl MultiIPCReader { + fn new(mut reader: R, start: usize, end: usize) -> Result { + let (current_start, meta) = read_meta(&mut reader, end)?; + Ok(Self { + start, + current_start, + reader: Some(IPCReader::new(reader, meta, None)), + }) + } +} + +impl Iterator for MultiIPCReader { + type Item = ArrowResult; + + fn next(&mut self) -> Option { + let next = self.reader.as_mut().unwrap().next(); + match next { + None => { + if self.current_start == self.start { + None + } else { + let mut r = self.reader.take().unwrap().into_inner(); + let (new_start, meta) = + read_meta(&mut r, self.current_start).unwrap(); + self.reader = Some(IPCReader::new(r, meta, None)); + self.current_start = new_start; + self.next() + } + } + item => item, + } + } +}