Skip to content

Commit

Permalink
ARROW-1408: [C++] IPC public API cleanup, refactoring. Add SerializeS…
Browse files Browse the repository at this point in the history
…chema, ReadSchema public APIs

This is mostly moving code around. In reviewing I recommend focusing on the public headers. There were a number of places where it is more consistent to use naked pointers versus shared_ptr. Also some constructors were returning shared_ptr to subclass, where it would be simpler for clients to return a pointer to base.

This includes ARROW-1376 and ARROW-1406

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #988 from wesm/ARROW-1408 and squashes the following commits:

b156767 [Wes McKinney] Fix up glib bindings, undeprecate some APIs
4bdebfa [Wes McKinney] Add serialize methods to RecordBatch, Schema. Test round trip
ef12e0f [Wes McKinney] Fix a valgrind warning
73d30c9 [Wes McKinney] Better comments
8597b96 [Wes McKinney] Remove API that was never intended to be public, unlikely to be used anywhere
122a759 [Wes McKinney] Refactoring sweep and cleanup of public IPC API. Move non-public APIs from metadata.h to metadata-internal.h and create message.h, dictionary.h
b646f96 [Wes McKinney] Set device in more places
  • Loading branch information
wesm committed Aug 25, 2017
1 parent 750b77d commit f50f2ea
Show file tree
Hide file tree
Showing 39 changed files with 1,191 additions and 678 deletions.
12 changes: 8 additions & 4 deletions c_glib/arrow-glib/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,15 @@ GArrowRecordBatchStreamReader *
garrow_record_batch_stream_reader_new(GArrowInputStream *stream,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchReader;
using ReaderType = arrow::ipc::RecordBatchStreamReader;

auto arrow_input_stream = garrow_input_stream_get_raw(stream);
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> arrow_reader;
auto status =
arrow::ipc::RecordBatchStreamReader::Open(arrow_input_stream, &arrow_reader);
std::shared_ptr<BaseType> arrow_reader;
auto status = ReaderType::Open(arrow_input_stream, &arrow_reader);
auto subtype = std::dynamic_pointer_cast<ReaderType>(arrow_reader);
if (garrow_error_check(error, status, "[record-batch-stream-reader][open]")) {
return garrow_record_batch_stream_reader_new_raw(&arrow_reader);
return garrow_record_batch_stream_reader_new_raw(&subtype);
} else {
return NULL;
}
Expand Down Expand Up @@ -354,6 +357,7 @@ garrow_record_batch_file_reader_new(GArrowSeekableInputStream *file,
GError **error)
{
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(file);

std::shared_ptr<arrow::ipc::RecordBatchFileReader> arrow_reader;
auto status =
arrow::ipc::RecordBatchFileReader::Open(arrow_random_access_file,
Expand Down
30 changes: 18 additions & 12 deletions c_glib/arrow-glib/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,17 @@ garrow_record_batch_stream_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchStreamWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink).get();
std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> arrow_writer;
auto status =
arrow::ipc::RecordBatchStreamWriter::Open(arrow_sink,
garrow_schema_get_raw(schema),
&arrow_writer);
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink,
garrow_schema_get_raw(schema),
&arrow_writer);
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
if (garrow_error_check(error, status, "[record-batch-stream-writer][open]")) {
return garrow_record_batch_stream_writer_new_raw(&arrow_writer);
return garrow_record_batch_stream_writer_new_raw(&subtype);
} else {
return NULL;
}
Expand Down Expand Up @@ -259,14 +262,17 @@ garrow_record_batch_file_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchFileWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink);
std::shared_ptr<arrow::ipc::RecordBatchFileWriter> arrow_writer;
auto status =
arrow::ipc::RecordBatchFileWriter::Open(arrow_sink.get(),
garrow_schema_get_raw(schema),
&arrow_writer);
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink.get(),
garrow_schema_get_raw(schema),
&arrow_writer);
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
if (garrow_error_check(error, status, "[record-batch-file-writer][open]")) {
return garrow_record_batch_file_writer_new_raw(&arrow_writer);
return garrow_record_batch_file_writer_new_raw(&subtype);
} else {
return NULL;
}
Expand Down
8 changes: 5 additions & 3 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ include(BuildUtils)
# Compiler flags
############################################################

include(SetupCxxFlags)

if (ARROW_NO_DEPRECATED_API)
add_definitions(-DARROW_NO_DEPRECATED_API)
endif()

include(SetupCxxFlags)

############################################################
# Dependencies
############################################################
Expand Down Expand Up @@ -780,10 +780,12 @@ endif()

if (ARROW_IPC)
set(ARROW_SRCS ${ARROW_SRCS}
src/arrow/ipc/dictionary.cc
src/arrow/ipc/feather.cc
src/arrow/ipc/json.cc
src/arrow/ipc/json-internal.cc
src/arrow/ipc/metadata.cc
src/arrow/ipc/message.cc
src/arrow/ipc/metadata-internal.cc
src/arrow/ipc/reader.cc
src/arrow/ipc/writer.cc
)
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/gpu/cuda_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ namespace gpu {
} \
} while (0)

