Add specific cluster error info, shard info and additional metadata for CCS when minimizing roundtrips #97731

Merged
34 commits merged into elastic:main on Aug 7, 2023

Conversation

@quux00 (Contributor) commented Jul 17, 2023

For CCS searches with ccs_minimize_roundtrips=true, when an error is returned, it is unclear which cluster
caused the problem. This commit adds additional accounting and error information to the search response
for each cluster involved in a cross-cluster search.

The _clusters section of the SearchResponse has a new details section added with an entry for each cluster
(remote and local). It includes status info, shard accounting counters and error information that are added
incrementally as the search happens.

The search on each cluster can be in one of 5 states:
RUNNING
SUCCESSFUL - all shards were successfully searched (successful or skipped)
PARTIAL - some shard searches failed, but at least one succeeded and partial data has been returned
SKIPPED - no shards were successfully searched (all failed or cluster unavailable) when skip_unavailable=true
FAILED - no shards were successfully searched (all failed or cluster unavailable) when skip_unavailable=false

A new SearchResponse.Cluster object has been added. Each TransportSearchAction.CCSActionListener
(one per cluster) holds a reference to a separate Cluster instance and updates it once it gets back information
from its cluster.

The SearchResponse.Clusters object only uses the new Cluster object for CCS minimize_roundtrips=true.
For local-only searches and CCS minimize_roundtrips=false, it uses the current (immutable) Clusters object as before.
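
As a rough sketch of the per-cluster state machine described above (the constant names mirror the list; the exact shape and name of the enum in the PR may differ):

```java
// Illustrative sketch only; the actual enum lives on SearchResponse.Cluster in the PR.
public enum ClusterSearchStatus {
    RUNNING,     // the search on this cluster is still in flight
    SUCCESSFUL,  // all shards were searched (successful or skipped by can-match)
    PARTIAL,     // some shard searches failed, but partial data was returned
    SKIPPED,     // no shards searched successfully and skip_unavailable=true
    FAILED       // no shards searched successfully and skip_unavailable=false
}
```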

@quux00 added the WIP label Jul 17, 2023
@quux00 force-pushed the ccs/mrt-error-messages branch 2 times, most recently from 2853c25 to 4dd24f7 on July 18, 2023 00:25
@quux00 (Contributor, Author) commented Jul 19, 2023

[UPDATE] This comment is now out of date, as the code has changed based on feedback to this question. See this comment for the latest output format details:

Current state for review.

All of the discussion below pertains only to searches with ccs_minimize_roundtrips=true, which is the focus of this PR.

Clusters for a CCS search can be in one of 5 states:

  1. RUNNING
  2. SUCCESSFUL - all shards were successfully searched (successful or skipped)
  3. PARTIAL - some shard searches failed, but at least one succeeded and partial data has been returned
  4. SKIPPED - no shards were successfully searched (all failed or cluster unavailable) when skip_unavailable=true
  5. FAILED - no shards were successfully searched (all failed or cluster unavailable) when skip_unavailable=false

The testing/demo setup I used has three clusters, each with a blogs index and 3 shards.

Query used for testing:

The busywork function score is used to make queries run for 30 to 90 seconds depending on the value of m and the number of documents indexed.

POST blogs,*:blogs/_async_search?ccs_minimize_roundtrips=true
{
  "size": 2,
  "query": {
    "function_score": {
      "query": {
        "wildcard": {
          "url": "/*"
        }
      },
      "script_score": {
        "script": "int m = 5;\n  double u = 1.0;\n  double z = 2.0;\n for (int x = 0; x < m; ++x) {\n for (int y = 0; y < 10000; ++y) {\n u=Math.log(y);\n  z=z + (Math.log(x) - Math.log(y));\n }\n }\n return u;\n"
      }
    }
  },
  "aggs": {
    "indexgroup": {
      "terms": {
        "field": "_index"
      }
    }
  }
}
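
For readability, the escaped `script_score` source above expands to the following Painless script (same content, whitespace reflowed only):

```painless
int m = 5;
double u = 1.0;
double z = 2.0;
for (int x = 0; x < m; ++x) {
  for (int y = 0; y < 10000; ++y) {
    u = Math.log(y);
    z = z + (Math.log(x) - Math.log(y));
  }
}
return u;
```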
Intermediate state where all clusters are still running the search:
{
  "id": "FjVlY05WdkVrVE9hRDBmNUhrNkpIZUEaWmV5UFJGblJUdUdxTnRobjlKS3lEZzoyOTU=",
  "is_partial": true,
  "is_running": true,
  "start_time_in_millis": 1689777039277,
  "expiration_time_in_millis": 1690209039277,
  "response": {
    "took": 831,
    "timed_out": false,
    "terminated_early": false,
    "num_reduce_phases": 0,
    "_shards": {
      "total": 3,   // note this only knows about local cluster shards at this point
      "successful": 0,
      "skipped": 0,
      "failed": 0
    },
    "_clusters": {
      "total": 3,
      "successful": 0,
      "skipped": 0,
      "details": {
        "(local)": {
          "status": "running"
        },
        "remote2": {
          "status": "running"
        },
        "remote1": {
          "status": "running"
        }
      }
    },
    "hits": {
      "total": {
        "value": 0,
        "relation": "gte"
      },
      "max_score": null,
      "hits": []
    }
  }
}
Response showing all clusters having finished successfully:
{
  "is_partial": false,
  "is_running": false,
  "start_time_in_millis": 1689776974105,
  "expiration_time_in_millis": 1690208974105,
  "completion_time_in_millis": 1689776974220,
  "response": {
    "took": 115,
    "timed_out": false,
    "num_reduce_phases": 4,
    "_shards": {
      "total": 9,
      "successful": 9,
      "skipped": 0,
      "failed": 0
    },
    "_clusters": {
      "total": 3,
      "successful": 3,
      "skipped": 0,
      "details": {
        "(local)": {
          "status": "successful",
          "total_shards": 3,
          "successful_shards": 3,
          "skipped_shards": 0,
          "failed_shards": 0,
          "percent_shards_successful": "100.00",
          "search_duration": 100
        },
        "remote2": {
          "status": "successful",
          "total_shards": 3,
          "successful_shards": 3,
          "skipped_shards": 0,
          "failed_shards": 0,
          "percent_shards_successful": "100.00",
          "search_duration": 106
        },
        "remote1": {
          "status": "successful",
          "total_shards": 3,
          "successful_shards": 3,
          "skipped_shards": 0,
          "failed_shards": 0,
          "percent_shards_successful": "100.00",
          "search_duration": 108
        }
      }
    },
    "hits": {
      "total": {
        "value": 10000,
        "relation": "gte"
      },
      "max_score": 1,
      "hits": [
        {
          ... 
        }
      ]
    },
    "aggregations": {
      "indexgroup": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [
          {
            "key": "remote1:blogs",
            "doc_count": 4198
          },
          {
            "key": "remote2:blogs",
            "doc_count": 4198
          },
          {
            "key": "blogs",
            "doc_count": 2400
          }
        ]
      }
    }
  }
}
Intermediate state showing the updated status of clusters that have finished (successfully or not):
{
  "id": "FnN6QTFuVDFtVHMtYTEzak1Kb3hNUkEaWmV5UFJGblJUdUdxTnRobjlKS3lEZzoxMjI=",
  "is_partial": true,
  "is_running": true,
  "start_time_in_millis": 1689777658335,
  "expiration_time_in_millis": 1690209658335,
  "response": {
    "took": 10999,
    "timed_out": false,
    "terminated_early": false,
    "_shards": {
      "total": 3,
      "successful": 3,
      "skipped": 0,
      "failed": 0
    },
    "_clusters": {
      "total": 3,
      "successful": 2,
      "skipped": 0,
      "details": {
        "(local)": {
          "status": "successful",
          "total_shards": 3,
          "successful_shards": 3,
          "skipped_shards": 0,
          "failed_shards": 0,
          "percent_shards_successful": "100.00",
          "search_duration": 5034
        },
        "remote2": {
          "status": "partial",
          "total_shards": 3,
          "successful_shards": 2,
          "skipped_shards": 0,
          "failed_shards": 1,
          "percent_shards_successful": "66.67",
          "search_duration": 8166,
          "errors": [
            {
              "shard": 0,
              "index": "remote2:blogs",
              "node": "dvQdLM35SZqLVG2My3fatA",
              "reason": {
                "type": "i_o_exception",
                "reason": "Detailed IO error message here"
              }
            }
          ]
        },
        "remote1": {
          "status": "running"
        }
      }
    },
    "hits": {
      "total": {
        "value": 2400,
        "relation": "eq"
      },
      "max_score": null,
      "hits": []
    },
    "aggregations": {
      "indexgroup": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [
          {
            "key": "blogs",
            "doc_count": 2400
          }
        ]
      }
    }
  }
}
Final state showing the updated status of clusters and additional info about shards and duration:
{
  "id": "FnN6QTFuVDFtVHMtYTEzak1Kb3hNUkEaWmV5UFJGblJUdUdxTnRobjlKS3lEZzoxMjI=",
  "is_partial": true,     // marked as partial since some of the clusters returned partial results
  "is_running": false,
  "start_time_in_millis": 1689777658335,
  "expiration_time_in_millis": 1690209658335,
  "completion_time_in_millis": 1689777675807,
  "response": {
    "took": 17472,
    "timed_out": false,
    "num_reduce_phases": 4,
    "_shards": {
      "total": 9,
      "successful": 8,
      "skipped": 0,
      "failed": 1,
      "failures": [
        {
          "shard": 0,
          "index": "remote2:blogs",
          "node": "dvQdLM35SZqLVG2My3fatA",
          "reason": {
            "type": "i_o_exception",
            "reason": "Detailed IO error message here"
          }
        }
      ]
    },
    "_clusters": {
      "total": 3,
      "successful": 3,
      "skipped": 0,
      "details": {
        "(local)": {
          "status": "successful",
          "total_shards": 3,
          "successful_shards": 3,
          "skipped_shards": 0,
          "failed_shards": 0,
          "percent_shards_successful": "100.00",
          "search_duration": 5034
        },
        "remote2": {
          "status": "partial",
          "total_shards": 3,
          "successful_shards": 2,
          "skipped_shards": 0,
          "failed_shards": 1,
          "percent_shards_successful": "66.67",
          "search_duration": 8166,
          "errors": [
            {
              "shard": 0,
              "index": "remote2:blogs",
              "node": "dvQdLM35SZqLVG2My3fatA",
              "reason": {
                "type": "i_o_exception",
                "reason": "Detailed IO error message here"
              }
            }
          ]
        },
        "remote1": {
          "status": "successful",
          "total_shards": 3,
          "successful_shards": 3,
          "skipped_shards": 0,
          "failed_shards": 0,
          "percent_shards_successful": "100.00",
          "search_duration": 17466
        }
      }
    },
    "hits": {
      "total": {
        "value": 10000,
        "relation": "gte"
      },
      "max_score": 9.21024,
      "hits": [
        {
          ...
        }
      ]
    },
    "aggregations": {
      "indexgroup": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [
          {
            "key": "remote1:blogs",
            "doc_count": 9198
          },
          {
            "key": "remote2:blogs",
            "doc_count": 2795
          },
          {
            "key": "blogs",
            "doc_count": 2400
          }
        ]
      }
    }
  }
}
Final state where a skip_unavailable:false cluster fully failed, causing the whole search to fail:
{
  "id": "FlVEUmZZRlhWUUx5ejRFNkxRUFBGZ2caMU9MY29NM2hSZldrR0t2NV9RaUxMZzozNzY=",
  "is_partial": true,
  "is_running": false,
  "start_time_in_millis": 1689735073961,
  "expiration_time_in_millis": 1690167073961,
  "completion_time_in_millis": 1689735108480,
  "response": {
    "took": 34519,
    "timed_out": false,
    "terminated_early": false,
    "_shards": {
      "total": 3,   // for MRT=true, _shards information is inaccurate while search is running and if any cluster is fully skipped or fails
      "successful": 3,
      "skipped": 0,
      "failed": 0
    },
    "_clusters": {
      "total": 3,
      "successful": 2,
      "skipped": 1,
      "details": {
        "(local)": {
          "status": "successful",
          "total_shards": 3,
          "successful_shards": 3,
          "skipped_shards": 0,
          "failed_shards": 0,
          "percent_shards_successful": "100.00",
          "search_duration": 15749
        },
        "remote2": {
          "status": "partial",
          "total_shards": 3,
          "successful_shards": 1,
          "skipped_shards": 0,
          "failed_shards": 2,
          "percent_shards_successful": "33.33",
          "search_duration": 34507,
          "errors": [
            {
              "shard": 0,
              "index": "remote2:blogs",
              "node": "dyViicpVRwyiy8b5RqWa3Q",
              "reason": {
                "type": "parse_exception",
                "reason": "Detailed parse exception here"
              }
            },
            {
              "shard": 2,
              "index": "remote2:blogs",
              "node": "dyViicpVRwyiy8b5RqWa3Q",
              "reason": {
                "type": "i_o_exception",
                "reason": "Detailed IO error message here"
              }
            }
          ]
        },
        "remote1": {
          "status": "failed",
          "errors": [
            {
              "shard": -1,
              "index": null,
              "reason": {
                "type": "search_phase_execution_exception",
                "reason": "all shards failed",
                "phase": "query",
                "grouped": true,
                "failed_shards": [
                  {
                    "shard": 0,
                    "index": "remote1:blogs",
                    "node": "AQtFRIsNR9y7l8rHvjWIhA",
                    "reason": {
                      "type": "parse_exception",
                      "reason": "Detailed parse exception here"
                    }
                  },
                  {
                    "shard": 1,
                    "index": "remote1:blogs",
                    "node": "AQtFRIsNR9y7l8rHvjWIhA",
                    "reason": {
                      "type": "i_o_exception",
                      "reason": "Detailed IO error message here"
                    }
                  },
                  {
                    "shard": 2,
                    "index": "remote1:blogs",
                    "node": "AQtFRIsNR9y7l8rHvjWIhA",
                    "reason": {
                      "type": "role_restriction_exception",
                      "reason": "Detailed role restriction error message here"
                    }
                  }
                ],
                "caused_by": {
                  "type": "parse_exception",
                  "reason": "Detailed parse exception here"
                }
              }
            }
          ]
        }
      }
    },
    "hits": {
      "total": {
        "value": 1200,
        "relation": "eq"
      },
      "max_score": null,
      "hits": []
    },
    "aggregations": {
      "indexgroup": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [
          {
            "key": "blogs",
            "doc_count": 1200
          }
        ]
      }
    }
  },
  "error": {   // because this overall search failed, a top-level "error" section is included (when HTTP status = 500)
    "type": "status_exception",
    "reason": "error while executing search",
    "caused_by": {
      "type": "search_phase_execution_exception",
      "reason": "all shards failed",
      "phase": "query",
      "grouped": true,
      "failed_shards": [
        {
          "shard": 0,
          "index": "remote1:blogs",
          "node": "AQtFRIsNR9y7l8rHvjWIhA",
          "reason": {
            "type": "parse_exception",
            "reason": "Detailed parse exception here"
          }
        },
        {
          "shard": 1,
          "index": "remote1:blogs",
          "node": "AQtFRIsNR9y7l8rHvjWIhA",
          "reason": {
            "type": "i_o_exception",
            "reason": "Detailed IO error message here"
          }
        },
        {
          "shard": 2,
          "index": "remote1:blogs",
          "node": "AQtFRIsNR9y7l8rHvjWIhA",
          "reason": {
            "type": "role_restriction_exception",
            "reason": "Detailed role restriction error message here"
          }
        }
      ],
      "caused_by": {
        "type": "parse_exception",
        "reason": "Detailed parse exception here"
      }
    }
  }
}
The `GET _async_search/status/:id` output also shows the new `_clusters` info:
{
  "id": "FnN6QTFuVDFtVHMtYTEzak1Kb3hNUkEaWmV5UFJGblJUdUdxTnRobjlKS3lEZzoxMjI",
  "is_running": false,
  "is_partial": true,
  "start_time_in_millis": 1689777658335,
  "expiration_time_in_millis": 1690209658335,
  "completion_time_in_millis": 1689777675807,
  "_shards": {
    "total": 9,
    "successful": 8,
    "skipped": 0,
    "failed": 1
  },
  "_clusters": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "details": {
      "(local)": {
        "status": "successful",
        "total_shards": 3,
        "successful_shards": 3,
        "skipped_shards": 0,
        "failed_shards": 0,
        "percent_shards_successful": "100.00",
        "search_duration": 5034
      },
      "remote2": {
        "status": "partial",
        "total_shards": 3,
        "successful_shards": 2,
        "skipped_shards": 0,
        "failed_shards": 1,
        "percent_shards_successful": "66.67",
        "search_duration": 8166,
        "errors": [
          {
            "shard": 0,
            "index": "remote2:blogs",
            "node": "dvQdLM35SZqLVG2My3fatA",
            "reason": {
              "type": "i_o_exception",
              "reason": "Detailed IO error message here"
            }
          }
        ]
      },
      "remote1": {
        "status": "successful",
        "total_shards": 3,
        "successful_shards": 3,
        "skipped_shards": 0,
        "failed_shards": 0,
        "percent_shards_successful": "100.00",
        "search_duration": 17466
      }
    }
  },
  "completion_status": 200
}

Things to note:

  1. skipped for shards vs. skipped for clusters have different meanings. A skipped shard is one that the can-match phase determined did not need to be searched because it cannot contain matching documents, so it is a "successful" outcome, not a failure.
    For clusters, skipped is a failure outcome, meaning that no shards could be searched on a cluster marked skip_unavailable=true.

  2. The failed cluster state only happens for clusters that are marked as skip_unavailable: false (and for the local cluster if all shard searches fail). When a cluster marked with skip_unavailable: true fails, the cluster state is skipped.

  3. Clusters that are marked with skip_unavailable: false can finish in a partial state where some of the data is returned without failing the whole search.

  4. For cases where a remote cluster is unreachable, we don't get back information about the number of shards on that cluster. Thus, using the _shards section to tally "percent data" returned would be inaccurate. One option is to always present the "percent of clusters" that have returned data. If all of the clusters were reachable and returned at least partial data, a more accurate "percent of shards" could be calculated, but this is not always reliable.


Questions:

  1. Should we add a failed counter to the _clusters output? If not, do we account for them in the _clusters/skipped counter?
  2. How do we want to label the local cluster in the _clusters/details section? In the ES codebase it just has the alias "" (empty string), but that is potentially confusing to end users, so I've put (local) as the proposed identifier. This also matches what I did for the profile output of a search here: Profile API should show node details as well as shard details #96396
  3. Should we include the search_duration field?
  4. Should we include the percent_shards_successful field?
  5. Should we include the _clusters/details section by default? With, say, 100 clusters, this section is going to be quite long. Each entry in the details section is currently 8 lines of JSON when successful, and another 8 to several dozen lines if there are a lot of shard failures to list. Should it be put behind a flag like detailed=true (like the Task API has)?
  6. Should we have a more limited _clusters/details output by default and only put the full output behind a detailed=true flag? If yes, what information should be displayed by default?
  7. For CCS minimize_roundtrips=true, should we remove or zero out the _shards section, since (a) it is somewhat redundant with the information supplied in the _clusters section when the search is successful, (b) while the search is running it always inaccurately displays only the local shard count for total shards, and most importantly, (c) that section is inaccurate at completion when a cluster is skipped or fails because it cannot be reached. In that case we don't have any information about how many shards the unreachable/failing remote cluster has. (For context: the _shards section is what CCS MRT=false uses for its status and errors.)

@quux00 force-pushed the ccs/mrt-error-messages branch 3 times, most recently from 9eaa442 to 8c6f1c5 on July 20, 2023 20:04
@javanna (Member) commented Jul 25, 2023

  1. Should we add a failed counter to the _clusters output? If not, do we account for them in the _clusters/skipped counter?

Sounds like a good idea to me, although skipped and failed are never going to be greater than 0 at the same time? Should we hide either of the two depending on skip_unavailable? Or maybe that's too magic?

  2. How do we want to label the local cluster in the _clusters/details section? In the ES codebase it just has the alias "" (empty string), but that is potentially confusing to end users, so I've put (local) as the proposed identifier. This also matches what I did for the profile output of a search here: Profile API should show node details as well as shard details #96396

sounds good to me.

  3. Should we include the search_duration field?

I would call it `took`.

  4. Should we include the percent_shards_successful field?

I would not include it; it can easily be computed.

  5. Should we include the _clusters/details section by default? With, say, 100 clusters, this section is going to be quite long. Each entry in the details section is currently 8 lines of JSON when successful, and another 8 to several dozen lines if there are a lot of shard failures to list. Should it be put behind a flag like detailed=true (like the Task API has)?

I guess it won't matter much if Kibana ends up always providing the flag? What is the plan there in terms of consuming the additional output?

  6. Should we have a more limited _clusters/details output by default and only put the full output behind a detailed=true flag? If yes, what information should be displayed by default?

I am not sure we would be able to trim it down considerably. Maybe we could remove the shards information from it. I am not entirely sure, and depends again on how we expect this info to be consumed.

  7. For CCS minimize_roundtrips=true, should we remove or zero out the _shards section, since (a) it is somewhat redundant with the information supplied in the _clusters section when the search is successful, (b) while the search is running it always inaccurately displays only the local shard count for total shards, and most importantly, (c) that section is inaccurate at completion when a cluster is skipped or fails because it cannot be reached. In that case we don't have any information about how many shards the unreachable/failing remote cluster has. (For context: the _shards section is what CCS MRT=false uses for its status and errors.)

I suspect this applies to async search as it exposes partial state but less to _search. I would not consider this necessary for now.

@quux00 (Contributor, Author) commented Jul 25, 2023

Sounds like a good idea to me, although skipped and failed are never going to be greater than 0 at the same time? Should we hide either of the two depending on skip_unavailable? Or maybe that's too magic?

Yes, you can have both skipped and failed with a count > 0. Example scenario: 3 remote clusters in the search. One completes successfully, the other two completely fail (all shards fail or cluster not available), where one is skip_unavailable=true and the other is skip_unavailable=false. The counters would then be:

      "total": 3,
      "successful": 1,
      "skipped": 1,
      "failed": 1

@quux00 force-pushed the ccs/mrt-error-messages branch 3 times, most recently from 3708f83 to 67628db on July 28, 2023 16:09
@quux00 added the >enhancement, :Search/Search, and Team:Search labels and removed the WIP label Jul 28, 2023
@elasticsearchmachine (Collaborator) commented: Hi @quux00, I've created a changelog YAML for you.

@quux00 marked this pull request as ready for review July 28, 2023 17:54
@elasticsearchmachine (Collaborator) commented: Pinging @elastic/es-search (Team:Search)

@quux00 changed the title from "WIP: Add specific cluster error info, shard info and additional metadata for CCS when minimizing roundtrips" to "Add specific cluster error info, shard info and additional metadata for CCS when minimizing roundtrips" Jul 29, 2023
@elasticsearchmachine (Collaborator) commented: Hi @quux00, I've created a changelog YAML for you.

private final AtomicInteger skippedShards;
private final AtomicInteger failedShards;
private final AtomicLong took; // search latency in millis for this cluster sub-search
private final AtomicBoolean timedOut;
quux00 (Contributor, Author) commented:
The thread-safety model of this class does not use synchronization because it relies on a single-writer principle enforced elsewhere in the code.

The Cluster object is updated by one ActionListener callback thread. No other thread should ever update a given Cluster object. (Each Cluster is updated by its own single ActionListener.)

However, the Cluster objects do need to be read by status queries (e.g., GET _async_search/:id), so all variables are volatile or Atomic (or final, for clusterAlias and indexExpression) so that status queries can see the latest state.

Note: the status access (e.g., GET _async_search/:id) is "best effort" since there is no synchronization. It is possible that while the status is being read for a status response, the Cluster is being actively updated by a callback thread. This is considered acceptable since that would only happen for "intermediate state". The final state when the search is completed will always be fully accurate.
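
To make that model concrete, here is a minimal, hypothetical sketch of the single-writer/volatile-read pattern described in this comment (class, field, and method names are illustrative only, and the real class was later reworked to be immutable, as discussed further down this thread):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only -- not the actual SearchResponse.Cluster implementation.
class ClusterState {
    private final String clusterAlias;                       // immutable identity
    private final AtomicInteger failedShards = new AtomicInteger();
    private volatile long tookMillis = -1;                    // set once by the callback thread
    private volatile boolean timedOut;

    ClusterState(String clusterAlias) {
        this.clusterAlias = clusterAlias;
    }

    // Invoked only by this cluster's single ActionListener callback thread (single writer).
    void markFinished(long tookMillis, boolean timedOut, int newlyFailedShards) {
        this.failedShards.addAndGet(newlyFailedShards);
        this.tookMillis = tookMillis;
        this.timedOut = timedOut;
    }

    // May run concurrently with the writer (e.g. a GET _async_search/:id status request);
    // reads are "best effort" since there is no synchronization across fields.
    int failedShards() { return failedShards.get(); }
    long tookMillis() { return tookMillis; }
    boolean timedOut() { return timedOut; }
}
```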

@quux00 (Contributor, Author) commented Jul 31, 2023:
Also I think technically took and timedOut could just be volatile primitives, since they are always just set and never incremented.

A reviewer (Member) commented:
Is it an option to create a new Cluster object at every change and keep the class immutable? I think that would be easier to reason about compared to all the mutable state it has now.

@quux00 (Contributor, Author) commented Aug 1, 2023:
For MRT=true, yes, we could move to immutable Cluster objects that get overwritten (in the Clusters shared, thread-safe clusterInfo Map) because each cluster has only a single callback thread, so you have a single writer per Cluster.

But for MRT=false, the Cluster updating/writer threads operate at the granularity of a shard. So during the can-match (SearchShards) phase and the query/fetch phases you have N threads (N = number of shards per cluster) all potentially updating the same Cluster object with shard-specific info. So either we need a shared thread-safe Cluster object (what I've tried to do in this PR),
OR
if we want to use immutable Cluster objects, we need a shared per-Cluster lock so that each per-shard callback "writer" thread can safely overwrite the Cluster object with a new one that has "incremented" successful/skipped/failed counters and failure lists, without a data race that loses information.

I think both are feasible. For the latter, I think we would need the shared Clusters object (which can no longer be immutable for CCS) to construct a set of lock objects, one per Cluster, in its "CCS" constructor. The shard callback threads (MRT=false) would then need to obtain that lock, create a new Cluster object with the updated information, overwrite the existing one, and release the lock.

If we build such a locking system, we might want to then wire the MRT=true paths to use it as well, though there with a single writer per cluster it isn't necessary.

Let me know your view (or if there's another option I'm not thinking of) on how to proceed.

@quux00 (Contributor, Author) commented Aug 2, 2023:
In the latest push I changed the SearchResponse.Cluster to be fully immutable. The SearchResponse.Clusters object holds an unmodifiable Map<String, AtomicReference<Cluster>>, where the key is the clusterAlias.

For MRT=true (this PR), each CCSActionListener (one per cluster) gets a reference to the AtomicReference holding the Cluster it is searching. It will then do a CAS operation to update state with a new Cluster object. This isn't strictly necessary for the MRT=true paths, since there is a single writer to the Map per Cluster.

But it sets up the concurrency model for MRT=false, where the updates will be done per shard, not per Cluster. For that work (future ticket), each updater thread will update a Cluster object by retrieving the existing Cluster object, incrementing the appropriate counters and other state (e.g., status or timedOut or failure lists) and then doing a CAS back into the AtomicReference. If that fails, it will grab the new Cluster being held by the AtomicReference and re-attempt to increment/add data to a new Cluster and again try to CAS it into the AtomicReference.
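
As a rough illustration of that retry loop (the Cluster record and its withSuccessfulShards/getSuccessfulShards helpers below are hypothetical stand-ins, not the real SearchResponse.Cluster API):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable Cluster stand-in with a copy helper.
record Cluster(String alias, int successfulShards) {
    Cluster withSuccessfulShards(int n) { return new Cluster(alias, n); }
    int getSuccessfulShards() { return successfulShards; }
}

// Illustrative sketch of the per-shard CAS update described above (the MRT=false case, future work).
static void incrementSuccessfulShards(AtomicReference<Cluster> clusterRef) {
    while (true) {
        Cluster current = clusterRef.get();
        // Build a new immutable Cluster with one more successful shard.
        Cluster updated = current.withSuccessfulShards(current.getSuccessfulShards() + 1);
        if (clusterRef.compareAndSet(current, updated)) {
            return; // our update won; status readers now see the new Cluster
        }
        // CAS failed: another shard callback swapped in a newer Cluster first -- retry against it.
    }
}
```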

* @param clusterAlias cluster on which the failure occurred
* @param skipUnavailable the skip_unavailable setting of the cluster with the search error
*/
private static void logCCSError(ShardSearchFailure f, String clusterAlias, boolean skipUnavailable) {
quux00 (Contributor, Author) commented:
This "feature" was requested by Product at a recent meeting - to start logging any CCS errors that occur so that we can begin to discover what types of CCS issues happen in large production environments, especially as we don't yet have any CCS telemetry in place.
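
The method body isn't shown in this thread; purely as a sketch of the intent (the logger name, log level, and message format are assumptions, not the actual implementation):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Sketch only: log each CCS failure so operators can discover cross-cluster issues from server logs.
private static final Logger logger = LogManager.getLogger(TransportSearchAction.class);

private static void logCCSError(ShardSearchFailure f, String clusterAlias, boolean skipUnavailable) {
    logger.warn("CCS failure on cluster [" + clusterAlias + "] (skip_unavailable=" + skipUnavailable + "): " + f, f.getCause());
}
```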

A reviewer (Member) commented:
would it make sense to extract this to a separate PR?

quux00 (Contributor, Author) commented:
OK, I can do that.

@@ -189,7 +189,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, skippedShards, failedShards, null);
if (clusters != null) {
-            builder = clusters.toXContent(builder, null);
+            builder = clusters.toXContent(builder, params);
quux00 (Contributor, Author) commented:
Bug fix that was uncovered by the newly added code in this PR.

@@ -523,11 +523,14 @@ public void testCCSRemoteReduceMergeFails() throws Exception {
ActionListener.wrap(r -> fail("no response expected"), failure::set),
latch
);
SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteIndicesByCluster, true);
@quux00 (Contributor, Author) commented Jul 31, 2023:
This sets up the "initial Clusters" with the list of local indices and remote clusters, so that a separate underlying AtomicReference<SearchResponse.Cluster> can be created for each cluster (local or remote) and handed to the corresponding CCSActionListener (each of which is cluster-specific).

Removed deprecated Clusters constructor.

Created common determineCountFromClusterInfo method for dynamic counts
in Clusters from underlying Cluster objects.

`took` on Cluster objects is now set from the underlying single-cluster SearchResponse.
… changing to AtomicReference - not working/incomplete
The SearchResponse.Clusters object now uses an immutable HashMap<String, AtomicReference<Cluster>>
rather than ConcurrentHashMap<String, Cluster>, since the concurrency control is
now a CAS operation on the underlying AtomicReference<Cluster>, not an overwrite into
the HashMap.

Changed took in SearchResponse.Cluster to be a TimeValue rather than long to match
the usage in SearchResponse.
@quux00 merged commit 169f7d1 into elastic:main Aug 7, 2023
12 checks passed
quux00 added a commit to quux00/elasticsearch that referenced this pull request Aug 17, 2023
quux00 added a commit to quux00/elasticsearch that referenced this pull request Aug 17, 2023
quux00 added a commit that referenced this pull request Aug 21, 2023
quux00 added a commit to quux00/elasticsearch that referenced this pull request Aug 26, 2023
With the recent addition of per-cluster metadata to the `_clusters` section of the response
for cross-cluster searches (see elastic#97731), the `is_partial` setting in the async-search response
now acts as a useful summary for end users that search/aggs data from all shards is potentially incomplete
(not all shards were fully searched), which could be for one of three reasons:

1. at least one shard was not successfully searched (a PARTIAL search cluster state)
2. at least one cluster (marked as `skip_unavailable`=`true`) was unavailable (or all
   searches on all shards of that cluster failed), causing the cluster to be marked as SKIPPED
3. a search on at least one cluster timed out (`timed_out`=`true`, resulting in a PARTIAL cluster search status)

This commit changes local-only (non-CCS) searches to behave consistently with cross-cluster searches,
namely, if any search on any shard fails or if the search times out, the is_partial flag is set to true.

Closes elastic#98725
quux00 added a commit to elastic/elasticsearch-specification that referenced this pull request Aug 29, 2023
Updates the spec for the response objects for `POST _async_search`, `GET _async_search/:id`,
`GET _async_search/status/:id` and `GET _search`

Two high level changes have happened in 8.10:

1) `completion_time` and `completion_time_in_millis` (async_search only) elastic/elasticsearch#97700

2) Adding `details` metadata to the `_clusters` section of both async and synchronous search responses elastic/elasticsearch#97731
quux00 added a commit to elastic/elasticsearch-specification that referenced this pull request Aug 31, 2023
quux00 added a commit to quux00/elasticsearch that referenced this pull request Aug 31, 2023
swallez pushed a commit to elastic/elasticsearch-specification that referenced this pull request Sep 1, 2023
swallez pushed a commit to elastic/elasticsearch-specification that referenced this pull request Sep 1, 2023
swallez added a commit to elastic/elasticsearch-specification that referenced this pull request Sep 5, 2023
Labels
>enhancement, release highlight, :Search/Search, Team:Search, v8.10.0