Commit
Revert "ARROW-11802: [Rust][DataFusion] Remove use of crossbeam chann…
Browse files Browse the repository at this point in the history
…els to avoid potential deadlocks"

This reverts commit 90f735c.
nevi-me committed Mar 6, 2021
1 parent 209622a commit bd5f465
Showing 3 changed files with 56 additions and 91 deletions.
2 changes: 1 addition & 1 deletion rust/datafusion/Cargo.toml
@@ -53,14 +53,14 @@ parquet = { path = "../parquet", version = "4.0.0-SNAPSHOT", features = ["arrow"
sqlparser = "0.8.0"
clap = "2.33"
rustyline = {version = "7.0", optional = true}
crossbeam = "0.8"
paste = "^1.0"
num_cpus = "1.13.0"
chrono = "0.4"
async-trait = "0.1.41"
futures = "0.3"
pin-project-lite= "^0.2.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio-stream = "0.1"
log = "^0.4"
md-5 = { version = "^0.9.1", optional = true }
sha2 = { version = "^0.9.1", optional = true }
50 changes: 26 additions & 24 deletions rust/datafusion/src/physical_plan/parquet.rs
@@ -30,7 +30,8 @@ use super::{
planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream,
};
use crate::physical_plan::{common, ExecutionPlan, Partitioning};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{common, Partitioning};
use crate::{
error::{DataFusionError, Result},
execution::context::ExecutionContextState,
@@ -54,17 +55,14 @@ use parquet::file::{
statistics::Statistics as ParquetStatistics,
};

use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task,
};
use tokio_stream::wrappers::ReceiverStream;
use tokio::task;

use crate::datasource::datasource::Statistics;
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use futures::stream::Stream;

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
@@ -775,9 +773,9 @@ impl ExecutionPlan for ParquetExec {
// because the parquet implementation is not thread-safe, it is necessary to execute
// on a thread and communicate with channels
let (response_tx, response_rx): (
Sender<ArrowResult<RecordBatch>>,
Receiver<ArrowResult<RecordBatch>>,
) = channel(2);
Sender<Option<ArrowResult<RecordBatch>>>,
Receiver<Option<ArrowResult<RecordBatch>>>,
) = bounded(2);

let filenames = self.partitions[partition].filenames.clone();
let projection = self.projection.clone();
@@ -798,18 +796,17 @@

Ok(Box::pin(ParquetStream {
schema: self.schema.clone(),
inner: ReceiverStream::new(response_rx),
response_rx,
}))
}
}

fn send_result(
response_tx: &Sender<ArrowResult<RecordBatch>>,
result: ArrowResult<RecordBatch>,
response_tx: &Sender<Option<ArrowResult<RecordBatch>>>,
result: Option<ArrowResult<RecordBatch>>,
) -> Result<()> {
// Note this function is running on its own blocking tokio thread so blocking here is ok.
response_tx
.blocking_send(result)
.send(result)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
Ok(())
}
@@ -819,7 +816,7 @@ fn read_files(
projection: &[usize],
predicate_builder: &Option<RowGroupPredicateBuilder>,
batch_size: usize,
response_tx: Sender<ArrowResult<RecordBatch>>,
response_tx: Sender<Option<ArrowResult<RecordBatch>>>,
) -> Result<()> {
for filename in filenames {
let file = File::open(&filename)?;
@@ -836,7 +833,7 @@ fn read_files(
match batch_reader.next() {
Some(Ok(batch)) => {
//println!("ParquetExec got new batch from {}", filename);
send_result(&response_tx, Ok(batch))?
send_result(&response_tx, Some(Ok(batch)))?
}
None => {
break;
@@ -850,7 +847,7 @@ fn read_files(
// send error to operator
send_result(
&response_tx,
Err(ArrowError::ParquetError(err_msg.clone())),
Some(Err(ArrowError::ParquetError(err_msg.clone()))),
)?;
// terminate thread with error
return Err(DataFusionError::Execution(err_msg));
@@ -859,8 +856,9 @@ fn read_files(
}
}

// finished reading files (dropping response_tx will close
// channel)
// finished reading files
send_result(&response_tx, None)?;

Ok(())
}

@@ -874,17 +872,21 @@ fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {

struct ParquetStream {
schema: SchemaRef,
inner: ReceiverStream<ArrowResult<RecordBatch>>,
response_rx: Receiver<Option<ArrowResult<RecordBatch>>>,
}

impl Stream for ParquetStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
match self.response_rx.recv() {
Ok(batch) => Poll::Ready(batch),
// RecvError means receiver has exited and closed the channel
Err(RecvError) => Poll::Ready(None),
}
}
}

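For reference, the pattern this revert restores in parquet.rs is a dedicated reader thread pushing batches through a bounded crossbeam channel, with the Stream side blocking on recv() inside poll_next. Below is a minimal, self-contained sketch of that shape, not the DataFusion implementation itself: the item type, reader loop, and main are stand-ins, while the channel setup and the poll_next handling mirror the diff above (assuming crossbeam 0.8 and futures 0.3 as in Cargo.toml).

use std::pin::Pin;
use std::task::{Context, Poll};

use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
use futures::stream::Stream;
use futures::StreamExt;

// Stand-in for ArrowResult<RecordBatch> in the real operator.
type Item = Result<u64, String>;

// Stream that drains a crossbeam channel fed by a reader thread.
struct ChannelStream {
    response_rx: Receiver<Option<Item>>,
}

impl Stream for ChannelStream {
    type Item = Item;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // recv() blocks the polling thread until the reader sends something;
        // that is the trade-off this design accepts.
        match self.response_rx.recv() {
            Ok(item) => Poll::Ready(item),
            // RecvError: the sender side was dropped, so the stream is done.
            Err(RecvError) => Poll::Ready(None),
        }
    }
}

fn main() {
    // Small buffer so a slow consumer applies back-pressure to the reader.
    let (response_tx, response_rx): (Sender<Option<Item>>, Receiver<Option<Item>>) = bounded(2);

    // Reader thread: in the real operator this walks Parquet files and row groups.
    std::thread::spawn(move || {
        for i in 0..5u64 {
            response_tx.send(Some(Ok(i))).unwrap();
        }
        // Explicit end-of-stream marker, as in the reverted read_files().
        response_tx.send(None).unwrap();
    });

    // Drive the stream synchronously for the sketch.
    let mut stream = ChannelStream { response_rx };
    futures::executor::block_on(async {
        while let Some(item) = stream.next().await {
            println!("{:?}", item);
        }
    });
}

The bounded(2) buffer gives the reader back-pressure; the explicit None marker (or, in the tokio variant being reverted, dropping the sender) is what signals end-of-stream to the consumer.
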
95 changes: 29 additions & 66 deletions rust/datafusion/src/physical_plan/repartition.rs
@@ -21,24 +21,21 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, collections::HashMap, vec};
use std::{any::Any, vec};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{ExecutionPlan, Partitioning};
use arrow::record_batch::RecordBatch;
use arrow::{array::Array, error::Result as ArrowResult};
use arrow::{compute::take, datatypes::SchemaRef};
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;

use crossbeam::channel::{unbounded, Receiver, Sender};
use futures::stream::Stream;
use futures::StreamExt;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
Mutex,
};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

type MaybeBatch = Option<ArrowResult<RecordBatch>>;
@@ -51,13 +48,9 @@ pub struct RepartitionExec {
input: Arc<dyn ExecutionPlan>,
/// Partitioning scheme to use
partitioning: Partitioning,
/// Channels for sending batches from input partitions to output partitions.
/// Key is the partition number
channels: Arc<
Mutex<
HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
>,
>,
/// Channels for sending batches from input partitions to output partitions
/// there is one entry in this Vec for each output partition
channels: Arc<Mutex<Vec<(Sender<MaybeBatch>, Receiver<MaybeBatch>)>>>,
}

impl RepartitionExec {
@@ -117,28 +110,23 @@ impl ExecutionPlan for RepartitionExec {
// if this is the first partition to be invoked then we need to set up initial state
if channels.is_empty() {
// create one channel per *output* partition
for partition in 0..num_output_partitions {
for _ in 0..num_output_partitions {
// Note that this operator uses unbounded channels to avoid deadlocks because
// the output partitions can be read in any order and this could cause input
// partitions to be blocked when sending data to output UnboundedReceivers that are not
// partitions to be blocked when sending data to output receivers that are not
// being read yet. This may cause high memory usage if the next operator is
// reading output partitions in order rather than concurrently. One workaround
// for this would be to add spill-to-disk capabilities.
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<
Option<ArrowResult<RecordBatch>>,
>();
channels.insert(partition, (sender, receiver));
let (sender, receiver) = unbounded::<Option<ArrowResult<RecordBatch>>>();
channels.push((sender, receiver));
}
let random = ahash::RandomState::new();

// launch one async task per *input* partition
for i in 0..num_input_partitions {
let random_state = random.clone();
let input = self.input.clone();
let mut txs: HashMap<_, _> = channels
.iter()
.map(|(partition, (tx, _rx))| (*partition, tx.clone()))
.collect();
let mut channels = channels.clone();
let partitioning = self.partitioning.clone();
let _: JoinHandle<Result<()>> = tokio::spawn(async move {
let mut stream = input.execute(i).await?;
@@ -147,7 +135,7 @@ impl ExecutionPlan for RepartitionExec {
match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let output_partition = counter % num_output_partitions;
let tx = txs.get_mut(&output_partition).unwrap();
let tx = &mut channels[output_partition].0;
tx.send(Some(result)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
@@ -194,7 +182,7 @@ impl ExecutionPlan for RepartitionExec {
input_batch.schema(),
columns,
);
let tx = txs.get_mut(&num_output_partition).unwrap();
let tx = &mut channels[num_output_partition].0;
tx.send(Some(output_batch)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
@@ -213,12 +201,14 @@ impl ExecutionPlan for RepartitionExec {
}

// notify each output partition that this input partition has no more data
for (_, tx) in txs {
for channel in channels.iter_mut().take(num_output_partitions) {
let tx = &mut channel.0;
tx.send(None)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
}
Ok(())
});
tokio::task::yield_now().await;
}
}

Expand All @@ -228,7 +218,7 @@ impl ExecutionPlan for RepartitionExec {
num_input_partitions,
num_input_partitions_processed: 0,
schema: self.input.schema(),
input: UnboundedReceiverStream::new(channels.remove(&partition).unwrap().1),
input: channels[partition].1.clone(),
}))
}
}
Expand All @@ -242,7 +232,7 @@ impl RepartitionExec {
Ok(RepartitionExec {
input,
partitioning,
channels: Arc::new(Mutex::new(HashMap::new())),
channels: Arc::new(Mutex::new(vec![])),
})
}
}
Expand All @@ -255,7 +245,7 @@ struct RepartitionStream {
/// Schema
schema: SchemaRef,
/// channel containing the repartitioned batches
input: UnboundedReceiverStream<Option<ArrowResult<RecordBatch>>>,
input: Receiver<Option<ArrowResult<RecordBatch>>>,
}

impl Stream for RepartitionStream {
Expand All @@ -265,9 +255,10 @@ impl Stream for RepartitionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.input.poll_next_unpin(cx) {
Poll::Ready(Some(Some(v))) => Poll::Ready(Some(v)),
Poll::Ready(Some(None)) => {
match self.input.recv() {
Ok(Some(batch)) => Poll::Ready(Some(batch)),
// End of results from one input partition
Ok(None) => {
self.num_input_partitions_processed += 1;
if self.num_input_partitions == self.num_input_partitions_processed {
// all input partitions have finished sending batches
Expand All @@ -277,8 +268,8 @@ impl Stream for RepartitionStream {
self.poll_next(cx)
}
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
// RecvError means receiver has exited and closed the channel
Err(_) => Poll::Ready(None),
}
}
}
Expand All @@ -298,7 +289,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn one_to_many_round_robin() -> Result<()> {
// define input partitions
let schema = test_schema();
Expand All @@ -318,7 +309,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn many_to_one_round_robin() -> Result<()> {
// define input partitions
let schema = test_schema();
Expand All @@ -335,7 +326,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn many_to_many_round_robin() -> Result<()> {
// define input partitions
let schema = test_schema();
Expand All @@ -356,7 +347,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn many_to_many_hash_partition() -> Result<()> {
// define input partitions
let schema = test_schema();
@@ -426,32 +417,4 @@
}
Ok(output_partitions)
}

#[tokio::test]
async fn many_to_many_round_robin_within_tokio_task() -> Result<()> {
let join_handle: JoinHandle<Result<Vec<Vec<RecordBatch>>>> =
tokio::spawn(async move {
// define input partitions
let schema = test_schema();
let partition = create_vec_batches(&schema, 50);
let partitions =
vec![partition.clone(), partition.clone(), partition.clone()];

// repartition from 3 input to 5 output
repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await
});

let output_partitions = join_handle
.await
.map_err(|e| DataFusionError::Internal(e.to_string()))??;

assert_eq!(5, output_partitions.len());
assert_eq!(30, output_partitions[0].len());
assert_eq!(30, output_partitions[1].len());
assert_eq!(30, output_partitions[2].len());
assert_eq!(30, output_partitions[3].len());
assert_eq!(30, output_partitions[4].len());

Ok(())
}
}
