Skip to content

Commit

Permalink
Get normalize config without decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Jun 13, 2024
1 parent dfa86b6 commit 706dce5
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from copy import copy
from typing import Set, Dict, Any, Optional, List

from dlt.common.configuration import known_sections
from dlt.common.configuration import known_sections, resolve_configuration, with_config
from dlt.common import logger
from dlt.common.configuration.inject import with_config
from dlt.common.configuration.specs import BaseConfiguration, configspec
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.exceptions import MissingDependencyException
Expand Down Expand Up @@ -229,13 +228,16 @@ class ArrowExtractor(Extractor):
- `pandas.DataFrame` (is converted to arrow `Table` before processing)
"""

# Inject the parts of normalize configuration that are used here
@with_config(
spec=ItemsNormalizerConfiguration, sections=(known_sections.NORMALIZE, "parquet_normalizer")
)
def __init__(self, *args: Any, add_dlt_load_id: bool = False, **kwargs: Any) -> None:
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.add_dlt_load_id = add_dlt_load_id
self._normalize_config = self._retrieve_normalize_config()

def _retrieve_normalize_config(self) -> ItemsNormalizerConfiguration:
    """Resolve the normalizer settings this extractor reads.

    A fresh `ItemsNormalizerConfiguration` is handed to
    `resolve_configuration` together with the sections
    `(known_sections.NORMALIZE, "parquet_normalizer")`.

    Returns:
        Whatever `resolve_configuration` returns for that spec and sections.
    """
    # Build the spec first, then the section path it is resolved under.
    empty_spec = ItemsNormalizerConfiguration()
    config_sections = (known_sections.NORMALIZE, "parquet_normalizer")
    return resolve_configuration(empty_spec, sections=config_sections)

def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> None:
static_table_name = self._get_static_table_name(resource, meta)
Expand Down Expand Up @@ -314,7 +316,7 @@ def _write_item(
columns,
self.naming,
self._caps,
load_id=self.load_id if self.add_dlt_load_id else None,
load_id=self.load_id if self._normalize_config.add_dlt_load_id else None,
)
for item in items
]
Expand All @@ -340,7 +342,10 @@ def _compute_table(
# normalize arrow table before merging
arrow_table = self.schema.normalize_table_identifiers(arrow_table)
# Add load_id column
if self.add_dlt_load_id and "_dlt_load_id" not in arrow_table["columns"]:
if (
self._normalize_config.add_dlt_load_id
and "_dlt_load_id" not in arrow_table["columns"]
):
arrow_table["columns"]["_dlt_load_id"] = {
"name": "_dlt_load_id",
"data_type": "text",
Expand Down

0 comments on commit 706dce5

Please sign in to comment.