-
Notifications
You must be signed in to change notification settings - Fork 18
/
dlt_resource.py
117 lines (95 loc) · 3.47 KB
/
dlt_resource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional
import dlt
from dagster import (
AssetKey,
AssetSpec,
ConfigurableResource,
MaterializeResult,
ResourceDependency,
multi_asset,
)
from dagster._annotations import experimental, public
from dlt.extract.resource import DltResource
from dlt.extract.source import DltSource
from pydantic import Field
# Keys from dlt's `LoadInfo.asdict()` payload that are surfaced as Dagster
# materialization metadata (see `DagsterDltTranslator.get_metadata`, which
# filters the load info down to this allow-list).
DLT_INCLUDED_METADATA = {
    "first_run",
    "started_at",
    "finished_at",
    "dataset_name",
    "destination_name",
    "destination_type",
}
@experimental
class ConfigurableDltResource(ConfigurableResource):
    """Dagster resource wrapping a dlt source plus the pipeline configuration
    needed to load it.

    Attributes mirror the keyword arguments of `dlt.pipeline()`; `run()`
    constructs the pipeline and executes it against the configured source.
    """

    source: ResourceDependency[DltSource]
    # Fix: description previously read "dlt.pipline()" (typo).
    pipeline_name: str = Field(description="pipeline name parameter of `dlt.pipeline()`")
    dataset_name: str = Field(description="dataset name parameter of `dlt.pipeline()`")
    destination: str = Field(
        description="target destination parameter of `dlt.pipeline()` (eg. duckdb, snowflake)"
    )

    def _initialize_pipeline(self, **kwargs):
        """Build a `dlt.Pipeline` from the configured name/dataset/destination.

        Extra keyword arguments are forwarded verbatim to `dlt.pipeline()`.
        """
        return dlt.pipeline(
            pipeline_name=self.pipeline_name,
            dataset_name=self.dataset_name,
            destination=self.destination,
            **kwargs,
        )

    def run(self, **pipeline_kwargs):
        """Run the configured dlt source through the pipeline.

        Returns:
            The dlt load info object produced by `Pipeline.run`.
        """
        pipeline = self._initialize_pipeline(**pipeline_kwargs)
        return pipeline.run(self.source)
@dataclass
class DagsterDltTranslator:
    """Translates dlt concepts (resources, load info) into Dagster asset
    keys, upstream dependencies, and materialization metadata.

    Subclass and override the classmethods to customize the mapping.
    """

    @classmethod
    @public
    def get_metadata(cls, asset_key: str, load_info: Mapping[str, Any]) -> Mapping[str, Any]:
        """Select the allow-listed load-info entries, stringified, as metadata.

        # TODO - filter by `asset_key`
        """
        allowed = DLT_INCLUDED_METADATA
        return {name: str(val) for name, val in load_info.items() if name in allowed}

    @classmethod
    @public
    def get_asset_key(cls, resource_key: str, dataset_name: str) -> AssetKey:
        """Defines asset key for a given dlt resource key and dataset name.

        Args:
            resource_key (str): key of dlt resource
            dataset_name (str): name of dlt dataset

        Returns:
            AssetKey of Dagster asset derived from dlt resource
        """
        return AssetKey("_".join(["dlt", dataset_name, resource_key]))

    @classmethod
    @public
    def get_deps_asset_keys(cls, resource: DltResource) -> Iterable[AssetKey]:
        """Defines upstream asset dependencies given a dlt resource.

        Defaults to a concatenation of `resource.source_name` and `resource.name`.

        Args:
            resource (DltResource): dlt resource / transformer

        Returns:
            Iterable[AssetKey]: The Dagster asset keys upstream of `dlt_resource_key`.
        """
        upstream_key = AssetKey(f"{resource.source_name}_{resource.name}")
        return [upstream_key]
def build_dlt_assets(
    dlt_pipeline_resource: ConfigurableDltResource,
    name: Optional[str] = None,
    group_name: Optional[str] = None,
    dagster_dlt_translator: Optional[DagsterDltTranslator] = None,
):
    """Build Dagster asset definitions from the resources of a dlt source.

    One `AssetSpec` is created per dlt resource on the source; running the
    generated multi-asset executes the pipeline once and yields one
    `MaterializeResult` (with filtered load-info metadata) per spec.

    Args:
        dlt_pipeline_resource: configured dlt pipeline resource whose source's
            resources become assets.
        name: optional name for the generated multi-asset.
        group_name: optional Dagster asset group for all generated assets.
        dagster_dlt_translator: optional translator controlling asset keys,
            upstream dependencies, and metadata. Defaults to a fresh
            `DagsterDltTranslator`.

    Returns:
        A single-element list containing the decorated assets definition.
    """
    # Fix: the default was previously a single shared `DagsterDltTranslator()`
    # instance evaluated at import time (mutable-default anti-pattern);
    # construct one per call instead.
    translator = DagsterDltTranslator() if dagster_dlt_translator is None else dagster_dlt_translator

    specs = [
        AssetSpec(
            key=translator.get_asset_key(resource_key, dlt_pipeline_resource.dataset_name),
            deps=translator.get_deps_asset_keys(resource),
        )
        for resource_key, resource in dlt_pipeline_resource.source.resources.items()
    ]

    @multi_asset(
        name=name,
        specs=specs,
        group_name=group_name,
        compute_kind="dlt",
    )
    def _assets():
        # Run the pipeline once; emit a result per asset spec from the same load.
        load_info = dlt_pipeline_resource.run()
        for spec in specs:
            metadata = translator.get_metadata(str(spec.key), load_info.asdict())
            yield MaterializeResult(asset_key=spec.key, metadata=metadata)

    return [_assets]