Skip to content

Commit

Permalink
Feedback on ipc reader/writer names. Add open_stream/open_file Python…
Browse files Browse the repository at this point in the history
… APIs

Change-Id: I92cc2d5de55f625dee543bd7dc223225fd8f7977
  • Loading branch information
wesm committed May 13, 2017
1 parent 22346d4 commit 04fa285
Show file tree
Hide file tree
Showing 14 changed files with 232 additions and 183 deletions.
12 changes: 7 additions & 5 deletions cpp/src/arrow/ipc/file-to-stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@
#include "arrow/util/io-util.h"

namespace arrow {
namespace ipc {

// Reads an Arrow file from the file system and writes its stream-format equivalent to stdout.
Status ConvertToStream(const char* path) {
std::shared_ptr<io::ReadableFile> in_file;
std::shared_ptr<ipc::BatchFileReader> reader;
std::shared_ptr<RecordBatchFileReader> reader;

RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file));
RETURN_NOT_OK(ipc::BatchFileReader::Open(in_file, &reader));
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));

io::StdoutStream sink;
std::shared_ptr<ipc::OutputStreamWriter> writer;
RETURN_NOT_OK(ipc::OutputStreamWriter::Open(&sink, reader->schema(), &writer));
std::shared_ptr<RecordBatchStreamWriter> writer;
RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer));
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> chunk;
RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
Expand All @@ -44,14 +45,15 @@ Status ConvertToStream(const char* path) {
return writer->Close();
}

} // namespace ipc
} // namespace arrow

int main(int argc, char** argv) {
if (argc != 2) {
std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
return 1;
}
arrow::Status status = arrow::ConvertToStream(argv[1]);
arrow::Status status = arrow::ipc::ConvertToStream(argv[1]);
if (!status.ok()) {
std::cerr << "Could not convert to stream: " << status.ToString() << std::endl;
return 1;
Expand Down
25 changes: 13 additions & 12 deletions cpp/src/arrow/ipc/ipc-read-write-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,16 @@ class IpcTestFixture : public io::MemoryMapFixture {
if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
RETURN_NOT_OK(mmap_->Seek(0));

std::shared_ptr<BatchFileWriter> file_writer;
RETURN_NOT_OK(BatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
std::shared_ptr<RecordBatchFileWriter> file_writer;
RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
RETURN_NOT_OK(file_writer->Close());

int64_t offset;
RETURN_NOT_OK(mmap_->Tell(&offset));

std::shared_ptr<BatchFileReader> file_reader;
RETURN_NOT_OK(BatchFileReader::Open(mmap_, offset, &file_reader));
std::shared_ptr<RecordBatchFileReader> file_reader;
RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader));

return file_reader->GetRecordBatch(0, result);
}
Expand Down Expand Up @@ -487,8 +487,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {

Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
// Write the file
std::shared_ptr<BatchFileWriter> writer;
RETURN_NOT_OK(BatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
std::shared_ptr<RecordBatchFileWriter> writer;
RETURN_NOT_OK(
RecordBatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));

const int num_batches = static_cast<int>(in_batches.size());

Expand All @@ -504,8 +505,8 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {

// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
std::shared_ptr<BatchFileReader> reader;
RETURN_NOT_OK(BatchFileReader::Open(buf_reader, footer_offset, &reader));
std::shared_ptr<RecordBatchFileReader> reader;
RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader, footer_offset, &reader));

EXPECT_EQ(num_batches, reader->num_record_batches());
for (int i = 0; i < num_batches; ++i) {
Expand Down Expand Up @@ -553,8 +554,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
Status RoundTripHelper(
const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
// Write the file
std::shared_ptr<OutputStreamWriter> writer;
RETURN_NOT_OK(OutputStreamWriter::Open(sink_.get(), batch.schema(), &writer));
std::shared_ptr<RecordBatchStreamWriter> writer;
RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer));
int num_batches = 5;
for (int i = 0; i < num_batches; ++i) {
RETURN_NOT_OK(writer->WriteRecordBatch(batch));
Expand All @@ -565,8 +566,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);

std::shared_ptr<InputStreamReader> reader;
RETURN_NOT_OK(InputStreamReader::Open(buf_reader, &reader));
std::shared_ptr<RecordBatchStreamReader> reader;
RETURN_NOT_OK(RecordBatchStreamReader::Open(buf_reader, &reader));

std::shared_ptr<RecordBatch> chunk;
while (true) {
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/arrow/ipc/json-integration-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ static Status ConvertJsonToArrow(
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
}

std::shared_ptr<ipc::BatchFileWriter> writer;
RETURN_NOT_OK(ipc::BatchFileWriter::Open(out_file.get(), reader->schema(), &writer));
std::shared_ptr<ipc::RecordBatchFileWriter> writer;
RETURN_NOT_OK(
ipc::RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer));

for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
Expand All @@ -96,8 +97,8 @@ static Status ConvertArrowToJson(
RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file));
RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file));

