replace with staging tables #488
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
Force-pushed from e939cb3 to 8e64643
@sh-rp huh! IMO you are better off cherry-picking the replace changes than trying to merge this. unfortunately that was expected :/
Force-pushed from 1537f54 to 37e439e
this looks really good.
- please take a look at test_write_dispositions and parametrize it for the strategies. the test should OFC pass (a sketch of the parametrization follows below this list)
- add a new document to docs in general-usage: full-loading.md, where we'll describe the mechanism, the strategies etc.
- if you implement those optimizations in warehouses, please mention them in the per-warehouse docs
- incremental-loading.md introduces full loading: refer to document (2) from it
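A sketch of the requested parametrization (my sketch, not code from this PR): it assumes the strategy can be selected via the DESTINATION__REPLACE_STRATEGY env var, as the test snippets later in this thread do, and uses the strategy names from the REPLACE_STRATEGIES list that appears further down.

```python
import os
import pytest

# strategy names as they appear later in this PR (one is renamed before merge)
REPLACE_STRATEGIES = ["truncate-and-insert", "insert-from-staging", "staging-optimized"]

@pytest.mark.parametrize("replace_strategy", REPLACE_STRATEGIES)
def test_write_dispositions(replace_strategy: str) -> None:
    # pick the replace strategy for this run via configuration
    os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy
    ...  # existing test body, unchanged; it should pass for every strategy
```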
dlt/common/destination/reference.py (Outdated)

@configspec(init=True)
class DestinationClientConfiguration(BaseConfiguration):
    destination_name: str = None  # which destination to load data to
    credentials: Optional[CredentialsConfiguration]
    replace_strategy: TLoaderReplaceStrategy = "classic"
I'd say DestinationClientDwhConfiguration is a better place.
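A minimal sketch of that move, assuming the configspec classes from the diff above. The TLoaderReplaceStrategy alias and its literal values are placeholders for whatever the PR actually defines; the DWH fields are taken from later comments in this thread:

```python
from typing import Literal, Optional

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration

# assumed alias; the PR defines TLoaderReplaceStrategy elsewhere
TLoaderReplaceStrategy = Literal["classic", "staging", "optimized"]

@configspec(init=True)
class DestinationClientConfiguration(BaseConfiguration):
    destination_name: str = None  # which destination to load data to
    credentials: Optional[CredentialsConfiguration] = None

@configspec(init=True)
class DestinationClientDwhConfiguration(DestinationClientConfiguration):
    """DWH-specific configuration: the suggested home for replace_strategy."""
    dataset_name: str = None
    default_schema_name: Optional[str] = None
    replace_strategy: TLoaderReplaceStrategy = "classic"
```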
dlt/common/destination/reference.py (Outdated)

@@ -172,6 +176,20 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->

    def restore_file_load(self, file_path: str) -> LoadJob:
        pass

    @abstractmethod
    def get_stage_dispositions(self) -> List[TWriteDisposition]:
this goes in a good direction but it is not necessary to add those items to the JobClientBase. IMO it is time for a little refactor:
- move all stage related methods (including create_merge_job) to SqlJobClientBase (or create another intermediary abstract client here)
- run the staging part of load.py only if the client implements this interface

why? filesystem, weaviate or dummy have no need for a staging dataset. see the sketch after this list.
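A hedged sketch of that refactor; the class and function names here are illustrative, not the PR's final API:

```python
from abc import ABC, abstractmethod
from typing import List

class WithStagingDataset(ABC):
    """Illustrative intermediary interface for clients that can use a staging dataset."""

    @abstractmethod
    def get_stage_dispositions(self) -> List[str]:
        """Write dispositions that should be loaded via the staging dataset."""
        ...

# in load.py: run the staging part only for clients implementing the interface,
# so filesystem, weaviate or dummy never see staging logic
def init_staging_if_supported(job_client: object) -> None:
    if isinstance(job_client, WithStagingDataset) and job_client.get_stage_dispositions():
        job_client.initialize_storage(staging=True)  # call as in the load.py diff below
```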
DestinationClientDwhConfiguration got really misleading. dataset_name and default_schema_name are never configured; they are arguments to the client instance, always passed via code. we can deal with that, but later.
dlt/common/destination/reference.py (Outdated)

    def create_table_chain_completed_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
        """Creates a list of followup jobs that should be executed after a table chain is completed"""
        return []

    @abstractmethod
    def create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
hmmm it looks like it can become a private method _create_merge_job used internally by SqlJobClientBase
dlt/destinations/job_client_impl.py (Outdated)

            dispositions.append("replace")
        return dispositions

    def truncate_destination_table(self, disposition: TWriteDisposition) -> bool:
this is only used internally by SqlJobClientBase, so remove it from the ABC and add a _ prefix
and _should_truncate_destination_table is a better name -> because we just check whether to truncate, we don't perform it
dlt/destinations/job_client_impl.py (Outdated)

    def create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
        return SqlMergeJob.from_table_chain(table_chain, self.sql_client)

    def create_staging_copy_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
same here -> private method
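Taken together, the three comments above suggest making these helpers internal to SqlJobClientBase, roughly like this. A sketch only: the body of _create_staging_copy_job and the exact truncation check are my assumptions, not the PR's code.

```python
# imports as in dlt/destinations/job_client_impl.py at this revision
class SqlJobClientBase(JobClientBase):
    def _create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
        return SqlMergeJob.from_table_chain(table_chain, self.sql_client)

    def _create_staging_copy_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
        ...  # body as in the PR, just renamed with the underscore prefix

    def _should_truncate_destination_table(self, disposition: TWriteDisposition) -> bool:
        # only answers whether truncation applies; it does not truncate anything
        # (strategy value is an assumption based on the defaults seen in this PR)
        return disposition == "replace" and self.config.replace_strategy == "classic"
```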
@@ -81,8 +81,10 @@ def _maybe_make_terminal_exception_from_data_error(pg_ex: psycopg2.DataError) ->

class RedshiftCopyFileLoadJob(CopyRemoteFileLoadJob):

-    def __init__(self, table: TTableSchema, file_path: str, sql_client: SqlClientBase[Any], staging_credentials: Optional[CredentialsConfiguration] = None, staging_iam_role: str = None) -> None:
+    def __init__(self, table: TTableSchema, file_path: str, sql_client: SqlClientBase[Any], use_staging_table: bool, truncate_destination_table: bool, staging_credentials: Optional[CredentialsConfiguration] = None, staging_iam_role: str = None) -> None:
heh maybe reformat this, the line got too long
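One way to wrap it, keeping exactly the parameters from the diff above:

```python
def __init__(
    self,
    table: TTableSchema,
    file_path: str,
    sql_client: SqlClientBase[Any],
    use_staging_table: bool,
    truncate_destination_table: bool,
    staging_credentials: Optional[CredentialsConfiguration] = None,
    staging_iam_role: str = None,
) -> None:
```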
    os.environ['DESTINATION__REPLACE_STRATEGY'] = "staging"

    # snowflake requires gcs prefix instead of gs in bucket path
    if destination == "snowflake":
maybe we should normalize paths in copy jobs in snowflake? it has a specialized one
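A sketch of what such normalization could look like inside the Snowflake copy job. The gs:// to gcs:// mapping is taken from the test comment above; the helper name is hypothetical:

```python
def _normalize_bucket_path(bucket_path: str) -> str:
    # Snowflake expects the gcs:// scheme where other destinations use gs://
    if bucket_path.startswith("gs://"):
        return "gcs://" + bucket_path[len("gs://"):]
    return bucket_path
```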
dlt/load/load.py (Outdated)

                logger.info(f"Client for {job_client.config.destination_name} will TRUNCATE STAGING TABLES: {merge_tables}")
                job_client.initialize_storage(staging=True, truncate_tables=merge_tables)
                staging_tables = set(job.table_name for job in staging_table_jobs)
                job_client.update_storage_schema(staging=True, only_tables=staging_tables | dlt_tables, expected_update=expected_update)
you solved the conflict incorrectly: only_tables=merge_tables | {VERSION_TABLE_NAME}
dlt/load/load.py (Outdated)

-            merge_jobs = self.get_new_jobs_info(load_id, schema, "merge")
-            if merge_jobs:
+            staging_table_jobs = self.get_new_jobs_info(load_id, schema, job_client.get_stage_dispositions())
+            if staging_table_jobs:
                logger.info(f"Client for {job_client.config.destination_name} will start initialize STAGING storage")
                job_client.initialize_storage(staging=True)
MAYBE we could remove the staging flag from initialize_storage and update_storage_schema and have a with_staging context manager on SqlJobClientBase (the refactor that I proposed previously?). that would remove all traces of staging from JobClientBase (the top level ABC)
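A sketch of such a context manager. Illustrative only: it assumes the staging dataset is addressed by swapping the sql client's dataset name, which may not match the final implementation:

```python
from contextlib import contextmanager
from typing import Iterator

class SqlJobClientBase(JobClientBase):
    @contextmanager
    def with_staging_dataset(self) -> Iterator["SqlJobClientBase"]:
        # temporarily point the client at the staging dataset, so that
        # initialize_storage()/update_storage_schema() need no staging flag
        saved_dataset = self.sql_client.dataset_name
        self.sql_client.dataset_name = saved_dataset + "_staging"
        try:
            yield self
        finally:
            self.sql_client.dataset_name = saved_dataset

# usage in load.py:
# with job_client.with_staging_dataset() as staging_client:
#     staging_client.initialize_storage(truncate_tables=truncate_tables)
```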
I have created a subclass that supports the staging interface; it could actually also be a mixin, not sure.
Also there is one method left called drop_tables, which has a staging parameter; maybe this should also be handled within the context manager?
Ok, I removed it there too.
Force-pushed from 2e1bfa2 to 0af6f10
Force-pushed from cde7230 to da89cec
Force-pushed from 1737160 to 746e255
thanks for cleaning up the tests. the PR LGTM. I'll find out why the last test fails and maybe rename one of the strategies, then it is good to merge
        # drop destination table
        sql.append(f"DROP TABLE IF EXISTS {table_name};")
        # recreate destination table with data cloned from staging table
        sql.append(f"CREATE TABLE {table_name} CLONE {staging_table_name};")
this is a copy-on-write clone. exactly like bigquery!
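For reference, the replace then reduces to two metadata-only statements on clone-capable warehouses; both Snowflake and BigQuery support CREATE TABLE ... CLONE. A sketch of the generated SQL, identifier quoting omitted:

```python
sql = [
    f"DROP TABLE IF EXISTS {table_name};",
    # copy-on-write clone: no data is physically copied until rows diverge
    f"CREATE TABLE {table_name} CLONE {staging_table_name};",
]
```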
      # needed for bigquery staging tests
      DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
      DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      CREDENTIALS__PROJECT_ID: chat-analytics-rasa-ci
all the CREDENTIALS__ vars below are already defined, look further up in this file
        dataset_name = self._sql_client.dataset_name
        if self._should_truncate_destination_table:
            self._sql_client.execute_sql(f"""TRUNCATE TABLE {table_name}""")
I think _sql_client has a truncate method now?
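If it does, the snippet above could delegate to it instead of hand-writing the statement. A sketch only: truncate_tables is the method name I'd guess from the reviewer's question, not verified against this revision:

```python
if self._should_truncate_destination_table:
    # assumed helper on the sql client, replacing the inline TRUNCATE TABLE
    self._sql_client.truncate_tables(table_name)
```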
from tests.utils import ALL_DESTINATIONS
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration, set_destination_config_envs

REPLACE_STRATEGIES = ["truncate-and-insert", "insert-from-staging", "optimized"]
I'd rename optimized -> staging-optimized
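With the rename applied (this matches the final commit message, which mentions the rename to staging-optimized):

```python
REPLACE_STRATEGIES = ["truncate-and-insert", "insert-from-staging", "staging-optimized"]
```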
b591adc
to
8985712
Compare
8985712
to
80d5de4
Compare
Squashed commit message:
* implement staging replace disposition
* add third replace strategy and parametrize test by replace strategy
* make parametrized destination tests nicer
* update docs
* add s3 deps to snowflake tests
* add more env vars for staging tests
* fix classic test and revert redshift tx change
* explains optimized stage replace per warehouse doc
* fixes bumping schema version and renames to staging-optimized
* fixes ci workflow env
* fixes several race conditions in filesystem replace

Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>