Skip to content

Commit

Permalink
Enable sql expression evaluation in fieldMapping for historical retri…
Browse files Browse the repository at this point in the history
…eval (#98)

* Enable sql expression in fieldMapping for historical retrieval

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* add special case: rename

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Sep 27, 2021
1 parent 80c2943 commit f347686
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
16 changes: 11 additions & 5 deletions python/feast_spark/pyspark/historical_feature_retrieval_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pyspark.sql.functions import (
broadcast,
col,
expr,
monotonically_increasing_id,
row_number,
)
Expand Down Expand Up @@ -321,11 +322,16 @@ class FileDestination(NamedTuple):

def _map_column(df: DataFrame, col_mapping: Dict[str, str]):
source_to_alias_map = {v: k for k, v in col_mapping.items()}
projection = [
col(col_name).alias(source_to_alias_map.get(col_name, col_name))
for col_name in df.columns
]
return df.select(projection)
projection = {}

for col_name in df.columns + list(set(col_mapping) - set(df.columns)):
if col_name in source_to_alias_map:
# column rename
projection[source_to_alias_map.get(col_name)] = col(col_name)
else:
projection[col_name] = expr(col_mapping.get(col_name, col_name))

return df.select([c.alias(a) for a, c in projection.items()])


def as_of_join(
Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_historical_feature_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def large_entity_csv_file(pytestconfig, spark):
file_path = os.path.join(temp_dir, "large_entity")
entity_schema = StructType(
[
StructField("customer_id", IntegerType()),
StructField("id", IntegerType()),
StructField("event_timestamp", TimestampType()),
]
)
Expand Down

0 comments on commit f347686

Please sign in to comment.