-
Notifications
You must be signed in to change notification settings - Fork 209
/
feast_extractor.py
139 lines (119 loc) · 4.82 KB
/
feast_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime, timezone
from typing import Iterator, Union

import yaml
from feast import Client
from feast.feature_table import FeatureTable
from pyhocon import ConfigFactory, ConfigTree

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
class FeastExtractor(Extractor):
    """
    Extracts feature tables from Feast Core service. Since Feast is
    a metadata store (and not the database itself), it maps the
    following attributes:

    * a database is the name of the Feast project
    * a table name is the name of the feature table
    * columns are the entities of the feature table followed by the
      features stored in it
    """

    # Config key: name of the Feast instance, stored on every emitted record.
    FEAST_SERVICE_CONFIG_KEY = "instance_name"
    # Config key: URL of the Feast Core service (no default is provided).
    FEAST_ENDPOINT_CONFIG_KEY = "endpoint"
    # Config key: whether to also emit programmatic descriptions.
    DESCRIBE_FEATURE_TABLES = "describe_feature_tables"
    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {FEAST_SERVICE_CONFIG_KEY: "main", DESCRIBE_FEATURE_TABLES: True}
    )

    def init(self, conf: ConfigTree) -> None:
        """
        Reads the extractor configuration (falling back to DEFAULT_CONFIG)
        and creates the Feast client used by the extraction.

        :param conf: configuration scoped to this extractor
        """
        conf = conf.with_fallback(FeastExtractor.DEFAULT_CONFIG)
        self._feast_service = conf.get_string(FeastExtractor.FEAST_SERVICE_CONFIG_KEY)
        self._describe_feature_tables = conf.get_bool(
            FeastExtractor.DESCRIBE_FEATURE_TABLES
        )
        self._client = Client(
            core_url=conf.get_string(FeastExtractor.FEAST_ENDPOINT_CONFIG_KEY)
        )
        # Created lazily on the first call to extract().
        self._extract_iter: Union[None, Iterator] = None

    def get_scope(self) -> str:
        """
        Returns the configuration scope of this extractor.
        """
        return "extractor.feast"

    def extract(self) -> Union[TableMetadata, None]:
        """
        For every feature table from Feast, multiple objects are extracted:

        1. TableMetadata with feature table description
        2. Programmatic Description of the feature table, containing
           metadata - date of creation and labels
        3. Programmatic Description with Batch Source specification
        4. (if applicable) Programmatic Description with Stream Source
           specification

        Items 2-4 are only produced when ``describe_feature_tables`` is
        enabled.

        :return: the next extracted record, or None once all records
            have been returned
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        return next(self._extract_iter, None)

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Walks every feature table of every Feast project and yields the
        records extracted from each of them.
        """
        for project in self._client.list_projects():
            for feature_table in self._client.list_feature_tables(project=project):
                yield from self._extract_feature_table(project, feature_table)

    def _description_record(
        self,
        project: str,
        feature_table: FeatureTable,
        description: str,
        description_source: str,
    ) -> TableMetadata:
        """
        Builds a TableMetadata carrying one programmatic description of
        the given feature table.
        """
        return TableMetadata(
            "feast",
            self._feast_service,
            project,
            feature_table.name,
            description,
            description_source=description_source,
        )

    def _extract_feature_table(
        self, project: str, feature_table: FeatureTable
    ) -> Iterator[TableMetadata]:
        """
        Yields the records describing a single feature table.

        :param project: name of the Feast project owning the feature table
        :param feature_table: feature table to convert
        :return: iterator over the table record followed by the optional
            programmatic descriptions
        """
        entity_count = len(feature_table.entities)

        # Entities come first; each one is looked up to obtain its
        # description and value type.
        columns = []
        for index, entity_name in enumerate(feature_table.entities):
            entity = self._client.get_entity(entity_name, project=project)
            columns.append(
                ColumnMetadata(
                    entity.name, entity.description, entity.value_type, index
                )
            )

        # Features follow the entities, so their sort order is offset by
        # the number of entities.
        columns.extend(
            ColumnMetadata(feature.name, None, feature.dtype.name, entity_count + index)
            for index, feature in enumerate(feature_table.features)
        )

        yield TableMetadata(
            "feast",
            self._feast_service,
            project,
            feature_table.name,
            None,
            columns,
        )

        if not self._describe_feature_tables:
            return

        # Equivalent to the deprecated datetime.utcfromtimestamp(): convert
        # in UTC, then drop tzinfo so the rendered text carries no offset.
        created_at = datetime.fromtimestamp(
            feature_table.created_timestamp.seconds, tz=timezone.utc
        ).replace(tzinfo=None)

        details = [f"* Created at **{created_at}**\n"]
        if feature_table.labels:
            details.append("* Labels:\n")
            details.extend(
                f" * {key}: **{value}**\n"
                for key, value in feature_table.labels.items()
            )
        yield self._description_record(
            project, feature_table, "".join(details), "feature_table_details"
        )

        # Serialize the table once; both source descriptions read from it.
        spec = feature_table.to_dict()["spec"]
        yield self._description_record(
            project,
            feature_table,
            f'```\n{yaml.dump(spec["batchSource"])}```',
            "batch_source",
        )

        if feature_table.stream_source:
            yield self._description_record(
                project,
                feature_table,
                f'```\n{yaml.dump(spec["streamSource"])}```',
                "stream_source",
            )