Support sharding large workloads across multiple Kibana instances #93029

Closed
rudolf opened this issue Mar 1, 2021 · 5 comments
Labels
Team:Core Core services & architecture: plugins, logging, config, saved objects, http, ES client, i18n, etc

Comments

@rudolf
Contributor

rudolf commented Mar 1, 2021

(although this is on the core team's long term radar, we're not actively working towards adding this at the moment)

Kibana instances are currently completely isolated and don't know anything about other instances connected to the same Elasticsearch cluster. This means that synchronizing workloads between instances has to be done with optimistic concurrency control. However, every conflict requires an additional roundtrip to identify available tasks, which limits the scalability of this approach.

Sharding seems like the most promising solution to this problem and could increase the scalability of e.g. task manager and saved object migrations.

To support sharding, Kibana will need a mechanism to discover and check the liveness of other Kibana instances connected to the same Elasticsearch cluster, so that work can be sharded over the available instances.

Since all Kibana instances in a "cluster" already share the same Elasticsearch cluster, we can use Elasticsearch as our transport mechanism by keeping a "cluster state" document that all Kibana instances write to.

Instances broadcast their participation by bumping the lastSeen value associated with their instance id to the current value of their system's monotonically increasing clock, process.hrtime(). Other instances poll the cluster state document to detect the presence of new nodes; only once an instance's lastSeen value has increased from its previous value is that instance considered alive.

If the lastSeen value doesn't increase for expiry_timeout (as measured by this instance's monotonically increasing clock), that instance is no longer considered to be participating in the cluster.
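
A minimal sketch of what this liveness bookkeeping could look like (the LivenessTracker / HrTime names and the 30s expiry timeout are purely illustrative, nothing here exists in Kibana today):

    // Hypothetical liveness tracking; names and timeout value are illustrative only.
    type HrTime = [number, number]; // process.hrtime() tuple: [seconds, nanoseconds]

    const toNs = ([s, ns]: HrTime): bigint => BigInt(s) * 1_000_000_000n + BigInt(ns);

    const EXPIRY_TIMEOUT_NS = 30n * 1_000_000_000n; // e.g. 30 seconds

    class LivenessTracker {
      // Per remote instance: the last lastSeen value we observed, plus the reading
      // of *our own* monotonic clock at the moment we saw it increase.
      private tracked = new Map<string, { lastSeen: bigint; observedAt: bigint }>();

      // Called on every poll of the cluster state document.
      observe(instanceId: string, lastSeen: HrTime): void {
        const remote = toNs(lastSeen);
        const prev = this.tracked.get(instanceId);
        if (prev === undefined || remote > prev.lastSeen) {
          // lastSeen increased, so the instance proved it is alive; restart its expiry timer.
          this.tracked.set(instanceId, { lastSeen: remote, observedAt: toNs(process.hrtime()) });
        }
      }

      isAlive(instanceId: string): boolean {
        const prev = this.tracked.get(instanceId);
        if (prev === undefined) return false; // never observed a heartbeat from it
        // Expire the instance once our own monotonic clock has advanced more than
        // expiry_timeout past the point where we last saw its lastSeen value move.
        return toNs(process.hrtime()) - prev.observedAt <= EXPIRY_TIMEOUT_NS;
      }
    }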

@rudolf rudolf added the Team:Core Core services & architecture: plugins, logging, config, saved objects, http, ES client, i18n, etc label Mar 1, 2021
@elasticmachine
Contributor

Pinging @elastic/kibana-core (Team:Core)

@rudolf
Contributor Author

rudolf commented Sep 9, 2021

Knowing the liveness of other Kibana instances is only required for intelligent clustering, where we automatically balance the workload when the cluster size changes (due to instances having downtime or being removed permanently).

Intelligent clustering provides a nice out-of-the-box experience for users, but an alternative is to implement dumb clustering, where users configure each Kibana instance with the values needed to participate in a "cluster".

Each instance would need to have the following configured:

  • the number of shards in this Kibana "cluster"
  • the shard that this specific Kibana instance belongs to

As long as users configure 2*shards ≤ total instances, so that every shard is covered by at least two instances, we'll have high availability if one instance goes down.
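
For illustration, a hypothetical shape of that per-instance configuration, plus the shard-membership check it enables (none of these names are real Kibana settings):

    // Hypothetical "dumb clustering" settings; these keys do not exist in Kibana.
    interface DumbClusteringConfig {
      shardCount: number; // total number of shards in this Kibana "cluster"
      shardId: number;    // the shard this particular instance belongs to (0-based)
    }

    // e.g. four instances split over two shards: two instances configured with
    // shardId 0 and two with shardId 1, so either shard survives losing one instance.
    const config: DumbClusteringConfig = { shardCount: 2, shardId: 0 };

    // An instance only works on items whose shard key hashes onto its own shard.
    const ownsItem = (shardKeyHash: number, cfg: DumbClusteringConfig): boolean =>
      shardKeyHash % cfg.shardCount === cfg.shardId;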

@rudolf
Contributor Author

rudolf commented Sep 9, 2021

Cluster state algorithm (draft, definitely needs more scrutiny)

  1. Node reads kibana_cluster_state document from the .kibana index

  2. It sends a heartbeat every heartbeat_interval seconds by performing an update operation that adds its UUID under nodes and sets the lastSeen value to its current process.hrtime() (a monotonically increasing clock).

  3. If the update fails due to a version conflict, the update operation is retried after abs(random() * 10) seconds by fetching the document again and attempting the update once more (see the sketch after this list). The random delay reduces the chance that all nodes try to update the cluster state document at exactly the same time, which would lead to unnecessary conflicts.

  4. To determine how many Kibana nodes are available in this cluster, each node reads the kibana_cluster_state document every heartbeat_interval.

    Once they see that the lastSeen value has increased, they capture their own current hr time current_hr_time and start waiting until their own process.hrtime() - current_hr_time > expiry_timeout. If at that point lastSeen hasn't been updated, the node is considered to be down.

    Using a monotonically increasing clock guarantees that the clock will always increase even if the system time changes due to daylight saving time, NTP clock syncs, or manually setting the time. Because liveness is evaluated in windows of expiry_timeout, a node that sends its last heartbeat just after a window starts is only detected at the end of the following window, which means other nodes can take up to 2*expiry_timeout to recognise a node that's down.

  5. After filtering out all nodes that are considered offline, each node can derive its node_number from the offset of its UUID in the cluster_state document.

    {
      nodes: {
        "852bd94e-5121-47f3-a321-e09d9db8d16e": {
          version: "7.6.0",
          lastSeen: [ 1114793, 555149266 ], // hrtime()
        },
        "8d975c5b-cbf6-4418-9afb-7aa3ea34ac90": {
          version: "7.6.0",
          lastSeen: [ 1114862, 841295591 ],
        },
        "3ef25ff1-090a-4335-83a0-307a47712b4e": {
          version: "7.6.0",
          lastSeen: [ 1114877, 611368546 ],
        },
      },
    }
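
A rough sketch of the heartbeat write with conflict retry (steps 1–3 above), assuming the current @elastic/elasticsearch client; the index and document id come from the algorithm, everything else (helper names, error handling details) is illustrative:

    // Sketch only: heartbeat update with optimistic concurrency control and a
    // randomized retry delay, as described in steps 1-3.
    import { Client } from '@elastic/elasticsearch';

    const client = new Client({ node: 'http://localhost:9200' });
    const INDEX = '.kibana';
    const DOC_ID = 'kibana_cluster_state';

    const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

    async function sendHeartbeat(uuid: string, version: string): Promise<void> {
      for (;;) {
        // 1. Read the current cluster state along with its seq_no / primary_term.
        const current = await client.get<{ nodes: Record<string, unknown> }>({
          index: INDEX,
          id: DOC_ID,
        });

        // 2. Add/refresh our own entry with our monotonic clock reading.
        const nodes: Record<string, unknown> = { ...(current._source?.nodes ?? {}) };
        nodes[uuid] = { version, lastSeen: process.hrtime() };

        try {
          await client.index({
            index: INDEX,
            id: DOC_ID,
            document: { nodes },
            // Only succeed if nobody else wrote the document since we read it.
            if_seq_no: current._seq_no,
            if_primary_term: current._primary_term,
          });
          return;
        } catch (err: any) {
          if (err?.statusCode !== 409) throw err;
          // 3. Version conflict: another instance won the race. Back off for a
          // random delay to spread retries out, then read and update again.
          await sleep(Math.abs(Math.random() * 10) * 1000);
        }
      }
    }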
    

To implement sharding for queues like task manager, each instance peeks at the queue and pulls e.g. 50 results. From those results, it only processes items that belong to its shard: all documents for which hash(shard_key) % node_count == node_number, as sketched below.
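
For illustration only, a sketch of that peek-and-filter step; the hash function, QueueItem shape, and fetchQueueItems helper are assumptions, not existing task manager APIs:

    // Sketch of the peek-and-filter step: claim only the items on our own shard.
    import { createHash } from 'crypto';

    interface QueueItem {
      id: string;
      shardKey: string; // e.g. the task's saved object id
    }

    // Stable hash of the shard key, reduced to a non-negative integer.
    const hashShardKey = (shardKey: string): number =>
      parseInt(createHash('sha1').update(shardKey).digest('hex').slice(0, 8), 16);

    // hash(shard_key) % node_count == node_number
    const belongsToThisNode = (item: QueueItem, nodeCount: number, nodeNumber: number): boolean =>
      hashShardKey(item.shardKey) % nodeCount === nodeNumber;

    // Peek at e.g. 50 queue items, then only process the ones on our shard.
    async function claimWork(
      fetchQueueItems: (size: number) => Promise<QueueItem[]>,
      nodeCount: number,
      nodeNumber: number
    ): Promise<QueueItem[]> {
      const peeked = await fetchQueueItems(50);
      return peeked.filter((item) => belongsToThisNode(item, nodeCount, nodeNumber));
    }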

When a Kibana node goes offline it won't be able to process any items on the queue, but after 2*expiry_timeout the other nodes will recognise that it is offline and rebalance the shards so that all items from the queue get worked on again.

At any point in time, the nodes might not all agree about which nodes are available or offline, which could lead to multiple instances trying to acquire a lock on the same queue item. This will cause some optimistic concurrency conflicts, but only for up to 2*expiry_timeout after a new node is added or an existing node goes offline.

@pgayvallet
Contributor

Node reads kibana_cluster_state document from the .kibana index

I feel like this is not going to be the only use case where we're going to need to use ES as a Kibana 'internal system' storage. The first example coming to mind would be (potentially, nothing is acted on yet) storing a per-type index version if we were to try to implement #104081.

For that reason, and even if .kibana is pointing to a system index, I feel like we could benefit from a new .kibana_system index to store internal Kibana data (as opposed to Kibana SOs).

(Just my 2cts, for the rest of the algorithm I need to think more in depth about it)

@rudolf
Contributor Author

rudolf commented Jul 10, 2024

Closing in favour of newer discussion in #187696
