Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ pub struct IPCWriter {
/// inner writer
pub writer: FileWriter<File>,
/// batches written
pub num_batches: u64,
pub num_batches: usize,
/// rows written
pub num_rows: u64,
pub num_rows: usize,
/// bytes written
pub num_bytes: u64,
pub num_bytes: usize,
}

impl IPCWriter {
Expand Down Expand Up @@ -306,9 +306,9 @@ impl IPCWriter {
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows() as u64;
self.num_rows += batch.num_rows();
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes as u64;
self.num_bytes += num_bytes;
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl ExternalSorter {
let used = self.reservation.free();
self.metrics.spill_count.add(1);
self.metrics.spilled_bytes.add(used);
self.metrics.spilled_rows.add(spilled_rows as usize);
self.metrics.spilled_rows.add(spilled_rows);
self.spills.push(spill_file);
Ok(used)
}
Expand Down Expand Up @@ -674,7 +674,7 @@ async fn spill_sorted_batches(
batches: Vec<RecordBatch>,
path: &Path,
schema: SchemaRef,
) -> Result<u64> {
) -> Result<usize> {
let path: PathBuf = path.into();
let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema));
match task.join().await {
Expand Down Expand Up @@ -705,7 +705,7 @@ fn write_sorted(
batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
) -> Result<u64> {
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
Expand All @@ -715,7 +715,7 @@ fn write_sorted(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes as usize),
human_readable_size(writer.num_bytes),
);
Ok(writer.num_rows)
}
Expand Down