Implement materialize method (#1379)
* WIP Ingest into Firestore

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Full materialize function

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Rebase

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Add basic ingestion integration test

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Update created_ts to use column or null instead of current ts

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Update feast types import

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* lint

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Address comments

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Use existing type map function

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Address comments

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Address comments

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Add comment

Signed-off-by: Jacob Klegar <jacob@tecton.ai>
jklegar committed Mar 22, 2021
1 parent 3ae52ff commit cc00315
Showing 9 changed files with 357 additions and 101 deletions.
195 changes: 193 additions & 2 deletions sdk/python/feast/feature_store.py
@@ -11,22 +11,32 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from pathlib import Path
-from typing import List, Optional, Union
+from typing import Dict, List, Optional, Tuple, Union

import pandas as pd
import pyarrow

from feast.data_source import FileSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.provider import Provider, get_provider
-from feast.offline_store import RetrievalJob, get_offline_store_for_retrieval
+from feast.offline_store import (
+    RetrievalJob,
+    get_offline_store,
+    get_offline_store_for_retrieval,
+)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import (
LocalOnlineStoreConfig,
OnlineStoreConfig,
RepoConfig,
load_repo_config,
)
from feast.type_map import python_value_to_proto_value


class FeatureStore:
@@ -153,6 +163,88 @@ def get_historical_features(
)
return job

def materialize(
self,
feature_views: Optional[List[str]],
start_date: datetime,
end_date: datetime,
) -> None:
"""
Materialize data from the offline store into the online store.
        This method loads feature data for the given interval, from either the
        selected feature views or all registered feature views if none are named,
        into the online store, where it becomes available for online serving.
        Args:
            feature_views (List[str]): Optional list of feature view names. If provided,
                materialization runs only for the named feature views.
            start_date (datetime): Start of the time range of data to materialize into the online store.
            end_date (datetime): End of the time range of data to materialize into the online store.
Examples:
Materialize all features into the online store over the interval
from 3 hours ago to 10 minutes ago.
>>> from datetime import datetime, timedelta
>>> from feast.feature_store import FeatureStore
>>>
>>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
>>> fs.materialize(
>>> start_date=datetime.utcnow() - timedelta(hours=3),
>>> end_date=datetime.utcnow() - timedelta(minutes=10)
>>> )
"""
feature_views_to_materialize = []
registry = self._get_registry()
if feature_views is None:
feature_views_to_materialize = registry.list_feature_views(
self.config.project
)
else:
for name in feature_views:
feature_view = registry.get_feature_view(name, self.config.project)
feature_views_to_materialize.append(feature_view)

# TODO paging large loads
for feature_view in feature_views_to_materialize:
if isinstance(feature_view.input, FileSource):
raise NotImplementedError(
"This function is not yet implemented for File data sources"
)
if not feature_view.input.table_ref:
raise NotImplementedError(
f"This function is only implemented for FeatureViews with a table_ref; {feature_view.name} does not have one."
)
(
entity_names,
feature_names,
event_timestamp_column,
created_timestamp_column,
) = _run_reverse_field_mapping(feature_view)

offline_store = get_offline_store(self.config)
table = offline_store.pull_latest_from_table(
feature_view.input,
entity_names,
feature_names,
event_timestamp_column,
created_timestamp_column,
start_date,
end_date,
)

if feature_view.input.field_mapping is not None:
table = _run_forward_field_mapping(
table, feature_view.input.field_mapping
)

rows_to_write = _convert_arrow_to_proto(table, feature_view)

provider = self._get_provider()
provider.online_write_batch(
self.config.project, feature_view, rows_to_write
)


def _get_requested_feature_views(
feature_refs: List[str], all_feature_views: List[FeatureView]
@@ -176,3 +268,102 @@ def _get_requested_feature_views(
feature_views_list.append(view)

return feature_views_list


def _run_reverse_field_mapping(
feature_view: FeatureView,
) -> Tuple[List[str], List[str], str, Optional[str]]:
"""
If a field mapping exists, run it in reverse on the entity names,
feature names, event timestamp column, and created timestamp column
to get the names of the relevant columns in the BigQuery table.
Args:
feature_view: FeatureView object containing the field mapping
as well as the names to reverse-map.
Returns:
Tuple containing the list of reverse-mapped entity names,
reverse-mapped feature names, reverse-mapped event timestamp column,
and reverse-mapped created timestamp column that will be passed into
the query to the offline store.
"""
# if we have mapped fields, use the original field names in the call to the offline store
event_timestamp_column = feature_view.input.event_timestamp_column
entity_names = [entity for entity in feature_view.entities]
feature_names = [feature.name for feature in feature_view.features]
created_timestamp_column = feature_view.input.created_timestamp_column
if feature_view.input.field_mapping is not None:
reverse_field_mapping = {
v: k for k, v in feature_view.input.field_mapping.items()
}
event_timestamp_column = (
reverse_field_mapping[event_timestamp_column]
if event_timestamp_column in reverse_field_mapping.keys()
else event_timestamp_column
)
created_timestamp_column = (
reverse_field_mapping[created_timestamp_column]
if created_timestamp_column
and created_timestamp_column in reverse_field_mapping.keys()
else created_timestamp_column
)
entity_names = [
reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
for col in entity_names
]
feature_names = [
reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
for col in feature_names
]
return (
entity_names,
feature_names,
event_timestamp_column,
created_timestamp_column,
)


def _run_forward_field_mapping(
table: pyarrow.Table, field_mapping: Dict[str, str],
) -> pyarrow.Table:
# run field mapping in the forward direction
cols = table.column_names
mapped_cols = [
field_mapping[col] if col in field_mapping.keys() else col for col in cols
]
table = table.rename_columns(mapped_cols)
return table


def _convert_arrow_to_proto(
table: pyarrow.Table, feature_view: FeatureView
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
rows_to_write = []
for row in zip(*table.to_pydict().values()):
entity_key = EntityKeyProto()
for entity_name in feature_view.entities:
entity_key.entity_names.append(entity_name)
idx = table.column_names.index(entity_name)
value = python_value_to_proto_value(row[idx])
entity_key.entity_values.append(value)
feature_dict = {}
for feature in feature_view.features:
idx = table.column_names.index(feature.name)
value = python_value_to_proto_value(row[idx])
feature_dict[feature.name] = value
event_timestamp_idx = table.column_names.index(
feature_view.input.event_timestamp_column
)
event_timestamp = row[event_timestamp_idx]
if feature_view.input.created_timestamp_column is not None:
created_timestamp_idx = table.column_names.index(
feature_view.input.created_timestamp_column
)
created_timestamp = row[created_timestamp_idx]
else:
created_timestamp = None

rows_to_write.append(
(entity_key, feature_dict, event_timestamp, created_timestamp)
)
return rows_to_write
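
The two mapping helpers above form a round trip: _run_reverse_field_mapping translates feature view names into source column names before the offline pull, and _run_forward_field_mapping renames the pulled columns back. Below is a minimal standalone sketch of that behavior; the mapping and column names are hypothetical, not taken from this commit:

import pyarrow as pa

# Hypothetical field mapping: source column name -> feature view name.
field_mapping = {"ts_col": "event_timestamp", "rate": "conv_rate"}

# Reverse direction, as in _run_reverse_field_mapping: look up the source
# column for each feature-view-level name before querying the offline store.
reverse_mapping = {v: k for k, v in field_mapping.items()}
assert reverse_mapping["event_timestamp"] == "ts_col"

# Forward direction, as in _run_forward_field_mapping: rename the pulled
# columns back to the names the feature view expects.
table = pa.table({"driver_id": [1001], "rate": [0.85], "ts_col": [1616400000]})
table = table.rename_columns(
    [field_mapping.get(col, col) for col in table.column_names]
)
print(table.column_names)  # ['driver_id', 'conv_rate', 'event_timestamp']
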
22 changes: 15 additions & 7 deletions sdk/python/feast/infra/gcp.py
@@ -106,14 +106,15 @@ def online_write_batch(
self,
project: str,
table: Union[FeatureTable, FeatureView],
-        data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
-        created_ts: datetime,
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
) -> None:
from google.cloud import datastore

client = self._initialize_client()

-        for entity_key, features, timestamp in data:
+        for entity_key, features, timestamp, created_ts in data:
document_id = compute_datastore_entity_id(entity_key)

key = client.key(
@@ -125,9 +126,12 @@
if entity["event_ts"] > _make_tzaware(timestamp):
# Do not overwrite feature values computed from fresher data
continue
elif entity["event_ts"] == _make_tzaware(timestamp) and entity[
"created_ts"
] > _make_tzaware(created_ts):
elif (
entity["event_ts"] == _make_tzaware(timestamp)
and created_ts is not None
and entity["created_ts"] is not None
and entity["created_ts"] > _make_tzaware(created_ts)
):
# Do not overwrite feature values computed from the same data, but
# computed later than this one
continue
@@ -139,7 +143,11 @@
key=entity_key.SerializeToString(),
values={k: v.SerializeToString() for k, v in features.items()},
event_ts=_make_tzaware(timestamp),
-                    created_ts=_make_tzaware(created_ts),
+                    created_ts=(
+                        _make_tzaware(created_ts)
+                        if created_ts is not None
+                        else None
+                    ),
)
)
client.put(entity)
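
The overwrite guard above is the behavioral heart of this file's change: created_ts is now carried per row and may be None, so the created-timestamp tie-breaker only applies when both the stored and incoming values exist. A standalone sketch of the decision logic follows; the helper name and timestamps are hypothetical, not part of this commit:

from datetime import datetime, timedelta, timezone
from typing import Optional

def should_skip_write(
    stored_event_ts: datetime,
    stored_created_ts: Optional[datetime],
    new_event_ts: datetime,
    new_created_ts: Optional[datetime],
) -> bool:
    # Never overwrite feature values computed from fresher data.
    if stored_event_ts > new_event_ts:
        return True
    # On an event-timestamp tie, compare created_ts only when both sides have one.
    if (
        stored_event_ts == new_event_ts
        and new_created_ts is not None
        and stored_created_ts is not None
        and stored_created_ts > new_created_ts
    ):
        return True
    return False

now = datetime.now(timezone.utc)
assert should_skip_write(now, now, now, now - timedelta(seconds=1))  # older created_ts loses the tie
assert not should_skip_write(now, None, now, None)  # absent created_ts writes through
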
10 changes: 6 additions & 4 deletions sdk/python/feast/infra/local_sqlite.py
@@ -53,21 +53,22 @@ def online_write_batch(
self,
project: str,
table: Union[FeatureTable, FeatureView],
-        data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
-        created_ts: datetime,
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
) -> None:
conn = self._get_conn()

with conn:
-            for entity_key, values, timestamp in data:
+            for entity_key, values, timestamp, created_ts in data:
for feature_name, val in values.items():
entity_key_bin = serialize_entity_key(entity_key)

conn.execute(
f"""
UPDATE {_table_id(project, table)}
SET value = ?, event_ts = ?, created_ts = ?
-                    WHERE (event_ts < ? OR (event_ts = ? AND created_ts < ?))
+                    WHERE (event_ts < ? OR (event_ts = ? AND (created_ts IS NULL OR ? IS NULL OR created_ts < ?)))
AND (entity_key = ? AND feature_name = ?)
""",
(
@@ -79,6 +80,7 @@
timestamp,
timestamp,
created_ts,
+                        created_ts,
entity_key_bin,
feature_name,
),
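
The reworked WHERE clause makes the tie-breaker NULL-safe: under the old predicate, a NULL created_ts made created_ts < ? evaluate to NULL, so the UPDATE silently skipped the row. A self-contained check of the new predicate against an in-memory SQLite database; the table name and values are hypothetical:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (value BLOB, event_ts INTEGER, created_ts INTEGER)")
conn.execute("INSERT INTO demo VALUES (x'00', 100, NULL)")  # stored row lacks created_ts

new_event_ts, new_created_ts = 100, 50  # equal event_ts, so the tie-breaker applies
cur = conn.execute(
    """
    UPDATE demo
    SET value = x'01', event_ts = ?, created_ts = ?
    WHERE (event_ts < ? OR (event_ts = ? AND (created_ts IS NULL OR ? IS NULL OR created_ts < ?)))
    """,
    (new_event_ts, new_created_ts, new_event_ts, new_event_ts, new_created_ts, new_created_ts),
)
print(cur.rowcount)  # 1: the NULL stored created_ts no longer blocks the update
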
12 changes: 6 additions & 6 deletions sdk/python/feast/infra/provider.py
@@ -44,8 +44,9 @@ def online_write_batch(
self,
project: str,
table: Union[FeatureTable, FeatureView],
-        data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
-        created_ts: datetime,
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
) -> None:
"""
Write a batch of feature rows to the online store. This is a low level interface, not
@@ -56,10 +57,9 @@
Args:
project: Feast project name
table: Feast FeatureTable
-            data: a list of triplets containing Feature data. Each triplet contains an Entity Key,
-                a dict containing feature values, and event timestamp for the row.
-            created_ts: the created timestamp (typically set to current time), same value used for
-                all rows.
+            data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key,
+                a dict containing feature values, an event timestamp for the row, and
+                the created timestamp for the row if it exists.
"""
...
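
For reference, a sketch of the quadruplet shape that online_write_batch now receives per row, the same shape _convert_arrow_to_proto produces in feature_store.py. The entity name and values here are hypothetical; at this point in the codebase the EntityKey proto carries entity_names and entity_values, as the conversion code above shows:

from datetime import datetime, timezone

from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto

entity_key = EntityKeyProto(
    entity_names=["driver_id"],
    entity_values=[ValueProto(int64_val=1001)],
)
row = (
    entity_key,  # which entity instance the features belong to
    {"conv_rate": ValueProto(double_val=0.85)},  # feature name -> value proto
    datetime.now(timezone.utc),  # event timestamp
    None,  # created timestamp, now Optional and carried per row
)
# provider.online_write_batch(project, feature_view, [row])  # hypothetical call site
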
