-
Notifications
You must be signed in to change notification settings - Fork 166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add upsert
merge strategy
#1466
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
dlt/common/destination/reference.py
Outdated
" dlt will fall back to `append` for this table." | ||
) | ||
elif table.get("x-merge-strategy") == "upsert": | ||
if self.config.destination_name not in ("postgres", "snowflake"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a supported_merge_strategies
destination capability?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes - if you at some point (as early as possible ie. when passing data to normalizer when we must have destination capabilities) are able to issue a warning and say which strategy will be used instead.
same thing with replace strategies (we have 3 afaik)
also if you want to do it: we do not even store which write dispositions are supported. so maybe this is a separate ticket :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- added
supported_merge_strategies
capability and configured it for all SQL destinations - added
_verify_destination_capabilities
that checks configured merge strategy against supported merge strategies- raises error if not supported
- (I know you suggested falling back to a supported strategy and issueing a warning instead, but I'm not a fan of that approach. Will of course change it if that's how we do things in
dlt
, but wanted to challenge it first.)
- (I know you suggested falling back to a supported strategy and issueing a warning instead, but I'm not a fan of that approach. Will of course change it if that's how we do things in
- called right before
normalize
step — is that the right place?
- raises error if not supported
I can create a separate ticket for supported replace strategies and supported write dispositions capabilities.
dlt/destinations/sql_jobs.py
Outdated
# generate statements for child tables if they exist | ||
child_tables = table_chain[1:] | ||
if child_tables: | ||
root_row_key = escape_id("_dlt_id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the row id be marked with a new hint (other than unique
) to prevent hard-coding _dlt_id
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not unique
? technically you just need a unique column to delete / merge child tables correctly. delete-insert
does that, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need the root key to delete from child tables, because you check if the root key is present in the staging root table (lines 606 and 623 in sql_jobs.py
).
If I understand correctly, the root key is always _dlt_id
, not an arbitrary unique
column.
delete-insert
indeed uses the unique
column hint instead of hard-coding _dlt_id
, but that seems wrong to me.
Do we have a test case for the merge
disposition with an arbitrary unique
key? Maybe the tests pass because in practice the unique
hint always resolves to _dlt_id
in all our cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, we do not have such test case. what I want to achieve here is that we rely on annotations, not on column names.
_dlt_id
is annotated asunique
by a standard schemaroot_key
is a also an annotation, not a hardcoded field
relational.py
adds config for each table withmerge
write disposition:
"propagation": {
"tables": {
table_name: {
TColumnName(self.c_dlt_id): TColumnName(self.c_dlt_root_id)
}
}
}
which will propagate _dlt_id
from root to each child table as _dlt_root_id
and
self.schema._merge_hints(
{
"not_null": [
TSimpleRegex(self.c_dlt_id),
TSimpleRegex(self.c_dlt_root_id),
TSimpleRegex(self.c_dlt_parent_id),
TSimpleRegex(self.c_dlt_list_idx),
TSimpleRegex(self.c_dlt_load_id),
],
"foreign_key": [TSimpleRegex(self.c_dlt_parent_id)],
"root_key": [TSimpleRegex(self.c_dlt_root_id)],
"unique": [TSimpleRegex(self.c_dlt_id)],
},
normalize_identifiers=False, # already normalized
)
makes sure that hints are applied. this is quite old version of dlt
and we'll reimplement it. but high level mechanism (we rely on annotations, not names) is good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks good, is short and clean. important info
- I must merge allows naming conventions to be changed #998 - there all column names are abstracted away etc. your PR will generate a lot of conflict and it will be way easier to merge it after that
- for later: it would be cool if users can request dlt id type for append/replace as well - ie to just use content or primary key (deterministic) hash
- what happens if input data is not deduplicated? so we have several values with the same primary key?
delete-insert
is deduplicating input data. see what happens in your case
dlt/common/destination/reference.py
Outdated
" dlt will fall back to `append` for this table." | ||
) | ||
elif table.get("x-merge-strategy") == "upsert": | ||
if self.config.destination_name not in ("postgres", "snowflake"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes - if you at some point (as early as possible ie. when passing data to normalizer when we must have destination capabilities) are able to issue a warning and say which strategy will be used instead.
same thing with replace strategies (we have 3 afaik)
also if you want to do it: we do not even store which write dispositions are supported. so maybe this is a separate ticket :)
if row_hash: | ||
row_id = self.get_row_hash(dict_row) # type: ignore[arg-type] | ||
dict_row["_dlt_id"] = row_id | ||
if row_id_type in ("key_hash", "row_hash"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this logic must be moved to _add_row_id
. I think the way we handle _dlt_id
needs a little bit more work
- we have super simple bring your own
_dlt_id
row_id = flattened_row.get("_dlt_id", None)
if not row_id:
row_id = self._add_row_id(table, flattened_row, parent_row_id, pos, _r_lvl)
we should replace it with hint based method - if there's any unique column we use it for _dlt_id
. that may be a separate ticket. but current "bring your own" must work and now you ignore it here
- we have many methods to generate
_dlt_id
for parent table: random, deterministic (primary key based) and content based (used by scd2). they way you do it now is good enough. in the future I'd like people to be able to pick the way dlt id is generated (ie viaItemsNormalizerConfiguration
) - for child tables we always have deterministic
_dlt_id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have moved the logic to _add_row_id
. Current "bring your own" (if I understand it correctly) should work now.
dlt/destinations/sql_jobs.py
Outdated
# generate statements for child tables if they exist | ||
child_tables = table_chain[1:] | ||
if child_tables: | ||
root_row_key = escape_id("_dlt_id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not unique
? technically you just need a unique column to delete / merge child tables correctly. delete-insert
does that, right?
…com/dlt-hub/dlt into feat/1129-add-upsert-merge-strategy
…mplement any of the defined merge strategies
@rudolfix I addressed your comments. Can you review?
See "Note 1" in the PR description. |
…-add-upsert-merge-strategy
assert "primary_key" in r._hints | ||
assert "merge_key" in r._hints | ||
p.run(r()) | ||
assert ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assertion fails on CI because capsys.readouterr().err
is an empty string there. It does work on my local machine. Any idea on how to best test warning logs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I have no idea. maybe the log is routed to stdout on CI? but I doubt that, or the test is really not passing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented out the test for now so this doesn't block progress: 0f641e4
dlt/extract/hints.py
Outdated
dict_["x-merge-strategy"] = DEFAULT_MERGE_STRATEGY | ||
if "strategy" in mddict: | ||
if mddict["strategy"] not in MERGE_STRATEGIES: | ||
raise ValueError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the right way/place to do user input validation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. probably not. all hints are applied via apply_hints
method which calls _set_hints
at the end. the validation should happen there. I think it makes sense to move those checks. there's a little bit of validation already done in _set_hints
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved it to _set_hints
in 479e52a
…-add-upsert-merge-strategy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really good and test coverage is sufficient. I had a few remarks regarding using hardcoded column names vs annotation but it looks mostly fixed now.
other things are minor
we also lack some docs but can go to the next PR. up to you
again - sorry for the big merge and conflicts - you'll have a few more places to fix when you merge current devel - but way less than last time
|
||
@staticmethod | ||
@lru_cache(maxsize=None) | ||
def _get_primary_key(schema: Schema, table_name: str) -> List[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here's one detail: we may be dealing with non-existing table that nevertheless gets primary key from schema hints. (data comes first, then we detect the schema - so to be fully correct we'd need to use flattened data to predict if any primary key will be detected on it)
I looked at our code and this will significantly slow us down, make caching harder etc. so let's ignore it for now.
elif merge_strategy == "scd2": | ||
if ( | ||
schema.get_table(table_name)["columns"] | ||
.get("_dlt_id", {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use get_columns_names_with_prop
to look for "x-row-version"
directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I adjusted in 0d84e6d
dlt/destinations/sql_client.py
Outdated
@@ -173,6 +173,12 @@ def make_qualified_table_name_path( | |||
path.append(table_name) | |||
return path | |||
|
|||
def get_qualified_table_names(self, table_name: str, escape: bool = True) -> Tuple[str, str]: | |||
"""Returns qualified names for table and corresponding staging table as tuple.""" | |||
with self.with_staging_dataset(staging=True): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed stagingflag from
with_staging_dataset. it is in
devel` now
dlt/extract/hints.py
Outdated
dict_["x-merge-strategy"] = DEFAULT_MERGE_STRATEGY | ||
if "strategy" in mddict: | ||
if mddict["strategy"] not in MERGE_STRATEGIES: | ||
raise ValueError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. probably not. all hints are applied via apply_hints
method which calls _set_hints
at the end. the validation should happen there. I think it makes sense to move those checks. there's a little bit of validation already done in _set_hints
assert "primary_key" in r._hints | ||
assert "merge_key" in r._hints | ||
p.run(r()) | ||
assert ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I have no idea. maybe the log is routed to stdout on CI? but I doubt that, or the test is really not passing
merge_strategy: TLoaderMergeStrategy, | ||
destination: TDestination, | ||
) -> None: | ||
if merge_strategy not in destination.capabilities().supported_merge_strategies: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is good. but should be a part of destinations_configs
filters. definitely a task for the future: if we add supported write disposiitons to the capabilities we'll be able to generate a lot of filters automatically
This reverts commit 17f2115.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Description
upsert
primary_key
, should be unique (see note 1)_dlt_id
delete-insert
strategymerge_key
postgres
andsnowflake
destinations (other destinations come in follow up PR)Note 1:
A primary key-based
_dlt_id
requires that the primary key is unique before going into thenormalize
step. Primary key-based deduplication in theload
step, as we do in thedelete-insert
merge strategy, is not possible.Problems when primary key is not unique (i.e. it has duplicates):
UNIQUE
constraint on_dlt_id
column is violated (can be solved by not imposing the constraint)Related Issues
upsert
merge strategy #1129