Skip to content

Commit

Permalink
ARROW-7608: [C++][Dataset] Add the ability to list files in FileSyste…
Browse files Browse the repository at this point in the history
…mSource

It also enables introspection of dataset classes in R.

Closes #6374 from fsaintjacques/ARROW-7608-dataset-properties and squashes the following commits:

36a17cd <Benjamin Kietzman> normalize tempdir paths on Windows
124cd1b <Neal Richardson> Refactor new R tests so that they might pass on windows
2106188 <Neal Richardson> Move ..dispatch to instance methods
fefe216 <François Saint-Jacques> ARROW-7608:  Add the ability to list files in FileSystemSource

Lead-authored-by: François Saint-Jacques <fsaintjacques@gmail.com>
Co-authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
  • Loading branch information
3 people committed Feb 22, 2020
1 parent c83e82d commit f03c844
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 42 deletions.
13 changes: 13 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ Result<std::shared_ptr<Source>> FileSystemSource::Make(
std::move(filesystem), std::move(forest), std::move(partitions)));
}

std::vector<std::string> FileSystemSource::files() const {
std::vector<std::string> files;

DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) {
if (ref.stats().IsFile()) {
files.push_back(ref.stats().path());
}
return Status::OK();
}));

return files;
}

std::string FileSystemSource::ToString() const {
std::string repr = "FileSystemSource:";

Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,12 @@ class ARROW_DS_EXPORT FileSystemSource : public Source {

/// \brief Return the type name of this source, which is always "filesystem".
std::string type_name() const override { return "filesystem"; }

std::string ToString() const;

/// \brief Return the file format stored in this source (the format_ member).
const std::shared_ptr<FileFormat>& format() const { return format_; }

/// \brief Return the paths of the files contained in this source.
std::vector<std::string> files() const;

std::string ToString() const;

protected:
FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ TEST_F(TestFileSystemSource, Basic) {

MakeSource({fs::File("a"), fs::File("b"), fs::File("c")});
AssertFragmentsAreFromPath(source_->GetFragments(options_), {"a", "b", "c"});
AssertFilesAre(source_, {"a", "b", "c"});

// Should not create fragment from directories.
MakeSource({fs::Dir("A"), fs::Dir("A/B"), fs::File("A/a"), fs::File("A/B/b")});
AssertFragmentsAreFromPath(source_->GetFragments(options_), {"A/a", "A/B/b"});
AssertFilesAre(source_, {"A/a", "A/B/b"});
}

TEST_F(TestFileSystemSource, RootPartitionPruning) {
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ class TestFileSystemSource : public ::testing::Test {
std::shared_ptr<ScanOptions> options_ = ScanOptions::Make(schema({}));
};

void AssertFilesAre(const std::shared_ptr<Source>& source,
std::vector<std::string> expected) {
auto fs_source = internal::checked_cast<FileSystemSource*>(source.get());
EXPECT_THAT(fs_source->files(), testing::UnorderedElementsAreArray(expected));
}

void AssertFragmentsAreFromPath(FragmentIterator it, std::vector<std::string> expected) {
std::vector<std::string> actual;

Expand Down
6 changes: 6 additions & 0 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,12 @@ cdef class FileSystemSource(Source):
Source.init(self, sp)
self.filesystem_source = <CFileSystemSource*> sp.get()

@property
def files(self):
"""List of the file paths contained in this source.

Each path reported by the underlying C++ ``files()`` call is converted
with ``frombytes`` and returned in a new Python list.
"""
cdef vector[c_string] files = self.filesystem_source.files()
return [frombytes(f) for f in files]


cdef class DatasetFactory:
"""
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CFileStatsVector stats,
CExpressionVector partitions)
c_string type()
vector[c_string] files()
shared_ptr[CFragmentIterator] GetFragments(
shared_ptr[CScanOptions] options)

Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def test_filesystem_source(mockfs):
filesystem=mockfs, partitions=partitions,
file_format=file_format)
assert source.partition_expression.equals(root_partition)
assert set(source.files) == set(paths)


def test_dataset(dataset):
Expand Down Expand Up @@ -455,7 +456,7 @@ def test_file_system_factory(mockfs, paths_or_selector):
assert factory.root_partition.equals(ds.ScalarExpression(True))

source = factory.finish()
assert isinstance(source, ds.Source)
assert isinstance(source, ds.FileSystemSource)

dataset = ds.Dataset([source], inspected_schema)
assert len(list(dataset.scan())) == 2
Expand Down
24 changes: 22 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 74 additions & 19 deletions r/R/dataset.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ Dataset <- R6Class("Dataset", inherit = Object,
#' @description
#' Return the Dataset's `Schema`
schema = function() shared_ptr(Schema, dataset___Dataset__schema(self)),
metadata = function() self$schema$metadata
metadata = function() self$schema$metadata,
#' @description
#' Return the Dataset's `Source`s
sources = function() {
map(dataset___Dataset__sources(self), ~shared_ptr(Source, .)$..dispatch())
}
)
)
Dataset$create <- function(sources, schema) {
Expand Down Expand Up @@ -138,7 +143,7 @@ DatasetFactory$create <- function(sources) {
#' fragments contained in it, and declare a partitioning.
#' `FileSystemSourceFactory` is a subclass of `SourceFactory` for
#' discovering files in the local file system, the only currently supported
#' file system.
#' file system; it constructs an instance of `FileSystemSource`.
#'
#' In general, you'll deal with `SourceFactory` rather than `Source` itself.
#' @section Factory:
Expand All @@ -153,24 +158,57 @@ DatasetFactory$create <- function(sources) {
#' Currently supported options are "parquet", "arrow", and "ipc" (an alias for
#' the Arrow file format)
#' @section Methods:
#' `Source` has one defined method:
#' `Source` and its subclasses have the following method:
#'
#' - `$schema`: Active binding, returns the [Schema] of the `Source`
#'
#' `FileSystemSource` has the following methods:
#'
#' - `$files`: Active binding, returns the files of the `FileSystemSource`
#' - `$format`: Active binding, returns the [FileFormat] of the `FileSystemSource`
#'
#' `SourceFactory` and its subclasses have the following methods:
#'
#' - `$Inspect()`: Walks the files in the directory and returns a common [Schema]
#' - `$Finish(schema)`: Returns a `Source`
#' @rdname Source
#' @name Source
#' @seealso [Dataset] for what do do with a `Source`
#' @seealso [Dataset] for what to do with a `Source`
#' @export
Source <- R6Class("Source", inherit = Object,
  public = list(
    # Re-wrap this object in the most specific R6 class for its type:
    # a "filesystem" source becomes a FileSystemSource built from this
    # object's pointer; any other type is returned unchanged.
    ..dispatch = function() {
      if (self$type != "filesystem") {
        return(self)
      }
      shared_ptr(FileSystemSource, self$pointer())
    }
  ),
  active = list(
    #' @description
    #' Return the Source's `Schema`
    schema = function() shared_ptr(Schema, dataset___Source__schema(self)),
    #' @description
    #' Return the Source's type.
    type = function() {
      dataset___Source__type_name(self)
    }
  )
)

#' @name FileSystemSource
#' @rdname Source
#' @export
FileSystemSource <- R6Class("FileSystemSource", inherit = Source,
  active = list(
    #' @description
    #' Return the files contained in this `Source`
    files = function() {
      dataset___FSSource__files(self)
    },
    #' @description
    #' Return the format of files in this `Source`
    format = function() {
      # Wrap as a generic FileFormat first, then let ..dispatch() pick the
      # concrete subclass.
      generic_format <- shared_ptr(FileFormat, dataset___FSSource__format(self))
      generic_format$..dispatch()
    }
  )
)
Expand All @@ -183,10 +221,11 @@ SourceFactory <- R6Class("SourceFactory", inherit = Object,
public = list(
Finish = function(schema = NULL) {
if (is.null(schema)) {
shared_ptr(Source, dataset___SFactory__Finish1(self))
ptr <- dataset___SFactory__Finish1(self)
} else {
shared_ptr(Source, dataset___SFactory__Finish2(self, schema))
ptr <- dataset___SFactory__Finish2(self, schema)
}
shared_ptr(Source, ptr)$..dispatch()
},
Inspect = function() shared_ptr(Schema, dataset___SFactory__Inspect(self))
)
Expand Down Expand Up @@ -282,23 +321,21 @@ FileSystemSourceFactory$create <- function(filesystem,
assert_is(filesystem, "FileSystem")
assert_is(selector, "FileSelector")
assert_is(format, "FileFormat")

if (is.null(partitioning)) {
shared_ptr(
FileSystemSourceFactory,
dataset___FSSFactory__Make1(filesystem, selector, format)
)
ptr <- dataset___FSSFactory__Make1(filesystem, selector, format)
} else if (inherits(partitioning, "PartitioningFactory")) {
shared_ptr(
FileSystemSourceFactory,
dataset___FSSFactory__Make3(filesystem, selector, format, partitioning)
)
ptr <- dataset___FSSFactory__Make3(filesystem, selector, format, partitioning)
} else if (inherits(partitioning, "Partitioning")) {
ptr <- dataset___FSSFactory__Make2(filesystem, selector, format, partitioning)
} else {
assert_is(partitioning, "Partitioning")
shared_ptr(
FileSystemSourceFactory,
dataset___FSSFactory__Make2(filesystem, selector, format, partitioning)
stop(
"Expected 'partitioning' to be NULL, PartitioningFactory or Partitioning",
call. = FALSE
)
}

shared_ptr(FileSystemSourceFactory, ptr)
}

#' Dataset file formats
Expand All @@ -319,7 +356,25 @@ FileSystemSourceFactory$create <- function(filesystem,
#' @rdname FileFormat
#' @name FileFormat
#' @export
FileFormat <- R6Class("FileFormat", inherit = Object)
FileFormat <- R6Class("FileFormat", inherit = Object,
  public = list(
    # Re-wrap this object in the R6 subclass matching its type name:
    # "parquet" -> ParquetFileFormat, "ipc" -> IpcFileFormat; any other
    # type is returned unchanged.
    ..dispatch = function() {
      format_type <- self$type
      if (format_type == "parquet") {
        return(shared_ptr(ParquetFileFormat, self$pointer()))
      }
      if (format_type == "ipc") {
        return(shared_ptr(IpcFileFormat, self$pointer()))
      }
      self
    }
  ),
  active = list(
    #' @description
    #' Return the `FileFormat`'s type
    type = function() {
      dataset___FileFormat__type_name(self)
    }
  )
)
FileFormat$create <- function(format, ...) {
# TODO: pass list(...) options to the initializers
# https://issues.apache.org/jira/browse/ARROW-7547
Expand Down
Loading

0 comments on commit f03c844

Please sign in to comment.