Skip to content

Commit

Permalink
ARROW-13572: [C++][Datasets] Add ORC support to Datasets API
Browse files Browse the repository at this point in the history
Closes #10991 from jorisvandenbossche/ARROW-13572-dataset-orc

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
  • Loading branch information
jorisvandenbossche committed Sep 28, 2021
1 parent 2baf1a0 commit 1d7bc3e
Show file tree
Hide file tree
Showing 16 changed files with 591 additions and 18 deletions.
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
Expand Up @@ -35,6 +35,10 @@ if(ARROW_CSV)
set(ARROW_DATASET_SRCS ${ARROW_DATASET_SRCS} file_csv.cc)
endif()

if(ARROW_ORC)
set(ARROW_DATASET_SRCS ${ARROW_DATASET_SRCS} file_orc.cc)
endif()

if(ARROW_PARQUET)
set(ARROW_DATASET_LINK_STATIC ${ARROW_DATASET_LINK_STATIC} parquet_static)
set(ARROW_DATASET_LINK_SHARED ${ARROW_DATASET_LINK_SHARED} parquet_shared)
Expand Down Expand Up @@ -116,6 +120,10 @@ if(ARROW_CSV)
add_arrow_dataset_test(file_csv_test)
endif()

if(ARROW_ORC)
add_arrow_dataset_test(file_orc_test)
endif()

if(ARROW_PARQUET)
add_arrow_dataset_test(file_parquet_test)
endif()
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/api.h
Expand Up @@ -25,5 +25,6 @@
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_csv.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/file_orc.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"
182 changes: 182 additions & 0 deletions cpp/src/arrow/dataset/file_orc.cc
@@ -0,0 +1,182 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/dataset/file_orc.h"

#include <memory>

#include "arrow/adapters/orc/adapter.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/scanner.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"

namespace arrow {

using internal::checked_pointer_cast;

namespace dataset {

namespace {

inline Result<std::unique_ptr<arrow::adapters::orc::ORCFileReader>> OpenReader(
const FileSource& source,
const std::shared_ptr<ScanOptions>& scan_options = nullptr) {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());

arrow::MemoryPool* pool;
if (scan_options) {
pool = scan_options->pool;
} else {
pool = default_memory_pool();
}

auto reader = arrow::adapters::orc::ORCFileReader::Open(std::move(input), pool);
auto status = reader.status();
if (!status.ok()) {
return status.WithMessage("Could not open ORC input source '", source.path(),
"': ", status.message());
}
return reader;
}

/// \brief A ScanTask backed by an ORC file.
class OrcScanTask : public ScanTask {
public:
OrcScanTask(std::shared_ptr<FileFragment> fragment,
std::shared_ptr<ScanOptions> options)
: ScanTask(std::move(options), fragment), source_(fragment->source()) {}

Result<RecordBatchIterator> Execute() override {
struct Impl {
static Result<RecordBatchIterator> Make(const FileSource& source,
const FileFormat& format,
const ScanOptions& scan_options) {
ARROW_ASSIGN_OR_RAISE(
auto reader, OpenReader(source, std::make_shared<ScanOptions>(scan_options)));
int num_stripes = reader->NumberOfStripes();
return RecordBatchIterator(Impl{std::move(reader), 0, num_stripes});
}

Result<std::shared_ptr<RecordBatch>> Next() {
if (i_ == num_stripes_) {
return nullptr;
}
std::shared_ptr<RecordBatch> batch;
// TODO (https://issues.apache.org/jira/browse/ARROW-13797)
// determine included fields from options_->MaterializedFields() to
// optimize the column selection (see _column_index_lookup in python
// orc.py for custom logic)
// std::vector<int> included_fields;
// TODO (https://issues.apache.org/jira/browse/ARROW-14153)
// pass scan_options_->batch_size
return reader_->ReadStripe(i_++);
}

std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader_;
int i_;
int num_stripes_;
};

return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(),
*options_);
}

private:
FileSource source_;
};

class OrcScanTaskIterator {
public:
static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
std::shared_ptr<FileFragment> fragment) {
return ScanTaskIterator(OrcScanTaskIterator(std::move(options), std::move(fragment)));
}

Result<std::shared_ptr<ScanTask>> Next() {
if (once_) {
// Iteration is done.
return nullptr;
}

once_ = true;
return std::shared_ptr<ScanTask>(new OrcScanTask(fragment_, options_));
}

private:
OrcScanTaskIterator(std::shared_ptr<ScanOptions> options,
std::shared_ptr<FileFragment> fragment)
: options_(std::move(options)), fragment_(std::move(fragment)) {}

bool once_ = false;
std::shared_ptr<ScanOptions> options_;
std::shared_ptr<FileFragment> fragment_;
};

} // namespace

