Enable incremental for HDFS sink #695

Merged (8 commits) on Oct 11, 2022
12 changes: 12 additions & 0 deletions docs/concepts/materializing-features.md
@@ -31,6 +31,18 @@ More reference on the APIs:

In the above example, we define a Redis table called `nycTaxiDemoFeature` and materialize two features called `f_location_avg_fare` and `f_location_max_fare` to Redis.

## Incremental Aggregation
Using incremental aggregation will significantly speed up the `WindowAggTransformation` feature calculation.
For example, the aggregation sum of a feature F within a 180-day window at day T can be expressed as: F(T) = F(T - 1) + DirectAgg(T - 1) - DirectAgg(T - 181).
Once a snapshot of the first day is generated, the calculation for the following days can leverage it.
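
To make the recurrence concrete, here is a minimal sketch (illustration only, not code from this repository) of the incremental rolling sum, where `direct_agg[t]` stands for DirectAgg(t):

```python
WINDOW = 180  # days in the aggregation window

def rolling_sum_incremental(direct_agg):
    """direct_agg[t] is the direct daily aggregate for day t; returns F(t) for every day t."""
    f = []
    for t in range(len(direct_agg)):
        if t == 0:
            f.append(0)  # no prior days fall inside the window yet
        else:
            value = f[t - 1] + direct_agg[t - 1]      # add the newly completed day
            if t - 1 - WINDOW >= 0:
                value -= direct_agg[t - 1 - WINDOW]   # drop the day that left the window
            f.append(value)
    return f

# Sanity check against recomputing the whole window from scratch:
daily = [d % 7 for d in range(400)]
incremental = rolling_sum_incremental(daily)
assert incremental[300] == sum(daily[300 - WINDOW:300])
```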

A `storeName` is required if incremental aggregation is enabled. There can be multiple output datasets, and each of them needs to be stored in a separate folder. The `storeName` is used as the name of the folder created under the base `path`.

Incremental aggregation is enabled by default when using `HdfsSink`.
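
For example, an offline materialization job with an HDFS-compatible sink might look like the sketch below (the path, job name, and feature names are placeholders, and `client` is assumed to be an existing `FeathrClient`):

```python
from feathr import HdfsSink, MaterializationSettings

# Incremental aggregation is on by default for HdfsSink; store_name names the
# sub-folder under output_path that holds this dataset's snapshots.
offline_sink = HdfsSink(
    output_path="abfss://container@account.dfs.core.windows.net/materialized_features",
    store_name="df0",
)
settings = MaterializationSettings(
    name="nycTaxiMaterializationJob",
    sinks=[offline_sink],
    feature_names=["f_location_avg_fare", "f_location_max_fare"],
)
client.materialize_features(settings)  # `client` is an existing FeathrClient
```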

More reference on the APIs:
- [HdfsSink API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.HdfsSink)

## Feature Backfill

It is also possible to backfill the features up to a particular time, like below. If the `BackfillTime` part is not specified, it defaults to `now()` (i.e. if not specified, it's equivalent to `BackfillTime(start=now, end=now, step=timedelta(days=1))`).
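
A minimal sketch of such a backfill, assuming the `MaterializationSettings` and `HdfsSink` APIs shown above (dates, path, and names are placeholders):

```python
from datetime import datetime, timedelta

from feathr import BackfillTime, HdfsSink, MaterializationSettings

backfill_time = BackfillTime(
    start=datetime(2020, 5, 10),
    end=datetime(2020, 5, 20),
    step=timedelta(days=1),  # materialize one snapshot per day
)
settings = MaterializationSettings(
    name="nycTaxiBackfillJob",
    sinks=[HdfsSink(output_path="abfss://container@account.dfs.core.windows.net/backfill")],
    feature_names=["f_location_avg_fare", "f_location_max_fare"],
    backfill_time=backfill_time,
)
```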
3 changes: 3 additions & 0 deletions feathr_project/feathr/definition/_materialization_utils.py
@@ -10,6 +10,9 @@ def _to_materialization_config(settings: MaterializationSettings):
endTime: "{{ settings.backfill_time.end.strftime('%Y-%m-%d %H:%M:%S') }}"
endTimeFormat: "yyyy-MM-dd HH:mm:ss"
resolution: DAILY
{% if settings.has_hdfs_sink == True %}
enableIncremental = true
{% endif %}
output:[
{% for sink in settings.sinks %}
{{sink.to_feature_config()}}
5 changes: 4 additions & 1 deletion feathr_project/feathr/definition/materialization_settings.py
@@ -32,7 +32,10 @@ def __init__(self, name: str, sinks: List[Sink], feature_names: List[str], backf
now = datetime.now()
self.backfill_time = backfill_time if backfill_time else BackfillTime(start=now, end=now, step=timedelta(days=1))
for sink in sinks:
if isinstance(sink, RedisSink):
if isinstance(sink, HdfsSink):
self.has_hdfs_sink = True
sink.aggregation_features = feature_names
elif isinstance(sink, RedisSink):
sink.aggregation_features = feature_names
self.sinks = sinks
self.feature_names = feature_names
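
A short sketch (illustration only, not part of this diff) of how the new branch behaves: constructing the settings with an `HdfsSink` sets the incremental flag and propagates the feature names to the sink.

```python
from feathr import HdfsSink, MaterializationSettings

sink = HdfsSink(output_path="/user/featureGen/hdfsResult/", store_name="df0")
settings = MaterializationSettings(
    name="testFeatureGen",
    sinks=[sink],
    feature_names=["f_location_avg_fare", "f_location_max_fare"],
)

# With an HdfsSink present, the config template will emit `enableIncremental = true`
# and the sink records which features it aggregates.
assert settings.has_hdfs_sink is True
assert sink.aggregation_features == ["f_location_avg_fare", "f_location_max_fare"]
```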
20 changes: 17 additions & 3 deletions feathr_project/feathr/definition/sink.py
@@ -103,25 +103,35 @@ def to_argument(self):

class HdfsSink(Sink):
"""Offline Hadoop HDFS-compatible(HDFS, delta lake, Azure blog storage etc) sink that is used to store feature data.
The result is in AVRO format.
The result is in AVRO format.

Incremental aggregation is enabled by default when using HdfsSink. Using incremental aggregation will significantly speed up the WindowAggTransformation feature calculation.
For example, the aggregation sum of a feature F within a 180-day window at day T can be expressed as: F(T) = F(T - 1) + DirectAgg(T - 1) - DirectAgg(T - 181).
Once a snapshot of the first day is generated, the calculation for the following days can leverage it.

Attributes:
output_path: output path
store_name: the folder name under the base "path". Used especially for the current dataset to support 'Incremental' aggregation.

"""
def __init__(self, output_path: str) -> None:
def __init__(self, output_path: str, store_name: Optional[str]="df0") -> None:
self.output_path = output_path

self.store_name = store_name
# Sample generated HOCON config:
# operational: {
# name: testFeatureGen
# endTime: 2019-05-01
# endTimeFormat: "yyyy-MM-dd"
# resolution: DAILY
# enableIncremental = true
# output:[
# {
# name: HDFS
# outputFormat: RAW_DATA
# params: {
# path: "/user/featureGen/hdfsResult/"
# features: [mockdata_a_ct_gen, mockdata_a_sample_gen]
# storeName: "yyyy/MM/dd"
# }
# }
# ]
@@ -132,11 +142,15 @@ def to_feature_config(self) -> str:
tm = Template("""
{
name: HDFS
outputFormat: RAW_DATA
params: {
path: "{{sink.output_path}}"
{% if sink.aggregation_features %}
features: [{{','.join(sink.aggregation_features)}}]
{% endif %}
{% if sink.store_name %}
storeName: "{{sink.store_name}}"
{% endif %}
}
}
""")
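
A quick sketch (illustration only) of what the updated `to_feature_config` renders for a sink that has a `store_name` but no aggregation features assigned yet:

```python
from feathr import HdfsSink

sink = HdfsSink(output_path="/user/featureGen/hdfsResult/", store_name="df0")
print(sink.to_feature_config())
# Renders roughly to (whitespace aside):
# {
#     name: HDFS
#     outputFormat: RAW_DATA
#     params: {
#         path: "/user/featureGen/hdfsResult/"
#         storeName: "df0"
#     }
# }
```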
5 changes: 5 additions & 0 deletions feathr_project/test/test_feature_materialization.py
@@ -61,12 +61,17 @@ def test_feature_materialization_offline_config():
endTime: "2020-05-20 00:00:00"
endTimeFormat: "yyyy-MM-dd HH:mm:ss"
resolution: DAILY
enableIncremental = true
output:[
{
name: HDFS
outputFormat: RAW_DATA
params: {
path: "abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/output/hdfs_test.avro"
features: [f_location_avg_fare,f_location_max_fare]
storeName: "df0"
}

}
]
}