Skip to content

Commit

Permalink
Merge pull request #116 from dlt-hub/rfix/fixes-bug-a14
Browse files Browse the repository at this point in the history
0.1.0a14 bug fixes
  • Loading branch information
rudolfix committed Jan 8, 2023
2 parents 64591cf + 739df52 commit 8310c40
Show file tree
Hide file tree
Showing 28 changed files with 702 additions and 87 deletions.
5 changes: 0 additions & 5 deletions INSTALLATION.md

This file was deleted.

5 changes: 2 additions & 3 deletions dlt/cli/config_toml_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import tomlkit
from tomlkit.items import Table as TOMLTable

from dlt.common.configuration.resolve import extract_inner_hint
from dlt.common.configuration.specs import BaseConfiguration, is_base_configuration_hint
from dlt.common.configuration.specs import BaseConfiguration, is_base_configuration_inner_hint, extract_inner_hint
from dlt.common.typing import AnyType, is_final_type, is_optional_type


Expand All @@ -21,7 +20,7 @@ def write_value(toml_table: TOMLTable, name: str, hint: AnyType, default_value:
# get the inner hint to generate cool examples
hint = extract_inner_hint(hint)

if is_base_configuration_hint(hint):
if is_base_configuration_inner_hint(hint):
inner_table = tomlkit.table(False)
toml_table[name] = inner_table
write_spec(inner_table, hint())
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/configuration/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dlt.common.configuration.exceptions import LookupTrace

from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.specs import is_base_configuration_hint
from dlt.common.configuration.specs import is_base_configuration_inner_hint
from dlt.common.configuration.utils import deserialize_value, log_traces
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.common.schema.utils import coerce_value
Expand Down
12 changes: 6 additions & 6 deletions dlt/common/configuration/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.typing import AnyType, StrAny, TSecretValue, is_final_type, is_optional_type

from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, is_secret_hint, is_context_hint, is_base_configuration_hint
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, is_secret_hint, extract_inner_hint, is_context_inner_hint, is_base_configuration_inner_hint
from dlt.common.configuration.specs.config_namespace_context import ConfigNamespacesContext
from dlt.common.configuration.container import Container
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.common.configuration.utils import extract_inner_hint, log_traces, deserialize_value
from dlt.common.configuration.utils import log_traces, deserialize_value
from dlt.common.configuration.exceptions import (FinalConfigFieldException, LookupTrace, ConfigFieldMissingException, ConfigurationWrongTypeException, ValueNotSecretException, InvalidNativeValue)

TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration)
Expand Down Expand Up @@ -166,10 +166,10 @@ def _resolve_config_field(
value, traces = _resolve_single_value(key, hint, inner_hint, config_namespace, explicit_namespaces, embedded_namespaces)
log_traces(config, key, hint, value, default_value, traces)
# contexts must be resolved as a whole
if is_context_hint(inner_hint):
if is_context_inner_hint(inner_hint):
pass
# if inner_hint is BaseConfiguration then resolve it recursively
elif is_base_configuration_hint(inner_hint):
elif is_base_configuration_inner_hint(inner_hint):
if isinstance(value, BaseConfiguration):
# if resolved value is instance of configuration (typically returned by context provider)
embedded_config = value
Expand Down Expand Up @@ -245,11 +245,11 @@ def _resolve_single_value(
# get providers from container
providers_context = container[ConfigProvidersContext]
# we may be resolving context
if is_context_hint(inner_hint):
if is_context_inner_hint(inner_hint):
# resolve context with context provider and do not look further
value, _ = providers_context.context_provider.get_value(key, inner_hint)
return value, traces
if is_base_configuration_hint(inner_hint):
if is_base_configuration_inner_hint(inner_hint):
# cannot resolve configurations directly
return value, traces

Expand Down
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .run_configuration import RunConfiguration # noqa: F401
from .base_configuration import BaseConfiguration, CredentialsConfiguration, ContainerInjectableContext, is_base_configuration_hint # noqa: F401
from .base_configuration import BaseConfiguration, CredentialsConfiguration, ContainerInjectableContext, extract_inner_hint, is_base_configuration_inner_hint # noqa: F401
from .normalize_volume_configuration import NormalizeVolumeConfiguration # noqa: F401
from .load_volume_configuration import LoadVolumeConfiguration # noqa: F401
from .schema_volume_configuration import SchemaVolumeConfiguration, TSchemaFileFormat # noqa: F401
Expand Down
41 changes: 29 additions & 12 deletions dlt/common/configuration/specs/base_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,57 @@
_F_ContainerInjectableContext: Any = type(object)


def is_secret_hint(hint: Type[Any]) -> bool:
return hint is TSecretValue or (inspect.isclass(hint) and issubclass(hint, CredentialsConfiguration))
def is_base_configuration_inner_hint(inner_hint: Type[Any]) -> bool:
return inspect.isclass(inner_hint) and issubclass(inner_hint, BaseConfiguration)


def is_context_inner_hint(inner_hint: Type[Any]) -> bool:
return inspect.isclass(inner_hint) and issubclass(inner_hint, ContainerInjectableContext)


def is_base_configuration_hint(hint: Type[Any]) -> bool:
return inspect.isclass(hint) and issubclass(hint, BaseConfiguration)
def is_credentials_inner_hint(inner_hint: Type[Any]) -> bool:
return inspect.isclass(inner_hint) and issubclass(inner_hint, CredentialsConfiguration)


def is_context_hint(hint: Type[Any]) -> bool:
return inspect.isclass(hint) and issubclass(hint, ContainerInjectableContext)
def get_config_if_union_hint(hint: Type[Any]) -> Type[Any]:
if get_origin(hint) is Union:
return next((t for t in get_args(hint) if is_base_configuration_inner_hint(t)), None)
return None


def is_valid_hint(hint: Type[Any]) -> bool:
hint = extract_inner_type(hint)
hint = get_config_if_union(hint) or hint
hint = get_config_if_union_hint(hint) or hint
hint = get_origin(hint) or hint

if hint is Any:
return True
if hint is ClassVar:
# class vars are skipped by dataclass
return True
if is_base_configuration_hint(hint):
if is_base_configuration_inner_hint(hint):
return True
with contextlib.suppress(TypeError):
py_type_to_sc_type(hint)
return True
return False


def get_config_if_union(hint: Type[Any]) -> Type[Any]:
if get_origin(hint) is Union:
return next((t for t in get_args(hint) if is_base_configuration_hint(t)), None)
return None
def extract_inner_hint(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]:
# extract hint from Optional / Literal / NewType hints
inner_hint = extract_inner_type(hint, preserve_new_types)
# get base configuration from union type
inner_hint = get_config_if_union_hint(inner_hint) or inner_hint
# extract origin from generic types (e.g. List[str] -> List)
return get_origin(inner_hint) or inner_hint


def is_secret_hint(hint: Type[Any]) -> bool:
is_secret = hint is TSecretValue
if not is_secret:
hint = extract_inner_hint(hint, preserve_new_types=True)
is_secret = hint is TSecretValue or is_credentials_inner_hint(hint)
return is_secret


@overload
Expand Down
13 changes: 2 additions & 11 deletions dlt/common/configuration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dlt.common.schema.utils import coerce_value, py_type_to_sc_type
from dlt.common.configuration import DOT_DLT
from dlt.common.configuration.exceptions import ConfigValueCannotBeCoercedException, LookupTrace
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, get_config_if_union, is_base_configuration_hint
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, get_config_if_union_hint, is_base_configuration_inner_hint


class ResolvedValueTrace(NamedTuple):
Expand All @@ -28,7 +28,7 @@ def deserialize_value(key: str, value: Any, hint: Type[TAny]) -> TAny:
try:
if hint != Any:
# if deserializing to base configuration, try parse the value
if is_base_configuration_hint(hint):
if is_base_configuration_inner_hint(hint):
c = hint()
if isinstance(value, dict):
c.update(value)
Expand Down Expand Up @@ -89,15 +89,6 @@ def serialize_value(value: Any) -> Any:
return coerce_value("text", value_dt, value)


def extract_inner_hint(hint: Type[Any]) -> Type[Any]:
# extract hint from Optional / Literal / NewType hints
inner_hint = extract_inner_type(hint)
# get base configuration from union type
inner_hint = get_config_if_union(inner_hint) or inner_hint
# extract origin from generic types (ie List[str] -> List)
return get_origin(inner_hint) or inner_hint


def log_traces(config: BaseConfiguration, key: str, hint: Type[Any], value: Any, default_value: Any, traces: Sequence[LookupTrace]) -> None:
if logger.is_logging() and logger.log_level() == "DEBUG":
logger.debug(f"Field {key} with type {hint} in {type(config).__name__} {'NOT RESOLVED' if value is None else 'RESOLVED'}")
Expand Down
3 changes: 3 additions & 0 deletions dlt/common/reflection/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import inspect
from typing import Dict, List, Type, Any, Optional
from inspect import Signature, Parameter
from dlt.common.configuration.specs.base_configuration import get_config_if_union_hint

from dlt.common.typing import AnyType, AnyFun, TSecretValue
from dlt.common.configuration import configspec, is_valid_hint, is_secret_hint
Expand Down Expand Up @@ -68,6 +69,8 @@ def dlt_config_literal_to_type(arg_name: str) -> AnyType:
# try to get type from default
if field_type is AnyType and p.default is not None:
field_type = type(p.default)
# extract base type from union to let it parse native values
field_type = get_config_if_union_hint(field_type) or field_type
# make type optional if explicit None is provided as default
if p.default is None:
# check if the defaults were attributes of the form .config.value or .secrets.value
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dlt.common.typing import DictStrAny, StrAny, REPattern, SupportsVariant, VARIANT_FIELD_FORMAT
from dlt.common.normalizers.names import TNormalizeBreakPath, TNormalizeMakePath, TNormalizeNameFunc
from dlt.common.normalizers.json import TNormalizeJSONFunc
from dlt.common.schema.typing import (LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema,
from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema,
TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TDataType,
TColumnHint, TWriteDisposition)
from dlt.common.schema import utils
Expand All @@ -16,7 +16,7 @@


class Schema:
ENGINE_VERSION: ClassVar[int] = 5
ENGINE_VERSION: ClassVar[int] = SCHEMA_ENGINE_VERSION

# name normalization functions
normalize_table_name: TNormalizeNameFunc
Expand Down
10 changes: 6 additions & 4 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

from dlt.common.typing import StrAny

# current version of schema engine
SCHEMA_ENGINE_VERSION = 5

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
LOADS_TABLE_NAME = "_dlt_loads"

TDataType = Literal["text", "double", "bool", "timestamp", "bigint", "binary", "complex", "decimal", "wei"]
TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"]
Expand All @@ -15,10 +21,6 @@
COLUMN_HINTS: Set[TColumnHint] = set(["partition", "cluster", "primary_key", "foreign_key", "sort", "unique"])
WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
LOADS_TABLE_NAME = "_dlt_loads"


class TColumnSchemaBase(TypedDict, total=True):
name: Optional[str]
Expand Down
14 changes: 8 additions & 6 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from dlt.common.utils import map_nested_in_place, str2bool
from dlt.common.validation import TCustomValidator, validate_dict
from dlt.common.schema import detections
from dlt.common.schema.typing import LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition
from dlt.common.schema.typing import SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition
from dlt.common.schema.exceptions import CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException, TablePropertiesConflictException


Expand Down Expand Up @@ -96,14 +96,16 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
return base64.b64encode(h.digest()).decode('ascii')


def verify_schema_hash(stored_schema: DictStrAny, empty_hash_verifies: bool = True) -> bool:
def verify_schema_hash(loaded_schema_dict: DictStrAny, verifies_if_not_migrated: bool = False) -> bool:
# generates content hash and compares with existing
current_hash: str = stored_schema.get("version_hash")
if not current_hash and empty_hash_verifies:
engine_version: str = loaded_schema_dict.get("engine_version")
# if upgrade is needed, the hash cannot be compared
if verifies_if_not_migrated and engine_version != SCHEMA_ENGINE_VERSION:
return True
# if hash is present we can assume at least v4 engine version so hash is computable
hash_ = generate_version_hash(cast(TStoredSchema, stored_schema))
return hash_ == current_hash
stored_schema = cast(TStoredSchema, loaded_schema_dict)
hash_ = generate_version_hash(stored_schema)
return hash_ == stored_schema["version_hash"]


def simple_regex_validator(path: str, pk: str, pv: Any, t: Any) -> bool:
Expand Down
14 changes: 9 additions & 5 deletions dlt/common/storages/live_schema_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __getitem__(self, name: str) -> Schema:
else:
# return new schema instance
schema = super().load_schema(name)
self._update_live_schema(schema, True)
self._update_live_schema(schema)

return schema

Expand All @@ -29,11 +29,15 @@ def load_schema(self, name: str) -> Schema:

def save_schema(self, schema: Schema) -> str:
rv = super().save_schema(schema)
# -- update the live schema with schema being saved but do not create live instance if not already present
# no, cre
self._update_live_schema(schema, True)
# update the live schema with the schema being saved; if no live schema exists, create one so it is available to the getter
self._update_live_schema(schema)
return rv

def remove_schema(self, name: str) -> None:
super().remove_schema(name)
# also remove the live schema
self.live_schemas.pop(name, None)

def initialize_import_schema(self, schema: Schema) -> None:
if self.config.import_schema_path:
try:
Expand All @@ -50,7 +54,7 @@ def commit_live_schema(self, name: str) -> Schema:
self._save_schema(live_schema)
return live_schema

def _update_live_schema(self, schema: Schema, can_create_new: bool) -> None:
def _update_live_schema(self, schema: Schema, can_create_new: bool = True) -> None:
live_schema = self.live_schemas.get(schema.name)
if live_schema:
# replace content without replacing instance
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/storages/schema_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def load_schema(self, name: str) -> Schema:
try:
storage_schema = json.loads(self.storage.load(schema_file))
# prevent external modifications of schemas kept in storage
if not verify_schema_hash(storage_schema, empty_hash_verifies=True):
if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True):
raise InStorageSchemaModified(name, self.config.schema_volume_path)
except FileNotFoundError:
# maybe we can import from external storage
Expand Down
15 changes: 8 additions & 7 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,23 @@ def is_dict_generic_type(t: Any) -> bool:
return False


def extract_inner_type(hint: Type[Any]) -> Type[Any]:
"""Gets the inner type from Literal, Optional and NewType
def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]:
"""Gets the inner type from Literal, Optional, Final and NewType
Args:
hint (Type[Any]): Any type
hint (Type[Any]): Type to extract
preserve_new_types (bool): Do not extract supertype of a NewType
Returns:
Type[Any]: Inner type if hint was Literal, Optional, Final or NewType (a NewType is returned as-is when preserve_new_types is True), otherwise hint
"""
if is_literal_type(hint):
# assume that all literals are of the same type
return extract_inner_type(type(get_args(hint)[0]))
return extract_inner_type(type(get_args(hint)[0]), preserve_new_types)
if is_optional_type(hint) or is_final_type(hint):
# extract specialization type and call recursively
return extract_inner_type(get_args(hint)[0])
if is_newtype_type(hint):
return extract_inner_type(get_args(hint)[0], preserve_new_types)
if is_newtype_type(hint) and not preserve_new_types:
# descend into supertypes of NewType
return extract_inner_type(hint.__supertype__)
return extract_inner_type(hint.__supertype__, preserve_new_types)
return hint
4 changes: 3 additions & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def source(func: Optional[AnyFun] = None, /, name: str = None, max_table_nesting
### Passing credentials
Another important function of the source decorator is to provide credentials and other configuration to the code that extracts data. The decorator may automatically bind the source function arguments to the secret and config values.
>>> @dlt.source
>>> def chess(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value, title: str = "GM"):
>>> return user_profile(username, chess_url, api_secret), user_games(username, chess_url, api_secret, with_titles=title)
>>>
Expand Down Expand Up @@ -199,6 +200,7 @@ def resource(
### Passing credentials
If used as a decorator (`data` argument is a `Generator`), it may automatically bind the source function arguments to the secret and config values.
>>> @dlt.resource
>>> def user_games(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value):
>>> return requests.get("%s/games/%s" % (chess_url, username), headers={"Authorization": f"Bearer {api_secret}"})
>>>
Expand Down Expand Up @@ -289,7 +291,7 @@ def transformer(
selected: bool = True,
spec: Type[BaseConfiguration] = None
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]:
"""A form of `dlt resource` that takes input from other resources in order to enrich or transformer the data.
"""A form of `dlt resource` that takes input from other resources in order to enrich or transform the data.
### Example
>>> @dlt.resource
Expand Down
2 changes: 1 addition & 1 deletion dlt/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def pipeline(
full_refresh: bool = False,
credentials: Any = None
) -> Pipeline:
"""Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API and a destination ie. database or a data lake.
"""Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API to a destination ie. database or a data lake.
### Summary
The `pipeline` function allows you to pass the destination name to which the data should be loaded, the name of the dataset and several other options that govern loading of the data.
Expand Down
Loading

0 comments on commit 8310c40

Please sign in to comment.