New "refresh" mode and "dev_mode" (#1063)
* Rename full_refresh -> dev_mode, add deprecation warning

* Replace some full_refresh usage in code and docs

* Replace full_refresh usage in tests

* Init experimental refresh = full with drop command

* Refresh modes with dropped_tables file

* Init separate local/write drop command

* Use load_package_state instead of drop tables file

* Use drop schema in init_client (TODO: error)

* Separate cli drop command instructions/execute

* drop tables in init_client

* dropped_tables field on load package state

* Fix import

* test/fix truncate mode

* Save truncated tables in load package state

* Remove load package state copying

* cleanup

* Drop cmd use package state, refactoring

* Don't drop tables without data

* Validate literals in configspec

* Match stored schema by version+version_hash

solves detecting when dropped tables need to be recreated

* Cleanup

* Fix dlt version test

* Cleanup

* Remove dropped_tables_filename

* Fix snippet

* Pipeline refresh docs

* refresh argument docstring

* Restore and update commented drop cmd tests

* Cleanup refresh tests and test whether table dropped vs truncated

* Test existing schema hash

* Revert "Match stored schema by version+version_hash"

This reverts commit 689b3ca.

* Use replace_schema flag

* Change drop_tables replace_schema to delete_schema

* Refresh drop only selected sources

* Rename refresh modes, update docs

* pipeline.run/extract refresh argument

* Don't modify schema when refresh='drop_data'

* Move refresh tests to load, add filesystem truncate test

* Fix duck import

* Remove generated doc sections

* Default full_refresh=None

* Cleanup unused imports

* Close caution blocks

* Update config field docstring

* Add filesystem drop_tables method

* Run all refresh tests on local filesystem destination

* Fix test drop

* Fix iter filesystem schemas

* Fix drop_resources

* Default config.full_refresh also None

* Fix filesystem + test
steinitzu committed Jun 3, 2024
1 parent 6db85a6 commit cbed225
Showing 67 changed files with 1,263 additions and 380 deletions.
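For orientation, a minimal sketch of the user-facing surface this commit introduces; `dev_mode` and `refresh` are the argument names from the commit message and the diffs below, while the pipeline and resource names here are placeholders:

import dlt

@dlt.resource
def items():
    yield from ({"id": i} for i in range(3))

# dev_mode replaces the deprecated full_refresh flag on dlt.pipeline;
# passing full_refresh still works but emits a deprecation warning
pipeline = dlt.pipeline(pipeline_name="refresh_demo", destination="duckdb", dev_mode=False)

# refresh accepts one of the TRefreshMode literals defined in this commit:
# "drop_sources", "drop_resources" or "drop_data"
pipeline.run(items, refresh="drop_sources")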
13 changes: 8 additions & 5 deletions dlt/cli/deploy_command_helpers.py
@@ -263,22 +263,25 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
     if n.PIPELINE in visitor.known_calls:
         for call_args in visitor.known_calls[n.PIPELINE]:
             pipeline_name, pipelines_dir = None, None
-            f_r_node = call_args.arguments.get("full_refresh")
+            # Check both full_refresh/dev_mode until full_refresh option is removed from dlt
+            f_r_node = call_args.arguments.get("full_refresh") or call_args.arguments.get(
+                "dev_mode"
+            )
             if f_r_node:
                 f_r_value = evaluate_node_literal(f_r_node)
                 if f_r_value is None:
                     fmt.warning(
-                        "The value of `full_refresh` in call to `dlt.pipeline` cannot be"
+                        "The value of `dev_mode` in call to `dlt.pipeline` cannot be"
                         f" determined from {unparse(f_r_node).strip()}. We assume that you know"
                         " what you are doing :)"
                     )
                 if f_r_value is True:
                     if fmt.confirm(
-                        "The value of 'full_refresh' is set to True. Do you want to abort to set it"
-                        " to False?",
+                        "The value of 'dev_mode' or 'full_refresh' is set to True. Do you want to"
+                        " abort to set it to False?",
                         default=True,
                     ):
-                        raise CliCommandException("deploy", "Please set the full_refresh to False")
+                        raise CliCommandException("deploy", "Please set the dev_mode to False")
 
             p_d_node = call_args.arguments.get("pipelines_dir")
             if p_d_node:
2 changes: 1 addition & 1 deletion dlt/common/configuration/resolve.py
@@ -286,7 +286,7 @@ def _resolve_config_field(
     embedded_sections: Tuple[str, ...],
     accept_partial: bool,
 ) -> Tuple[Any, List[LookupTrace]]:
-    inner_hint = extract_inner_hint(hint)
+    inner_hint = extract_inner_hint(hint, preserve_literal=True)
 
     if explicit_value is not None:
         value = explicit_value
12 changes: 9 additions & 3 deletions dlt/common/configuration/specs/base_configuration.py
@@ -19,6 +19,7 @@
     overload,
     ClassVar,
     TypeVar,
+    Literal,
 )
 from typing_extensions import get_args, get_origin, dataclass_transform, Annotated, TypeAlias
 from functools import wraps
@@ -120,13 +121,18 @@ def is_valid_hint(hint: Type[Any]) -> bool:
         return False
 
 
-def extract_inner_hint(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]:
+def extract_inner_hint(
+    hint: Type[Any], preserve_new_types: bool = False, preserve_literal: bool = False
+) -> Type[Any]:
     # extract hint from Optional / Literal / NewType hints
-    inner_hint = extract_inner_type(hint, preserve_new_types)
+    inner_hint = extract_inner_type(hint, preserve_new_types, preserve_literal)
     # get base configuration from union type
     inner_hint = get_config_if_union_hint(inner_hint) or inner_hint
     # extract origin from generic types (ie List[str] -> List)
-    return get_origin(inner_hint) or inner_hint
+    origin = get_origin(inner_hint) or inner_hint
+    if preserve_literal and origin is Literal:
+        return inner_hint
+    return origin or inner_hint
 
 
 def is_secret_hint(hint: Type[Any]) -> bool:
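A short sketch of what the new preserve_literal flag changes, assuming the call path shown above (extract_inner_hint delegating to extract_inner_type); TMode is an illustrative alias:

from typing import Literal, Optional

from dlt.common.configuration.specs.base_configuration import extract_inner_hint

TMode = Literal["drop_sources", "drop_data"]

# without the flag, a Literal collapses to the type of its first argument
assert extract_inner_hint(Optional[TMode]) is str
# with preserve_literal=True the Literal itself survives, so the config
# resolver can later validate values against its allowed arguments
assert extract_inner_hint(Optional[TMode], preserve_literal=True) == TMode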
31 changes: 27 additions & 4 deletions dlt/common/configuration/utils.py
@@ -2,7 +2,20 @@
 import ast
 import contextlib
 import tomlkit
-from typing import Any, Dict, Mapping, NamedTuple, Optional, Tuple, Type, Sequence
+from typing import (
+    Any,
+    Dict,
+    Mapping,
+    NamedTuple,
+    Optional,
+    Tuple,
+    Type,
+    Sequence,
+    get_args,
+    Literal,
+    get_origin,
+    List,
+)
 from collections.abc import Mapping as C_Mapping
 
 from dlt.common.json import json
@@ -51,25 +64,35 @@ def deserialize_value(key: str, value: Any, hint: Type[TAny]) -> TAny:
                 raise
            return c  # type: ignore
 
+            literal_values: Tuple[Any, ...] = ()
+            if get_origin(hint) is Literal:
+                # Literal fields are validated against the literal values
+                literal_values = get_args(hint)
+                hint_origin = type(literal_values[0])
+            else:
+                hint_origin = hint
+
             # coerce value
-            hint_dt = py_type_to_sc_type(hint)
+            hint_dt = py_type_to_sc_type(hint_origin)
             value_dt = py_type_to_sc_type(type(value))
 
             # eval only if value is string and hint is "complex"
             if value_dt == "text" and hint_dt == "complex":
-                if hint is tuple:
+                if hint_origin is tuple:
                     # use literal eval for tuples
                     value = ast.literal_eval(value)
                 else:
                     # use json for sequences and mappings
                     value = json.loads(value)
                 # exact types must match
-                if not isinstance(value, hint):
+                if not isinstance(value, hint_origin):
                     raise ValueError(value)
             else:
                 # for types that are not complex, reuse schema coercion rules
                 if value_dt != hint_dt:
                     value = coerce_value(hint_dt, value_dt, value)
+            if literal_values and value not in literal_values:
+                raise ConfigValueCannotBeCoercedException(key, value, hint)
         return value  # type: ignore
     except ConfigValueCannotBeCoercedException:
         raise
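A sketch of the resulting behavior of deserialize_value for Literal hints; the hint below mirrors TRefreshMode and the key name is arbitrary:

from typing import Literal

from dlt.common.configuration.exceptions import ConfigValueCannotBeCoercedException
from dlt.common.configuration.utils import deserialize_value

TRefreshMode = Literal["drop_sources", "drop_resources", "drop_data"]

# a value that is one of the literal arguments passes through
assert deserialize_value("refresh", "drop_data", TRefreshMode) == "drop_data"

# anything else is now rejected instead of silently accepted as a str
try:
    deserialize_value("refresh", "drop_everything", TRefreshMode)
except ConfigValueCannotBeCoercedException:
    print("rejected: not a valid literal value")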
13 changes: 13 additions & 0 deletions dlt/common/pipeline.py
@@ -21,6 +21,7 @@
     TypeVar,
     TypedDict,
     Mapping,
+    Literal,
 )
 from typing_extensions import NotRequired
 
@@ -52,6 +53,10 @@
 from dlt.common.versioned_state import TVersionedState
 
 
+# TRefreshMode = Literal["full", "replace"]
+TRefreshMode = Literal["drop_sources", "drop_resources", "drop_data"]
+
+
 class _StepInfo(NamedTuple):
     pipeline: "SupportsPipeline"
     loads_ids: List[str]
@@ -762,6 +767,14 @@ def reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny]
         state_["resources"].pop(resource_name)
 
 
+def _get_matching_sources(
+    pattern: REPattern, pipeline_state: Optional[TPipelineState] = None, /
+) -> List[str]:
+    """Get all source names in state matching the regex pattern"""
+    state_ = _sources_state(pipeline_state)
+    return [key for key in state_ if pattern.match(key)]
+
+
 def _get_matching_resources(
     pattern: REPattern, source_state_: Optional[DictStrAny] = None, /
 ) -> List[str]:
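The three modes have distinct semantics (summarized here from the refresh docs added in this PR); a sketch using the new literal type for exhaustive handling:

from dlt.common.pipeline import TRefreshMode

def describe(mode: TRefreshMode) -> str:
    # summaries paraphrased from this PR's docs; see the docs for exact behavior
    if mode == "drop_sources":
        return "drop tables and state for all selected sources"
    if mode == "drop_resources":
        return "drop tables and state for selected resources only"
    return "truncate table data and reset incremental state, keep schemas"

print(describe("drop_data"))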
11 changes: 11 additions & 0 deletions dlt/common/schema/schema.py
@@ -438,6 +438,17 @@ def update_schema(self, schema: "Schema") -> None:
         self._settings = deepcopy(schema.settings)
         self._compile_settings()
 
+    def drop_tables(
+        self, table_names: Sequence[str], seen_data_only: bool = False
+    ) -> List[TTableSchema]:
+        """Drops tables from the schema and returns the dropped tables"""
+        result = []
+        for table_name in table_names:
+            table = self.tables.get(table_name)
+            if table and (not seen_data_only or utils.has_table_seen_data(table)):
+                result.append(self._schema_tables.pop(table_name))
+        return result
+
     def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
         rv_row: DictStrAny = {}
         column_prop: TColumnProp = utils.hint_to_column_prop(hint_type)
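A sketch of the new Schema.drop_tables helper; the table is built with the schema utils and the names are illustrative:

from dlt.common.schema import Schema
from dlt.common.schema.utils import new_table

schema = Schema("demo")
schema.update_table(new_table("events"))

# unknown names are skipped; with seen_data_only=True, tables that never
# received data are kept in the schema
dropped = schema.drop_tables(["events", "not_there"])
print([t["name"] for t in dropped])  # ['events']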
7 changes: 6 additions & 1 deletion dlt/common/storages/load_package.py
@@ -35,7 +35,7 @@
 from dlt.common.destination import TLoaderFileFormat
 from dlt.common.exceptions import TerminalValueError
 from dlt.common.schema import Schema, TSchemaTables
-from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns
+from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns, TTableSchema
 from dlt.common.storages import FileStorage
 from dlt.common.storages.exceptions import LoadPackageNotFound, CurrentLoadPackageStateNotAvailable
 from dlt.common.typing import DictStrAny, SupportsHumanize
@@ -76,6 +76,11 @@ class TLoadPackageState(TVersionedState, total=False):
     destination_state: NotRequired[Dict[str, Any]]
     """private space for destinations to store state relevant only to the load package"""
 
+    dropped_tables: NotRequired[List[TTableSchema]]
+    """List of tables that are to be dropped from the schema and destination (i.e. when `refresh` mode is used)"""
+    truncated_tables: NotRequired[List[TTableSchema]]
+    """List of tables that are to be truncated in the destination (i.e. when `refresh='drop_data'` mode is used)"""
+
 
 class TLoadPackage(TypedDict, total=False):
     load_id: str
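Illustratively, the extract step can now hand drop/truncate instructions to the load step through the package state; a hypothetical value with the table schemas abbreviated:

from dlt.common.storages.load_package import TLoadPackageState

state: TLoadPackageState = {
    "dropped_tables": [{"name": "events", "columns": {}}],   # recreate from scratch
    "truncated_tables": [{"name": "users", "columns": {}}],  # delete data only
}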
12 changes: 7 additions & 5 deletions dlt/common/typing.py
@@ -245,7 +245,9 @@ def is_dict_generic_type(t: Type[Any]) -> bool:
     return False
 
 
-def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]:
+def extract_inner_type(
+    hint: Type[Any], preserve_new_types: bool = False, preserve_literal: bool = False
+) -> Type[Any]:
     """Gets the inner type from Literal, Optional, Final and NewType
 
     Args:
@@ -256,15 +258,15 @@ def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Typ
         Type[Any]: Inner type if hint was Literal, Optional or NewType, otherwise hint
     """
     if maybe_modified := extract_type_if_modifier(hint):
-        return extract_inner_type(maybe_modified, preserve_new_types)
+        return extract_inner_type(maybe_modified, preserve_new_types, preserve_literal)
     if is_optional_type(hint):
-        return extract_inner_type(get_args(hint)[0], preserve_new_types)
-    if is_literal_type(hint):
+        return extract_inner_type(get_args(hint)[0], preserve_new_types, preserve_literal)
+    if is_literal_type(hint) and not preserve_literal:
         # assume that all literals are of the same type
         return type(get_args(hint)[0])
     if is_newtype_type(hint) and not preserve_new_types:
         # descend into supertypes of NewType
-        return extract_inner_type(hint.__supertype__, preserve_new_types)
+        return extract_inner_type(hint.__supertype__, preserve_new_types, preserve_literal)
     return hint
34 changes: 26 additions & 8 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -171,6 +171,15 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
         self.fs_client.makedirs(self.dataset_path, exist_ok=True)
         self.fs_client.touch(self.pathlib.join(self.dataset_path, INIT_FILE_NAME))
 
+    def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
+        self.truncate_tables(list(tables))
+        if not delete_schema:
+            return
+        # Delete all stored schemas
+        for filename, fileparts in self._iter_stored_schema_files():
+            if fileparts[0] == self.schema.name:
+                self._delete_file(filename)
+
     def truncate_tables(self, table_names: List[str]) -> None:
         """Truncate table with given name"""
         table_dirs = set(self.get_table_dirs(table_names))
@@ -180,19 +189,23 @@ def truncate_tables(self, table_names: List[str]) -> None:
                 # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
                 # print(f"DEL {table_file}")
                 try:
-                    # NOTE: must use rm_file to get errors on delete
-                    self.fs_client.rm_file(table_file)
-                except NotImplementedError:
-                    # not all filesystem implement the above
-                    self.fs_client.rm(table_file)
-                    if self.fs_client.exists(table_file):
-                        raise FileExistsError(table_file)
+                    self._delete_file(table_file)
                 except FileNotFoundError:
                     logger.info(
                         f"Directory or path to truncate tables {table_names} does not exist but"
                         " it should have been created previously!"
                     )
 
+    def _delete_file(self, file_path: str) -> None:
+        try:
+            # NOTE: must use rm_file to get errors on delete
+            self.fs_client.rm_file(file_path)
+        except NotImplementedError:
+            # not all filesystems implement the above
+            self.fs_client.rm(file_path)
+            if self.fs_client.exists(file_path):
+                raise FileExistsError(file_path)
+
     def update_stored_schema(
         self,
         only_tables: Iterable[str] = None,
@@ -401,6 +414,11 @@ def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
             f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
         )
 
+    def _iter_stored_schema_files(self) -> Iterator[Tuple[str, List[str]]]:
+        """Iterator over all stored schema files"""
+        for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name):
+            yield filepath, fileparts
+
     def _get_stored_schema_by_hash_or_newest(
         self, version_hash: str = None
     ) -> Optional[StorageSchemaInfo]:
@@ -409,7 +427,7 @@ def _get_stored_schema_by_hash_or_newest(
         # find newest schema for pipeline or by version hash
         selected_path = None
         newest_load_id = "0"
-        for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name):
+        for filepath, fileparts in self._iter_stored_schema_files():
             if (
                 not version_hash
                 and fileparts[0] == self.schema.name
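For the filesystem destination there is no DDL, so dropping a table means deleting its files and, optionally, every stored schema version. A sketch of driving this through a pipeline's job client, assuming the pipeline has loaded data before so a schema exists:

import dlt

pipeline = dlt.pipeline("fs_demo", destination="filesystem")

# drop_tables deletes the table files (via truncate_tables) and, with
# delete_schema=True, removes all stored schema versions for this schema
with pipeline.destination_client() as client:
    client.drop_tables("events", delete_schema=True)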
18 changes: 12 additions & 6 deletions dlt/destinations/job_client_impl.py
@@ -202,11 +202,18 @@ def update_stored_schema(
         )
         return applied_update
 
-    def drop_tables(self, *tables: str, replace_schema: bool = True) -> None:
+    def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
+        """Drop tables in destination database and optionally delete the stored schema as well.
+        Clients that support ddl transactions will have both operations performed in a single transaction.
+
+        Args:
+            tables: Names of tables to drop.
+            delete_schema: If True, also delete all versions of the current schema from storage
+        """
         with self.maybe_ddl_transaction():
             self.sql_client.drop_tables(*tables)
-            if replace_schema:
-                self._replace_schema_in_storage(self.schema)
+            if delete_schema:
+                self._delete_schema_in_storage(self.schema)
 
     @contextlib.contextmanager
     def maybe_ddl_transaction(self) -> Iterator[None]:
@@ -520,13 +527,12 @@ def _row_to_schema_info(self, query: str, *args: Any) -> StorageSchemaInfo:
 
         return StorageSchemaInfo(row[0], row[1], row[2], row[3], inserted_at, schema_str)
 
-    def _replace_schema_in_storage(self, schema: Schema) -> None:
+    def _delete_schema_in_storage(self, schema: Schema) -> None:
         """
-        Save the given schema in storage and remove all previous versions with the same name
+        Delete all stored versions with the same name as given schema
         """
         name = self.sql_client.make_qualified_table_name(self.schema.version_table_name)
         self.sql_client.execute_sql(f"DELETE FROM {name} WHERE schema_name = %s;", schema.name)
-        self._update_schema_in_storage(schema)
 
     def _update_schema_in_storage(self, schema: Schema) -> None:
         # get schema string or zip