Merge pull request #32 from scale-vector/rfix/moves-runners
implements venv management and singer tap source
rudolfix committed Jun 23, 2022
2 parents 4004c86 + 6634562 commit a61f714
Showing 35 changed files with 2,025 additions and 123 deletions.
2 changes: 1 addition & 1 deletion dlt/cli/dlt.py
@@ -1,7 +1,7 @@
import argparse
from typing import Callable

from dlt.common.runners import TRunArgs, add_pool_cli_arguments
from dlt.common.runners.pool_runner import TRunArgs, add_pool_cli_arguments


def main() -> None:
20 changes: 20 additions & 0 deletions dlt/common/exceptions.py
@@ -1,3 +1,6 @@
from typing import AnyStr


class DltException(Exception):
pass

@@ -27,6 +30,23 @@ def __init__(self, method: str) -> None:
super().__init__(f"Process pool supports only fork start method, {method} not supported. Switch the pool type to threading")


class CannotInstallDependency(DltException):
def __init__(self, dependency: str, interpreter: str, output: AnyStr) -> None:
self.dependency = dependency
self.interpreter = interpreter
if isinstance(output, bytes):
str_output = output.decode("utf-8")
else:
str_output = output
super().__init__(f"Cannot install dependency {dependency} with {interpreter} and pip:\n{str_output}\n")


class VenvNotFound(DltException):
def __init__(self, interpreter: str) -> None:
self.interpreter = interpreter
super().__init__(f"Venv with interpreter {interpreter} not found in path")


class TerminalException(Exception):
"""
Marks an exception that cannot be recovered from; it should be mixed into a concrete exception class
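
Both new exceptions are raised by the venv helpers introduced further down in this diff. A minimal handling sketch, not part of the commit; the directory and package names are made up:

from dlt.common.exceptions import CannotInstallDependency, VenvNotFound
from dlt.common.runners.venv import Venv  # new module added in this PR

try:
    venv = Venv.restore("_storage/tap_venv")  # hypothetical venv location
except VenvNotFound:
    venv = Venv.create("_storage/tap_venv")

try:
    venv.add_dependencies(["not-a-real-package==0.0.0"])  # made-up package name
except CannotInstallDependency as exc:
    # pip's combined stdout/stderr is decoded into the exception message
    print(f"could not install {exc.dependency} with {exc.interpreter}")
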
2 changes: 2 additions & 0 deletions dlt/common/runners/__init__.py
@@ -0,0 +1,2 @@
from . import pool_runner
from .pool_runner import TRunArgs, TRunMetrics, initialize_runner, run_pool
dlt/common/runners/pool_runner.py
@@ -88,7 +88,7 @@ def initialize_runner(C: Type[BasicConfiguration], run_args: Optional[TRunArgs]



def pool_runner(C: Type[PoolRunnerConfiguration], run_f: Callable[[TPool], TRunMetrics]) -> int:
def run_pool(C: Type[PoolRunnerConfiguration], run_f: Callable[[TPool], TRunMetrics]) -> int:
# start pool
pool: Pool = None
if C.POOL_TYPE == "process":
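
The rename is mechanical: run_pool keeps pool_runner's signature, so call sites elsewhere in this PR change only the imported name. Roughly, taking the loader change below as the example (C is the caller's configuration class, load its run function):

# before this PR (as in dlt/loaders/loader.py)
from dlt.common.runners import pool_runner
exit_code = pool_runner(C, load)

# after this PR
from dlt.common.runners import run_pool
exit_code = run_pool(C, load)
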
19 changes: 19 additions & 0 deletions dlt/common/runners/stdout.py
@@ -0,0 +1,19 @@
from subprocess import PIPE, CalledProcessError
from typing import Any, Iterator

from dlt.common.runners.venv import Venv


def iter_stdout(venv: Venv, command: str, *script_args: Any) -> Iterator[str]:
# start a process in virtual environment, assume that text comes from stdout
process = venv.start_command(command, *script_args, stdout=PIPE, stderr=PIPE, text=True)
exit_code: int = None

# read all the lines until empty marker is returned
for line in iter(process.stdout.readline, ''):
yield line[:-1]

# fail the iterator if the exit code is not 0
exit_code = process.wait()
if exit_code != 0:
raise CalledProcessError(exit_code, command)
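
A short usage sketch for iter_stdout, not part of the commit; the venv path and script name are hypothetical:

from subprocess import CalledProcessError

from dlt.common.runners.stdout import iter_stdout
from dlt.common.runners.venv import Venv

venv = Venv.restore("_storage/tap_venv")  # hypothetical, must already exist
try:
    for line in iter_stdout(venv, "python", "tap_script.py"):
        print(line)  # lines are yielded without the trailing newline
except CalledProcessError as cpe:
    print(f"command failed with exit code {cpe.returncode}")
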
90 changes: 90 additions & 0 deletions dlt/common/runners/venv.py
@@ -0,0 +1,90 @@
import os
import shutil
import venv
import types
import subprocess
from typing import Any, List, Type

from dlt.common.exceptions import CannotInstallDependency, VenvNotFound


class DLTEnvBuilder(venv.EnvBuilder):
def __init__(self) -> None:
super().__init__(with_pip=True, clear=True)

def post_setup(self, context: types.SimpleNamespace) -> None:
self.context = context


class Venv():
def __init__(self, context: types.SimpleNamespace) -> None:
self.context = context

@classmethod
def create(cls, venv_dir: str, dependencies: List[str] = None) -> "Venv":
b = DLTEnvBuilder()
try:
b.create(os.path.abspath(venv_dir))
if dependencies:
Venv._install_deps(b.context, dependencies)
except:
if os.path.isdir(venv_dir):
shutil.rmtree(venv_dir)
raise
return cls(b.context)

@classmethod
def restore(cls, venv_dir: str) -> "Venv":
if not os.path.isdir(venv_dir):
raise VenvNotFound(venv_dir)
b = venv.EnvBuilder(clear=False, upgrade=False)
c = b.ensure_directories(os.path.abspath(venv_dir))
if not os.path.isfile(c.env_exe):
raise VenvNotFound(c.env_exe)
return cls(c)

def __enter__(self) -> "Venv":
return self

def __exit__(self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: types.TracebackType) -> None:
# delete venv
if self.context.env_dir and os.path.isdir(self.context.env_dir):
shutil.rmtree(self.context.env_dir)

def start_command(self, entry_point: str, *script_args: Any, **popen_kwargs: Any) -> "subprocess.Popen[str]":
command = os.path.join(self.context.bin_path, entry_point)
cmd = [command, *script_args]
return subprocess.Popen(cmd, **popen_kwargs)

def run_command(self, entry_point: str, *script_args: Any) -> str:
# runs one of the installed entry points, typically a CLI that ships with a package and is installed into the venv's bin directory
command = os.path.join(self.context.bin_path, entry_point)
cmd = [command, *script_args]
return subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8")

def run_script(self, script_path: str, *script_args: Any) -> str:
# os.environ is passed to executed process
cmd = [self.context.env_exe, "-I", os.path.abspath(script_path), *script_args]
try:
return subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8")
except subprocess.CalledProcessError as cpe:
if cpe.returncode == 2:
raise FileNotFoundError(script_path)
else:
raise

def run_module(self, module: str, *module_args: Any) -> str:
cmd = [self.context.env_exe, "-Im", module, *module_args]
return subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8")

def add_dependencies(self, dependencies: List[str] = None) -> None:
Venv._install_deps(self.context, dependencies)

@staticmethod
def _install_deps(context: types.SimpleNamespace, dependencies: List[str]) -> None:
for dep in dependencies:
cmd = [context.env_exe, "-Im", "pip", "install", dep]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
raise CannotInstallDependency(dep, context.env_exe, exc.output)
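
A usage sketch for the Venv helper, not part of the commit; the directory, package, and script names are invented for illustration:

from dlt.common.runners.venv import Venv

# create a fresh venv (with pip) and install a dependency into it;
# when used as a context manager the venv directory is deleted on exit
with Venv.create("_storage/example_venv", dependencies=["requests"]) as venv:
    print(venv.run_module("pip", "list"))           # python -Im pip list
    print(venv.run_command("python", "--version"))  # entry point resolved from the venv's bin dir
    print(venv.run_script("scripts/hello.py"))      # hypothetical script, run with the venv interpreter
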
4 changes: 2 additions & 2 deletions dlt/common/schema.py
@@ -110,8 +110,8 @@ def from_dict(cls, stored_schema: StoredSchema) -> "Schema":
self._version = stored_schema["version"]
self._preferred_types = stored_schema["preferred_types"]
self._hints = stored_schema["hints"]
self._excludes = stored_schema["excludes"]
self._includes = stored_schema["includes"]
self._excludes = stored_schema.get("excludes", [])
self._includes = stored_schema.get("includes", [])
# compile regexes
self._compile_regexes()

6 changes: 5 additions & 1 deletion dlt/common/typing.py
@@ -1,4 +1,8 @@
from typing import Callable, Dict, Any, List, Literal, Mapping, Sequence, TypeVar, TypedDict, Optional, Union
from typing import Callable, Dict, Any, Mapping, TypeVar, TypedDict, TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import StrOrBytesPath
else:
StrOrBytesPath = Any

DictStrAny = Dict[str, Any]
DictStrStr = Dict[str, str]
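
At runtime StrOrBytesPath degrades to Any, so _typeshed is never imported outside of type checking; only static analysis sees the precise alias. A small illustration, not part of the commit:

from dlt.common.typing import StrOrBytesPath

def read_text(path: StrOrBytesPath) -> str:
    # mypy resolves the annotation via _typeshed; at runtime it is just Any
    with open(path, "r", encoding="utf-8") as f:
        return f.read()
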
19 changes: 19 additions & 0 deletions dlt/common/utils.py
@@ -1,3 +1,5 @@
import os
from contextlib import contextmanager
import hashlib
from os import environ
import secrets
@@ -115,3 +117,20 @@ def update_dict_with_prune(dest: DictStrAny, update: StrAny) -> None:
def is_interactive() -> bool:
import __main__ as main
return not hasattr(main, '__file__')


@contextmanager
def custom_environ(env: StrStr) -> Iterator[None]:
"""Temporarily set environment variables inside the context manager and
fully restore previous environment afterwards
"""
original_env = {key: os.getenv(key) for key in env}
os.environ.update(env)
try:
yield
finally:
for key, value in original_env.items():
if value is None:
del os.environ[key]
else:
os.environ[key] = value
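
A quick sketch of custom_environ in use, not part of the commit; the variable names are made up:

import os
from dlt.common.utils import custom_environ

os.environ["EXISTING"] = "old"
with custom_environ({"EXISTING": "new", "ONLY_INSIDE": "1"}):
    assert os.environ["EXISTING"] == "new"
    assert os.environ["ONLY_INSIDE"] == "1"
# previous values are restored and keys that did not exist before are removed
assert os.environ["EXISTING"] == "old"
assert "ONLY_INSIDE" not in os.environ
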
4 changes: 2 additions & 2 deletions dlt/dbt_runner/runner.py
@@ -9,7 +9,7 @@
from dlt.common.logger import process_internal_exception, is_json_logging
from dlt.common.telemetry import get_logging_extras
from dlt.common.file_storage import FileStorage
from dlt.common.runners import TRunArgs, create_default_args, initialize_runner, pool_runner
from dlt.common.runners import TRunArgs, initialize_runner, run_pool
from dlt.common.telemetry import TRunMetrics

from dlt.dbt_runner.configuration import DBTRunnerConfiguration, gen_configuration_variant
@@ -196,7 +196,7 @@ def main(args: TRunArgs) -> int:
process_internal_exception("init module")
return -1

return pool_runner(C, run)
return run_pool(C, run)


def run_main(args: TRunArgs) -> None:
6 changes: 3 additions & 3 deletions dlt/dbt_runner/utils.py
@@ -1,13 +1,13 @@
import os
import logging
import tempfile
from typing import Any, Iterator, List, Sequence
from git import Repo, Git, RepositoryDirtyError
from typing import Any, Iterator, List, Sequence, Optional
from git import Repo, RepositoryDirtyError
from contextlib import contextmanager

from dlt.common import json
from dlt.common.utils import uniq_id
from dlt.common.typing import StrAny, Optional
from dlt.common.typing import StrAny
from dlt.dbt_runner.exceptions import DBTRunnerException

# block disabling root logger
4 changes: 2 additions & 2 deletions dlt/loaders/loader.py
@@ -6,7 +6,7 @@
from prometheus_client.metrics import MetricWrapperBase

from dlt.common import sleep, logger
from dlt.common.runners import TRunArgs, TRunMetrics, create_default_args, initialize_runner, pool_runner
from dlt.common.runners import TRunArgs, TRunMetrics, initialize_runner, run_pool
from dlt.common.logger import process_internal_exception, pretty_format_exception
from dlt.common.exceptions import TerminalValueError
from dlt.common.dataset_writers import TWriterType
@@ -248,7 +248,7 @@ def main(args: TRunArgs) -> int:
except Exception:
process_internal_exception("run")
return -1
return pool_runner(C, load)
return run_pool(C, load)


def run_main(args: TRunArgs) -> None:
31 changes: 15 additions & 16 deletions dlt/pipeline/pipeline.py
@@ -9,13 +9,12 @@
from typing import Callable, Dict, Iterable, Iterator, List, Literal, Sequence, Tuple, TypeVar, Union, Generic
from prometheus_client import REGISTRY

from dlt.common import json, runners
from dlt.common import logger
from dlt.common import json, logger
from dlt.common.runners import pool_runner as runner, TRunArgs, TRunMetrics
from dlt.common.configuration import BasicConfiguration, make_configuration
from dlt.common.file_storage import FileStorage
from dlt.common.logger import process_internal_exception
from dlt.common.names import normalize_schema_name
from dlt.common.runners import TRunArgs, TRunMetrics
from dlt.common.schema import Schema, StoredSchema
from dlt.common.typing import DictStrAny, StrAny
from dlt.common.utils import uniq_id, is_interactive
@@ -51,7 +50,7 @@ def __init__(self, pipeline_name: str, log_level: str = "INFO") -> None:
"NAME": pipeline_name,
"LOG_LEVEL": log_level
})
runners.initialize_runner(C, TRunArgs(True, 0))
runner.initialize_runner(C, TRunArgs(True, 0))

def create_pipeline(self, credentials: PipelineCredentials, working_dir: str = None, schema: Schema = None) -> None:
# initialize root storage
@@ -118,7 +117,7 @@ def extract(self, items: Iterator[TItem], schema_name: str = None, table_name: s
try:
self._extract_iterator(default_table_name, all_items)
except:
raise PipelineStepFailed("extract", self.last_run_exception, runners.LAST_RUN_METRICS)
raise PipelineStepFailed("extract", self.last_run_exception, runner.LAST_RUN_METRICS)

def unpack(self, workers: int = 1, max_events_in_chunk: int = 100000) -> None:
if is_interactive() and workers > 1:
@@ -129,24 +128,24 @@ def unpack(self, workers: int = 1, max_events_in_chunk: int = 100000) -> None:
unpacker.CONFIG.MAX_EVENTS_IN_CHUNK = max_events_in_chunk
# switch to thread pool for single worker
unpacker.CONFIG.POOL_TYPE = "thread" if workers == 1 else "process"
runners.pool_runner(unpacker.CONFIG, unpacker.unpack)
if runners.LAST_RUN_METRICS.has_failed:
raise PipelineStepFailed("unpack", self.last_run_exception, runners.LAST_RUN_METRICS)
runner.run_pool(unpacker.CONFIG, unpacker.unpack)
if runner.LAST_RUN_METRICS.has_failed:
raise PipelineStepFailed("unpack", self.last_run_exception, runner.LAST_RUN_METRICS)

def load(self, max_parallel_loads: int = 20) -> None:
self._verify_loader_instance()
loader.CONFIG.MAX_PARALLELISM = loader.CONFIG.MAX_PARALLEL_LOADS = max_parallel_loads
runners.pool_runner(loader.CONFIG, loader.load)
if runners.LAST_RUN_METRICS.has_failed:
raise PipelineStepFailed("load", self.last_run_exception, runners.LAST_RUN_METRICS)
runner.run_pool(loader.CONFIG, loader.load)
if runner.LAST_RUN_METRICS.has_failed:
raise PipelineStepFailed("load", self.last_run_exception, runner.LAST_RUN_METRICS)

def flush(self) -> None:
self.unpack()
self.load()

@property
def last_run_exception(self) -> BaseException:
return runners.LAST_RUN_EXCEPTION
return runner.LAST_RUN_EXCEPTION

def list_extracted_loads(self) -> Sequence[str]:
return unpacker.unpack_storage.list_files_to_unpack_sorted()
@@ -267,18 +266,18 @@ def _extract_iterator(self, default_table_name: str, items: Sequence[DictStrAny]
load_id = uniq_id()
self.extractor_storage.save_json(f"{load_id}.json", items)
self.extractor_storage.commit_events(
self.pipeline_name,
self.default_schema_name,
self.extractor_storage.storage._make_path(f"{load_id}.json"),
default_table_name,
len(items),
load_id
)

runners.LAST_RUN_METRICS = TRunMetrics(was_idle=False, has_failed=False, pending_items=0)
runner.LAST_RUN_METRICS = TRunMetrics(was_idle=False, has_failed=False, pending_items=0)
except Exception as ex:
process_internal_exception("extracting iterator failed")
runners.LAST_RUN_METRICS = TRunMetrics(was_idle=False, has_failed=True, pending_items=0)
runners.LAST_RUN_EXCEPTION = ex
runner.LAST_RUN_METRICS = TRunMetrics(was_idle=False, has_failed=True, pending_items=0)
runner.LAST_RUN_EXCEPTION = ex
raise

@contextmanager
4 changes: 2 additions & 2 deletions dlt/unpacker/unpacker.py
@@ -6,7 +6,7 @@

from dlt.common import pendulum, signals, json, logger
from dlt.common.json import custom_pua_decode
from dlt.common.runners import TRunArgs, TRunMetrics, create_default_args, pool_runner, initialize_runner
from dlt.common.runners import TRunArgs, TRunMetrics, run_pool, initialize_runner
from dlt.common.storages.unpacker_storage import UnpackerStorage
from dlt.common.telemetry import get_logging_extras
from dlt.common.utils import uniq_id
@@ -250,7 +250,7 @@ def main(args: TRunArgs, extract_f: TExtractFunc, default_schemas_path: str = No
process_internal_exception("init module")
return -1
# unpack
return pool_runner(C, unpack)
return run_pool(C, unpack)


def run_main(args: TRunArgs) -> None:
