Skip to content

Commit

Permalink
Add materialize_incremental method (#1407)
Browse files Browse the repository at this point in the history
* Add materialize_incremental

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Rebase

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Address comment

Signed-off-by: Jacob Klegar <jacob@tecton.ai>
  • Loading branch information
jklegar committed Mar 25, 2021
1 parent f1e3072 commit 04d2e3d
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 39 deletions.
8 changes: 8 additions & 0 deletions protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@ message FeatureViewMeta {

// Time when this Feature View was last updated
google.protobuf.Timestamp last_updated_timestamp = 2;

// List of pairs (start_time, end_time) for which this feature view has been materialized.
repeated MaterializationInterval materialization_intervals = 3;
}

// A (start_time, end_time) pair describing one time range for which a
// feature view has been materialized.
message MaterializationInterval {
// Start of the materialized time range.
google.protobuf.Timestamp start_time = 1;
// End of the materialized time range.
google.protobuf.Timestamp end_time = 2;
}
111 changes: 82 additions & 29 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,55 @@ def get_historical_features(
)
return job

def materialize_incremental(
    self, feature_views: Optional[List[str]], end_date: datetime,
) -> None:
    """
    Materialize new data from the offline store into the online store.

    For every selected feature view, this method loads feature data up to
    the specified end time and writes it to the online store, where it is
    available for online serving. The start time of the materialized
    interval is the most recent end time of a prior materialization of that
    feature view or, if no prior materialization exists, (now - ttl).

    Feature views are processed one at a time, so views handled before a
    failing view have already been materialized when the error is raised.

    Args:
        feature_views (List[str]): Optional list of feature view names. If
            None, every feature view registered for the project is
            materialized; otherwise only the named feature views are.
        end_date (datetime): End date for time range of data to materialize
            into the online store.

    Raises:
        ValueError: If a feature view has no prior materialization and no
            ttl, so no start time can be derived for it.

    Examples:
        Materialize all features into the online store up to 5 minutes ago.

        >>> from datetime import datetime, timedelta
        >>> from feast.feature_store import FeatureStore
        >>>
        >>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
        >>> fs.materialize_incremental(
        ...     feature_views=None,
        ...     end_date=datetime.utcnow() - timedelta(minutes=5),
        ... )
    """
    registry = self._get_registry()
    project = self.config.project
    if feature_views is None:
        feature_views_to_materialize = registry.list_feature_views(project)
    else:
        feature_views_to_materialize = [
            registry.get_feature_view(name, project) for name in feature_views
        ]

    # TODO paging large loads
    for feature_view in feature_views_to_materialize:
        start_date = feature_view.most_recent_end_time
        if start_date is None:
            # Never materialized before: fall back to a window of one ttl
            # ending now, which requires the feature view to define a ttl.
            if feature_view.ttl is None:
                raise ValueError(
                    f"No start time found for feature view {feature_view.name}. materialize_incremental() requires either a ttl to be set or for materialize() to have been run at least once."
                )
            start_date = datetime.utcnow() - feature_view.ttl
        self._materialize_single_feature_view(feature_view, start_date, end_date)

def materialize(
self,
feature_views: Optional[List[str]],
Expand Down Expand Up @@ -225,39 +274,43 @@ def materialize(

# TODO paging large loads
for feature_view in feature_views_to_materialize:
if isinstance(feature_view.input, FileSource):
raise NotImplementedError(
"This function is not yet implemented for File data sources"
)
(
entity_names,
feature_names,
event_timestamp_column,
created_timestamp_column,
) = _run_reverse_field_mapping(feature_view)

offline_store = get_offline_store(self.config)
table = offline_store.pull_latest_from_table_or_query(
feature_view.input,
entity_names,
feature_names,
event_timestamp_column,
created_timestamp_column,
start_date,
end_date,
self._materialize_single_feature_view(feature_view, start_date, end_date)

def _materialize_single_feature_view(
self, feature_view: FeatureView, start_date: datetime, end_date: datetime
) -> None:
"""
Materialize one feature view for the given time range.

Pulls the latest rows for the feature view's input from the offline store
between start_date and end_date, writes them to the online store through
the provider, then appends (start_date, end_date) to the feature view's
materialization_intervals and passes the feature view to self.apply().

Args:
feature_view (FeatureView): Feature view to materialize.
start_date (datetime): Start of the time range passed to the offline store.
end_date (datetime): End of the time range passed to the offline store.

Raises:
NotImplementedError: If the feature view's input is a FileSource.
"""
if isinstance(feature_view.input, FileSource):
raise NotImplementedError(
"This function is not yet implemented for File data sources"
)
# Resolve the entity, feature and timestamp column names to query with;
# presumably these are the names as they exist in the source, before the
# feature view's field mapping is applied -- confirm against the helper.
(
entity_names,
feature_names,
event_timestamp_column,
created_timestamp_column,
) = _run_reverse_field_mapping(feature_view)

offline_store = get_offline_store(self.config)
table = offline_store.pull_latest_from_table_or_query(
feature_view.input,
entity_names,
feature_names,
event_timestamp_column,
created_timestamp_column,
start_date,
end_date,
)

# NOTE(review): in this diff rendering the next three statements each appear
# twice (pre- and post-reformat versions of the same line); each is
# presumably a single statement in the actual file -- confirm against the
# repository before relying on this listing.
if feature_view.input.field_mapping is not None:
table = _run_forward_field_mapping(
table, feature_view.input.field_mapping
)
if feature_view.input.field_mapping is not None:
table = _run_forward_field_mapping(table, feature_view.input.field_mapping)

rows_to_write = _convert_arrow_to_proto(table, feature_view)
rows_to_write = _convert_arrow_to_proto(table, feature_view)

provider = self._get_provider()
provider.online_write_batch(
self.config.project, feature_view, rows_to_write
)
provider = self._get_provider()
provider.online_write_batch(self.config.project, feature_view, rows_to_write)

# Record the interval only after the online write, then hand the updated
# feature view to apply() (presumably to persist it in the registry).
feature_view.materialization_intervals.append((start_date, end_date))
self.apply([feature_view])

def get_online_features(
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
Expand Down
25 changes: 23 additions & 2 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
from typing import Dict, List, Optional, Union
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union

from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp
Expand All @@ -26,6 +26,9 @@
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewSpec as FeatureViewSpecProto,
)
from feast.protos.feast.core.FeatureView_pb2 import (
MaterializationInterval as MaterializationIntervalProto,
)
from feast.value_type import ValueType


Expand All @@ -44,6 +47,7 @@ class FeatureView:

created_timestamp: Optional[Timestamp] = None
last_updated_timestamp: Optional[Timestamp] = None
materialization_intervals: List[Tuple[datetime, datetime]] = []

def __init__(
self,
Expand Down Expand Up @@ -98,7 +102,13 @@ def to_proto(self) -> FeatureViewProto:
meta = FeatureViewMetaProto(
created_timestamp=self.created_timestamp,
last_updated_timestamp=self.last_updated_timestamp,
materialization_intervals=[],
)
for interval in self.materialization_intervals:
interval_proto = MaterializationIntervalProto()
interval_proto.start_time.FromDatetime(interval[0])
interval_proto.end_time.FromDatetime(interval[1])
meta.materialization_intervals.append(interval_proto)

if self.ttl is not None:
ttl_duration = Duration()
Expand Down Expand Up @@ -152,4 +162,15 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):

feature_view.created_timestamp = feature_view_proto.meta.created_timestamp

for interval in feature_view_proto.meta.materialization_intervals:
feature_view.materialization_intervals.append(
(interval.start_time.ToDatetime(), interval.end_time.ToDatetime())
)

return feature_view

@property
def most_recent_end_time(self) -> Optional[datetime]:
    """Return the latest end time across all recorded materialization
    intervals, or None when no interval has been recorded."""
    end_times = (interval[1] for interval in self.materialization_intervals)
    return max(end_times, default=None)
38 changes: 30 additions & 8 deletions sdk/python/tests/test_materialize_from_bigquery_to_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ def setup_method(self):

def test_bigquery_table_to_datastore_correctness(self):
# create dataset
ts = pd.Timestamp.now(tz="UTC").round("ms")
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": [1, 2, 1],
"value": [0.1, 0.2, 0.3],
"ts_1": [ts - timedelta(minutes=2), ts, ts],
"created_ts": [ts, ts, ts],
"id": [1, 2, 1, 3, 3],
"value": [0.1, 0.2, 0.3, 4, 5],
"ts_1": [
ts - timedelta(seconds=4),
ts,
ts - timedelta(seconds=3),
ts - timedelta(seconds=4),
ts - timedelta(seconds=1),
],
"created_ts": [ts, ts, ts, ts, ts],
}
df = pd.DataFrame.from_dict(data)

Expand Down Expand Up @@ -67,9 +74,7 @@ def test_bigquery_table_to_datastore_correctness(self):

# run materialize()
fs.materialize(
[fv.name],
datetime.utcnow() - timedelta(minutes=5),
datetime.utcnow() - timedelta(minutes=0),
[fv.name], now - timedelta(seconds=5), now - timedelta(seconds=2),
)

# check result of materialize()
Expand All @@ -78,6 +83,23 @@ def test_bigquery_table_to_datastore_correctness(self):
).to_dict()
assert abs(response_dict[f"{fv.name}:value"][0] - 0.3) < 1e-6

# check prior value for materialize_incremental()
response_dict = fs.get_online_features(
[f"{fv.name}:value"], [{"driver_id": 3}]
).to_dict()
assert abs(response_dict[f"{fv.name}:value"][0] - 4) < 1e-6

# run materialize_incremental()
fs.materialize_incremental(
[fv.name], now - timedelta(seconds=0),
)

# check result of materialize_incremental()
response_dict = fs.get_online_features(
[f"{fv.name}:value"], [{"driver_id": 3}]
).to_dict()
assert abs(response_dict[f"{fv.name}:value"][0] - 5) < 1e-6

def test_bigquery_query_to_datastore_correctness(self):
# create dataset
ts = pd.Timestamp.now(tz="UTC").round("ms")
Expand Down

0 comments on commit 04d2e3d

Please sign in to comment.