Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"pydantic~=2.11.4",
"pyarrow~=19.0.1",
"platformdirs~=4.3.8",
"duckdb~=1.3.2",
]

[project.urls]
Expand Down
127 changes: 127 additions & 0 deletions src/getml_io/metadata/dataframe_information.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,138 @@
from __future__ import annotations

from collections.abc import Mapping
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Annotated, Literal, get_args

import getml.data.roles.types as roles
from pydantic import Field, TypeAdapter
from pydantic.dataclasses import dataclass

from getml_io.utils.convert import assume_is_str


def _role_value(role_alias: object) -> str:
    # Take the first type argument of a getml role alias and insist it is a
    # string. Presumably each alias is a ``Literal["<role>"]`` -- confirm
    # against ``getml.data.roles.types``.
    return assume_is_str(get_args(role_alias)[0])


# String-valued enumeration of the column roles. Member values are read from
# the getml role type aliases instead of being hard-coded here.
class Role(str, Enum):
    CATEGORICAL = _role_value(roles.Categorical)
    JOIN_KEY = _role_value(roles.JoinKey)
    NUMERICAL = _role_value(roles.Numerical)
    TARGET = _role_value(roles.Target)
    TEXT = _role_value(roles.Text)
    TIME_STAMP = _role_value(roles.TimeStamp)
    UNUSED_FLOAT = _role_value(roles.UnusedFloat)
    UNUSED_STRING = _role_value(roles.UnusedString)


# Immutable base model for the summary statistics of a DOUBLE column. The
# field names match keys of the per-column SUMMARIZE row that is validated
# into the concrete subclasses.
@dataclass(frozen=True)
class ColumnStatisticsDouble:
    # Row count and approximate number of distinct values.
    count: int
    approx_unique: int

    # Central tendency and range.
    avg: float
    min: float
    max: float

    # Quantiles (25 % / 50 % / 75 %) and standard deviation.
    q25: float
    q50: float
    q75: float
    std: float

    null_percentage: float

    # Column type as reported in the summary; fixed to "DOUBLE" for this model.
    column_type: Literal["DOUBLE"]


# Immutable base model for the summary statistics of a VARCHAR column. It
# carries only the counts, the string-valued min/max and the null percentage.
@dataclass(frozen=True)
class ColumnStatisticsVarchar:
    # Row count and approximate number of distinct values.
    count: int
    approx_unique: int

    # Smallest and largest value, kept as strings.
    min: str
    max: str

    null_percentage: float

    # Column type as reported in the summary; fixed to "VARCHAR" for this model.
    column_type: Literal["VARCHAR"]


# DOUBLE statistics for a column with role ``numerical``. Adds only the
# ``type`` tag used as discriminator by the ``ColumnStatistics`` union.
@dataclass(frozen=True)
class ColumnStatisticsNumerical(ColumnStatisticsDouble):
    type: Literal["numerical"] = "numerical"


# DOUBLE statistics for a column with role ``target``. Adds only the ``type``
# tag used as discriminator by the ``ColumnStatistics`` union.
@dataclass(frozen=True)
class ColumnStatisticsTarget(ColumnStatisticsDouble):
    type: Literal["target"] = "target"


# Immutable statistics model for a time-stamp column stored as TIMESTAMP_NS.
# Value-like statistics are ``datetime`` objects, and there is no ``std``
# field, unlike the DOUBLE-based models.
@dataclass(frozen=True)
class ColumnStatisticsTimeStamp:
    # Row count and approximate number of distinct values.
    count: int
    approx_unique: int

    # Central tendency and range, expressed as points in time.
    avg: datetime
    min: datetime
    max: datetime

    # Quantiles (25 % / 50 % / 75 %), expressed as points in time.
    q25: datetime
    q50: datetime
    q75: datetime

    null_percentage: float

    # Column type as reported in the summary; fixed to "TIMESTAMP_NS" here.
    column_type: Literal["TIMESTAMP_NS"]

    # Discriminator tag for the ``ColumnStatistics`` union.
    type: Literal["time_stamp"] = "time_stamp"


# DOUBLE statistics for a time-stamp column whose values are stored as floats
# rather than as TIMESTAMP_NS. Adds only the ``type`` discriminator tag.
@dataclass(frozen=True)
class ColumnStatisticsTimeStampAsFloat(ColumnStatisticsDouble):
    type: Literal["time_stamp_float"] = "time_stamp_float"


