Skip to content

Commit

Permalink
ARROW-7428: [Format][C++] Add serialization for CSF sparse tensors
Browse files Browse the repository at this point in the history
This resolves [ARROW-7428](https://issues.apache.org/jira/browse/ARROW-7428) by adding IPC serialization support for CSF sparse tensors.

Closes #6340 from rok/ARROW-7428

Authored-by: Rok <rok@mihevc.org>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
rok authored and wesm committed Apr 1, 2020
1 parent 58c0941 commit 57c525f
Show file tree
Hide file tree
Showing 7 changed files with 541 additions and 7 deletions.
88 changes: 88 additions & 0 deletions cpp/src/arrow/ipc/metadata_internal.cc
Expand Up @@ -29,12 +29,14 @@
#include "arrow/io/interfaces.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/message.h"
#include "arrow/ipc/util.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/ubsan.h"
#include "arrow/visitor_inline.h"

#include "generated/File_generated.h"
Expand Down Expand Up @@ -932,6 +934,64 @@ Status MakeSparseMatrixIndexCSX(FBB& fbb, const SparseIndexType& sparse_index,
return Status::OK();
}

// EXPERIMENTAL: Serialize a SparseCSFIndex into flatbuffer metadata.
//
// Writes the integer value types of the indptr and indices tensors, the
// body-buffer placements of the ndim - 1 indptr buffers followed by the
// ndim indices buffers (laid out back-to-back, each padded to IPC
// alignment), and the axis permutation.  On success *num_buffers is set to
// 2 * ndim - 1, the number of body buffers this index contributes.
Status MakeSparseTensorIndexCSF(FBB& fbb, const SparseCSFIndex& sparse_index,
                                const std::vector<BufferMetadata>& buffers,
                                flatbuf::SparseTensorIndex* fb_sparse_index_type,
                                Offset* fb_sparse_index, size_t* num_buffers) {
  *fb_sparse_index_type = flatbuf::SparseTensorIndex::SparseTensorIndexCSF;
  const int ndim = static_cast<int>(sparse_index.axis_order().size());

  // We assume that the value type of indptr tensor is an integer.
  const auto& indptr_value_type =
      checked_cast<const IntegerType&>(*sparse_index.indptr()[0]->type());
  auto indptr_type_offset = flatbuf::CreateInt(fbb, indptr_value_type.bit_width(),
                                               indptr_value_type.is_signed());

  // We assume that the value type of indices tensor is an integer.
  const auto& indices_value_type =
      checked_cast<const IntegerType&>(*sparse_index.indices()[0]->type());
  auto indices_type_offset = flatbuf::CreateInt(fbb, indices_value_type.bit_width(),
                                                indices_value_type.is_signed());

  const int64_t indptr_elem_size = indptr_value_type.bit_width() / 8;
  const int64_t indices_elem_size = indices_value_type.bit_width() / 8;

  // Body buffers are laid out sequentially: all indptr buffers first, then
  // all indices buffers.  `size` is recorded in elements, `offset` in bytes.
  int64_t offset = 0;
  std::vector<flatbuf::Buffer> indptr, indices;
  indptr.reserve(sparse_index.indptr().size());
  indices.reserve(sparse_index.indices().size());

  // Iterate by const reference: copying the shared_ptr would incur an
  // atomic refcount increment/decrement per tensor.
  for (const auto& tensor : sparse_index.indptr()) {
    const int64_t size = tensor->data()->size() / indptr_elem_size;
    const int64_t padded_size = PaddedLength(tensor->data()->size(), kArrowIpcAlignment);

    indptr.push_back({offset, size});
    offset += padded_size;
  }
  for (const auto& tensor : sparse_index.indices()) {
    const int64_t size = tensor->data()->size() / indices_elem_size;
    const int64_t padded_size = PaddedLength(tensor->data()->size(), kArrowIpcAlignment);

    indices.push_back({offset, size});
    offset += padded_size;
  }

  auto fb_indices = fbb.CreateVectorOfStructs(indices);
  auto fb_indptr = fbb.CreateVectorOfStructs(indptr);

  std::vector<int> axis_order;
  axis_order.reserve(ndim);
  for (int i = 0; i < ndim; ++i) {
    axis_order.emplace_back(static_cast<int>(sparse_index.axis_order()[i]));
  }
  auto fb_axis_order =
      fbb.CreateVector(arrow::util::MakeNonNull(axis_order.data()), axis_order.size());

  *fb_sparse_index =
      flatbuf::CreateSparseTensorIndexCSF(fbb, indptr_type_offset, fb_indptr,
                                          indices_type_offset, fb_indices, fb_axis_order)
          .Union();
  // ndim - 1 indptr buffers plus ndim indices buffers.
  *num_buffers = 2 * ndim - 1;
  return Status::OK();
}

Status MakeSparseTensorIndex(FBB& fbb, const SparseIndex& sparse_index,
const std::vector<BufferMetadata>& buffers,
flatbuf::SparseTensorIndex* fb_sparse_index_type,
Expand All @@ -955,7 +1015,14 @@ Status MakeSparseTensorIndex(FBB& fbb, const SparseIndex& sparse_index,
fb_sparse_index_type, fb_sparse_index, num_buffers));
break;

case SparseTensorFormat::CSF:
RETURN_NOT_OK(MakeSparseTensorIndexCSF(
fbb, checked_cast<const SparseCSFIndex&>(sparse_index), buffers,
fb_sparse_index_type, fb_sparse_index, num_buffers));
break;

default:
*fb_sparse_index_type = flatbuf::SparseTensorIndex::NONE; // Silence warnings
std::stringstream ss;
ss << "Unsupported sparse tensor format:: " << sparse_index.ToString() << std::endl;
return Status::NotImplemented(ss.str());
Expand Down Expand Up @@ -1215,6 +1282,23 @@ Status GetSparseCSXIndexMetadata(const flatbuf::SparseMatrixIndexCSX* sparse_ind
return Status::OK();
}

// EXPERIMENTAL: Extract the axis order, the per-axis indices-buffer sizes,
// and the indptr/indices integer types from a flatbuffer
// SparseTensorIndexCSF message.
Status GetSparseCSFIndexMetadata(const flatbuf::SparseTensorIndexCSF* sparse_index,
                                 std::vector<int64_t>* axis_order,
                                 std::vector<int64_t>* indices_size,
                                 std::shared_ptr<DataType>* indptr_type,
                                 std::shared_ptr<DataType>* indices_type) {
  RETURN_NOT_OK(IntFromFlatbuffer(sparse_index->indptrType(), indptr_type));
  RETURN_NOT_OK(IntFromFlatbuffer(sparse_index->indicesType(), indices_type));

  // There is one axis-order entry and one indices buffer per dimension.
  const auto* fb_axis_order = sparse_index->axisOrder();
  const auto* fb_indices_buffers = sparse_index->indicesBuffers();
  const int ndim = static_cast<int>(fb_axis_order->size());
  for (int i = 0; i < ndim; ++i) {
    axis_order->push_back(fb_axis_order->Get(i));
    indices_size->push_back(fb_indices_buffers->Get(i)->length());
  }

  return Status::OK();
}

Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
std::vector<int64_t>* shape,
std::vector<std::string>* dim_names,
Expand Down Expand Up @@ -1269,6 +1353,10 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
}
} break;

