Skip to content

Commit

Permalink
ARROW-8299: [C++] Reusable "optional ParallelFor" function for option…
Browse files Browse the repository at this point in the history
…al use of multithreading

We often see code like
```
    if (use_threads) {
      return ::arrow::internal::ParallelFor(n, Func);
    } else {
      for (size_t i = 0; i < n; ++i) {
        RETURN_NOT_OK(Func(i));
      }
      return Status::OK();
```

This patch adds a helper function to do this.

Closes #6870 from liyafan82/fly_0408_pf

Authored-by: liyafan82 <fan_li_ya@foxmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
liyafan82 authored and pitrou committed Apr 8, 2020
1 parent c49c47d commit e39326b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 35 deletions.
11 changes: 2 additions & 9 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,8 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op
return Status::OK();
};

if (options.use_threads) {
return ::arrow::internal::ParallelFor(static_cast<int>(fields->size()),
DecompressOne);
} else {
for (int i = 0; i < static_cast<int>(fields->size()); ++i) {
RETURN_NOT_OK(DecompressOne(i));
}
return Status::OK();
}
return ::arrow::internal::OptionalParallelFor(
options.use_threads, static_cast<int>(fields->size()), DecompressOne);
}

Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
Expand Down
11 changes: 2 additions & 9 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,8 @@ class RecordBatchSerializer {
return Status::OK();
};

if (options_.use_threads) {
return ::arrow::internal::ParallelFor(static_cast<int>(out_->body_buffers.size()),
CompressOne);
} else {
for (size_t i = 0; i < out_->body_buffers.size(); ++i) {
RETURN_NOT_OK(CompressOne(i));
}
return Status::OK();
}
return ::arrow::internal::OptionalParallelFor(
options_.use_threads, static_cast<int>(out_->body_buffers.size()), CompressOne);
}

Status Assemble(const RecordBatch& batch) {
Expand Down
21 changes: 4 additions & 17 deletions cpp/src/arrow/python/arrow_to_pandas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace arrow {
class MemoryPool;

using internal::checked_cast;
using internal::ParallelFor;
using internal::OptionalParallelFor;

// ----------------------------------------------------------------------
// PyCapsule code for setting ndarray base to reference C++ object
Expand Down Expand Up @@ -1925,14 +1925,7 @@ class ConsolidatedBlockCreator : public PandasBlockCreator {
return block->Write(std::move(arrays_[i]), i, this->column_block_placement_[i]);
};

if (options_.use_threads) {
return ParallelFor(num_columns_, WriteColumn);
} else {
for (int i = 0; i < num_columns_; ++i) {
RETURN_NOT_OK(WriteColumn(i));
}
return Status::OK();
}
return OptionalParallelFor(options_.use_threads, num_columns_, WriteColumn);
}

private:
Expand Down Expand Up @@ -2032,14 +2025,8 @@ Status ConvertCategoricals(const PandasOptions& options,
}
}
}
if (options.use_threads) {
return ParallelFor(static_cast<int>(columns_to_encode.size()), EncodeColumn);
} else {
for (auto i : columns_to_encode) {
RETURN_NOT_OK(EncodeColumn(i));
}
return Status::OK();
}
return OptionalParallelFor(options.use_threads,
static_cast<int>(columns_to_encode.size()), EncodeColumn);
}

Status ConvertArrayToPandas(const PandasOptions& options, std::shared_ptr<Array> arr,
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/util/parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,21 @@ Status ParallelFor(int num_tasks, FUNCTION&& func) {
return st;
}

// A parallelizer that takes a `Status(int)` function and calls it with
// arguments between 0 and `num_tasks - 1`, in sequence or in parallel,
// depending on the input boolean.

template <class FUNCTION>
Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func) {
if (use_threads) {
return ParallelFor(num_tasks, std::forward<FUNCTION>(func));
} else {
for (int i = 0; i < num_tasks; ++i) {
RETURN_NOT_OK(func(i));
}
return Status::OK();
}
}

} // namespace internal
} // namespace arrow

0 comments on commit e39326b

Please sign in to comment.