Skip to content

Commit

Permalink
Add checkpoint in each iteration of join loop (historical retrieval j…
Browse files Browse the repository at this point in the history
…ob) to truncate logical plan (#92)

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Aug 18, 2021
1 parent 7c6878a commit 4b553d5
Showing 1 changed file with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,9 @@ def join_entity_to_feature_tables(
joined_df = as_of_join(
joined_df, entity_event_timestamp_column, feature_table_df, feature_table,
)
if SparkContext._active_spark_context._jsc.sc().getCheckpointDir().nonEmpty():
joined_df = joined_df.checkpoint()

return joined_df


Expand Down

0 comments on commit 4b553d5

Please sign in to comment.