title | description | published |
---|---|---|
Batch Backfilling |
Chalk makes it easy to batch ingest historical feature data from bulk data sources. |
true |
If you need to ingest historical feature data from bulk data sources (i.e. a data warehouse or S3), you can use Chalk's support for "feature timestamping". This functionality allows you to override the default timestamps on the ingested feature values.
First, define a feature_time feature for the relevant feature set:
@features
class User:
id: int
...
backfilled_feature: str
...
ts: FeatureTime
Note that this ts
feature doesn't need to correspond to a specific feature in your business domain.
Chalk simply uses this feature_time
to set the "observation time" of other features that your resolvers compute.
Please see documentation on feature_time for more information on this topic.
Now, define a resolver to ingest data from your data source:
@offline
def ingest_historical_data() -> DataFrame[User.id, User.backfilled_feature, User.ts]:
return (
snowflake
.query_string(
"SELECT id, backfilled_feature, updated_at FROM source_table"
fields={"updated_at": User.ts}
)
.all()
)
Chalk assumes that your timestamp column is UTC
, unless otherwise specified. Note: you may return many
values of backfilled_feature
for the same id
, but they should have different ts
values.
Then, after running chalk apply
, you can trigger this resolver to run one time using the Chalk Dashboard,
or the chalk trigger
command:
chalk trigger --resolver your.module.path.ingest_historical_data
You must manually trigger the resolver because it has no cron schedule specified.
Once your resolver run completes, your data will be available in the offline store with effective times
specified by the values returned for your feature_time
(in this example, updated_at
from source_table
).
If the feature is marked as tolerating staleness, and etl_offline_to_online=True
, then Chalk will also insert
feature values into the online store if they are newer than existing values.
Resolvers that use incremental ingest, don't re-process data from before their "max observed timestamp" by default, even if the query is changed.
Chalk lets you reset the maximum observed timestamp of incremental resolvers to a specific timestamp, or re-ingest all historical data.
Chalk uses offline queries to perform this operation. Suppose that you want to add a new column,
favorite_color
, to this existing batch SQL resolver:
@offline(cron="0 * * * *")
def ingest_preferences() -> DataFrame[User.id, User.favorite_food, User.favorite_color]:
return (
snowflake
.query_string("SELECT id, favorite_food, favorite_color, updated_at FROM preferences")
.incremental(incremental_column="updated_at")
)
If you have been running this resolver in production for a long time, then simply adding favorite_color
and
running chalk apply
will not ingest historical color preferences because the incremental
timestamp
lower bound will prevent the query from returning "old" rows which include these historical favorite_color
observations.
Instead, run an offline query to obtain a dataset of the feature values you want to ingest and then call
.ingest
to either store online or offline.
from chalk.client import ChalkClient
dataset = ChalkClient().offline_query(
input={
User.id: [i for i in range(1000)],
},
output=[
User.favorite_color
],
)
dataset.recompute(features=['user.favorite_color'])
dataset.ingest(store_offline=True)
Chalk can also backfill feature data for resolvers that take arguments to compute feature values. This is useful for generating historically accurate training datasets using new resolvers that are derived from features where we have historical observations.
Suppose we have the following feature class:
@features
class User:
id: int
name: str
reversed_name: str # a feature we want to backfill
with the following newly added resolver:
@online
def reverse_name(name: User.name) -> User.reversed_name:
return name[::-1]
Chalk can automatically compute this feature using historically observed values of User.name
.
To do so, you can either use an offline query with recompute_features=True
and ingest the
computed feature values as we did above, or you can use the chalk
CLI tool to
trigger a resolver run.
chalk trigger --resolver your.module.path.reverse_name --lower-bound 2024-05-05T12:00:00+00:00 --persist-offline=True
In this example, Chalk will query the offline store to sample all combinations of:
| user.id | user.name | observed_at |
where observed_at is later than the ISO8601 timestamp passed in as the lower_bound
and then invoke your resolver.
For each sampled tuple (id, name, observed_at)
, Chalk will write
(id, reverse_name(name), observed-at)
back to the offline store, and to the online store if
the new feature is marked with non-zero max_staleness
and etl_offline_to_online=True
.