Skip to content

Commit

Permalink
Fix up glib bindings, undeprecate some APIs
Browse files Browse the repository at this point in the history
Change-Id: Icccce78b5fc73c66088dbafcdf8a098a009371aa
  • Loading branch information
wesm committed Aug 24, 2017
1 parent 4bdebfa commit b156767
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 46 deletions.
12 changes: 8 additions & 4 deletions c_glib/arrow-glib/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,15 @@ GArrowRecordBatchStreamReader *
garrow_record_batch_stream_reader_new(GArrowInputStream *stream,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchReader;
using ReaderType = arrow::ipc::RecordBatchStreamReader;

auto arrow_input_stream = garrow_input_stream_get_raw(stream);
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> arrow_reader;
auto status =
arrow::ipc::RecordBatchStreamReader::Open(arrow_input_stream, &arrow_reader);
std::shared_ptr<BaseType> arrow_reader;
auto status = ReaderType::Open(arrow_input_stream, &arrow_reader);
auto subtype = std::dynamic_pointer_cast<ReaderType>(arrow_reader);
if (garrow_error_check(error, status, "[record-batch-stream-reader][open]")) {
return garrow_record_batch_stream_reader_new_raw(&arrow_reader);
return garrow_record_batch_stream_reader_new_raw(&subtype);
} else {
return NULL;
}
Expand Down Expand Up @@ -354,6 +357,7 @@ garrow_record_batch_file_reader_new(GArrowSeekableInputStream *file,
GError **error)
{
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(file);

std::shared_ptr<arrow::ipc::RecordBatchFileReader> arrow_reader;
auto status =
arrow::ipc::RecordBatchFileReader::Open(arrow_random_access_file,
Expand Down
30 changes: 18 additions & 12 deletions c_glib/arrow-glib/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,17 @@ garrow_record_batch_stream_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchStreamWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink).get();
std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> arrow_writer;
auto status =
arrow::ipc::RecordBatchStreamWriter::Open(arrow_sink,
garrow_schema_get_raw(schema),
&arrow_writer);
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink,
garrow_schema_get_raw(schema),
&arrow_writer);
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
if (garrow_error_check(error, status, "[record-batch-stream-writer][open]")) {
return garrow_record_batch_stream_writer_new_raw(&arrow_writer);
return garrow_record_batch_stream_writer_new_raw(&subtype);
} else {
return NULL;
}
Expand Down Expand Up @@ -259,14 +262,17 @@ garrow_record_batch_file_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchFileWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink);
std::shared_ptr<arrow::ipc::RecordBatchFileWriter> arrow_writer;
auto status =
arrow::ipc::RecordBatchFileWriter::Open(arrow_sink.get(),
garrow_schema_get_raw(schema),
&arrow_writer);
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink.get(),
garrow_schema_get_raw(schema),
&arrow_writer);
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
if (garrow_error_check(error, status, "[record-batch-file-writer][open]")) {
return garrow_record_batch_file_writer_new_raw(&arrow_writer);
return garrow_record_batch_file_writer_new_raw(&subtype);
} else {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/file-to-stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Status ConvertToStream(const char* path) {
std::shared_ptr<RecordBatchFileReader> reader;

RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file));
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file.get(), &reader));

io::StdoutStream sink;
std::shared_ptr<RecordBatchWriter> writer;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/ipc-read-write-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
RETURN_NOT_OK(mmap_->Tell(&offset));

std::shared_ptr<RecordBatchFileReader> file_reader;
RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader));
RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_.get(), offset, &file_reader));

return file_reader->ReadRecordBatch(0, result);
}
Expand Down Expand Up @@ -519,7 +519,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
std::shared_ptr<RecordBatchFileReader> reader;
RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader, footer_offset, &reader));
RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader.get(), footer_offset, &reader));