// Evaluate a CUDA driver API call (an expression yielding a CUresult) exactly
// once. If the result is not CUDA_SUCCESS, return a Status::IOError from the
// enclosing function whose message contains the source file, the line number
// and the stringified statement. Because the macro expands to a `return` of a
// Status, it can only be used inside functions that return Status. The
// expansion uses std::stringstream, so <sstream> must be visible at the point
// of use.
#define CUDADRV_RETURN_NOT_OK(STMT) \
do { \
CUresult ret = (STMT); \
if (ret != CUDA_SUCCESS) { \
std::stringstream ss; \
ss << "Cuda Driver API call in " << __FILE__ << " at line " << __LINE__ \
<< " failed: " << #STMT; \
return Status::IOError(ss.str()); \
} \
} while (0)

} // namespace gpu
} // namespace arrow

Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/gpu/cuda_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ CudaHostBuffer::~CudaHostBuffer() { CUDA_DCHECK(cudaFreeHost(mutable_data_)); }
// CudaBufferReader

CudaBufferReader::CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer)
: io::BufferReader(buffer), cuda_buffer_(buffer) {}
: io::BufferReader(buffer), cuda_buffer_(buffer), gpu_number_(buffer->gpu_number()) {}

CudaBufferReader::~CudaBufferReader() {}

Status CudaBufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
nbytes = std::min(nbytes, size_ - position_);
CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
CUDA_RETURN_NOT_OK(
cudaMemcpy(buffer, data_ + position_, nbytes, cudaMemcpyDeviceToHost));
*bytes_read = nbytes;
Expand All @@ -95,7 +96,10 @@ Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
// CudaBufferWriter

CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer)
: io::FixedSizeBufferWriter(buffer), buffer_size_(0), buffer_position_(0) {}
: io::FixedSizeBufferWriter(buffer),
gpu_number_(buffer->gpu_number()),
buffer_size_(0),
buffer_position_(0) {}

CudaBufferWriter::~CudaBufferWriter() {}

