Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fsaintjacques committed May 1, 2020
1 parent d2263be commit 80df00e
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 24 deletions.
11 changes: 5 additions & 6 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ FileSystemDatasetFactory::FileSystemDatasetFactory(
format_(std::move(format)),
options_(std::move(options)) {}

util::optional<util::string_view> FileSystemDatasetFactory::BaselessPath(
util::optional<util::string_view> FileSystemDatasetFactory::RemovePartitionBaseDir(
util::string_view path) {
const util::string_view partition_base_dir{options_.partition_base_dir};
return fs::internal::RemoveAncestor(partition_base_dir, path);
Expand Down Expand Up @@ -193,7 +193,7 @@ Result<std::shared_ptr<Schema>> FileSystemDatasetFactory::PartitionSchema() {

std::vector<util::string_view> relative_paths;
for (const auto& path : paths_) {
if (auto relative = BaselessPath(path)) {
if (auto relative = RemovePartitionBaseDir(path)) {
relative_paths.push_back(*relative);
}
}
Expand Down Expand Up @@ -241,12 +241,11 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema));
}

FragmentVector fragments;
std::vector<std::shared_ptr<FileFragment>> fragments;
for (const auto& path : paths_) {
std::shared_ptr<Expression> partition = scalar(true);
if (auto relative = BaselessPath(path)) {
std::string path_string{*relative};
partition = partitioning->Parse(path_string).ValueOr(scalar(true));
if (auto relative = RemovePartitionBaseDir(path)) {
partition = partitioning->Parse(relative->to_string()).ValueOr(scalar(true));
}

ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
FileSystemFactoryOptions options_;

private:
util::optional<util::string_view> BaselessPath(util::string_view path);
util::optional<util::string_view> RemovePartitionBaseDir(util::string_view path);
};

} // namespace dataset
Expand Down
13 changes: 4 additions & 9 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,11 @@ FileSystemDataset::FileSystemDataset(std::shared_ptr<Schema> schema,

Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
std::shared_ptr<FileFormat> format, FragmentVector fragments) {
std::vector<std::shared_ptr<FileFragment>> file_fragments;
for (const auto& fragment : fragments) {
auto file_fragment = internal::checked_pointer_cast<FileFragment>(fragment);
file_fragments.push_back(std::move(file_fragment));
}

std::shared_ptr<FileFormat> format,
std::vector<std::shared_ptr<FileFragment>> fragments) {
return std::shared_ptr<FileSystemDataset>(
new FileSystemDataset(std::move(schema), std::move(root_partition),
std::move(format), std::move(file_fragments)));
std::move(format), std::move(fragments)));
}

Result<std::shared_ptr<Dataset>> FileSystemDataset::ReplaceSchema(
Expand Down Expand Up @@ -164,7 +159,7 @@ Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write(
auto partition_base_dir = fs::internal::EnsureTrailingSlash(plan.partition_base_dir);
auto extension = "." + plan.format->type_name();

FragmentVector fragments;
std::vector<std::shared_ptr<FileFragment>> fragments;
for (size_t i = 0; i < plan.paths.size(); ++i) {
const auto& op = plan.fragment_or_partition_expressions[i];
if (util::holds_alternative<std::shared_ptr<Fragment>>(op)) {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
///
/// \param[in] schema the schema of the dataset
/// \param[in] root_partition the partition expression of the dataset
/// \param[in] format the format of each FileFragment.
/// \param[in] fragments list of fragments to create the dataset from
///
/// Note that all fragments must be of `FileFragment` type. The types are
Expand All @@ -202,7 +203,8 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
/// \return A constructed dataset.
static Result<std::shared_ptr<FileSystemDataset>> Make(
std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
std::shared_ptr<FileFormat> format, FragmentVector fragments);
std::shared_ptr<FileFormat> format,
std::vector<std::shared_ptr<FileFragment>> fragments);

/// \brief Write to a new format and filesystem location, preserving partitioning.
///
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/dataset/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ class ARROW_DS_EXPORT Expression {
/// This is a shortcut to check if the expression is neither null nor false.
bool IsSatisfiable() const { return !IsNull() && !Equals(false); }

bool IsSatisfiableWith(const std::shared_ptr<Expression> other) const {
/// Indicates if the expression is satisfiable given another expression.
///
/// This behaves like IsSatisfiable, but it first simplifies the current
/// expression with `Assume(other)` and checks the simplified result.
bool IsSatisfiableWith(const std::shared_ptr<Expression>& other) const {
return Assume(other)->IsSatisfiable();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ struct MakeFileSystemDatasetMixin {
MakeFileSystem(infos);
auto format = std::make_shared<DummyFileFormat>();

FragmentVector fragments;
std::vector<std::shared_ptr<FileFragment>> fragments;
for (size_t i = 0; i < n_fragments; i++) {
const auto& info = infos[i];
if (!info.IsFile()) {
Expand Down
11 changes: 7 additions & 4 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,11 @@ cdef class FileSystemDataset(Dataset):
cdef:
FileInfo info
Expression expr
Fragment fragment
FileFragment fragment
vector[CFileInfo] c_file_infos
vector[shared_ptr[CExpression]] c_partitions
vector[shared_ptr[CFragment]] c_fragments
shared_ptr[CFileFragment] c_fragment
vector[shared_ptr[CFileFragment]] c_fragments
CResult[shared_ptr[CDataset]] result

# validate required arguments
Expand All @@ -296,7 +297,7 @@ cdef class FileSystemDataset(Dataset):
infos = filesystem.get_file_info(paths_or_selector)

if partitions is None:
partitions = [ScalarExpression(True) for _ in range(len(infos))]
partitions = [ScalarExpression(True)] * len(infos)

if len(infos) != len(partitions):
raise ValueError(
Expand All @@ -308,7 +309,9 @@ cdef class FileSystemDataset(Dataset):
if info.is_file:
fragment = format.make_fragment(info.path, filesystem,
partitions[i])
c_fragments.push_back(fragment.unwrap())
c_fragments.push_back(
static_pointer_cast[CFileFragment, CFragment](
fragment.unwrap()))

if root_partition is None:
root_partition = ScalarExpression(True)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
shared_ptr[CSchema] schema,
shared_ptr[CExpression] source_partition,
shared_ptr[CFileFormat] format,
CFragmentVector fragments)
vector[shared_ptr[CFileFragment]] fragments)
c_string type()
vector[c_string] files()
const shared_ptr[CFileFormat]& format() const
Expand Down

0 comments on commit 80df00e

Please sign in to comment.