Infer entity dataframe event timestamp column (#1495)
* Infer entity df event timestamp

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Simplify inference of BQ

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Address comment

Signed-off-by: Jacob Klegar <jacob@tecton.ai>
jklegar committed Apr 27, 2021
1 parent 1efc53b commit 4d7aada
Showing 5 changed files with 169 additions and 88 deletions.
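The net effect of this change: get_historical_features now accepts an entity dataframe whose timestamp column is not named event_timestamp, as long as exactly one datetime-typed column is present. A minimal sketch of the new behavior, assuming the Feast 0.10-era FeatureStore API (feature_refs) and a hypothetical registered driver_hourly_stats feature view:

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# The timestamp column is named "order_ts", not "event_timestamp"; since it
# is the only datetime-typed column, it is now inferred automatically.
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "order_ts": pd.to_datetime(
            ["2021-04-12 10:59:42", "2021-04-12 08:12:10"], utc=True
        ),
    }
)

job = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=["driver_hourly_stats:conv_rate"],
)
print(job.to_df())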
50 changes: 37 additions & 13 deletions sdk/python/feast/driver_test_data.py
@@ -5,7 +5,7 @@
 import pandas as pd
 from pytz import FixedOffset, timezone, utc
 
-from feast.infra.provider import ENTITY_DF_EVENT_TIMESTAMP_COL
+from feast.infra.provider import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
 
 
 class EventTimestampType(Enum):
@@ -27,7 +27,12 @@ def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampTyp
 
 
 def create_orders_df(
-    customers, drivers, start_date, end_date, order_count
+    customers,
+    drivers,
+    start_date,
+    end_date,
+    order_count,
+    infer_event_timestamp_col=False,
 ) -> pd.DataFrame:
     """
     Example df generated by this function:
@@ -45,19 +50,38 @@ def create_orders_df(
     df["customer_id"] = np.random.choice(customers, order_count)
     df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32)
 
-    df[ENTITY_DF_EVENT_TIMESTAMP_COL] = [
-        _convert_event_timestamp(
-            pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
-            EventTimestampType(idx % 4),
-        )
-        for idx, dt in enumerate(
-            pd.date_range(start=start_date, end=end_date, periods=order_count)
-        )
-    ]
-    df.sort_values(
-        by=[ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id"],
-        inplace=True,
-    )
+    if infer_event_timestamp_col:
+        df["e_ts"] = [
+            _convert_event_timestamp(
+                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
+                EventTimestampType(3),
+            )
+            for idx, dt in enumerate(
+                pd.date_range(start=start_date, end=end_date, periods=order_count)
+            )
+        ]
+        df.sort_values(
+            by=["e_ts", "order_id", "driver_id", "customer_id"], inplace=True,
+        )
+    else:
+        df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [
+            _convert_event_timestamp(
+                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
+                EventTimestampType(idx % 4),
+            )
+            for idx, dt in enumerate(
+                pd.date_range(start=start_date, end=end_date, periods=order_count)
+            )
+        ]
+        df.sort_values(
+            by=[
+                DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
+                "order_id",
+                "driver_id",
+                "customer_id",
+            ],
+            inplace=True,
+        )
     return df


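A short usage sketch of the new test-data flag, assuming the module path shown in this diff; with infer_event_timestamp_col=True the generated timestamp column is named "e_ts" instead of "event_timestamp", which exercises the inference path in tests:

from datetime import datetime, timedelta

from feast.driver_test_data import create_orders_df

end_date = datetime.utcnow()
orders_df = create_orders_df(
    customers=[101, 102],
    drivers=[5001, 5002],
    start_date=end_date - timedelta(days=3),
    end_date=end_date,
    order_count=10,
    infer_event_timestamp_col=True,
)
# Prints a column list containing "e_ts" rather than "event_timestamp".
print(orders_df.columns.tolist())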
69 changes: 61 additions & 8 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -14,6 +14,7 @@
 from feast.feature_view import FeatureView
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.provider import (
+    DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
     RetrievalJob,
     _get_requested_feature_views_to_features_dict,
 )
@@ -79,12 +80,19 @@ def get_historical_features(
         client = _get_bigquery_client()
 
         if type(entity_df) is str:
-            entity_df_sql_table = f"({entity_df})"
+            entity_df_job = client.query(entity_df)
+            entity_df_result = entity_df_job.result()  # also starts job
+
+            entity_df_event_timestamp_col = _infer_event_timestamp_from_bigquery_query(
+                entity_df_result
+            )
+
+            entity_df_sql_table = f"`{entity_df_job.destination.project}.{entity_df_job.destination.dataset_id}.{entity_df_job.destination.table_id}`"
         elif isinstance(entity_df, pandas.DataFrame):
-            if "event_timestamp" not in entity_df.columns:
-                raise ValueError(
-                    "Please provide an entity_df with a column named event_timestamp representing the time of events."
-                )
+            entity_df_event_timestamp_col = _infer_event_timestamp_from_dataframe(
+                entity_df
+            )
+
             table_id = _upload_entity_df_into_bigquery(
                 config.project, entity_df, client
             )
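The str branch above leans on a BigQuery behavior worth spelling out: a completed SELECT query has a destination table (an anonymous cached table when none is configured) that later SQL can reference instead of re-running the query. A standalone sketch of that pattern, assuming google-cloud-bigquery is installed and credentials are configured:

from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT 1001 AS driver_id, CURRENT_TIMESTAMP() AS created_ts")
result = job.result()  # blocks until the query finishes

# The finished job's destination can be interpolated into follow-up SQL.
dest = job.destination
print(f"`{dest.project}.{dest.dataset_id}.{dest.table_id}`")

# result.schema is a list of SchemaField objects; the inference helper in the
# next hunk scans it for a field named event_timestamp, or failing that, for a
# single TIMESTAMP-typed field.
print([(field.name, field.field_type) for field in result.schema])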
@@ -107,12 +115,55 @@ def get_historical_features(
                 min_timestamp=datetime.now() - timedelta(days=365),
                 max_timestamp=datetime.now() + timedelta(days=1),
                 left_table_query_string=entity_df_sql_table,
+                entity_df_event_timestamp_col=entity_df_event_timestamp_col,
             )
 
         job = BigQueryRetrievalJob(query=query, client=client)
         return job
 
 
+def _infer_event_timestamp_from_bigquery_query(entity_df_result) -> str:
+    if any(
+        schema_field.name == DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
+        for schema_field in entity_df_result.schema
+    ):
+        return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
+    else:
+        datetime_columns = list(
+            filter(
+                lambda schema_field: schema_field.field_type == "TIMESTAMP",
+                entity_df_result.schema,
+            )
+        )
+        if len(datetime_columns) == 1:
+            print(
+                f"Using {datetime_columns[0].name} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
+            )
+            return datetime_columns[0].name
+        else:
+            raise ValueError(
+                f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
+            )
+
+
+def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str:
+    if DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL in entity_df.columns:
+        return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
+    else:
+        datetime_columns = entity_df.select_dtypes(
+            include=["datetime", "datetimetz"]
+        ).columns
+        if len(datetime_columns) == 1:
+            print(
+                f"Using {datetime_columns[0]} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
+            )
+            return datetime_columns[0]
+        else:
+            raise ValueError(
+                f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
+            )
+
+
 class BigQueryRetrievalJob(RetrievalJob):
     def __init__(self, query, client):
         self.query = query
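A quick demonstration of the pandas side of the inference, using only the calls that appear in _infer_event_timestamp_from_dataframe; the "datetime" selector matches tz-naive datetime64 columns and "datetimetz" matches tz-aware ones:

import pandas as pd

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "order_ts": pd.to_datetime(["2021-04-10", "2021-04-11"], utc=True),
    }
)

# Exactly one datetime-typed column, so inference picks "order_ts".
datetime_columns = entity_df.select_dtypes(
    include=["datetime", "datetimetz"]
).columns
assert list(datetime_columns) == ["order_ts"]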
@@ -230,6 +281,7 @@ def build_point_in_time_query(
     min_timestamp: datetime,
     max_timestamp: datetime,
     left_table_query_string: str,
+    entity_df_event_timestamp_col: str,
 ):
     """Build point-in-time query between each feature view table and the entity dataframe"""
     template = Environment(loader=BaseLoader()).from_string(
@@ -241,6 +293,7 @@
         "min_timestamp": min_timestamp,
         "max_timestamp": max_timestamp,
         "left_table_query_string": left_table_query_string,
+        "entity_df_event_timestamp_col": entity_df_event_timestamp_col,
         "featureviews": [asdict(context) for context in feature_view_query_contexts],
     }
 
@@ -292,7 +345,7 @@ def _get_bigquery_client():
     -- unique identifier for each row in the entity dataset.
     row_number,
     -- event_timestamp contains the timestamps to join onto
-    event_timestamp,
+    {{entity_df_event_timestamp_col}} AS event_timestamp,
     -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
     NULL as {{ featureview.name }}_feature_timestamp,
     -- created timestamp of the feature at the corresponding feature_timestamp
@@ -373,7 +426,7 @@ def _get_bigquery_client():
 /*
  Joins the outputs of multiple time travel joins to a single table.
  */
-SELECT edf.event_timestamp as event_timestamp, * EXCEPT (row_number, event_timestamp) FROM entity_dataframe edf
+SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (row_number, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf
 {% for featureview in featureviews %}
 LEFT JOIN (
     SELECT
@@ -384,5 +437,5 @@ def _get_bigquery_client():
         FROM {{ featureview.name }}__deduped
     ) USING (row_number)
 {% endfor %}
-ORDER BY event_timestamp
+ORDER BY {{entity_df_event_timestamp_col}}
 """
38 changes: 24 additions & 14 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -9,7 +9,7 @@
 from feast.feature_view import FeatureView
 from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
 from feast.infra.provider import (
-    ENTITY_DF_EVENT_TIMESTAMP_COL,
+    DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
     _get_requested_feature_views_to_features_dict,
     _run_field_mapping,
 )
@@ -44,10 +44,20 @@ def get_historical_features(
             raise ValueError(
                 f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
             )
-        if ENTITY_DF_EVENT_TIMESTAMP_COL not in entity_df.columns:
-            raise ValueError(
-                f"Please provide an entity_df with a column named {ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
-            )
+        entity_df_event_timestamp_col = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL  # local modifiable copy of global variable
+        if entity_df_event_timestamp_col not in entity_df.columns:
+            datetime_columns = entity_df.select_dtypes(
+                include=["datetime", "datetimetz"]
+            ).columns
+            if len(datetime_columns) == 1:
+                print(
+                    f"Using {datetime_columns[0]} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
+                )
+                entity_df_event_timestamp_col = datetime_columns[0]
+            else:
+                raise ValueError(
+                    f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
+                )
 
         feature_views_to_features = _get_requested_feature_views_to_features_dict(
             feature_refs, feature_views
@@ -57,22 +67,22 @@ def evaluate_historical_retrieval():
         def evaluate_historical_retrieval():
 
             # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC
-            entity_df[ENTITY_DF_EVENT_TIMESTAMP_COL] = entity_df[
-                ENTITY_DF_EVENT_TIMESTAMP_COL
+            entity_df[entity_df_event_timestamp_col] = entity_df[
+                entity_df_event_timestamp_col
             ].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc))
 
             # Create a copy of entity_df to prevent modifying the original
             entity_df_with_features = entity_df.copy()
 
             # Convert event timestamp column to datetime and normalize time zone to UTC
             # This is necessary to avoid issues with pd.merge_asof
-            entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL] = pd.to_datetime(
-                entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL], utc=True
+            entity_df_with_features[entity_df_event_timestamp_col] = pd.to_datetime(
+                entity_df_with_features[entity_df_event_timestamp_col], utc=True
             )
 
             # Sort event timestamp values
             entity_df_with_features = entity_df_with_features.sort_values(
-                ENTITY_DF_EVENT_TIMESTAMP_COL
+                entity_df_event_timestamp_col
             )
 
             # Load feature view data from sources and join them incrementally
@@ -153,14 +163,14 @@ def evaluate_historical_retrieval():
                 entity_df_with_features = pd.merge_asof(
                     entity_df_with_features,
                     df_to_join,
-                    left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
+                    left_on=entity_df_event_timestamp_col,
                     right_on=event_timestamp_column,
                     by=right_entity_columns,
                     tolerance=feature_view.ttl,
                 )
 
                 # Remove right (feature table/view) event_timestamp column.
-                if event_timestamp_column != ENTITY_DF_EVENT_TIMESTAMP_COL:
+                if event_timestamp_column != entity_df_event_timestamp_col:
                     entity_df_with_features.drop(
                         columns=[event_timestamp_column], inplace=True
                     )
@@ -170,9 +180,9 @@ def evaluate_historical_retrieval():
 
             # Move "datetime" column to front
             current_cols = entity_df_with_features.columns.tolist()
-            current_cols.remove(ENTITY_DF_EVENT_TIMESTAMP_COL)
+            current_cols.remove(entity_df_event_timestamp_col)
             entity_df_with_features = entity_df_with_features[
-                [ENTITY_DF_EVENT_TIMESTAMP_COL] + current_cols
+                [entity_df_event_timestamp_col] + current_cols
             ]
 
             return entity_df_with_features
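The point-in-time join itself is unchanged; only the left join key is now the inferred column. A self-contained sketch of the pd.merge_asof pattern used above, with hypothetical data:

import pandas as pd

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1001],
        "order_ts": pd.to_datetime(
            ["2021-04-12 10:00", "2021-04-12 12:00"], utc=True
        ),
    }
)
features = pd.DataFrame(
    {
        "driver_id": [1001],
        "event_timestamp": pd.to_datetime(["2021-04-12 09:30"], utc=True),
        "conv_rate": [0.8],
    }
)

# left_on is the inferred entity timestamp column; right_on stays the feature
# table's own event_timestamp column. Both sides must be sorted on their keys.
joined = pd.merge_asof(
    entity_df.sort_values("order_ts"),
    features.sort_values("event_timestamp"),
    left_on="order_ts",
    right_on="event_timestamp",
    by="driver_id",
)
print(joined)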
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/provider.py
@@ -18,7 +18,7 @@
 from feast.repo_config import RepoConfig
 from feast.type_map import python_value_to_proto_value
 
-ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
+DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
 
 
 class Provider(abc.ABC):
