From 0df280b00db094cd396e62cd157b136a0c912c3f Mon Sep 17 00:00:00 2001 From: Olek Gorajek Date: Wed, 16 Oct 2024 22:17:48 -0700 Subject: [PATCH] Add lookback window to Materialization calls. --- .../materialization/jobs/cube_materialization.py | 1 + .../datajunction_server/models/materialization.py | 1 + datajunction-server/tests/service_clients_test.py | 2 ++ 3 files changed, 4 insertions(+) diff --git a/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py b/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py index b3f340aaa..d47053f53 100644 --- a/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py +++ b/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py @@ -92,6 +92,7 @@ def schedule( partitions=temporal_partition + categorical_partitions, job=materialization.job, strategy=materialization.strategy, + lookback_window=cube_config.lookback_window, ), request_headers=request_headers, ) diff --git a/datajunction-server/datajunction_server/models/materialization.py b/datajunction-server/datajunction_server/models/materialization.py index deddced5e..59b02b254 100644 --- a/datajunction-server/datajunction_server/models/materialization.py +++ b/datajunction-server/datajunction_server/models/materialization.py @@ -91,6 +91,7 @@ class GenericMaterializationInput(BaseModel): spark_conf: Optional[Dict] = None partitions: Optional[List[Dict]] = None columns: List[ColumnMetadata] + lookback_window: Optional[str] = "1 DAY" class DruidMaterializationInput(GenericMaterializationInput): diff --git a/datajunction-server/tests/service_clients_test.py b/datajunction-server/tests/service_clients_test.py index ac3d879ed..2fa772e86 100644 --- a/datajunction-server/tests/service_clients_test.py +++ b/datajunction-server/tests/service_clients_test.py @@ -420,6 +420,7 @@ def test_query_service_client_materialize(self, mocker: MockerFixture) -> None: "spark_conf": {}, "upstream_tables": ["default.hard_hats"], "columns": [], + "lookback_window": "1 DAY", }, headers=ANY, ) @@ -544,6 +545,7 @@ def test_materialize(self, mocker: MockerFixture) -> None: json={ "name": "default", "job": "SparkSqlMaterializationJob", + "lookback_window": "1 DAY", "strategy": "full", "node_name": "default.hard_hat", "node_version": "v1",