# VARCHAR statistics for a column with role ``categorical``. Adds only the
# ``type`` tag used as discriminator by the ``ColumnStatistics`` union.
@dataclass(frozen=True)
class ColumnStatisticsCategorical(ColumnStatisticsVarchar):
    type: Literal["categorical"] = "categorical"


# VARCHAR statistics for a column with role ``join_key``. Adds only the
# ``type`` tag used as discriminator by the ``ColumnStatistics`` union.
@dataclass(frozen=True)
class ColumnStatisticsJoinKey(ColumnStatisticsVarchar):
    type: Literal["join_key"] = "join_key"


# DOUBLE statistics for a column with role ``unused_float``. Adds only the
# ``type`` tag used as discriminator by the ``ColumnStatistics`` union.
@dataclass(frozen=True)
class ColumnStatisticsUnusedFloat(ColumnStatisticsDouble):
    type: Literal["unused_float"] = "unused_float"


# VARCHAR statistics for a column with role ``unused_string``. Adds only the
# ``type`` tag used as discriminator by the ``ColumnStatistics`` union.
@dataclass(frozen=True)
class ColumnStatisticsUnusedString(ColumnStatisticsVarchar):
    type: Literal["unused_string"] = "unused_string"


# Discriminated union of every concrete statistics model. The ``type`` field
# of each member is declared as the discriminator, so validation picks the
# concrete model from that tag.
ColumnStatistics = Annotated[
    ColumnStatisticsNumerical
    | ColumnStatisticsTarget
    | ColumnStatisticsCategorical
    | ColumnStatisticsJoinKey
    | ColumnStatisticsTimeStamp
    | ColumnStatisticsTimeStampAsFloat
    | ColumnStatisticsUnusedFloat
    | ColumnStatisticsUnusedString,
    Field(discriminator="type"),
]


# Profile of a single column: its name, its role, and the summary statistics
# computed for it.
@dataclass
class ColumnProfile:
    name: str
    role: Role
    # One of the concrete statistics models, tagged by its ``type`` field.
    statistics: ColumnStatistics


# Metadata recorded for one serialized data frame or view.
@dataclass
class DataFrameInformation:
    name: str
    # Location of the file the data was written to.
    path: Path
    # Per-column profiles; the serializer keys this mapping by column name.
    profile: Mapping[str, ColumnProfile]


# Lookup from (column role, column type reported by the summary) to the
# pydantic ``TypeAdapter`` of the matching statistics model. A combination
# that is not listed here has no statistics model. ``Role.TIME_STAMP`` appears
# twice because time stamps may be stored as TIMESTAMP_NS or as DOUBLE.
ROLE_TYPE_ADAPTER_MAPPING = {
    (Role.CATEGORICAL, "VARCHAR"): TypeAdapter(ColumnStatisticsCategorical),
    (Role.JOIN_KEY, "VARCHAR"): TypeAdapter(ColumnStatisticsJoinKey),
    (Role.NUMERICAL, "DOUBLE"): TypeAdapter(ColumnStatisticsNumerical),
    (Role.TARGET, "DOUBLE"): TypeAdapter(ColumnStatisticsTarget),
    (Role.TIME_STAMP, "TIMESTAMP_NS"): TypeAdapter(ColumnStatisticsTimeStamp),
    (Role.TIME_STAMP, "DOUBLE"): TypeAdapter(ColumnStatisticsTimeStampAsFloat),
    (Role.UNUSED_FLOAT, "DOUBLE"): TypeAdapter(ColumnStatisticsUnusedFloat),
    (Role.UNUSED_STRING, "VARCHAR"): TypeAdapter(ColumnStatisticsUnusedString),
}
3 changes: 0 additions & 3 deletions src/getml_io/metadata/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
from pathlib import Path
from typing import Protocol, TypeVar

from pydantic.dataclasses import dataclass

from getml_io.metadata.exception import PathNotRelativeError


@dataclass
Comment thread
awaismirza92 marked this conversation as resolved.
class InstanceProtocol(Protocol):
name: str
path: Path
Expand Down
18 changes: 10 additions & 8 deletions src/getml_io/serialize/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,19 @@ def serialize_container(
container,
container_storage_directory,
)
peripheral_information = serialize_peripheral(
container,
container_storage_directory,
)
subsets_information = serialize_subsets(
container,
container_storage_directory,
)
container_information = ContainerInformation(
id=assume_is_str(container.id),
population=population_information,
peripheral=serialize_peripheral(
container,
container_storage_directory,
),
subsets=serialize_subsets(
container,
container_storage_directory,
),
peripheral=peripheral_information,
subsets=subsets_information,
deep_copy=assume_is_bool(container.deep_copy),
path=target_storage_directory,
)
Expand Down
126 changes: 121 additions & 5 deletions src/getml_io/serialize/dataframe_or_view.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
from __future__ import annotations

