Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
return Status::OK();
}

// ----------------------------------------------------------------------
// OutputStream that doesn't write anything

// Closing performs no work and always reports success.
Status MockOutputStream::Close() { return Status::OK(); }

// Reports the current position, which is the running total of bytes passed
// to Write so far. The result is stored through `position`.
Status MockOutputStream::Tell(int64_t* position) {
  const int64_t bytes_so_far = extent_bytes_written_;
  *position = bytes_so_far;
  return Status::OK();
}

// Records that `nbytes` bytes were written. The contents of `data` are never
// read or stored; only the byte counter advances.
Status MockOutputStream::Write(const uint8_t* data, int64_t nbytes) {
  const int64_t updated_extent = extent_bytes_written_ + nbytes;
  extent_bytes_written_ = updated_extent;
  return Status::OK();
}

// ----------------------------------------------------------------------
// In-memory buffer writer

Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/io/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
uint8_t* mutable_data_;
};

// A helper class to track the size of allocations: it counts bytes written without storing them
class ARROW_EXPORT MockOutputStream : public OutputStream {
 public:
  // A freshly constructed stream has recorded zero bytes.
  MockOutputStream() = default;

  // OutputStream interface; the definitions live out of line.
  Status Close() override;
  Status Tell(int64_t* position) override;
  Status Write(const uint8_t* data, int64_t nbytes) override;

  // Current value of the written-bytes counter.
  int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }

 private:
  // Running byte count, starting at zero.
  int64_t extent_bytes_written_ = 0;
};

/// \brief Enables random writes into a fixed-size mutable buffer
///
class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/ipc-read-write-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
}

void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
ipc::MockOutputStream mock;
io::MockOutputStream mock;
int32_t mock_metadata_length = -1;
int64_t mock_body_length = -1;
int64_t size = -1;
Expand Down
23 changes: 0 additions & 23 deletions cpp/src/arrow/ipc/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,6 @@ static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAli
return ((nbytes + alignment - 1) / alignment) * alignment;
}

// A helper class to track the size of allocations: it counts bytes written without storing them
class MockOutputStream : public io::OutputStream {
 public:
  // A freshly constructed stream has recorded zero bytes.
  MockOutputStream() = default;

  // Closing performs no work and always succeeds.
  Status Close() override { return Status::OK(); }

  // Advances the byte counter by `nbytes`; `data` itself is never read.
  Status Write(const uint8_t* data, int64_t nbytes) override {
    const int64_t updated_extent = extent_bytes_written_ + nbytes;
    extent_bytes_written_ = updated_extent;
    return Status::OK();
  }

  // Stores the running byte count through `position`.
  Status Tell(int64_t* position) override {
    const int64_t bytes_so_far = extent_bytes_written_;
    *position = bytes_so_far;
    return Status::OK();
  }

  // Current value of the written-bytes counter.
  int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }

 private:
  // Running byte count, starting at zero.
  int64_t extent_bytes_written_ = 0;
};

} // namespace ipc
} // namespace arrow

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
// emulates the behavior of Write without actually writing
int32_t metadata_length = 0;
int64_t body_length = 0;
MockOutputStream dst;
io::MockOutputStream dst;
RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length,
default_memory_pool(), kMaxNestingDepth, true));
*size = dst.GetExtentBytesWritten();
Expand All @@ -577,7 +577,7 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) {
// emulates the behavior of Write without actually writing
int32_t metadata_length = 0;
int64_t body_length = 0;
MockOutputStream dst;
io::MockOutputStream dst;
RETURN_NOT_OK(WriteTensor(tensor, &dst, &metadata_length, &body_length));
*size = dst.GetExtentBytesWritten();
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
frombuffer, read_tensor, write_tensor,
memory_map, create_memory_map,
get_record_batch_size, get_tensor_size,
have_libhdfs, have_libhdfs3)
have_libhdfs, have_libhdfs3, MockOutputStream)

from pyarrow.lib import (MemoryPool, total_allocated_bytes,
set_memory_pool, default_memory_pool)
Expand Down
5 changes: 5 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
(OutputStream):
CBufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)

# Exposes the C++ class arrow::io::MockOutputStream to Cython under the name
# CMockOutputStream, declared as deriving from OutputStream.
cdef cppclass CMockOutputStream" arrow::io::MockOutputStream"\
(OutputStream):
# Default constructor
CMockOutputStream()
# Accessor for the stream's running count of bytes written
int64_t GetExtentBytesWritten()


cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
cdef cppclass SchemaMessage:
Expand Down
12 changes: 12 additions & 0 deletions python/pyarrow/io.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,18 @@ cdef class BufferOutputStream(NativeFile):
return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)


cdef class MockOutputStream(NativeFile):
    """
    Write-only NativeFile backed by arrow::io::MockOutputStream
    """

    def __cinit__(self):
        self.wr_file.reset(new CMockOutputStream())
        # Flag the file as open and writeable, but not readable
        self.is_open = True
        self.is_writeable = 1
        self.is_readable = 0

    def size(self):
        """
        Return the byte count reported by the underlying C++ stream
        """
        cdef CMockOutputStream* mock
        mock = <CMockOutputStream*> self.wr_file.get()
        return mock.GetExtentBytesWritten()


cdef class BufferReader(NativeFile):
"""
Zero-copy reader from objects convertible to Arrow buffer
Expand Down
39 changes: 39 additions & 0 deletions python/pyarrow/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import numpy as np

import pandas as pd

from pyarrow.compat import u, guid
import pyarrow as pa

Expand Down Expand Up @@ -232,6 +234,43 @@ def test_nativefile_write_memoryview():
assert buf.to_pybytes() == data * 2


# ----------------------------------------------------------------------
# Mock output stream


def test_mock_output_stream():
    """MockOutputStream and BufferOutputStream must report the same size."""
    # Part 1: raw writes of a 10-byte payload, repeated 1000 times
    payload = b'dataabcdef'
    repeats = 1000

    mock_sink = pa.MockOutputStream()
    buffer_sink = pa.BufferOutputStream()

    for _ in range(repeats):
        mock_sink.write(payload)
        buffer_sink.write(payload)

    assert mock_sink.size() == len(buffer_sink.get_result())

    # Part 2: the same comparison through the record batch stream writer,
    # using a batch built from a pandas DataFrame
    frame = pd.DataFrame({'a': [1, 2, 3]})
    batch = pa.RecordBatch.from_pandas(frame)

    mock_sink = pa.MockOutputStream()
    buffer_sink = pa.BufferOutputStream()

    mock_writer = pa.RecordBatchStreamWriter(mock_sink, batch.schema)
    buffer_writer = pa.RecordBatchStreamWriter(buffer_sink, batch.schema)

    mock_writer.write_batch(batch)
    buffer_writer.write_batch(batch)

    assert mock_sink.size() == len(buffer_sink.get_result())


# ----------------------------------------------------------------------
# OS files and memory maps

Expand Down