[Airflow] Unify Dataflow task group code (Recidiviz/recidiviz-data#29453)

## Description of the change

We had two slightly different versions of the code that builds a task
group for running a Dataflow pipeline in Airflow. One was specific to
building ingest pipeline parameters / running ingest pipelines, and the
other was built to handle all pipeline types handled by the calc DAG. In
preparation for moving the ingest pipelines into the calc DAG, this PR
unifies that logic into a single `build_dataflow_pipeline_task_group`
helper.
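For context, the shape of the unified helper can be sketched as a delegate pattern. The class and method names below mirror those added in this diff, but the signatures and the merge logic are illustrative assumptions, not the actual implementation:

```python
# Sketch of the delegate pattern behind build_dataflow_pipeline_task_group.
# Names mirror this PR; the merge function is a hypothetical simplification.
import abc
from typing import Any, Dict, Generic, TypeVar

ParamsT = TypeVar("ParamsT")


class DataflowPipelineTaskGroupDelegate(abc.ABC, Generic[ParamsT]):
    """Supplies pipeline-type-specific parameters to a shared task group builder."""

    @abc.abstractmethod
    def get_default_parameters(self) -> ParamsT:
        """Static parameters known when the DAG is parsed."""

    @abc.abstractmethod
    def get_pipeline_specific_dynamic_args(self, dag_run: Any) -> Dict[str, Any]:
        """Runtime overrides resolved from the active DAG run."""


def resolve_parameters(
    delegate: "DataflowPipelineTaskGroupDelegate[Dict[str, Any]]", dag_run: Any
) -> Dict[str, Any]:
    # Illustrative merge: dynamic args override the static defaults.
    params = dict(delegate.get_default_parameters())
    params.update(delegate.get_pipeline_specific_dynamic_args(dag_run))
    return params
```

Each pipeline type (ingest, metrics, normalization, supplemental) then only has to implement the two delegate methods, and the shared builder owns everything else.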

Functionality changes:
* Normalization pipelines no longer have to be specified in
`calculation_pipeline_templates.yaml` to be turned on. They will use the
same cloud region as the ingest pipeline does (as defined in
`DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE`, renamed from
`DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE` in this PR). To keep
things simple, normalization pipelines run *even* if there are no
ingest views defined. I tested an IA pipeline and it ran fine without
crashing and cost $0.13, so this shouldn't be a big deal as a stopgap
until the ingest and normalization pipelines are unified (we don't run
ingest pipelines if no ingest views are defined).
* There were a few slight changes to the `task_id` structure for Dataflow
task groups.
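The per-state region lookup described above can be sketched as follows. The mapping contents are invented examples; only the lookup-and-gate pattern reflects the actual code in this diff:

```python
# Hypothetical contents; the real mapping lives in
# recidiviz.pipelines.ingest.pipeline_utils.
from typing import Dict, Optional

DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE: Dict[str, str] = {
    "US_IA": "us-west1",  # invented example entries
    "US_XX": "us-east1",
}


def region_for_state(state_code: str) -> Optional[str]:
    # States missing from the mapping get no pipeline scheduled,
    # mirroring the membership check in ingest_dataflow_operations.py.
    return DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE.get(state_code)
```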

## Type of change

> All pull requests must have at least one of the following labels applied
> (otherwise the PR will fail):

| Label | Description |
|-------|-------------|
| Type: Bug | non-breaking change that fixes an issue |
| Type: Feature | non-breaking change that adds functionality |
| Type: Breaking Change | fix or feature that would cause existing functionality to not work as expected |
| Type: Non-breaking refactor | change addresses some tech debt item or prepares for a later change, but does not change functionality |
| Type: Configuration Change | adjusts configuration to achieve some end related to functionality, development, performance, or security |
| Type: Dependency Upgrade | upgrades a project dependency - these changes are not included in release notes |

## Related issues

Part of Recidiviz/recidiviz-data#27373

## Checklists

### Development

**This box MUST be checked by the submitter prior to merging**:
- [x] **Double- and triple-checked that there is no Personally
Identifiable Information (PII) being mistakenly added in this pull
request**

These boxes should be checked by the submitter prior to merging:
- [x] Tests have been written to cover the code changed/added as part of
this pull request

### Code review

These boxes should be checked by reviewers prior to merging:

- [x] This pull request has a descriptive title and information useful
to a reviewer
- [x] Potential security implications or infrastructural changes have
been considered, if relevant

GitOrigin-RevId: dedeac1cdc1bc51a693ae3836c3542f0174aafb9

ageiduschek authored and Helper Bot committed May 16, 2024
1 parent 596580b commit fc759a8
Showing 21 changed files with 473 additions and 287 deletions.
6 changes: 3 additions & 3 deletions recidiviz/admin_panel/ingest_dataflow_operations.py
@@ -44,7 +44,7 @@
from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance
from recidiviz.pipelines.ingest.dataset_config import state_dataset_for_state_code
from recidiviz.pipelines.ingest.pipeline_utils import (
-    DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE,
+    DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE,
)
from recidiviz.utils import metadata

@@ -89,10 +89,10 @@ def get_latest_job_for_state_instance(
return None

# TODO(#209930): remove this check once dataflow is launched for all states
-    if state_code not in DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE:
+    if state_code not in DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE:
return None

-    location = DEFAULT_INGEST_PIPELINE_REGIONS_BY_STATE_CODE[state_code]
+    location = DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE[state_code]
client = dataflow_v1beta3.JobsV1Beta3Client()

if job_id:
16 changes: 16 additions & 0 deletions recidiviz/airflow/dags/calculation/dataflow/__init__.py
@@ -0,0 +1,16 @@
# Recidiviz - a data platform for criminal justice reform
# Copyright (C) 2024 Recidiviz, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# =============================================================================
@@ -0,0 +1,83 @@
# Recidiviz - a data platform for criminal justice reform
# Copyright (C) 2024 Recidiviz, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# =============================================================================
"""Implementation of DataflowPipelineTaskGroupDelegate for ingest Dataflow pipeline
task groups.
"""
import json
from typing import Any, Dict

from airflow.models import DagRun

from recidiviz.airflow.dags.operators.cloud_sql_query_operator import (
CloudSqlQueryOperator,
)
from recidiviz.airflow.dags.utils.config_utils import get_ingest_instance
from recidiviz.airflow.dags.utils.dataflow_pipeline_group import (
DataflowPipelineTaskGroupDelegate,
)
from recidiviz.airflow.dags.utils.environment import get_project_id
from recidiviz.common.constants.states import StateCode
from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance
from recidiviz.pipelines.ingest.pipeline_parameters import (
INGEST_PIPELINE_NAME,
IngestPipelineParameters,
)
from recidiviz.pipelines.ingest.pipeline_utils import (
DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE,
)


class IngestDataflowPipelineTaskGroupDelegate(
DataflowPipelineTaskGroupDelegate[IngestPipelineParameters]
):
"""Implementation of DataflowPipelineTaskGroupDelegate for ingest Dataflow pipeline
task groups.
"""

def __init__(
self,
state_code: StateCode,
default_ingest_instance: DirectIngestInstance,
max_update_datetimes_operator: CloudSqlQueryOperator,
) -> None:
self._state_code = state_code
self._default_ingest_instance = default_ingest_instance
self._max_update_datetimes_operator = max_update_datetimes_operator

def get_default_parameters(self) -> IngestPipelineParameters:
return IngestPipelineParameters(
project=get_project_id(),
# This will get overwritten with a dynamic value at runtime
raw_data_upper_bound_dates_json=json.dumps({}),
pipeline=INGEST_PIPELINE_NAME,
state_code=self._state_code.value,
region=DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE[self._state_code],
ingest_instance=self._default_ingest_instance.value,
)

def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]:
ingest_instance = get_ingest_instance(dag_run)
if not ingest_instance:
ingest_instance = self._default_ingest_instance.value
max_update_datetimes = dag_run.get_task_instance(
self._max_update_datetimes_operator.task_id
).xcom_pull()

return {
"raw_data_upper_bound_dates_json": json.dumps(max_update_datetimes),
"ingest_instance": ingest_instance,
}
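The ingest delegate above passes the raw-data watermarks as a JSON string. A minimal sketch of that round trip (the table names and timestamps are invented; only the `json.dumps` serialization mirrors the delegate code):

```python
import json

# Invented example of what the XCom pull might return: a mapping from
# raw file tag to the max update_datetime seen for that file.
max_update_datetimes = {
    "raw_table_a": "2024-05-01T00:00:00",
    "raw_table_b": "2024-05-02T12:30:00",
}

# Serialized the same way as in get_pipeline_specific_dynamic_args.
raw_data_upper_bound_dates_json = json.dumps(max_update_datetimes)

# The pipeline side can recover the original mapping losslessly.
recovered = json.loads(raw_data_upper_bound_dates_json)
```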
@@ -0,0 +1,49 @@
# Recidiviz - a data platform for criminal justice reform
# Copyright (C) 2024 Recidiviz, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# =============================================================================
"""Implementation of DataflowPipelineTaskGroupDelegate for metrics Dataflow pipeline
task groups.
"""
from typing import Any, Dict

from airflow.models import DagRun

from recidiviz.airflow.dags.utils.dataflow_pipeline_group import (
DataflowPipelineTaskGroupDelegate,
)
from recidiviz.airflow.dags.utils.environment import get_project_id
from recidiviz.pipelines.metrics.pipeline_parameters import MetricsPipelineParameters
from recidiviz.utils.yaml_dict import YAMLDict


class MetricsDataflowPipelineTaskGroupDelegate(
DataflowPipelineTaskGroupDelegate[MetricsPipelineParameters]
):
"""Implementation of DataflowPipelineTaskGroupDelegate for metrics Dataflow pipeline
task groups.
"""

def __init__(self, pipeline_config: YAMLDict) -> None:
self._pipeline_config = pipeline_config

def get_default_parameters(self) -> MetricsPipelineParameters:
return MetricsPipelineParameters(
project=get_project_id(),
**self._pipeline_config.get(), # type: ignore
)

def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]:
return {}
@@ -0,0 +1,52 @@
# Recidiviz - a data platform for criminal justice reform
# Copyright (C) 2024 Recidiviz, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# =============================================================================
"""Implementation of DataflowPipelineTaskGroupDelegate for normalization Dataflow
pipeline task groups.
"""
from typing import Any, Dict

from airflow.models import DagRun

from recidiviz.airflow.dags.utils.dataflow_pipeline_group import (
DataflowPipelineTaskGroupDelegate,
)
from recidiviz.airflow.dags.utils.environment import get_project_id
from recidiviz.common.constants.states import StateCode
from recidiviz.pipelines.ingest.pipeline_utils import (
DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE,
)
from recidiviz.pipelines.normalization.pipeline_parameters import (
NormalizationPipelineParameters,
)


class NormalizationDataflowPipelineTaskGroupDelegate(
DataflowPipelineTaskGroupDelegate[NormalizationPipelineParameters]
):
"""Implementation of DataflowPipelineTaskGroupDelegate for normalization Dataflow
pipeline task groups.
"""

def __init__(self, state_code: StateCode) -> None:
self._state_code = state_code

def get_default_parameters(self) -> NormalizationPipelineParameters:
return NormalizationPipelineParameters(
project=get_project_id(),
pipeline="comprehensive_normalization",
state_code=self._state_code.value,
region=DEFAULT_PIPELINE_REGIONS_BY_STATE_CODE[self._state_code],
)

def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]:
return {}
@@ -0,0 +1,51 @@
# Recidiviz - a data platform for criminal justice reform
# Copyright (C) 2024 Recidiviz, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# =============================================================================
"""Implementation of DataflowPipelineTaskGroupDelegate for supplemental Dataflow
pipeline task groups.
"""
from typing import Any, Dict

from airflow.models import DagRun

from recidiviz.airflow.dags.utils.dataflow_pipeline_group import (
DataflowPipelineTaskGroupDelegate,
)
from recidiviz.airflow.dags.utils.environment import get_project_id
from recidiviz.pipelines.supplemental.pipeline_parameters import (
SupplementalPipelineParameters,
)
from recidiviz.utils.yaml_dict import YAMLDict


class SupplementalDataflowPipelineTaskGroupDelegate(
DataflowPipelineTaskGroupDelegate[SupplementalPipelineParameters]
):
"""Implementation of DataflowPipelineTaskGroupDelegate for supplemental Dataflow
pipeline task groups.
"""

def __init__(self, pipeline_config: YAMLDict) -> None:
self._pipeline_config = pipeline_config

def get_default_parameters(self) -> SupplementalPipelineParameters:
return SupplementalPipelineParameters(
project=get_project_id(),
**self._pipeline_config.get(), # type: ignore
)

def get_pipeline_specific_dynamic_args(self, dag_run: DagRun) -> Dict[str, Any]:
return {}
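Both the metrics and supplemental delegates splat a YAML config block into their parameters class via `**self._pipeline_config.get()`. The pattern can be sketched with a plain dict standing in for `YAMLDict` (the parameter fields and config values here are simplified assumptions):

```python
from dataclasses import dataclass


@dataclass
class PipelineParameters:
    # Simplified stand-in for the real MetricsPipelineParameters /
    # SupplementalPipelineParameters classes.
    project: str
    pipeline: str
    state_code: str


# Hypothetical entry from calculation_pipeline_templates.yaml.
pipeline_config = {"pipeline": "supplemental_dataset", "state_code": "US_XX"}

# Mirrors ParametersClass(project=get_project_id(), **self._pipeline_config.get()).
params = PipelineParameters(project="my-project-id", **pipeline_config)
```

Keeping the YAML keys aligned with the parameter class's field names is what makes the splat work; an unexpected key would raise a `TypeError` at DAG parse time.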