std::shared_ptr<ipc::BatchFileReader> reader;
RETURN_NOT_OK(ipc::BatchFileReader::Open(in_file, &reader));
std::shared_ptr<ipc::RecordBatchFileReader> reader;
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));

if (FLAGS_verbose) {
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
Expand Down Expand Up @@ -137,8 +138,8 @@ static Status ValidateArrowVsJson(
std::shared_ptr<io::ReadableFile> arrow_file;
RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file));

std::shared_ptr<ipc::BatchFileReader> arrow_reader;
RETURN_NOT_OK(ipc::BatchFileReader::Open(arrow_file, &arrow_reader));
std::shared_ptr<ipc::RecordBatchFileReader> arrow_reader;
RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file, &arrow_reader));

auto json_schema = json_reader->schema();
auto arrow_schema = arrow_reader->schema();
Expand Down
52 changes: 26 additions & 26 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona
}

// ----------------------------------------------------------------------
// InputStreamReader implementation
// RecordBatchStreamReader implementation

static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
Expand All @@ -176,12 +176,12 @@ static inline std::string message_type_name(Message::Type type) {
return "unknown";
}

BatchStreamReader::~BatchStreamReader() {}
RecordBatchReader::~RecordBatchReader() {}

class InputStreamReader::InputStreamReaderImpl {
class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
public:
InputStreamReaderImpl() {}
~InputStreamReaderImpl() {}
RecordBatchStreamReaderImpl() {}
~RecordBatchStreamReaderImpl() {}

Status Open(const std::shared_ptr<io::InputStream>& stream) {
stream_ = stream;
Expand Down Expand Up @@ -269,31 +269,31 @@ class InputStreamReader::InputStreamReaderImpl {
std::shared_ptr<Schema> schema_;
};

InputStreamReader::InputStreamReader() {
impl_.reset(new InputStreamReaderImpl());
RecordBatchStreamReader::RecordBatchStreamReader() {
impl_.reset(new RecordBatchStreamReaderImpl());
}

Status InputStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
std::shared_ptr<InputStreamReader>* reader) {
Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
std::shared_ptr<RecordBatchStreamReader>* reader) {
// Private ctor
*reader = std::shared_ptr<InputStreamReader>(new InputStreamReader());
*reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader());
return (*reader)->impl_->Open(stream);
}

std::shared_ptr<Schema> InputStreamReader::schema() const {
std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
return impl_->schema();
}

Status InputStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
return impl_->GetNextRecordBatch(batch);
}

// ----------------------------------------------------------------------
// File reader implementation

