Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the foundation of the universal feature repo and a test that uses it #1734

Merged
merged 20 commits into from Aug 4, 2021
Merged
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Expand Up @@ -80,6 +80,7 @@ class RepoConfig(FeastBaseModel):

def __init__(self, **data: Any):
super().__init__(**data)

if isinstance(self.online_store, Dict):
self.online_store = get_online_config_from_type(self.online_store["type"])(
**self.online_store
Expand Down
27 changes: 27 additions & 0 deletions sdk/python/tests/data/data_creator.py
@@ -0,0 +1,27 @@
from datetime import datetime, timedelta

import pandas as pd
from pytz import timezone, utc


def create_dataset() -> pd.DataFrame:
    """Build the five-row fixture frame used by the tests.

    Columns are ``id``, ``value``, ``ts_1`` and ``created_ts``. All timestamps
    are derived from a single "now" reading (UTC, rounded to milliseconds).
    ``ts_1`` mixes tz-naive and tz-aware values on purpose.
    """
    base_ts = pd.Timestamp(datetime.utcnow()).round("ms")

    def _hours_ago(hours: int):
        # Offset from the shared base timestamp; the result stays tz-naive.
        return base_ts - timedelta(hours=hours)

    def _in_zone(moment, zone_name: str):
        # Label the naive reading as UTC, then express it in the target zone.
        return moment.replace(tzinfo=utc).astimezone(tz=timezone(zone_name))

    event_times = [
        _hours_ago(4),
        base_ts,
        _hours_ago(3),
        # Use different time zones to test tz-naive -> tz-aware conversion
        _in_zone(_hours_ago(4), "Europe/Berlin"),
        _in_zone(_hours_ago(1), "US/Pacific"),
    ]
    columns = {
        "id": [1, 2, 1, 3, 3],
        "value": [0.1, None, 0.3, 4, 5],
        "ts_1": event_times,
        "created_ts": [base_ts] * 5,
    }
    return pd.DataFrame.from_dict(columns)
achals marked this conversation as resolved.
Show resolved Hide resolved
126 changes: 126 additions & 0 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
@@ -0,0 +1,126 @@
import math
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test


# @pytest.mark.integration
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
# @pytest.mark.parametrize("config", FULL_REPO_CONFIGS)
@parametrize_e2e_test
def test_e2e_consistency(fs: FeatureStore):
    """Run the offline/online consistency scenario against ``fs``.

    ``fs`` is passed in by the wrapper that ``parametrize_e2e_test`` builds.
    """
    run_offline_online_store_consistency_test(fs)


def check_offline_and_online_features(
    fs: FeatureStore,
    fv: FeatureView,
    driver_id: int,
    event_timestamp: datetime,
    expected_value: Optional[float],
    full_feature_names: bool,
    check_offline_store: bool = True,
) -> None:
    """
    Assert that the stores return ``expected_value`` for the ``value`` feature
    of ``fv`` and the given driver.

    The online store is always checked; the offline store is checked only when
    ``check_offline_store`` is true. A missing value is expected to surface as
    ``None`` online and as NaN offline.

    :param fs: feature store to query
    :param fv: feature view whose ``value`` feature is read
    :param driver_id: entity key to look up
    :param event_timestamp: timestamp used for the historical (offline) lookup
    :param expected_value: expected feature value, or None when no value is expected
    :param full_feature_names: whether result columns are prefixed with the view name
    :param check_offline_store: also verify the historical retrieval path
    :raises AssertionError: if a store returns something other than the expectation
    """
    feature_ref = f"{fv.name}:value"
    # Result columns carry the "<view>__" prefix only when full names are requested.
    result_key = f"{fv.name}__value" if full_feature_names else "value"

    # Check online store
    response_dict = fs.get_online_features(
        [feature_ref],
        [{"driver": driver_id}],
        full_feature_names=full_feature_names,
    ).to_dict()
    online_value = response_dict[result_key][0]

    # Compare against None explicitly: a plain truthiness test would treat an
    # expected value of 0 / 0.0 as "no value expected".
    if expected_value is not None:
        assert abs(online_value - expected_value) < 1e-6
    else:
        assert online_value is None

    if not check_offline_store:
        return

    # Check offline store
    df = fs.get_historical_features(
        entity_df=pd.DataFrame.from_dict(
            {"driver_id": [driver_id], "event_timestamp": [event_timestamp]}
        ),
        features=[feature_ref],
        full_feature_names=full_feature_names,
    ).to_df()
    offline_value = df.to_dict()[result_key][0]

    if expected_value is not None:
        assert abs(offline_value - expected_value) < 1e-6
    else:
        assert math.isnan(offline_value)


def run_offline_online_store_consistency_test(fs: FeatureStore,) -> None:
    """Materialize ``test_correctness`` in two steps and verify both stores.

    First ``materialize()`` is run over the window [now - 5h, now - 2h] and
    drivers 1, 2 and 3 are checked at the end of that window. Then
    ``materialize_incremental()`` is run up to ``now`` and driver 3 is checked
    again at ``now``.
    """
    now = datetime.utcnow()
    fv = fs.get_feature_view("test_correctness")

    def _verify(driver_id: int, at: datetime, expected) -> None:
        # Every check in this scenario uses full feature names and also
        # exercises the offline store.
        check_offline_and_online_features(
            fs=fs,
            fv=fv,
            driver_id=driver_id,
            event_timestamp=at,
            expected_value=expected,
            full_feature_names=True,
            check_offline_store=True,
        )

    # Run materialize()
    # use both tz-naive & tz-aware timestamps to test that they're both correctly handled
    window_start = (now - timedelta(hours=5)).replace(tzinfo=utc)
    window_end = now - timedelta(hours=2)
    fs.materialize(
        feature_views=[fv.name], start_date=window_start, end_date=window_end
    )

    # Drivers 1 and 2 check the result of materialize(); driver 3 pins the
    # prior value that materialize_incremental() is expected to replace.
    for driver_id, expected in ((1, 0.3), (2, None), (3, 4)):
        _verify(driver_id, window_end, expected)

    # run materialize_incremental()
    fs.materialize_incremental(feature_views=[fv.name], end_date=now)

    # check result of materialize_incremental()
    _verify(3, now, 5)
109 changes: 109 additions & 0 deletions sdk/python/tests/integration/feature_repos/test_repo_configuration.py
@@ -0,0 +1,109 @@
import tempfile
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, List, Union

import pytest
from attr import dataclass

from feast import FeatureStore, RepoConfig, importer
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import (
correctness_feature_view,
)


@dataclass
class TestRepoConfig:
    """
    This class should hold all possible parameters that may need to be varied by individual tests.
    """

    # The class name starts with "Test" and the class has a generated __init__,
    # so pytest would try to collect it and emit a PytestCollectionWarning.
    # Opt out of collection explicitly. The attribute is unannotated, so it is
    # not turned into a field.
    __test__ = False

    # Provider name passed to RepoConfig.
    provider: str = "local"
    # Online store, either as a type name or as a config dict.
    online_store: Union[str, Dict] = "sqlite"

    # Dotted path of the DataSourceCreator class used for the offline store.
    offline_store_creator: str = "tests.integration.feature_repos.universal.data_sources.file.FileDataSourceCreator"

    full_feature_names: bool = True


# One entry per provider / online store / offline store combination that the
# parametrized end-to-end tests run against.
FULL_REPO_CONFIGS: List[TestRepoConfig] = [
    # Local: every field keeps its default.
    TestRepoConfig(),
    # AWS: DynamoDB online store, Redshift-backed data source creator.
    TestRepoConfig(
        provider="aws",
        online_store={"type": "dynamodb", "region": "us-west-2"},
        offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
    ),
    # GCP: Datastore online store, BigQuery-backed data source creator.
    TestRepoConfig(
        provider="gcp",
        online_store="datastore",
        offline_store_creator="tests.integration.feature_repos.universal.data_sources.bigquery.BigQueryDataSourceCreator",
    ),
]


# Empty here. NOTE(review): presumably placeholders for finer-grained
# parametrization -- confirm against callers.
OFFLINE_STORES: List[str] = []
ONLINE_STORES: List[str] = []
PROVIDERS: List[str] = []


@contextmanager
def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
    """
    Take the parameters from the test repo config, create a feature repo, apply it,
    and yield the constructed feature store object to callers.

    This feature store object can be interacted with for the purposes of tests.
    The user is *not* expected to perform any clean up actions: the feature store
    and the offline data source are torn down when the ``with`` block exits, even
    if the body of the block raises.

    :param test_repo_config: configuration
    :return: A feature store built using the supplied configuration.
    """
    df = create_dataset()

    # Short random suffix keeps projects of separate runs apart.
    project = f"test_correctness_{str(uuid.uuid4()).replace('-', '')[:8]}"

    module_name, config_class_name = test_repo_config.offline_store_creator.rsplit(
        ".", 1
    )

    offline_creator: DataSourceCreator = importer.get_class_from_type(
        module_name, config_class_name, "DataSourceCreator"
    )()
    ds = offline_creator.create_data_source(project, df)
    offline_store = offline_creator.create_offline_store_config()
    online_store = test_repo_config.online_store

    with tempfile.TemporaryDirectory() as repo_dir_name:
        config = RepoConfig(
            registry=str(Path(repo_dir_name) / "registry.db"),
            project=project,
            provider=test_repo_config.provider,
            offline_store=offline_store,
            online_store=online_store,
            repo_path=repo_dir_name,
        )
        fs = FeatureStore(config=config)
        fv = correctness_feature_view(ds)
        entity = driver()
        fs.apply([fv, entity])

        # An exception raised inside the caller's ``with`` block is re-raised
        # at the ``yield``; without try/finally the teardown calls below would
        # be skipped and the created resources would be left behind.
        try:
            yield fs
        finally:
            try:
                fs.teardown()
            finally:
                # Runs even if tearing down the feature store itself fails.
                offline_creator.teardown(project)


def parametrize_e2e_test(e2e_test):
    """Turn ``e2e_test(fs)`` into a test parametrized over ``FULL_REPO_CONFIGS``.

    The returned function is marked ``integration`` and takes a single
    ``config`` parameter. For each config it builds a feature store with
    ``construct_feature_store`` and passes it to ``e2e_test``.

    :param e2e_test: callable accepting the constructed feature store
    :return: the parametrized wrapper function
    """

    def _provider_id(repo_config):
        # Parametrization ids are taken from the provider name of each config.
        return repo_config.provider

    @pytest.mark.integration
    @pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=_provider_id)
    def inner_test(config):
        # The store is handed to the wrapped test directly, so pytest only
        # ever sees the ``config`` parameter of this wrapper.
        with construct_feature_store(config) as feature_store:
            e2e_test(feature_store)

    return inner_test