EXPECT_EQ(num_batches, reader->num_record_batches());
for (int i = 0; i < num_batches; ++i) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/json-integration-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static Status ConvertArrowToJson(const std::string& arrow_path,
RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file));

std::shared_ptr<ipc::RecordBatchFileReader> reader;
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file.get(), &reader));

if (FLAGS_verbose) {
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
Expand Down Expand Up @@ -140,7 +140,7 @@ static Status ValidateArrowVsJson(const std::string& arrow_path,
RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file));

std::shared_ptr<ipc::RecordBatchFileReader> arrow_reader;
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file, &arrow_reader));
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file.get(), &arrow_reader));

auto json_schema = json_reader->schema();
auto arrow_schema = arrow_reader->schema();
Expand Down
6 changes: 0 additions & 6 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,24 +138,18 @@ class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
public:
explicit InputStreamMessageReader(io::InputStream* stream) : stream_(stream) {}

#ifndef ARROW_NO_DEPRECATED_API
/// \deprecated Since 0.7.0
explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& owned_stream)
: InputStreamMessageReader(owned_stream.get()) {
owned_stream_ = owned_stream;
}
#endif

~InputStreamMessageReader();

Status ReadNextMessage(std::unique_ptr<Message>* message) override;

private:
io::InputStream* stream_;

#ifndef ARROW_NO_DEPRECATED_API
std::shared_ptr<io::InputStream> owned_stream_;
#endif
};

/// \brief Read encapsulated RPC message from position in file
Expand Down
35 changes: 30 additions & 5 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,12 @@ Status RecordBatchStreamReader::Open(io::InputStream* stream,
return Open(std::move(message_reader), out);
}

Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
std::shared_ptr<RecordBatchReader>* out) {
std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
return Open(std::move(message_reader), out);
}

#ifndef ARROW_NO_DEPRECATED_API
Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_reader,
std::shared_ptr<RecordBatchStreamReader>* reader) {
Expand Down Expand Up @@ -577,8 +583,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
DCHECK(BitUtil::IsMultipleOf8(block.body_length));

std::unique_ptr<Message> message;
RETURN_NOT_OK(
ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message));

io::BufferReader reader(message->body());
return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch);
Expand All @@ -596,8 +601,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
DCHECK(BitUtil::IsMultipleOf8(block.body_length));

std::unique_ptr<Message> message;
RETURN_NOT_OK(
ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message));

io::BufferReader reader(message->body());

Expand All @@ -613,6 +617,11 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
}

Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset) {
owned_file_ = file;
return Open(file.get(), footer_offset);
}

