Skip to content

Commit

Permalink
feat: Feast extractor (#414)
Browse files Browse the repository at this point in the history
* Feast extractor

Signed-off-by: Mariusz Strzelecki <mariusz.strzelecki@getindata.com>

* Better looking descriptions

Signed-off-by: Mariusz Strzelecki <mariusz.strzelecki@getindata.com>

* Sample loader

Signed-off-by: Mariusz Strzelecki <mariusz.strzelecki@getindata.com>

* Docstrings

Signed-off-by: Mariusz Strzelecki <mariusz.strzelecki@getindata.com>

* Licence header

Signed-off-by: Mariusz Strzelecki <mariusz.strzelecki@getindata.com>
  • Loading branch information
szczeles committed Nov 25, 2020
1 parent c07cec9 commit 2343a90
Show file tree
Hide file tree
Showing 4 changed files with 464 additions and 2 deletions.
139 changes: 139 additions & 0 deletions databuilder/extractor/feast_extractor.py
@@ -0,0 +1,139 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Iterator, Union
from datetime import datetime

import yaml
from feast import Client
from feast.feature_table import FeatureTable
from pyhocon import ConfigFactory, ConfigTree

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


class FeastExtractor(Extractor):
    """
    Extracts feature tables from Feast Core service. Since Feast is
    a metadata store (and not the database itself), it maps the
    following attributes:
    * a database is the name of the Feast project
    * table name is the name of the feature table
    * columns are features stored in the feature table
    """

    # Config key: logical name of this Feast instance (used as the cluster name).
    FEAST_SERVICE_CONFIG_KEY = "instance_name"
    # Config key: Feast Core gRPC endpoint ("host:port").
    FEAST_ENDPOINT_CONFIG_KEY = "endpoint"
    # Config key: when True, also emit programmatic descriptions per table.
    DESCRIBE_FEATURE_TABLES = "describe_feature_tables"
    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {FEAST_SERVICE_CONFIG_KEY: "main", DESCRIBE_FEATURE_TABLES: True}
    )

    def init(self, conf: ConfigTree) -> None:
        """
        Initializes the extractor from the given config tree.

        Reads the service name and the describe flag (falling back to
        DEFAULT_CONFIG) and connects a Feast client to the configured
        Core endpoint. The endpoint key has no default and is required.
        """
        conf = conf.with_fallback(FeastExtractor.DEFAULT_CONFIG)
        self._feast_service = conf.get_string(FeastExtractor.FEAST_SERVICE_CONFIG_KEY)
        self._describe_feature_tables = conf.get_bool(
            FeastExtractor.DESCRIBE_FEATURE_TABLES
        )
        self._client = Client(
            core_url=conf.get_string(FeastExtractor.FEAST_ENDPOINT_CONFIG_KEY)
        )
        # Lazily created in extract(); yields one TableMetadata per record.
        self._extract_iter: Union[None, Iterator[TableMetadata]] = None

    def get_scope(self) -> str:
        """Returns the config scope prefix for this extractor."""
        return "extractor.feast"

    def extract(self) -> Union[TableMetadata, None]:
        """
        For every feature table from Feast, multiple objects are extracted:
        1. TableMetadata with feature table description
        2. Programmatic Description of the feature table, containing
        metadata - date of creation and labels
        3. Programmatic Description with Batch Source specification
        4. (if applicable) Programmatic Description with Stream Source
        specification

        Returns the next extracted record, or None when exhausted.
        """
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            # Iterator exhausted: signal end-of-input to the task loop.
            return None

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """Iterates all feature tables across every Feast project."""
        for project in self._client.list_projects():
            for feature_table in self._client.list_feature_tables(project=project):
                yield from self._extract_feature_table(project, feature_table)

    def _extract_feature_table(
        self, project: str, feature_table: FeatureTable
    ) -> Iterator[TableMetadata]:
        """
        Yields the metadata records for a single feature table: the table
        itself (entities first, then features, as ordered columns) and,
        if configured, programmatic descriptions for details, batch
        source, and stream source.
        """
        columns = []
        # Entities come first; each is resolved via a Core lookup so its
        # description and value type can be attached to the column.
        for index, entity_name in enumerate(feature_table.entities):
            entity = self._client.get_entity(entity_name, project=project)
            columns.append(
                ColumnMetadata(
                    entity.name, entity.description, entity.value_type, index
                )
            )

        # Features follow the entities; offset the sort order accordingly.
        for index, feature in enumerate(feature_table.features):
            columns.append(
                ColumnMetadata(
                    feature.name,
                    None,
                    feature.dtype.name,
                    len(feature_table.entities) + index,
                )
            )

        yield TableMetadata(
            "feast",
            self._feast_service,
            project,
            feature_table.name,
            None,
            columns,
        )

        if self._describe_feature_tables:
            # creation timestamp is a protobuf Timestamp; seconds precision
            # is sufficient for a human-readable description.
            created_at = datetime.utcfromtimestamp(
                feature_table.created_timestamp.seconds
            )
            description = f"* Created at **{created_at}**\n"

            if feature_table.labels:
                description += "* Labels:\n"
                for key, value in feature_table.labels.items():
                    description += f"    * {key}: **{value}**\n"

            yield TableMetadata(
                "feast",
                self._feast_service,
                project,
                feature_table.name,
                description,
                description_source="feature_table_details",
            )

            # Render the source specs as fenced YAML so the UI shows them
            # as preformatted blocks.
            yield TableMetadata(
                "feast",
                self._feast_service,
                project,
                feature_table.name,
                f'```\n{yaml.dump(feature_table.to_dict()["spec"]["batchSource"])}```',
                description_source="batch_source",
            )

            if feature_table.stream_source:
                yield TableMetadata(
                    "feast",
                    self._feast_service,
                    project,
                    feature_table.name,
                    f'```\n{yaml.dump(feature_table.to_dict()["spec"]["streamSource"])}```',
                    description_source="stream_source",
                )
