From 3ffa2a0fe92bc402122661dcf04d3a21e889fcb6 Mon Sep 17 00:00:00 2001 From: Anna Geiduschek Date: Wed, 1 May 2024 16:38:00 -0700 Subject: [PATCH] Revert "[Airflow] Unify Dataflow task group code (Recidiviz/recidiviz-data#29453)" (Recidiviz/recidiviz-data#29533) ## Description of the change This reverts commit dedeac1cdc1bc51a693ae3836c3542f0174aafb9. This change caused a few problems: 1) NC normalization pipelines are crashing (NC wasn't running normalization before) 2) Ingest DAG is failing to launch pipelines because the way I'm pulling `raw_data_upper_bound_dates_json` is buggy. ## Type of change > All pull requests must have at least one of the following labels applied (otherwise the PR will fail): | Label | Description | |----------------------------- |----------------------------------------------------------------------------------------------------------- | | Type: Bug | non-breaking change that fixes an issue | | Type: Feature | non-breaking change that adds functionality | | Type: Breaking Change | fix or feature that would cause existing functionality to not work as expected | | Type: Non-breaking refactor | change addresses some tech debt item or prepares for a later change, but does not change functionality | | Type: Configuration Change | adjusts configuration to achieve some end related to functionality, development, performance, or security | | Type: Dependency Upgrade | upgrades a project dependency - these changes are not included in release notes | ## Related issues Related to Recidiviz/recidiviz-data#27378. ## Checklists ### Development **This box MUST be checked by the submitter prior to merging**: - [x] **Double- and triple-checked that there is no Personally Identifiable Information (PII) being mistakenly added in this pull request** These boxes should be checked by the submitter prior to merging: - [x] Tests have been written to cover the code changed/added as part of this pull request ### Code review These boxes should be checked by reviewers prior to merging: - [x] This pull request has a descriptive title and information useful to a reviewer - [x] Potential security implications or infrastructural changes have been considered, if relevant GitOrigin-RevId: bfb164555c01b63ecd9249c20abd1a4d46f3cb87 --- .../admin_panel/ingest_dataflow_operations.py | 6 +- .../dags/calculation/dataflow/__init__.py | 16 --- .../ingest_pipeline_task_group_delegate.py | 83 ------------ .../metrics_pipeline_task_group_delegate.py | 49 ------- ...malization_pipeline_task_group_delegate.py | 52 -------- ...pplemental_pipeline_task_group_delegate.py | 51 -------- recidiviz/airflow/dags/calculation_dag.py | 122 ++++++++++++++---- .../ingest/single_ingest_pipeline_group.py | 68 ++++++++-- .../dags/utils/dataflow_pipeline_group.py | 109 ---------------- .../airflow/tests/calculation_dag_test.py | 25 +--- .../single_ingest_pipeline_group_test.py | 32 +++-- recidiviz/airflow/tests/ingest_dag_test.py | 15 +-- .../calculation_pipeline_templates.yaml | 46 +++++++ .../pipelines/dataflow_orchestration_utils.py | 13 +- recidiviz/pipelines/ingest/pipeline_utils.py | 4 +- .../ingest_dataflow_operations_test.py | 4 +- ...alculation_pipeline_templates_yaml_test.py | 16 ++- .../dataflow_output_table_manager_test.py | 29 ++++- .../ingest/pipeline_parameters_test.py | 4 +- .../pipelines/pipeline_parameters_test.py | 9 ++ recidiviz/tools/validate_source_visibility.py | 7 +- 21 files changed, 287 insertions(+), 473 deletions(-) delete mode 100644 recidiviz/airflow/dags/calculation/dataflow/__init__.py delete mode 100644 recidiviz/airflow/dags/calculation/dataflow/ingest_pipeline_task_group_delegate.py delete mode 100644 recidiviz/airflow/dags/calculation/dataflow/metrics_pipeline_task_group_delegate.py delete mode 100644 recidiviz/airflow/dags/calculation/dataflow/normalization_pipeline_task_group_delegate.py delete mode 100644 recidiviz/airflow/dags/calculation/dataflow/supplemental_pipeline_task_group_delegate.py delete mode 100644 recidiviz/airflow/dags/utils/dataflow_pipeline_group.py diff --git a/recidiviz/admin_panel/ingest_dataflow_operations.py b/recidiviz/admin_panel/ingest_dataflow_operations.py index b997e17342..d4456de726 100644 --- a/recidiviz/admin_panel/ingest_dataflow_operations.py +++ b/recidiviz/admin_panel/ingest_dataflow_operations.py @@ -44,7 +44,7 @@ from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance from recidiviz.pipelines.ingest.dataset_config import state_dataset_for_state_code from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, ) from recidiviz.utils import metadata @@ -89,10 +89,10 @@ def get_latest_job_for_state_instance( return None # TODO(#209930): remove this check once dataflow is launched for all states - if state_code not in DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE: + if state_code not in DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE: return None - location = DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE[state_code] + location = DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE[state_code] client = dataflow_v1beta3.JobsV1Beta3Client() if job_id: diff --git a/recidiviz/airflow/dags/calculation/dataflow/__init__.py b/recidiviz/airflow/dags/calculation/dataflow/__init__.py deleted file mode 100644 index 21c33dedf2..0000000000 --- a/recidiviz/airflow/dags/calculation/dataflow/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Recidiviz - a data platform for criminal justice reform -# Copyright (C) 2024 Recidiviz, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# ============================================================================= diff --git a/recidiviz/airflow/dags/calculation/dataflow/ingest_pipeline_task_group_delegate.py b/recidiviz/airflow/dags/calculation/dataflow/ingest_pipeline_task_group_delegate.py deleted file mode 100644 index d196f7d81e..0000000000 --- a/recidiviz/airflow/dags/calculation/dataflow/ingest_pipeline_task_group_delegate.py +++ /dev/null @@ -1,83 +0,0 @@ -# Recidiviz - a data platform for criminal justice reform -# Copyright (C) 2024 Recidiviz, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# ============================================================================= -"""Implementation of DataflowPipelineTaskGroupDelegate for ingest Dataflow pipeline -task groups. -""" -import json -from typing import Any, Dict - -from airflow.models import DagRun - -from recidiviz.airflow.dags.operators.cloud_sql_query_operator import ( - CloudSqlQueryOperator, -) -from recidiviz.airflow.dags.utils.config_utils import get_ingest_instance -from recidiviz.airflow.dags.utils.dataflow_pipeline_group import ( - DataflowPipelineTaskGroupDelegate, -) -from recidiviz.airflow.dags.utils.environment import get_project_id -from recidiviz.common.constants.states import StateCode -from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance -from recidiviz.pipelines.ingest.pipeline_parameters import ( - INGEST_PIPELINE_NAME, - IngestPipelineParameters, -) -from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, -) - - -class IngestDataflowPipelineTaskGroupDelegate( - DataflowPipelineTaskGroupDelegate[IngestPipelineParameters] -): - """Implementation of DataflowPipelineTaskGroupDelegate for ingest Dataflow pipeline - task groups. - """ - - def __init__( - self, - state_code: StateCode, - default_ingest_instance: DirectIngestInstance, - max_update_datetimes_operator: CloudSqlQueryOperator, - ) -> None: - self._state_code = state_code - self._default_ingest_instance = default_ingest_instance - self._max_update_datetimes_operator = max_update_datetimes_operator - - def get_default_parameters(self) -> IngestPipelineParameters: - return IngestPipelineParameters( - project=get_project_id(), - # This will get overwritten with a dynamic value at runtime - raw_data_upper_bound_dates_json=json.dumps({}), - pipeline=INGEST_PIPELINE_NAME, - state_code=self._state_code.value, - region=DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE[self._state_code], - ingest_instance=self._default_ingest_instance.value, - ) - - def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]: - ingest_instance = get_ingest_instance(dag_run) - if not ingest_instance: - ingest_instance = self._default_ingest_instance.value - max_update_datetimes = dag_run.get_task_instance( - self._max_update_datetimes_operator.task_id - ).xcom_pull() - - return { - "raw_data_upper_bound_dates_json": json.dumps(max_update_datetimes), - "ingest_instance": ingest_instance, - } diff --git a/recidiviz/airflow/dags/calculation/dataflow/metrics_pipeline_task_group_delegate.py b/recidiviz/airflow/dags/calculation/dataflow/metrics_pipeline_task_group_delegate.py deleted file mode 100644 index 5129e68d27..0000000000 --- a/recidiviz/airflow/dags/calculation/dataflow/metrics_pipeline_task_group_delegate.py +++ /dev/null @@ -1,49 +0,0 @@ -# Recidiviz - a data platform for criminal justice reform -# Copyright (C) 2024 Recidiviz, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# ============================================================================= -"""Implementation of DataflowPipelineTaskGroupDelegate for metrics Dataflow pipeline -task groups. -""" -from typing import Any, Dict - -from airflow.models import DagRun - -from recidiviz.airflow.dags.utils.dataflow_pipeline_group import ( - DataflowPipelineTaskGroupDelegate, -) -from recidiviz.airflow.dags.utils.environment import get_project_id -from recidiviz.pipelines.metrics.pipeline_parameters import MetricsPipelineParameters -from recidiviz.utils.yaml_dict import YAMLDict - - -class MetricsDataflowPipelineTaskGroupDelegate( - DataflowPipelineTaskGroupDelegate[MetricsPipelineParameters] -): - """Implementation of DataflowPipelineTaskGroupDelegate for metrics Dataflow pipeline - task groups. - """ - - def __init__(self, pipeline_config: YAMLDict) -> None: - self._pipeline_config = pipeline_config - - def get_default_parameters(self) -> MetricsPipelineParameters: - return MetricsPipelineParameters( - project=get_project_id(), - **self._pipeline_config.get(), # type: ignore - ) - - def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]: - return {} diff --git a/recidiviz/airflow/dags/calculation/dataflow/normalization_pipeline_task_group_delegate.py b/recidiviz/airflow/dags/calculation/dataflow/normalization_pipeline_task_group_delegate.py deleted file mode 100644 index cf01d7c519..0000000000 --- a/recidiviz/airflow/dags/calculation/dataflow/normalization_pipeline_task_group_delegate.py +++ /dev/null @@ -1,52 +0,0 @@ -# Recidiviz - a data platform for criminal justice reform -# Copyright (C) 2024 Recidiviz, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# ============================================================================= -"""Implementation of DataflowPipelineTaskGroupDelegate for normalization Dataflow -pipeline task groups. -""" -from typing import Any, Dict - -from airflow.models import DagRun - -from recidiviz.airflow.dags.utils.dataflow_pipeline_group import ( - DataflowPipelineTaskGroupDelegate, -) -from recidiviz.airflow.dags.utils.environment import get_project_id -from recidiviz.common.constants.states import StateCode -from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, -) -from recidiviz.pipelines.normalization.pipeline_parameters import ( - NormalizationPipelineParameters, -) - - -class NormalizationDataflowPipelineTaskGroupDelegate( - DataflowPipelineTaskGroupDelegate[NormalizationPipelineParameters] -): - def __init__(self, state_code: StateCode) -> None: - self._state_code = state_code - - def get_default_parameters(self) -> NormalizationPipelineParameters: - return NormalizationPipelineParameters( - project=get_project_id(), - pipeline="comprehensive_normalization", - state_code=self._state_code.value, - region=DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE[self._state_code], - ) - - def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]: - return {} diff --git a/recidiviz/airflow/dags/calculation/dataflow/supplemental_pipeline_task_group_delegate.py b/recidiviz/airflow/dags/calculation/dataflow/supplemental_pipeline_task_group_delegate.py deleted file mode 100644 index 2b7568d94a..0000000000 --- a/recidiviz/airflow/dags/calculation/dataflow/supplemental_pipeline_task_group_delegate.py +++ /dev/null @@ -1,51 +0,0 @@ -# Recidiviz - a data platform for criminal justice reform -# Copyright (C) 2024 Recidiviz, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# ============================================================================= -"""Implementation of DataflowPipelineTaskGroupDelegate for supplemental Dataflow -pipeline task groups. -""" -from typing import Any, Dict - -from airflow.models import DagRun - -from recidiviz.airflow.dags.utils.dataflow_pipeline_group import ( - DataflowPipelineTaskGroupDelegate, -) -from recidiviz.airflow.dags.utils.environment import get_project_id -from recidiviz.pipelines.supplemental.pipeline_parameters import ( - SupplementalPipelineParameters, -) -from recidiviz.utils.yaml_dict import YAMLDict - - -class SupplementalDataflowPipelineTaskGroupDelegate( - DataflowPipelineTaskGroupDelegate[SupplementalPipelineParameters] -): - """Implementation of DataflowPipelineTaskGroupDelegate for supplemental Dataflow - pipeline task groups. - """ - - def __init__(self, pipeline_config: YAMLDict) -> None: - self._pipeline_config = pipeline_config - - def get_default_parameters(self) -> SupplementalPipelineParameters: - return SupplementalPipelineParameters( - project=get_project_id(), - **self._pipeline_config.get(), # type: ignore - ) - - def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]: - return {} diff --git a/recidiviz/airflow/dags/calculation_dag.py b/recidiviz/airflow/dags/calculation_dag.py index 825d49b5c4..3fe06ae012 100644 --- a/recidiviz/airflow/dags/calculation_dag.py +++ b/recidiviz/airflow/dags/calculation_dag.py @@ -19,7 +19,7 @@ This file is uploaded to GCS on deploy. """ from collections import defaultdict -from typing import Dict, Iterable, List, Optional +from typing import Dict, Iterable, List, Optional, Type, Union from airflow.decorators import dag, task, task_group from airflow.models import DagRun @@ -27,17 +27,9 @@ from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule from google.api_core.retry import Retry +from more_itertools import one from requests import Response -from recidiviz.airflow.dags.calculation.dataflow.metrics_pipeline_task_group_delegate import ( - MetricsDataflowPipelineTaskGroupDelegate, -) -from recidiviz.airflow.dags.calculation.dataflow.normalization_pipeline_task_group_delegate import ( - NormalizationDataflowPipelineTaskGroupDelegate, -) -from recidiviz.airflow.dags.calculation.dataflow.supplemental_pipeline_task_group_delegate import ( - SupplementalDataflowPipelineTaskGroupDelegate, -) from recidiviz.airflow.dags.calculation.initialize_calculation_dag_group import ( INGEST_INSTANCE_JINJA_ARG, SANDBOX_PREFIX_JINJA_ARG, @@ -45,18 +37,20 @@ initialize_calculation_dag_group, ) from recidiviz.airflow.dags.monitoring.dag_registry import get_calculation_dag_id +from recidiviz.airflow.dags.operators.recidiviz_dataflow_operator import ( + RecidivizDataflowFlexTemplateOperator, +) from recidiviz.airflow.dags.operators.recidiviz_kubernetes_pod_operator import ( RecidivizKubernetesPodOperator, build_kubernetes_pod_task, ) from recidiviz.airflow.dags.utils.branching_by_key import create_branching_by_key from recidiviz.airflow.dags.utils.config_utils import ( + get_ingest_instance, + get_sandbox_prefix, get_state_code_filter, get_trigger_ingest_dag_post_bq_refresh, ) -from recidiviz.airflow.dags.utils.dataflow_pipeline_group import ( - build_dataflow_pipeline_task_group, -) from recidiviz.airflow.dags.utils.default_args import DEFAULT_ARGS from recidiviz.airflow.dags.utils.environment import get_project_id from recidiviz.metrics.export.products.product_configs import ( @@ -66,8 +60,19 @@ ) from recidiviz.persistence.database.schema_type import SchemaType from recidiviz.pipelines.config_paths import PIPELINE_CONFIG_YAML_PATH -from recidiviz.pipelines.dataflow_orchestration_utils import ( - get_normalization_pipeline_enabled_states, +from recidiviz.pipelines.ingest.pipeline_parameters import IngestPipelineParameters +from recidiviz.pipelines.metrics.pipeline_parameters import MetricsPipelineParameters +from recidiviz.pipelines.normalization.pipeline_parameters import ( + NormalizationPipelineParameters, +) +from recidiviz.pipelines.pipeline_parameters import ( + PIPELINE_INPUT_DATASET_OVERRIDES_JSON_ARG_NAME, + PIPELINE_OUTPUT_SANDBOX_PREFIX_ARG_NAME, + PipelineParameters, + PipelineParametersT, +) +from recidiviz.pipelines.supplemental.pipeline_parameters import ( + SupplementalPipelineParameters, ) from recidiviz.utils.yaml_dict import YAMLDict @@ -235,16 +240,79 @@ def create_pipeline_configs_by_state( return pipeline_params_by_state +def build_dataflow_pipeline_task_group( + pipeline_config: YAMLDict, + parameter_cls: Type[PipelineParametersT], +) -> TaskGroup: + """Builds a task Group that handles creating the flex template operator for a given + pipeline parameters. + """ + params_no_overrides: PipelineParameters = parameter_cls( + project=get_project_id(), + **pipeline_config.get(), # type: ignore + ) + with TaskGroup(group_id=params_no_overrides.job_name) as dataflow_pipeline_group: + + @task(task_id="create_flex_template") + def create_flex_template( + dag_run: Optional[DagRun] = None, + ) -> Dict[str, Union[str, int, bool]]: + if not dag_run: + raise ValueError( + "dag_run not provided. This should be automatically set by Airflow." + ) + + ingest_instance = get_ingest_instance(dag_run) + if not ingest_instance: + raise ValueError( + "[ingest_instance] must be set in dag_run configuration" + ) + + sandbox_prefix = get_sandbox_prefix(dag_run) + + config = pipeline_config.get() + if parameter_cls is IngestPipelineParameters: + config["ingest_instance"] = ingest_instance + + if sandbox_prefix: + config[PIPELINE_OUTPUT_SANDBOX_PREFIX_ARG_NAME] = sandbox_prefix + # TODO(#27373): Actually hydrate this based on which pipelines have + # run earlier in the DAG. + config[PIPELINE_INPUT_DATASET_OVERRIDES_JSON_ARG_NAME] = None + + parameters: PipelineParameters = parameter_cls( + project=get_project_id(), + **config, # type: ignore + ) + + return parameters.flex_template_launch_body() + + _ = RecidivizDataflowFlexTemplateOperator( + task_id="run_pipeline", + location=pipeline_config.peek("region", str), + body=create_flex_template(), + project_id=get_project_id(), + ) + + return dataflow_pipeline_group + + def normalization_pipeline_branches_by_state_code() -> Dict[str, TaskGroup]: - branches_by_state_code = {} - for state_code in get_normalization_pipeline_enabled_states(): - group, _pipeline_task = build_dataflow_pipeline_task_group( - delegate=NormalizationDataflowPipelineTaskGroupDelegate( - state_code=state_code - ), + normalization_pipelines = _get_pipeline_config().pop_dicts( + "normalization_pipelines" + ) + normalization_pipeline_params_by_state: Dict[ + str, List[YAMLDict] + ] = create_pipeline_configs_by_state(normalization_pipelines) + + return { + state_code: build_dataflow_pipeline_task_group( + # There should only be one normalization pipeline per state + pipeline_config=one(parameters_list), + parameter_cls=NormalizationPipelineParameters, ) - branches_by_state_code[state_code.value] = group - return branches_by_state_code + for state_code, parameters_list in normalization_pipeline_params_by_state.items() + } def post_normalization_pipeline_branches_by_state_code() -> Dict[str, TaskGroup]: @@ -270,16 +338,16 @@ def post_normalization_pipeline_branches_by_state_code() -> Dict[str, TaskGroup] ) as state_code_dataflow_pipelines: for pipeline_config in metric_pipeline_params_by_state[state_code]: build_dataflow_pipeline_task_group( - delegate=MetricsDataflowPipelineTaskGroupDelegate(pipeline_config), + pipeline_config=pipeline_config, + parameter_cls=MetricsPipelineParameters, ) for pipeline_config in supplemental_pipeline_parameters_by_state[ state_code ]: build_dataflow_pipeline_task_group( - delegate=SupplementalDataflowPipelineTaskGroupDelegate( - pipeline_config - ), + pipeline_config=pipeline_config, + parameter_cls=SupplementalPipelineParameters, ) branches_by_state_code[state_code] = state_code_dataflow_pipelines diff --git a/recidiviz/airflow/dags/ingest/single_ingest_pipeline_group.py b/recidiviz/airflow/dags/ingest/single_ingest_pipeline_group.py index f2986a4ca6..0da3babbe4 100644 --- a/recidiviz/airflow/dags/ingest/single_ingest_pipeline_group.py +++ b/recidiviz/airflow/dags/ingest/single_ingest_pipeline_group.py @@ -17,17 +17,16 @@ """ Logic for state and ingest instance specific dataflow pipelines. """ +import json import logging -from typing import Dict, Tuple +from typing import Dict, Tuple, Union from airflow.decorators import task +from airflow.models import BaseOperator from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule -from recidiviz.airflow.dags.calculation.dataflow.ingest_pipeline_task_group_delegate import ( - IngestDataflowPipelineTaskGroupDelegate, -) from recidiviz.airflow.dags.ingest.add_ingest_job_completion_sql_query_generator import ( AddIngestJobCompletionSqlQueryGenerator, ) @@ -44,16 +43,24 @@ from recidiviz.airflow.dags.operators.cloud_sql_query_operator import ( CloudSqlQueryOperator, ) +from recidiviz.airflow.dags.operators.recidiviz_dataflow_operator import ( + RecidivizDataflowFlexTemplateOperator, +) from recidiviz.airflow.dags.operators.recidiviz_kubernetes_pod_operator import ( build_kubernetes_pod_task, ) from recidiviz.airflow.dags.utils.cloud_sql import cloud_sql_conn_id_for_schema_type -from recidiviz.airflow.dags.utils.dataflow_pipeline_group import ( - build_dataflow_pipeline_task_group, -) +from recidiviz.airflow.dags.utils.environment import get_project_id from recidiviz.common.constants.states import StateCode from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance from recidiviz.persistence.database.schema_type import SchemaType +from recidiviz.pipelines.ingest.pipeline_parameters import ( + INGEST_PIPELINE_NAME, + IngestPipelineParameters, +) +from recidiviz.pipelines.ingest.pipeline_utils import ( + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, +) # Need a disable pointless statement because Python views the chaining operator ('>>') # as a "pointless" statement @@ -219,6 +226,43 @@ def _release_lock( ) +def _create_dataflow_pipeline( + state_code: StateCode, + ingest_instance: DirectIngestInstance, + max_update_datetimes_operator: BaseOperator, +) -> Tuple[TaskGroup, BaseOperator]: + """Builds a task Group that handles creating the flex template operator for a given + pipeline parameters. + """ + + with TaskGroup(group_id="dataflow_pipeline") as dataflow_pipeline_group: + region = DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE[state_code] + + @task(task_id="create_flex_template") + def create_flex_template( + max_update_datetimes: Dict[str, str] + ) -> Dict[str, Union[str, int, bool]]: + parameters = IngestPipelineParameters( + project=get_project_id(), + ingest_instance=ingest_instance.value, + raw_data_upper_bound_dates_json=json.dumps(max_update_datetimes), + pipeline=INGEST_PIPELINE_NAME, + state_code=state_code.value, + region=region, + ) + + return parameters.flex_template_launch_body() + + run_pipeline = RecidivizDataflowFlexTemplateOperator( + task_id="run_pipeline", + location=region, + body=create_flex_template(max_update_datetimes_operator.output), # type: ignore[arg-type] + project_id=get_project_id(), + ) + + return dataflow_pipeline_group, run_pipeline + + def create_single_ingest_pipeline_group( state_code: StateCode, instance: DirectIngestInstance, @@ -241,12 +285,10 @@ def create_single_ingest_pipeline_group( acquire_lock = _acquire_lock(state_code, instance) - dataflow_pipeline_group, run_pipeline = build_dataflow_pipeline_task_group( - delegate=IngestDataflowPipelineTaskGroupDelegate( - state_code=state_code, - default_ingest_instance=instance, - max_update_datetimes_operator=get_max_update_datetimes, - ) + dataflow_pipeline_group, run_pipeline = _create_dataflow_pipeline( + state_code=state_code, + ingest_instance=instance, + max_update_datetimes_operator=get_max_update_datetimes, ) release_lock = _release_lock(state_code, instance) diff --git a/recidiviz/airflow/dags/utils/dataflow_pipeline_group.py b/recidiviz/airflow/dags/utils/dataflow_pipeline_group.py deleted file mode 100644 index 31c1ae7223..0000000000 --- a/recidiviz/airflow/dags/utils/dataflow_pipeline_group.py +++ /dev/null @@ -1,109 +0,0 @@ -# Recidiviz - a data platform for criminal justice reform -# Copyright (C) 2024 Recidiviz, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# ============================================================================= -"""Helper for creating a TaskGroup that generates the arguments for, then runs a - Dataflow pipeline. - """ -import abc -from typing import Any, Dict, Generic, Optional, Tuple, Union - -import attr -from airflow.decorators import task -from airflow.models import DagRun -from airflow.utils.task_group import TaskGroup - -from recidiviz.airflow.dags.operators.recidiviz_dataflow_operator import ( - RecidivizDataflowFlexTemplateOperator, -) -from recidiviz.airflow.dags.utils.config_utils import get_sandbox_prefix -from recidiviz.airflow.dags.utils.environment import get_project_id -from recidiviz.pipelines.pipeline_parameters import ( - PIPELINE_INPUT_DATASET_OVERRIDES_JSON_ARG_NAME, - PIPELINE_OUTPUT_SANDBOX_PREFIX_ARG_NAME, - PipelineParameters, - PipelineParametersT, -) - -CREATE_FLEX_TEMPLATE_TASK_ID = "create_flex_template" -DATAFLOW_OPERATOR_TASK_ID = "run_pipeline" - - -class DataflowPipelineTaskGroupDelegate(Generic[PipelineParametersT]): - """Class that provides information about how to build pipeline arguments of a given - type for a pipeline running in the context of an Airflow DAG. - """ - - @abc.abstractmethod - def get_default_parameters(self) -> PipelineParametersT: - """Subclasses should implement to return a set of PipelineParameters for the - desired pipeline. This set of parameters may be updated to incorporate any - sandbox arguments set dynamically for a given DAG run before actually running - the pipeline. - """ - - @abc.abstractmethod - def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]: - """Returns PipelineParameters keyword arguments that can only be generated - at runtime (i.e. based on results of previous tasks or DAG input arguments). - """ - - -def build_dataflow_pipeline_task_group( - delegate: DataflowPipelineTaskGroupDelegate, -) -> Tuple[TaskGroup, RecidivizDataflowFlexTemplateOperator]: - """Builds a TaskGroup that handles running a dataflow pipeline specified by the - provided |delegate|. - - Returns both the overall task group and the actual dataflow pipeline task. - """ - params_no_overrides = delegate.get_default_parameters() - with TaskGroup( - group_id=params_no_overrides.job_name, - ) as dataflow_pipeline_group: - - @task(task_id=CREATE_FLEX_TEMPLATE_TASK_ID) - def create_flex_template( - dag_run: Optional[DagRun] = None, - ) -> Dict[str, Union[str, int, bool]]: - if not dag_run: - raise ValueError( - "dag_run not provided. This should be automatically set by Airflow." - ) - - sandbox_prefix = get_sandbox_prefix(dag_run) - - dynamic_args = { - **attr.asdict(params_no_overrides), - **delegate.get_pipeline_specific_dynamic_args(dag_run), - PIPELINE_OUTPUT_SANDBOX_PREFIX_ARG_NAME: sandbox_prefix, - # TODO(#27373): Actually hydrate this based on which pipelines have - # run earlier in the DAG. - PIPELINE_INPUT_DATASET_OVERRIDES_JSON_ARG_NAME: None, - } - - parameters_cls = type(params_no_overrides) - parameters: PipelineParameters = parameters_cls(**dynamic_args) - - return parameters.flex_template_launch_body() - - run_pipeline = RecidivizDataflowFlexTemplateOperator( - task_id=DATAFLOW_OPERATOR_TASK_ID, - location=params_no_overrides.region, - body=create_flex_template(), - project_id=get_project_id(), - ) - - return dataflow_pipeline_group, run_pipeline diff --git a/recidiviz/airflow/tests/calculation_dag_test.py b/recidiviz/airflow/tests/calculation_dag_test.py index 14222a1757..641cac2d4e 100644 --- a/recidiviz/airflow/tests/calculation_dag_test.py +++ b/recidiviz/airflow/tests/calculation_dag_test.py @@ -19,7 +19,7 @@ """ import os from typing import Dict, List, Set -from unittest.mock import MagicMock, patch +from unittest.mock import patch import yaml from airflow.models import DagRun, import_all_models @@ -35,11 +35,7 @@ ) from recidiviz.airflow.tests.test_utils import DAG_FOLDER, AirflowIntegrationTest from recidiviz.airflow.tests.utils.dag_helper_functions import fake_operator_constructor -from recidiviz.common.constants.states import StateCode from recidiviz.persistence.database.schema_type import SchemaType -from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, -) from recidiviz.tests import pipelines as recidiviz_pipelines_tests_module from recidiviz.utils.environment import GCPEnvironment from recidiviz.utils.yaml_dict import YAMLDict @@ -621,10 +617,6 @@ def test_execute_entrypoint_arguments_nonetypes(self) -> None: ) -@patch.dict( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, - values={StateCode.US_XX: "us-east1", StateCode.US_YY: "us-east2"}, -) class TestCalculationDagIntegration(AirflowIntegrationTest): """ Integration tests for the calculation DAG. @@ -643,17 +635,6 @@ def setUp(self) -> None: ) self.environment_patcher.start() - self.ingest_states_patcher = patch( - "recidiviz.pipelines.dataflow_orchestration_utils.get_direct_ingest_states_launched_in_env", - MagicMock(return_value=[StateCode.US_XX, StateCode.US_YY]), - ) - self.ingest_states_patcher.start() - self.ingest_regions_patcher = patch.dict( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, - values={StateCode.US_XX: "us-east1", StateCode.US_YY: "us-east2"}, - ) - self.ingest_regions_patcher.start() - from recidiviz.airflow.dags.calculation_dag import create_calculation_dag self.project_patcher = patch( @@ -681,7 +662,7 @@ def setUp(self) -> None: self.kubernetes_pod_operator_patcher.start() self.recidiviz_dataflow_operator_patcher = patch( - "recidiviz.airflow.dags.utils.dataflow_pipeline_group.RecidivizDataflowFlexTemplateOperator", + "recidiviz.airflow.dags.calculation_dag.RecidivizDataflowFlexTemplateOperator", side_effect=fake_operator_constructor, ) self.recidiviz_dataflow_operator_patcher.start() @@ -697,8 +678,6 @@ def setUp(self) -> None: def tearDown(self) -> None: super().tearDown() self.environment_patcher.stop() - self.ingest_states_patcher.stop() - self.ingest_regions_patcher.stop() self.project_patcher.stop() self.pipeline_config_yaml_path_patcher.stop() self.project_environment_patcher.stop() diff --git a/recidiviz/airflow/tests/ingest/single_ingest_pipeline_group_test.py b/recidiviz/airflow/tests/ingest/single_ingest_pipeline_group_test.py index ce9ff74692..8c99c52505 100644 --- a/recidiviz/airflow/tests/ingest/single_ingest_pipeline_group_test.py +++ b/recidiviz/airflow/tests/ingest/single_ingest_pipeline_group_test.py @@ -51,7 +51,7 @@ from recidiviz.common.constants.states import StateCode from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, ) from recidiviz.utils.environment import GCPEnvironment @@ -85,7 +85,7 @@ def test_single_ingest_pipeline_group_dag() -> None: @patch.dict( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, values={StateCode.US_XX: "us-east1-test"}, ) class TestSingleIngestPipelineGroup(unittest.TestCase): @@ -181,8 +181,9 @@ def test_dataflow_pipeline_task_exists(self) -> None: StateCode.US_XX, DirectIngestInstance.PRIMARY ) - task_group_id = "us_xx_primary_dataflow.us-xx-ingest-primary" - dataflow_pipeline_task = test_dag.get_task(f"{task_group_id}.run_pipeline") + dataflow_pipeline_task = test_dag.get_task( + f"{get_ingest_branch_key(StateCode.US_XX.value, DirectIngestInstance.PRIMARY.value)}.dataflow_pipeline.run_pipeline" + ) if not isinstance( dataflow_pipeline_task, RecidivizDataflowFlexTemplateOperator @@ -198,15 +199,18 @@ def test_dataflow_pipeline_task(self) -> None: test_dag = _create_test_single_ingest_pipeline_group_dag( StateCode.US_XX, DirectIngestInstance.PRIMARY ) - task_group_id = "us_xx_primary_dataflow.us-xx-ingest-primary" + + ingest_branch_key = get_ingest_branch_key( + StateCode.US_XX.value, DirectIngestInstance.PRIMARY.value + ) task: RecidivizDataflowFlexTemplateOperator = test_dag.get_task( # type: ignore - f"{task_group_id}.run_pipeline" + f"{ingest_branch_key}.dataflow_pipeline.run_pipeline" ) self.assertEqual(task.location, "us-east1-test") self.assertEqual(task.project_id, _PROJECT_ID) - self.assertEqual(task.body.operator.task_id, f"{task_group_id}.create_flex_template") # type: ignore + self.assertEqual(task.body.operator.task_id, f"{ingest_branch_key}.dataflow_pipeline.create_flex_template") # type: ignore def _fake_failure_execute(*args: Any, **kwargs: Any) -> None: @@ -230,7 +234,7 @@ def _fake_pod_operator_ingest_pipeline_should_run_in_dag_false( @patch.dict( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, values={StateCode.US_XX: "us-east1-test"}, ) class TestSingleIngestPipelineGroupIntegration(AirflowIntegrationTest): @@ -259,7 +263,7 @@ def setUp(self) -> None: self.cloud_sql_query_operator_patcher.start() self.recidiviz_dataflow_operator_patcher = patch( - "recidiviz.airflow.dags.utils.dataflow_pipeline_group.RecidivizDataflowFlexTemplateOperator", + "recidiviz.airflow.dags.ingest.single_ingest_pipeline_group.RecidivizDataflowFlexTemplateOperator", side_effect=fake_operator_constructor, ) self.mock_dataflow_operator = self.recidiviz_dataflow_operator_patcher.start() @@ -297,7 +301,7 @@ def test_ingest_pipeline_should_run_in_dag_false(self) -> None: r".*should_run_based_on_watermarks", r".*verify_raw_data_flashing_not_in_progress", r".*acquire_lock", - r"us_xx_primary_dataflow\.us-xx-ingest-primary.*", + r".*_dataflow\.dataflow_pipeline.*", r".*release_lock", r".*write_ingest_job_completion", r".*write_upper_bounds", @@ -331,7 +335,7 @@ def test_initialize_dataflow_pipeline_short_circuits_when_watermark_datetime_gre expected_skipped_ids=[ r".*verify_raw_data_flashing_not_in_progress", r".*acquire_lock", - r"us_xx_primary_dataflow\.us-xx-ingest-primary.*", + r".*_dataflow\.dataflow_pipeline.*", r".*release_lock", r".*write_ingest_job_completion", r".*write_upper_bounds", @@ -363,7 +367,7 @@ def test_failed_verify_raw_data_flashing_not_in_progress( expected_failure_ids=[ r".*verify_raw_data_flashing_not_in_progress", r".*acquire_lock", - r"us_xx_primary_dataflow\.us-xx-ingest-primary.*", + r".*_dataflow\.dataflow_pipeline.*", r".*write_ingest_job_completion", r".*write_upper_bounds", _DOWNSTREAM_TASK_ID, @@ -408,7 +412,7 @@ def test_failed_dataflow_pipeline(self) -> None: test_dag, session, expected_failure_ids=[ - r"us_xx_primary_dataflow\.us-xx-ingest-primary\.run_pipeline", + r".*dataflow_pipeline.run_pipeline", r".*write_ingest_job_completion", r".*write_upper_bounds", _DOWNSTREAM_TASK_ID, @@ -432,7 +436,7 @@ def test_failed_acquire_lock(self, mock_acquire_lock: MagicMock) -> None: session, expected_failure_ids=[ r".*acquire_lock", - r"us_xx_primary_dataflow\.us-xx-ingest-primary.*", + r".*_dataflow\.dataflow_pipeline.*", r".*write_ingest_job_completion", r".*write_upper_bounds", _DOWNSTREAM_TASK_ID, diff --git a/recidiviz/airflow/tests/ingest_dag_test.py b/recidiviz/airflow/tests/ingest_dag_test.py index 0d0fb07e19..e477de93dc 100644 --- a/recidiviz/airflow/tests/ingest_dag_test.py +++ b/recidiviz/airflow/tests/ingest_dag_test.py @@ -33,9 +33,6 @@ ) from recidiviz.common.constants.states import StateCode from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance -from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, -) _PROJECT_ID = "recidiviz-testing" @@ -98,13 +95,13 @@ def setUp(self) -> None: self.cloud_sql_query_operator_patcher.start() self.recidiviz_dataflow_operator_patcher = patch( - "recidiviz.airflow.dags.utils.dataflow_pipeline_group.RecidivizDataflowFlexTemplateOperator", + "recidiviz.airflow.dags.ingest.single_ingest_pipeline_group.RecidivizDataflowFlexTemplateOperator", side_effect=fake_operator_constructor, ) self.mock_dataflow_operator = self.recidiviz_dataflow_operator_patcher.start() self.default_ingest_pipeline_regions_by_state_code_patcher = patch.dict( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + "recidiviz.airflow.dags.ingest.single_ingest_pipeline_group.DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE", values={StateCode.US_XX: "us-east1-test", StateCode.US_YY: "us-east2-test"}, ) self.default_ingest_pipeline_regions_by_state_code_patcher.start() @@ -234,7 +231,7 @@ def fail_for_us_xx_primary_acquire_lock( session=session, expected_failure_ids=[ ".*us_xx_primary_dataflow.acquire_lock.*", - ".*us_xx_primary_dataflow.us-xx-ingest-primary.*", + ".*us_xx_primary_dataflow.dataflow_pipeline.*", ".*us_xx_primary_dataflow.write_ingest_job_completion", ".*us_xx_primary_dataflow.write_upper_bounds.*", ".*ingest_branching.branch_end.*", @@ -289,7 +286,7 @@ def fake_ingest_pipeline_should_run_in_dag( r".*us_xx_primary_dataflow.initialize_dataflow_pipeline.should_run_based_on_watermarks", r".*us_xx_primary_dataflow.initialize_dataflow_pipeline.verify_raw_data_flashing_not_in_progress", r".*us_xx_primary_dataflow.acquire_lock.*", - r".*us_xx_primary_dataflow.us-xx-ingest-primary.*", + r".*us_xx_primary_dataflow.dataflow_pipeline.*", r".*us_xx_primary_dataflow.write_ingest_job_completion", r".*us_xx_primary_dataflow.write_upper_bounds.*", r".*us_xx_primary_dataflow.release_lock.*", @@ -365,14 +362,14 @@ def fail_for_us_yy_primary_acquire_lock( r".*us_xx_primary_dataflow.initialize_dataflow_pipeline.should_run_based_on_watermarks", r".*us_xx_primary_dataflow.initialize_dataflow_pipeline.verify_raw_data_flashing_not_in_progress", r".*us_xx_primary_dataflow.acquire_lock.*", - r".*us_xx_primary_dataflow.us-xx-ingest-primary.*", + r".*us_xx_primary_dataflow.dataflow_pipeline.*", r".*us_xx_primary_dataflow.write_ingest_job_completion", r".*us_xx_primary_dataflow.write_upper_bounds.*", r".*us_xx_primary_dataflow.release_lock.*", ], expected_failure_ids=[ ".*us_yy_primary_dataflow.acquire_lock.*", - ".*us_yy_primary_dataflow.us-yy-ingest-primary.*", + ".*us_yy_primary_dataflow.dataflow_pipeline.*", ".*us_yy_primary_dataflow.write_ingest_job_completion", ".*us_yy_primary_dataflow.write_upper_bounds.*", ".*ingest_branching.branch_end.*", diff --git a/recidiviz/pipelines/calculation_pipeline_templates.yaml b/recidiviz/pipelines/calculation_pipeline_templates.yaml index a622ec10ae..68c4b6c52a 100644 --- a/recidiviz/pipelines/calculation_pipeline_templates.yaml +++ b/recidiviz/pipelines/calculation_pipeline_templates.yaml @@ -7,6 +7,52 @@ # to determine which region has the most capacity for a new pipeline. # List of pipelines with their necessary details + +normalization_pipelines: + # Comprehensive normalization pipelines + - pipeline: comprehensive_normalization + state_code: US_AR + region: us-east1 + staging_only: True + - pipeline: comprehensive_normalization + state_code: US_CA + region: us-east1 + - pipeline: comprehensive_normalization + state_code: US_CO + region: us-west3 + - pipeline: comprehensive_normalization + state_code: US_IX + region: us-west3 + - pipeline: comprehensive_normalization + state_code: US_ME + region: us-east1 + - pipeline: comprehensive_normalization + state_code: US_MO + region: us-central1 + - pipeline: comprehensive_normalization + state_code: US_ND + region: us-west1 + - pipeline: comprehensive_normalization + state_code: US_OR + region: us-west3 + - pipeline: comprehensive_normalization + state_code: US_OZ + region: us-west3 + staging_only: True + - pipeline: comprehensive_normalization + state_code: US_PA + region: us-west3 + - pipeline: comprehensive_normalization + state_code: US_TN + region: us-east1 + - pipeline: comprehensive_normalization + state_code: US_MI + region: us-central1 + - pipeline: comprehensive_normalization + state_code: US_AZ + region: us-west1 + staging_only: True + metric_pipelines: # Full US_AR calculations with no time limit - pipeline: supervision_metrics diff --git a/recidiviz/pipelines/dataflow_orchestration_utils.py b/recidiviz/pipelines/dataflow_orchestration_utils.py index fecf5e3aac..bcecc9030d 100644 --- a/recidiviz/pipelines/dataflow_orchestration_utils.py +++ b/recidiviz/pipelines/dataflow_orchestration_utils.py @@ -18,16 +18,21 @@ from typing import Set from recidiviz.common.constants.states import StateCode -from recidiviz.ingest.direct.regions.direct_ingest_region_utils import ( - get_direct_ingest_states_launched_in_env, -) from recidiviz.pipelines.config_paths import PIPELINE_CONFIG_YAML_PATH from recidiviz.utils.yaml_dict import YAMLDict def get_normalization_pipeline_enabled_states() -> Set[StateCode]: """Returns all states that have scheduled normalization pipelines that run.""" - return set(get_direct_ingest_states_launched_in_env()) + pipeline_templates_yaml = YAMLDict.from_path(PIPELINE_CONFIG_YAML_PATH) + + normalization_pipelines = pipeline_templates_yaml.pop_dicts( + "normalization_pipelines" + ) + return { + StateCode(pipeline.peek("state_code", str)) + for pipeline in normalization_pipelines + } def get_metric_pipeline_enabled_states() -> Set[StateCode]: diff --git a/recidiviz/pipelines/ingest/pipeline_utils.py b/recidiviz/pipelines/ingest/pipeline_utils.py index caa9b25fb4..bee19e6555 100644 --- a/recidiviz/pipelines/ingest/pipeline_utils.py +++ b/recidiviz/pipelines/ingest/pipeline_utils.py @@ -19,9 +19,9 @@ from recidiviz.common.constants.states import StateCode -# The compute region (e.g. "us-east1") Dataflow pipelines for a given state should be +# The compute region (e.g. "us-east1") ingest pipelines for a given state should be # run in. -DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE: Dict[StateCode, str] = { +DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE: Dict[StateCode, str] = { StateCode.US_AR: "us-east1", StateCode.US_CA: "us-east1", StateCode.US_CO: "us-west3", diff --git a/recidiviz/tests/admin_panel/ingest_dataflow_operations_test.py b/recidiviz/tests/admin_panel/ingest_dataflow_operations_test.py index c6af007090..29793e2cf2 100644 --- a/recidiviz/tests/admin_panel/ingest_dataflow_operations_test.py +++ b/recidiviz/tests/admin_panel/ingest_dataflow_operations_test.py @@ -43,7 +43,7 @@ ) from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, ) from recidiviz.tests import pipelines as recidiviz_pipelines_tests_module from recidiviz.tools.postgres import local_persistence_helpers, local_postgres_helpers @@ -59,7 +59,7 @@ @patch.dict( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, values={StateCode.US_XX: "us-east1", StateCode.US_YY: "us-east2"}, ) @pytest.mark.uses_db diff --git a/recidiviz/tests/pipelines/calculation_pipeline_templates_yaml_test.py b/recidiviz/tests/pipelines/calculation_pipeline_templates_yaml_test.py index 7c8727894f..0472b73ddc 100644 --- a/recidiviz/tests/pipelines/calculation_pipeline_templates_yaml_test.py +++ b/recidiviz/tests/pipelines/calculation_pipeline_templates_yaml_test.py @@ -31,7 +31,6 @@ from recidiviz.pipelines.config_paths import PIPELINE_CONFIG_YAML_PATH from recidiviz.pipelines.dataflow_orchestration_utils import ( get_metric_pipeline_enabled_states, - get_normalization_pipeline_enabled_states, ) from recidiviz.pipelines.metrics.pipeline_parameters import MetricsPipelineParameters from recidiviz.utils.environment import GCP_PROJECT_PRODUCTION @@ -42,9 +41,18 @@ class TestConfiguredPipelines(unittest.TestCase): """Tests the configuration of pipelines.""" def test_normalization_pipeline_completeness(self) -> None: - state_codes_with_normalization_pipelines: Set[ - StateCode - ] = get_normalization_pipeline_enabled_states() + state_codes_with_normalization_pipelines: Set[StateCode] = set() + + pipeline_templates_yaml = YAMLDict.from_path(PIPELINE_CONFIG_YAML_PATH) + + normalization_pipelines = pipeline_templates_yaml.pop_dicts( + "normalization_pipelines" + ) + + for pipeline in normalization_pipelines: + state_codes_with_normalization_pipelines.add( + StateCode(pipeline.peek("state_code", str)) + ) for state_code in get_metric_pipeline_enabled_states(): if state_code not in state_codes_with_normalization_pipelines: diff --git a/recidiviz/tests/pipelines/dataflow_output_table_manager_test.py b/recidiviz/tests/pipelines/dataflow_output_table_manager_test.py index 2a590f81ab..f69b7b7164 100644 --- a/recidiviz/tests/pipelines/dataflow_output_table_manager_test.py +++ b/recidiviz/tests/pipelines/dataflow_output_table_manager_test.py @@ -15,6 +15,7 @@ # along with this program. If not, see . # ============================================================================= """Tests the dataflow_output_table_manager.""" +import os import unittest from typing import List, cast from unittest import mock @@ -51,6 +52,11 @@ normalized_state_dataset_for_state_code, ) +FAKE_PIPELINE_CONFIG_YAML_PATH = os.path.join( + os.path.dirname(__file__), + "fake_calculation_pipeline_templates.yaml", +) + class DataflowMetricTableManagerTest(unittest.TestCase): """Tests the update_dataflow_metric_tables_schemas function in @@ -216,8 +222,8 @@ def tearDown(self) -> None: self.project_number_patcher.stop() @mock.patch( - "recidiviz.pipelines.dataflow_orchestration_utils.get_direct_ingest_states_launched_in_env", - MagicMock(return_value=[StateCode.US_XX, StateCode.US_YY]), + "recidiviz.pipelines.dataflow_orchestration_utils.PIPELINE_CONFIG_YAML_PATH", + FAKE_PIPELINE_CONFIG_YAML_PATH, ) def test_update_state_specific_normalized_state_schemas(self) -> None: def mock_dataset_ref_for_id( @@ -248,8 +254,8 @@ def mock_dataset_ref_for_id( ) @mock.patch( - "recidiviz.pipelines.dataflow_orchestration_utils.get_direct_ingest_states_launched_in_env", - MagicMock(return_value=[StateCode.US_XX, StateCode.US_YY]), + "recidiviz.pipelines.dataflow_orchestration_utils.PIPELINE_CONFIG_YAML_PATH", + FAKE_PIPELINE_CONFIG_YAML_PATH, ) def test_update_state_specific_normalized_state_schemas_adds_dataset_prefix( self, @@ -307,8 +313,8 @@ def test_update_normalized_table_schemas_in_dataset_update_table(self) -> None: self.mock_client.create_table_with_schema.assert_not_called() @mock.patch( - "recidiviz.pipelines.dataflow_orchestration_utils.get_direct_ingest_states_launched_in_env", - MagicMock(return_value=[StateCode.US_XX, StateCode.US_YY]), + "recidiviz.pipelines.dataflow_orchestration_utils.PIPELINE_CONFIG_YAML_PATH", + FAKE_PIPELINE_CONFIG_YAML_PATH, ) def test_get_all_state_specific_normalized_state_datasets(self) -> None: dataset_ids: List[str] = [] @@ -363,6 +369,16 @@ def setUp(self) -> None: self.project_id, self.mock_view_dataset_name ) + self.dataflow_config_patcher = mock.patch( + "recidiviz.pipelines.dataflow_output_table_manager.dataflow_config" + ) + self.mock_dataflow_config = self.dataflow_config_patcher.start() + + self.mock_pipeline_template_path = FAKE_PIPELINE_CONFIG_YAML_PATH + self.mock_dataflow_config.PIPELINE_CONFIG_YAML_PATH = ( + self.mock_pipeline_template_path + ) + self.project_id_patcher = patch("recidiviz.utils.metadata.project_id") self.project_id_patcher.start().return_value = self.project_id self.project_number_patcher = patch("recidiviz.utils.metadata.project_number") @@ -380,6 +396,7 @@ def tearDown(self) -> None: self.bq_client_patcher.stop() self.project_id_patcher.stop() self.project_number_patcher.stop() + self.dataflow_config_patcher.stop() def test_update_supplemental_data_schemas_create_table(self) -> None: """Test that update_supplemental_dataset_schemas calls the client to create a diff --git a/recidiviz/tests/pipelines/ingest/pipeline_parameters_test.py b/recidiviz/tests/pipelines/ingest/pipeline_parameters_test.py index bf5468539a..95aaeed218 100644 --- a/recidiviz/tests/pipelines/ingest/pipeline_parameters_test.py +++ b/recidiviz/tests/pipelines/ingest/pipeline_parameters_test.py @@ -25,7 +25,7 @@ from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance from recidiviz.pipelines.ingest.pipeline_parameters import IngestPipelineParameters from recidiviz.pipelines.ingest.pipeline_utils import ( - DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE, + DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE, ) from recidiviz.pipelines.ingest.state.pipeline import StateIngestPipeline from recidiviz.tools.utils.run_sandbox_dataflow_pipeline_utils import ( @@ -381,7 +381,7 @@ def test_default_ingest_pipeline_regions_by_state_code_filled_out(self) -> None: for state_code, _instance in get_ingest_pipeline_enabled_state_and_instance_pairs() } - states_with_regions = set(DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE.keys()) + states_with_regions = set(DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE.keys()) states_missing_regions = pipeline_enabled_states - states_with_regions if states_missing_regions: self.fail( diff --git a/recidiviz/tests/pipelines/pipeline_parameters_test.py b/recidiviz/tests/pipelines/pipeline_parameters_test.py index 04f652b267..0da04d0171 100644 --- a/recidiviz/tests/pipelines/pipeline_parameters_test.py +++ b/recidiviz/tests/pipelines/pipeline_parameters_test.py @@ -63,6 +63,15 @@ def test_metrics_pipelines_for_valid_parameters(self) -> None: d: dict[str, Any] = pipeline.get() MetricsPipelineParameters(project=self.PROJECT_ID, **d) + def test_normalization_pipelines_for_valid_parameters(self) -> None: + normalization_pipelines = self.PIPELINE_CONFIG.pop_dicts( + "normalization_pipelines" + ) + + for pipeline in normalization_pipelines: + d: dict[str, Any] = pipeline.get() + NormalizationPipelineParameters(project=self.PROJECT_ID, **d) + def test_supplemental_pipelines_for_valid_parameters(self) -> None: supplemental_pipelines = self.PIPELINE_CONFIG.pop_dicts( "supplemental_dataset_pipelines" diff --git a/recidiviz/tools/validate_source_visibility.py b/recidiviz/tools/validate_source_visibility.py index e4dc62f9cd..fd9da60209 100644 --- a/recidiviz/tools/validate_source_visibility.py +++ b/recidiviz/tools/validate_source_visibility.py @@ -265,14 +265,13 @@ def main() -> int: "recidiviz.big_query.big_query_address", "recidiviz.common", "recidiviz.cloud_storage.gcsfs_path", - "recidiviz.ingest.direct.direct_ingest_regions", - "recidiviz.ingest.direct.regions.direct_ingest_region_utils", + "recidiviz.ingest.direct.dataset_config", "recidiviz.ingest.direct.types.direct_ingest_instance", "recidiviz.metrics.export.products", "recidiviz.persistence.database.schema_type", "recidiviz.pipelines.config_paths", - "recidiviz.pipelines.dataflow_orchestration_utils", - "recidiviz.pipelines.ingest.pipeline_utils", + "recidiviz.pipelines.ingest.pipeline_parameters", + "recidiviz.pipelines.ingest.dataset_config", "recidiviz.pipelines.metrics.pipeline_parameters", "recidiviz.pipelines.normalization.pipeline_parameters", "recidiviz.pipelines.normalization.dataset_config",