GH-40866: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor - add support for row-major (#40867)

### Rationale for this change

The conversion from `RecordBatch` to the `Tensor` class already exists, but it does not support row-major `Tensor` output. This PR adds an option to construct a row-major `Tensor`.

### What changes are included in this PR?

This PR adds a `row_major` option to `RecordBatch::ToTensor` so that a row-major `Tensor` can be constructed; the default conversion is now row-major. For example:

```python
>>> import pyarrow as pa
>>> import numpy as np

>>> arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90]
>>> batch = pa.RecordBatch.from_arrays(
...     [
...         pa.array(arr1, type=pa.uint16()),
...         pa.array(arr2, type=pa.int16()),
... 
...     ], ["a", "b"]
... )

# Row-major

>>> batch.to_tensor()
<pyarrow.Tensor>
type: int32
shape: (9, 2)
strides: (8, 4)

>>> batch.to_tensor().to_numpy().flags
  C_CONTIGUOUS : True
  F_CONTIGUOUS : False
  OWNDATA : False
  WRITEABLE : True
  ALIGNED : True
  WRITEBACKIFCOPY : False

# Column-major

>>> batch.to_tensor(row_major=False)
<pyarrow.Tensor>
type: int32
shape: (9, 2)
strides: (4, 36)

>>> batch.to_tensor(row_major=False).to_numpy().flags
  C_CONTIGUOUS : False
  F_CONTIGUOUS : True
  OWNDATA : False
  WRITEABLE : True
  ALIGNED : True
  WRITEBACKIFCOPY : False
```
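The same conversion is available from C++ through the updated `RecordBatch::ToTensor` signature shown in the diff below. A minimal sketch (the helper function and surrounding setup are illustrative, not from the PR), assuming `batch` is a non-null `std::shared_ptr<arrow::RecordBatch>` with numeric columns:

```cpp
#include <iostream>

#include <arrow/api.h>

arrow::Status ConvertBatch(const std::shared_ptr<arrow::RecordBatch>& batch) {
  // Defaults: null_to_nan=false, row_major=true (C-contiguous output).
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Tensor> row_major, batch->ToTensor());

  // Convert nulls to NaN and request column-major (Fortran-contiguous) output.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Tensor> column_major,
                        batch->ToTensor(/*null_to_nan=*/true, /*row_major=*/false));

  // Same shape, different strides (bytes): row-major vs column-major layout.
  std::cout << row_major->strides()[0] << "," << row_major->strides()[1] << " vs "
            << column_major->strides()[0] << "," << column_major->strides()[1]
            << std::endl;
  return arrow::Status::OK();
}
```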

### Are these changes tested?

Yes, in C++ and Python.

### Are there any user-facing changes?

No.
* GitHub Issue: #40866

Lead-authored-by: AlenkaF <frim.alenka@gmail.com>
Co-authored-by: Alenka Frim <AlenkaF@users.noreply.github.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
AlenkaF and jorisvandenbossche committed Apr 10, 2024
1 parent 6e1b625 commit 831b94a
Showing 6 changed files with 326 additions and 82 deletions.
79 changes: 61 additions & 18 deletions cpp/src/arrow/record_batch.cc
@@ -283,18 +283,55 @@ struct ConvertColumnsToTensorVisitor {
}
};

template <typename Out>
struct ConvertColumnsToTensorRowMajorVisitor {
Out*& out_values;
const ArrayData& in_data;
int num_cols;
int col_idx;

template <typename T>
Status Visit(const T&) {
if constexpr (is_numeric(T::type_id)) {
using In = typename T::c_type;
auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);

if (in_data.null_count == 0) {
for (int64_t i = 0; i < in_data.length; ++i) {
out_values[i * num_cols + col_idx] = static_cast<Out>(in_values[i]);
}
} else {
for (int64_t i = 0; i < in_data.length; ++i) {
out_values[i * num_cols + col_idx] =
in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
}
}
return Status::OK();
}
Unreachable();
}
};
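The key difference from the existing column-major visitor is the write index: each value of column `col_idx` lands at `i * num_cols + col_idx`, interleaving the columns so that a whole batch row is contiguous in the output buffer. A small standalone sketch of that mapping (hypothetical helper on plain vectors, not part of this commit):

```cpp
#include <cstdint>
#include <vector>

// Write one column into a row-major (rows x num_cols) buffer at column col_idx.
void WriteColumnRowMajor(const std::vector<int32_t>& column, int num_cols, int col_idx,
                         std::vector<int32_t>* out) {
  for (int64_t i = 0; i < static_cast<int64_t>(column.size()); ++i) {
    (*out)[i * num_cols + col_idx] = column[i];
  }
}

// Example: columns a = {1, 2, 3} and b = {10, 20, 30} written into a (3, 2) buffer
// produce {1, 10, 2, 20, 3, 30}, i.e. each row of the batch is contiguous.
```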

template <typename DataType>
inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out,
bool row_major) {
using CType = typename arrow::TypeTraits<DataType>::CType;
auto* out_values = reinterpret_cast<CType*>(out);

int i = 0;
for (const auto& column : batch.columns()) {
ConvertColumnsToTensorVisitor<CType> visitor{out_values, *column->data()};
DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
if (row_major) {
ConvertColumnsToTensorRowMajorVisitor<CType> visitor{out_values, *column->data(),
batch.num_columns(), i++};
DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
} else {
ConvertColumnsToTensorVisitor<CType> visitor{out_values, *column->data()};
DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
}
}
}

Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan, bool row_major,
MemoryPool* pool) const {
if (num_columns() == 0) {
return Status::TypeError(
@@ -362,35 +399,35 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
// Copy data
switch (result_type->id()) {
case Type::UINT8:
ConvertColumnsToTensor<UInt8Type>(*this, result->mutable_data());
ConvertColumnsToTensor<UInt8Type>(*this, result->mutable_data(), row_major);
break;
case Type::UINT16:
case Type::HALF_FLOAT:
ConvertColumnsToTensor<UInt16Type>(*this, result->mutable_data());
ConvertColumnsToTensor<UInt16Type>(*this, result->mutable_data(), row_major);
break;
case Type::UINT32:
ConvertColumnsToTensor<UInt32Type>(*this, result->mutable_data());
ConvertColumnsToTensor<UInt32Type>(*this, result->mutable_data(), row_major);
break;
case Type::UINT64:
ConvertColumnsToTensor<UInt64Type>(*this, result->mutable_data());
ConvertColumnsToTensor<UInt64Type>(*this, result->mutable_data(), row_major);
break;
case Type::INT8:
ConvertColumnsToTensor<Int8Type>(*this, result->mutable_data());
ConvertColumnsToTensor<Int8Type>(*this, result->mutable_data(), row_major);
break;
case Type::INT16:
ConvertColumnsToTensor<Int16Type>(*this, result->mutable_data());
ConvertColumnsToTensor<Int16Type>(*this, result->mutable_data(), row_major);
break;
case Type::INT32:
ConvertColumnsToTensor<Int32Type>(*this, result->mutable_data());
ConvertColumnsToTensor<Int32Type>(*this, result->mutable_data(), row_major);
break;
case Type::INT64:
ConvertColumnsToTensor<Int64Type>(*this, result->mutable_data());
ConvertColumnsToTensor<Int64Type>(*this, result->mutable_data(), row_major);
break;
case Type::FLOAT:
ConvertColumnsToTensor<FloatType>(*this, result->mutable_data());
ConvertColumnsToTensor<FloatType>(*this, result->mutable_data(), row_major);
break;
case Type::DOUBLE:
ConvertColumnsToTensor<DoubleType>(*this, result->mutable_data());
ConvertColumnsToTensor<DoubleType>(*this, result->mutable_data(), row_major);
break;
default:
return Status::TypeError("DataType is not supported: ", result_type->ToString());
@@ -401,11 +438,17 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
internal::checked_cast<const FixedWidthType&>(*result_type);
std::vector<int64_t> shape = {num_rows(), num_columns()};
std::vector<int64_t> strides;
ARROW_RETURN_NOT_OK(
internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
ARROW_ASSIGN_OR_RAISE(auto tensor,
Tensor::Make(result_type, std::move(result), shape, strides));
std::shared_ptr<Tensor> tensor;

if (row_major) {
ARROW_RETURN_NOT_OK(
internal::ComputeRowMajorStrides(fixed_width_type, shape, &strides));
} else {
ARROW_RETURN_NOT_OK(
internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
}
ARROW_ASSIGN_OR_RAISE(tensor,
Tensor::Make(result_type, std::move(result), shape, strides));
return tensor;
}
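For a two-dimensional tensor the two stride computations reduce to a simple formula. A minimal sketch of what `ComputeRowMajorStrides` / `ComputeColumnMajorStrides` yield here, assuming a 2-D shape `(rows, cols)` and element byte width `w` (illustration only, not the Arrow internals):

```cpp
#include <cstdint>
#include <vector>

// Byte strides for a 2-D tensor of shape (rows, cols) with element width w.
std::vector<int64_t> Strides2D(int64_t rows, int64_t cols, int64_t w, bool row_major) {
  if (row_major) {
    return {cols * w, w};  // e.g. shape (9, 2), int32: {8, 4}
  }
  return {w, rows * w};    // e.g. shape (9, 2), int32: {4, 36}
}
```

These values match the strides shown in the Python example above for the `(9, 2)` int32 tensor.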

4 changes: 3 additions & 1 deletion cpp/src/arrow/record_batch.h
@@ -87,10 +87,12 @@ class ARROW_EXPORT RecordBatch {
/// Generated Tensor will have column-major layout.
///
/// \param[in] null_to_nan if true, convert nulls to NaN
/// \param[in] row_major if true, create row-major Tensor else column-major Tensor
/// \param[in] pool the memory pool to allocate the tensor buffer
/// \return the resulting Tensor
Result<std::shared_ptr<Tensor>> ToTensor(
bool null_to_nan = false, MemoryPool* pool = default_memory_pool()) const;
bool null_to_nan = false, bool row_major = true,
MemoryPool* pool = default_memory_pool()) const;

/// \brief Construct record batch from struct array
///
