Decouple build feature code (#838)
* Restructure code to encapsulate the `save_to_feature_config_from_context`

* Update client.py

* fix merge issues

* Update config_helper.py

* fix comments
xiaoyongzhu authored Nov 23, 2022
1 parent c21d89d commit 799fac0
Showing 5 changed files with 231 additions and 408 deletions.
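The net effect of the change: HOCON feature-config serialization moves out of the registry client into a standalone `FeathrConfigHelper` owned by `FeathrClient`, so a registry is no longer required just to build features. Below is a minimal, illustrative-only sketch of that ownership change; the class and method names mirror the diff, but `ConfigHelperSketch` and `ClientSketch` are placeholders, not the Feathr implementation.

# Illustrative-only sketch of the decoupling in this commit: config
# serialization lives in a helper the client owns, and the registry is optional.
from typing import List, Optional


class ConfigHelperSketch:
    def save_to_feature_config_from_context(self, anchor_list: List, derived_feature_list: List, workspace_dir: str) -> None:
        # The real FeathrConfigHelper renders HOCON files under <workspace_dir>/feature_conf/.
        print(f"writing feature configs for {len(anchor_list)} anchors to {workspace_dir}")


class ClientSketch:
    def __init__(self, registry: Optional[object] = None) -> None:
        self.config_helper = ConfigHelperSketch()  # always available
        self.registry = registry                   # may stay None when no registry is configured

    def build_features(self, anchor_list: List, derived_feature_list: List, workspace_dir: str = "./") -> None:
        # build_features no longer needs a registry at all
        self.config_helper.save_to_feature_config_from_context(anchor_list, derived_feature_list, workspace_dir)


if __name__ == "__main__":
    ClientSketch().build_features(anchor_list=["driver_features"], derived_feature_list=[])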
54 changes: 36 additions & 18 deletions feathr_project/feathr/client.py
@@ -23,7 +23,6 @@
from feathr.definition.settings import ObservationSettings
from feathr.definition.sink import Sink
from feathr.protobuf.featureValue_pb2 import FeatureValue
from feathr.registry.feature_registry import default_registry_client
from feathr.spark_provider._databricks_submission import _FeathrDatabricksJobLauncher
from feathr.spark_provider._localspark_submission import _FeathrLocalSparkJobLauncher
from feathr.spark_provider._synapse_submission import _FeathrSynapseJobLauncher
@@ -34,8 +33,14 @@
from feathr.utils.feature_printer import FeaturePrinter
from feathr.utils.spark_job_params import FeatureGenerationJobParams, FeatureJoinJobParams
from feathr.definition.source import InputContext
from azure.identity import DefaultAzureCredential
from jinja2 import Template
from loguru import logger
from feathr.definition.config_helper import FeathrConfigHelper
from pyhocon import ConfigFactory
from feathr.registry._feathr_registry_client import _FeatureRegistry
from feathr.registry._feature_registry_purview import _PurviewRegistry
from feathr.version import get_version

class FeathrClient(object):
"""Feathr client.
@@ -170,10 +175,24 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir

self.secret_names = []

# initialize registry
self.registry = default_registry_client(self.project_name, config_path=config_path, credential=self.credential)
# initialize config helper
self.config_helper = FeathrConfigHelper()

logger.info(f"Feathr Client {get_version()} initialized successfully")
# initialize registry
self.registry = None
registry_endpoint = self.envutils.get_environment_variable_with_default("feature_registry", "api_endpoint")
azure_purview_name = self.envutils.get_environment_variable_with_default('feature_registry', 'purview', 'purview_name')
if registry_endpoint:
self.registry = _FeatureRegistry(self.project_name, endpoint=registry_endpoint, project_tags=project_registry_tag, credential=credential)
elif azure_purview_name:
registry_delimiter = self.envutils.get_environment_variable_with_default('feature_registry', 'purview', 'delimiter')
# initialize the registry no matter whether we set purview name or not, given some of the methods are used there.
self.registry = _PurviewRegistry(self.project_name, azure_purview_name, registry_delimiter, project_registry_tag, config_path = config_path, credential=credential)
else:
# no registry configured
logger.info("Feathr registry is not configured. Consider setting the Feathr registry component for richer feature store experience.")

logger.info(f"Feathr client {get_version()} initialized successfully.")

def _check_required_environment_variables_exist(self):
"""Checks if the required environment variables(form feathr_config.yaml) is set.
@@ -197,7 +216,7 @@ def register_features(self, from_context: bool = True):
if from_context:
# make sure those items are in `self`
if 'anchor_list' in dir(self) and 'derived_feature_list' in dir(self):
self.registry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.registry.register_features(self.local_workspace_dir, from_context=from_context, anchor_list=self.anchor_list, derived_feature_list=self.derived_feature_list)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to register features")
@@ -224,9 +243,8 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
else:
source_names[anchor.source.name] = anchor.source

preprocessingPyudfManager = _PreprocessingPyudfManager()
_PreprocessingPyudfManager.build_anchor_preprocessing_metadata(anchor_list, self.local_workspace_dir)
self.registry.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.anchor_list = anchor_list
self.derived_feature_list = derived_feature_list

@@ -470,7 +488,7 @@ def get_offline_features(self,
# otherwise users will be confused on what are the available features
# in build_features it will assign anchor_list and derived_feature_list variable, hence we are checking if those two variables exist to make sure the above condition is met
if 'anchor_list' in dir(self) and 'derived_feature_list' in dir(self):
self.registry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to get offline features")

@@ -678,7 +696,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
# otherwise users will be confused on what are the available features
# in build_features it will assign anchor_list and derived_feature_list variable, hence we are checking if those two variables exist to make sure the above condition is met
if 'anchor_list' in dir(self) and 'derived_feature_list' in dir(self):
self.registry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to materialize the features")

@@ -772,7 +790,7 @@ def _get_s3_config_str(self):
# keys can't be only accessed through environment
access_key = self.envutils.get_environment_variable('S3_ACCESS_KEY')
secret_key = self.envutils.get_environment_variable('S3_SECRET_KEY')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
S3_ENDPOINT: {S3_ENDPOINT}
S3_ACCESS_KEY: "{S3_ACCESS_KEY}"
@@ -787,7 +805,7 @@ def _get_adls_config_str(self):
# if ADLS Account is set in the feathr_config, then we need other environment variables
# keys can't be only accessed through environment
key = self.envutils.get_environment_variable('ADLS_KEY')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
ADLS_ACCOUNT: {ADLS_ACCOUNT}
ADLS_KEY: "{ADLS_KEY}"
@@ -801,7 +819,7 @@ def _get_blob_config_str(self):
# if BLOB Account is set in the feathr_config, then we need other environment variables
# keys can't be only accessed through environment
key = self.envutils.get_environment_variable('BLOB_KEY')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
BLOB_ACCOUNT: {BLOB_ACCOUNT}
BLOB_KEY: "{BLOB_KEY}"
@@ -817,7 +835,7 @@ def _get_sql_config_str(self):
driver = self.envutils.get_environment_variable('JDBC_DRIVER')
auth_flag = self.envutils.get_environment_variable('JDBC_AUTH_FLAG')
token = self.envutils.get_environment_variable('JDBC_TOKEN')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
JDBC_TABLE: {JDBC_TABLE}
JDBC_USER: {JDBC_USER}
@@ -834,7 +852,7 @@ def _get_monitoring_config_str(self):
user = self.envutils.get_environment_variable_with_default('monitoring', 'database', 'sql', 'user')
password = self.envutils.get_environment_variable('MONITORING_DATABASE_SQL_PASSWORD')
if url:
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
MONITORING_DATABASE_SQL_URL: "{url}"
MONITORING_DATABASE_SQL_USER: {user}
@@ -852,7 +870,7 @@ def _get_snowflake_config_str(self):
sf_role = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'role')
sf_warehouse = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'warehouse')
sf_password = self.envutils.get_environment_variable('JDBC_SF_PASSWORD')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
JDBC_SF_URL: {JDBC_SF_URL}
JDBC_SF_USER: {JDBC_SF_USER}
@@ -866,7 +884,7 @@ def _get_kafka_config_str(self):
"""Construct the Kafka config string. The endpoint, access key, secret key, and other parameters can be set via
environment variables."""
sasl = self.envutils.get_environment_variable('KAFKA_SASL_JAAS_CONFIG')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
KAFKA_SASL_JAAS_CONFIG: "{sasl}"
""".format(sasl=sasl)
@@ -899,4 +917,4 @@ def _reshape_config_str(self, config_str:str):
if self.spark_runtime == 'local':
return "'{" + config_str + "}'"
else:
return config_str
return config_str
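In place of the removed `default_registry_client` factory, the client now selects a registry backend itself: a configured `feature_registry.api_endpoint` maps to `_FeatureRegistry`, a `purview_name` maps to `_PurviewRegistry`, and with neither set `self.registry` stays `None`. A minimal sketch of that decision logic, assuming the two config values are passed in directly (hypothetical `select_registry` helper) rather than read through `envutils`:

# Hypothetical helper, not part of the Feathr API; it only mirrors the
# branch order of the registry initialization added to FeathrClient.__init__.
from typing import Optional, Tuple


def select_registry(api_endpoint: Optional[str], purview_name: Optional[str]) -> Tuple[str, Optional[str]]:
    """Return a label describing which registry backend the client would create."""
    if api_endpoint:
        # REST-based registry service (maps to _FeatureRegistry in client.py)
        return "rest", api_endpoint
    if purview_name:
        # Azure Purview-backed registry (maps to _PurviewRegistry in client.py)
        return "purview", purview_name
    # Neither configured: the client logs a hint and leaves self.registry as None
    return "none", None


if __name__ == "__main__":
    print(select_registry("https://myregistry.example.com/api/v1", None))  # ('rest', ...)
    print(select_registry(None, "my-purview-account"))                     # ('purview', ...)
    print(select_registry(None, None))                                     # ('none', None)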
193 changes: 193 additions & 0 deletions feathr_project/feathr/definition/config_helper.py
@@ -0,0 +1,193 @@
from feathr.definition.dtype import *
from feathr.registry.registry_utils import *
from feathr.utils._file_utils import write_to_file
from feathr.definition.anchor import FeatureAnchor
from feathr.constants import *
from feathr.definition.feature import Feature, FeatureType,FeatureBase
from feathr.definition.feature_derivations import DerivedFeature
from feathr.definition.repo_definitions import RepoDefinitions
from feathr.definition.source import HdfsSource, InputContext, JdbcSource, Source
from feathr.definition.transformation import (ExpressionTransformation, Transformation,
WindowAggTransformation)
from feathr.definition.typed_key import TypedKey
from feathr.registry.feature_registry import FeathrRegistry
from feathr.definition.repo_definitions import RepoDefinitions
from pathlib import Path
from jinja2 import Template
import sys
from feathr.utils._file_utils import write_to_file
import importlib
import os

class FeathrConfigHelper(object):
def __init__(self) -> None:
pass
def _get_py_files(self, path: Path) -> List[Path]:
"""Get all Python files under path recursively, excluding __init__.py"""
py_files = []
for item in path.glob('**/*.py'):
if "__init__.py" != item.name:
py_files.append(item)
return py_files

def _convert_to_module_path(self, path: Path, workspace_path: Path) -> str:
"""Convert a Python file path to its module path so that we can import it later"""
prefix = os.path.commonprefix(
[path.resolve(), workspace_path.resolve()])
resolved_path = str(path.resolve())
module_path = resolved_path[len(prefix): -len(".py")]
# Convert features under nested folder to module name
# e.g. /path/to/pyfile will become path.to.pyfile
return (
module_path
.lstrip('/')
.replace("/", ".")
)

def _extract_features_from_context(self, anchor_list, derived_feature_list, result_path: Path) -> RepoDefinitions:
"""Collect feature definitions from the context instead of python files"""
definitions = RepoDefinitions(
sources=set(),
features=set(),
transformations=set(),
feature_anchors=set(),
derived_features=set()
)
for derived_feature in derived_feature_list:
if isinstance(derived_feature, DerivedFeature):
definitions.derived_features.add(derived_feature)
definitions.transformations.add(
vars(derived_feature)["transform"])
else:
raise RuntimeError(f"Please make sure you pass a list of `DerivedFeature` objects to the `derived_feature_list` argument. {str(type(derived_feature))} is detected.")

for anchor in anchor_list:
# obj is `FeatureAnchor`
definitions.feature_anchors.add(anchor)
# add the source section of this `FeatureAnchor` object
definitions.sources.add(vars(anchor)['source'])
for feature in vars(anchor)['features']:
# get the transformation object from `Feature` or `DerivedFeature`
if isinstance(feature, Feature):
# feature is of type `Feature`
definitions.features.add(feature)
definitions.transformations.add(vars(feature)["transform"])
else:

raise RuntimeError(f"Please make sure you pass a list of `Feature` objects. {str(type(feature))} is detected.")

return definitions

def _extract_features(self, workspace_path: Path) -> RepoDefinitions:
"""Collect feature definitions from the python file, convert them into feature config and save them locally"""
os.chdir(workspace_path)
# Add workspace path to system path so that we can load features defined in Python via import_module
sys.path.append(str(workspace_path))
definitions = RepoDefinitions(
sources=set(),
features=set(),
transformations=set(),
feature_anchors=set(),
derived_features=set()
)
for py_file in self._get_py_files(workspace_path):
module_path = self._convert_to_module_path(py_file, workspace_path)
module = importlib.import_module(module_path)
for attr_name in dir(module):
obj = getattr(module, attr_name)
if isinstance(obj, Source):
definitions.sources.add(obj)
elif isinstance(obj, Feature):
definitions.features.add(obj)
elif isinstance(obj, DerivedFeature):
definitions.derived_features.add(obj)
elif isinstance(obj, FeatureAnchor):
definitions.feature_anchors.add(obj)
elif isinstance(obj, Transformation):
definitions.transformations.add(obj)
return definitions

def save_to_feature_config(self, workspace_path: Path, config_save_dir: Path):
"""Save feature definition within the workspace into HOCON feature config files"""
repo_definitions = self._extract_features(workspace_path)
self._save_request_feature_config(repo_definitions, config_save_dir)
self._save_anchored_feature_config(repo_definitions, config_save_dir)
self._save_derived_feature_config(repo_definitions, config_save_dir)

def save_to_feature_config_from_context(self, anchor_list, derived_feature_list, local_workspace_dir: Path):
"""Save feature definition within the workspace into HOCON feature config files from current context, rather than reading from python files"""
repo_definitions = self._extract_features_from_context(
anchor_list, derived_feature_list, local_workspace_dir)
self._save_request_feature_config(repo_definitions, local_workspace_dir)
self._save_anchored_feature_config(repo_definitions, local_workspace_dir)
self._save_derived_feature_config(repo_definitions, local_workspace_dir)

def _save_request_feature_config(self, repo_definitions: RepoDefinitions, local_workspace_dir="./"):
config_file_name = "feature_conf/auto_generated_request_features.conf"
tm = Template(
"""
// THIS FILE IS AUTO GENERATED. PLEASE DO NOT EDIT.
anchors: {
{% for anchor in feature_anchors %}
{% if anchor.source.name == "PASSTHROUGH" %}
{{anchor.to_feature_config()}}
{% endif %}
{% endfor %}
}
"""
)

request_feature_configs = tm.render(
feature_anchors=repo_definitions.feature_anchors)
config_file_path = os.path.join(local_workspace_dir, config_file_name)
write_to_file(content=request_feature_configs,
full_file_name=config_file_path)

@classmethod
def _save_anchored_feature_config(self, repo_definitions: RepoDefinitions, local_workspace_dir="./"):
config_file_name = "feature_conf/auto_generated_anchored_features.conf"
tm = Template(
"""
// THIS FILE IS AUTO GENERATED. PLEASE DO NOT EDIT.
anchors: {
{% for anchor in feature_anchors %}
{% if not anchor.source.name == "PASSTHROUGH" %}
{{anchor.to_feature_config()}}
{% endif %}
{% endfor %}
}
sources: {
{% for source in sources%}
{% if not source.name == "PASSTHROUGH" %}
{{source.to_feature_config()}}
{% endif %}
{% endfor %}
}
"""
)
anchored_feature_configs = tm.render(feature_anchors=repo_definitions.feature_anchors,
sources=repo_definitions.sources)
config_file_path = os.path.join(local_workspace_dir, config_file_name)
write_to_file(content=anchored_feature_configs,
full_file_name=config_file_path)

@classmethod
def _save_derived_feature_config(self, repo_definitions: RepoDefinitions, local_workspace_dir="./"):
config_file_name = "feature_conf/auto_generated_derived_features.conf"
tm = Template(
"""
anchors: {}
derivations: {
{% for derived_feature in derived_features %}
{{derived_feature.to_feature_config()}}
{% endfor %}
}
"""
)
derived_feature_configs = tm.render(
derived_features=repo_definitions.derived_features)
config_file_path = os.path.join(local_workspace_dir, config_file_name)
write_to_file(content=derived_feature_configs,
full_file_name=config_file_path)
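The three `_save_*` methods above partition definitions across HOCON files by source: anchors on the `PASSTHROUGH` (request-time) source go to `feature_conf/auto_generated_request_features.conf`, all other anchors and their sources go to `feature_conf/auto_generated_anchored_features.conf`, and derived features go to `feature_conf/auto_generated_derived_features.conf`. A minimal sketch of that partitioning rule, using placeholder dataclasses instead of the real Feathr definition types:

# Placeholder types for illustration only; the real helper works with
# FeatureAnchor/Source objects and renders them via Jinja templates.
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class SketchSource:
    name: str


@dataclass(frozen=True)
class SketchAnchor:
    name: str
    source: SketchSource


def split_anchors(anchors: List[SketchAnchor]) -> Tuple[List[SketchAnchor], List[SketchAnchor]]:
    """PASSTHROUGH anchors -> request-features config; everything else -> anchored-features config."""
    request = [a for a in anchors if a.source.name == "PASSTHROUGH"]
    anchored = [a for a in anchors if a.source.name != "PASSTHROUGH"]
    return request, anchored


if __name__ == "__main__":
    anchors = [
        SketchAnchor("request_features", SketchSource("PASSTHROUGH")),
        SketchAnchor("batch_features", SketchSource("nycTaxiBatchSource")),
    ]
    request, anchored = split_anchors(anchors)
    print([a.name for a in request])   # ['request_features']
    print([a.name for a in anchored])  # ['batch_features']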