Expand All @@ -104,6 +108,7 @@ Status CudaBufferWriter::Close() { return Flush(); }
Status CudaBufferWriter::Flush() {
if (buffer_size_ > 0 && buffer_position_ > 0) {
// Only need to flush when the write has been buffered
CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
CUDA_RETURN_NOT_OK(cudaMemcpy(mutable_data_ + position_ - buffer_position_,
host_buffer_data_, buffer_position_,
cudaMemcpyHostToDevice));
Expand Down Expand Up @@ -132,6 +137,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
if (nbytes + buffer_position_ >= buffer_size_) {
// Reach end of buffer, write everything
RETURN_NOT_OK(Flush());
CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
CUDA_RETURN_NOT_OK(
cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
} else {
Expand All @@ -141,6 +147,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
}
} else {
// Unbuffered write
CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
CUDA_RETURN_NOT_OK(
cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/gpu/cuda_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

private:
// In case we need to access anything GPU-specific, like device number
std::shared_ptr<CudaBuffer> cuda_buffer_;
int gpu_number_;
};

/// \class CudaBufferWriter
Expand Down Expand Up @@ -132,6 +132,8 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
int64_t num_bytes_buffered() const { return buffer_position_; }

private:
int gpu_number_;

// Pinned host buffer for buffering writes on CPU before calling cudaMemcpy
int64_t buffer_size_;
int64_t buffer_position_;
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ add_custom_target(metadata_fbs DEPENDS ${FBS_OUTPUT_FILES})
# Headers: top level
install(FILES
api.h
dictionary.h
feather.h
json.h
metadata.h
message.h
reader.h
writer.h
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/ipc")
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/ipc/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
#ifndef ARROW_IPC_API_H
#define ARROW_IPC_API_H

#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/feather.h"
#include "arrow/ipc/json.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/message.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"

Expand Down
85 changes: 85 additions & 0 deletions cpp/src/arrow/ipc/dictionary.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/ipc/dictionary.h"

#include <cstdint>
#include <memory>
#include <sstream>

#include "arrow/array.h"
#include "arrow/status.h"
#include "arrow/type.h"

namespace arrow {
namespace ipc {

// Construct an empty memo; both lookup tables start out empty.
DictionaryMemo::DictionaryMemo() = default;

/// Look up the dictionary array registered under a dictionary id.
///
/// \param[in] id the dictionary id to look up
/// \param[out] dictionary receives the registered array; it is not written
/// to when the lookup fails
/// \return Status::OK on success, KeyError if no dictionary has this id
Status DictionaryMemo::GetDictionary(int64_t id,
                                     std::shared_ptr<Array>* dictionary) const {
  const auto found = id_to_dictionary_.find(id);
  if (found != id_to_dictionary_.end()) {
    *dictionary = found->second;
    return Status::OK();
  }

  std::stringstream message;
  message << "Dictionary with id " << id << " not found";
  return Status::KeyError(message.str());
}

/// Return the id associated with a dictionary array, registering the array
/// under a fresh id if it has not been seen before.
///
/// Dictionaries are identified by the memory address of the Array object,
/// not by their contents.
///
/// \param[in] dictionary the dictionary array to look up or register
/// \return the id already associated with the array, or the newly assigned id
int64_t DictionaryMemo::GetId(const std::shared_ptr<Array>& dictionary) {
  const intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
  const auto it = dictionary_to_id_.find(address);
  if (it != dictionary_to_id_.end()) {
    // Dictionary already observed, return the id
    return it->second;
  }

  // Start from the number of tracked dictionaries, which is the next unused
  // id as long as ids have only ever been handed out by this method.
  int64_t new_id = static_cast<int64_t>(dictionary_to_id_.size());

  // Ids registered explicitly through AddDictionary are arbitrary, so the
  // candidate may already be taken. Skip forward to the first free id rather
  // than silently overwriting the dictionary stored under an existing one.
  while (id_to_dictionary_.find(new_id) != id_to_dictionary_.end()) {
    ++new_id;
  }

  dictionary_to_id_[address] = new_id;
  id_to_dictionary_[new_id] = dictionary;
  return new_id;
}

/// Report whether this exact Array object (matched by memory address, not by
/// contents) has already been recorded in the memo.
bool DictionaryMemo::HasDictionary(const std::shared_ptr<Array>& dictionary) const {
  const auto key = reinterpret_cast<intptr_t>(dictionary.get());
  return dictionary_to_id_.count(key) != 0;
}

/// Report whether a dictionary is registered under the given id.
bool DictionaryMemo::HasDictionaryId(int64_t id) const {
  return id_to_dictionary_.count(id) != 0;
}

/// Register a dictionary array under a caller-chosen id.
///
/// \param[in] id the id to register the dictionary under
/// \param[in] dictionary the dictionary array to store
/// \return Status::OK on success, KeyError if the id is already in use (the
/// memo is left unchanged in that case)
Status DictionaryMemo::AddDictionary(int64_t id,
                                     const std::shared_ptr<Array>& dictionary) {
  if (!HasDictionaryId(id)) {
    // Record both directions: id -> array and array address -> id
    const auto key = reinterpret_cast<intptr_t>(dictionary.get());
    id_to_dictionary_[id] = dictionary;
    dictionary_to_id_[key] = id;
    return Status::OK();
  }

  std::stringstream message;
  message << "Dictionary with id " << id << " already exists";
  return Status::KeyError(message.str());
}

} // namespace ipc
} // namespace arrow
88 changes: 88 additions & 0 deletions cpp/src/arrow/ipc/dictionary.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Tools for dictionaries in IPC context

#ifndef ARROW_IPC_DICTIONARY_H
#define ARROW_IPC_DICTIONARY_H

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "arrow/status.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"

namespace arrow {

class Array;
class Buffer;
class Field;

namespace io {

class InputStream;
class OutputStream;
class RandomAccessFile;

} // namespace io

namespace ipc {

using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>;
using DictionaryTypeMap = std::unordered_map<int64_t, std::shared_ptr<Field>>;

/// \brief Memoization data structure for handling shared dictionaries
///
/// Maintains a two-way association between dictionary arrays and int64
/// dictionary ids. Arrays are tracked by the memory address of the Array
/// object, so two distinct Array objects are treated as different
/// dictionaries regardless of their contents.
class ARROW_EXPORT DictionaryMemo {
public:
DictionaryMemo();

/// Look up the dictionary registered under an id.
/// Returns KeyError if dictionary not found
Status GetDictionary(int64_t id, std::shared_ptr<Array>* dictionary) const;

/// Return id for dictionary, computing new id if necessary. A dictionary
/// that has not been seen before is recorded in the memo as a side effect.
int64_t GetId(const std::shared_ptr<Array>& dictionary);

/// Whether this Array object (matched by address) has been recorded
bool HasDictionary(const std::shared_ptr<Array>& dictionary) const;
/// Whether any dictionary is registered under the given id
bool HasDictionaryId(int64_t id) const;

/// Add a dictionary to the memo with a particular id. Returns KeyError if
/// a dictionary is already registered under that id
Status AddDictionary(int64_t id, const std::shared_ptr<Array>& dictionary);

/// Read-only access to the id -> dictionary array map
const DictionaryMap& id_to_dictionary() const { return id_to_dictionary_; }

/// Number of entries in the id -> dictionary map, narrowed to int
int size() const { return static_cast<int>(id_to_dictionary_.size()); }

private:
// Dictionary memory addresses, to track whether a dictionary has been seen
// before. Maps the address of the Array object to its dictionary id.
std::unordered_map<intptr_t, int64_t> dictionary_to_id_;

// Map of dictionary id to dictionary array
DictionaryMap id_to_dictionary_;

// NOTE(review): presumably makes the class non-copyable; the macro is
// expected to come from arrow/util/macros.h -- confirm
DISALLOW_COPY_AND_ASSIGN(DictionaryMemo);
};

} // namespace ipc
} // namespace arrow

#endif // ARROW_IPC_DICTIONARY_H
Loading

0 comments on commit f50f2ea

Please sign in to comment.