Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add xcom map_index as a filter to xcom endpoint #32453

Merged
merged 3 commits into from
Jul 10, 2023

Conversation

hussein-awala
Copy link
Member

closes: #32367

This PR add map_index as filter for Xcom endpoints and to their responses, and adds xcom_key as filter for get_xcom_entries.

Here are some examples:

# get all xcom entries for a task instance
curl -X 'GET' \
  'http://localhost:28080/api/v1/dags/test_dynamic_task_mapping/dagRuns/some_run_id/taskInstances/mapped_task/xcomEntries?limit=100' \
  -H 'accept: application/json'
{
  "total_entries": 6,
  "xcom_entries": [
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "return_value",
      "map_index": 0,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:02.578137+00:00"
    },
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "return_value",
      "map_index": 1,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:12.624135+00:00"
    },
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "return_value",
      "map_index": 2,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:22.626682+00:00"
    },
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "some_key",
      "map_index": 0,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:02.567293+00:00"
    },
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "some_key",
      "map_index": 1,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:12.600518+00:00"
    },
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "some_key",
      "map_index": 2,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:22.609229+00:00"
    }
  ]
}

# get all xcom entries for a task instance with a specific map_index
curl -X 'GET' \
  'http://localhost:28080/api/v1/dags/test_dynamic_task_mapping/dagRuns/some_run_id/taskInstances/mapped_task/xcomEntries?map_index=1&limit=100' \
  -H 'accept: application/json'
{
  "total_entries": 2,
  "xcom_entries": [
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "return_value",
      "map_index": 1,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:12.624135+00:00"
    },
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "some_key",
      "map_index": 1,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:12.600518+00:00"
    }
  ]
}

# get all xcom entries for a task instance with a specific map_index and key
curl -X 'GET' \
  'http://localhost:28080/api/v1/dags/test_dynamic_task_mapping/dagRuns/some_run_id/taskInstances/mapped_task/xcomEntries?map_index=1&xcom_key=some_key&limit=100' \
  -H 'accept: application/json'
{
  "total_entries": 1,
  "xcom_entries": [
    {
      "dag_id": "test_dynamic_task_mapping",
      "execution_date": "2023-07-09T10:00:00.00+00:00",
      "key": "some_key",
      "map_index": 1,
      "task_id": "mapped_task",
      "timestamp": "2023-07-09T10:19:12.600518+00:00"
    }
  ]
}

And for the value of a single xcom entry:

# get xcom entry for a task instance
curl -X 'GET' \
  'http://localhost:28080/api/v1/dags/test_dynamic_task_mapping/dagRuns/some_run_id/taskInstances/simple_task/xcomEntries/return_value?deserialize=false' \
  -H 'accept: application/json'
{
  "dag_id": "test_dynamic_task_mapping",
  "execution_date": "2023-07-09T10:00:00.00+00:00",
  "key": "return_value",
  "map_index": -1,
  "task_id": "simple_task",
  "timestamp": "2023-07-09T10:18:51.030130+00:00",
  "value": "[1, 2, 3]"
}

# get xcom entry for a mapped task instance, by default there is no result because the default map_index is -1
curl -X 'GET' \
  'http://localhost:28080/api/v1/dags/test_dynamic_task_mapping/dagRuns/some_run_id/taskInstances/mapped_task/xcomEntries/return_value?deserialize=false' \
  -H 'accept: application/json'
{
  "detail": null,
  "status": 404,
  "title": "XCom entry not found",
  "type": "http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/stable-rest-api-ref.html#section/Errors/NotFound"
}

# get xcom entry for a mapped task instance with a specific map_index
curl -X 'GET' \
  'http://localhost:28080/api/v1/dags/test_dynamic_task_mapping/dagRuns/some_run_id/taskInstances/mapped_task/xcomEntries/return_value?map_index=0&deserialize=false' \
  -H 'accept: application/json'
{
  "dag_id": "test_dynamic_task_mapping",
  "execution_date": "2023-07-09T10:00:00.00+00:00",
  "key": "return_value",
  "map_index": 0,
  "task_id": "mapped_task",
  "timestamp": "2023-07-09T10:19:02.578137+00:00",
  "value": "2"
}

^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Signed-off-by: Hussein Awala <hussein@awala.fr>
@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers. area:webserver Webserver related Issues labels Jul 9, 2023
@hussein-awala hussein-awala added the type:bug-fix Changelog: Bug Fixes label Jul 9, 2023
@hussein-awala hussein-awala added this to the Airflow 2.6.4 milestone Jul 9, 2023
Signed-off-by: Hussein Awala <hussein@awala.fr>
Signed-off-by: Hussein Awala <hussein@awala.fr>
Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does seem like a very reasonable change.

@uranusjr uranusjr merged commit bc97646 into apache:main Jul 10, 2023
42 checks passed
@eladkal eladkal modified the milestones: Airflow 2.6.4, Airflow 2.7.0 Aug 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers. area:webserver Webserver related Issues type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Unable to get mapped task xcom value via REST API. Getting MultipleResultsFound error
4 participants