Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Feast extractor #414

Merged
merged 5 commits into from Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
139 changes: 139 additions & 0 deletions databuilder/extractor/feast_extractor.py
@@ -0,0 +1,139 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import Iterator, Union
from datetime import datetime

import yaml
from feast import Client
from feast.feature_table import FeatureTable
from pyhocon import ConfigFactory, ConfigTree

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


class FeastExtractor(Extractor):
    """
    Extracts feature tables from a Feast Core service. Since Feast is
    a metadata store (and not the database itself), it maps the
    following attributes:

    * a database is the name of the Feast project
    * table name is the name of the feature table
    * columns are features stored in the feature table
    """

    FEAST_SERVICE_CONFIG_KEY = "instance_name"
    FEAST_ENDPOINT_CONFIG_KEY = "endpoint"
    DESCRIBE_FEATURE_TABLES = "describe_feature_tables"
    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {FEAST_SERVICE_CONFIG_KEY: "main", DESCRIBE_FEATURE_TABLES: True}
    )

    def init(self, conf: ConfigTree) -> None:
        """
        Initializes the extractor from config.

        :param conf: config tree; `endpoint` (Feast Core URL) is required,
            `instance_name` and `describe_feature_tables` fall back to
            DEFAULT_CONFIG.
        """
        conf = conf.with_fallback(FeastExtractor.DEFAULT_CONFIG)
        self._feast_service = conf.get_string(FeastExtractor.FEAST_SERVICE_CONFIG_KEY)
        self._describe_feature_tables = conf.get_bool(
            FeastExtractor.DESCRIBE_FEATURE_TABLES
        )
        self._client = Client(
            core_url=conf.get_string(FeastExtractor.FEAST_ENDPOINT_CONFIG_KEY)
        )
        # Lazily created on first extract() call
        self._extract_iter: Union[None, Iterator] = None

    def get_scope(self) -> str:
        return "extractor.feast"

    def extract(self) -> Union[TableMetadata, None]:
        """
        For every feature table from Feast, multiple objects are extracted:

        1. TableMetadata with feature table description
        2. Programmatic Description of the feature table, containing
           metadata - date of creation and labels
        3. Programmatic Description with Batch Source specification
        4. (if applicable) Programmatic Description with Stream Source
           specification

        Returns one object per call, or None when exhausted.
        """
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        # Walk every feature table in every project registered in Feast Core
        for project in self._client.list_projects():
            for feature_table in self._client.list_feature_tables(project=project):
                yield from self._extract_feature_table(project, feature_table)

    def _extract_feature_table(
        self, project: str, feature_table: FeatureTable
    ) -> Iterator[TableMetadata]:
        """
        Yields the TableMetadata for a single feature table, followed by
        its programmatic descriptions when `describe_feature_tables` is on.
        """
        columns = []
        # Feast defines no ordering between entities and features; entities
        # are listed first because they act as the table's "primary keys".
        for index, entity_name in enumerate(feature_table.entities):
            entity = self._client.get_entity(entity_name, project=project)
            columns.append(
                ColumnMetadata(
                    entity.name, entity.description, entity.value_type, index
                )
            )

        for index, feature in enumerate(feature_table.features):
            columns.append(
                ColumnMetadata(
                    feature.name,
                    None,
                    feature.dtype.name,
                    len(feature_table.entities) + index,
                )
            )

        yield TableMetadata(
            "feast",
            self._feast_service,
            project,
            feature_table.name,
            None,
            columns,
        )

        if self._describe_feature_tables:
            # Markdown programmatic description: creation time and labels.
            # NOTE(review): created_timestamp is assumed to be UTC seconds —
            # confirm against Feast Core's protobuf contract.
            created_at = datetime.utcfromtimestamp(
                feature_table.created_timestamp.seconds
            )
            description = f"* Created at **{created_at}**\n"

            if feature_table.labels:
                description += "* Labels:\n"
                for key, value in feature_table.labels.items():
                    description += f" * {key}: **{value}**\n"

            yield TableMetadata(
                "feast",
                self._feast_service,
                project,
                feature_table.name,
                description,
                description_source="feature_table_details",
            )

            # Batch source spec rendered as a fenced YAML snippet
            yield TableMetadata(
                "feast",
                self._feast_service,
                project,
                feature_table.name,
                f'```\n{yaml.dump(feature_table.to_dict()["spec"]["batchSource"])}```',
                description_source="batch_source",
            )

            # Stream source is optional on a feature table
            if feature_table.stream_source:
                yield TableMetadata(
                    "feast",
                    self._feast_service,
                    project,
                    feature_table.name,
                    f'```\n{yaml.dump(feature_table.to_dict()["spec"]["streamSource"])}```',
                    description_source="stream_source",
                )
58 changes: 58 additions & 0 deletions example/scripts/sample_feast_loader.py
@@ -0,0 +1,58 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is an example script for extracting Feast feature tables
"""

from databuilder.extractor.feast_extractor import FeastExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.task.task import DefaultTask
from pyhocon import ConfigFactory

# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://localhost:7687/'

neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'

FEAST_ENDPOINT = 'feast-core.featurestore.svc.cluster.local:6565'

feast_endpoint = FEAST_ENDPOINT


def create_feast_job_config():
    """
    Build the pyhocon config wiring the Feast extractor to the filesystem
    CSV loader and the Neo4j publisher.

    :return: a ConfigTree ready to be passed to DefaultJob
    """
    import time  # local import: only needed to stamp the publish tag

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.feast.{FeastExtractor.FEAST_ENDPOINT_CONFIG_KEY}':
            feast_endpoint,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}':
            node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}':
            relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}':
            node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}':
            relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}':
            neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}':
            neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}':
            neo4j_password,
        # Unique per run so the publisher can tell job executions apart
        'publisher.neo4j.job_publish_tag':
            f'feast_{int(time.time())}'
    })
    return job_config


if __name__ == "__main__":
job = DefaultJob(conf=create_feast_job_config(),
task=DefaultTask(extractor=FeastExtractor(), loader=FsNeo4jCSVLoader()),
publisher=neo4j_csv_publisher.Neo4jCsvPublisher())
job.launch()
9 changes: 7 additions & 2 deletions setup.py
Expand Up @@ -61,8 +61,12 @@
'pyspark == 3.0.1'
]

feast = [
'feast==0.8.0'
]

all_deps = requirements + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid + spark
bigquery + jsonpath + db2 + dremio + druid + spark + feast

setup(
name='amundsen-databuilder',
Expand All @@ -87,7 +91,8 @@
'db2': db2,
'dremio': dremio,
'druid': druid,
'delta-lake': spark
'delta-lake': spark,
'feast': feast,
},
classifiers=[
'Programming Language :: Python :: 3.6',
Expand Down