Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 73 additions & 68 deletions dataframely/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Collection(BaseCollection, ABC):

A collection is comprised of a set of *members* which are collectively "consistent",
meaning they the collection ensures that invariants are held up *across* members.
This is different to :mod:`dataframely` schemas which only ensure invariants
This is different to :class:`~dataframely.Schema` which only ensure invariants
*within* individual members.

In order to properly ensure that invariants hold up across members, members must
Expand All @@ -75,10 +75,6 @@ class MyCollection(dy.Collection):
Besides, it may define *filters* (c.f. :meth:`~dataframely.filter`) and arbitrary
methods.

Note:
The :mod:`dataframely` mypy plugin ensures that the dictionaries passed to class
methods contain exactly the required keys.

Attention:
Do NOT use this class in combination with ``from __future__ import annotations``
as it requires the proper schema definitions to ensure that the collection is
Expand All @@ -91,8 +87,8 @@ class MyCollection(dy.Collection):
def create_empty(cls) -> Self:
"""Create an empty collection without any data.

This method simply calls ``create_empty`` on all member schemas, including
non-optional ones.
This method simply calls :meth:``~dataframely.Schema.create_empty`` on all member schemas,
including non-optional ones.

Returns:
An instance of this collection.
Expand Down Expand Up @@ -121,12 +117,12 @@ def sample(
function must be "row-oriented" (or "sample-oriented").

Args:
num_rows: The number of rows to sample for each member. If this is set to
``None``, the number of rows is inferred from the length of the
num_rows: The number of rows to sample for each member.
If this is set to ``None``, the number of rows is inferred from the length of the
overrides.
overrides: The overrides to set values in member schemas. The overrides must
be provided as a list of samples. The structure of the samples must be
as follows:
overrides: The overrides to set values in member schemas.
The overrides must be provided as a list of samples.
The structure of the samples must be as follows:

.. code::

Expand All @@ -150,25 +146,27 @@ def sample(
Note that overrides for columns of members that are annotated with
``inline_for_sampling=True`` can be supplied on the top-level instead
of in a nested dictionary.
generator: The (seeded) generator to use for sampling data. If ``None``, a
generator with random seed is automatically created.
generator: The (seeded) generator to use for sampling data.
If ``None``, a generator with random seed is automatically created.

Returns:
A collection where all members (including optional ones) have been sampled
according to the input parameters.

Attention:
In case the collection has members with a common primary key, the
`_preprocess_sample` method must return distinct primary key values for each
:meth:`_preprocess_sample` method must return distinct primary key values for each
sample. The default implementation does this on a best-effort basis but may
cause primary key violations. Hence, it is recommended to override this
method and ensure that all primary key columns are set.

Raises:
ValueError: If the :meth:`_preprocess_sample` method does not return all
ValueError:
If the :meth:`_preprocess_sample` method does not return all
common primary key columns for all samples.
ValidationError: If the sampled members violate any of the collection
filters. If the collection does not have filters, this error is never
ValidationError:
If the sampled members violate any of the collection filters.
If the collection does not have filters, this error is never
raised. To prevent validation errors, overwrite the
:meth:`_preprocess_sample` method appropriately.
"""
Expand Down Expand Up @@ -403,17 +401,16 @@ def is_valid(cls, data: Mapping[str, FrameType], /, *, cast: bool = False) -> bo
Args:
data: The members of the collection which ought to be validated. The
dictionary must contain exactly one entry per member with the name of
the member as key. The existence of all keys is checked via the
:mod:`dataframely` mypy plugin.
the member as key.
cast: Whether columns with a wrong data type in the member data frame are
cast to their schemas' defined data types if possible.

Returns:
Whether the provided members satisfy the invariants of the collection.

Raises:
ValueError: If an insufficient set of input data frames is provided, i.e. if
any required member of this collection is missing in the input.
ValueError: If an insufficient set of input data frames is provided,
i.e. if any required member of this collection is missing in the input.
"""
try:
cls.validate(data, cast=cast)
Expand All @@ -430,8 +427,8 @@ def filter(
"""Filter the members data frame by their schemas and the collection's filters.