Result<bool> OrcFileFormat::IsSupported(const FileSource& source) const {
RETURN_NOT_OK(source.Open().status());
return OpenReader(source).ok();
}

Result<std::shared_ptr<Schema>> OrcFileFormat::Inspect(const FileSource& source) const {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));
return reader->ReadSchema();
}

Result<ScanTaskIterator> OrcFileFormat::ScanFile(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const {
return OrcScanTaskIterator::Make(options, fragment);
}

Future<util::optional<int64_t>> OrcFileFormat::CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options) {
if (ExpressionHasFieldRefs(predicate)) {
return Future<util::optional<int64_t>>::MakeFinished(util::nullopt);
}
auto self = checked_pointer_cast<OrcFileFormat>(shared_from_this());
return DeferNotOk(options->io_context.executor()->Submit(
[self, file]() -> Result<util::optional<int64_t>> {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(file->source()));
return reader->NumberOfRows();
}));
}

// //
// // OrcFileWriter, OrcFileWriteOptions
// //

std::shared_ptr<FileWriteOptions> OrcFileFormat::DefaultWriteOptions() {
// TODO (https://issues.apache.org/jira/browse/ARROW-13796)
return nullptr;
}

Result<std::shared_ptr<FileWriter>> OrcFileFormat::MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const {
// TODO (https://issues.apache.org/jira/browse/ARROW-13796)
return Status::NotImplemented("ORC writer not yet implemented.");
}

} // namespace dataset
} // namespace arrow
79 changes: 79 additions & 0 deletions cpp/src/arrow/dataset/file_orc.h
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
#include <string>

#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/type_fwd.h"
#include "arrow/result.h"

namespace arrow {
namespace dataset {

/// \addtogroup dataset-file-formats
///
/// @{

constexpr char kOrcTypeName[] = "orc";

/// \brief A FileFormat implementation that reads from and writes to ORC files
class ARROW_DS_EXPORT OrcFileFormat : public FileFormat {
public:
std::string type_name() const override { return kOrcTypeName; }

bool Equals(const FileFormat& other) const override {
return type_name() == other.type_name();
}

Result<bool> IsSupported(const FileSource& source) const override;

/// \brief Return the schema of the file if possible.
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;

/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const override;

// TODO add async version (https://issues.apache.org/jira/browse/ARROW-13795)
// Result<RecordBatchGenerator> ScanBatchesAsync(
// const std::shared_ptr<ScanOptions>& options,
// const std::shared_ptr<FileFragment>& file) const override;

Future<util::optional<int64_t>> CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options) override;

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const override;

std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};

/// @}

} // namespace dataset
} // namespace arrow
85 changes: 85 additions & 0 deletions cpp/src/arrow/dataset/file_orc_test.cc
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/dataset/file_orc.h"

#include <memory>
#include <utility>

#include "arrow/adapters/orc/adapter.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/dataset/test_util.h"
#include "arrow/io/memory.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"

namespace arrow {
namespace dataset {

class OrcFormatHelper {
public:
using FormatType = OrcFileFormat;
static Result<std::shared_ptr<Buffer>> Write(RecordBatchReader* reader) {
ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create());
ARROW_ASSIGN_OR_RAISE(auto writer, adapters::orc::ORCFileWriter::Open(sink.get()));
std::shared_ptr<Table> table;
RETURN_NOT_OK(reader->ReadAll(&table));
writer->Write(*table);
RETURN_NOT_OK(writer->Close());
return sink->Finish();
}

static std::shared_ptr<OrcFileFormat> MakeFormat() {
return std::make_shared<OrcFileFormat>();
}
};

class TestOrcFileFormat : public FileFormatFixtureMixin<OrcFormatHelper> {};

// TEST_F(TestOrcFileFormat, WriteRecordBatchReader) { TestWrite(); }

TEST_F(TestOrcFileFormat, InspectFailureWithRelevantError) {
TestInspectFailureWithRelevantError(StatusCode::IOError, "ORC");
}
TEST_F(TestOrcFileFormat, Inspect) { TestInspect(); }
TEST_F(TestOrcFileFormat, IsSupported) { TestIsSupported(); }
TEST_F(TestOrcFileFormat, CountRows) { TestCountRows(); }

// TODO add TestOrcFileSystemDataset if write support is added

class TestOrcFileFormatScan : public FileFormatScanMixin<OrcFormatHelper> {};

TEST_P(TestOrcFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
// TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
// TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
// TestScanProjectedMissingCols();
// }
INSTANTIATE_TEST_SUITE_P(TestScan, TestOrcFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);

} // namespace dataset
} // namespace arrow

0 comments on commit 1d7bc3e

Please sign in to comment.