case flatbuf::SparseTensorIndex::SparseTensorIndexCSF:
*sparse_tensor_format_id = SparseTensorFormat::CSF;
break;

default:
return Status::Invalid("Unrecognized sparse index type");
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/ipc/metadata_internal.h
Expand Up @@ -137,6 +137,13 @@ Status GetSparseCSXIndexMetadata(const flatbuf::SparseMatrixIndexCSX* sparse_ind
std::shared_ptr<DataType>* indptr_type,
std::shared_ptr<DataType>* indices_type);

// EXPERIMENTAL: Extracting metadata of a SparseCSFIndex from the message
Status GetSparseCSFIndexMetadata(const flatbuf::SparseTensorIndexCSF* sparse_index,
std::vector<int64_t>* axis_order,
std::vector<int64_t>* indices_size,
std::shared_ptr<DataType>* indptr_type,
std::shared_ptr<DataType>* indices_type);

// EXPERIMENTAL: Extracting metadata of a sparse tensor from the message
Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
std::vector<int64_t>* shape,
Expand Down
83 changes: 82 additions & 1 deletion cpp/src/arrow/ipc/read_write_test.cc
Expand Up @@ -1402,6 +1402,66 @@ class TestSparseTensorRoundTrip : public ::testing::Test, public IpcTestFixture
ASSERT_TRUE(result->Equals(sparse_tensor));
}

