Skip to content

Commit

Permalink
Add Feast init command (#1414)
Browse files Browse the repository at this point in the history
* add feast init command

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* tweak init

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
  • Loading branch information
oavdeev committed Mar 30, 2021
1 parent 6d12589 commit 0892cd1
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 8 deletions.
31 changes: 26 additions & 5 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
from feast.repo_config import load_repo_config
from feast.repo_operations import apply_total, registry_dump, teardown
from feast.repo_operations import (
apply_total,
cli_check_repo,
init_repo,
registry_dump,
teardown,
)

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -360,22 +366,28 @@ def project_list():


@cli.command("apply")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument(
"repo_path", type=click.Path(dir_okay=True, exists=True), default=Path.cwd
)
def apply_total_command(repo_path: str):
"""
Applies a feature repo
"""
cli_check_repo(Path(repo_path))
repo_config = load_repo_config(Path(repo_path))

apply_total(repo_config, Path(repo_path).resolve())


@cli.command("teardown")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument(
"repo_path", type=click.Path(dir_okay=True, exists=True), default=Path.cwd
)
def teardown_command(repo_path: str):
"""
Tear down infra for a feature repo
"""
cli_check_repo(Path(repo_path))
repo_config = load_repo_config(Path(repo_path))

teardown(repo_config, Path(repo_path).resolve())
Expand All @@ -393,13 +405,15 @@ def registry_dump_command(repo_path: str):


@cli.command("materialize")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument("start_ts")
@click.argument("end_ts")
@click.argument(
"repo_path", type=click.Path(dir_okay=True, exists=True,), default=Path.cwd
)
@click.option(
"--views", "-v", help="Feature views to materialize", multiple=True,
)
def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]):
def materialize_command(start_ts: str, end_ts: str, repo_path: str, views: List[str]):
"""
Run a (non-incremental) materialization job to ingest data into the online store. Feast
will read all data between START_TS and END_TS from the offline store and write it to the
Expand All @@ -416,5 +430,12 @@ def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[
)


@cli.command("init")
@click.option("--minimal", "-m", is_flag=True, help="Only generate the config")
def init_command(minimal: bool):
repo_path = Path.cwd()
init_repo(repo_path, minimal)


if __name__ == "__main__":
cli()
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# This module generates dummy data to be used for tests and examples.
import numpy as np
import pandas as pd

Expand Down
36 changes: 36 additions & 0 deletions sdk/python/feast/example_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# This is an example feature definition file

from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_hourly_stats = FileSource(
path="%PARQUET_PATH%",
event_timestamp_column="datetime",
created_timestamp_column="created",
)

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature column. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400 * 1),
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
input=driver_hourly_stats,
tags={},
)
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __repr__(self) -> str:

def load_repo_config(repo_path: Path) -> RepoConfig:
config_path = repo_path / "feature_store.yaml"

with open(config_path) as f:
raw_config = yaml.safe_load(f)
try:
Expand Down
80 changes: 80 additions & 0 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import os
import random
import sys
from datetime import datetime, timedelta
from pathlib import Path
from textwrap import dedent
from typing import List, NamedTuple, Union

from feast import Entity, FeatureTable
from feast.driver_test_data import create_driver_hourly_stats_df
from feast.feature_view import FeatureView
from feast.infra.provider import get_provider
from feast.names import adjectives, animals
Expand Down Expand Up @@ -136,6 +139,83 @@ def registry_dump(repo_config: RepoConfig):
print(table)


def cli_check_repo(repo_path: Path):
config_path = repo_path / "feature_store.yaml"
if not config_path.exists():
print(
f"Can't find feature_store.yaml at {repo_path}. Make sure you're running this command in an initialized feast repository."
)
sys.exit(1)


def init_repo(repo_path: Path, minimal: bool):

repo_config = repo_path / "feature_store.yaml"

if repo_config.exists():
print("Feature repository is already initalized, nothing to do.")
sys.exit(1)

project_id = generate_project_name()

if minimal:
repo_config.write_text(
dedent(
f"""
project: {project_id}
metadata_store: /path/to/metadata.db
provider: local
online_store:
local:
path: /path/to/online_store.db
"""
)
)
print(
"Generated example feature_store.yaml. Please edit metadata_store and online_store"
"location before running apply"
)

else:
example_py = (Path(__file__).parent / "example_repo.py").read_text()

data_path = repo_path / "data"
data_path.mkdir(exist_ok=True)

end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=15)

driver_entities = [1001, 1002, 1003, 1004, 1005]
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)

driver_stats_path = data_path / "driver_stats.parquet"
driver_df.to_parquet(
path=str(driver_stats_path), allow_truncated_timestamps=True
)

with open(repo_path / "example.py", "wt") as f:
f.write(example_py.replace("%PARQUET_PATH%", str(driver_stats_path)))

# Generate config
repo_config.write_text(
dedent(
f"""
project: {project_id}
metadata_store: {"data/metadata.db"}
provider: local
online_store:
local:
path: {"data/online_store.db"}
"""
)
)

print("Generated feature_store.yaml and example features in example_repo.py")
print(
"Now try runing `feast apply` to apply, or `feast materialize` to sync data to the online store"
)


def generate_project_name() -> str:
"""Generates a unique project name"""
return f"{random.choice(adjectives)}_{random.choice(animals)}"
4 changes: 2 additions & 2 deletions sdk/python/tests/cli/test_e2e_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pandas as pd

import tests.driver_test_data as driver_data
import feast.driver_test_data as driver_data
from tests.cli.utils import CliRunner, get_example_repo


Expand Down Expand Up @@ -57,9 +57,9 @@ def test_basic(self) -> None:
r = runner.run(
[
"materialize",
str(store.repo_path),
start_date.isoformat(),
end_date.isoformat(),
str(store.repo_path),
],
cwd=Path(store.repo_path),
)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from google.cloud import bigquery
from pandas.testing import assert_frame_equal

import tests.driver_test_data as driver_data
import feast.driver_test_data as driver_data
from feast.data_source import BigQuerySource, FileSource
from feast.entity import Entity
from feast.feature import Feature
Expand Down
26 changes: 26 additions & 0 deletions sdk/python/tests/test_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

from tests.cli.utils import CliRunner


def test_repo_init() -> None:
"""
This test simply makes sure that you can run `feast apply && feast materialize` on
the repo created by "feast init" without errors.
"""
runner = CliRunner()
with tempfile.TemporaryDirectory() as repo_dir_name:
repo_path = Path(repo_dir_name)
result = runner.run(["init"], cwd=repo_path)
assert result.returncode == 0
result = runner.run(["apply"], cwd=repo_path)
assert result.returncode == 0

end_date = datetime.utcnow()
start_date = end_date - timedelta(days=100)
result = runner.run(
["materialize", start_date.isoformat(), end_date.isoformat()], cwd=repo_path
)
assert result.returncode == 0

0 comments on commit 0892cd1

Please sign in to comment.