5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/mod.rs
@@ -107,6 +107,11 @@ impl SQLMetric {
self.value.fetch_add(n, Ordering::Relaxed);
}

    /// Add elapsed nanoseconds since `start` to self
pub fn add_elapsed(&self, start: std::time::Instant) {
self.add(start.elapsed().as_nanos() as usize)
}

/// Get the current value
pub fn value(&self) -> usize {
self.value.load(Ordering::Relaxed)
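A quick usage sketch for the new helper (a minimal sketch, assuming `SQLMetric` and its `time_nanos` constructor are reachable from your crate; `expensive_work` is a hypothetical stand-in workload):

```rust
use datafusion::physical_plan::SQLMetric;
use std::time::Instant;

fn timed(expensive_work: impl FnOnce()) {
    let metric = SQLMetric::time_nanos(); // Arc<SQLMetric>
    let start = Instant::now();
    expensive_work();
    metric.add_elapsed(start); // accumulate the elapsed nanos into the metric
    println!("took {} ns", metric.value());
}
```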
279 changes: 152 additions & 127 deletions datafusion/src/physical_plan/repartition.rs
@@ -38,7 +38,7 @@ use futures::stream::Stream;
use futures::StreamExt;
use hashbrown::HashMap;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
};
use tokio::task::JoinHandle;
@@ -60,12 +60,40 @@ pub struct RepartitionExec {
HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
>,
>,

/// Execution metrics
metrics: RepartitionMetrics,
}

#[derive(Debug, Clone)]
struct RepartitionMetrics {
    /// Time in nanos to execute child operator and fetch batches
    fetch_nanos: Arc<SQLMetric>,
    /// Time in nanos to perform repartitioning
    repart_nanos: Arc<SQLMetric>,
    /// Time in nanos for sending resulting batches to channels
    send_nanos: Arc<SQLMetric>,
}

impl RepartitionMetrics {
fn new() -> Self {
Self {
fetch_nanos: SQLMetric::time_nanos(),
repart_nanos: SQLMetric::time_nanos(),
send_nanos: SQLMetric::time_nanos(),
}
}
/// Convert into the external metrics form
fn to_hashmap(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert("fetchTime".to_owned(), self.fetch_nanos.as_ref().clone());
metrics.insert(
"repartitionTime".to_owned(),
self.repart_nanos.as_ref().clone(),
);
metrics.insert("sendTime".to_owned(), self.send_nanos.as_ref().clone());
metrics
}
}
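
For reference, a hypothetical consumer of these names (a sketch only; `plan` is assumed to be an already-executed `RepartitionExec`, and the keys are the ones `to_hashmap` publishes above):

```rust
use datafusion::physical_plan::ExecutionPlan;

fn print_repartition_metrics(plan: &dyn ExecutionPlan) {
    for name in &["fetchTime", "repartitionTime", "sendTime"] {
        if let Some(metric) = plan.metrics().get(*name) {
            println!("{}: {} ns", name, metric.value());
        }
    }
}
```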

impl RepartitionExec {
@@ -132,132 +160,33 @@ impl ExecutionPlan for RepartitionExec {
// being read yet. This may cause high memory usage if the next operator is
// reading output partitions in order rather than concurrently. One workaround
// for this would be to add spill-to-disk capabilities.
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<
Option<ArrowResult<RecordBatch>>,
>();
let (sender, receiver) =
mpsc::unbounded_channel::<Option<ArrowResult<RecordBatch>>>();
channels.insert(partition, (sender, receiver));
}
// Use fixed random state
let random = ahash::RandomState::with_seeds(0, 0, 0, 0);

// launch one async task per *input* partition
for i in 0..num_input_partitions {
let random_state = random.clone();
Contributor Author: All of this code was moved to pull_from_input

let input = self.input.clone();
let fetch_time = self.fetch_time_nanos.clone();
let repart_time = self.repart_time_nanos.clone();
let send_time = self.send_time_nanos.clone();
let txs: HashMap<_, _> = channels
.iter()
.map(|(partition, (tx, _rx))| (*partition, tx.clone()))
.collect();
let partitioning = self.partitioning.clone();
let mut txs_captured = txs.clone();
let input_task: JoinHandle<Result<()>> = tokio::spawn(async move {
// execute the child operator
let now = Instant::now();
let mut stream = input.execute(i).await?;
fetch_time.add(now.elapsed().as_nanos() as usize);

let mut counter = 0;
let hashes_buf = &mut vec![];

loop {
// fetch the next batch
let now = Instant::now();
let result = stream.next().await;
fetch_time.add(now.elapsed().as_nanos() as usize);

if result.is_none() {
break;
}
let result: ArrowResult<RecordBatch> = result.unwrap();

match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let now = Instant::now();
let output_partition = counter % num_output_partitions;
let tx = txs_captured.get_mut(&output_partition).unwrap();
tx.send(Some(result)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
send_time.add(now.elapsed().as_nanos() as usize);
}
Partitioning::Hash(exprs, _) => {
let now = Instant::now();
let input_batch = result?;
let arrays = exprs
.iter()
.map(|expr| {
Ok(expr
.evaluate(&input_batch)?
.into_array(input_batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
hashes_buf.clear();
hashes_buf.resize(arrays[0].len(), 0);
// Hash arrays and compute buckets based on number of partitions
let hashes =
create_hashes(&arrays, &random_state, hashes_buf)?;
let mut indices = vec![vec![]; num_output_partitions];
for (index, hash) in hashes.iter().enumerate() {
indices
[(*hash % num_output_partitions as u64) as usize]
.push(index as u64)
}
repart_time.add(now.elapsed().as_nanos() as usize);
for (num_output_partition, partition_indices) in
indices.into_iter().enumerate()
{
let now = Instant::now();
let indices = partition_indices.into();
// Produce batches based on indices
let columns = input_batch
.columns()
.iter()
.map(|c| {
take(c.as_ref(), &indices, None).map_err(
|e| {
DataFusionError::Execution(
e.to_string(),
)
},
)
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;
let output_batch = RecordBatch::try_new(
input_batch.schema(),
columns,
);
repart_time.add(now.elapsed().as_nanos() as usize);
let now = Instant::now();
let tx = txs_captured
.get_mut(&num_output_partition)
.unwrap();
tx.send(Some(output_batch)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
send_time.add(now.elapsed().as_nanos() as usize);
}
}
other => {
// this should be unreachable as long as the validation logic
// in the constructor is kept up-to-date
return Err(DataFusionError::NotImplemented(format!(
"Unsupported repartitioning scheme {:?}",
other
)));
}
}
counter += 1;
}

Ok(())
});
let input_task: JoinHandle<Result<()>> =
tokio::spawn(Self::pull_from_input(
random.clone(),
self.input.clone(),
i,
txs.clone(),
self.partitioning.clone(),
self.metrics.clone(),
));

// In a separate task, wait for each input to be done
// (and pass along any errors)
tokio::spawn(async move { Self::wait_for_task(input_task, txs).await });
// (and pass along any errors, including panic!s)
tokio::spawn(Self::wait_for_task(input_task, txs));
}
}
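
The wiring above (one unbounded channel per output partition, every input task holding a clone of each sender, `None` as the per-input completion marker) can be exercised outside DataFusion. A minimal standalone sketch, assuming only `tokio` with the `full` feature; all names here are illustrative:

```rust
use std::collections::HashMap;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

#[tokio::main]
async fn main() {
    let num_inputs: u64 = 2;
    let num_outputs: usize = 3;

    // One unbounded channel per output partition.
    let mut senders: HashMap<usize, UnboundedSender<Option<u64>>> = HashMap::new();
    let mut receivers: Vec<UnboundedReceiver<Option<u64>>> = Vec::new();
    for p in 0..num_outputs {
        let (tx, rx) = mpsc::unbounded_channel();
        senders.insert(p, tx);
        receivers.push(rx);
    }

    // One task per input partition: round-robin values across outputs,
    // then send a `None` marker to every output (mirroring wait_for_task).
    for i in 0..num_inputs {
        let txs = senders.clone();
        tokio::spawn(async move {
            for counter in 0..6u64 {
                let out = (counter as usize) % txs.len();
                txs[&out].send(Some(i * 100 + counter)).unwrap();
            }
            for tx in txs.values() {
                tx.send(None).unwrap();
            }
        });
    }
    drop(senders); // the spawned tasks now own the only senders

    // Each output partition drains until every input has reported done.
    for (p, mut rx) in receivers.into_iter().enumerate() {
        let mut done = 0;
        while done < num_inputs {
            match rx.recv().await.unwrap() {
                Some(v) => println!("output {} got {}", p, v),
                None => done += 1,
            }
        }
    }
}
```

As the comment in `execute` notes, nothing bounds these channels, so an output partition that is not being drained can buffer its entire input in memory.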

@@ -272,14 +201,7 @@ impl ExecutionPlan for RepartitionExec {
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert("fetchTime".to_owned(), (*self.fetch_time_nanos).clone());
metrics.insert(
"repartitionTime".to_owned(),
(*self.repart_time_nanos).clone(),
);
metrics.insert("sendTime".to_owned(), (*self.send_time_nanos).clone());
metrics
self.metrics.to_hashmap()
}

fn fmt_as(
@@ -305,12 +227,115 @@ impl RepartitionExec {
input,
partitioning,
channels: Arc::new(Mutex::new(HashMap::new())),
fetch_time_nanos: SQLMetric::time_nanos(),
repart_time_nanos: SQLMetric::time_nanos(),
send_time_nanos: SQLMetric::time_nanos(),
metrics: RepartitionMetrics::new(),
})
}

/// Pulls data from the specified input plan, feeding it to the
/// output partitions based on the desired partitioning
///
    /// `i` is the input partition index
    ///
    /// `txs` holds the output sending channels for each output partition
async fn pull_from_input(
random_state: ahash::RandomState,
input: Arc<dyn ExecutionPlan>,
i: usize,
mut txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
partitioning: Partitioning,
metrics: RepartitionMetrics,
) -> Result<()> {
let num_output_partitions = txs.len();

// execute the child operator
let now = Instant::now();
let mut stream = input.execute(i).await?;
metrics.fetch_nanos.add_elapsed(now);

let mut counter = 0;
let hashes_buf = &mut vec![];

loop {
// fetch the next batch
let now = Instant::now();
let result = stream.next().await;
metrics.fetch_nanos.add_elapsed(now);

if result.is_none() {
break;
}
let result: ArrowResult<RecordBatch> = result.unwrap();

match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let now = Instant::now();
let output_partition = counter % num_output_partitions;
let tx = txs.get_mut(&output_partition).unwrap();
tx.send(Some(result))
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
metrics.send_nanos.add_elapsed(now);
}
Partitioning::Hash(exprs, _) => {
let now = Instant::now();
let input_batch = result?;
let arrays = exprs
.iter()
.map(|expr| {
Ok(expr
.evaluate(&input_batch)?
.into_array(input_batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
hashes_buf.clear();
hashes_buf.resize(arrays[0].len(), 0);
// Hash arrays and compute buckets based on number of partitions
let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
let mut indices = vec![vec![]; num_output_partitions];
for (index, hash) in hashes.iter().enumerate() {
indices[(*hash % num_output_partitions as u64) as usize]
.push(index as u64)
}
metrics.repart_nanos.add_elapsed(now);
for (num_output_partition, partition_indices) in
indices.into_iter().enumerate()
{
let now = Instant::now();
let indices = partition_indices.into();
// Produce batches based on indices
let columns = input_batch
.columns()
.iter()
.map(|c| {
take(c.as_ref(), &indices, None).map_err(|e| {
DataFusionError::Execution(e.to_string())
})
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;
let output_batch =
RecordBatch::try_new(input_batch.schema(), columns);
metrics.repart_nanos.add_elapsed(now);
let now = Instant::now();
let tx = txs.get_mut(&num_output_partition).unwrap();
tx.send(Some(output_batch))
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
metrics.send_nanos.add_elapsed(now);
}
}
other => {
// this should be unreachable as long as the validation logic
// in the constructor is kept up-to-date
return Err(DataFusionError::NotImplemented(format!(
"Unsupported repartitioning scheme {:?}",
other
)));
}
}
counter += 1;
}

Ok(())
}
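
The hash path above can be reduced to its core bucketing step. A minimal standalone sketch (not DataFusion's `create_hashes`, which hashes Arrow arrays column-wise): each row index is assigned to bucket `hash % num_output_partitions`, and the fixed `(0, 0, 0, 0)` seeds make the assignment reproducible across runs. Assumes `ahash` as a dependency:

```rust
use std::hash::{BuildHasher, Hash, Hasher};

fn bucket_rows(keys: &[u64], num_output_partitions: usize) -> Vec<Vec<u64>> {
    let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
    let mut indices = vec![vec![]; num_output_partitions];
    for (index, key) in keys.iter().enumerate() {
        let mut hasher = random_state.build_hasher();
        key.hash(&mut hasher);
        // Same key -> same hash -> same output partition
        indices[(hasher.finish() % num_output_partitions as u64) as usize]
            .push(index as u64);
    }
    indices
}

fn main() {
    // Equal keys land in the same output partition.
    println!("{:?}", bucket_rows(&[1, 2, 3, 1, 2, 3], 2));
}
```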

/// Waits for `input_task` which is consuming one of the inputs to
/// complete. Upon each successful completion, sends a `None` to
/// each of the output tx channels to signal one of the inputs is