Skip to content

Commit

Permalink
fix clippy warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
jaylmiller committed Feb 14, 2023
1 parent 7aaddb5 commit 96a2e15
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 74 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl RowBatch {
/// Iterate over rows in their selected order.
///
/// Builds a [`RowBatchIter`] over this batch with `cur_idx` initialised to 0.
pub fn iter(&self) -> RowBatchIter {
RowBatchIter {
// NOTE(review): the two `row_selection` lines below look like the removed and
// added sides of this diff hunk. The receiver is already `&self`, so `&self`
// would add a second borrow; presumably only the plain `self` line remains
// after the commit — confirm against the repository.
row_selection: &self,
row_selection: self,
cur_idx: 0,
}
}
Expand Down Expand Up @@ -181,7 +181,7 @@ impl RowSelection {
/// Iterate over the rows in the selected order.
///
/// Builds a [`RowSelectionIter`] over this selection with `cur_n` initialised to 0.
pub fn iter(&self) -> RowSelectionIter {
RowSelectionIter {
// NOTE(review): the two `row_selection` lines below look like the removed and
// added sides of this diff hunk. The receiver is already `&self`, so `&self`
// would add a second borrow; presumably only the plain `self` line remains
// after the commit — confirm against the repository.
row_selection: &self,
row_selection: self,
cur_n: 0,
}
}
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ fn in_mem_partial_sort(
} else {
let (sort_data, batches): (Vec<SortData>, Vec<RecordBatch>) = buffered_batches
.drain(..)
.into_iter()
.map(|b| {
let BatchWithSortArray {
sort_data,
Expand Down
139 changes: 68 additions & 71 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl SortPreservingMergeStream {
self.cursors[idx] = Some(SortKeyCursor::new(
idx,
self.next_batch_id, // assign this batch an ID
rows.into(),
rows,
));
self.next_batch_id += 1;
self.batches[idx].push_back(batch)
Expand Down Expand Up @@ -758,14 +758,15 @@ impl SortPreservingMergeStream {
Poll::Ready(Ok(()))
}
}
// NOTE(review): this hunk interleaves two versions of the same conversion. The
// `impl Into` header, `fn into` and the two `self.` lines appear to be the removed
// side; the `impl From` header, `fn from` and the two `value.` lines appear to be
// the added side. Presumably only the `From` version exists after the commit —
// confirm against the repository.
//
// Both versions box and pin a `RecordBatchStreamAdapter` built from a clone of the
// stream's schema and the stream itself, mapped so that each `(rb, _rows)` item
// yields only the record batch `rb`.
impl Into<SendableRecordBatchStream> for SortPreservingMergeStream {
fn into(self) -> SendableRecordBatchStream {
impl From<SortPreservingMergeStream> for SendableRecordBatchStream {
fn from(value: SortPreservingMergeStream) -> Self {
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
self.map_ok(|(rb, _rows)| rb),
value.schema.clone(),
// The added side calls `into_stream()` before `map_ok`; the removed side
// called `map_ok` directly on `self`.
value.into_stream().map_ok(|(rb, _rows)| rb),
))
}
}

struct SortReceiverStream {
inner: ReceiverStream<SortStreamItem>,
#[allow(dead_code)]
Expand All @@ -790,7 +791,6 @@ impl Stream for SortReceiverStream {
self.inner.poll_next_unpin(cx)
}
}

#[cfg(test)]
mod tests {
use std::iter::FromIterator;
Expand Down Expand Up @@ -1344,71 +1344,68 @@ mod tests {
);
}

// #[tokio::test]
// async fn test_async() {
// let session_ctx = SessionContext::new();
// let task_ctx = session_ctx.task_ctx();
// let schema = test_util::aggr_test_schema();
// let sort = vec![PhysicalSortExpr {
// expr: col("c12", &schema).unwrap(),
// options: SortOptions::default(),
// }];

// let batches =
// sorted_partitioned_input(sort.clone(), &[5, 7, 3], task_ctx.clone()).await;

// let partition_count = batches.output_partitioning().partition_count();
// let mut streams = Vec::with_capacity(partition_count);

// for partition in 0..partition_count {
// let (sender, receiver) = mpsc::channel(1);
// let mut stream = batches.execute(partition, task_ctx.clone()).unwrap();
// let join_handle = tokio::spawn(async move {
// while let Some(batch) = stream.next().await {
// sender.send(batch).await.unwrap();
// // This causes the MergeStream to wait for more input
// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// }
// });

// streams.push(SortedStream::new(
// RecordBatchReceiverStream::create(&schema, receiver, join_handle),
// 0,
// futures::stream::empty().boxed(),
// ));
// }

// let metrics = ExecutionPlanMetricsSet::new();
// let tracking_metrics =
// MemTrackingMetrics::new(&metrics, task_ctx.memory_pool(), 0);

// let merge_stream = SortPreservingMergeStream::new_from_streams(
// streams,
// batches.schema(),
// sort.as_slice(),
// tracking_metrics,
// task_ctx.session_config().batch_size(),
// )
// .unwrap();

// let mut merged = common::collect(Box::pin(merge_stream)).await.unwrap();

// assert_eq!(merged.len(), 1);
// let merged = merged.remove(0);
// let basic = basic_sort(batches, sort.clone(), task_ctx.clone()).await;

// let basic = arrow::util::pretty::pretty_format_batches(&[basic])
// .unwrap()
// .to_string();
// let partition = arrow::util::pretty::pretty_format_batches(&[merged])
// .unwrap()
// .to_string();

// assert_eq!(
// basic, partition,
// "basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
// );
// }
/// Merges partitions whose batches trickle in over bounded channels and
/// compares the pretty-printed result with the output of `basic_sort` on the
/// same input.
#[tokio::test]
async fn test_async() {
    let ctx = SessionContext::new();
    let task_ctx = ctx.task_ctx();
    let schema = test_util::aggr_test_schema();

    // Single sort key: column `c12` with the default sort options.
    let sort_key = PhysicalSortExpr {
        expr: col("c12", &schema).unwrap(),
        options: SortOptions::default(),
    };
    let sort_exprs = vec![sort_key];

    let input =
        sorted_partitioned_input(sort_exprs.clone(), &[5, 7, 3], task_ctx.clone()).await;
    let num_partitions = input.output_partitioning().partition_count();

    // One channel-backed stream per input partition.
    let mut streams = Vec::with_capacity(num_partitions);
    for idx in 0..num_partitions {
        let (tx, rx) = mpsc::channel(1);
        let mut partition_stream = input.execute(idx, task_ctx.clone()).unwrap();

        let producer = tokio::spawn(async move {
            loop {
                match partition_stream.next().await {
                    Some(item) => {
                        tx.send(item).await.unwrap();
                        // Pause after each batch so the merge stream has to
                        // wait for more input.
                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                    }
                    None => break,
                }
            }
        });

        let receiver_stream = RecordBatchReceiverStream::create(&schema, rx, producer);
        streams.push(SortedStream::new_no_row_encoding(receiver_stream, 0));
    }

    let metrics_set = ExecutionPlanMetricsSet::new();
    let mem_metrics = MemTrackingMetrics::new(&metrics_set, task_ctx.memory_pool(), 0);
    let merge_stream = SortPreservingMergeStream::new_from_streams(
        streams,
        input.schema(),
        sort_exprs.as_slice(),
        mem_metrics,
        task_ctx.session_config().batch_size(),
    )
    .unwrap();

    // `.into()` converts the merge stream into the stream type `collect` accepts.
    let mut merged = common::collect(merge_stream.into()).await.unwrap();
    assert_eq!(merged.len(), 1);
    let merged_batch = merged.remove(0);

    let expected_batch = basic_sort(input, sort_exprs.clone(), task_ctx.clone()).await;

    // Compare the rendered tables so a failure prints both side by side.
    let basic = arrow::util::pretty::pretty_format_batches(&[expected_batch])
        .unwrap()
        .to_string();
    let partition = arrow::util::pretty::pretty_format_batches(&[merged_batch])
        .unwrap()
        .to_string();

    assert_eq!(
        basic, partition,
        "basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
    );
}

#[tokio::test]
async fn test_merge_metrics() {
Expand Down

0 comments on commit 96a2e15

Please sign in to comment.