Merge remote-tracking branch 'origin/devel' into docs/update_adjust_your_schema

# Conflicts:
#	docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
AstrakhantsevaAA committed May 22, 2024
2 parents 397eeae + b1e0f77 commit 0521526
Showing 25 changed files with 428 additions and 168 deletions.
24 changes: 24 additions & 0 deletions CONTRIBUTING.md
@@ -52,6 +52,29 @@ We use **master** branch for hot fixes (including documentation) that needs to b

On the release day, the **devel** branch is merged into **master**. All releases of `dlt` happen only from **master**.

### Branch naming rules

We want to make sure that our git history explains in a human-readable way what has been changed with which branch or PR. To this end, we use the following branch naming pattern (all lowercase, with dashes and no underscores):

```sh
{category}/{ticket-id}-description-of-the-branch
# example:
feat/4922-add-avro-support
```

#### Branch categories

* **feat** - a new feature that is being implemented (ticket required)
* **fix** - a change that fixes a bug (ticket required)
* **exp** - an experiment where we are testing a new idea or want to demonstrate something to the team, might turn into a `feat` later (ticket encouraged)
* **test** - anything related to the tests (ticket encouraged)
* **blogs** - a new entry to our blog (ticket optional)
* **docs** - a change to our docs (ticket optional)

#### Ticket Numbers

We encourage you to attach your branches to a ticket; if none exists, create one and explain what you are doing. Tickets are mandatory for `feat` and `fix` branches, encouraged for `exp` and `test` branches, and optional for `blogs` and `docs` branches.
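
For illustration only, the naming convention above can be checked mechanically; a minimal sketch with a hypothetical helper that is not part of the repository:

```py
import re

# Hypothetical checker for the branch naming pattern described above
# (illustration only, not part of the dlt repository).
BRANCH_RE = re.compile(r"^(feat|fix|exp|test|blogs|docs)/(\d+-)?[a-z0-9]+(-[a-z0-9]+)*$")

assert BRANCH_RE.match("feat/4922-add-avro-support")
assert BRANCH_RE.match("docs/fix-a-typo")
assert not BRANCH_RE.match("feature/Add_Avro_Support")  # wrong category, uppercase, underscores
```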

### Submitting a hotfix
We'll fix critical bugs and release `dlt` outside of the normal schedule. Follow the regular procedure, but make your PR against the **master** branch. Please ping us on Slack if you do this.

@@ -166,3 +189,4 @@ Once the version has been bumped, follow these steps to publish the new release
- [Poetry Documentation](https://python-poetry.org/docs/)

If you have any questions or need help, don't hesitate to reach out to us. We're here to help you succeed in contributing to `dlt`. Happy coding!
****
3 changes: 3 additions & 0 deletions dlt/common/data_writers/buffered.py
@@ -55,7 +55,10 @@ def __init__(
self.closed_files: List[DataWriterMetrics] = [] # all fully processed files
# buffered items must be less than max items in file
self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items)
# Explicitly configured max size supersedes destination limit
self.file_max_bytes = file_max_bytes
if self.file_max_bytes is None and _caps:
self.file_max_bytes = _caps.recommended_file_size
self.file_max_items = file_max_items
# the open function is either gzip.open or open
self.open = (
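
The precedence added here — an explicitly configured `file_max_bytes` supersedes the destination's `recommended_file_size` — can be sketched in isolation; a minimal sketch with plain values standing in for the dlt objects, not the actual implementation:

```py
from typing import Optional

def resolve_file_max_bytes(explicit: Optional[int], recommended: Optional[int]) -> Optional[int]:
    # explicit configuration wins; fall back to the destination's
    # recommended size only when nothing was configured
    return explicit if explicit is not None else recommended

assert resolve_file_max_bytes(None, 1024**3) == 1024**3               # falls back to the destination limit
assert resolve_file_max_bytes(64 * 1024**2, 1024**3) == 64 * 1024**2  # explicit setting supersedes it
```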
2 changes: 2 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
supported_staging_file_formats: Sequence[TLoaderFileFormat] = None
escape_identifier: Callable[[str], str] = None
2 changes: 2 additions & 0 deletions dlt/destinations/impl/bigquery/__init__.py
@@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.supported_loader_file_formats = ["jsonl", "parquet"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
# BQ limit is 4GB but leave a large headroom since buffered writer does not preemptively check size
caps.recommended_file_size = int(1024 * 1024 * 1024)
caps.escape_identifier = escape_bigquery_identifier
caps.escape_literal = None
caps.format_datetime_literal = format_bigquery_datetime_literal
3 changes: 2 additions & 1 deletion dlt/destinations/impl/mssql/__init__.py
@@ -17,7 +17,8 @@ def capabilities() -> DestinationCapabilitiesContext:
# https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN
caps.max_identifier_length = 128
caps.max_column_identifier_length = 128
caps.max_query_length = 4 * 1024 * 64 * 1024
# A SQL Query can be a varchar(max) but is shown as limited to 65,536 * Network Packet
caps.max_query_length = 65536 * 10
caps.is_max_query_length_in_bytes = True
caps.max_text_data_type_length = 2**30 - 1
caps.is_max_text_data_type_length_in_bytes = False
2 changes: 1 addition & 1 deletion dlt/destinations/impl/mssql/mssql.py
@@ -181,7 +181,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non
if c.get(h, False) is True
)
column_name = self.capabilities.escape_identifier(c["name"])
return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c['nullable'])}"
return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c.get('nullable', True))}"

def _create_replace_followup_jobs(
self, table_chain: Sequence[TTableSchema]
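
The switch from `c['nullable']` to `c.get('nullable', True)` matters when a column schema omits the hint; a minimal illustration with a plain dict standing in for `TColumnSchema`:

```py
column_without_hint = {"name": "city", "data_type": "text"}

# direct indexing raises when the hint is missing ...
try:
    column_without_hint["nullable"]
except KeyError:
    print("KeyError: 'nullable'")

# ... while .get() falls back to treating the column as nullable
assert column_without_hint.get("nullable", True) is True
```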
15 changes: 6 additions & 9 deletions dlt/extract/decorators.py
@@ -567,16 +567,13 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl:
compat_wrapper(actual_resource_name, conf_f, sig, *args, **kwargs),
incremental,
)
except InvalidResourceDataTypeFunctionNotAGenerator as gen_ex:
except InvalidResourceDataTypeFunctionNotAGenerator:
# we allow an edge case: resource can return another resource
try:
# actually call the function to see if it contains DltResource
data_ = conf_f(*args, **kwargs)
if not isinstance(data_, DltResource):
raise
r = data_ # type: ignore[assignment]
except Exception:
raise gen_ex from None
# actually call the function to see if it contains DltResource
data_ = conf_f(*args, **kwargs)
if not isinstance(data_, DltResource):
raise
r = data_ # type: ignore[assignment]
# consider transformer arguments bound
r._args_bound = True
# keep explicit args passed
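
The edge case this refactor keeps supporting — a resource function that is not a generator and instead returns another `DltResource` — might look like the following hypothetical resource, assuming the public `dlt.resource` API:

```py
import dlt

@dlt.resource
def wrapped_rows():
    # not a generator: returns a ready-made resource, which the
    # decorator detects and passes through instead of raising
    return dlt.resource([{"id": 1}, {"id": 2}], name="rows")

print(list(wrapped_rows()))  # [{'id': 1}, {'id': 2}]
```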
3 changes: 3 additions & 0 deletions dlt/load/configuration.py
@@ -15,6 +15,9 @@ class LoaderConfiguration(PoolRunnerConfiguration):
raise_on_max_retries: int = 5
"""When gt 0 will raise when job reaches raise_on_max_retries"""
_load_storage_config: LoadStorageConfiguration = None
# if set to `True`, the staging dataset will be
# truncated after loading the data
truncate_staging_dataset: bool = False

def on_resolved(self) -> None:
self.pool_type = "none" if self.workers == 1 else "thread"
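
This diff does not show how a user turns the new flag on; one plausible way, assuming `LoaderConfiguration` resolves under dlt's `load` config section so the usual environment-variable or `config.toml` conventions apply (verify against your dlt version):

```py
import os
import dlt

# assumption: the flag follows the standard env-var convention for the "load"
# section, i.e. LOAD__TRUNCATE_STAGING_DATASET (or [load] in config.toml)
os.environ["LOAD__TRUNCATE_STAGING_DATASET"] = "true"

pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb", dataset_name="demo_data")
pipeline.run([{"id": 1}, {"id": 2}], table_name="items", write_disposition="merge", primary_key="id")
```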
35 changes: 34 additions & 1 deletion dlt/load/load.py
@@ -53,7 +53,7 @@
LoadClientUnsupportedWriteDisposition,
LoadClientUnsupportedFileFormats,
)
from dlt.load.utils import get_completed_table_chain, init_client
from dlt.load.utils import _extend_tables_with_table_chain, get_completed_table_chain, init_client


class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]):
@@ -348,6 +348,8 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False)
)
):
job_client.complete_load(load_id)
self._maybe_trancate_staging_dataset(schema, job_client)

self.load_storage.complete_load_package(load_id, aborted)
# collect package info
self._loaded_packages.append(self.load_storage.get_load_package_info(load_id))
@@ -490,6 +492,37 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:

return TRunMetrics(False, len(self.load_storage.list_normalized_packages()))

def _maybe_trancate_staging_dataset(self, schema: Schema, job_client: JobClientBase) -> None:
"""
Truncate the staging dataset if one used,
and configuration requests truncation.
Args:
schema (Schema): Schema to use for the staging dataset.
job_client (JobClientBase):
Job client to use for the staging dataset.
"""
if not (
isinstance(job_client, WithStagingDataset) and self.config.truncate_staging_dataset
):
return

data_tables = schema.data_table_names()
tables = _extend_tables_with_table_chain(
schema, data_tables, data_tables, job_client.should_load_data_to_staging_dataset
)

try:
with self.get_destination_client(schema) as client:
with client.with_staging_dataset(): # type: ignore
client.initialize_storage(truncate_tables=tables)

except Exception as exc:
logger.warn(
f"Staging dataset truncate failed due to the following error: {exc}"
" However, it didn't affect the data integrity."
)

def get_step_info(
self,
pipeline: SupportsPipeline,
1 change: 1 addition & 0 deletions dlt/pipeline/pipeline.py
@@ -554,6 +554,7 @@ def load(
with signals.delayed_signals():
runner.run_pool(load_step.config, load_step)
info: LoadInfo = self._get_step_info(load_step)

self.first_run = False
return info
except Exception as l_ex:
56 changes: 28 additions & 28 deletions docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
@@ -174,13 +174,13 @@ The configuration object passed to the REST API Generic Source has three main el
```py
config: RESTAPIConfig = {
"client": {
# ...
...
},
"resource_defaults": {
# ...
...
},
"resources": [
# ...
...
],
}
```
@@ -203,9 +203,7 @@ For example, you can set the primary key, write disposition, and other default s
```py
config = {
"client": {
"api_key": "your_api_key_here",
"base_url": "https://api.example.com",
# Add other client configurations here
# ...
},
"resource_defaults": {
"primary_key": "id",
@@ -219,14 +217,16 @@
"resources": [
"resource1",
{
"name": "resource2_name",
"write_disposition": "append",
"endpoint": {
"params": {
"param1": "value1",
"resource2": {
"name": "resource2_name",
"write_disposition": "append",
"endpoint": {
"params": {
"param1": "value1",
},
},
},
},
}
}
],
}
```
@@ -497,15 +497,15 @@ The `issue_comments` resource will make requests to the following endpoints:
The syntax for the `resolve` field in parameter configuration is:

```py
({
"{parameter_name}" :
{
{
"<parameter_name>": {
"type": "resolve",
"resource": "{parent_resource_name}",
"field": "{parent_resource_field_name}",
"resource": "<parent_resource_name>",
"field": "<parent_resource_field_name>",
}
})
}
```

Under the hood, dlt handles this by using a [transformer resource](../../general-usage/resource.md#process-resources-with-dlttransformer).
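
For instance, a child resource that resolves `issue_number` from a hypothetical `issues` parent resource could be configured like this (illustrative sketch, not part of this diff):

```py
{
    "name": "issue_comments",
    "endpoint": {
        "path": "issues/{issue_number}/comments",
        "params": {
            "issue_number": {
                "type": "resolve",
                "resource": "issues",
                "field": "number",
            },
        },
    },
}
```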

#### Include fields from the parent resource
@@ -516,7 +516,7 @@ You can include data from the parent resource in the child resource by using the
{
"name": "issue_comments",
"endpoint": {
# ...
...
},
"include_from_parent": ["id", "title", "created_at"],
}
@@ -534,41 +534,41 @@ When the API endpoint supports incremental loading, you can configure the source
1. Defining a special parameter in the `params` section of the [endpoint configuration](#endpoint-configuration):

```py
({
{
"<parameter_name>": {
"type": "incremental",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
}
})
},
}
```

For example, in the `issues` resource configuration in the GitHub example, we have:

```py
({
{
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "2024-01-25T11:21:28Z",
}
})
},
}
```

This configuration tells the source to create an incremental object that will keep track of the `updated_at` field in the response and use it as a value for the `since` parameter in subsequent requests.

2. Specifying the `incremental` field in the [endpoint configuration](#endpoint-configuration):

```py
({
{
"incremental": {
"start_param": "<parameter_name>",
"end_param": "<parameter_name>",
"cursor_path": "<path_to_cursor_field>",
"initial_value": "<initial_value>",
"end_value": "<end_value>",
}
})
}
```

This configuration is more flexible and allows you to specify the start and end conditions for the incremental loading.
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/verified-sources/slack.md
@@ -70,7 +70,7 @@ To get started with your data pipeline, follow these steps:

[This command](../../reference/command-line-interface) will initialize
[the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/slack_pipeline.py)
with Google Sheets as the [source](../../general-usage/source) and
with Slack as the [source](../../general-usage/source) and
[duckdb](../destinations/duckdb.md) as the [destination](../destinations).

1. If you'd like to use a different destination, simply replace `duckdb` with the name of your