Skip to content

Commit

Permalink
Support Limit / Offset pushdown (lancedb#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu committed Jul 24, 2022
1 parent 0530a37 commit 7303b0d
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 15 deletions.
3 changes: 3 additions & 0 deletions cpp/src/lance/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ add_library(
endian.h
filter.cc
filter.h
limit.cc
limit.h
pb.cc
pb.h
project.cc
Expand All @@ -37,3 +39,4 @@ target_include_directories(io SYSTEM PRIVATE ${Protobuf_INCLUDE_DIR})
add_dependencies(io format)

add_lance_test(filter_test)
add_lance_test(limit_test)
49 changes: 49 additions & 0 deletions cpp/src/lance/io/limit.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/io/limit.h"

#include <fmt/format.h>

#include <algorithm>
#include <cassert>

namespace lance::io {

/// Construct the LIMIT clause. Negative values are programming errors and are
/// rejected in debug builds only (the constructor is noexcept).
Limit::Limit(int64_t limit, int64_t offset) noexcept : limit_(limit), offset_(offset) {
  assert(limit >= 0);
  assert(offset >= 0);
}

std::optional<std::tuple<int64_t, int64_t>> Limit::Apply(int64_t length) {
  // Absolute row index (exclusive) after which nothing more is needed.
  const int64_t end = offset_ + limit_;
  if (seen_ >= end) {
    /// Already read all the data.
    return std::nullopt;
  }
  const int64_t chunk_start = seen_;
  seen_ += length;
  // Rows to skip at the head of this chunk before the OFFSET is reached.
  const auto skip = std::max(static_cast<int64_t>(0), offset_ - chunk_start);
  // Chunk-relative end (exclusive) of the rows we still want.
  const auto stop = std::min(length, end - chunk_start);
  if (seen_ < offset_) {
    /// The whole chunk falls before the offset: nothing to read.
    return std::make_tuple(0, 0);
  }
  assert(stop >= skip);
  return std::make_tuple(skip, stop - skip);
}

/// Debug representation, e.g. "Limit(n=10, offset=20)".
std::string Limit::ToString() const {
  return fmt::format("Limit(n={}, offset={})", limit_, offset_);
}

} // namespace lance::io
68 changes: 68 additions & 0 deletions cpp/src/lance/io/limit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <arrow/array.h>
#include <arrow/result.h>

#include <memory>
#include <optional>
#include <string>
#include <tuple>

namespace lance::io {

/// Plan for Limit clause:
///
/// LIMIT value:int64 [OFFSET value:int64]
///
class Limit {
 public:
  Limit() = delete;

  /// Construct a Limit Clause with limit, and optionally, offset.
  /// Both values must be non-negative (enforced by assert in debug builds).
  explicit Limit(int64_t limit, int64_t offset = 0) noexcept;

  /// Apply limit when reading a chunk.
  ///
  /// This call is stateful: it accumulates the number of rows seen so far, so
  /// chunks must be fed in order, each exactly once (not safe for parallel or
  /// out-of-order chunk processing).
  ///
  /// \param chunk_length the length of a chunk to be read.
  ///
  /// \return a tuple of `[offset, length]` that should be physically loaded
  /// into memory. Return `[0, 0]` to skip this chunk.
  /// Return `std::nullopt` to indicate the end of the iteration.
  ///
  /// \code{.cpp}
  /// auto chunk_length = GetChunkLength(chunk_id);
  /// auto limit = Limit(20, 30);
  /// auto offset_and_length = limit.Apply(chunk_length);
  /// if (!offset_and_length) {
  ///   // stop
  ///   return;
  /// }
  /// auto [offset, length] = *offset_and_length;
  /// return infile->ReadAt(offset, length);
  /// \endcode
  std::optional<std::tuple<int64_t, int64_t>> Apply(int64_t chunk_length);

  /// Debug String
  std::string ToString() const;

 private:
  int64_t limit_ = 0;   ///< Maximum number of rows to return.
  int64_t offset_ = 0;  ///< Number of rows to skip before returning any.
  int64_t seen_ = 0;    ///< Total rows observed across all Apply() calls.
};

} // namespace lance::io
39 changes: 39 additions & 0 deletions cpp/src/lance/io/limit_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2022 Lance Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "lance/io/limit.h"

#include <fmt/format.h>

#include <catch2/catch_test_macros.hpp>
#include <numeric>

TEST_CASE("LIMIT 100") {
  lance::io::Limit limit(100);
  // The first two chunks (10 + 80 rows) fit entirely under the limit.
  CHECK(limit.Apply(10).value() == std::make_tuple(0, 10));
  CHECK(limit.Apply(80).value() == std::make_tuple(0, 80));
  // Only 10 of the next 20 rows are needed to reach 100.
  CHECK(limit.Apply(20).value() == std::make_tuple(0, 10));
  // Limit already reached.
  CHECK(limit.Apply(30) == std::nullopt);
}

TEST_CASE("LIMIT 10 OFFSET 20") {
  lance::io::Limit limit(10, 20);
  // The first 15 rows (10 + 5) fall entirely before the offset and are skipped.
  CHECK(limit.Apply(10).value() == std::make_tuple(0, 0));
  CHECK(limit.Apply(5).value() == std::make_tuple(0, 0));
  // Rows [20, 30): skip 5 rows inside this chunk, then take the full limit of 10.
  auto result = limit.Apply(20).value();
  INFO("Limit::Apply(20): offset=" << std::get<0>(result) << " len=" << std::get<1>(result));
  CHECK(result == std::make_tuple(5, 10));
  // Past the limit: end of iteration.
  CHECK(limit.Apply(5) == std::nullopt);
}
28 changes: 24 additions & 4 deletions cpp/src/lance/io/project.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,28 @@

#include "lance/arrow/utils.h"
#include "lance/io/filter.h"
#include "lance/io/limit.h"
#include "lance/io/reader.h"

namespace lance::io {

/// Private constructor; use Project::Make().
///
/// \param dataset_schema schema of the full dataset.
/// \param projected_schema schema of the columns requested by the scan.
/// \param scan_schema projected_schema minus the columns already read by the filter.
/// \param filter optional pushed-down filter; takes ownership.
/// \param limit optional LIMIT value; limit_ stays null when absent.
/// \param offset OFFSET value, only meaningful together with a limit.
Project::Project(std::shared_ptr<format::Schema> dataset_schema,
                 std::shared_ptr<format::Schema> projected_schema,
                 std::shared_ptr<format::Schema> scan_schema,
                 std::unique_ptr<Filter> filter,
                 std::optional<int32_t> limit,
                 int32_t offset)
    : dataset_schema_(dataset_schema),
      projected_schema_(projected_schema),
      scan_schema_(scan_schema),
      filter_(std::move(filter)),
      // make_unique instead of raw new: exception-safe and idiomatic.
      limit_(limit.has_value() ? std::make_unique<Limit>(limit.value(), offset) : nullptr) {}

::arrow::Result<std::unique_ptr<Project>> Project::Make(
std::shared_ptr<format::Schema> schema,
std::shared_ptr<::arrow::dataset::ScanOptions> scan_options) {
std::shared_ptr<::arrow::dataset::ScanOptions> scan_options,
std::optional<int32_t> limit,
int32_t offset) {
ARROW_ASSIGN_OR_RAISE(auto filter, Filter::Make(*schema, scan_options->filter));
auto projected_arrow_schema = scan_options->projected_schema;
if (projected_arrow_schema->num_fields() == 0) {
Expand All @@ -49,9 +55,11 @@ ::arrow::Result<std::unique_ptr<Project>> Project::Make(
ARROW_ASSIGN_OR_RAISE(scan_schema, projected_schema->Exclude(filter->schema()));
}
return std::unique_ptr<Project>(
new Project(schema, projected_schema, scan_schema, std::move(filter)));
new Project(schema, projected_schema, scan_schema, std::move(filter), limit, offset));
}

bool Project::CanParallelScan() const { return limit_.operator bool(); }

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Project::Execute(
std::shared_ptr<FileReader> reader, int32_t chunk_idx) {
if (filter_) {
Expand All @@ -60,6 +68,18 @@ ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Project::Execute(
return result.status();
}
auto [indices, values] = result.ValueUnsafe();
assert(indices->length() == values->num_rows());
if (limit_) {
auto offset_and_len = limit_->Apply(indices->length());
if (!offset_and_len.has_value()) {
/// Indicate the end of iteration.
return nullptr;
}
auto [offset, len] = offset_and_len.value();
indices =
std::static_pointer_cast<decltype(indices)::element_type>(indices->Slice(offset, len));
values = values->Slice(offset, len);
}
ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadChunk(*scan_schema_, chunk_idx, indices));
assert(values->num_rows() == batch->num_rows());
ARROW_ASSIGN_OR_RAISE(auto merged, lance::arrow::MergeRecordBatches(values, batch));
Expand Down
29 changes: 26 additions & 3 deletions cpp/src/lance/io/project.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <arrow/result.h>

#include <memory>
#include <optional>

namespace lance::format {
class Schema;
Expand All @@ -28,33 +29,55 @@ namespace lance::io {

class FileReader;
class Filter;
class Limit;

/// Projection over dataset.
/// \brief Projection over dataset.
///
class Project {
 public:
  Project() = delete;

  /// Make a Project from the full dataset schema and scan options.
  ///
  /// \param schema dataset schema.
  /// \param scan_options Arrow scan options.
  /// \param limit limit number of records to return. Optional.
  /// \param offset offset to fetch the record. Optional.
  /// \return Project if success. Returns the error status otherwise.
  ///
  static ::arrow::Result<std::unique_ptr<Project>> Make(
      std::shared_ptr<format::Schema> schema,
      std::shared_ptr<::arrow::dataset::ScanOptions> scan_options,
      std::optional<int32_t> limit = std::nullopt,
      int32_t offset = 0);

  /// Run the projection over one chunk of the file.
  ///
  /// \param reader the opened file reader.
  /// \param chunk_idx index of the chunk to process.
  /// \return the projected record batch; nullptr signals the end of iteration
  ///         when a LIMIT clause has been exhausted.
  ::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> Execute(std::shared_ptr<FileReader> reader,
                                                                 int32_t chunk_idx);

  /// \brief Can the plan support parallel scan.
  ///
  /// \note Once Projection has limit / offset clause, parallel reads are limited.
  ///
  /// \todo GH-43. should we remove this LIMIT / OFFSET logic, and the decision about parallel scan
  /// out of the format spec?
  bool CanParallelScan() const;

 private:
  Project(std::shared_ptr<format::Schema> dataset_schema,
          std::shared_ptr<format::Schema> projected_schema,
          std::shared_ptr<format::Schema> scan_schema,
          std::unique_ptr<Filter> filter,
          std::optional<int32_t> limit = std::nullopt,
          int32_t offset = 0);

  std::shared_ptr<format::Schema> dataset_schema_;
  std::shared_ptr<format::Schema> projected_schema_;
  /// scan_schema_ equals to projected_schema_ - filters_.schema()
  /// It includes the columns that are not read from the filters yet.
  std::shared_ptr<format::Schema> scan_schema_;
  std::unique_ptr<Filter> filter_;

  /// Non-null only when the scan carries a LIMIT (and optional OFFSET) clause.
  std::unique_ptr<Limit> limit_;
};

} // namespace lance::io
28 changes: 21 additions & 7 deletions cpp/src/lance/io/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,59 @@

#include <algorithm>
#include <future>
#include <set>
#include <tuple>

#include "lance/format/metadata.h"
#include "lance/format/schema.h"
#include "lance/io/filter.h"
#include "lance/io/limit.h"
#include "lance/io/project.h"
#include "lance/io/reader.h"

namespace lance::io {

Scanner::Scanner(std::shared_ptr<FileReader> reader,
std::shared_ptr<arrow::dataset::ScanOptions> options) noexcept
: reader_(reader), options_(options) {}
std::shared_ptr<arrow::dataset::ScanOptions> options,
std::optional<int64_t> limit,
int64_t offset) noexcept
: reader_(reader),
options_(options),
limit_(limit),
offset_(offset),
max_queue_size_(static_cast<std::size_t>(options->batch_readahead)) {}

Scanner::Scanner(const Scanner& other) noexcept
: reader_(other.reader_),
options_(other.options_),
limit_(other.limit_),
offset_(other.offset_),
schema_(other.schema_),
project_(other.project_),
current_chunk_(other.current_chunk_) {}
current_chunk_(other.current_chunk_),
max_queue_size_(other.max_queue_size_) {}

/// Move constructor: transfers ownership of the reader, options, schema,
/// projection and the queue of in-flight prefetch futures; scalar scan state
/// (limit/offset, current chunk, queue capacity) is copied.
Scanner::Scanner(Scanner&& other) noexcept
    : reader_(std::move(other.reader_)),
      options_(std::move(other.options_)),
      limit_(other.limit_),
      offset_(other.offset_),
      schema_(std::move(other.schema_)),
      project_(std::move(other.project_)),
      current_chunk_(other.current_chunk_),
      max_queue_size_(other.max_queue_size_),
      q_(std::move(other.q_)) {}

::arrow::Status Scanner::Open() {
schema_ = std::make_shared<lance::format::Schema>(reader_->schema());
ARROW_ASSIGN_OR_RAISE(project_, Project::Make(schema_, options_));
ARROW_ASSIGN_OR_RAISE(project_, Project::Make(schema_, options_, limit_, offset_));
if (!project_->CanParallelScan()) {
max_queue_size_ = 1;
}
return ::arrow::Status::OK();
}

void Scanner::AddPrefetchTask() {
while (q_.size() < static_cast<std::size_t>(options_->batch_readahead) &&
current_chunk_ < reader_->metadata().num_chunks()) {
while (q_.size() < max_queue_size_ && current_chunk_ < reader_->metadata().num_chunks()) {
auto chunk_id = current_chunk_++;
auto f = std::async(
[&](int32_t chunk_id) {
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/lance/io/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class Scanner {
public:
/// Constructor.
Scanner(std::shared_ptr<FileReader> reader,
std::shared_ptr<::arrow::dataset::ScanOptions> options) noexcept;
std::shared_ptr<::arrow::dataset::ScanOptions> options,
std::optional<int64_t> limit = std::nullopt,
int64_t offset = 0) noexcept;

/// Copy constructor.
Scanner(const Scanner& other) noexcept;
Expand All @@ -65,11 +67,14 @@ class Scanner {

std::shared_ptr<FileReader> reader_;
std::shared_ptr<::arrow::dataset::ScanOptions> options_;
std::optional<int64_t> limit_ = std::nullopt;
int64_t offset_ = 0;
std::shared_ptr<lance::format::Schema> schema_;
/// Projection over the dataset.
std::shared_ptr<Project> project_;

int current_chunk_ = 0;
std::size_t max_queue_size_ = 1;
std::queue<std::future<::arrow::Result<std::shared_ptr<::arrow::RecordBatch>>>> q_;

void AddPrefetchTask();
Expand Down

0 comments on commit 7303b0d

Please sign in to comment.