From 9813bb6c7382ffd5fe85183f6b1e6a847f84d5e6 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 25 Jul 2023 13:10:54 -0500 Subject: [PATCH] [Data][Docs] Standardize "Consuming Data" references (#37007) This PR standardizes API references listed under "Consuming Data". --------- Signed-off-by: Balaji Veeramani Signed-off-by: Balaji Veeramani Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com> --- python/ray/data/dataset.py | 339 +++++++++++++++++++++++-------------- 1 file changed, 210 insertions(+), 129 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 665e9380877bc5..3d0257ae1c555b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1908,7 +1908,7 @@ def aggregate(self, *aggs: AggregateFn) -> Union[Any, Dict[str, Any]]: .. testoutput:: {'prod': 45} - + Time complexity: O(dataset size / parallelism) Args: @@ -1956,7 +1956,7 @@ def sum( - ``on=["col_1", ..., "col_n"]``: an n-column ``dict`` containing the column-wise sum of the provided columns. - If the dataset is empty, all values are null. If ``ignore_nulls`` is + If the dataset is empty, all values are null. If ``ignore_nulls`` is ``False`` and any value is null, then the output is ``None``. """ ret = self._aggregate_on(Sum, on, ignore_nulls) @@ -1997,8 +1997,8 @@ def min( column ``"col"``, - ``on=["col_1", ..., "col_n"]``: an n-column dict containing the column-wise min of the provided columns. - - If the dataset is empty, all values are null. If ``ignore_nulls`` is + + If the dataset is empty, all values are null. If ``ignore_nulls`` is ``False`` and any value is null, then the output is ``None``. """ ret = self._aggregate_on(Min, on, ignore_nulls) @@ -2040,7 +2040,7 @@ def max( - ``on=["col_1", ..., "col_n"]``: an n-column dict containing the column-wise max of the provided columns. - If the dataset is empty, all values are null. If ``ignore_nulls`` is + If the dataset is empty, all values are null. If ``ignore_nulls`` is ``False`` and any value is null, then the output is ``None``. """ ret = self._aggregate_on(Max, on, ignore_nulls) @@ -2082,7 +2082,7 @@ def mean( - ``on=["col_1", ..., "col_n"]``: an n-column dict containing the column-wise mean of the provided columns. - If the dataset is empty, all values are null. If ``ignore_nulls`` is + If the dataset is empty, all values are null. If ``ignore_nulls`` is ``False`` and any value is null, then the output is ``None``. """ ret = self._aggregate_on(Mean, on, ignore_nulls) @@ -2137,7 +2137,7 @@ def std( - ``on=["col_1", ..., "col_n"]``: an n-column dict containing the column-wise std of the provided columns. - If the dataset is empty, all values are null. If ``ignore_nulls`` is + If the dataset is empty, all values are null. If ``ignore_nulls`` is ``False`` and any value is null, then the output is ``None``. """ ret = self._aggregate_on(Std, on, ignore_nulls, ddof=ddof) @@ -2246,34 +2246,43 @@ def limit(self, limit: int) -> "Dataset": logical_plan = LogicalPlan(op) return Dataset(plan, self._epoch, self._lazy, logical_plan) - @ConsumptionAPI(pattern="Time complexity:") + @ConsumptionAPI def take_batch( self, batch_size: int = 20, *, batch_format: Optional[str] = "default" ) -> DataBatch: - """Return up to ``batch_size`` records from the dataset in a batch. + """Return up to ``batch_size`` rows from the :class:`Dataset` in a batch. + + Ray Data represents batches as NumPy arrays or pandas DataFrames. You can + configure the batch type by specifying ``batch_format``. + + This method is useful for inspecting inputs to :meth:`~Dataset.map_batches`. + + .. warning:: - Unlike take(), the records are returned in the same format as used for - `iter_batches` and `map_batches`. + :meth:`~Dataset.take_batch` moves up to ``batch_size`` rows to the caller's + machine. If ``batch_size`` is large, this method can cause an ` + ``OutOfMemory`` error on the caller. - This will move up to ``batch_size`` records to the caller's machine; if - ``batch_size`` is very large, this can result in an OutOfMemory crash on - the caller. + Examples: + + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.take_batch(5) + {'id': array([0, 1, 2, 3, 4])} Time complexity: O(batch_size specified) Args: - batch_size: The max number of records to return. - batch_format: Specify ``"default"`` to use the default block format - (NumPy), ``"pandas"`` to select ``pandas.DataFrame``, "pyarrow" to - select ``pyarrow.Table``, or ``"numpy"`` to select - ``Dict[str, numpy.ndarray]``, or None to return the underlying block - exactly as is with no additional formatting. + batch_size: The maximum number of rows to return. + batch_format: If ``"default"`` or ``"numpy"``, batches are + ``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are + ``pandas.DataFrame``. Returns: - A batch of up to ``batch_size`` records from the dataset. + A batch of up to ``batch_size`` rows from the dataset. Raises: - ValueError if the dataset is empty. + ``ValueError``: if the dataset is empty. """ batch_format = _apply_strict_mode_batch_format(batch_format) try: @@ -2287,21 +2296,37 @@ def take_batch( self._synchronize_progress_bar() return res - @ConsumptionAPI(pattern="Time complexity:") + @ConsumptionAPI def take(self, limit: int = 20) -> List[Dict[str, Any]]: - """Return up to ``limit`` records from the dataset. + """Return up to ``limit`` rows from the :class:`Dataset`. + + This method is useful for inspecting data. + + .. warning:: - This will move up to ``limit`` records to the caller's machine; if - ``limit`` is very large, this can result in an OutOfMemory crash on - the caller. + :meth:`~Dataset.take` moves up to ``limit`` rows to the caller's machine. If + ``limit`` is large, this method can cause an ``OutOfMemory`` error on the + caller. + + Examples: + + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.take(3) + [{'id': 0}, {'id': 1}, {'id': 2}] Time complexity: O(limit specified) Args: - limit: The max number of records to return. + limit: The maximum number of rows to return. Returns: - A list of up to ``limit`` records from the dataset. + A list of up to ``limit`` rows from the dataset. + + .. seealso:: + + :meth:`~Dataset.take_all` + Call this method to return all rows. """ if ray.util.log_once("dataset_take"): logger.info( @@ -2316,13 +2341,23 @@ def take(self, limit: int = 20) -> List[Dict[str, Any]]: self._synchronize_progress_bar() return output - @ConsumptionAPI(pattern="Time complexity:") + @ConsumptionAPI def take_all(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: - """Return all of the records in the dataset. + """Return all of the rows in this :class:`Dataset`. - This will move the entire dataset to the caller's machine; if the - dataset is very large, this can result in an OutOfMemory crash on - the caller. + This method is useful for inspecting small datasets. + + .. warning:: + + :meth:`~Dataset.take_all` moves the entire dataset to the caller's + machine. If the dataset is large, this method can cause an + ``OutOfMemory`` error on the caller. + + Examples: + >>> import ray + >>> ds = ray.data.range(5) + >>> ds.take_all() + [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}] Time complexity: O(dataset size) @@ -2330,7 +2365,12 @@ def take_all(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: limit: Raise an error if the size exceeds the specified limit. Returns: - A list of all the records in the dataset. + A list of all the rows in the dataset. + + .. seealso:: + + :meth:`~Dataset.take` + Call this method to return a specific number of rows. """ output = [] for row in self.iter_rows(): @@ -2342,14 +2382,30 @@ def take_all(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: self._synchronize_progress_bar() return output - @ConsumptionAPI(pattern="Time complexity:") + @ConsumptionAPI def show(self, limit: int = 20) -> None: - """Print up to the given number of records from the dataset. + """Print up to the given number of rows from the :class:`Dataset`. + + This method is useful for inspecting data. + + Examples: + + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.show(3) + {'id': 0} + {'id': 1} + {'id': 2} Time complexity: O(limit specified) Args: - limit: The max number of records to print. + limit: The maximum number of row to print. + + .. seealso:: + + :meth:`~Dataset.take` + Call this method to get (not print) a given number of rows. """ for row in self.take(limit): print(row) @@ -3228,33 +3284,30 @@ def write_fn_wrapper(blocks: Iterator[Block], ctx, fn) -> Iterator[Block]: @ConsumptionAPI( delegate=( "Calling any of the consumption methods on the returned ``DataIterator``" - ) + ), + pattern="Returns:", ) def iterator(self) -> DataIterator: - """Return a :class:`~ray.data.DataIterator` that - can be used to repeatedly iterate over the dataset. + """Return a :class:`~ray.data.DataIterator` over this dataset. - Examples: - >>> import ray - >>> for batch in ray.data.range( - ... 1000000 - ... ).iterator().iter_batches(): # doctest: +SKIP - ... print(batch) # doctest: +SKIP + Don't call this method directly. Use it internally. - .. note:: - It is recommended to use ``DataIterator`` methods over directly - calling methods such as ``iter_batches()``. + Returns: + A :class:`~ray.data.DataIterator` over this dataset. """ return DataIteratorImpl(self) @ConsumptionAPI def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Dict[str, Any]]: - """Return a local row iterator over the dataset. + """Return an iterator over the rows in this dataset. Examples: >>> import ray - >>> for i in ray.data.range(1000000).iter_rows(): # doctest: +SKIP - ... print(i) # doctest: +SKIP + >>> for row in ray.data.range(3).iter_rows(): + ... print(row) + {'id': 0} + {'id': 1} + {'id': 2} Time complexity: O(1) @@ -3263,9 +3316,8 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Dict[str, Any]]: current block during the scan. Returns: - A local iterator over the entire dataset. + An iterator over the rows in this dataset. """ - return self.iterator().iter_rows(prefetch_blocks=prefetch_blocks) @ConsumptionAPI @@ -3282,41 +3334,52 @@ def iter_batches( # Deprecated. prefetch_blocks: int = 0, ) -> Iterator[DataBatch]: - """Return a batched iterator over the dataset. + """Return an iterator over batches of data. + + This method is useful for model training. Examples: - >>> import ray - >>> for batch in ray.data.range(1000000).iter_batches(): # doctest: +SKIP - ... print(batch) # doctest: +SKIP + + .. testcode:: + + import ray + + ds = ray.data.read_images("example://image-datasets/simple") + + for batch in ds.iter_batches(batch_size=2, batch_format="numpy"): + print(batch) + + .. testoutput:: + :options: +MOCK + + {'image': array([[[[...]]]], dtype=uint8)} + ... + {'image': array([[[[...]]]], dtype=uint8)} Time complexity: O(1) Args: prefetch_batches: The number of batches to fetch ahead of the current batch to fetch. If set to greater than 0, a separate threadpool is used - to fetch the objects to the local node, format the batches, and apply - the collate_fn. Defaults to 1. You can revert back to the old - prefetching behavior that uses `prefetch_blocks` by setting - `use_legacy_iter_batches` to True in the datasetContext. - batch_size: The number of rows in each batch, or None to use entire blocks - as batches (blocks may contain different number of rows). + to fetch the objects to the local node and format the batches. Defaults + to 1. + batch_size: The number of rows in each batch, or ``None`` to use entire + blocks as batches (blocks may contain different numbers of rows). The final batch may include fewer than ``batch_size`` rows if ``drop_last`` is ``False``. Defaults to 256. - batch_format: Specify ``"default"`` to use the default block format - (NumPy), ``"pandas"`` to select ``pandas.DataFrame``, "pyarrow" to - select ``pyarrow.Table``, or ``"numpy"`` to select - ``Dict[str, numpy.ndarray]``, or None to return the underlying block - exactly as is with no additional formatting. + batch_format: If ``"default"`` or ``"numpy"``, batches are + ``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are + ``pandas.DataFrame``. drop_last: Whether to drop the last batch if it's incomplete. - local_shuffle_buffer_size: If non-None, the data is randomly shuffled - using a local in-memory shuffle buffer, and this value will serve as the + local_shuffle_buffer_size: If not ``None``, the data is randomly shuffled + using a local in-memory shuffle buffer, and this value serves as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer is drained. + the buffer, the remaining rows in the buffer are drained. local_shuffle_seed: The seed to use for the local random shuffle. Returns: - An iterator over record batches. + An iterator over batches of data. """ batch_format = _apply_strict_mode_batch_format(batch_format) if batch_format == "native": @@ -3347,23 +3410,29 @@ def iter_torch_batches( # Deprecated prefetch_blocks: int = 0, ) -> Iterator["TorchTensorBatchType"]: - """Return a batched iterator of Torch Tensors over the dataset. + """Return an iterator over batches of data represented as Torch tensors. - This iterator will yield single-tensor batches if the underlying dataset - consists of a single column; otherwise, it will yield a dictionary of - column-tensors. If looking for more flexibility in the tensor conversion (e.g. - casting dtypes) or the batch format, try use `.iter_batches` directly, which is - a lower-level API. + This iterator yields batches of type ``Dict[str, torch.Tensor]``. + For more flexibility, call :meth:`~Dataset.iter_batches` and manually convert + your data to Torch tensors. Examples: - >>> import ray - >>> for batch in ray.data.range( # doctest: +SKIP - ... 12, - ... ).iter_torch_batches(batch_size=4): - ... print(batch.shape) # doctest: +SKIP - torch.Size([4, 1]) - torch.Size([4, 1]) - torch.Size([4, 1]) + + .. testcode:: + + import ray + + # This dataset contains three images. + ds = ray.data.read_images("example://image-datasets/simple") + + for batch in ds.iter_torch_batches(batch_size=2): + print(batch) + + .. testoutput:: + :options: +MOCK + + {'image': } + {'image': } Time complexity: O(1) @@ -3371,17 +3440,15 @@ def iter_torch_batches( prefetch_batches: The number of batches to fetch ahead of the current batch to fetch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply - the collate_fn. Defaults to 1. You can revert back to the old - prefetching behavior that uses `prefetch_blocks` by setting - `use_legacy_iter_batches` to True in the datasetContext. - batch_size: The number of rows in each batch, or None to use entire blocks - as batches (blocks may contain different number of rows). + the ``collate_fn``. Defaults to 1. + batch_size: The number of rows in each batch, or ``None`` to use entire + blocks as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if ``drop_last`` is ``False``. Defaults to 256. - dtypes: The Torch dtype(s) for the created tensor(s); if ``None``, the dtype - is inferred from the tensor data. + dtypes: The Torch dtype(s) for the created tensor(s); if ``None``, the + dtype is inferred from the tensor data. device: The device on which the tensor should be placed; if ``None``, the - torch tensor is constructed on the CPU. + Torch tensor is constructed on CPU. collate_fn: A function to convert a Numpy batch to a PyTorch tensor batch. Potential use cases include collating along a dimension other than the first, padding sequences of various lengths, or generally handling @@ -3390,19 +3457,21 @@ def iter_torch_batches( arrays to a batch of PyTorch tensors. This API is still experimental and is subject to change. drop_last: Whether to drop the last batch if it's incomplete. - local_shuffle_buffer_size: If non-None, the data is randomly shuffled - using a local in-memory shuffle buffer, and this value will serve as the + local_shuffle_buffer_size: If not ``None``, the data is randomly shuffled + using a local in-memory shuffle buffer, and this value serves as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer is drained. This - buffer size must be greater than or equal to ``batch_size``, and - therefore ``batch_size`` must also be specified when using local - shuffling. + the buffer, the remaining rows in the buffer are drained. + ``batch_size`` must also be specified when using local shuffling. local_shuffle_seed: The seed to use for the local random shuffle. Returns: An iterator over Torch Tensor batches. - """ + + .. seealso:: + :meth:`Dataset.iter_batches` + Call this method to manually convert your data to Torch tensors. + """ # noqa: E501 return self.iterator().iter_torch_batches( prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, @@ -3428,11 +3497,11 @@ def iter_tf_batches( # Deprecated prefetch_blocks: int = 0, ) -> Iterator[TensorFlowTensorBatchType]: - """Return a batched iterator of TensorFlow Tensors over the dataset. + """Return an iterator over batches of data represented as TensorFlow tensors. - This iterator will yield single-tensor batches of the underlying dataset - consists of a single column; otherwise, it will yield a dictionary of - column-tensors. + This iterator yields batches of type ``Dict[str, tf.Tensor]``. + For more flexibility, call :meth:`~Dataset.iter_batches` and manually convert + your data to TensorFlow tensors. .. tip:: If you don't need the additional flexibility provided by this method, @@ -3440,14 +3509,26 @@ def iter_tf_batches( to use. Examples: - >>> import ray - >>> for batch in ray.data.range( # doctest: +SKIP - ... 12, - ... ).iter_tf_batches(batch_size=4): - ... print(batch.shape) # doctest: +SKIP - (4, 1) - (4, 1) - (4, 1) + + .. testcode:: + + import ray + + ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv") + + tf_dataset = ds.to_tf( + feature_columns="sepal length (cm)", + label_columns="target", + batch_size=2 + ) + for features, labels in tf_dataset: + print(features, labels) + + .. testoutput:: + + tf.Tensor([5.1 4.9], shape=(2,), dtype=float64) tf.Tensor([0 0], shape=(2,), dtype=int64) + ... + tf.Tensor([6.2 5.9], shape=(2,), dtype=float64) tf.Tensor([2 2], shape=(2,), dtype=int64) Time complexity: O(1) @@ -3455,29 +3536,29 @@ def iter_tf_batches( prefetch_batches: The number of batches to fetch ahead of the current batch to fetch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply - the collate_fn. Defaults to 1. You can revert back to the old - prefetching behavior that uses `prefetch_blocks` by setting - `use_legacy_iter_batches` to True in the datasetContext. - batch_size: The number of rows in each batch, or None to use entire blocks - as batches (blocks may contain different number of rows). + the ``collate_fn``. Defaults to 1. + batch_size: The number of rows in each batch, or ``None`` to use entire + blocks as batches (blocks may contain different numbers of rows). The final batch may include fewer than ``batch_size`` rows if ``drop_last`` is ``False``. Defaults to 256. dtypes: The TensorFlow dtype(s) for the created tensor(s); if ``None``, the dtype is inferred from the tensor data. drop_last: Whether to drop the last batch if it's incomplete. - local_shuffle_buffer_size: If non-None, the data is randomly shuffled - using a local in-memory shuffle buffer, and this value will serve as the + local_shuffle_buffer_size: If not ``None``, the data is randomly shuffled + using a local in-memory shuffle buffer, and this value serves as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer is drained. This - buffer size must be greater than or equal to ``batch_size``, and - therefore ``batch_size`` must also be specified when using local - shuffling. + the buffer, the remaining rows in the buffer are drained. + ``batch_size`` must also be specified when using local shuffling. local_shuffle_seed: The seed to use for the local random shuffle. Returns: An iterator over TensorFlow Tensor batches. - """ + + .. seealso:: + :meth:`Dataset.iter_batches` + Call this method to manually convert your data to TensorFlow tensors. + """ # noqa: E501 return self.iterator().iter_tf_batches( prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks, @@ -4486,8 +4567,8 @@ def stats(self) -> str: .. testoutput:: :options: +MOCK - - Stage 0 Read: 20/20 blocks executed in 0.3s + + Stage 0 Read: 20/20 blocks executed in 0.3s * Remote wall time: 16.29us min, 7.29ms max, 1.21ms mean, 24.17ms total * Remote cpu time: 16.0us min, 2.54ms max, 810.45us mean, 16.21ms total * Peak heap memory usage (MiB): 137968.75 min, 142734.38 max, 139846 mean