class BatchFileReader::BatchFileReaderImpl {
class RecordBatchFileReader::RecordBatchFileReaderImpl {
public:
BatchFileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
RecordBatchFileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }

Status ReadFooter() {
int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
Expand Down Expand Up @@ -432,38 +432,38 @@ class BatchFileReader::BatchFileReaderImpl {
std::shared_ptr<Schema> schema_;
};

BatchFileReader::BatchFileReader() {
impl_.reset(new BatchFileReaderImpl());
RecordBatchFileReader::RecordBatchFileReader() {
impl_.reset(new RecordBatchFileReaderImpl());
}

BatchFileReader::~BatchFileReader() {}
RecordBatchFileReader::~RecordBatchFileReader() {}

Status BatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
std::shared_ptr<BatchFileReader>* reader) {
Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
std::shared_ptr<RecordBatchFileReader>* reader) {
int64_t footer_offset;
RETURN_NOT_OK(file->GetSize(&footer_offset));
return Open(file, footer_offset, reader);
}

Status BatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
int64_t footer_offset, std::shared_ptr<BatchFileReader>* reader) {
*reader = std::shared_ptr<BatchFileReader>(new BatchFileReader());
Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader) {
*reader = std::shared_ptr<RecordBatchFileReader>(new RecordBatchFileReader());
return (*reader)->impl_->Open(file, footer_offset);
}

std::shared_ptr<Schema> BatchFileReader::schema() const {
std::shared_ptr<Schema> RecordBatchFileReader::schema() const {
return impl_->schema();
}

int BatchFileReader::num_record_batches() const {
int RecordBatchFileReader::num_record_batches() const {
return impl_->num_record_batches();
}

MetadataVersion BatchFileReader::version() const {
MetadataVersion RecordBatchFileReader::version() const {
return impl_->version();
}

Status BatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
return impl_->GetRecordBatch(i, batch);
}

Expand Down
36 changes: 18 additions & 18 deletions cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ class RandomAccessFile;
namespace ipc {

/// \brief Abstract interface for reading stream of record batches
class ARROW_EXPORT BatchStreamReader {
class ARROW_EXPORT RecordBatchReader {
public:
virtual ~BatchStreamReader();
virtual ~RecordBatchReader();

/// \return the shared schema of the record batches in the stream
virtual std::shared_ptr<Schema> schema() const = 0;
Expand All @@ -60,40 +60,40 @@ class ARROW_EXPORT BatchStreamReader {
virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
};

/// \class InputStreamReader
/// \class RecordBatchStreamReader
/// \brief Synchronous batch stream reader that reads from io::InputStream
class ARROW_EXPORT InputStreamReader : public BatchStreamReader {
class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
public:
/// Create batch reader from InputStream
///
/// \param[in] stream an input stream instance
/// \param[out] reader the created reader object
/// \return Status
static Status Open(const std::shared_ptr<io::InputStream>& stream,
std::shared_ptr<InputStreamReader>* reader);
std::shared_ptr<RecordBatchStreamReader>* reader);

std::shared_ptr<Schema> schema() const override;
Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;

private:
InputStreamReader();
RecordBatchStreamReader();

class ARROW_NO_EXPORT InputStreamReaderImpl;
std::unique_ptr<InputStreamReaderImpl> impl_;
class ARROW_NO_EXPORT RecordBatchStreamReaderImpl;
std::unique_ptr<RecordBatchStreamReaderImpl> impl_;
};

/// \brief Reads the random access record batch file format
class ARROW_EXPORT BatchFileReader {
/// \brief Reads the record batch file format
class ARROW_EXPORT RecordBatchFileReader {
public:
~BatchFileReader();
~RecordBatchFileReader();

// Open a file-like object that is assumed to be self-contained; i.e., the
// end of the file interface is the end of the Arrow file. Note that there
// can be any amount of data preceding the Arrow-formatted data, because we
// need only locate the end of the Arrow file stream to discover the metadata
// and then proceed to read the data into memory.
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
std::shared_ptr<BatchFileReader>* reader);
std::shared_ptr<RecordBatchFileReader>* reader);

// If the file is embedded within some larger file or memory region, you can
// pass the absolute memory offset to the end of the file (which contains the
Expand All @@ -103,7 +103,7 @@ class ARROW_EXPORT BatchFileReader {
// @param file: the data source
// @param footer_offset: the position of the end of the Arrow "file"
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
int64_t footer_offset, std::shared_ptr<BatchFileReader>* reader);
int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader);

/// The schema includes any dictionaries
std::shared_ptr<Schema> schema() const;
Expand All @@ -123,10 +123,10 @@ class ARROW_EXPORT BatchFileReader {
Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);

private:
BatchFileReader();
RecordBatchFileReader();

class ARROW_NO_EXPORT BatchFileReaderImpl;
std::unique_ptr<BatchFileReaderImpl> impl_;
class ARROW_NO_EXPORT RecordBatchFileReaderImpl;
std::unique_ptr<RecordBatchFileReaderImpl> impl_;
};

// Generic read functions; does not copy data if the input supports zero copy reads
Expand Down Expand Up @@ -173,8 +173,8 @@ Status ARROW_EXPORT ReadTensor(
/// Backwards-compatibility for Arrow < 0.4.0
///
#ifndef ARROW_NO_DEPRECATED_API
using StreamReader = BatchStreamReader;
using FileReader = BatchFileReader;
using StreamReader = RecordBatchReader;
using FileReader = RecordBatchFileReader;
#endif

} // namespace ipc
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/arrow/ipc/stream-to-file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@
#include "arrow/util/io-util.h"

namespace arrow {
namespace ipc {

// Converts a stream from stdin to a file written to standard out.
// A typical usage would be:
// $ <program that produces streaming output> | stream-to-file > file.arrow
Status ConvertToFile() {
std::shared_ptr<io::InputStream> input(new io::StdinStream);
std::shared_ptr<ipc::InputStreamReader> reader;
RETURN_NOT_OK(ipc::InputStreamReader::Open(input, &reader));
std::shared_ptr<RecordBatchStreamReader> reader;
RETURN_NOT_OK(RecordBatchStreamReader::Open(input, &reader));

io::StdoutStream sink;
std::shared_ptr<ipc::BatchFileWriter> writer;
RETURN_NOT_OK(ipc::BatchFileWriter::Open(&sink, reader->schema(), &writer));
std::shared_ptr<RecordBatchFileWriter> writer;
RETURN_NOT_OK(RecordBatchFileWriter::Open(&sink, reader->schema(), &writer));

std::shared_ptr<RecordBatch> batch;
while (true) {
Expand All @@ -46,10 +47,11 @@ Status ConvertToFile() {
return writer->Close();
}

} // namespace ipc
} // namespace arrow

int main(int argc, char** argv) {
arrow::Status status = arrow::ConvertToFile();
arrow::Status status = arrow::ipc::ConvertToFile();
if (!status.ok()) {
std::cerr << "Could not convert to file: " << status.ToString() << std::endl;
return 1;
Expand Down
Loading

0 comments on commit 04fa285

Please sign in to comment.