-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SCD2 support #1168
SCD2 support #1168
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
tests/load/pipeline/test_scd2.py
Outdated
|
||
|
||
ACTIVE_TS = datetime.fromisoformat(HIGH_TS.isoformat()).replace(tzinfo=None) | ||
h = DataItemNormalizer.get_row_hash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe alias this to get_row_hash
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed in 7726d98
@@ -155,6 +155,14 @@ class NormalizerInfo(TypedDict, total=True): | |||
new_table: bool | |||
|
|||
|
|||
class TMergeConfig(TypedDict, total=False): | |||
strategy: Optional[TLoaderMergeStrategy] | |||
validity_column_names: Optional[List[str]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it allow duplicates column names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't, didn't think of that. Added validity column name checking in 30bb2e0. An exception is raised if a configured validity column name appears in the data.
tests/load/pipeline/test_scd2.py
Outdated
) | ||
|
||
|
||
ACTIVE_TS = datetime.fromisoformat(HIGH_TS.isoformat()).replace(tzinfo=None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just use HIGHT_TS.replace(tzinfo=None)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HIGH_TS
is a pendulum
DateTime
because the datetime
module is a banned import. However, when fetching data from the destination tables to assert its content, timestamps are returned as datetime
objects. Hence the conversion from DateTime
to datetime.
ids=lambda x: x.name, | ||
) | ||
@pytest.mark.parametrize("simple", [True, False]) | ||
def test_child_table(destination_config: DestinationTestConfiguration, simple: bool) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we should test the child table functionality on all destinations, so either enable all dests here or add a child table to the basic test. the reason is, that we should run all sql code that we have against all destinations, just to make sure it works everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 11748a6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice work, thanks! Only a few small changes and we still need the docs. Also I want to discuss two points with @rudolfix before we can merge.
@rudolfix two questions on this PR:
|
Note that this is why I chose a different interface than the one used to configure replace strategy: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is really good! @jorritsandbrink congrats on plugging the work into our normalizer and hints perfectly.
I think we can do the user interface better by extending write_disposition
instead of adding a new argument. @sh-rp exactly what we did for data contracts with shorthand and full notation - see my review
also: I trust you that SQL transformation is good and properly tested :)
dlt/common/schema/typing.py
Outdated
@@ -65,6 +64,7 @@ | |||
] | |||
"""Known hints of a column used to declare hint regexes.""" | |||
TWriteDisposition = Literal["skip", "append", "replace", "merge"] | |||
TLoaderMergeStrategy = Literal["scd2"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should have a default strategy which is "delete-insert". and we'll add one more merge
to support MERGE sql statements #1129
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added in 396ec59
dlt/extract/decorators.py
Outdated
@@ -297,6 +297,7 @@ def resource( | |||
columns: TTableHintTemplate[TAnySchemaColumns] = None, | |||
primary_key: TTableHintTemplate[TColumnNames] = None, | |||
merge_key: TTableHintTemplate[TColumnNames] = None, | |||
merge_config: TMergeConfig = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jorritsandbrink @sh-rp maybe instead of doing this, we could extend write_disposition
to
``
write_disposition: TTableHintTemplate[Union[TWriteDisposition, TMergeConfig, TReplaceConfig]] = None
exactly what we do with `schema_contract` below? then we can support short hand strings and full definitions. and still use the same parameter
in that case I'd rename TMergeConfig to TMergeDispositionConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and we also handle the replace strategies in there? (not in the pr but later)? i think it's a good idea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rudolfix I adjusted the user interface according to your suggestion and introduced a default I'll add docs after we've settled on the user interface. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is good! I have two optimizations that IMO are OK to add.
thx for adding apply_hints tests!
@@ -296,10 +319,18 @@ def normalize_data_item( | |||
row = cast(TDataItemRowRoot, item) | |||
# identify load id if loaded data must be processed after loading incrementally | |||
row["_dlt_load_id"] = load_id | |||
# determine if row hash should be used as dlt id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you precompute a list of all "scd2" tables in _reset
method? this part of schema remains constant during normalization.
and this method is called for each normalized row. so it makes sense to optimize it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Solved with caching as you suggested on Slack: 6b24378
dlt/extract/hints.py
Outdated
def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> TTableSchema: | ||
"""Creates table schema from resource hints and resource name.""" | ||
|
||
dict_ = cast(Dict[str, Any], deepcopy(resource_hints)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to deepcopy hints? it was not copied in merge_keys
. are you getting errors in tests?
it was already done in compute_table()
:
# resolve a copy of a held template
table_template = self._clone_hints(table_template)
note that deepcopy will also clone pydantic models and other things that were used as original column definition. and that may be quite costly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deepcopy
was indeed obsolete—removed it in 1f399bc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
oh dremio
does not like ZULU timestamps in sql merge job... do you generate them for SCD2? I think Z timestamps are not really part of ISO
we have a dremio container in tests if you want to fix it
…ate to be passed on, uses load_id as created_at if possible
… execution as long as packages are processed in order
…on and updates schema in the normalizer
Description
This PR adds basic support for SCD2. Scope is defined here.
scd2
as a special case of themerge
write disposition._dlt_valid_from
and_dlt_valid_to
if the user does not specify names.9999-12-31T00:00:00+00:00
) in "valid to" column to indicate active records.scd2
strategy and another one with "regular" merge behavior, or use different validity column names for different tables.IntroducesExtendsmerge_config
argument on theresource
decoratorwrite_disposition
argument such that it also accepts configuration dictionaries to let the user specify the merge strategy and validity column names..Addsmerge_config
key toTResourceHints
, but not toTTableSchema
.x-merge-strategy
at the table levelx-valid-from
andx-valid-to
at the column levelRelated Issues