replace child table handling #499

Merged · 11 commits · Jul 27, 2023
12 changes: 9 additions & 3 deletions dlt/common/data_writers/buffered.py
@@ -90,6 +90,11 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> None:
         elif self.file_max_items and self._writer.items_count >= self.file_max_items:
             self._rotate_file()
 
+    def write_empty_file(self, columns: TTableSchemaColumns) -> None:
+        if columns is not None:
+            self._current_columns = dict(columns)
+        self._flush_items(allow_empty_file=True)
+
     def close(self) -> None:
         self._ensure_open()
         self._flush_and_close_file()
@@ -109,8 +114,8 @@ def _rotate_file(self) -> None:
         self._flush_and_close_file()
         self._file_name = self.file_name_template % uniq_id(5) + "." + self._file_format_spec.file_extension
 
-    def _flush_items(self) -> None:
-        if len(self._buffered_items) > 0:
+    def _flush_items(self, allow_empty_file: bool = False) -> None:
+        if len(self._buffered_items) > 0 or allow_empty_file:
             # we only open a writer when there are any items in the buffer and first flush is requested
             if not self._writer:
                 # create new writer and write header
@@ -121,7 +126,8 @@ def _flush_items(self) -> None:
                 self._writer = DataWriter.from_file_format(self.file_format, self._file, caps=self._caps)
                 self._writer.write_header(self._current_columns)
             # write buffer
-            self._writer.write_data(self._buffered_items)
+            if self._buffered_items:
+                self._writer.write_data(self._buffered_items)
             self._buffered_items.clear()
 
     def _flush_and_close_file(self) -> None:
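The new write_empty_file / _flush_items(allow_empty_file=True) pair exists so a file with just the header (column schema) and zero rows can still be produced, for example for a replaced table that received no data. A rough, self-contained sketch of that pattern, not dlt's actual BufferedDataWriter (class name and file format here are made up):

import json
from typing import Any, Dict, List, Optional

class TinyBufferedWriter:
    """Buffers rows for a JSONL file; can also emit a header-only (empty) file."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._columns: Optional[Dict[str, str]] = None
        self._buffer: List[Dict[str, Any]] = []
        self._file = None

    def write_data_item(self, item: Dict[str, Any], columns: Dict[str, str]) -> None:
        self._columns = dict(columns)
        self._buffer.append(item)

    def write_empty_file(self, columns: Dict[str, str]) -> None:
        # mirrors the diff: remember the schema, then force a flush even with no buffered rows
        if columns is not None:
            self._columns = dict(columns)
        self._flush(allow_empty_file=True)

    def _flush(self, allow_empty_file: bool = False) -> None:
        if self._buffer or allow_empty_file:
            if self._file is None:
                self._file = open(self.path, "w", encoding="utf-8")
                # the "header" here is just the column schema
                self._file.write(json.dumps({"columns": self._columns}) + "\n")
            if self._buffer:  # write rows only when there are any
                for row in self._buffer:
                    self._file.write(json.dumps(row) + "\n")
            self._buffer.clear()

    def close(self) -> None:
        self._flush()
        if self._file is not None:
            self._file.close()

if __name__ == "__main__":
    w = TinyBufferedWriter("empty_table.jsonl")
    w.write_empty_file({"id": "bigint", "name": "text"})  # header only, zero rows
    w.close()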
4 changes: 2 additions & 2 deletions dlt/common/data_writers/writers.py
@@ -163,8 +163,8 @@ def write_row(row: StrAny) -> None:
         self._chunks_written += 1
 
     def write_footer(self) -> None:
-        assert self._chunks_written > 0
-        self._f.write(";")
+        if self._chunks_written > 0:
+            self._f.write(";")
 
     @classmethod
     def data_format(cls) -> TFileFormatSpec:
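The write_footer change lets such an empty file close cleanly: the terminating ';' is only written when at least one VALUES chunk was emitted, instead of asserting that one exists. A toy illustration of the idea (my own minimal writer, not dlt's InsertValuesWriter):

import io
from typing import Any, Sequence

class ToyInsertValuesWriter:
    def __init__(self, f: io.TextIOBase) -> None:
        self._f = f
        self._chunks_written = 0

    def write_rows(self, table: str, rows: Sequence[Sequence[Any]]) -> None:
        if self._chunks_written == 0:
            self._f.write(f"INSERT INTO {table}\nVALUES\n")
        self._f.write(",\n".join(str(tuple(r)) for r in rows))
        self._chunks_written += 1

    def write_footer(self) -> None:
        # before the change an assert required at least one chunk; now an empty file is legal
        if self._chunks_written > 0:
            self._f.write(";")

empty, full = io.StringIO(), io.StringIO()
ToyInsertValuesWriter(empty).write_footer()           # zero chunks, nothing written
w = ToyInsertValuesWriter(full)
w.write_rows("users", [(1, "a"), (2, "b")])
w.write_footer()
print(repr(empty.getvalue()))
print(repr(full.getvalue()))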
4 changes: 4 additions & 0 deletions dlt/common/destination/reference.py
@@ -180,6 +180,10 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
     def restore_file_load(self, file_path: str) -> LoadJob:
         pass
 
+    def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]:
+        # in the base job, all replace strategies are treated the same, see filesystem for example
+        return ["replace"]
+
     @abstractmethod
     def create_table_chain_completed_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
         """Creates a list of followup jobs that should be executed after a table chain is completed"""
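The base job client now advertises which write dispositions require truncating the destination table up front, defaulting to treating every "replace" the same way. A hedged sketch of how a loader might use such a list to pick tables to truncate (the helper name and table dicts are illustrative, not dlt's API):

from typing import List, Sequence

def tables_to_truncate(tables: Sequence[dict], truncate_dispositions: List[str]) -> List[str]:
    """Pick table names whose write disposition asks for an up-front truncate."""
    return [t["name"] for t in tables if t.get("write_disposition") in truncate_dispositions]

# base client behaviour from the diff: every "replace" table is truncated in the destination
base_dispositions = ["replace"]
tables = [
    {"name": "events", "write_disposition": "append"},
    {"name": "users", "write_disposition": "replace"},
    {"name": "users__addresses", "write_disposition": "replace"},  # child table follows its parent
]
print(tables_to_truncate(tables, base_dispositions))  # ['users', 'users__addresses']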
10 changes: 9 additions & 1 deletion dlt/common/storages/data_item_storage.py
@@ -13,7 +13,7 @@ def __init__(self, load_file_type: TLoaderFileFormat, *args: Any) -> None:
         self.buffered_writers: Dict[str, BufferedDataWriter] = {}
         super().__init__(*args)
 
-    def write_data_item(self, load_id: str, schema_name: str, table_name: str, item: TDataItems, columns: TTableSchemaColumns) -> None:
+    def get_writer(self, load_id: str, schema_name: str, table_name: str) -> BufferedDataWriter:
         # unique writer id
         writer_id = f"{load_id}.{schema_name}.{table_name}"
         writer = self.buffered_writers.get(writer_id, None)
@@ -22,9 +22,17 @@ def write_data_item(self, load_id: str, schema_name: str, table_name: str, item: TDataItems, columns: TTableSchemaColumns) -> None:
             path = self._get_data_item_path_template(load_id, schema_name, table_name)
             writer = BufferedDataWriter(self.loader_file_format, path)
             self.buffered_writers[writer_id] = writer
+        return writer
+
+    def write_data_item(self, load_id: str, schema_name: str, table_name: str, item: TDataItems, columns: TTableSchemaColumns) -> None:
+        writer = self.get_writer(load_id, schema_name, table_name)
         # write item(s)
         writer.write_data_item(item, columns)
+
+    def write_empty_file(self, load_id: str, schema_name: str, table_name: str, columns: TTableSchemaColumns) -> None:
+        writer = self.get_writer(load_id, schema_name, table_name)
+        writer.write_empty_file(columns)
 
     def close_writers(self, extract_id: str) -> None:
         # flush and close all files
         for name, writer in self.buffered_writers.items():
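get_writer only factors the per-(load_id, schema, table) writer cache out of write_data_item so the new write_empty_file can reuse the same buffered writer. A minimal sketch of that caching pattern with a dummy writer class (names are made up):

from typing import Dict, List

class DummyWriter:
    def __init__(self, path_template: str) -> None:
        self.path_template = path_template
        self.items: List[dict] = []

class DummyItemStorage:
    def __init__(self) -> None:
        self.buffered_writers: Dict[str, DummyWriter] = {}

    def get_writer(self, load_id: str, schema_name: str, table_name: str) -> DummyWriter:
        # unique writer id, same key scheme as in the diff
        writer_id = f"{load_id}.{schema_name}.{table_name}"
        writer = self.buffered_writers.get(writer_id)
        if writer is None:
            writer = DummyWriter(f"{load_id}/{schema_name}.{table_name}.%s")
            self.buffered_writers[writer_id] = writer
        return writer

storage = DummyItemStorage()
w1 = storage.get_writer("1690000000.123", "my_schema", "users")
w2 = storage.get_writer("1690000000.123", "my_schema", "users")
assert w1 is w2  # the same buffered writer serves both data items and empty-file markers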
7 changes: 3 additions & 4 deletions dlt/destinations/bigquery/bigquery.py
@@ -189,7 +189,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
         disposition = table["write_disposition"]
         job = BigQueryLoadJob(
             FileStorage.get_file_name_from_file_path(file_path),
-            self._create_load_job(table, disposition in self.get_stage_dispositions(), self._should_truncate_destination_table(disposition), file_path),
+            self._create_load_job(table, disposition in self.get_stage_dispositions(), file_path),
             self.config.http_timeout,
             self.config.retry_deadline
         )
@@ -256,9 +256,8 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
         except gcp_exceptions.NotFound:
             return False, schema_table
 
-    def _create_load_job(self, table: TTableSchema, use_staging_table: bool, _should_truncate_destination_table: bool, file_path: str) -> bigquery.LoadJob:
+    def _create_load_job(self, table: TTableSchema, use_staging_table: bool, file_path: str) -> bigquery.LoadJob:
         # append to table for merge loads (append to stage) and regular appends
-        bq_wd = bigquery.WriteDisposition.WRITE_TRUNCATE if _should_truncate_destination_table else bigquery.WriteDisposition.WRITE_APPEND
         table_name = table["name"]
 
         # determine wether we load from local or uri
@@ -284,7 +283,7 @@ def _create_load_job(self, table: TTableSchema, use_staging_table: bool, _should_truncate_destination_table: bool, file_path: str) -> bigquery.LoadJob:
         job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
         job_config = bigquery.LoadJobConfig(
             autodetect=False,
-            write_disposition=bq_wd,
+            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
             create_disposition=bigquery.CreateDisposition.CREATE_NEVER,
             source_format=source_format,
             decimal_target_types=decimal_target_types,
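After this change the BigQuery load job always appends; replacing data is handled by the separate truncate step or the staging dataset rather than WRITE_TRUNCATE. A hedged sketch of such an append-only job config, assuming the google-cloud-bigquery package and a parquet source (the real job sets more options):

from google.cloud import bigquery

def make_append_job_config() -> bigquery.LoadJobConfig:
    # appends only; truncation of "replace" tables happens before any load jobs start
    return bigquery.LoadJobConfig(
        autodetect=False,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        create_disposition=bigquery.CreateDisposition.CREATE_NEVER,
        source_format=bigquery.SourceFormat.PARQUET,
    )

# with credentials configured, a client would run something like:
# client.load_table_from_uri("gs://my-bucket/users.parquet", "project.dataset.users",
#                            job_config=make_append_job_config())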
6 changes: 2 additions & 4 deletions dlt/destinations/duckdb/duck.py
@@ -44,14 +44,12 @@
 
 
 class DuckDbCopyJob(LoadJob, FollowupJob):
-    def __init__(self, table_name: str, use_staging_table: bool, _should_truncate_destination_table: bool, file_path: str, sql_client: DuckDbSqlClient) -> None:
+    def __init__(self, table_name: str, use_staging_table: bool, file_path: str, sql_client: DuckDbSqlClient) -> None:
         super().__init__(FileStorage.get_file_name_from_file_path(file_path))
 
         with sql_client.with_staging_dataset(use_staging_table):
             qualified_table_name = sql_client.make_qualified_table_name(table_name)
             with sql_client.begin_transaction():
-                if _should_truncate_destination_table:
-                    sql_client.execute_sql(f"TRUNCATE TABLE {qualified_table_name}")
                 sql_client.execute_sql(f"COPY {qualified_table_name} FROM '{file_path}' ( FORMAT PARQUET );")
 
 
@@ -78,7 +76,7 @@ def __init__(self, schema: Schema, config: DuckDbClientConfiguration) -> None:
     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
         if file_path.endswith("parquet"):
             disposition = table["write_disposition"]
-            return DuckDbCopyJob(table["name"], disposition in self.get_stage_dispositions(), self._should_truncate_destination_table(disposition), file_path, self.sql_client)
+            return DuckDbCopyJob(table["name"], disposition in self.get_stage_dispositions(), file_path, self.sql_client)
         return super().start_file_load(table, file_path, load_id)
 
     def _get_column_def_sql(self, c: TColumnSchema) -> str:
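The DuckDB copy job likewise loses its inline TRUNCATE and is reduced to a COPY ... FROM parquet inside a transaction. A self-contained sketch with the duckdb Python package (table and file names are invented, and the parquet file is generated on the spot):

import duckdb

con = duckdb.connect()  # in-memory database
con.execute("CREATE TABLE users (id INTEGER, name VARCHAR)")
con.execute("INSERT INTO users VALUES (1, 'a'), (2, 'b')")
# produce a parquet file to copy from
con.execute("COPY users TO 'users.parquet' (FORMAT PARQUET)")

# the load side: plain COPY inside a transaction, no TRUNCATE in the job itself
con.execute("CREATE TABLE users_copy (id INTEGER, name VARCHAR)")
con.execute("BEGIN TRANSACTION")
con.execute("COPY users_copy FROM 'users.parquet' ( FORMAT PARQUET );")
con.execute("COMMIT")
print(con.execute("SELECT count(*) FROM users_copy").fetchone())  # (2,)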
38 changes: 13 additions & 25 deletions dlt/destinations/filesystem/filesystem.py
@@ -38,31 +38,7 @@ def __init__(
 
         super().__init__(file_name)
         fs_client, _ = client_from_config(config)
-
-        # fallback to replace for merge without any merge keys
-        if write_disposition == 'merge':
-            write_disposition = 'append' if has_merge_keys else 'replace'
-
-        # replace existing files. also check if dir exists for bucket storages that cannot create dirs
-        if write_disposition == 'replace' and fs_client.isdir(dataset_path):
-            job_info = LoadStorage.parse_job_file_name(file_name)
-            # remove those files
-            search_prefix = posixpath.join(dataset_path, f"{schema_name}.{job_info.table_name}.")
-            # but leave actual load id - files may be loaded from other threads
-            ignore_prefix = posixpath.join(dataset_path, f"{schema_name}.{job_info.table_name}.{load_id}.")
-            # NOTE: glob implementation in fsspec does not look thread safe, way better is to use ls and then filter
-            all_files: List[str] = fs_client.ls(dataset_path, detail=False, refresh=True)
-            items = [item for item in all_files if item.startswith(search_prefix) and not item.startswith(ignore_prefix)]
-            # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
-            # if items:
-            #     fs_client.rm(items[0])
-            for item in items:
-                # ignore file not found as we can have races from other deleting threads
-                try:
-                    fs_client.rm_file(item)
-                except FileNotFoundError:
-                    pass
 
         self.destination_file_name = LoadFilesystemJob.make_destination_filename(file_name, schema_name, load_id)
         fs_client.put_file(local_path, self.make_remote_path())
 
     @staticmethod
@@ -106,6 +82,18 @@ def dataset_path(self) -> str:
         return posixpath.join(self.fs_path, self.config.dataset_name)
 
     def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
+        # clean up existing files for tables selected for truncating
+        if truncate_tables and self.fs_client.isdir(self.dataset_path):
+            all_files = self.fs_client.ls(self.dataset_path, detail=False, refresh=True)
+            for table in truncate_tables:
+                search_prefix = posixpath.join(self.dataset_path, f"{self.schema.name}.{table}.")
+                for item in all_files:
Collaborator comment:

please preserve the comments from the above. it is a nice warning that you cannot trust `fsspec` too much:

# NOTE: glob implementation in fsspec does not look thread safe, way better is to use ls and then filter

and

# NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors

+                    # NOTE: glob implementation in fsspec does not look thread safe, way better is to use ls and then filter
+                    if item.startswith(search_prefix):
+                        # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
+                        self.fs_client.rm_file(item)
+
+        # create destination dir
         self.fs_client.makedirs(self.dataset_path, exist_ok=True)
 
     def is_storage_initialized(self) -> bool:
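The filesystem destination now truncates by listing the dataset directory once and removing files whose names start with "<schema>.<table>.", keeping the ls-and-filter approach the NOTEs recommend over fsspec's glob. A hedged local sketch of that filtering (paths and names are made up; the real code passes refresh=True and builds a full-path prefix with posixpath.join):

import os
import posixpath
import fsspec

dataset_path = "_toy_dataset"
os.makedirs(dataset_path, exist_ok=True)
for name in ("my_schema.users.1.0.jsonl", "my_schema.users.2.0.jsonl", "my_schema.events.1.0.jsonl"):
    open(posixpath.join(dataset_path, name), "w").close()

fs = fsspec.filesystem("file")
schema_name = "my_schema"
truncate_tables = ["users"]

# list once, then filter by prefix instead of calling glob per table
all_files = fs.ls(dataset_path, detail=False)
for table in truncate_tables:
    search_prefix = f"{schema_name}.{table}."
    for item in all_files:
        if posixpath.basename(item).startswith(search_prefix):
            fs.rm_file(item)

print(sorted(os.listdir(dataset_path)))  # only the events file remains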
10 changes: 4 additions & 6 deletions dlt/destinations/insert_job_client.py
@@ -12,13 +12,13 @@
 
 
 class InsertValuesLoadJob(LoadJob, FollowupJob):
-    def __init__(self, table_name: str, use_staging_table: bool, _should_truncate_destination_table: bool, file_path: str, sql_client: SqlClientBase[Any]) -> None:
+    def __init__(self, table_name: str, use_staging_table: bool, file_path: str, sql_client: SqlClientBase[Any]) -> None:
         super().__init__(FileStorage.get_file_name_from_file_path(file_path))
         self._sql_client = sql_client
         # insert file content immediately
         with self._sql_client.with_staging_dataset(use_staging_table):
             with self._sql_client.begin_transaction():
-                for fragments in self._insert(sql_client.make_qualified_table_name(table_name), _should_truncate_destination_table, file_path):
+                for fragments in self._insert(sql_client.make_qualified_table_name(table_name), file_path):
                     self._sql_client.execute_fragments(fragments)
 
     def state(self) -> TLoadJobState:
@@ -29,7 +29,7 @@ def exception(self) -> str:
         # this part of code should be never reached
         raise NotImplementedError()
 
-    def _insert(self, qualified_table_name: str, _should_truncate_destination_table: bool, file_path: str) -> Iterator[List[str]]:
+    def _insert(self, qualified_table_name: str, file_path: str) -> Iterator[List[str]]:
         # WARNING: maximum redshift statement is 16MB https://docs.aws.amazon.com/redshift/latest/dg/c_redshift-sql.html
         # the procedure below will split the inserts into max_query_length // 2 packs
         with FileStorage.open_zipsafe_ro(file_path, "r", encoding="utf-8") as f:
@@ -39,8 +39,6 @@ def _insert(self, qualified_table_name: str, _should_truncate_destination_table: bool, file_path: str) -> Iterator[List[str]]:
             assert values_mark == "VALUES\n"
 
             insert_sql = []
-            if _should_truncate_destination_table:
-                insert_sql.append("DELETE FROM {};".format(qualified_table_name))
             while content := f.read(self._sql_client.capabilities.max_query_length // 2):
                 # write INSERT
                 insert_sql.extend([header.format(qualified_table_name), values_mark, content])
@@ -93,7 +91,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
         if not job:
             # this is using sql_client internally and will raise a right exception
             disposition = table["write_disposition"]
-            job = InsertValuesLoadJob(table["name"], disposition in self.get_stage_dispositions(), self._should_truncate_destination_table(disposition), file_path, self.sql_client)
+            job = InsertValuesLoadJob(table["name"], disposition in self.get_stage_dispositions(), file_path, self.sql_client)
         return job
 
     # TODO: implement indexes and primary keys for postgres
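With the DELETE FROM gone, _insert is only about splitting the staged VALUES file into fragments of roughly max_query_length // 2 characters. A simplified standalone sketch of that splitting loop (the real job also repairs chunk boundaries that fall mid-statement):

import io
from typing import Iterator, List

def split_insert(f: io.TextIOBase, qualified_table_name: str, max_query_length: int) -> Iterator[List[str]]:
    header = f.readline()       # e.g. "INSERT INTO {}(id,name)\n"
    values_mark = f.readline()  # "VALUES\n"
    assert values_mark == "VALUES\n"
    while content := f.read(max_query_length // 2):
        # each fragment repeats the INSERT header and carries one chunk of VALUES
        yield [header.format(qualified_table_name), values_mark, content]

data = "INSERT INTO {}(id,name)\nVALUES\n(1,'a'),\n(2,'b');"
for fragment in split_insert(io.StringIO(data), "my_dataset.users", max_query_length=32):
    print("".join(fragment))
    print("---")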
6 changes: 4 additions & 2 deletions dlt/destinations/job_client_impl.py
@@ -156,8 +156,10 @@ def get_stage_dispositions(self) -> List[TWriteDisposition]:
             dispositions.append("replace")
         return dispositions
 
-    def _should_truncate_destination_table(self, disposition: TWriteDisposition) -> bool:
-        return disposition == "replace" and self.config.replace_strategy == "truncate-and-insert"
+    def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]:
+        if self.config.replace_strategy == "truncate-and-insert":
+            return ["replace"]
+        return []
 
     def _create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
         return SqlMergeJob.from_table_chain(table_chain, self.sql_client)
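get_truncate_destination_table_dispositions replaces the per-disposition boolean: only the "truncate-and-insert" replace strategy asks the loader to truncate destination tables, while staging-based strategies return an empty list. A tiny sketch of the decision with the strategy passed in directly:

from typing import List

def truncate_dispositions(replace_strategy: str) -> List[str]:
    # "truncate-and-insert" truncates the destination table directly;
    # staging-based strategies replace data via the staging dataset instead
    if replace_strategy == "truncate-and-insert":
        return ["replace"]
    return []

print(truncate_dispositions("truncate-and-insert"))  # ['replace']
print(truncate_dispositions("insert-from-staging"))  # []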
6 changes: 1 addition & 5 deletions dlt/destinations/redshift/redshift.py
@@ -85,12 +85,10 @@ def __init__(self, table: TTableSchema,
                  file_path: str,
                  sql_client: SqlClientBase[Any],
                  use_staging_table: bool,
-                 should_truncate_destination_table: bool,
                  staging_credentials: Optional[CredentialsConfiguration] = None,
                  staging_iam_role: str = None) -> None:
         self._staging_iam_role = staging_iam_role
         self._use_staging_table = use_staging_table
-        self._should_truncate_destination_table = should_truncate_destination_table
         super().__init__(table, file_path, sql_client, staging_credentials)
 
     def execute(self, table: TTableSchema, bucket_path: str) -> None:
@@ -128,8 +126,6 @@ def execute(self, table: TTableSchema, bucket_path: str) -> None:
         with self._sql_client.with_staging_dataset(self._use_staging_table):
             with self._sql_client.begin_transaction():
                 dataset_name = self._sql_client.dataset_name
-                if self._should_truncate_destination_table:
-                    self._sql_client.execute_sql(f"""TRUNCATE TABLE {table_name}""")
                 # TODO: if we ever support csv here remember to add column names to COPY
                 self._sql_client.execute_sql(f"""
                     COPY {dataset_name}.{table_name}
@@ -184,7 +180,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
         """Starts SqlLoadJob for files ending with .sql or returns None to let derived classes to handle their specific jobs"""
         if NewReferenceJob.is_reference_job(file_path):
             disposition = table["write_disposition"]
-            return RedshiftCopyFileLoadJob(table, file_path, self.sql_client, disposition in self.get_stage_dispositions(), self._should_truncate_destination_table(disposition), staging_credentials=self.config.staging_credentials, staging_iam_role=self.config.staging_iam_role)
+            return RedshiftCopyFileLoadJob(table, file_path, self.sql_client, disposition in self.get_stage_dispositions(), staging_credentials=self.config.staging_credentials, staging_iam_role=self.config.staging_iam_role)
         return super().start_file_load(table, file_path, load_id)
 
     @classmethod
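The Redshift copy job now only runs the COPY inside its transaction; truncation is requested elsewhere. A hedged sketch of composing such a COPY statement with an IAM role (identifiers, bucket path and format are placeholders, not the job's exact SQL):

def build_redshift_copy(dataset_name: str, table_name: str, bucket_path: str, iam_role: str) -> str:
    # credentials via IAM role; the FORMAT clause would vary with the staged file type
    return (
        f"COPY {dataset_name}.{table_name}\n"
        f"FROM '{bucket_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        f"FORMAT AS PARQUET;"
    )

print(build_redshift_copy("my_dataset", "users", "s3://my-bucket/load/users.parquet",
                          "arn:aws:iam::123456789012:role/redshift-copy"))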
6 changes: 2 additions & 4 deletions dlt/destinations/snowflake/snowflake.py
@@ -52,7 +52,7 @@
 
 class SnowflakeLoadJob(LoadJob, FollowupJob):
     def __init__(
-        self, file_path: str, table_name: str, use_staging_table: bool, _should_truncate_destination_table: bool, load_id: str, client: SnowflakeSqlClient,
+        self, file_path: str, table_name: str, use_staging_table: bool, load_id: str, client: SnowflakeSqlClient,
         stage_name: Optional[str] = None, keep_staged_files: bool = True, staging_credentials: Optional[CredentialsConfiguration] = None
     ) -> None:
         file_name = FileStorage.get_file_name_from_file_path(file_path)
@@ -96,8 +96,6 @@ def __init__(
             source_format = "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE)"
 
         with client.begin_transaction():
-            if _should_truncate_destination_table:
-                client.execute_sql(f"TRUNCATE TABLE IF EXISTS {qualified_table_name}")
             # PUT and COPY in one tx if local file, otherwise only copy
             if not bucket_path:
                 client.execute_sql(f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE, AUTO_COMPRESS = FALSE')
@@ -155,7 +153,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
         if not job:
             disposition = table['write_disposition']
             job = SnowflakeLoadJob(
-                file_path, table['name'], disposition in self.get_stage_dispositions(), self._should_truncate_destination_table(disposition), load_id, self.sql_client,
+                file_path, table['name'], disposition in self.get_stage_dispositions(), load_id, self.sql_client,
                 stage_name=self.config.stage_name, keep_staged_files=self.config.keep_staged_files,
                 staging_credentials=self.config.staging_credentials
             )
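The Snowflake job follows the same pattern: the TRUNCATE TABLE IF EXISTS is gone and the transaction only stages the file (PUT) and copies it in. A rough sketch of the statement sequence as plain strings, with placeholder names and no real connection (the exact COPY INTO options differ in the real job):

from typing import List

def snowflake_load_statements(qualified_table_name: str, stage_name: str, load_id: str, file_path: str) -> List[str]:
    # local file: PUT to the stage, then COPY INTO, all in one transaction driven by the caller
    file_name = file_path.split("/")[-1]
    return [
        "BEGIN",
        f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE, AUTO_COMPRESS = FALSE',
        f'COPY INTO {qualified_table_name} FROM @{stage_name}/"{load_id}"/{file_name}',
        "COMMIT",
    ]

for stmt in snowflake_load_statements("my_dataset.users", "my_stage", "1690000000.123", "/tmp/users.parquet"):
    print(stmt)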