Args:
data: The members of the collection which ought to be filtered. The
dictionary must contain exactly one entry per member with the name of
data: The members of the collection which ought to be filtered.
The dictionary must contain exactly one entry per member with the name of
the member as key, except for optional members which may be missing.
All data frames passed here will be eagerly collected within the method,
regardless of whether they are a :class:`~polars.DataFrame` or
Expand All @@ -447,16 +444,17 @@ def filter(
filtered out by any of the collection's filters. While collection members
are always instances of :class:`~polars.LazyFrame`, the members of the
returned collection are essentially eager as they are constructed by
calling ``.lazy()`` on eager data frames. Just like in polars' native
:meth:`~polars.DataFrame.filter`, the order of rows is maintained in all
returned data frames.
- A mapping from member name to a :class:`FailureInfo` object which provides
details on why individual rows had been removed. Optional members are only
included in this dictionary if they had been provided in the input.
calling :meth:`polars.DataFrame.lazy()` on eager data frames.
Just like in polars' native :meth:`~polars.DataFrame.filter`,
the order of rows is maintained in all returned data frames.
- A mapping from member name to a :class:`~dataframely.FailureInfo` object
which provides details on why individual rows had been removed.
Optional members are only included in this dictionary if they were
provided in the input.

Raises:
ValueError: If an insufficient set of input data frames is provided, i.e. if
any required member of this collection is missing in the input.
ValueError: If an insufficient set of input data frames is provided,
i.e. if any required member of this collection is missing in the input.
ValidationError: If the columns of any of the input data frames are invalid.
This happens only if a data frame misses a column defined in its schema
or a column has an invalid dtype while ``cast`` is set to ``False``.
Expand Down Expand Up @@ -587,15 +585,17 @@ def join(
The collection, with members potentially reduced in length.

Raises:
ValueError: If the collection contains any member that is annotated with
ValueError:
If the collection contains any member that is annotated with
`ignored_in_filters=True`.

Attention:
This method does not validate the resulting collection. Ensure to only use
this if the resulting collection still satisfies the filters of the
collection. The joins are not evaluated eagerly. Therefore, a downstream
call to :meth:`collect` might fail, especially if `primary_keys` does not
contain all columns for all common primary keys.
call to :meth:`polars.LazyFrame.collect`
may fail, especially if `primary_keys` does not contain all columns
for all common primary keys.
"""
if any(member.ignored_in_filters for member in self.members().values()):
raise ValueError(
Expand All @@ -620,7 +620,7 @@ def join(
def cast(cls, data: Mapping[str, FrameType], /) -> Self:
"""Initialize a collection by casting all members into their correct schemas.

This method calls :meth:`~Schema.cast` on every member, thus, removing
This method calls :meth:`~dataframely.Schema.cast` on every member, thus, removing
superfluous columns and casting to the correct dtypes for all input data frames.

You should typically use :meth:`validate` or :meth:`filter` to obtain instances
Expand All @@ -629,19 +629,21 @@ def cast(cls, data: Mapping[str, FrameType], /) -> Self:
it is known that the provided data adheres to the collection's invariants.

Args:
data: The data for all members. The dictionary must contain exactly one
entry per member with the name of the member as key.
data: The data for all members.
The dictionary must contain exactly one entry per member
with the name of the member as key.

Returns:
The initialized collection.

Raises:
ValueError: If an insufficient set of input data frames is provided, i.e. if
any required member of this collection is missing in the input.
ValueError: If an insufficient set of input data frames is provided
i.e. if any required member of this collection is missing in the input.

Attention:
For lazy frames, casting is not performed eagerly. This prevents collecting
the lazy frames' schemas but also means that a call to :meth:`collect`
the lazy frames' schemas but also means that a call to
:meth:`polars.LazyFrame.collect`
further down the line might fail because of the cast and/or missing columns.
"""
cls._validate_input_keys(data)
Expand Down Expand Up @@ -677,10 +679,10 @@ def collect_all(self) -> Self:

@classmethod
def serialize(cls) -> str:
"""Serialize this collection to a JSON string.
"""Serialize the metadata for this collection to a JSON string.

This method does NOT serialize any data frames, but only the _structure_ of the
collection, similar to :meth:`Schema.serialize`.
collection, similar to :meth:`dataframely.Schema.serialize`.

Returns:
The serialized collection.
Expand All @@ -702,9 +704,10 @@ def serialize(cls) -> str:
without it being considered a breaking change.

Raises:
TypeError: If a column of any member contains metadata that is not
JSON-serializable.
ValueError: If a column of any member is not a "native" dataframely column
TypeError:
If a column of any member contains metadata that is not JSON-serializable.
ValueError:
If a column of any member is not a "native" dataframely column
type but a custom subclass.
"""
result = {
Expand Down Expand Up @@ -736,15 +739,14 @@ def write_parquet(self, directory: str | Path, **kwargs: Any) -> None:
members which are not provided in the current collection.

Args:
directory: The directory where the Parquet files should be written to. If
the directory does not exist, it is created automatically, including all
of its parents.
kwargs: Additional keyword arguments passed directly to
:meth:`polars.write_parquet` of all members. ``metadata`` may only be
provided if it is a dictionary.
directory: The directory the Parquet files should be written to.
If the directory does not exist, it is created automatically,
including all of its parents.
kwargs: Additional keyword arguments passed to :meth:`polars.DataFrame.write_parquet`.
``metadata`` may only be provided if it is a dictionary.

Attention:
This method suffers from the same limitations as :meth:`Schema.serialize`.
This method suffers from the same limitations as :meth:`~dataframely.Schema.serialize`.
"""
self._write(ParquetStorageBackend(), directory=directory, **kwargs)

Expand All @@ -759,12 +761,11 @@ def sink_parquet(self, directory: str | Path, **kwargs: Any) -> None:
directory: The directory where the Parquet files should be written to. If
the directory does not exist, it is created automatically, including all
of its parents.
kwargs: Additional keyword arguments passed directly to
:meth:`polars.sink_parquet` of all members. ``metadata`` may only be
provided if it is a dictionary.
kwargs: Additional keyword arguments passed to :meth:`polars.LazyFrame.sink_parquet`.
``metadata`` may only be provided if it is a dictionary.

Attention:
This method suffers from the same limitations as :meth:`Schema.serialize`.
This method suffers from the same limitations as :meth:`~dataframely.Schema.serialize`.
"""
self._sink(ParquetStorageBackend(), directory=directory, **kwargs)

Expand Down Expand Up @@ -802,17 +803,19 @@ def read_parquet(
carefully*.

kwargs: Additional keyword arguments passed directly to
:meth:`polars.read_parquet`.
:func:`polars.read_parquet`.

Returns:
The initialized collection.

Raises:
ValidationRequiredError: If no collection schema can be read from the
ValidationRequiredError:
If no collection schema can be read from the
directory and ``validation`` is set to ``"forbid"``.
ValueError: If the provided directory does not contain parquet files for
ValueError:
If the provided directory does not contain parquet files for
all required members.
ValidationError: If the collection cannot be validate.
ValidationError: If the collection cannot be validated.

Note:
This method is backward compatible with older versions of dataframely
Expand Down Expand Up @@ -865,7 +868,7 @@ def scan_parquet(
carefully*.

kwargs: Additional keyword arguments passed directly to
:meth:`polars.scan_parquet` for all members.
:func:`polars.scan_parquet` for all members.

Returns:
The initialized collection.
Expand Down Expand Up @@ -910,7 +913,7 @@ def write_delta(
target: The location or DeltaTable where the data should be written.
If the location does not exist, it is created automatically,
including all of its parents.
kwargs: Additional keyword arguments passed directly to :meth:`polars.write_delta`.
kwargs: Additional keyword arguments passed to :meth:`polars.DataFrame.write_delta`.

Attention:
Schema metadata is stored as custom commit metadata. Only the schema
Expand All @@ -922,7 +925,7 @@ def write_delta(
without re-validating. Only use appends if you are certain that they do not
break your schema.

This method suffers from the same limitations as :meth:`Schema.serialize`.
This method suffers from the same limitations as :meth:`~dataframely.Schema.serialize`.
"""
self._write(
backend=DeltaStorageBackend(),
Expand Down Expand Up @@ -962,14 +965,16 @@ def scan_delta(
data, entrusting the user that the schema is valid. *Use this option
carefully*.

kwargs: Additional keyword arguments passed directly to :meth:`polars.scan_delta`.
kwargs: Additional keyword arguments passed to :func:`polars.scan_delta`.

Returns:
The initialized collection.

Raises:
ValidationRequiredError: If no collection schema can be read from the source and ``validation`` is set to ``"forbid"``.
ValueError: If the provided source does not contain Delta tables for all required members.
ValidationRequiredError:
If no collection schema can be read from the source and ``validation`` is set to ``"forbid"``.
ValueError:
If the provided source does not contain Delta tables for all required members.

Note:
Due to current limitations in dataframely, this method may read the Delta table into memory if ``validation`` is ``"warn"`` or ``"allow"`` and validation is required.
Expand Down Expand Up @@ -1025,7 +1030,7 @@ def read_delta(
data, entrusting the user that the schema is valid. *Use this option
carefully*.

kwargs: Additional keyword arguments passed directly to :meth:`polars.read_delta`.
kwargs: Additional keyword arguments passed directly to :func:`polars.read_delta`.

Returns:
The initialized collection.
Expand Down
Loading
Loading