feat: Validating logged features via Python SDK (#2640)
* simple logged feature validation

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* validate with metadata

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* typos

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* revert entity columns

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed May 16, 2022
1 parent af57a89 commit 2874fc5
Showing 8 changed files with 312 additions and 62 deletions.
38 changes: 35 additions & 3 deletions sdk/python/feast/dqm/profilers/ge_profiler.py
@@ -21,6 +21,7 @@
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfiler as GEValidationProfilerProto,
)
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus


def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
@@ -41,6 +42,23 @@ def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
    return dataset_copy


def _add_feature_metadata(dataset: PandasDataset) -> PandasDataset:
    for column in dataset.columns:
        if "__" not in column:
            # not a feature column
            continue

        if "event_timestamp" in dataset.columns:
            dataset[f"{column}__timestamp"] = dataset["event_timestamp"]

        dataset[f"{column}__status"] = FieldStatus.PRESENT
        dataset[f"{column}__status"] = dataset[f"{column}__status"].mask(
            dataset[column].isna(), FieldStatus.NOT_FOUND
        )

    return dataset
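
As an illustration, here is a minimal sketch of what this pass produces (the feature column name is hypothetical):

import pandas as pd
from great_expectations.dataset import PandasDataset

ds = PandasDataset(
    {
        "driver_stats__conv_rate": [0.9, None],
        "event_timestamp": pd.to_datetime(["2022-05-16 10:00", "2022-05-16 11:00"]),
    }
)
ds = _add_feature_metadata(ds)
# ds["driver_stats__conv_rate__status"]    -> [FieldStatus.PRESENT, FieldStatus.NOT_FOUND]
# ds["driver_stats__conv_rate__timestamp"] -> copy of event_timestamp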


class GEProfile(Profile):
"""
GEProfile is an implementation of abstract Profile for integration with Great Expectations.
Expand Down Expand Up @@ -96,9 +114,12 @@ class GEProfiler(Profiler):
"""

     def __init__(
-        self, user_defined_profiler: Callable[[pd.DataFrame], ExpectationSuite]
+        self,
+        user_defined_profiler: Callable[[pd.DataFrame], ExpectationSuite],
+        with_feature_metadata: bool = False,
     ):
         self.user_defined_profiler = user_defined_profiler
+        self.with_feature_metadata = with_feature_metadata

    def analyze_dataset(self, df: pd.DataFrame) -> Profile:
        """
@@ -113,6 +134,9 @@ def analyze_dataset(self, df: pd.DataFrame) -> Profile:

        dataset = _prepare_dataset(dataset)

        if self.with_feature_metadata:
            dataset = _add_feature_metadata(dataset)

        return GEProfile(expectation_suite=self.user_defined_profiler(dataset))

    def to_proto(self):
@@ -158,5 +182,13 @@ def __repr__(self):
        return json.dumps(failed_expectations, indent=2)


-def ge_profiler(func):
-    return GEProfiler(user_defined_profiler=func)
+def ge_profiler(*args, with_feature_metadata=False):
+    def wrapper(fun):
+        return GEProfiler(
+            user_defined_profiler=fun, with_feature_metadata=with_feature_metadata
+        )
+
+    if args:
+        return wrapper(args[0])
+
+    return wrapper
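
With this change the decorator works both bare and parameterized; a short usage sketch (the profiler bodies are illustrative):

@ge_profiler
def my_profiler(dataset: PandasDataset) -> ExpectationSuite:
    return dataset.get_expectation_suite()

@ge_profiler(with_feature_metadata=True)
def my_metadata_profiler(dataset: PandasDataset) -> ExpectationSuite:
    # the dataset additionally carries <feature>__status and <feature>__timestamp columns
    return dataset.get_expectation_suite()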
5 changes: 5 additions & 0 deletions sdk/python/feast/feature_logging.py
@@ -11,6 +11,7 @@
    FeatureViewNotFoundException,
    OnDemandFeatureViewNotFoundException,
)
from feast.feature_view import DUMMY_ENTITY_NAME
from feast.protos.feast.core.FeatureService_pb2 import (
    LoggingConfig as LoggingConfigProto,
)
@@ -77,7 +78,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema:

        else:
            for entity_name in feature_view.entities:
                if entity_name == DUMMY_ENTITY_NAME:
                    continue

                entity = registry.get_entity(entity_name, self._project)

                join_key = projection.join_key_map.get(
                    entity.join_key, entity.join_key
                )
55 changes: 54 additions & 1 deletion sdk/python/feast/feature_store.py
@@ -45,6 +45,7 @@
from feast.data_source import DataSource
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.dqm.errors import ValidationFailed
from feast.entity import Entity
from feast.errors import (
    EntityNotFoundException,
@@ -83,7 +84,7 @@
from feast.repo_config import RepoConfig, load_repo_config
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
-from feast.saved_dataset import SavedDataset, SavedDatasetStorage
+from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference
from feast.type_map import (
    feast_value_type_to_python_type,
    python_values_to_proto_values,
@@ -2054,6 +2055,58 @@ def write_logged_features(
            registry=self._registry,
        )

    def validate_logged_features(
        self,
        source: Union[FeatureService],
        start: datetime,
        end: datetime,
        reference: ValidationReference,
        throw_exception: bool = True,
    ) -> Optional[ValidationFailed]:
        """
        Load logged features from an offline store and validate them against
        the provided validation reference.

        Args:
            source: logs source object (currently only feature services are supported)
            start: lower bound for loading logged features
            end: upper bound for loading logged features
            reference: validation reference
            throw_exception: whether to raise the ValidationFailed exception or return it

        Returns:
            None if validation was successful; otherwise raises (or returns,
            depending on throw_exception) a ValidationFailed exception.
        """
        warnings.warn(
            "Logged features validation is an experimental feature. "
            "This API is unstable and will most probably change in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )

        if not isinstance(source, FeatureService):
            raise ValueError("Only feature service is currently supported as a source")

        job = self._get_provider().retrieve_feature_service_logs(
            feature_service=source,
            start_date=start,
            end_date=end,
            config=self.config,
            registry=self.registry,
        )

        # read logs and run validation
        try:
            job.to_arrow(validation_reference=reference)
        except ValidationFailed as exc:
            if throw_exception:
                raise

            return exc

        return None
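
A minimal usage sketch of the new API, assuming store is an already-configured FeatureStore; the feature service and saved dataset names are hypothetical:

from datetime import datetime, timedelta

feature_service = store.get_feature_service("test_service")
reference = store.get_saved_dataset("my_reference_dataset").as_reference(
    profiler=my_profiler  # a @ge_profiler-decorated function
)
error = store.validate_logged_features(
    source=feature_service,
    start=datetime.utcnow() - timedelta(days=1),
    end=datetime.utcnow(),
    reference=reference,
    throw_exception=False,  # return the ValidationFailed instead of raising it
)
if error is not None:
    raise error  # or handle the failed validation differently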


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
    set_of_row_lengths = {len(v) for v in join_key_values.values()}
1 change: 1 addition & 0 deletions sdk/python/feast/feature_view.py
@@ -285,6 +285,7 @@ def __copy__(self):
            online=self.online,
        )
        fv.projection = copy.copy(self.projection)
        fv.entities = self.entities
        return fv

    def __eq__(self, other):
16 changes: 12 additions & 4 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -73,6 +73,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
    def _to_df_internal(self) -> pd.DataFrame:
        # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
        df = self.evaluation_function().compute()
        df = df.reset_index(drop=True)
        return df

    @log_exceptions_and_usage
@@ -555,11 +556,18 @@ def _filter_ttl(
     # Filter rows by defined timestamp tolerance
     if feature_view.ttl and feature_view.ttl.total_seconds() != 0:
         df_to_join = df_to_join[
-            (
-                df_to_join[timestamp_field]
-                >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl
+            # do not drop entity rows if one of the sources returns NaNs
+            df_to_join[timestamp_field].isna()
+            | (
+                (
+                    df_to_join[timestamp_field]
+                    >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl
+                )
+                & (
+                    df_to_join[timestamp_field]
+                    <= df_to_join[entity_df_event_timestamp_col]
+                )
             )
-            & (df_to_join[timestamp_field] <= df_to_join[entity_df_event_timestamp_col])
         ]

         df_to_join = df_to_join.persist()
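
The new predicate keeps a row when the feature timestamp is missing (NaN/NaT, i.e. the source returned no value) or falls within the [event_timestamp - ttl, event_timestamp] window. A toy pandas sketch of the same mask (not Feast code):

import pandas as pd

ttl = pd.Timedelta(hours=1)
df = pd.DataFrame(
    {
        "event_timestamp": pd.to_datetime(["2022-05-16 12:00"] * 3),
        "feature_ts": pd.to_datetime(["2022-05-16 11:30", "2022-05-16 09:00", None]),
    }
)
mask = df["feature_ts"].isna() | (
    (df["feature_ts"] >= df["event_timestamp"] - ttl)
    & (df["feature_ts"] <= df["event_timestamp"])
)
print(df[mask])  # keeps the in-window row and the NaT row, drops the stale one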
130 changes: 130 additions & 0 deletions sdk/python/tests/integration/e2e/test_validation.py
@@ -1,10 +1,21 @@
import datetime

import pandas as pd
import pyarrow as pa
import pytest
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset

from feast import FeatureService
from feast.dqm.errors import ValidationFailed
from feast.dqm.profilers.ge_profiler import ge_profiler
from feast.feature_logging import (
    LOG_TIMESTAMP_FIELD,
    FeatureServiceLoggingSource,
    LoggingConfig,
)
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus
from feast.wait import wait_retry_backoff
from tests.integration.feature_repos.repo_configuration import (
    construct_universal_feature_views,
)
@@ -13,6 +24,7 @@
    driver,
    location,
)
from tests.utils.logged_features import prepare_logs

_features = [
"customer_profile:current_balance",
Expand All @@ -32,6 +44,39 @@ def configurable_profiler(dataset: PandasDataset) -> ExpectationSuite:

    return UserConfigurableProfiler(
        profile_dataset=dataset,
        ignored_columns=["event_timestamp"],
        excluded_expectations=[
            "expect_table_columns_to_match_ordered_list",
            "expect_table_row_count_to_be_between",
        ],
        value_set_threshold="few",
    ).build_suite()


@ge_profiler(with_feature_metadata=True)
def profiler_with_feature_metadata(dataset: PandasDataset) -> ExpectationSuite:
    from great_expectations.profile.user_configurable_profiler import (
        UserConfigurableProfiler,
    )

    # always present
    dataset.expect_column_values_to_be_in_set(
        "global_stats__avg_ride_length__status", {FieldStatus.PRESENT}
    )

    # present in at least 70% of rows
    dataset.expect_column_values_to_be_in_set(
        "customer_profile__current_balance__status", {FieldStatus.PRESENT}, mostly=0.7
    )

    return UserConfigurableProfiler(
        profile_dataset=dataset,
        ignored_columns=["event_timestamp"]
        + [
            c
            for c in dataset.columns
            if c.endswith("__timestamp") or c.endswith("__status")
        ],
        excluded_expectations=[
            "expect_table_columns_to_match_ordered_list",
            "expect_table_row_count_to_be_between",
@@ -127,3 +172,88 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_sources):

    assert failed_expectations[1].check_name == "expect_column_values_to_be_in_set"
    assert failed_expectations[1].column_name == "avg_passenger_count"


@pytest.mark.integration
def test_logged_features_validation(environment, universal_data_sources):
    store = environment.feature_store

    (_, datasets, data_sources) = universal_data_sources
    feature_views = construct_universal_feature_views(data_sources)
    feature_service = FeatureService(
        name="test_service",
        features=[
            feature_views.customer[
                ["current_balance", "avg_passenger_count", "lifetime_trip_count"]
            ],
            feature_views.order[["order_is_success"]],
            feature_views.global_fv[["num_rides", "avg_ride_length"]],
        ],
        logging_config=LoggingConfig(
            destination=environment.data_source_creator.create_logged_features_destination()
        ),
    )

    store.apply(
        [driver(), customer(), location(), feature_service, *feature_views.values()]
    )

    entity_df = datasets.entity_df.drop(
        columns=["order_id", "origin_id", "destination_id"]
    )

    # add some non-existing entities to check NotFound feature handling
    for i in range(5):
        entity_df = entity_df.append(
            {
                "customer_id": 2000 + i,
                "driver_id": 6000 + i,
                "event_timestamp": datetime.datetime.now(),
            },
            ignore_index=True,
        )

    reference_dataset = store.create_saved_dataset(
        from_=store.get_historical_features(
            entity_df=entity_df, features=feature_service, full_feature_names=True
        ),
        name="reference_for_validating_logged_features",
        storage=environment.data_source_creator.create_saved_dataset_destination(),
    )

    log_source_df = store.get_historical_features(
        entity_df=entity_df, features=feature_service, full_feature_names=False
    ).to_df()
    logs_df = prepare_logs(log_source_df, feature_service, store)

    schema = FeatureServiceLoggingSource(
        feature_service=feature_service, project=store.project
    ).get_schema(store._registry)
    store.write_logged_features(
        pa.Table.from_pandas(logs_df, schema=schema), source=feature_service
    )

    def validate():
        """
        Return Tuple[succeed, completed].
        Succeed will be True if no ValidationFailed exception was raised.
        """
        try:
            store.validate_logged_features(
                feature_service,
                start=logs_df[LOG_TIMESTAMP_FIELD].min(),
                end=logs_df[LOG_TIMESTAMP_FIELD].max() + datetime.timedelta(seconds=1),
                reference=reference_dataset.as_reference(
                    profiler=profiler_with_feature_metadata
                ),
            )
        except ValidationFailed:
            return False, True
        except Exception:
            # log table is still being created
            return False, False

        return True, True

    success = wait_retry_backoff(validate, timeout_secs=30)
    assert success, "Validation failed (unexpectedly)"