Status Open(io::RandomAccessFile* file, int64_t footer_offset) {
file_ = file;
footer_offset_ = footer_offset;
RETURN_NOT_OK(ReadFooter());
Expand All @@ -622,7 +631,10 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
std::shared_ptr<Schema> schema() const { return schema_; }

private:
std::shared_ptr<io::RandomAccessFile> file_;
io::RandomAccessFile* file_;

// Deprecated as of 0.7.0
std::shared_ptr<io::RandomAccessFile> owned_file_;

// The location where the Arrow file layout ends. May be the end of the file
// or some other location if embedded in a larger file.
Expand All @@ -645,6 +657,19 @@ RecordBatchFileReader::RecordBatchFileReader() {

RecordBatchFileReader::~RecordBatchFileReader() {}

Status RecordBatchFileReader::Open(io::RandomAccessFile* file,
std::shared_ptr<RecordBatchFileReader>* reader) {
int64_t footer_offset;
RETURN_NOT_OK(file->GetSize(&footer_offset));
return Open(file, footer_offset, reader);
}

Status RecordBatchFileReader::Open(io::RandomAccessFile* file, int64_t footer_offset,
std::shared_ptr<RecordBatchFileReader>* reader) {
*reader = std::shared_ptr<RecordBatchFileReader>(new RecordBatchFileReader());
return (*reader)->impl_->Open(file, footer_offset);
}

Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
std::shared_ptr<RecordBatchFileReader>* reader) {
int64_t footer_offset;
Expand Down
28 changes: 21 additions & 7 deletions cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
/// \return Status
static Status Open(io::InputStream* stream, std::shared_ptr<RecordBatchReader>* out);

/// \brief Version of Open that retains ownership of stream
static Status Open(const std::shared_ptr<io::InputStream>& stream,
std::shared_ptr<RecordBatchReader>* out);

std::shared_ptr<Schema> schema() const override;
Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;

Expand All @@ -107,21 +111,31 @@ class ARROW_EXPORT RecordBatchFileReader {
public:
~RecordBatchFileReader();

/// \brief Open a RecordBatchFileReader
// Open a file-like object that is assumed to be self-contained; i.e., the
// end of the file interface is the end of the Arrow file. Note that there
// can be any amount of data preceding the Arrow-formatted data, because we
// need only locate the end of the Arrow file stream to discover the metadata
// and then proceed to read the data into memory.
static Status Open(io::RandomAccessFile* file,
std::shared_ptr<RecordBatchFileReader>* reader);

/// \brief Open a RecordBatchFileReader
/// If the file is embedded within some larger file or memory region, you can
/// pass the absolute memory offset to the end of the file (which contains the
/// metadata footer). The metadata must have been written with memory offsets
/// relative to the start of the containing file
///
/// @param file: the data source
/// @param footer_offset: the position of the end of the Arrow "file"
static Status Open(io::RandomAccessFile* file, int64_t footer_offset,
std::shared_ptr<RecordBatchFileReader>* reader);

/// \brief Version of Open that retains ownership of file
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
std::shared_ptr<RecordBatchFileReader>* reader);

// If the file is embedded within some larger file or memory region, you can
// pass the absolute memory offset to the end of the file (which contains the
// metadata footer). The metadata must have been written with memory offsets
// relative to the start of the containing file
//
// @param file: the data source
// @param footer_offset: the position of the end of the Arrow "file"
/// \brief Version of Open that retains ownership of file
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
int64_t footer_offset,
std::shared_ptr<RecordBatchFileReader>* reader);
Expand Down
4 changes: 2 additions & 2 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,11 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
cdef cppclass CRecordBatchFileReader \
" arrow::ipc::RecordBatchFileReader":
@staticmethod
CStatus Open(const shared_ptr[RandomAccessFile]& file,
CStatus Open(RandomAccessFile* file,
shared_ptr[CRecordBatchFileReader]* out)

@staticmethod
CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file,
CStatus Open2" Open"(RandomAccessFile* file,
int64_t footer_offset,
shared_ptr[CRecordBatchFileReader]* out)

Expand Down
12 changes: 7 additions & 5 deletions python/pyarrow/ipc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ cdef class _RecordBatchFileWriter(_RecordBatchWriter):
cdef class _RecordBatchFileReader:
cdef:
shared_ptr[CRecordBatchFileReader] reader
shared_ptr[RandomAccessFile] file

cdef readonly:
Schema schema
Expand All @@ -297,19 +298,20 @@ cdef class _RecordBatchFileReader:
pass

def _open(self, source, footer_offset=None):
cdef shared_ptr[RandomAccessFile] reader
get_reader(source, &reader)
get_reader(source, &self.file)

cdef int64_t offset = 0
if footer_offset is not None:
offset = footer_offset

with nogil:
if offset != 0:
check_status(CRecordBatchFileReader.Open2(
reader, offset, &self.reader))
check_status(
CRecordBatchFileReader.Open2(self.file.get(), offset,
&self.reader))
else:
check_status(CRecordBatchFileReader.Open(reader, &self.reader))
check_status(
CRecordBatchFileReader.Open(self.file.get(), &self.reader))

self.schema = pyarrow_wrap_schema(self.reader.get().schema())

Expand Down

0 comments on commit b156767

Please sign in to comment.