// Writes `sparse_tensor` through the IPC sparse-tensor writer, verifies the
// reported body length against the expected padded sizes of the indptr,
// indices, and data buffers, then reads it back and checks the result is
// equal to the input.
void CheckSparseCSFTensorRoundTrip(const SparseCSFTensor& sparse_tensor) {
const auto& type = checked_cast<const FixedWidthType&>(*sparse_tensor.type());
const int elem_size = type.bit_width() / 8;
// IndexValueType is the typed-test parameter of the enclosing fixture.
const int index_elem_size = sizeof(typename IndexValueType::c_type);

int32_t metadata_length;
int64_t body_length;

// Rewind the memory-mapped file before writing.
ASSERT_OK(mmap_->Seek(0));

ASSERT_OK(
WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length));

const auto& sparse_index =
checked_cast<const SparseCSFIndex&>(*sparse_tensor.sparse_index());

// A CSF index serializes ndim - 1 indptr buffers and ndim indices buffers,
// each padded to an 8-byte boundary in the message body.
const int64_t ndim = sparse_index.axis_order().size();
int64_t indptr_length = 0;
int64_t indices_length = 0;

for (int64_t i = 0; i < ndim - 1; ++i) {
indptr_length += BitUtil::RoundUpToMultipleOf8(index_elem_size *
sparse_index.indptr()[i]->size());
}
for (int64_t i = 0; i < ndim; ++i) {
indices_length += BitUtil::RoundUpToMultipleOf8(index_elem_size *
sparse_index.indices()[i]->size());
}
const int64_t data_length =
BitUtil::RoundUpToMultipleOf8(elem_size * sparse_tensor.non_zero_length());
const int64_t expected_body_length = indptr_length + indices_length + data_length;
ASSERT_EQ(expected_body_length, body_length);

// Rewind again and read the tensor back.
ASSERT_OK(mmap_->Seek(0));

std::shared_ptr<SparseTensor> result;
ASSERT_OK_AND_ASSIGN(result, ReadSparseTensor(mmap_.get()));
ASSERT_EQ(SparseTensorFormat::CSF, result->format_id());

const auto& resulted_sparse_index =
checked_cast<const SparseCSFIndex&>(*result->sparse_index());

// Recompute the padded buffer lengths from the deserialized index and
// check they match what was written.
int64_t out_indptr_length = 0;
int64_t out_indices_length = 0;
for (int i = 0; i < ndim - 1; ++i) {
out_indptr_length += BitUtil::RoundUpToMultipleOf8(
index_elem_size * resulted_sparse_index.indptr()[i]->size());
}
for (int i = 0; i < ndim; ++i) {
out_indices_length += BitUtil::RoundUpToMultipleOf8(
index_elem_size * resulted_sparse_index.indices()[i]->size());
}

ASSERT_EQ(out_indptr_length, indptr_length);
ASSERT_EQ(out_indices_length, indices_length);
ASSERT_EQ(result->data()->size(), data_length);
// Check both the index and the full tensor for equality with the input.
ASSERT_TRUE(resulted_sparse_index.Equals(sparse_index));
ASSERT_TRUE(result->Equals(sparse_tensor));
}

protected:
std::shared_ptr<SparseCOOIndex> MakeSparseCOOIndex(
const std::vector<int64_t>& coords_shape,
Expand Down Expand Up @@ -1564,9 +1624,30 @@ TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSCIndex) {
this->CheckSparseCSXMatrixRoundTrip(*st);
}

// Round-trip a 3-D CSF sparse tensor through the IPC writer and reader.
TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCSFIndex) {
  using IndexValueType = TypeParam;

  std::string path = "test-write-sparse-csf-tensor";
  constexpr int64_t kBufferSize = 1 << 20;
  ASSERT_OK_AND_ASSIGN(this->mmap_,
                       io::MemoryMapFixture::InitMemoryMap(kBufferSize, path));

  // BUG FIX: the shape must be rank 3 to match the three dim_names below
  // (a {4, 6} shape is rank 2 and would fail tensor validation).  The 24
  // values are interpreted as a dense 2x3x4 row-major tensor.
  std::vector<int64_t> shape = {2, 3, 4};
  std::vector<std::string> dim_names = {"foo", "bar", "baz"};
  std::vector<int64_t> values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0,
                                 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16};

  auto data = Buffer::Wrap(values);
  NumericTensor<Int64Type> t(data, shape, {}, dim_names);
  std::shared_ptr<SparseCSFTensor> st;
  ASSERT_OK_AND_ASSIGN(
      st, SparseCSFTensor::Make(t, TypeTraits<IndexValueType>::type_singleton()));

  this->CheckSparseCSFTensorRoundTrip(*st);
}
REGISTER_TYPED_TEST_SUITE_P(TestSparseTensorRoundTrip, WithSparseCOOIndexRowMajor,
WithSparseCOOIndexColumnMajor, WithSparseCSRIndex,
WithSparseCSCIndex);
WithSparseCSCIndex, WithSparseCSFIndex);

