ARROW-7430: [Python] Add more docstrings to dataset bindings

jorisvandenbossche authored and pitrou committed Jan 6, 2020
1 parent e4bf429 commit 4e8fa9f

Showing 2 changed files with 177 additions and 18 deletions.
179 changes: 168 additions & 11 deletions python/pyarrow/_dataset.pyx
@@ -115,6 +115,7 @@ cdef class PartitionScheme:

@property
def schema(self):
"""The arrow Schema describing the partition scheme."""
return pyarrow_wrap_schema(self.scheme.schema())


@@ -155,6 +156,31 @@ cdef class DefaultPartitionScheme(PartitionScheme):


cdef class SchemaPartitionScheme(PartitionScheme):
"""
A PartitionScheme based on a specified Schema.
The SchemaPartitionScheme expects one segment in the file path for each
field in the schema (all fields are required to be present).
For example given schema<year:int16, month:int8> the path "/2009/11" would
be parsed to ("year"_ == 2009 and "month"_ == 11).
Parameters
----------
schema : Schema
The schema that describes the partitions present in the file path.
Returns
-------
SchemaPartitionScheme
Examples
--------
>>> from pyarrow.dataset import SchemaPartitionScheme
>>> scheme = SchemaPartitionScheme(
... pa.schema([("year", pa.int16()), ("month", pa.int8())]))
>>> print(scheme.parse("/2009/11"))
((year == 2009:int16) and (month == 11:int8))
"""

cdef:
CSchemaPartitionScheme* schema_scheme
@@ -172,6 +198,37 @@ cdef class SchemaPartitionScheme(PartitionScheme):


cdef class HivePartitionScheme(PartitionScheme):
"""
A PartitionScheme for "/$key=$value/" nested directories as found in
Apache Hive.
Multi-level, directory based partitioning scheme originating from
Apache Hive with all data files stored in the leaf directories. Data is
partitioned by static values of a particular column in the schema.
Partition keys are represented in the form $key=$value in directory names.
Field order is ignored, as are missing or unrecognized field names.
For example, given schema<year:int16, month:int8, day:int8>, a possible
path would be "/year=2009/month=11/day=15".
Parameters
----------
schema : Schema
The schema that describes the partitions present in the file path.
Returns
-------
SchemaPartitionScheme
Examples
--------
>>> from pyarrow.dataset import HivePartitionScheme
>>> scheme = HivePartitionScheme(
... pa.schema([("year", pa.int16()), ("month", pa.int8())]))
>>> print(scheme.parse("/year=2009/month=11"))
((year == 2009:int16) and (month == 11:int8))
"""

cdef:
CHivePartitionScheme* hive_scheme
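
A hedged sketch (not part of this diff) of the field-order note above: keys
may appear in any order in the path and still parse to an expression over the
same fields.

import pyarrow as pa
from pyarrow.dataset import HivePartitionScheme

scheme = HivePartitionScheme(
    pa.schema([("year", pa.int16()), ("month", pa.int8())]))
# Both paths constrain the same fields, regardless of directory order:
scheme.parse("/year=2009/month=11")
scheme.parse("/month=11/year=2009")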
@@ -189,6 +246,28 @@ cdef class HivePartitionScheme(PartitionScheme):


cdef class FileSystemDiscoveryOptions:
"""
Options for FileSystemDataSourceDiscovery.
Parameters
----------
partition_base_dir : str, optional
For the purposes of applying the partition scheme, paths will be
stripped of the partition_base_dir. Files not matching the
partition_base_dir prefix will be skipped for partition discovery.
The ignored files will still be part of the DataSource, but will not
have partition information.
exclude_invalid_files : bool, optional (default True)
If True, invalid files will be excluded (file format specific check).
This will incur IO for each files in a serial and single threaded
fashion. Disabling this feature will skip the IO, but unsupported
files may be present in the DataSource (resulting in an error at scan
time).
ignore_prefixes : list, optional
Files matching one of those prefixes will be ignored by the
discovery process. This is matched to the basename of a path.
By default this is ['.', '_'].
"""

cdef:
CFileSystemDiscoveryOptions options
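
A minimal usage sketch, assuming the keyword-argument constructor implied by
the Parameters section above (the base directory path is hypothetical, and
scheme is the HivePartitionScheme from the earlier sketch):

from pyarrow.dataset import FileSystemDiscoveryOptions

options = FileSystemDiscoveryOptions(
    partition_base_dir="/data/warehouse",  # hypothetical, for illustration
    exclude_invalid_files=True,
    ignore_prefixes=['.', '_'])            # the documented default

# Per the NOTE below, attaching a scheme clears any partition_scheme_discovery:
options.partition_scheme = scheme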
@@ -213,10 +292,10 @@ cdef class FileSystemDiscoveryOptions:
NOTE: setting this property will overwrite partition_scheme_discovery.
"""
cdef shared_ptr[CPartitionScheme] s = self.options.partition_scheme.scheme()
if s.get() == nullptr:
c_scheme = self.options.partition_scheme.scheme()
if c_scheme.get() == nullptr:
return None
return PartitionScheme.wrap(s)
return PartitionScheme.wrap(c_scheme)

@partition_scheme.setter
def partition_scheme(self, PartitionScheme value):
@@ -229,17 +308,20 @@ cdef class FileSystemDiscoveryOptions:
NOTE: setting this property will overwrite partition_scheme.
"""
cdef shared_ptr[CPartitionSchemeDiscovery] d = self.options.partition_scheme.discovery()
if d.get() == nullptr:
c_discovery = self.options.partition_scheme.discovery()
if c_discovery.get() == nullptr:
return None
return PartitionSchemeDiscovery.wrap(d)
return PartitionSchemeDiscovery.wrap(c_discovery)

@partition_scheme_discovery.setter
def partition_scheme_discovery(self, PartitionSchemeDiscovery value):
self.options.partition_scheme = (<PartitionSchemeDiscovery> value).unwrap()
self.options.partition_scheme = value.unwrap()

@property
def partition_base_dir(self):
"""
Base directory to strip paths before applying the partition scheme.
"""
return frombytes(self.options.partition_base_dir)

@partition_base_dir.setter
@@ -248,6 +330,7 @@ cdef class FileSystemDiscoveryOptions:

@property
def exclude_invalid_files(self):
"""Whether to exclude invalid files."""
return self.options.exclude_invalid_files

@exclude_invalid_files.setter
@@ -256,6 +339,10 @@ cdef class FileSystemDiscoveryOptions:

@property
def ignore_prefixes(self):
"""
List of prefixes. Files matching one of these prefixes will be
ignored by the discovery process.
"""
return [frombytes(p) for p in self.options.ignore_prefixes]

@ignore_prefixes.setter
@@ -309,12 +396,33 @@ cdef class DataSourceDiscovery:
return schemas

def inspect(self):
"""
Inspect all data fragments and return a common Schema.
Returns
-------
Schema
"""
cdef CResult[shared_ptr[CSchema]] result
with nogil:
result = self.discovery.Inspect()
return pyarrow_wrap_schema(GetResultValue(result))

def finish(self, Schema schema = None):
def finish(self, Schema schema=None):
"""
Create a DataSource using the inspected schema or an explicit schema
(if given).
Parameters
----------
schema: Schema, default None
The schema to conform the datasource to. If None, the inspected
schema is used.
Returns
-------
DataSource
"""
cdef:
shared_ptr[CSchema] sp_schema
CResult[shared_ptr[CDataSource]] result
Expand All @@ -329,6 +437,21 @@ cdef class DataSourceDiscovery:


cdef class FileSystemDataSourceDiscovery(DataSourceDiscovery):
"""
Create a DataSource from a list of paths with schema inspection.
DataSourceDiscovery is used to create a DataSource, inspect the Schema
of the fragments contained in it, and declare a partition scheme.
Parameters
----------
filesystem : pyarrow.fs.FileSystem
paths_or_selector: pyarrow.fs.Selector or list of path-likes
Either a Selector object or a list of path-like objects.
format : FileFormat
options : FileSystemDiscoveryOptions, optional
"""

cdef:
CFileSystemDataSourceDiscovery* filesystem_discovery
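
A hedged end-to-end sketch of the discovery flow documented above;
LocalFileSystem, ParquetFileFormat, and the sample path are assumptions not
shown in this diff:

from pyarrow.fs import LocalFileSystem
from pyarrow.dataset import (FileSystemDataSourceDiscovery,
                             FileSystemDiscoveryOptions, ParquetFileFormat)

discovery = FileSystemDataSourceDiscovery(
    LocalFileSystem(), ["/data/part-0.parquet"], ParquetFileFormat(),
    FileSystemDiscoveryOptions())
schema = discovery.inspect()   # common Schema of all inspected fragments
source = discovery.finish()    # DataSource conforming to the inspected schema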
@@ -417,6 +540,10 @@ cdef class DataSource:

@property
def partition_expression(self):
"""
An expression which evaluates to true for all data viewed by this
DataSource.
"""
cdef shared_ptr[CExpression] expression
expression = self.source.partition_expression()
if expression.get() == nullptr:
@@ -508,7 +635,13 @@ cdef class FileSystemDataSource(DataSource):


cdef class Dataset:
"""Collection of data fragments coming from possibly multiple sources."""
"""
Collection of data fragments coming from possibly multiple sources.
Arrow Datasets allow you to query against data that has been split across
multiple files. This sharding of data may indicate partitioning, which
can accelerate queries that only touch some partitions (files).
"""

cdef:
shared_ptr[CDataset] wrapped
@@ -548,7 +681,19 @@ cdef class Dataset:
return self.wrapped

def new_scan(self, MemoryPool memory_pool=None):
"""Begin to build a new Scan operation against this Dataset."""
"""
Begin to build a new Scan operation against this Dataset.
Parameters
----------
memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the
default pool.
Returns
-------
ScannerBuilder
"""
cdef:
shared_ptr[CScanContext] context = make_shared[CScanContext]()
CResult[shared_ptr[CScannerBuilder]] result
@@ -558,11 +703,13 @@ cdef class Dataset:

@property
def sources(self):
"""List of the data sources"""
cdef vector[shared_ptr[CDataSource]] sources = self.dataset.sources()
return [DataSource.wrap(source) for source in sources]

@property
def schema(self):
"""The common schema of the full Dataset"""
return pyarrow_wrap_schema(self.dataset.schema())
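
A hedged sketch tying the pieces together; the Dataset constructor shown here
is an assumption (this diff only shows its methods), and source/schema come
from the discovery sketch above:

from pyarrow.dataset import Dataset

dataset = Dataset([source], schema)  # assumed constructor, not in this diff
builder = dataset.new_scan()         # ScannerBuilder
scanner = builder.finish()           # Scanner, via the Finish() binding below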


@@ -632,6 +779,14 @@ cdef class ScannerBuilder:
It is used to pass information, notably a potential filter expression and a
subset of columns to materialize.

Parameters
----------
dataset : Dataset
    The dataset to scan.
memory_pool : MemoryPool, default None
    For memory allocations, if required. If not specified, uses the
    default pool.
"""

cdef:
@@ -690,7 +845,7 @@ cdef class ScannerBuilder:
Returns
-------
self : ScannerBuilder
Scanner
"""
return Scanner.wrap(GetResultValue(self.builder.Finish()))

@@ -743,6 +898,8 @@ cdef class ScannerBuilder:
cdef class Scanner:
"""A materialized scan operation with context and options bound.
Create this using the ScannerBuilder factory class.
A scanner is the class that glues the scan tasks, data fragments and data
sources together.
"""
16 changes: 9 additions & 7 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -258,11 +258,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
"arrow::dataset::FileSystemDataSource"(CDataSource):
@staticmethod
CResult[shared_ptr[CDataSource]] Make(
shared_ptr[CFileSystem] filesystem,
CFileStatsVector stats,
CExpressionVector partitions,
shared_ptr[CExpression] source_partition,
shared_ptr[CFileFormat] format)
shared_ptr[CFileSystem] filesystem,
CFileStatsVector stats,
CExpressionVector partitions,
shared_ptr[CExpression] source_partition,
shared_ptr[CFileFormat] format)
c_string type()
shared_ptr[CDataFragmentIterator] GetFragments(
shared_ptr[CScanOptions] options)
@@ -290,7 +290,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CResult[shared_ptr[CExpression]] Parse(const c_string& path) const
const shared_ptr[CSchema]& schema()

cdef cppclass CPartitionSchemeDiscovery "arrow::dataset::PartitionSchemeDiscovery":
cdef cppclass CPartitionSchemeDiscovery \
"arrow::dataset::PartitionSchemeDiscovery":
pass

cdef cppclass CDefaultPartitionScheme \
@@ -310,7 +311,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CPartitionSchemeOrDiscovery(shared_ptr[CPartitionScheme])
CPartitionSchemeOrDiscovery(shared_ptr[CPartitionSchemeDiscovery])
CPartitionSchemeOrDiscovery& operator=(shared_ptr[CPartitionScheme])
CPartitionSchemeOrDiscovery& operator=(shared_ptr[CPartitionSchemeDiscovery])
CPartitionSchemeOrDiscovery& operator=(
shared_ptr[CPartitionSchemeDiscovery])
shared_ptr[CPartitionScheme] scheme() const
shared_ptr[CPartitionSchemeDiscovery] discovery() const