import logging
from logging import Logger
from pathlib import Path

import duckdb
from getml.data import (
DataFrame,
View,
)
from pydantic import TypeAdapter

from getml_io.metadata.dataframe_information import DataFrameInformation
from getml_io.metadata.dataframe_information import (
ROLE_TYPE_ADAPTER_MAPPING,
ColumnProfile,
ColumnStatistics,
ColumnStatisticsCategorical,
ColumnStatisticsJoinKey,
ColumnStatisticsNumerical,
ColumnStatisticsTarget,
ColumnStatisticsTimeStamp,
ColumnStatisticsTimeStampAsFloat,
ColumnStatisticsUnusedFloat,
ColumnStatisticsUnusedString,
DataFrameInformation,
Role,
)
from getml_io.serialize.exception import (
DataFrameParquetStorageError,
UnsupportedColumnStatisticsError,
)
from getml_io.utils.convert import assume_is_str
from getml_io.utils.exception import StorageDirectoryCreationError

logger: Logger = logging.getLogger(__name__)


def serialize_dataframe_or_view(
dataframe_or_view: DataFrame | View,
Expand Down Expand Up @@ -53,16 +74,111 @@ def serialize_dataframe_or_view(
if filename_prefix and filename_prefix != name
else name
)
filepath = target_storage_directory / f"{filename}.parquet"
parquet_filepath = target_storage_directory / f"{filename}.parquet"
try:
dataframe_or_view.to_parquet(str(filepath))
dataframe_or_view.to_parquet(str(parquet_filepath))
except Exception as exception:
raise DataFrameParquetStorageError(
name,
filepath,
parquet_filepath,
) from exception

profile = _calculate_profile(parquet_filepath, dataframe_or_view)

return DataFrameInformation(
name=name,
path=filepath,
path=parquet_filepath,
profile=profile,
)


def _calculate_profile(
    parquet_filepath: Path,
    dataframe_or_view: DataFrame | View,
) -> dict[str, ColumnProfile]:
    """Build a profile (role plus summary statistics) for every column.

    Args:
        parquet_filepath: Parquet file the summary statistics are computed
            from.
        dataframe_or_view: Source of the column names and of each column's
            role.

    Returns:
        A mapping from column name to that column's ``ColumnProfile``.

    Raises:
        UnsupportedColumnStatisticsError: If a column's role and column type
            combination has no registered statistics model (raised while the
            statistics are being built).
    """
    summary_statistics = _calculate_summary_statistics(
        parquet_filepath,
        dataframe_or_view,
    )
    # Loop-invariant: fetch the roles object once instead of once per column.
    column_roles = dataframe_or_view.roles
    return {
        name: ColumnProfile(
            name=name,
            # Narrow to ``str`` before the enum lookup, exactly as
            # ``_get_column_statistics_adapter`` does for the same value.
            role=Role(assume_is_str(column_roles.column(name))),
            statistics=summary_statistics[name],
        )
        for name in dataframe_or_view.columns
    }


def _calculate_summary_statistics(
    parquet_filepath: Path,
    dataframe_or_view: DataFrame | View,
) -> dict[str, ColumnStatistics]:
    """Compute typed summary statistics for every column.

    Fetches the raw per-column summary of the Parquet file and converts each
    row into the statistics model that matches the column's role and type.

    Args:
        parquet_filepath: Parquet file to summarize.
        dataframe_or_view: Source of the column names and roles.

    Returns:
        A mapping from column name to its validated statistics model.
    """
    return _build_column_statistics(
        dataframe_or_view,
        _fetch_raw_summary_statistics(parquet_filepath),
    )


# DuckDB statement that summarizes every column of a Parquet file. The ``?``
# placeholder is bound to the file path when the statement is executed.
SUMMARIZE_STATEMENT_TEMPLATE = "SUMMARIZE (SELECT * FROM read_parquet(?))"
Comment thread
Urfoex marked this conversation as resolved.


def _fetch_raw_summary_statistics(
    parquet_filepath: Path,
) -> dict[str, dict[str, str | int | float]]:
    """Run the SUMMARIZE statement on a Parquet file and return its rows.

    Args:
        parquet_filepath: Parquet file to summarize; passed to the statement
            as a bound parameter.

    Returns:
        A mapping keyed by the ``column_name`` value of each result row. Each
        value is a dict of that row's remaining result columns.
    """
    # A fresh connection per call, closed again when the block exits.
    with duckdb.connect() as connection:
        logger.debug(
            "Calculating summary statistics for Parquet '%s'",
            parquet_filepath,
        )
        parameters = [str(parquet_filepath)]
        result = connection.execute(SUMMARIZE_STATEMENT_TEMPLATE, parameters)
        # Materialize as a data frame, then pivot into {column_name: row}.
        summary_frame = result.df()
        indexed_by_column = summary_frame.set_index("column_name")
        return indexed_by_column.to_dict(orient="index")


def _build_column_statistics(
    dataframe_or_view: DataFrame | View,
    raw_summary_statistics: dict[str, dict[str, str | int | float]],
) -> dict[str, ColumnStatistics]:
    """Validate each column's raw summary row into its statistics model.

    Args:
        dataframe_or_view: Source of the column names and roles.
        raw_summary_statistics: Raw summary rows keyed by column name; every
            row must contain a ``column_type`` entry.

    Returns:
        A mapping from column name to the validated statistics model, in the
        iteration order of ``dataframe_or_view.columns``.
    """
    column_statistics: dict[str, ColumnStatistics] = {}
    for column_name in dataframe_or_view.columns:
        raw_row = raw_summary_statistics[column_name]
        # The adapter is chosen from the column's role and reported type.
        adapter = _get_column_statistics_adapter(
            dataframe_or_view,
            column_name,
            assume_is_str(raw_row["column_type"]),
        )
        column_statistics[column_name] = adapter.validate_python(raw_row)
    return column_statistics


def _get_column_statistics_adapter(
    dataframe_or_view: DataFrame | View,
    name: str,
    column_type: str,
) -> (
    TypeAdapter[ColumnStatisticsNumerical]
    | TypeAdapter[ColumnStatisticsTarget]
    | TypeAdapter[ColumnStatisticsCategorical]
    | TypeAdapter[ColumnStatisticsJoinKey]
    | TypeAdapter[ColumnStatisticsTimeStamp]
    | TypeAdapter[ColumnStatisticsTimeStampAsFloat]
    | TypeAdapter[ColumnStatisticsUnusedFloat]
    | TypeAdapter[ColumnStatisticsUnusedString]
):
    """Select the statistics adapter for a column's role and column type.

    Args:
        dataframe_or_view: Source of the column's role and, for error
            reporting, of the data frame name.
        name: Name of the column.
        column_type: Column type reported by the summary for this column.

    Returns:
        The ``TypeAdapter`` registered in ``ROLE_TYPE_ADAPTER_MAPPING`` for
        the ``(role, column_type)`` pair.

    Raises:
        UnsupportedColumnStatisticsError: If no adapter is registered for the
            pair.
    """
    column_role = Role(assume_is_str(dataframe_or_view.roles.column(name)))
    registered = ROLE_TYPE_ADAPTER_MAPPING.get((column_role, column_type))
    if registered is not None:
        return registered
    raise UnsupportedColumnStatisticsError(
        assume_is_str(dataframe_or_view.name),
        name,
        column_role,
        column_type,
    )
20 changes: 20 additions & 0 deletions src/getml_io/serialize/exception.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path

from getml_io.metadata.dataframe_information import ROLE_TYPE_ADAPTER_MAPPING, Role
from getml_io.utils.exception import GetMLIOError


Expand Down Expand Up @@ -76,3 +77,22 @@ class TableParquetStorageError(GetMLIOStorageError):
def __init__(self, name: str, path: Path) -> None:
"""Initialize the exception with a custom message."""
super().__init__("Table as parquet", name, path)


class UnsupportedColumnStatisticsError(GetMLIOError):
    """Raised when a column's role and type have no statistics model."""

    def __init__(
        self,
        dataframe_name: str,
        column_name: str,
        role: Role,
        column_type: str,
    ) -> None:
        """Compose the message from the offending column's details.

        The message names the column, its data frame, the role and column
        type, and lists the supported ``(role, column type)`` combinations.
        """
        super().__init__(
            f"Column {column_name!r} in dataframe {dataframe_name!r} has an "
            f"unsupported role: {role!r} and type: {column_type}. "
            f"Supported are: {list(ROLE_TYPE_ADAPTER_MAPPING.keys())}.",
        )
Loading