INSTANTIATE_TYPED_TEST_SUITE_P(TestInt8, TestSparseTensorRoundTrip, Int8Type);
INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt8, TestSparseTensorRoundTrip, UInt8Type);
Expand Down
73 changes: 73 additions & 0 deletions cpp/src/arrow/ipc/reader.cc
Expand Up @@ -1058,6 +1058,35 @@ Result<std::shared_ptr<SparseIndex>> ReadSparseCSXIndex(
}
}

// EXPERIMENTAL: Reconstruct a SparseCSFIndex by reading its indptr and
// indices buffers from `file` at the offsets recorded in the flatbuffer
// metadata.
Result<std::shared_ptr<SparseIndex>> ReadSparseCSFIndex(
    const flatbuf::SparseTensor* sparse_tensor, const std::vector<int64_t>& shape,
    io::RandomAccessFile* file) {
  auto* sparse_index = sparse_tensor->sparseIndex_as_SparseTensorIndexCSF();
  const auto ndim = static_cast<int64_t>(shape.size());
  auto* indptr_buffers = sparse_index->indptrBuffers();
  auto* indices_buffers = sparse_index->indicesBuffers();

  std::shared_ptr<DataType> indptr_type, indices_type;
  std::vector<int64_t> axis_order, indices_size;
  RETURN_NOT_OK(internal::GetSparseCSFIndexMetadata(
      sparse_index, &axis_order, &indices_size, &indptr_type, &indices_type));

  // A CSF index carries ndim - 1 indptr buffers and ndim indices buffers.
  std::vector<std::shared_ptr<Buffer>> indptr_data(ndim - 1);
  std::vector<std::shared_ptr<Buffer>> indices_data(ndim);

  const int num_indptr = static_cast<int>(indptr_buffers->Length());
  for (int i = 0; i < num_indptr; ++i) {
    const auto* buffer = indptr_buffers->Get(i);
    ARROW_ASSIGN_OR_RAISE(indptr_data[i],
                          file->ReadAt(buffer->offset(), buffer->length()));
  }
  const int num_indices = static_cast<int>(indices_buffers->Length());
  for (int i = 0; i < num_indices; ++i) {
    const auto* buffer = indices_buffers->Get(i);
    ARROW_ASSIGN_OR_RAISE(indices_data[i],
                          file->ReadAt(buffer->offset(), buffer->length()));
  }

  return SparseCSFIndex::Make(indptr_type, indices_type, indices_size, axis_order,
                              indptr_data, indices_data);
}

