Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix add_limit behavior in edge cases #1052

Merged
merged 3 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from functools import wraps



import dlt
from dlt.common.exceptions import MissingDependencyException
from dlt.common import pendulum, logger
Expand Down
9 changes: 9 additions & 0 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,17 @@ def add_limit(self, max_items: int) -> "DltResource": # noqa: A003
"DltResource": returns self
"""

# make sure max_items is a number, to allow "None" as value for unlimited
if max_items is None:
max_items = -1

def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first `max_items` records"""

# zero items should produce empty generator
if max_items == 0:
return

count = 0
is_async_gen = False
if inspect.isfunction(gen):
Expand Down
2 changes: 2 additions & 0 deletions docs/examples/connector_x_arrow/load_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dlt
from dlt.sources.credentials import ConnectionStringCredentials


def read_sql_x(
conn_str: ConnectionStringCredentials = dlt.secrets.value,
query: str = dlt.config.value,
Expand All @@ -14,6 +15,7 @@ def read_sql_x(
protocol="binary",
)


def genome_resource():
# create genome resource with merge on `upid` primary key
genome = dlt.resource(
Expand Down
5 changes: 4 additions & 1 deletion docs/examples/google_sheets/google_sheets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
)
from dlt.common.typing import DictStrAny, StrAny


def _initialize_sheets(
credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials]
) -> Any:
# Build the service object.
service = build("sheets", "v4", credentials=credentials.to_native_credentials())
return service


@dlt.source
def google_spreadsheet(
spreadsheet_id: str,
Expand Down Expand Up @@ -55,6 +57,7 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
for name in sheet_names
]


if __name__ == "__main__":
pipeline = dlt.pipeline(destination="duckdb")
# see example.secrets.toml to where to put credentials
Expand All @@ -67,4 +70,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]:
sheet_names=range_names,
)
)
print(info)
print(info)
8 changes: 4 additions & 4 deletions docs/examples/incremental_loading/zendesk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
from dlt.common.typing import TAnyDateTime
from dlt.sources.helpers.requests import client


@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008
year=2000, month=1, day=1
),
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
end_date: Optional[TAnyDateTime] = None,
):
"""
Expand Down Expand Up @@ -113,11 +112,12 @@ def get_pages(
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]


if __name__ == "__main__":
# create dlt pipeline
pipeline = dlt.pipeline(
pipeline_name="zendesk", destination="duckdb", dataset_name="zendesk_data"
)

load_info = pipeline.run(zendesk_support())
print(load_info)
print(load_info)
2 changes: 2 additions & 0 deletions docs/examples/nested_data/nested_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

CHUNK_SIZE = 10000


# You can limit how deep dlt goes when generating child tables.
# By default, the library will descend and generate child tables
# for all nested lists, without a limit.
Expand Down Expand Up @@ -81,6 +82,7 @@ def load_documents(self) -> Iterator[TDataItem]:
while docs_slice := list(islice(cursor, CHUNK_SIZE)):
yield map_nested_in_place(convert_mongo_objs, docs_slice)


def convert_mongo_objs(value: Any) -> Any:
if isinstance(value, (ObjectId, Decimal128)):
return str(value)
Expand Down
5 changes: 4 additions & 1 deletion docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dlt.destinations.impl.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
Expand All @@ -15,6 +16,7 @@ def list_files(folder_path: str):
"mtime": os.path.getmtime(file_path),
}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
Expand All @@ -28,6 +30,7 @@ def pdf_to_text(file_item, separate_pages: bool = False):
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item


pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate")

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
Expand All @@ -51,4 +54,4 @@ def pdf_to_text(file_item, separate_pages: bool = False):

client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
9 changes: 5 additions & 4 deletions docs/examples/qdrant_zendesk/qdrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@

from dlt.common.configuration.inject import with_config


# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
@dlt.source(max_table_nesting=2)
def zendesk_support(
credentials: Dict[str, str] = dlt.secrets.value,
start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008
year=2000, month=1, day=1
),
start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008
end_date: Optional[TAnyDateTime] = None,
):
"""
Expand Down Expand Up @@ -80,13 +79,15 @@ def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
return None
return ensure_pendulum_datetime(value)


# modify dates to return datetime objects instead
def _fix_date(ticket):
ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"])
ticket["created_at"] = _parse_date_or_none(ticket["created_at"])
ticket["due_at"] = _parse_date_or_none(ticket["due_at"])
return ticket


# function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk
def get_pages(
url: str,
Expand Down Expand Up @@ -127,6 +128,7 @@ def get_pages(
if not response_json["end_of_stream"]:
get_url = response_json["next_page"]


if __name__ == "__main__":
# create a pipeline with an appropriate name
pipeline = dlt.pipeline(
Expand All @@ -146,7 +148,6 @@ def get_pages(

print(load_info)


# running the Qdrant client to connect to your Qdrant database

@with_config(sections=("destination", "qdrant", "credentials"))
Expand Down
4 changes: 3 additions & 1 deletion docs/examples/transformers/pokemon.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import dlt
from dlt.sources.helpers import requests


@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
""""""
Expand Down Expand Up @@ -46,6 +47,7 @@ def species(pokemon_details):

return (pokemon_list | pokemon, pokemon_list | pokemon | species)


if __name__ == "__main__":
# build duck db pipeline
pipeline = dlt.pipeline(
Expand All @@ -54,4 +56,4 @@ def species(pokemon_details):

# the pokemon_list resource does not need to be loaded
load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
print(load_info)
print(load_info)
3 changes: 3 additions & 0 deletions docs/website/docs/general-usage/resource.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ assert list(r) == list(range(10))
> 💡 You cannot limit transformers. They should process all the data they receive fully to avoid
> inconsistencies in generated datasets.

> 💡 If you are paremetrizing the value of `add_limit` and sometimes need it to be disabled, you can set `None` or `-1`
> to disable the limiting. You can also set the limit to `0` for the resource to not yield any items.

### Set table name and adjust schema

You can change the schema of a resource, be it standalone or as a part of a source. Look for method
Expand Down
26 changes: 26 additions & 0 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Iterator

import pytest
import asyncio

import dlt
from dlt.common.configuration.container import Container
Expand Down Expand Up @@ -789,6 +790,31 @@ def test_limit_infinite_counter() -> None:
assert list(r) == list(range(10))


@pytest.mark.parametrize("limit", (None, -1, 0, 10))
def test_limit_edge_cases(limit: int) -> None:
r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore

@dlt.resource()
async def r_async():
for i in range(20):
await asyncio.sleep(0.01)
yield i

sync_list = list(r)
async_list = list(r_async().add_limit(limit))

# check the expected results
assert sync_list == async_list
if limit == 10:
assert sync_list == list(range(10))
elif limit in [None, -1]:
assert sync_list == list(range(20))
elif limit == 0:
assert sync_list == []
else:
raise AssertionError(f"Unexpected limit: {limit}")


def test_limit_source() -> None:
def mul_c(item):
yield from "A" * (item + 2)
Expand Down
Loading