Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow passing repo config path via flag #3077

Merged
merged 7 commits into from Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -64,7 +64,7 @@ def main():
print("Running setup_it.py")

setup_data()
existing_repo_config = load_repo_config(Path("."))
existing_repo_config = load_repo_config(Path("."), Path(".") / "feature_store.yaml")

# Update to default online store since otherwise, relies on Dockerized Redis service
fs = FeatureStore(config=existing_repo_config.copy(update={"online_store": {}}))
Expand Down
126 changes: 82 additions & 44 deletions sdk/python/feast/cli.py
Expand Up @@ -72,8 +72,17 @@ def format_options(self, ctx: click.Context, formatter: click.HelpFormatter):
default="info",
help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).",
)
@click.option(
achals marked this conversation as resolved.
Show resolved Hide resolved
"--feature-store-yaml",
help="Override the directory where the CLI should look for the feature_store.yaml file.",
)
@click.pass_context
def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
def cli(
ctx: click.Context,
chdir: Optional[str],
log_level: str,
feature_store_yaml: Optional[str],
):
"""
Feast CLI

Expand All @@ -83,6 +92,11 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
"""
ctx.ensure_object(dict)
ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute()
ctx.obj["FS_YAML_FILE"] = (
Path(feature_store_yaml).absolute()
if feature_store_yaml
else ctx.obj["CHDIR"] / "feature_store.yaml"
)
try:
level = getattr(logging, log_level.upper())
logging.basicConfig(
Expand Down Expand Up @@ -143,8 +157,9 @@ def ui(ctx: click.Context, host: str, port: int, registry_ttl_sec: int):
Shows the Feast UI over the current directory
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
# Pass in the registry_dump method to get around a circular dependency
store.serve_ui(
host=host,
Expand All @@ -161,8 +176,9 @@ def endpoint(ctx: click.Context):
Display feature server endpoints
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
endpoint = store.get_feature_server_endpoint()
if endpoint is not None:
_logger.info(
Expand All @@ -188,8 +204,9 @@ def data_source_describe(ctx: click.Context, name: str):
Describe a data source
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

try:
data_source = store.get_data_source(name)
Expand All @@ -216,8 +233,9 @@ def data_source_list(ctx: click.Context):
List all data sources
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
table = []
for datasource in store.list_data_sources():
table.append([datasource.name, datasource.__class__])
Expand Down Expand Up @@ -248,8 +266,9 @@ def entity_describe(ctx: click.Context, name: str):
Describe an entity
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

try:
entity = store.get_entity(name)
Expand All @@ -271,8 +290,9 @@ def entity_list(ctx: click.Context):
List all entities
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
table = []
for entity in store.list_entities():
table.append([entity.name, entity.description, entity.value_type])
Expand All @@ -298,8 +318,9 @@ def feature_service_describe(ctx: click.Context, name: str):
Describe a feature service
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

try:
feature_service = store.get_feature_service(name)
Expand All @@ -323,8 +344,9 @@ def feature_service_list(ctx: click.Context):
List all feature services
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
feature_services = []
for feature_service in store.list_feature_services():
feature_names = []
Expand Down Expand Up @@ -355,8 +377,9 @@ def feature_view_describe(ctx: click.Context, name: str):
Describe a feature view
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

try:
feature_view = store.get_feature_view(name)
Expand All @@ -378,8 +401,10 @@ def feature_view_list(ctx: click.Context):
List all feature views
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]

cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
table = []
for feature_view in [
*store.list_feature_views(),
Expand Down Expand Up @@ -421,8 +446,9 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str):
[Experimental] Describe an on demand feature view
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

try:
on_demand_feature_view = store.get_on_demand_feature_view(name)
Expand All @@ -446,8 +472,9 @@ def on_demand_feature_view_list(ctx: click.Context):
[Experimental] List all on demand feature views
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
table = []
for on_demand_feature_view in store.list_on_demand_feature_views():
table.append([on_demand_feature_view.name])
Expand All @@ -469,8 +496,9 @@ def plan_command(ctx: click.Context, skip_source_validation: bool):
Create or update a feature store deployment
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
repo_config = load_repo_config(repo, fs_yaml_file)
try:
plan(repo_config, repo, skip_source_validation)
except FeastProviderLoginError as e:
Expand All @@ -489,8 +517,10 @@ def apply_total_command(ctx: click.Context, skip_source_validation: bool):
Create or update a feature store deployment
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)

repo_config = load_repo_config(repo, fs_yaml_file)
try:
apply_total(repo_config, repo, skip_source_validation)
except FeastProviderLoginError as e:
Expand All @@ -504,8 +534,9 @@ def teardown_command(ctx: click.Context):
Tear down deployed feature store infrastructure
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
repo_config = load_repo_config(repo, fs_yaml_file)

teardown(repo_config, repo)

Expand All @@ -517,8 +548,9 @@ def registry_dump_command(ctx: click.Context):
Print contents of the metadata registry
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
repo_config = load_repo_config(repo, fs_yaml_file)

click.echo(registry_dump(repo_config, repo_path=repo))

Expand All @@ -545,8 +577,9 @@ def materialize_command(
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store.materialize(
feature_views=None if not views else views,
start_date=utils.make_tzaware(parser.parse(start_ts)),
Expand All @@ -573,8 +606,9 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store.materialize_incremental(
feature_views=None if not views else views,
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
Expand Down Expand Up @@ -663,8 +697,9 @@ def serve_command(
):
"""Start a feature server locally on a given port."""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

if go:
# Turn on Go feature retrieval.
Expand All @@ -685,8 +720,9 @@ def serve_command(
def serve_transformations_command(ctx: click.Context, port: int):
"""[Experimental] Start a feature consumption server locally on a given port."""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

store.serve_transformations(port)

Expand Down Expand Up @@ -724,8 +760,9 @@ def validate(
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)

feature_service = store.get_feature_service(name=feature_service)
reference = store.get_validation_reference(reference)
Expand Down Expand Up @@ -766,7 +803,8 @@ def repo_upgrade(ctx: click.Context, write: bool):
Upgrade a feature repo in place.
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
try:
RepoUpgrader(repo, write).upgrade()
except FeastProviderLoginError as e:
Expand Down
34 changes: 23 additions & 11 deletions sdk/python/feast/feature_store.py
Expand Up @@ -114,30 +114,41 @@ class FeatureStore:
repo_path: Path
_registry: BaseRegistry
_provider: Provider
_go_server: "EmbeddedOnlineFeatureServer"
_go_server: Optional["EmbeddedOnlineFeatureServer"]

@log_exceptions
def __init__(
self,
repo_path: Optional[str] = None,
config: Optional[RepoConfig] = None,
fs_yaml_file: Optional[Path] = None,
):
"""
Creates a FeatureStore object.

Raises:
ValueError: If both or neither of repo_path and config are specified.
"""
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config.")
if config is not None:
if fs_yaml_file is not None and config is not None:
achals marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("You cannot specify both fs_yaml_dir and config.")

if repo_path:
self.repo_path = Path(repo_path)
else:
self.repo_path = Path(os.getcwd())

# If config is specified, or fs_yaml_file is specified, those take precedence over
# the default feature_store.yaml location under repo_path.
if config is not None:
self.config = config
elif repo_path is not None:
self.repo_path = Path(repo_path)
self.config = load_repo_config(Path(repo_path))
elif fs_yaml_file is not None:
self.config = load_repo_config(self.repo_path, fs_yaml_file)
elif repo_path:
self.config = load_repo_config(
self.repo_path, Path(repo_path) / "feature_store.yaml"
)
else:
raise ValueError("Please specify one of repo_path or config.")
raise ValueError("Please specify one of fs_yaml_dir or config.")

registry_config = self.config.get_registry_config()
if registry_config.registry_type == "sql":
Expand All @@ -146,7 +157,8 @@ def __init__(
r = Registry(registry_config, repo_path=self.repo_path)
r._initialize_registry(self.config.project)
self._registry = r
self._provider = get_provider(self.config, self.repo_path)

self._provider = get_provider(self.config)
self._go_server = None

@log_exceptions
Expand Down Expand Up @@ -1569,7 +1581,7 @@ def _get_online_features(
}

# If the embedded Go code is enabled, send request to it instead of going through regular Python logic.
if self.config.go_feature_retrieval:
if self.config.go_feature_retrieval and self._go_server:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achals What was the reason for adding and self._go_server to this condition expression? This seems to be the cause of why the go server will not start and trying to understand if there was another issue that was being solved for.

self._lazy_init_go_server()

entity_native_values: Dict[str, List[Any]]
Expand Down Expand Up @@ -2221,7 +2233,7 @@ def serve(
) -> None:
"""Start the feature consumption server locally on a given port."""
type_ = type_.lower()
if self.config.go_feature_serving:
if self.config.go_feature_serving and self._go_server:
# Start go server instead of python if the flag is enabled
self._lazy_init_go_server()
enable_logging = (
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/provider.py
Expand Up @@ -297,7 +297,7 @@ def get_feature_server_endpoint(self) -> Optional[str]:
return None


def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
def get_provider(config: RepoConfig) -> Provider:
if "." not in config.provider:
if config.provider not in PROVIDERS_CLASS_FOR_TYPE:
raise errors.FeastProviderNotImplementedError(config.provider)
Expand Down