Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add point_alias DAG and add alias params to create_new_es_index DAGs (#…
…3890) * Update taskgroup to optionally delete old index, add params to create_new_es_index dag * Add separate point alias DAG * Skip all point_alias tasks if target_alias not supplied * `point_alias` needs to have trigger_rule=TriggerRule.NONE_FAILED, because the previous step to remove existing alias my be skipped * however, this is a problem for DAGs that import the entire point_alias taskGroup and try to skip it using a branching operator. Because `point_alias` runs when NONE_FAILED, it will try to run even though the entire taskgroup has been skipped. * easiest solution is to have all the individual tasks in the point_alias group individually handle skipping if the appropriate params haven't been passed in * Update DAGs.md * lint * Notify slack if earlier tasks skipped * Make remove/point alias atomic operation, skip delete when index not provided * Include 'es' in dag ids * Add option to delete old index to the proportional index dag * Update DAGs.md * Update param list in proportional dag docs
- Loading branch information
Showing
5 changed files
with
275 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
""" | ||
# Point ES Alias DAG | ||
This file generates our Point ES Alias DAGs using a factory function. A | ||
separate DAG is generated for the staging and production environments. | ||
The DAGs are used to point a `target_alias` to a `target_index` in the | ||
given environment's elasticsearch cluster. When the alias is applied, it | ||
is first removed from any existing index to which it already applies; | ||
optionally, it can also delete that index afterward. | ||
## When this DAG runs | ||
This DAG is on a `None` schedule and is run manually. | ||
""" | ||
|
||
from datetime import datetime | ||
|
||
from airflow import DAG | ||
from airflow.models.param import Param | ||
from airflow.utils.trigger_rule import TriggerRule | ||
|
||
from common import elasticsearch as es | ||
from common import slack | ||
from common.constants import ( | ||
DAG_DEFAULT_ARGS, | ||
ENVIRONMENTS, | ||
) | ||
|
||
|
||
def point_es_alias_dag(environment: str): | ||
dag = DAG( | ||
dag_id=f"point_{environment}_es_alias", | ||
default_args=DAG_DEFAULT_ARGS, | ||
schedule=None, | ||
start_date=datetime(2024, 1, 31), | ||
tags=["database", "elasticsearch"], | ||
max_active_runs=1, | ||
catchup=False, | ||
doc_md=__doc__, | ||
params={ | ||
"target_index": Param( | ||
type="string", | ||
description=( | ||
"The existing Elasticsearch index to which the target alias" | ||
" should be applied." | ||
), | ||
), | ||
"target_alias": Param( | ||
type="string", | ||
description=( | ||
"The alias which will be applied to the index. If" | ||
" the alias already exists, it will first be removed from the" | ||
" index to which it previously pointed." | ||
), | ||
), | ||
"should_delete_old_index": Param( | ||
default=False, | ||
type="boolean", | ||
description=( | ||
"Whether to delete the index previously pointed to by the" | ||
" `target_alias`." | ||
), | ||
), | ||
}, | ||
render_template_as_native_obj=True, | ||
) | ||
|
||
with dag: | ||
es_host = es.get_es_host(environment=environment) | ||
|
||
point_alias = es.point_alias( | ||
es_host=es_host, | ||
target_index="{{ params.target_index }}", | ||
target_alias="{{ params.target_alias }}", | ||
should_delete_old_index="{{ params.should_delete_old_index }}", | ||
) | ||
|
||
notify_completion = slack.notify_slack.override( | ||
trigger_rule=TriggerRule.NONE_FAILED | ||
)( | ||
text="Alias {{ params.target_alias }} applied to index {{ params.target_index }}.", | ||
dag_id=dag.dag_id, | ||
username="Point Alias", | ||
icon_emoji=":elasticsearch:", | ||
) | ||
|
||
es_host >> point_alias >> notify_completion | ||
|
||
|
||
for environment in ENVIRONMENTS: | ||
point_es_alias_dag(environment) |
Oops, something went wrong.