Skip to content

Commit

Permalink
Support parquet ingestion (#1410)
Browse files Browse the repository at this point in the history
* support parquet ingestion

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* better comments

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* Update sdk/python/feast/feature_store.py

Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com>

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* better help for materialize command

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
  • Loading branch information
oavdeev committed Mar 25, 2021
1 parent 04d2e3d commit 960f1ed
Show file tree
Hide file tree
Showing 11 changed files with 361 additions and 114 deletions.
29 changes: 28 additions & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict
from typing import Dict, List

import click
import pkg_resources
import yaml
from pytz import utc

from feast.client import Client
from feast.config import Config
from feast.entity import Entity
from feast.feature_store import FeatureStore
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
from feast.repo_config import load_repo_config
Expand Down Expand Up @@ -389,5 +392,29 @@ def registry_dump_command(repo_path: str):
registry_dump(repo_config)


def _parse_utc_timestamp(ts: str) -> datetime:
    """
    Parse an ISO 8601 string into a timezone-aware UTC datetime.

    Naive timestamps (no UTC offset in the string) are interpreted as UTC.
    Timestamps that carry an explicit offset are converted to UTC, so the
    instant they denote is preserved rather than silently relabelled.

    Raises:
        ValueError: if `ts` is not a valid ISO 8601 timestamp.
    """
    parsed = datetime.fromisoformat(ts)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=utc)
    return parsed.astimezone(utc)


@cli.command("materialize")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument("start_ts")
@click.argument("end_ts")
@click.option(
    "--views", "-v", help="Feature views to materialize", multiple=True,
)
def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]):
    """
    Run a (non-incremental) materialization job to ingest data into the online store. Feast
    will read all data between START_TS and END_TS from the offline store and write it to the
    online store. If you don't specify feature view names using --views, all registered Feature
    Views will be materialized.
    START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'. Timestamps
    without a UTC offset are interpreted as UTC.
    """
    store = FeatureStore(repo_path=repo_path)
    store.materialize(
        # click delivers a `multiple=True` option as a tuple; hand the store a
        # real list, or None when no view was requested (meaning "all views").
        feature_views=list(views) if views else None,
        start_date=_parse_utc_timestamp(start_ts),
        end_date=_parse_utc_timestamp(end_ts),
    )


if __name__ == "__main__":
cli()
29 changes: 22 additions & 7 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import pandas as pd
import pyarrow

from feast.data_source import FileSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.provider import Provider, get_provider
Expand Down Expand Up @@ -51,10 +50,12 @@ class FeatureStore:
"""

config: RepoConfig
repo_path: Optional[str]

def __init__(
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
self.repo_path = repo_path
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config")
if config is not None:
Expand Down Expand Up @@ -279,10 +280,6 @@ def materialize(
def _materialize_single_feature_view(
self, feature_view: FeatureView, start_date: datetime, end_date: datetime
) -> None:
if isinstance(feature_view.input, FileSource):
raise NotImplementedError(
"This function is not yet implemented for File data sources"
)
(
entity_names,
feature_names,
Expand Down Expand Up @@ -504,6 +501,23 @@ def _convert_arrow_to_proto(
table: pyarrow.Table, feature_view: FeatureView
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
rows_to_write = []

def _coerce_datetime(ts):
"""
Depending on underlying time resolution, arrow to_pydict() sometimes returns pandas
timestamp type (for nanosecond resolution), and sometimes you get standard python datetime
(for microsecond resolution).
While pandas timestamp class is a subclass of python datetime, it doesn't always behave the
same way. We convert it to normal datetime so that consumers downstream don't have to deal
with these quirks.
"""

if isinstance(ts, pd.Timestamp):
return ts.to_pydatetime()
else:
return ts

for row in zip(*table.to_pydict().values()):
entity_key = EntityKeyProto()
for entity_name in feature_view.entities:
Expand All @@ -519,12 +533,13 @@ def _convert_arrow_to_proto(
event_timestamp_idx = table.column_names.index(
feature_view.input.event_timestamp_column
)
event_timestamp = row[event_timestamp_idx]
event_timestamp = _coerce_datetime(row[event_timestamp_idx])

if feature_view.input.created_timestamp_column is not None:
created_timestamp_idx = table.column_names.index(
feature_view.input.created_timestamp_column
)
created_timestamp = row[created_timestamp_idx]
created_timestamp = _coerce_datetime(row[created_timestamp_idx])
else:
created_timestamp = None

Expand Down
12 changes: 12 additions & 0 deletions sdk/python/feast/infra/local_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from datetime import datetime
from typing import Dict, List, Optional, Sequence, Tuple, Union

import pytz

from feast import FeatureTable, FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.provider import Provider
Expand All @@ -15,6 +17,13 @@ def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime):
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class LocalSqlite(Provider):
_db_path: str

Expand Down Expand Up @@ -64,6 +73,9 @@ def online_write_batch(
for entity_key, values, timestamp, created_ts in data:
for feature_name, val in values.items():
entity_key_bin = serialize_entity_key(entity_key)
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)

conn.execute(
f"""
Expand Down
25 changes: 24 additions & 1 deletion sdk/python/feast/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,28 @@ def pull_latest_from_table_or_query(
start_date: datetime,
end_date: datetime,
) -> pyarrow.Table:
pass
assert isinstance(data_source, FileSource)
source_df = pd.read_parquet(data_source.path)

