forked from feast-dev/feast
/
test_universal_e2e.py
123 lines (105 loc) · 3.71 KB
/
test_universal_e2e.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import math
from datetime import datetime, timedelta
from typing import Optional
import pandas as pd
from pytz import utc
from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test
@parametrize_e2e_test
def test_e2e_consistency(fs: FeatureStore):
    """Run the offline/online store consistency scenario against ``fs``.

    NOTE(review): ``fs`` is presumably injected by ``parametrize_e2e_test``,
    once per configured environment -- confirm against that decorator.
    """
    run_offline_online_store_consistency_test(fs=fs)
def check_offline_and_online_features(
    fs: FeatureStore,
    fv: FeatureView,
    driver_id: int,
    event_timestamp: datetime,
    expected_value: Optional[float],
    full_feature_names: bool,
    check_offline_store: bool = True,
) -> None:
    """Assert that the stores hold the expected ``value`` feature for a driver.

    The online store is always checked; the offline store is checked only
    when ``check_offline_store`` is true.

    Args:
        fs: Feature store to query.
        fv: Feature view whose ``value`` feature is looked up.
        driver_id: Entity id to retrieve the feature for.
        event_timestamp: Timestamp placed in the entity row used for the
            historical (offline) retrieval.
        expected_value: Value the feature must match to within ``1e-6``.
            ``None`` means the feature is expected to be absent: ``None`` in
            the online response and NaN in the offline dataframe.
        full_feature_names: Forwarded to both retrieval calls. When true the
            result is read from the ``<view name>__value`` key, otherwise
            from ``value``.
        check_offline_store: When false, skip the offline-store check.

    Raises:
        AssertionError: If a retrieved value does not match the expectation.
    """
    feature_ref = f"{fv.name}:value"
    # The key under which the feature comes back depends on the naming mode.
    result_key = f"{fv.name}__value" if full_feature_names else "value"

    # Check online store
    response_dict = fs.get_online_features(
        [feature_ref],
        [{"driver": driver_id}],
        full_feature_names=full_feature_names,
    ).to_dict()
    online_value = response_dict[result_key][0]
    # Compare against None explicitly: an expected value of 0 is falsy but is
    # still a real value that must be compared numerically.
    if expected_value is not None:
        assert (
            abs(online_value - expected_value) < 1e-6
        ), f"online {result_key}={online_value!r}, expected {expected_value!r}"
    else:
        assert (
            online_value is None
        ), f"online {result_key}={online_value!r}, expected no value"

    if not check_offline_store:
        return

    # Check offline store
    entity_df = pd.DataFrame.from_dict(
        {"driver_id": [driver_id], "event_timestamp": [event_timestamp]}
    )
    df = fs.get_historical_features(
        entity_df=entity_df,
        features=[feature_ref],
        full_feature_names=full_feature_names,
    ).to_df()
    # to_dict() maps column -> {index label -> value}; the row has label 0.
    offline_value = df.to_dict()[result_key][0]
    if expected_value is not None:
        assert (
            abs(offline_value - expected_value) < 1e-6
        ), f"offline {result_key}={offline_value!r}, expected {expected_value!r}"
    else:
        assert math.isnan(
            offline_value
        ), f"offline {result_key}={offline_value!r}, expected NaN"
def run_offline_online_store_consistency_test(fs: FeatureStore) -> None:
    """Materialize ``test_correctness`` in two steps and verify the results.

    First ``materialize()`` is run over the window from five hours ago to two
    hours ago and drivers 1, 2 and 3 are checked at the end of that window.
    Then ``materialize_incremental()`` is run up to the current time and
    driver 3 is checked again at the current time.

    Args:
        fs: Feature store under test; must contain a feature view named
            ``test_correctness``.
    """
    now = datetime.utcnow()
    fv = fs.get_feature_view("test_correctness")

    def verify(driver_id, event_timestamp, expected_value):
        # Every check in this scenario uses full feature names and also
        # inspects the offline store.
        check_offline_and_online_features(
            fs=fs,
            fv=fv,
            driver_id=driver_id,
            event_timestamp=event_timestamp,
            expected_value=expected_value,
            full_feature_names=True,
            check_offline_store=True,
        )

    # The window deliberately mixes a tz-aware start with a tz-naive end so
    # that materialize() is exercised with both kinds of timestamp.
    five_hours_ago = now - timedelta(hours=5)
    window_start = five_hours_ago.replace(tzinfo=utc)
    window_end = now - timedelta(hours=2)
    fs.materialize(
        feature_views=[fv.name], start_date=window_start, end_date=window_end
    )

    # Result of materialize(): driver 1 has a value, driver 2 has none, and
    # driver 3 shows its prior value (before the incremental run below).
    for driver_id, expected in ((1, 0.3), (2, None), (3, 4)):
        verify(driver_id, window_end, expected)

    # Result of materialize_incremental(): driver 3 now shows its newer value.
    fs.materialize_incremental(feature_views=[fv.name], end_date=now)
    verify(3, now, 5)