adds tables only tests to drop command
rudolfix committed Mar 23, 2024
1 parent ec44b6f commit 7798be8
Showing 5 changed files with 61 additions and 18 deletions.
dlt/common/schema/schema.py (2 changes: 0 additions & 2 deletions)

@@ -814,8 +814,6 @@ def _bump_version(self) -> Tuple[int, str]:
         Returns:
             Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple
         """
-        prev_version = self._stored_version
-        prev_version_hash = self._stored_version_hash
         self._stored_version, self._stored_version_hash, _, _ = utils.bump_version_if_modified(
             self.to_dict(bump_version=False)
         )
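Note on the `_bump_version` change above: the two deleted locals were never used, since `bump_version_if_modified` already returns both the new and the previous version data. A minimal sketch of that contract, with the hashing scheme and the exact 4-tuple shape assumed rather than taken from dlt:

import hashlib
import json
from typing import Any, Dict, Tuple

def bump_version_if_modified(schema_dict: Dict[str, Any]) -> Tuple[int, str, str, bool]:
    """Sketch: returns (version, version_hash, previous_hash, modified)."""
    previous_hash = schema_dict.get("version_hash", "")
    # hash only the content fields, not the volatile version bookkeeping
    content = {k: v for k, v in schema_dict.items() if k not in ("version", "version_hash")}
    new_hash = hashlib.sha256(json.dumps(content, sort_keys=True).encode()).hexdigest()
    version = schema_dict.get("version", 0)
    modified = new_hash != previous_hash
    return (version + 1 if modified else version), new_hash, previous_hash, modified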
dlt/common/storages/live_schema_storage.py (4 changes: 2 additions & 2 deletions)

@@ -70,10 +70,10 @@ def set_live_schema(self, schema: Schema) -> Schema:
         if live_schema:
             if id(live_schema) != id(schema):
                 # replace content without replacing instance
-                print(f"live schema {live_schema} updated in place")
+                # print(f"live schema {live_schema} updated in place")
                 live_schema.replace_schema_content(schema, link_to_replaced_schema=True)
         else:
-            print(f"live schema {schema.name} created from schema")
+            # print(f"live schema {schema.name} created from schema")
             live_schema = self.live_schemas[schema.name] = schema
         return live_schema

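Note on `set_live_schema` above: the commented-out prints sit on the two branches of a replace-in-place pattern, where an existing live schema is mutated instead of swapped so that references held elsewhere keep seeing the update. A toy illustration of that identity guarantee (the `LiveSchema` class here is a hypothetical stand-in, not dlt's `Schema`):

from typing import Dict

class LiveSchema:
    def __init__(self, name: str, tables: Dict[str, dict]) -> None:
        self.name = name
        self.tables = tables

    def replace_schema_content(self, other: "LiveSchema") -> None:
        # mutate in place so id(self) stays stable for every holder of a reference
        self.tables = dict(other.tables)

live_schemas: Dict[str, LiveSchema] = {}
original = live_schemas["droppable"] = LiveSchema("droppable", {"a": {}})
incoming = LiveSchema("droppable", {"a": {}, "b": {}})
live_schemas["droppable"].replace_schema_content(incoming)
assert live_schemas["droppable"] is original  # same instance, updated content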
dlt/pipeline/helpers.py (18 changes: 9 additions & 9 deletions)

@@ -12,7 +12,6 @@
 from dlt.common.schema.typing import TSimpleRegex
 from dlt.common.typing import REPattern
 from dlt.common.pipeline import (
-    TSourceState,
     reset_resource_state,
     _sources_state,
     _delete_source_state_keys,
@@ -26,6 +25,7 @@
     PipelineStepFailed,
     PipelineHasPendingDataException,
 )
+from dlt.pipeline.state_sync import force_state_extract
 from dlt.pipeline.typing import TPipelineStep
 from dlt.pipeline import Pipeline

@@ -122,7 +122,7 @@ def __init__(
         else:
             self.tables_to_drop = []
             self.drop_tables = False  # No tables to drop
-        self.drop_state = not not self.state_paths_to_drop
+        self.drop_state = not not self.state_paths_to_drop  # obtain truth value

         self.drop_all = drop_all
         self.info: _DropInfo = dict(

@@ -167,10 +167,11 @@ def _drop_destination_tables(self) -> None:
             with client.with_staging_dataset():
                 client.drop_tables(*table_names, replace_schema=True)
 
-    def _delete_pipeline_tables(self) -> None:
+    def _delete_schema_tables(self) -> None:
         for tbl in self.tables_to_drop:
             del self.schema_tables[tbl["name"]]
-        self.schema.bump_version()
+        # bump schema, we'll save later
+        self.schema._bump_version()

     def _list_state_paths(self, source_state: Dict[str, Any]) -> List[str]:
         return resolve_paths(self.state_paths_to_drop, source_state)

@@ -197,7 +198,7 @@ def _create_modified_state(self) -> Dict[str, Any]:
                 self.info["state_paths"].extend(f"{source_name}.{p}" for p in resolved_paths)
         return state  # type: ignore[return-value]

-    def _drop_state_keys(self) -> None:
+    def _extract_state(self) -> None:
         state: Dict[str, Any]
         with self.pipeline.managed_state(extract_state=True) as state:  # type: ignore[assignment]
             state.clear()
@@ -216,12 +217,12 @@ def __call__(self) -> None:
             return  # Nothing to drop

         if self.drop_tables:
-            self._delete_pipeline_tables()
+            self._delete_schema_tables()
             self._drop_destination_tables()
         if self.drop_tables:
             self.pipeline.schemas.save_schema(self.schema)
         if self.drop_state:
-            self._drop_state_keys()
+            self._extract_state()
         # Send updated state to destination
         self.pipeline.normalize()
         try:
@@ -230,8 +231,7 @@ def __call__(self) -> None:
             # Clear extracted state on failure so command can run again
             self.pipeline.drop_pending_packages()
             with self.pipeline.managed_state() as state:
-                state["_local"].pop("_last_extracted_at", None)
-                state["_local"].pop("_last_extracted_hash", None)
+                force_state_extract(state)
             raise


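Note on the reworked drop flow above: `__call__` now routes table drops through `_delete_schema_tables` and state extraction through `_extract_state`, with `force_state_extract` as the retry fallback on a failed load. A usage sketch of the helper as the tests below exercise it; attaching by name and the pipeline name itself are assumptions, not part of this commit:

import dlt
from dlt.pipeline import helpers

# attach to a previously run pipeline and drop two resources: their tables
# are removed from the schema and the destination, their resource state is
# deleted, and the updated state is normalized and loaded
pipeline = dlt.attach(pipeline_name="drop_test")
helpers.drop(pipeline, resources=["droppable_a", "droppable_b"])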
dlt/pipeline/state_sync.py (15 changes: 15 additions & 0 deletions)

@@ -68,6 +68,21 @@ def bump_pipeline_state_version_if_modified(state: TPipelineState) -> Tuple[int,
     return bump_state_version_if_modified(state, exclude_attrs=["_local"])


+def mark_state_extracted(state: TPipelineState, hash_: str) -> None:
+    """Marks state as extracted by setting the last extracted hash to hash_ (the current version_hash).
+
+    `_last_extracted_hash` is kept locally and never synced with the destination.
+    """
+    state["_local"]["_last_extracted_at"] = pendulum.now()
+    state["_local"]["_last_extracted_hash"] = hash_
+
+
+def force_state_extract(state: TPipelineState) -> None:
+    """Forces `state` to be extracted by removing local information on the most recent extraction."""
+    state["_local"].pop("_last_extracted_at", None)
+    state["_local"].pop("_last_extracted_hash", None)


 def migrate_pipeline_state(
     pipeline_name: str, state: DictStrAny, from_engine: int, to_engine: int
 ) -> TPipelineState:
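Note on the two helpers added above: they are inverses. The pipeline decides whether state needs extraction by comparing the current state version hash with the locally kept `_last_extracted_hash`, so popping that marker guarantees a re-extract on the next run. A minimal sketch of the check (`should_extract_state` is a hypothetical helper, not part of this commit):

from typing import Any, Dict

def should_extract_state(state: Dict[str, Any], current_hash: str) -> bool:
    # after force_state_extract() this is always True, because the marker
    # written by mark_state_extracted() is gone
    return state["_local"].get("_last_extracted_hash") != current_hash

state: Dict[str, Any] = {"_local": {"_last_extracted_hash": "abc"}}
assert not should_extract_state(state, "abc")
state["_local"].pop("_last_extracted_hash", None)  # what force_state_extract does
assert should_extract_state(state, "abc")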
tests/load/pipeline/test_drop.py (40 changes: 35 additions & 5 deletions)

@@ -56,16 +56,23 @@ def droppable_d(
         dlt.state()["data_from_d"] = {"foo1": {"bar": 1}, "foo2": {"bar": 2}}
         yield [dict(o=55), dict(o=22)]
 
-    return [droppable_a(), droppable_b(), droppable_c(), droppable_d()]
+    @dlt.resource(selected=True)
+    def droppable_no_state():
+        yield [1, 2, 3]
+
+    return [droppable_a(), droppable_b(), droppable_c(), droppable_d(), droppable_no_state]


 RESOURCE_TABLES = dict(
     droppable_a=["droppable_a"],
     droppable_b=["droppable_b", "droppable_b__items"],
     droppable_c=["droppable_c", "droppable_c__items", "droppable_c__items__labels"],
     droppable_d=["droppable_d"],
+    droppable_no_state=["droppable_no_state"],
 )
 
+NO_STATE_RESOURCES = {"droppable_no_state"}


 def assert_dropped_resources(pipeline: Pipeline, resources: List[str]) -> None:
     assert_dropped_resource_tables(pipeline, resources)

@@ -95,7 +102,7 @@ def assert_dropped_resource_tables(pipeline: Pipeline, resources: List[str]) ->

 def assert_dropped_resource_states(pipeline: Pipeline, resources: List[str]) -> None:
     # Verify only requested resource keys are removed from state
-    all_resources = set(RESOURCE_TABLES.keys())
+    all_resources = set(RESOURCE_TABLES.keys()) - NO_STATE_RESOURCES
     expected_keys = all_resources - set(resources)
     sources_state = pipeline.state["sources"]
     result_keys = set(sources_state["droppable"]["resources"].keys())

@@ -109,6 +116,8 @@ def assert_destination_state_loaded(pipeline: Pipeline) -> None:
         destination_state = state_sync.load_pipeline_state_from_destination(
             pipeline.pipeline_name, client
         )
+        # current pipeline schema available in the destination
+        client.get_stored_schema_by_hash(pipeline.default_schema.version_hash)
     pipeline_state = dict(pipeline.state)
     del pipeline_state["_local"]
     assert pipeline_state == destination_state

@@ -144,8 +153,7 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestConfiguration) -> None:
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
)
def test_drop_command_only_state(destination_config: DestinationTestConfiguration) -> None:
"""Test the drop command with resource and state path options and
verify correct data is deleted from destination and locally"""
"""Test drop command that deletes part of the state and syncs with destination"""
source = droppable_source()
pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True)
pipeline.run(source)
@@ -164,6 +172,28 @@ def test_drop_command_only_state(destination_config: DestinationTestConfiguration) -> None:
     assert_destination_state_loaded(pipeline)


+@pytest.mark.parametrize(
+    "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
+)
+def test_drop_command_only_tables(destination_config: DestinationTestConfiguration) -> None:
+    """Test drop of tables only and make sure that schema and state are synced"""
+    source = droppable_source()
+    pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True)
+    pipeline.run(source)
+    sources_state = pipeline.state["sources"]
+
+    attached = _attach(pipeline)
+    helpers.drop(attached, resources=["droppable_no_state"])
+
+    attached = _attach(pipeline)
+
+    assert_dropped_resources(attached, ["droppable_no_state"])
+    # source state didn't change
+    assert pipeline.state["sources"] == sources_state
+
+    assert_destination_state_loaded(pipeline)


 @pytest.mark.parametrize(
     "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
 )

@@ -202,7 +232,7 @@ def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration) -> None:
     attached = _attach(pipeline)
 
     with mock.patch.object(
-        helpers.DropCommand, "_drop_state_keys", side_effect=RuntimeError("Something went wrong")
+        helpers.DropCommand, "_extract_state", side_effect=RuntimeError("Something went wrong")
     ):
         with pytest.raises(RuntimeError):
             helpers.drop(attached, resources=("droppable_a", "droppable_b"))