@@ -0,0 +1,26 @@
from abc import ABC, abstractmethod

import pandas as pd

from feast.data_source import DataSource
from feast.repo_config import FeastConfigBaseModel


class DataSourceCreator(ABC):
    """Abstract interface for helpers that stage a DataFrame as a test data source."""

    @abstractmethod
    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
    ) -> DataSource:
        """Stage ``df`` and return a ``DataSource`` describing it.

        How ``name`` is used is left to the implementation. The two column
        arguments name the event and created timestamp columns of the source.
        """

    @abstractmethod
    def create_offline_store_config(self) -> FeastConfigBaseModel:
        """Return the offline store configuration matching this data source."""

    @abstractmethod
    def teardown(self, name: str):
        """Release whatever ``create_data_source`` set up."""
@@ -0,0 +1,54 @@
import time

import pandas as pd
from google.cloud import bigquery

from feast import BigQuerySource
from feast.data_source import DataSource
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)


class BigQueryDataSourceCreator(DataSourceCreator):
    """Stages test data as a BigQuery table in the ``test_ingestion`` dataset."""

    def __init__(self):
        self.client = bigquery.Client()

    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
        **kwargs,
    ) -> DataSource:
        """Load ``df`` into a new, uniquely named table and return its source.

        The dataset is created if missing and its default table expiration is
        set to two weeks. Extra keyword arguments are accepted and ignored.
        """
        project_id = self.client.project
        dataset_name = "test_ingestion"

        dataset = bigquery.Dataset(f"{project_id}.{dataset_name}")
        self.client.create_dataset(dataset, exists_ok=True)
        two_weeks_ms = 1000 * 60 * 60 * 24 * 14  # 2 weeks in milliseconds
        dataset.default_table_expiration_ms = two_weeks_ms
        self.client.update_dataset(dataset, ["default_table_expiration_ms"])

        # A nanosecond timestamp suffix gives each load its own table name.
        table_ref = f"{project_id}.{dataset_name}.{name}_{time.time_ns()}"
        load_job = self.client.load_table_from_dataframe(
            df, table_ref, job_config=bigquery.LoadJobConfig()
        )
        # Wait for the load job to finish before handing out the source.
        load_job.result()

        return BigQuerySource(
            table_ref=table_ref,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            date_partition_column="",
            field_mapping={"ts_1": "ts", "id": "driver_id"},
        )

    def create_offline_store_config(self):
        """Return a default ``BigQueryOfflineStoreConfig``."""
        return BigQueryOfflineStoreConfig()

    def teardown(self, name: str):
        """Do nothing; no table is deleted here."""
        pass
