1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_base.h
@@ -39,6 +39,7 @@ namespace arrow {

namespace dataset {

/// \defgroup dataset-file-formats File formats for reading and writing datasets
/// \defgroup dataset-filesystem File system datasets
///
/// @{
5 changes: 5 additions & 0 deletions docs/source/conf.py
@@ -34,6 +34,7 @@
import datetime
import os
import sys
import warnings
from unittest import mock

import pyarrow
@@ -45,6 +46,10 @@

])

# Suppress pyarrow deprecation warnings (raised as FutureWarning) that would
# otherwise be printed while Sphinx is traversing the code.
warnings.filterwarnings("ignore", category=FutureWarning, message=".*pyarrow.*")

# -- General configuration ------------------------------------------------

# If your documentation needs a minimal Sphinx version, state it here.
18 changes: 7 additions & 11 deletions docs/source/developers/crossbow.rst
@@ -62,18 +62,18 @@ configuration file to run the requested build (like ``.travis.yml``,
Scheduler
~~~~~~~~~

`Crossbow.py`_ handles version generation, task rendering and
Crossbow handles version generation, task rendering and
submission. The tasks are defined in ``tasks.yml``.

Install
-------

The following guide depends on GitHub, but theoretically any git
server can be used.
The following guide depends on GitHub, but theoretically any git
server can be used.

1. `Create the queue repository`_

2. Enable `TravisCI`_, `Appveyor`_, `Azure Pipelines_` and `CircleCI`_
2. Enable `TravisCI`_, `Appveyor`_, `Azure Pipelines`_ and `CircleCI`_
integrations on for the newly created queue repository.

- turn off Travis’ `auto cancellation`_ feature on branches
@@ -100,9 +100,7 @@ Install

export CROSSBOW_GITHUB_TOKEN=<token>

..

or pass as an argument to the CLI script ``--github-token``
or pass as an argument to the CLI script ``--github-token``

6. Export the previously created GitHub token on both CI services:

@@ -164,10 +162,8 @@ The script does the following:
git checkout ARROW-<ticket number>
archery crossbow submit --dry-run conda-linux conda-osx

..

Note that the arrow branch must be pushed beforehand, because the
script will clone the selected branch.
Note that the arrow branch must be pushed beforehand, because the
script will clone the selected branch.

3. Reads and renders the required build configurations with the
parameters substituted.
12 changes: 6 additions & 6 deletions docs/source/developers/experimental_repos.rst
@@ -29,7 +29,7 @@ new repositories, as they offer many important tools to manage it (e.g. github
issues, “watch”, “github stars” to measure overall interest).

Process
-------
+++++++

* A committer *may* initiate experimental work by creating a separate git
repository within the Apache Arrow (e.g. via `selfserve <https://selfserve.apache.org/>`_)
@@ -44,21 +44,21 @@ Process
* The committer decides when the repository is archived.

Repository management
---------------------
+++++++++++++++++++++

* The repository *must* be under `apache/`
* The repository’s name *must* be prefixed by `arrow-experimental-`
* The repository *must* be under ``apache/``
* The repository’s name *must* be prefixed by ``arrow-experimental-``
* The committer has full permissions over the repository (within possible in ASF)
* Push / merge permissions *must only* be granted to Apache Arrow committers

Development process
-------------------
+++++++++++++++++++

* The repository must follow the ASF requirements about 3rd party code.
* The committer decides how to manage issues, PRs, etc.

Divergences
-----------
+++++++++++

* If any of the “must” above fails to materialize and no correction measure
is taken by the committer upon request, the PMC *should* take ownership
11 changes: 5 additions & 6 deletions docs/source/java/algorithm.rst
@@ -15,16 +15,15 @@
.. specific language governing permissions and limitations
.. under the License.

===========
Java Algorithms
===========
===============

Arrow's Java library provides algorithms for some commonly-used
functionalities. The algorithms are provided in the ``org.apache.arrow.algorithm``
package of the ``algorithm`` module.

Comparing Vector Elements
=================
-------------------------

Comparing vector elements is the basis for many algorithms. Vector
elements can be compared in one of two ways:
@@ -40,7 +39,7 @@ We provide default implementations to compare vector elements. However, users ca
for customized comparisons.

Vector Element Search
====================
---------------------

A search algorithm tries to find a particular value in a vector. When successful, a vector index is
returned; otherwise, a ``-1`` is returned. The following search algorithms are provided:
@@ -64,7 +63,7 @@ range search algorithm tries to find the upper/lower bound of the region in ``O(
An implementation is provided in ``org.apache.arrow.algorithm.search.VectorRangeSearcher``.

Vector Sorting
===================
--------------

Given a vector, a sorting algorithm turns it into a sorted one. The sorting criteria must
be specified by some ordering comparison operation. The sorting algorithms can be
@@ -88,6 +87,6 @@ smallest value in the vector. Index sorting is supported by ``org.apache.arrow.a
which runs in ``O(nlog(n))`` time. It is applicable to vectors of any type.

Other Algorithms
===================
----------------

Other algorithms include vector deduplication, dictionary encoding, etc., in the ``algorithm`` module.
1 change: 1 addition & 0 deletions docs/source/python/api/dataset.rst
@@ -38,6 +38,7 @@ Factory functions
partitioning
field
scalar
write_dataset

Classes
-------
1 change: 1 addition & 0 deletions docs/source/python/api/ipc.rst
@@ -38,6 +38,7 @@ Inter-Process Communication
ipc.read_tensor
ipc.write_tensor
ipc.get_tensor_size
ipc.IpcWriteOptions
ipc.Message
ipc.MessageReader
ipc.RecordBatchFileReader
178 changes: 162 additions & 16 deletions docs/source/python/dataset.rst
@@ -117,7 +117,7 @@ this can require a lot of memory, see below on filtering / iterative loading):
Reading different file formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples use Parquet files as dataset source but the Dataset API
The above examples use Parquet files as dataset sources but the Dataset API
provides a consistent interface across multiple file formats and filesystems.
Currently, Parquet, Feather / Arrow IPC, and CSV file formats are supported;
more formats are planned in the future.
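
As a hedged sketch (the directory paths are hypothetical), the same ``ds.dataset`` call
can read any of the supported formats by switching the ``format`` argument:

.. code-block:: python

    import pyarrow.dataset as ds

    # Hypothetical directories, each containing files of one format.
    parquet_ds = ds.dataset("data/parquet_dir", format="parquet")
    feather_ds = ds.dataset("data/feather_dir", format="feather")
    csv_ds = ds.dataset("data/csv_dir", format="csv")
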
@@ -386,11 +386,11 @@ some specific methods exist for Parquet Datasets.

Some processing frameworks such as Dask (optionally) use a ``_metadata`` file
with partitioned datasets which includes information about the schema and the
row group metadata of the full dataset. Using such file can give a more
row group metadata of the full dataset. Using such a file can give a more
efficient creation of a parquet Dataset, since it does not need to infer the
schema and crawl the directories for all Parquet files (this is especially the
case for filesystems where accessing files is expensive). The
:func:`parquet_dataset` function allows to create a Dataset from a partitioned
:func:`parquet_dataset` function allows us to create a Dataset from a partitioned
dataset with a ``_metadata`` file:

.. code-block:: python
@@ -456,20 +456,166 @@ is materialized as columns when reading the data and can be used for filtering:
dataset.to_table().to_pandas()
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()

Another benefit of manually listing the files is that the order of the files
controls the order of the data. When performing an ordered read (or a read to
a table), the rows returned will match the order of the files given. This
only applies when the dataset is constructed with a list of files. There
are no order guarantees given when the files are instead discovered by scanning
a directory. (A short illustrative sketch follows the review discussion below.)

Review discussion:

Member:
    We do sort by path:

        // Sorting by path guarantees a stability sometimes needed by unit tests.
        std::sort(files.begin(), files.end(), fs::FileInfo::ByPath());

    Though it states that's only for unit tests. However I would personally be really
    surprised if data were truly returned in random order and as we've had this behavior
    in a few releases now I think users would also be surprised. Maybe we should instead
    guarantee lexicographic sort on paths?

Member Author:
    Hmmm, I just dropped the sentence about scanning. If someone wants clarification we
    can discuss it at that time. @bkietz any opinion? I recall we spoke about this before.

Member:
    I also suppose we don't have to solve this here; we can get the docs in and improve
    this part later.

Member:
    We don't guarantee order for selectors because ARROW-8163 (asynchronous fragment
    discovery) might not guarantee order. Lexicographic sorting could be maintained for
    synchronous discovery from a selector, but in general we'd want to push a fragment
    into scan as soon as it's yielded by FileSystem::GetFileInfoGenerator.

Member:
    Ah thanks. Sorry for the churn here @westonpace - we should keep stating that there's
    no guarantee then.
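
Returning to the file-ordering point above, a minimal sketch (with hypothetical file
paths) of constructing a dataset from an explicit list of files, where the resulting
table preserves that order:

.. code-block:: python

    # Hypothetical paths; rows in the resulting table follow this file order.
    dataset = ds.dataset(
        ["data/2019.parquet", "data/2020.parquet", "data/2021.parquet"],
        format="parquet",
    )
    table = dataset.to_table()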

Manual scheduling
-----------------
Iterative (out of core or streaming) reads
------------------------------------------

..
Possible content:
- fragments (get_fragments)
- scan / scan tasks / iterators of record batches
The previous examples have demonstrated how to read the data into a table using :func:`~Dataset.to_table`. This is
useful if the dataset is small or there is only a small amount of data that needs to
be read. The dataset API contains additional methods to read and process large amounts
of data in a streaming fashion.

The :func:`~Dataset.to_table` method loads all selected data into memory
at once resulting in a pyarrow Table. Alternatively, a dataset can also be
scanned one RecordBatch at a time in an iterative manner using the
:func:`~Dataset.scan` method::
The easiest way to do this is to use the method :meth:`Dataset.to_batches`. This
method returns an iterator of record batches. For example, we can use this method to
calculate the average of a column without loading the entire column into memory:

for scan_task in dataset.scan(columns=[...], filter=...):
for record_batch in scan_task.execute():
# process the record batch
.. ipython:: python

import pyarrow.compute as pc

col2_sum = 0
count = 0
for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
col2_sum += pc.sum(batch.column("col2")).as_py()
count += batch.num_rows
mean_a = col2_sum/count

Customizing the batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~

An iterative read of a dataset is often called a "scan" of the dataset and pyarrow
uses an object called a :class:`Scanner` to do this. A Scanner is created for you
automatically by the ``to_table`` and ``to_batches`` methods of the dataset. Any
arguments you pass to these methods will be passed on to the Scanner constructor.

One of those parameters is the ``batch_size``. This controls the maximum size of the
batches returned by the scanner. Batches can still be smaller than the ``batch_size``
if the dataset consists of small files or those files themselves consist of small
row groups. For example, a parquet file with 10,000 rows per row group will yield
batches with, at most, 10,000 rows unless the ``batch_size`` is set to a smaller value.

The default batch size is one million rows. This is typically a good default, but
you may want to customize it if you are reading a large number of columns.
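
For example, a minimal sketch (the row count here is arbitrary) of requesting smaller
batches:

.. code-block:: python

    # Request batches of at most 10,000 rows; actual batches may be smaller.
    for batch in dataset.to_batches(batch_size=10_000):
        pass  # process each pyarrow.RecordBatch here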

Writing Datasets
----------------

The dataset API also simplifies writing data to a dataset using :func:`write_dataset`. This can be useful when
you want to partition your data or you need to write a large amount of data. A
basic dataset write is similar to writing a table except that you specify a directory
instead of a filename.

.. ipython:: python

base = pathlib.Path(tempfile.gettempdir())
dataset_root = base / "sample_dataset"
dataset_root.mkdir(exist_ok=True)

table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
ds.write_dataset(table, dataset_root, format="parquet")

The above example will create a single file named ``part-0.parquet`` in our ``sample_dataset``
directory.

.. warning::

   If you run the example again it will replace the existing part-0.parquet file.
   Appending files to an existing dataset requires specifying a new
   ``basename_template`` for each call to ``ds.write_dataset``
   to avoid overwriting existing files.
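
For example, a hedged sketch of appending with a distinct ``basename_template`` (the
template value is illustrative; ``{i}`` is replaced by an auto-incremented integer):

.. code-block:: python

    ds.write_dataset(table, dataset_root, format="parquet",
                     basename_template="part-appended-{i}.parquet")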

Writing partitioned data
~~~~~~~~~~~~~~~~~~~~~~~~

A partitioning object can be used to specify how your output data should be partitioned.
This uses the same kind of partitioning objects we used for reading datasets. To write
our above data out to a partitioned directory we only need to specify how we want the
dataset to be partitioned. For example:

.. ipython:: python

part = ds.partitioning(
pa.schema([("c", pa.int16())]), flavor="hive"
)
ds.write_dataset(table, dataset_root, format="parquet", partitioning=part)

This will create two files. Half our data will be in the ``dataset_root/c=1`` directory and
the other half will be in the ``dataset_root/c=2`` directory.
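
As a hedged sketch (using a hypothetical directory that contains only hive-partitioned
files), such output can be read back with hive partitioning so that ``c`` is reconstructed
from the directory names:

.. code-block:: python

    # Hypothetical path; "c" is recovered from the c=1/ and c=2/ directory names.
    ds.dataset("some/partitioned_dir", format="parquet", partitioning="hive").to_table()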

Writing large amounts of data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The above examples wrote data from a table. If you are writing a large amount of data
you may not be able to load everything into a single in-memory table. Fortunately,
:func:`write_dataset` also accepts an iterable of record batches. This makes it really
simple, for example, to repartition a large dataset without loading the entire dataset
into memory:

.. ipython:: python

old_part = ds.partitioning(
pa.schema([("c", pa.int16())]), flavor="hive"
)
new_part = ds.partitioning(
pa.schema([("c", pa.int16())]), flavor=None
)
input_dataset = ds.dataset(dataset_root, partitioning=old_part)
new_root = base / "repartitioned_dataset"
# A scanner can act as an iterator of record batches but you could also receive
# data from the network (e.g. via flight), from your own scanning, or from any
# other method that yields record batches. In addition, you can pass a dataset
# into write_dataset directly but this method is useful if you want to customize
# the scanner (e.g. to filter the input dataset or set a maximum batch size)
scanner = input_dataset.scanner()

ds.write_dataset(scanner, new_root, format="parquet", partitioning=new_part)

After the above example runs our data will be in the ``repartitioned_dataset/1`` and
``repartitioned_dataset/2`` directories. In this simple example we are not changing the
structure of the data (only the directory naming schema) but you could also use this
mechanism to change which columns are used to partition the dataset. This is useful when
you expect to query your data in specific ways and you can utilize partitioning to reduce
the amount of data you need to read. (An illustrative sketch follows the review discussion
below.)

Review discussion:

Member:
    Since this example is not changing the partitioning structure (except for the flavor),
    it's not directly clear what this "repartitioning" will do apart from changing /c=1/
    into /1/. Will it also repartition individual files? (eg write more smaller files in
    case your input dataset has large files)

Member Author:
    This would only change the directory structure. I didn't want to make the example data
    more complicated and a and b would be bad choices for a partition column. I modified
    the paragraph above to hopefully make it a bit clearer.
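
Following up on the point above about changing partition columns, a hedged sketch (the
column choice is purely illustrative; in practice you would pick a column you expect to
filter on):

.. code-block:: python

    # Illustrative only: partition the same data on column "a" instead of "c".
    by_a = ds.partitioning(pa.schema([("a", pa.int64())]), flavor="hive")
    ds.write_dataset(input_dataset.scanner(), base / "repartitioned_by_a",
                     format="parquet", partitioning=by_a)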

.. To add when ARROW-12364 is merged
Customizing & inspecting written files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default the dataset API will create files named "part-i.format" where "i" is an integer
generated during the write and "format" is the file format specified in the write_dataset
call. For simple datasets it may be possible to know which files will be created, but for
larger or partitioned datasets it is not so easy. The ``file_visitor`` keyword can be used
to supply a visitor that will be called as each file is created:

.. ipython:: python

def file_visitor(written_file):
print(f"path={written_file.path}")
print(f"metadata={written_file.metadata}")
ds.write_dataset(table, dataset_root, format="parquet", partitioning=part,
file_visitor=file_visitor)

This allows you to collect the filenames that belong to the dataset and store them elsewhere,
which can be useful when you want to avoid scanning directories the next time you need to read
the data. It can also be used to generate the ``_metadata`` index file used by other tools such
as Dask or Spark to create an index of the dataset.
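
For example, a minimal sketch of collecting the written paths with ``file_visitor`` (the
``basename_template`` value is illustrative, used here to avoid overwriting the files
written earlier):

.. code-block:: python

    # Collect the path of every file written so it can be stored elsewhere.
    written_paths = []

    def collect_path(written_file):
        written_paths.append(written_file.path)

    ds.write_dataset(table, dataset_root, format="parquet", partitioning=part,
                     basename_template="collected-{i}.parquet",
                     file_visitor=collect_path)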

Configuring format-specific parameters during a write
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to the common options shared by all formats, there are also options that are
specific to a particular format. For example, to allow truncated timestamps while writing
Parquet files:

.. ipython:: python

parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)
ds.write_dataset(table, dataset_root, format="parquet", partitioning=part,
file_options=write_options)
8 changes: 5 additions & 3 deletions python/pyarrow/_hdfs.pyx
@@ -93,9 +93,11 @@ cdef class HadoopFileSystem(FileSystem):
Instantiate HadoopFileSystem object from an URI string.

The following two calls are equivalent
* HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
'&replication=1')
* HadoopFileSystem('localhost', port=8020, user='test', replication=1)

* ``HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test\
&replication=1')``
* ``HadoopFileSystem('localhost', port=8020, user='test', \
replication=1)``

Parameters
----------
1 change: 0 additions & 1 deletion python/pyarrow/array.pxi
@@ -1622,7 +1622,6 @@ cdef class ListArray(BaseListArray):
3
]
]

# nulls in the offsets array become null lists
>>> offsets = pa.array([0, None, 2, 4])
>>> pa.ListArray.from_arrays(offsets, values)