Skip to content

Commit

Permalink
GH-36674: [C++] Use anonymous namespace in arrow/ipc/reader.cc (#36937)
Browse files Browse the repository at this point in the history
### Rationale for this change

The following types and functions should be in file scope:

IpcReadConext, BatchDataReadRequest, ArrayLoader,
DecompressBuffer(s), LoadRecordBatch*, GetCompression*,
ReadRecordBatchInternal,
GetInclusionMaskAndOutSchmea, UnpackSchemaMessage,
ReadDictionary, AsyncRecordBatchStreamReaderImpl.

### What changes are included in this PR?

Use anonymous namespace around the aforementioned definitions.

### Are these changes tested?

No

### Are there any user-facing changes?

No

Lead-authored-by: pegasas <616672335@qq.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
pegasas and pitrou committed Aug 9, 2023
1 parent d3ccc83 commit 1a00fec
Showing 1 changed file with 40 additions and 41 deletions.
81 changes: 40 additions & 41 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ Status InvalidMessageType(MessageType expected, MessageType actual) {
} \
} while (0)

} // namespace

// ----------------------------------------------------------------------
// Record batch read path

Expand Down Expand Up @@ -634,36 +632,14 @@ Status GetCompressionExperimental(const flatbuf::Message* message,
return Status::OK();
}

static Status ReadContiguousPayload(io::InputStream* file,
std::unique_ptr<Message>* message) {
Status ReadContiguousPayload(io::InputStream* file, std::unique_ptr<Message>* message) {
ARROW_ASSIGN_OR_RAISE(*message, ReadMessage(file));
if (*message == nullptr) {
return Status::Invalid("Unable to read metadata at offset");
}
return Status::OK();
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const std::shared_ptr<Schema>& schema, const DictionaryMemo* dictionary_memo,
const IpcReadOptions& options, io::InputStream* file) {
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(file, &message));
CHECK_HAS_BODY(*message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, options,
reader.get());
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Message& message, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options) {
CHECK_MESSAGE_TYPE(MessageType::RECORD_BATCH, message.type());
CHECK_HAS_BODY(message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options,
reader.get());
}

Result<RecordBatchWithMetadata> ReadRecordBatchInternal(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const std::vector<bool>& inclusion_mask, IpcReadContext& context,
Expand Down Expand Up @@ -764,22 +740,6 @@ Status UnpackSchemaMessage(const Message& message, const IpcReadOptions& options
out_schema, field_inclusion_mask, swap_endian);
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
io::RandomAccessFile* file) {
std::shared_ptr<Schema> out_schema;
// Empty means do not use
std::vector<bool> inclusion_mask;
IpcReadContext context(const_cast<DictionaryMemo*>(dictionary_memo), options, false);
RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields,
&inclusion_mask, &out_schema));
ARROW_ASSIGN_OR_RAISE(
auto batch_and_custom_metadata,
ReadRecordBatchInternal(metadata, schema, inclusion_mask, context, file));
return batch_and_custom_metadata.batch;
}

Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
DictionaryKind* kind, io::RandomAccessFile* file) {
const flatbuf::Message* message = nullptr;
Expand Down Expand Up @@ -851,6 +811,45 @@ Status ReadDictionary(const Message& message, const IpcReadContext& context,
return ReadDictionary(*message.metadata(), context, kind, reader.get());
}

} // namespace

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
io::RandomAccessFile* file) {
std::shared_ptr<Schema> out_schema;
// Empty means do not use
std::vector<bool> inclusion_mask;
IpcReadContext context(const_cast<DictionaryMemo*>(dictionary_memo), options, false);
RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields,
&inclusion_mask, &out_schema));
ARROW_ASSIGN_OR_RAISE(
auto batch_and_custom_metadata,
ReadRecordBatchInternal(metadata, schema, inclusion_mask, context, file));
return batch_and_custom_metadata.batch;
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const std::shared_ptr<Schema>& schema, const DictionaryMemo* dictionary_memo,
const IpcReadOptions& options, io::InputStream* file) {
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(file, &message));
CHECK_HAS_BODY(*message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, options,
reader.get());
}

Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
const Message& message, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options) {
CHECK_MESSAGE_TYPE(MessageType::RECORD_BATCH, message.type());
CHECK_HAS_BODY(message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options,
reader.get());
}

// Streaming format decoder
class StreamDecoderInternal : public MessageDecoderListener {
public:
Expand Down

0 comments on commit 1a00fec

Please sign in to comment.