@@ -0,0 +1,41 @@
import tempfile
from typing import Any

import pandas as pd

from feast import FileSource
from feast.data_format import ParquetFormat
from feast.data_source import DataSource
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.repo_config import FeastConfigBaseModel
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)


class FileDataSourceCreator(DataSourceCreator):
    """Stages test data as a temporary parquet file on the local file system."""

    # Handle of the temporary parquet file; None until create_data_source() runs.
    f: Any = None

    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
    ) -> DataSource:
        """Write ``df`` to a fresh temporary parquet file and return its source.

        ``name`` is not used by this implementation. The file is created with
        ``delete=False`` and is removed by ``teardown``.
        """
        self.f = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False)
        # Only the reserved path is needed. Release the handle before pandas
        # opens the path itself: a file that is still open cannot be opened a
        # second time by name on Windows.
        self.f.close()
        df.to_parquet(self.f.name)
        return FileSource(
            file_format=ParquetFormat(),
            path=f"file://{self.f.name}",
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            date_partition_column="",
            field_mapping={"ts_1": "ts", "id": "driver_id"},
        )

    def create_offline_store_config(self) -> FeastConfigBaseModel:
        """Return a default ``FileOfflineStoreConfig``."""
        return FileOfflineStoreConfig()

    def teardown(self, name: str):
        """Close and delete the temporary parquet file, if one was created.

        Safe to call before ``create_data_source`` and safe to call twice.
        """
        import os

        if self.f is None:
            return
        self.f.close()
        # delete=False means closing does not remove the file; do it here so
        # test runs do not accumulate parquet files in the temp directory.
        try:
            os.remove(self.f.name)
        except FileNotFoundError:
            pass