[DOCS] Improve docs for external types #2274

Merged · 1 commit · May 13, 2024

65 changes: 33 additions & 32 deletions daft/dataframe/dataframe.py
@@ -40,13 +40,11 @@

if TYPE_CHECKING:
import dask
- import pandas as pd
- import pyarrow as pa
- import torch.utils.data.Dataset as TorchDataset
- import torch.utils.data.IterableDataset as TorchIterableDataset
- from pyiceberg.table import Table as IcebergTable
- from ray import ObjectRef as RayObjectRef
- from ray.data.dataset import Dataset as RayDataset
+ import pandas
+ import pyarrow
+ import pyiceberg
+ import ray
+ import torch

from daft.logical.schema import Schema

@@ -214,11 +212,14 @@ def __iter__(self) -> Iterator[Dict[str, Any]]:
yield row

@DataframePublicAPI
def iter_partitions(self) -> Iterator[Union[MicroPartition, "RayObjectRef"]]:
def iter_partitions(self) -> Iterator[Union[MicroPartition, "ray.ObjectRef[MicroPartition]"]]:
"""Begin executing this dataframe and return an iterator over the partitions.

Each partition will be returned as a daft.Table object (if using Python runner backend)
or a ray ObjectRef (if using Ray runner backend).

+ .. WARNING::
+     This method is experimental and may change in future versions.
"""
if self._result is not None:
# If the dataframe has already finished executing,
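
For a sense of how this reads at the call site, here is a minimal sketch (the toy data and partition count are illustrative, not part of this PR):

```python
import daft

df = daft.from_pydict({"x": [1, 2, 3, 4], "y": ["a", "b", "c", "d"]})
df = df.into_partitions(2)  # split into two partitions for the example

# On the Python runner each item is a MicroPartition;
# on the Ray runner it would be a ray.ObjectRef[MicroPartition] instead.
for part in df.iter_partitions():
    print(type(part))
```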
@@ -292,16 +293,16 @@ def _from_pydict(cls, data: Dict[str, InputListType]) -> "DataFrame":
return cls._from_tables(data_vpartition)

@classmethod
- def _from_arrow(cls, data: Union["pa.Table", List["pa.Table"]]) -> "DataFrame":
-     """Creates a DataFrame from a pyarrow Table."""
+ def _from_arrow(cls, data: Union["pyarrow.Table", List["pyarrow.Table"]]) -> "DataFrame":
+     """Creates a DataFrame from a `pyarrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__."""
if not isinstance(data, list):
data = [data]
data_vpartitions = [MicroPartition.from_arrow(table) for table in data]
return cls._from_tables(*data_vpartitions)

@classmethod
- def _from_pandas(cls, data: Union["pd.DataFrame", List["pd.DataFrame"]]) -> "DataFrame":
-     """Creates a Daft DataFrame from a pandas DataFrame."""
+ def _from_pandas(cls, data: Union["pandas.DataFrame", List["pandas.DataFrame"]]) -> "DataFrame":
+     """Creates a Daft DataFrame from a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__."""
if not isinstance(data, list):
data = [data]
data_vpartitions = [MicroPartition.from_pandas(df) for df in data]
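
These private constructors back the public `daft.from_arrow` and `daft.from_pandas` entry points; a small usage sketch with toy tables (both libraries assumed installed):

```python
import daft
import pandas as pd
import pyarrow as pa

# From a pyarrow Table...
df_a = daft.from_arrow(pa.table({"a": [1, 2], "b": ["x", "y"]}))

# ...or from a pandas DataFrame.
df_p = daft.from_pandas(pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}))
```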
@@ -443,15 +444,15 @@ def write_csv(
return result_df

@DataframePublicAPI
- def write_iceberg(self, table: "IcebergTable", mode: str = "append") -> "DataFrame":
-     """Writes the DataFrame to an Iceberg Table, returning a new DataFrame with the operations that occurred.
+ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") -> "DataFrame":
+     """Writes the DataFrame to an `Iceberg <https://iceberg.apache.org/docs/nightly/>`__ table, returning a new DataFrame with the operations that occurred.
Can be run in either `append` or `overwrite` mode, which will either append the rows in the DataFrame or delete the existing rows and then append the DataFrame rows, respectively.

.. NOTE::
This call is **blocking** and will execute the DataFrame when called

Args:
- table (IcebergTable): Destination Iceberg Table to write dataframe to.
+ table (pyiceberg.table.Table): Destination `PyIceberg Table <https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table>`__ to write dataframe to.
mode (str, optional): Operation mode of the write. `append` or `overwrite` Iceberg Table. Defaults to "append".

Returns:
@@ -486,7 +487,7 @@ def write_iceberg(self, table: "IcebergTable", mode: str = "append") -> "DataFrame":
else:
raise ValueError(f"Only support `append` or `overwrite` mode. {mode} is unsupported")

- # We perform the merge here since IcebergTable is not pickle-able
+ # We perform the merge here since table is not pickle-able
# We should be able to move to a transaction API for iceberg 0.7.0
merge = _MergingSnapshotProducer(operation=operation, table=table)
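
A hedged sketch of the call site (the catalog name and table identifier are assumptions for illustration):

```python
import daft
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")          # assumed: a configured Iceberg catalog
table = catalog.load_table("demo.events")  # assumed: an existing table

df = daft.from_pydict({"id": [1, 2], "value": ["a", "b"]})

# Blocking: appends the rows to the Iceberg table and returns a
# DataFrame describing the operations that occurred.
df.write_iceberg(table, mode="append").show()
```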

@@ -1577,12 +1578,12 @@ def __contains__(self, col_name: str) -> bool:
return col_name in self.column_names

@DataframePublicAPI
- def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pd.DataFrame":
-     """Converts the current DataFrame to a pandas DataFrame.
+ def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.DataFrame":
+     """Converts the current DataFrame to a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.

[Inline review comment (Contributor)]: I think links are single underscore?

If results have not been computed yet, collect will be called.

Returns:
- pd.DataFrame: pandas DataFrame converted from a Daft DataFrame
+ pandas.DataFrame: `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__ converted from a Daft DataFrame

.. NOTE::
This call is **blocking** and will execute the DataFrame when called
@@ -1598,12 +1599,12 @@ def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pd.DataFrame":
return pd_df
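
A minimal sketch of the conversion (toy data):

```python
import daft

df = daft.from_pydict({"a": [1, 2, 3]})

# Blocking: executes the plan, then materializes a pandas DataFrame.
pdf = df.to_pandas()
print(pdf.dtypes)
```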

@DataframePublicAPI
- def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pa.Table":
-     """Converts the current DataFrame to a pyarrow Table.
+ def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pyarrow.Table":
+     """Converts the current DataFrame to a `pyarrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__.
If results have not been computed yet, collect will be called.

Returns:
- pyarrow.Table: pyarrow Table converted from a Daft DataFrame
+ pyarrow.Table: `pyarrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__ converted from a Daft DataFrame

.. NOTE::
This call is **blocking** and will execute the DataFrame when called
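
And the pyarrow counterpart, sketched the same way:

```python
import daft

df = daft.from_pydict({"a": [1, 2, 3]})

# Blocking: executes the plan and returns a pyarrow.Table.
table = df.to_arrow()
print(table.schema)
```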
@@ -1632,7 +1633,7 @@ def to_pydict(self) -> Dict[str, List[Any]]:
return result.to_pydict()
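
A sketch of `to_pydict`, round-tripping toy data:

```python
import daft

df = daft.from_pydict({"a": [1, 2], "b": ["x", "y"]})

# Blocking: returns plain Python lists keyed by column name.
assert df.to_pydict() == {"a": [1, 2], "b": ["x", "y"]}
```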

@DataframePublicAPI
- def to_torch_map_dataset(self) -> "TorchDataset":
+ def to_torch_map_dataset(self) -> "torch.utils.data.Dataset":
"""Convert the current DataFrame into a map-style
`Torch Dataset <https://pytorch.org/docs/stable/data.html#map-style-datasets>`__
for use with PyTorch.
@@ -1654,7 +1655,7 @@ def to_torch_map_dataset(self) -> "TorchDataset":
return DaftTorchDataset(self.to_pydict(), len(self))
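
A hedged sketch of feeding the map-style dataset to a PyTorch DataLoader (toy features; torch assumed installed):

```python
import daft
from torch.utils.data import DataLoader

df = daft.from_pydict({"feature": [0.1, 0.2, 0.3, 0.4], "label": [0, 1, 0, 1]})

# Map-style: supports len() and random access by index, so shuffling works.
ds = df.to_torch_map_dataset()
for batch in DataLoader(ds, batch_size=2, shuffle=True):
    print(batch)
```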

@DataframePublicAPI
- def to_torch_iter_dataset(self) -> "TorchIterableDataset":
+ def to_torch_iter_dataset(self) -> "torch.utils.data.IterableDataset":
"""Convert the current DataFrame into a
`Torch IterableDataset <https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset>`__
for use with PyTorch.
@@ -1679,14 +1680,14 @@ def to_torch_iter_dataset(self) -> "TorchIterableDataset":
return DaftTorchIterableDataset(self)
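
The iterable-style counterpart, sketched under the same assumptions:

```python
import daft
from torch.utils.data import DataLoader

df = daft.from_pydict({"feature": [0.1, 0.2, 0.3, 0.4]})

# Iterable-style: rows stream in as partitions finish executing,
# so consumption can begin before the whole DataFrame materializes.
ds = df.to_torch_iter_dataset()
for batch in DataLoader(ds, batch_size=2):
    print(batch)
```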

@DataframePublicAPI
- def to_ray_dataset(self) -> "RayDataset":
-     """Converts the current DataFrame to a Ray Dataset which is useful for running distributed ML model training in Ray
+ def to_ray_dataset(self) -> "ray.data.dataset.Dataset":
+     """Converts the current DataFrame to a `Ray Dataset <https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html#ray.data.Dataset>`__ which is useful for running distributed ML model training in Ray

.. NOTE::
This function can only work if Daft is running using the RayRunner

Returns:
- RayDataset: Ray dataset
+ ray.data.dataset.Dataset: `Ray dataset <https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html#ray.data.Dataset>`__
"""
from daft.runners.ray_runner import RayPartitionSet

@@ -1698,8 +1699,8 @@ def to_ray_dataset(self) -> "RayDataset":
return partition_set.to_ray_dataset()
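
A sketch of the conversion (assumes Ray is installed and the Ray runner is active):

```python
import daft

daft.context.set_runner_ray()  # must run before any DataFrame work

df = daft.from_pydict({"a": [1, 2, 3]})
ds = df.to_ray_dataset()
print(ds.count())
```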

@classmethod
- def _from_ray_dataset(cls, ds: "RayDataset") -> "DataFrame":
-     """Creates a DataFrame from a Ray Dataset."""
+ def _from_ray_dataset(cls, ds: "ray.data.dataset.Dataset") -> "DataFrame":
+     """Creates a DataFrame from a `Ray Dataset <https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html#ray.data.Dataset>`__."""
context = get_context()
if context.runner_config.name != "ray":
raise ValueError("Daft needs to be running on the Ray Runner for this operation")
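
This private constructor backs the public `daft.from_ray_dataset`; a sketch under the same Ray-runner assumption:

```python
import daft
import ray

daft.context.set_runner_ray()

rds = ray.data.range(8)          # a small Ray Dataset
df = daft.from_ray_dataset(rds)  # hand the Ray Dataset to Daft
df.show()
```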
@@ -1744,8 +1745,8 @@ def _from_ray_dataset(cls, ds: "RayDataset") -> "DataFrame":
def to_dask_dataframe(
self,
meta: Union[
"pd.DataFrame",
"pd.Series",
"pandas.DataFrame",
"pandas.Series",
Dict[str, Any],
Iterable[Any],
Tuple[Any],
Expand All @@ -1761,7 +1762,7 @@ def to_dask_dataframe(
This function can only work if Daft is running using the RayRunner.

Args:
- meta: An empty pandas DataFrame or Series that matches the dtypes and column
+ meta: An empty pandas `DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__ or `Series <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html>`__ that matches the dtypes and column
names of the stream. This metadata is necessary for many algorithms in
dask dataframe to work. For ease of use, some alternative inputs are
also available. Instead of a DataFrame, a dict of ``{name: dtype}`` or
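A hedged sketch of `to_dask_dataframe` (Ray runner assumed; `meta` omitted on the assumption that Dask can infer it when not supplied):

```python
import daft

daft.context.set_runner_ray()

df = daft.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Without meta, the schema is inferred; pass an empty pandas
# DataFrame or Series as meta to pin dtypes and column names.
ddf = df.to_dask_dataframe()
print(ddf.head())
```
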
12 changes: 12 additions & 0 deletions docs/source/api_docs/dataframe.rst
@@ -112,6 +112,18 @@ Execution
.. NOTE::
These methods will execute the operations in your DataFrame and **are blocking**.

+ Data Retrieval
+ **************
+
+ These methods will execute the DataFrame and retrieve the results to where the code is being run.
+
+ .. autosummary::
+     :nosignatures:
+     :toctree: doc_gen/dataframe_methods
+
+     DataFrame.to_pydict
+     DataFrame.iter_partitions

Materialization
***************
