From 2258faa8b15bf21a5173b411846ee7ed3b6531a1 Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Tue, 2 Apr 2024 17:28:50 -0700 Subject: [PATCH 1/4] feat: parameterize pipeline class in the primary factory method --- dlt/pipeline/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index 4101e58320..c6debf5a74 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -1,4 +1,4 @@ -from typing import Sequence, cast, overload +from typing import Sequence, Type, cast, overload from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnSchema, TWriteDisposition, TSchemaContract @@ -97,6 +97,7 @@ def pipeline( full_refresh: bool = False, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, + _impl_cls: Type[Pipeline] = Pipeline, **kwargs: Any, ) -> Pipeline: ensure_correct_pipeline_kwargs(pipeline, **kwargs) @@ -129,7 +130,7 @@ def pipeline( progress = collector_from_name(progress) # create new pipeline instance - p = Pipeline( + p = _impl_cls( pipeline_name, pipelines_dir, pipeline_salt, From 15eda940e940f9abdf7ecbeeab3dd2dd4f06d89b Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Tue, 2 Apr 2024 20:46:21 -0700 Subject: [PATCH 2/4] chore: use generic typing --- dlt/pipeline/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index c6debf5a74..9a4d8f47be 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -1,4 +1,4 @@ -from typing import Sequence, Type, cast, overload +from typing import Sequence, Type, TypeVar, cast, overload from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnSchema, TWriteDisposition, TSchemaContract @@ -15,6 +15,8 @@ from dlt.pipeline.progress import _from_name as collector_from_name, TCollectorArg, _NULL_COLLECTOR from dlt.pipeline.warnings import credentials_argument_deprecated +TPipeline = TypeVar("TPipeline", bound=Pipeline) + @overload def pipeline( @@ -29,7 +31,8 @@ def pipeline( full_refresh: bool = False, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, -) -> Pipeline: + _impl_cls: Type[TPipeline] = Pipeline, +) -> TPipeline: """Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API to a destination ie. database or a data lake. #### Note: @@ -97,9 +100,9 @@ def pipeline( full_refresh: bool = False, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, - _impl_cls: Type[Pipeline] = Pipeline, + _impl_cls: Type[TPipeline] = Pipeline, **kwargs: Any, -) -> Pipeline: +) -> TPipeline: ensure_correct_pipeline_kwargs(pipeline, **kwargs) # call without arguments returns current pipeline orig_args = get_orig_args(**kwargs) # original (*args, **kwargs) @@ -112,7 +115,7 @@ def pipeline( context = Container()[PipelineContext] # if pipeline instance is already active then return it, otherwise create a new one if context.is_active(): - return cast(Pipeline, context.pipeline()) + return cast(TPipeline, context.pipeline()) else: pass From 16ebdc293db6d5196c8ba42f57701cb5e4de3a0d Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Wed, 3 Apr 2024 13:42:27 -0700 Subject: [PATCH 3/4] chore: remove no args overload --- dlt/pipeline/__init__.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index 9a4d8f47be..1a1cacc089 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -81,12 +81,6 @@ def pipeline( """ -@overload -def pipeline() -> Pipeline: # type: ignore - """When called without any arguments, returns the recently created `Pipeline` instance. - If not found, it creates a new instance with all the pipeline options set to defaults.""" - - @with_config(spec=PipelineConfiguration, auto_pipeline_section=True) def pipeline( pipeline_name: str = None, From 48cdfa1eb7d7b0fb37411d61cc0eb747fba814a7 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sun, 7 Apr 2024 20:06:14 +0200 Subject: [PATCH 4/4] uses TypeVal with default --- dlt/pipeline/__init__.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index 1a1cacc089..6b14eaf777 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -1,4 +1,5 @@ -from typing import Sequence, Type, TypeVar, cast, overload +from typing import Sequence, Type, cast, overload +from typing_extensions import TypeVar from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnSchema, TWriteDisposition, TSchemaContract @@ -15,7 +16,7 @@ from dlt.pipeline.progress import _from_name as collector_from_name, TCollectorArg, _NULL_COLLECTOR from dlt.pipeline.warnings import credentials_argument_deprecated -TPipeline = TypeVar("TPipeline", bound=Pipeline) +TPipeline = TypeVar("TPipeline", bound=Pipeline, default=Pipeline) @overload @@ -31,7 +32,7 @@ def pipeline( full_refresh: bool = False, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, - _impl_cls: Type[TPipeline] = Pipeline, + _impl_cls: Type[TPipeline] = Pipeline, # type: ignore[assignment] ) -> TPipeline: """Creates a new instance of `dlt` pipeline, which moves the data from the source ie. a REST API to a destination ie. database or a data lake. @@ -81,6 +82,12 @@ def pipeline( """ +@overload +def pipeline() -> Pipeline: # type: ignore + """When called without any arguments, returns the recently created `Pipeline` instance. + If not found, it creates a new instance with all the pipeline options set to defaults.""" + + @with_config(spec=PipelineConfiguration, auto_pipeline_section=True) def pipeline( pipeline_name: str = None, @@ -94,7 +101,7 @@ def pipeline( full_refresh: bool = False, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, - _impl_cls: Type[TPipeline] = Pipeline, + _impl_cls: Type[TPipeline] = Pipeline, # type: ignore[assignment] **kwargs: Any, ) -> TPipeline: ensure_correct_pipeline_kwargs(pipeline, **kwargs)