Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

annotations experiment #942

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dlt.common.configuration.specs import CredentialsConfiguration as _CredentialsConfiguration
from dlt.common.pipeline import source_state as state
from dlt.common.schema import Schema
from dlt.common.schema import annotations

from dlt import sources
from dlt.extract.decorators import source, resource, transformer, defer
Expand Down Expand Up @@ -54,6 +55,7 @@

__all__ = [
"__version__",
"annotations",
"config",
"secrets",
"state",
Expand Down
153 changes: 153 additions & 0 deletions dlt/common/schema/annotations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import inspect

from typing import Type, List, Any, Tuple, Union, Dict
from typing_extensions import Annotated, get_origin, get_args
from decimal import Decimal
from dlt.common.data_types import py_type_to_sc_type

from dataclasses import dataclass
from dlt.common.schema.typing import TTableSchema, TColumnSchema, TWriteDisposition, TDataType
from datetime import datetime, date


#
# base dataclasses used for hints
#
@dataclass
class BoolValue:
value: bool


@dataclass
class StringValue:
value: str


@dataclass
class IntValue:
value: int


@dataclass
class StrListValue:
value: List[str]


#
# Column Hints
#
class PrimaryKey(BoolValue): ...


class Unique(BoolValue): ...


#
# Table Hints
#
class TableName(StringValue): ...


class Description(StringValue): ...


class Classifiers(StringValue): ...


@dataclass
class WriteDisposition:
value: TWriteDisposition


#
# Converters
#


def unwrap(t: Type[Any]) -> Tuple[Any, List[Any]]:
"""Returns python type info and wrapped types if this was annotated type"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

look at typing.py in common, I think I have similar function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i extracted the essential part.

if get_origin(t) is Annotated:
args = get_args(t)
return args[0], list(args[1:])
return t, []


def to_full_type(t: Type[Any]) -> TColumnSchema:
result: TColumnSchema = {}
if get_origin(t) is Union:
for arg in get_args(t):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case the last type in Union will override all previous values should aggregate things somehow?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can only have one type in the schema, so either we have a default way of resolving if there are multiple types or we throw an error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or types of int and string will produce a string, but I think that is taking it to far for now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood

if arg is type(None):
result["nullable"] = True
else:
result["data_type"] = py_type_to_sc_type(arg)
else:
result["data_type"] = py_type_to_sc_type(t)
return result


def to_table_hints(h: List[Type[Any]]) -> TTableSchema:
result: TTableSchema = {}
for hint in h:
if isinstance(hint, TableName):
result["name"] = hint.value
elif isinstance(hint, WriteDisposition):
result["write_disposition"] = hint.value
return result


def resolve_boolean_hint(column: TColumnSchema, hint: Type[Any], type: Type[Any], key: str) -> None:
"""boolean hints may be instantiated with a value or just be present as a class, in that case they are assumed to be true"""
if isinstance(hint, type):
column[key] = hint.value # type: ignore
if hint is type:
column[key] = True # type: ignore


def to_column_hints(h: List[Type[Any]]) -> TColumnSchema:
result: TColumnSchema = {}
for hint in h:
resolve_boolean_hint(result, hint, Unique, "unique")
resolve_boolean_hint(result, hint, PrimaryKey, "primary_key")
#
if isinstance(hint, Classifiers):
result["x-classifiers"] = hint.value # NOTE: classifiers not supported properly yet
return result


def class_to_table(ctt: Type[Any]) -> TTableSchema:

cls, hints = unwrap(ctt)

# initial checks
if not inspect.isclass(cls):
return {}

table: TTableSchema = {"columns": {}}
if hints:
table = {**table, **to_table_hints(hints)}

# return if there are no type annotations
if not (annotations := getattr(cls, "__annotations__", None)):
return table

# convert annotations to table schema
for name, value in annotations.items():
t, hints = unwrap(value)

# skip private attributes for now
if name.startswith("_"):
continue

# extract column hints
table["columns"][name] = {
"name": name,
**to_full_type(t),
**to_column_hints(hints),
}

return table


def table_to_class(table: TTableSchema) -> str:
"""TODO: do something with ast unparse"""
return ""
9 changes: 8 additions & 1 deletion dlt/extract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
Generator,
)
from collections.abc import Mapping as C_Mapping
from typing_extensions import Annotated, get_origin

from dlt.common.exceptions import MissingDependencyException
from dlt.common.pipeline import reset_resource_state
from dlt.common.schema.typing import TColumnNames, TAnySchemaColumns, TTableSchemaColumns
from dlt.common.typing import AnyFun, DictStrAny, TDataItem, TDataItems
from dlt.common.utils import get_callable_name
from dlt.common.schema.annotations import class_to_table

from dlt.extract.exceptions import (
InvalidResourceDataTypeFunctionNotAGenerator,
Expand Down Expand Up @@ -61,10 +63,15 @@ def ensure_table_schema_columns(columns: TAnySchemaColumns) -> TTableSchemaColum
elif isinstance(columns, Sequence):
# Assume list of columns
return {col["name"]: col for col in columns}
elif get_origin(columns) is Annotated:
return class_to_table(columns)["columns"]
elif pydantic is not None and (
isinstance(columns, pydantic.BaseModel) or issubclass(columns, pydantic.BaseModel)
):
return pydantic.pydantic_to_table_schema_columns(columns)
elif isinstance(columns, type):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no support for table level hints here yet

# try to build a table from class annotations
return class_to_table(columns)["columns"]

raise ValueError(f"Unsupported columns type: {type(columns)}")

Expand All @@ -75,7 +82,7 @@ def ensure_table_schema_columns_hint(
"""Convert column schema hint to a hint returning `TTableSchemaColumns`.
A callable hint is wrapped in another function which converts the original result.
"""
if callable(columns) and not isinstance(columns, type):
if callable(columns) and not isinstance(columns, type) and not get_origin(columns) is Annotated:

def wrapper(item: TDataItem) -> TTableSchemaColumns:
return ensure_table_schema_columns(
Expand Down
36 changes: 36 additions & 0 deletions test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import dlt

from dlt import annotations as a
from dlt.common import json
from typing_extensions import Annotated, Never, Optional


class Items:

# primary keys
id: Annotated[str, a.PrimaryKey, a.Unique]

# additional columns
name: Annotated[Optional[str], a.Classifiers(["pii.name"])]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably generate literals for those

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean for the classifiers?

email: Annotated[Optional[str], a.Unique, a.Classifiers(["pii.email"])]
likes_herring: Annotated[bool, a.Classifiers(["pii.food_preference"])]


AnnotatedItems = Annotated[Items, a.TableName("my_items"), a.WriteDisposition("merge")]

if __name__ == "__main__":

# print result of class_to_table
print(json.dumps(a.class_to_table(AnnotatedItems), pretty=True))

p = dlt.pipeline("my_pipe", destination="duckdb", full_refresh=True)

data = [{
"id": "my_id"
}]

# run simple pipeline and see wether schema was used
load_info = p.run(data, columns=AnnotatedItems, table_name="blah")
print(load_info)
print(p.default_schema.to_pretty_yaml())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The items example will produce this segment in the final schema:

  blah:
    columns:
      id:
        data_type: text
        primary_key: true
        unique: true
      name:
        data_type: text
        nullable: true
        x-classifiers:
        - pii.name
      email:
        data_type: text
        nullable: true
        unique: true
        x-classifiers:
        - pii.email
      likes_herring:
        data_type: bool
        x-classifiers:
        - pii.food_preference
      _dlt_load_id:
        data_type: text
        nullable: false
      _dlt_id:
        data_type: text
        nullable: false
        unique: true
    write_disposition: append