ts_columns = (
[event_timestamp_column, created_timestamp_column]
if created_timestamp_column is not None
else [event_timestamp_column]
)
source_df.sort_values(by=ts_columns, inplace=True)

filtered_df = source_df[
(source_df[event_timestamp_column] >= start_date)
& (source_df[event_timestamp_column] < end_date)
]
last_values_df = filtered_df.groupby(by=entity_names).last()

# make driver_id a normal column again
last_values_df.reset_index(inplace=True)

return pyarrow.Table.from_pandas(
last_values_df[entity_names + feature_names + ts_columns]
)

@staticmethod
def get_historical_features(
Expand Down Expand Up @@ -569,5 +590,7 @@ def _get_requested_feature_views_to_features_dict(
def get_offline_store(config: RepoConfig) -> Type[OfflineStore]:
    """
    Pick the offline store class for the provider named in `config`.

    Returns BigQueryOfflineStore for the "gcp" provider and FileOfflineStore
    for the "local" provider.

    Raises:
        ValueError: for any other provider; the config itself is the argument.
    """
    provider = config.provider
    if provider == "gcp":
        return BigQueryOfflineStore
    if provider == "local":
        return FileOfflineStore
    raise ValueError(config)
27 changes: 27 additions & 0 deletions sdk/python/tests/cli/example_feature_repo_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource

# File-backed data source for the feature view declared below. The path is a
# literal placeholder token rather than a real location.
driver_hourly_stats = FileSource(
    path="%PARQUET_PATH%",  # placeholder to be replaced by the test
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

# Entity named "driver_id" with an INT64 value type.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)


# NOTE: this statement rebinds `driver_hourly_stats`. The call arguments are
# evaluated before the assignment, so `input=` still receives the FileSource
# created above; afterwards the module-level name refers to the FeatureView.
driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=Duration(seconds=86400 * 1),  # 86400 seconds, i.e. one day
    features=[
        Feature(name="conv_rate", dtype=ValueType.FLOAT),
        Feature(name="acc_rate", dtype=ValueType.FLOAT),
        Feature(name="avg_daily_trips", dtype=ValueType.INT64),
    ],
    online=True,
    input=driver_hourly_stats,
    tags={},
)
87 changes: 87 additions & 0 deletions sdk/python/tests/cli/test_e2e_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd

import tests.driver_test_data as driver_data
from tests.cli.utils import CliRunner, get_example_repo


def _get_last_feature_row(df: pd.DataFrame, driver_id):
""" Manually extract last feature value from a dataframe for a given driver_id """
filtered = df[df["driver_id"] == driver_id]
max_ts = filtered.loc[filtered["datetime"].idxmax()]["datetime"]
filtered_by_ts = filtered[filtered["datetime"] == max_ts]
return filtered_by_ts.loc[filtered_by_ts["created"].idxmax()]


class TestLocalEndToEnd:
    def test_basic(self) -> None:
        """
        End-to-end run against a local repo:
        1. Create a repo.
        2. Apply
        3. Ingest some data to online store from parquet
        4. Read from the online store to make sure it made it there.
        """

        runner = CliRunner()
        with tempfile.TemporaryDirectory() as data_dir:

            # Build 15 days of hourly test data ending at the current hour and
            # store it as parquet inside the temporary directory.
            end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
            start_date = end_date - timedelta(days=15)

            driver_ids = [1001, 1002, 1003, 1004, 1005]
            driver_df = driver_data.create_driver_hourly_stats_df(
                driver_ids, start_date, end_date
            )

            parquet_path = os.path.join(data_dir, "driver_stats.parquet")
            driver_df.to_parquet(path=parquet_path, allow_truncated_timestamps=True)

            # The example repo source carries a placeholder for the parquet
            # location; substitute the real path before handing it to the runner.
            # The runner takes care of apply/teardown for us.
            repo_source = get_example_repo("example_feature_repo_2.py").replace(
                "%PARQUET_PATH%", parquet_path
            )
            with runner.local_repo(repo_source) as store:

                assert store.repo_path is not None

                # feast materialize
                materialize_args = [
                    "materialize",
                    str(store.repo_path),
                    start_date.isoformat(),
                    end_date.isoformat(),
                ]
                materialize_result = runner.run(
                    materialize_args, cwd=Path(store.repo_path)
                )

                assert materialize_result.returncode == 0

                # Read features back
                result = store.get_online_features(
                    feature_refs=[
                        "driver_hourly_stats:conv_rate",
                        "driver_hourly_stats:avg_daily_trips",
                    ],
                    entity_rows=[{"driver_id": 1001}],
                )

                assert "driver_hourly_stats:avg_daily_trips" in result.to_dict()

                assert "driver_hourly_stats:conv_rate" in result.to_dict()

                # The online value must match the latest row computed by hand.
                online_conv_rate = result.to_dict()["driver_hourly_stats:conv_rate"][0]
                expected_conv_rate = _get_last_feature_row(driver_df, 1001)["conv_rate"]
                assert abs(online_conv_rate - expected_conv_rate) < 0.01
4 changes: 2 additions & 2 deletions sdk/python/tests/cli/test_online_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from tests.cli.utils import CliRunner
from tests.cli.utils import CliRunner, get_example_repo


class TestOnlineRetrieval:
def test_basic(self) -> None:
runner = CliRunner()
with runner.local_repo("example_feature_repo_1.py") as store:
with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store:

# Write some data to two tables
registry = store._get_registry()
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/cli/test_partial_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from feast import BigQuerySource, Feature, FeatureView, ValueType
from tests.cli.online_read_write_test import basic_rw_test
from tests.cli.utils import CliRunner
from tests.cli.utils import CliRunner, get_example_repo


class TestOnlineRetrieval:
Expand All @@ -13,7 +13,7 @@ def test_basic(self) -> None:
"""

runner = CliRunner()
with runner.local_repo("example_feature_repo_1.py") as store:
with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store:

driver_locations_source = BigQuerySource(
table_ref="rh_prod.ride_hailing_co.drivers",
Expand Down
13 changes: 7 additions & 6 deletions sdk/python/tests/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from feast.feature_store import FeatureStore


def get_example_repo(example_repo_py) -> str:
    """Return the text of the example repo file located next to this module."""
    tests_dir = Path(__file__).parent
    example_path = tests_dir / example_repo_py
    return example_path.read_text()


class CliRunner:
"""
NB. We can't use test runner helper from click here, since it doesn't start a new Python
Expand All @@ -27,9 +31,8 @@ def local_repo(self, example_repo_py: str):
"""
Convenience method to set up all the boilerplate for a local feature repo.
"""
project_id = "".join(
"test" + random.choice(string.ascii_lowercase + string.digits)
for _ in range(10)
project_id = "test" + "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(10)
)

with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name:
Expand All @@ -53,9 +56,7 @@ def local_repo(self, example_repo_py: str):
)

repo_example = repo_path / "example.py"
repo_example.write_text(
(Path(__file__).parent / example_repo_py).read_text()
)
repo_example.write_text(example_repo_py)

result = self.run(["apply", str(repo_path)], cwd=repo_path)
assert result.returncode == 0
Expand Down

0 comments on commit 960f1ed

Please sign in to comment.