Result<std::shared_ptr<SparseTensor>> MakeSparseTensorWithSparseCOOIndex(
const std::shared_ptr<DataType>& type, const std::vector<int64_t>& shape,
const std::vector<std::string>& dim_names,
Expand All @@ -1082,6 +1111,14 @@ Result<std::shared_ptr<SparseTensor>> MakeSparseTensorWithSparseCSCIndex(
return SparseCSCMatrix::Make(sparse_index, type, data, shape, dim_names);
}

// Thin wrapper constructing a SparseCSFTensor from an already-deserialized
// index and data buffer; mirrors the COO/CSR/CSC helpers above.
Result<std::shared_ptr<SparseTensor>> MakeSparseTensorWithSparseCSFIndex(
    const std::shared_ptr<DataType>& type, const std::vector<int64_t>& shape,
    const std::vector<std::string>& dim_names,
    const std::shared_ptr<SparseCSFIndex>& sparse_index,
    const std::shared_ptr<Buffer>& data) {
  return SparseCSFTensor::Make(sparse_index, type, data, shape, dim_names);
}

Status ReadSparseTensorMetadata(const Buffer& metadata,
std::shared_ptr<DataType>* out_type,
std::vector<int64_t>* out_shape,
Expand Down Expand Up @@ -1131,6 +1168,9 @@ Result<size_t> GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id
case SparseTensorFormat::CSC:
return 3;

case SparseTensorFormat::CSF:
return 3;

default:
return Status::Invalid("Unrecognized sparse tensor format");
}
Expand Down Expand Up @@ -1215,6 +1255,33 @@ Result<std::shared_ptr<SparseTensor>> ReadSparseTensorPayload(const IpcPayload&
return MakeSparseTensorWithSparseCSCIndex(type, shape, dim_names, sparse_index,
non_zero_length, payload.body_buffers[2]);
}
case SparseTensorFormat::CSF: {
std::shared_ptr<SparseCSFIndex> sparse_index;
std::shared_ptr<DataType> indptr_type, indices_type;
std::vector<int64_t> axis_order, indices_size;

RETURN_NOT_OK(internal::GetSparseCSFIndexMetadata(
sparse_tensor->sparseIndex_as_SparseTensorIndexCSF(), &axis_order,
&indices_size, &indptr_type, &indices_type));
ARROW_CHECK_EQ(indptr_type, indices_type);

const int64_t ndim = shape.size();
std::vector<std::shared_ptr<Buffer>> indptr_data(ndim - 1);
std::vector<std::shared_ptr<Buffer>> indices_data(ndim);

for (int64_t i = 0; i < ndim - 1; ++i) {
indptr_data[i] = payload.body_buffers[i];
}
for (int64_t i = 0; i < ndim; ++i) {
indices_data[i] = payload.body_buffers[i + ndim - 1];
}

ARROW_ASSIGN_OR_RAISE(sparse_index,
SparseCSFIndex::Make(indptr_type, indices_type, indices_size,
axis_order, indptr_data, indices_data));
return MakeSparseTensorWithSparseCSFIndex(type, shape, dim_names, sparse_index,
payload.body_buffers[2 * ndim - 1]);
}
default:
return Status::Invalid("Unsupported sparse index format");
}
Expand Down Expand Up @@ -1261,6 +1328,12 @@ Result<std::shared_ptr<SparseTensor>> ReadSparseTensor(const Buffer& metadata,
type, shape, dim_names, checked_pointer_cast<SparseCSCIndex>(sparse_index),
non_zero_length, data);
}
case SparseTensorFormat::CSF: {
ARROW_ASSIGN_OR_RAISE(sparse_index, ReadSparseCSFIndex(sparse_tensor, shape, file));
return MakeSparseTensorWithSparseCSFIndex(
type, shape, dim_names, checked_pointer_cast<SparseCSFIndex>(sparse_index),
data);
}
default:
return Status::Invalid("Unsupported sparse index format");
}
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/ipc/writer.cc
Expand Up @@ -771,6 +771,11 @@ class SparseTensorSerializer {
VisitSparseCSCIndex(checked_cast<const SparseCSCIndex&>(sparse_index)));
break;

case SparseTensorFormat::CSF:
RETURN_NOT_OK(
VisitSparseCSFIndex(checked_cast<const SparseCSFIndex&>(sparse_index)));
break;

default:
std::stringstream ss;
ss << "Unable to convert type: " << sparse_index.ToString() << std::endl;
Expand Down Expand Up @@ -829,6 +834,16 @@ class SparseTensorSerializer {
return Status::OK();
}

// Append the CSF index's body buffers in serialization order: the
// ndim - 1 indptr buffers first, then the ndim indices buffers.  This
// order must match the offsets computed by MakeSparseTensorIndexCSF.
Status VisitSparseCSFIndex(const SparseCSFIndex& sparse_index) {
  // Iterate by const reference: copying the shared_ptr would incur an
  // atomic refcount increment/decrement per tensor.
  for (const auto& indptr : sparse_index.indptr()) {
    out_->body_buffers.emplace_back(indptr->data());
  }
  for (const auto& indices : sparse_index.indices()) {
    out_->body_buffers.emplace_back(indices->data());
  }
  return Status::OK();
}

IpcPayload* out_;

std::vector<internal::BufferMetadata> buffer_meta_;
Expand Down

0 comments on commit 57c525f

Please sign in to comment.