58 changes: 58 additions & 0 deletions example/scripts/sample_feast_loader.py
@@ -0,0 +1,58 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is a example script for extracting Feast feature tables
"""

from databuilder.extractor.feast_extractor import FeastExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.task.task import DefaultTask
from pyhocon import ConfigFactory

# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://localhost:7687/'

neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'  # NOTE(review): sample credential for a local demo only

# Feast Core gRPC endpoint (host:port) used by FeastExtractor.
FEAST_ENDPOINT = 'feast-core.featurestore.svc.cluster.local:6565'

feast_endpoint = FEAST_ENDPOINT


def create_feast_job_config():
    """
    Builds the pyhocon config for the Feast -> Neo4j ingestion job.

    Wires the Feast extractor endpoint, the filesystem CSV loader's
    node/relationship staging folders, and the Neo4j publisher's
    connection settings into a single ConfigFactory dict.
    """
    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    return ConfigFactory.from_dict({
        f'extractor.feast.{FeastExtractor.FEAST_ENDPOINT_CONFIG_KEY}': feast_endpoint,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        'publisher.neo4j.job_publish_tag': 'some_unique_tag'  # TO-DO unique tag must be added
    })


if __name__ == "__main__":
job = DefaultJob(conf=create_feast_job_config(),
task=DefaultTask(extractor=FeastExtractor(), loader=FsNeo4jCSVLoader()),
publisher=neo4j_csv_publisher.Neo4jCsvPublisher())
job.launch()
9 changes: 7 additions & 2 deletions setup.py
Expand Up @@ -61,8 +61,12 @@
'pyspark == 3.0.1'
]

feast = [
'feast==0.8.0'
]

all_deps = requirements + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid + spark
bigquery + jsonpath + db2 + dremio + druid + spark + feast

setup(
name='amundsen-databuilder',
Expand All @@ -87,7 +91,8 @@
'db2': db2,
'dremio': dremio,
'druid': druid,
'delta-lake': spark
'delta-lake': spark,
'feast': feast,
},
classifiers=[
'Programming Language :: Python :: 3.6',
Expand Down

0 comments on commit 2343a90

Please sign in to comment.