replace child table handling #499
Conversation
dlt/extract/extract.py
Outdated
```python
# find defined resources that did not yield any pipeitems and create empty jobs for them
for resource in source.resources.selected.values():
    if resource.write_disposition != "replace" or resource._table_name_hint_fun:
        continue
```
how does this _table_name_hint_fun work?
you do not need to worry :) we do not track table names.
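For reference, a minimal sketch of where that hint comes from, assuming the standard `table_name` argument of `dlt.resource`: when `table_name` is a callable, the resource keeps it as its dynamic table name hint and evaluates it per item.

```python
import dlt

# a callable table_name makes the resource route each yielded item
# to a table computed from the item itself
@dlt.resource(write_disposition="append", table_name=lambda item: item["type"])
def events():
    yield {"type": "push", "id": 1}
    yield {"type": "issue", "id": 2}
```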
review of extract.py
dlt/extract/extract.py
Outdated
```python
@@ -72,6 +72,11 @@ def extract(

    with collector(f"Extract {source.name}"):

        def _write_empty_file(table_name: str) -> None:
            table_name = schema.naming.normalize_identifier(table_name)
            collector.update(table_name)
```
don't update collector with empty tables
```python
_write_static_table(resource, table_name)
_write_item(table_name, pipe_item.item)

# find defined resources that did not yield any pipeitems and create empty jobs for them
```
what you should do here:
1. find the difference of the set of resources that had data and all selected resources that have write disposition `replace`
2. find all tables that belong to the resources of the set above:
```python
data_tables = {t["name"]: t for t in self.schema.data_tables()}  # Don't remove _dlt tables
resource_tables = group_tables_by_resource(data_tables, pattern=self.resource_pattern)
```
the code is coming from dlt/pipeline/helpers.py
3. generate empty files for all the tables found

explanation: a resource can create any number of tables thanks to dynamic hints or explicit `dlt.mark.with_table_name`. see the following resources:
```python
@dlt.transformer(name="github_repo_events", primary_key="id", write_disposition="merge", table_name=lambda i: i['type'])
def github_repo_events(page):
    yield page

@dlt.transformer(name="github_repo_events", primary_key="id", write_disposition="merge")
def github_repo_events_table_meta(page):
    yield from [dlt.mark.with_table_name(p, p['type']) for p in page]
```
this is actually dlt being amazing :) you create a separate table for all event types in the github event feed. all those tables are coming from a single resource
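A minimal sketch of those three steps, reusing `group_tables_by_resource` as quoted above plus the `_write_empty_file` helper and `resources_with_items` set from this PR's diff; the exact wiring is an assumption:

```python
from dlt.common.schema import utils

# 1. selected "replace" resources that yielded no items during extraction
empty_resources = {
    name for name, resource in source.resources.selected.items()
    if resource.write_disposition == "replace" and name not in resources_with_items
}

# 2. all data tables that belong to those resources; a single resource may
#    feed many tables via dynamic hints or dlt.mark.with_table_name
data_tables = {t["name"]: t for t in schema.data_tables()}
tables_by_resource = utils.group_tables_by_resource(data_tables)

# 3. one empty extract file per table, so the load step still truncates it
for resource_name in empty_resources:
    for table in tables_by_resource.get(resource_name, []):
        _write_empty_file(table["name"])
```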
In the case where the main resource yields nothing, its table does not show up in `group_tables_by_resource`, so I had to add an extra bit of code for this case. I hope this is alright.
two bigger things:
- please check the `filesystem` destination. it also has a replace "problem" that is brute force fixed in the copy job. what we should do is to adopt the truncate-insert strategy and truncate data in `initialize_storage`. this code can just go from copy job to the client:
```python
# fallback to replace for merge without any merge keys
if write_disposition == 'merge':
    write_disposition = 'append' if has_merge_keys else 'replace'

# replace existing files. also check if dir exists for bucket storages that cannot create dirs
if write_disposition == 'replace' and fs_client.isdir(dataset_path):
    job_info = LoadStorage.parse_job_file_name(file_name)
    # remove those files
    search_prefix = posixpath.join(dataset_path, f"{schema_name}.{job_info.table_name}.")
    # but leave actual load id - files may be loaded from other threads
    ignore_prefix = posixpath.join(dataset_path, f"{schema_name}.{job_info.table_name}.{load_id}.")
    # NOTE: glob implementation in fsspec does not look thread safe, way better is to use ls and then filter
    all_files: List[str] = fs_client.ls(dataset_path, detail=False, refresh=True)
    items = [item for item in all_files if item.startswith(search_prefix) and not item.startswith(ignore_prefix)]
    # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
    # if items:
    #     fs_client.rm(items[0])
    for item in items:
        # ignore file not found as we can have races from other deleting threads
        try:
            fs_client.rm_file(item)
        except FileNotFoundError:
            pass
```
  and we do not need to filter by `ignore_prefix` and catch `FileNotFoundError`, because we do truncate BEFORE we start any loading, so there is just one writer. this is so good
- we need more tests:
  - test the situation with many tables per resource (the resources for github will be a good test)
  - you must test all the file types we have: jsonl, insert-values, parquet, to make sure empty files are correctly processed. empty == without rows. in particular I'm not sure about insert-values (a hedged test sketch follows below)
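A sketch of such a test, assuming a local duckdb destination and the public pipeline API; the pipeline name, resource names, and row checks are illustrative, not taken from this PR:

```python
import dlt
import pytest

@pytest.mark.parametrize("file_format", ["jsonl", "insert_values", "parquet"])
def test_replace_resource_with_no_rows(file_format: str) -> None:
    pipeline = dlt.pipeline(pipeline_name="test_empty_replace", destination="duckdb", full_refresh=True)

    @dlt.resource(write_disposition="replace")
    def items():
        yield [{"id": 1}, {"id": 2}]

    # first load creates the table with data
    pipeline.run(items(), loader_file_format=file_format)

    @dlt.resource(name="items", write_disposition="replace")
    def items_empty():
        yield from []

    # second load yields no rows: the empty file must still truncate the table
    pipeline.run(items_empty(), loader_file_format=file_format)
    with pipeline.sql_client() as client:
        assert client.execute_sql("SELECT count(*) FROM items")[0][0] == 0
```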
```diff
 files = p.get_load_package_info(p.list_completed_load_packages()[1]).jobs["completed_jobs"]
-assert all(f.job_file_info.file_format == "parquet" for f in files)
+assert all(f.job_file_info.file_format in ["parquet", "sql"] for f in files)
```
we should have no "sql" jobs because we replace with truncate and without staging. if we have sql jobs we are doing something wrong
I am not sure how to implement this merge keys fallback strategy, since this info is not available in `initialize_storage`; I could put something in the loader to hand this over, though. OK, I think we need to discuss this quickly. I have moved the code into the initialize function, but I will have to add some kind of filter function that reduces all top_tables present in jobs to the right set to be truncated, and this filter function will probably be in the client. It kind of feels right there, but not quite. Do you know what I mean? For filesystem, in the case of the merge write disposition, we need to check each table as to whether merge keys are present on the actual table.
I have also decided to have the file destination fall back to append for merge. I think if we do this replace, then there will be problems when loading from staging destinations to redshift etc. with merge; data will go missing.
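That fallback could be as small as the following sketch; the exact place it hooks into the filesystem client is an assumption on my part:

```python
# filesystem cannot really merge, and replacing the files would lose data
# that a staging destination (e.g. redshift) still needs to copy,
# so fall back to append instead of replace for merge
if write_disposition == "merge":
    write_disposition = "append"
```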
dlt/destinations/job_client_impl.py
Outdated
```python
def _should_truncate_destination_table(self, disposition: TWriteDisposition) -> bool:
    return disposition == "replace" and self.config.replace_strategy == "truncate-and-insert"

def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]:
    if self.config.replace_strategy:
```
I was sure we truncate only when doing truncate-and-insert, not always?
crap, this was an editing error, good catch!
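Presumably the fixed version gates on the strategy value, mirroring `_should_truncate_destination_table` above; a sketch, not the actual follow-up commit:

```python
def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]:
    # truncate on the destination only for the truncate-and-insert strategy
    if self.config.replace_strategy == "truncate-and-insert":
        return ["replace"]
    return []
```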
dlt/extract/extract.py
Outdated
```python
else:
    # get partial table from table template
    if resource._table_name_hint_fun:
        if isinstance(pipe_item.item, List):
            for item in pipe_item.item:
                _write_dynamic_table(resource, item)
                table_name = resource._table_name_hint_fun(item)
```
you can move it back into _write_dynamic_table
dlt/extract/extract.py
Outdated
```python
        _write_item(table_name, resource.name, pipe_item.item)

# find defined resources that did not yield any pipeitems and create empty jobs for them
tables_by_resources = utils.group_tables_by_resource(schema.tables)
```
I think we can safely use `data_tables`. this eliminates `_dlt` tables and incomplete tables that were never created in the destination
```python
for resource in source.resources.selected.values():
    if resource.write_disposition != "replace" or resource.name in resources_with_items:
        continue
    if resource.name not in tables_by_resources:
```
IMO we do not need the empty file. if the resource is not present for any complete table, this means that there's nothing to truncate on the destination
yes you are right, I had an error in the test that required this.
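The resolved guard then presumably just skips such resources; a sketch based on the diff above:

```python
if resource.name not in tables_by_resources:
    # no complete table was ever created for this resource,
    # so there is nothing to truncate on the destination: skip it
    continue
```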
destinations.
destination tables will be truncated at the beginning of the load and the new data will be inserted consecutively but not within the same transaction.
The downside of this strategy is that your tables will have no data for a while until the load is completed. You
may end up with new data in some tables and no data in other tables if the load fails during the run.
add something like "Such an incomplete load may however be detected by checking if the `_dlt_loads` table contains the load id from `_dlt_load_id` of the replaced tables"
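For example, a hedged sketch of such a check; the table name is illustrative and status 0 is, to my knowledge, a fully processed load:

```python
# rows whose load id never made it into _dlt_loads with a completed status
# come from a load that did not finish
with pipeline.sql_client() as client:
    orphaned = client.execute_sql(
        "SELECT count(*) FROM my_table WHERE _dlt_load_id NOT IN "
        "(SELECT load_id FROM _dlt_loads WHERE status = 0)"
    )[0][0]
    assert orphaned == 0, "incomplete replace load detected"
```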
```python
all_files = self.fs_client.ls(self.dataset_path, detail=False, refresh=True)
for table in truncate_tables:
    search_prefix = posixpath.join(self.dataset_path, f"{self.schema.name}.{table}.")
    for item in all_files:
```
please preserve the comments from above. they are a nice warning that you cannot trust fsspec too much:

```python
# NOTE: glob implementation in fsspec does not look thread safe, way better is to use ls and then filter
```

and

```python
# NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
```
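With both comments restored, the truncation loop above would presumably read as follows; the body after `for item in all_files:` is my assumption, reconstructed from the earlier copy-job code:

```python
# NOTE: glob implementation in fsspec does not look thread safe, way better is to use ls and then filter
all_files = self.fs_client.ls(self.dataset_path, detail=False, refresh=True)
for table in truncate_tables:
    search_prefix = posixpath.join(self.dataset_path, f"{self.schema.name}.{table}.")
    # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
    for item in all_files:
        if item.startswith(search_prefix):
            self.fs_client.rm_file(item)
```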
tests/load/test_job_client.py
Outdated
```python
@@ -485,6 +499,7 @@ def test_write_dispositions(client: SqlJobClientBase, write_disposition: str, re
        assert len(db_rows) == idx + 1
    elif write_disposition == "replace":
        # we overwrite with the same row. merge fallbacks to replace when no keys specified
        print(db_rows)
```
you can remove that
```python
}]
yield dlt.mark.with_table_name(data, "items")
yield dlt.mark.with_table_name(data, "other_items")
yield None
```
what bothers me a little is that when we added the round robin strategy to the pipe iterator, we also forced the iterator to get the next value if None is yielded, so None should never get out of the pipe:

```python
item = None
while item is None:
    item = next(gen)
```

so how is it possible that you've got this INSERT VALUE with NULL that we were looking at?
This happens if yield None is the only statement in the resource and no data is ever yielded. I did not investigate why this happens though.
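A minimal shape of such a resource, distilled from the test above (a hypothetical repro, not code from this PR):

```python
import dlt

@dlt.resource(write_disposition="replace")
def only_none():
    # the one and only statement: no data item is ever produced,
    # yet an INSERT file with a NULL value showed up in the load package
    yield None
```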
Should I? I think this would be a problem for a separate ticket, though, and we should merge the replace soonish.
hmmm why? redshift destination copies files from the load package. all the other files were already copied and may be deleted (or not). I do not mean you should restore this edge case (it is not so important) - but no data will be missing in that case! when we start implementing the delta lake then yeah, we will replace old files with new, but that's according to the write disposition, so all is still good
Hm, I think you are right, I had a thought error here; I was already thinking about s3 and Athena. But I think it would be good to either keep it like this or throw an error if merge is used with the filesystem destination.
FYI: all the hints are handled as best effort by destinations. if something is not supported it gets ignored... so the existing code is good
One important note: this PR only handles proper child table clearing for the staging replace strategy. More changes are needed if we also want to clear child tables on the classic strategy.
The one test that is always failing is tests/load/pipeline/test_merge_disposition.py::test_merge_on_keys_in_schema. The problem here is that the test makes sure that tables defined in the schema which receive no data do not get created. But in this PR, all the child tables of any table that has data are created on staging and cleared, so that the child table replacing works. The way to solve this would be to look at the destination, see which tables are already present, and prevent the creation of tables that are not, but I took that part out again.