
Allow to seal an index #10032

Closed

s1monw opened this issue Mar 8, 2015 · 17 comments
Labels: :Distributed/Recovery (Anything around constructing a new shard, either from a local or a remote source.)

s1monw (Contributor) commented Mar 8, 2015

In a lot of use cases indices are used for indexing only for a limited amount of time. I.e. in the daily-index use case, indices are created with a highish number of shards to scale out indexing, and then after a couple of days these indices are idle in terms of writing. Yet we still keep all the resources open since we are accepting writes at any time. This is not necessary in a lot of cases, and giving it up would allow for a large number of optimizations:

  • we can close IndexWriter and all its resources.
  • primary promotions are very simple since no changes are coming in
  • indices might be able to be merged, i.e. if you have 7 indices per week we can allow people to
    merge them into a weekly index to reduce the number of indices.
  • query caches can rely on the fact that the index doesn't change, and we can pull out min/max values for numeric fields with more confidence that it's worth the effort.
  • maybe most importantly, recoveries are instant. We would issue a special commit when we seal that ensures all changes have been written, containing some kind of identifier that we can use in recovery, which will prevent the copying of all files in situations like full cluster restarts. It will reduce restart times dramatically with very low complexity for all the time-based indices cases, which usually suffer from slow cluster restarts due to the large number of indices and shards.
s1monw added the v2.0.0-beta1, discuss and :Distributed/Recovery labels Mar 8, 2015
kimchy (Member) commented Mar 8, 2015

++, I think the improved-recovery idea is a killer. I wonder if we can re-use, one way or another, the read-only flag we have on an index today.

bleskes (Contributor) commented Mar 8, 2015

nice

s1monw (Contributor, Author) commented Mar 8, 2015

@kimchy I think we can. To me it's a two-stage process: first we switch the index to read-only using the flag we have, and once that is done and the cluster state has been published we seal the index, which causes the actual optimizations to happen. So I think we can just reuse it?

kimchy (Member) commented Mar 8, 2015

@s1monw ++, exactly what I was thinking, and we have the flag recorded in the cluster state, so we might need to record it somewhere else as well to make sure nothing went in. ++

s1monw (Contributor, Author) commented Mar 9, 2015

Adding this is also likely to make full-cluster restarts as well as rolling restarts instant. Even for the non-timeseries / logging case, for full restarts we can seal all the indices, shut down, restart & unseal. This also works for rolling restarts if folks can afford having read-only indices, which I think is reasonable in most cases since the restart will be pretty fast. Very promising!

runningman84 commented

+1

nik9000 (Member) commented Mar 16, 2015

> This also works for rolling restarts if folks can afford having read-only indices, which I think is reasonable in most cases since the restart will be pretty fast. Very promising!

This is great! Our use case wouldn't allow us to seal an index outside of a rolling restart window or some other temporary maintenance action but we can absolutely get away with sealing them all for an hour or so.

So this is a great solution for us!

synhershko (Contributor) commented

+1, especially if sealing is reversible (which #10032 (comment) implies)

rtoma commented Mar 19, 2015

+100 for 'instant recoveries'

bleskes (Contributor) commented Mar 19, 2015

@synhershko yeah, the plan is to allow unsealing as well, making it a viable upgrade / full-restart strategy (if you can afford stopping indexing).

s1monw (Contributor, Author) commented Mar 23, 2015

> This is great! Our use case wouldn't allow us to seal an index outside of a rolling restart window or some other temporary maintenance action but we can absolutely get away with sealing them all for an hour or so.

Yes, it's absolutely possible to unseal, and the operation should be very fast; it's essentially just a cluster state update.

brwe self-assigned this Mar 27, 2015
s1monw (Contributor, Author) commented Mar 27, 2015

We had some internal discussions about how to implement this and I wanted to make sure they are recorded here on the issue. Sealing an index basically happens on two levels: the index level and the shard level.

Index Level sealing

On the index level we use a ClusterBlock to prevent any write operations on the index via index.blocks.read_only. This is basically a cluster state update that sets the write block on the index that will be sealed, together with a seal_id. The seal ID is a token that is generated to identify a seal operation on the shard level and is also used at a later point in time during recovery. So essentially there are three values in the ClusterState for each sealed index:

  • an index.blocks.read_only block to prevent writes
  • a seal state index.seal.state which can be either sealing or sealed
  • an index.seal.id in IndexMetaData.settings

This also requires the entire cluster to be on a version that supports and understands index sealing, otherwise this feature will not be available (we have the ability to check this via DiscoveryNodes#smallestNonClientNodeVersion()). The seal ID should not be updatable outside of the seal API, while index.blocks.read_only is; yet we need to prevent it from being changed while the index is sealed.
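
To make the proposal a bit more concrete, here is a minimal sketch of the three per-index values as they might be recorded in the index settings. The setting names follow the proposal in this comment and are illustrative only, not existing Elasticsearch settings:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: the three values the proposal would record per sealed index.
// The keys mirror the names used above; they are not real Elasticsearch settings.
public class SealMetadataSketch {
    public static Map<String, String> sealSettings() {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("index.blocks.read_only", "true");               // prevent writes
        settings.put("index.seal.state", "sealing");                  // later flipped to "sealed"
        settings.put("index.seal.id", UUID.randomUUID().toString()); // token re-used during recovery
        return settings;
    }
}
```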

The seal process is essentially a cluster state update (setting the block and the id) that waits for all shards to respond. This is very similar to how deleting an index works today. We issue the cluster state update that subsequently gets propagated to all the nodes in the cluster. Inside IndicesClusterStateService we listen to the relevant changes and update the IndexService where applicable.

Once the seal operation is issued we set index.seal.state : sealing. The master now registers a listener on a dedicated endpoint, waiting for all relevant shards to reply with successful or failed seal operations. Once all shards have replied, the master issues index.seal.state : sealed and responds to the user, unless we already ran into a timeout (note this is a non-blocking operation on the master, just like all other actions). In the case of a timeout, or if the cluster is stuck in sealing mode, only an unseal operation can recover from that state. Unsealing is pretty straightforward: it basically removes the seal state from the index settings and publishes the cluster state.

This is also very similar to the delete logic, which is currently implemented in IndicesClusterStateService#applyDeletedShards and NodeIndexDeletedAction.

Shard Level sealing

Once the IndexService knows about the sealing, it essentially needs to wait until all in-flight operations are finished on the shard's primary as well as on all the replicas.


For this we are currently planning to use ref-counting similar to what we do in Store.java. The ref-counting implemented in AbstractRefCounted works in a way that prevents the caller from incrementing the reference count once it has reached 0. We can utilize this and increment the counter once in the IndexShard itself, decrementing it again once the shard is either closed or sealed. Once we have decremented the shard's own reference, we only need to wait until we reach 0 on the counter in order to process the sealing.

The good news is that, due to the cluster block (read-only setting), no new indexing operations can be issued, so we will reach 0 eventually. Certainly this requires reference counting (incRef / decRef) in the relevant API users, which can likely be reduced to TransportShardReplicationOperationAction.
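
As an illustration of the ref-counting idea, here is a small, self-contained sketch. It is not the actual IndexShard / AbstractRefCounted code, just a sketch assuming the semantics described above: the shard holds one reference for its own lifetime, every write operation takes and releases a reference, and sealing drops the lifetime reference and waits for the count to reach 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the in-flight operation ref-counting described above.
class ShardOperationRefs {
    private final AtomicInteger refs = new AtomicInteger(1); // +1 held by the shard itself
    private final CountDownLatch drained = new CountDownLatch(1);

    /** Called at the start of every write operation; fails once the count has reached 0. */
    boolean tryIncRef() {
        while (true) {
            int current = refs.get();
            if (current == 0) {
                return false; // sealed or closed, no new operations allowed
            }
            if (refs.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Called when a write operation finishes. */
    void decRef() {
        if (refs.decrementAndGet() == 0) {
            drained.countDown();
        }
    }

    /** Drops the shard's own reference and blocks until all in-flight operations are done. */
    void sealAndAwaitDrain() throws InterruptedException {
        decRef();        // release the shard's own +1
        drained.await(); // once 0 is reached, the seal can proceed
    }
}
```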
Once we reach 0 on the ref-count we can execute the following actions:

  • issue a seal-commit on the engine that writes the shard's seal ID to the commit metadata, just like we do with the transaction log ID on every commit (see the sketch below). This action is complex and should wait for all recoveries to finish (or even cancel them?), flush the transaction log etc. to ensure all changes are committed to the Lucene index.
  • switch the engine from InternalEngine to an engine that is read-only to reduce resource utilization. We might be able to reuse, modify or abstract ShadowEngine for this purpose.
  • send a seal command to the replicas of the shard which basically repeats the three steps above.
  • send a seal acknowledgement to the master to eventually return to the user

At that point the index is sealed and no write operation can be submitted to the index anymore.
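
To make the seal-commit step concrete, here is a hedged sketch of writing a seal ID into the Lucene commit user data. The "seal_id" key is made up for illustration and this is not the actual Engine#seal implementation; setLiveCommitData is the API recent Lucene versions expose for attaching user data to a commit.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.IndexWriter;

// Hypothetical sketch of a seal-commit: attach a seal id to the Lucene commit user data
// so it can be compared across shard copies during recovery.
class SealCommit {
    static void sealCommit(IndexWriter writer, String sealId) throws IOException {
        Map<String, String> commitData = new HashMap<>();
        commitData.put("seal_id", sealId);               // marker compared during recovery
        writer.setLiveCommitData(commitData.entrySet()); // recorded with the next commit
        writer.commit();                                 // persist all changes plus the marker
    }
}
```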

Unseal operation

The unseal operation pretty much reverses the sealing. We process a cluster state update that marks the index as unsealed by removing the seal state, and process the update. Once the cluster state update has been successfully published we remove the index block and the index is good to go.

On the shard level we basically re-increment the reference count to 1 (allowing incoming requests to increment the count as well), and before doing that we switch back to InternalEngine to allow writes. The first write to the engine basically invalidates the seal, such that we can't use it anymore for other operations like recovery.

Fast Recovery

Today recovery is very resource-heavy and often super slow since we don't know if two shards are identical on a document level, i.e. whether all operations reached the replica or not. We can tell on a Lucene segment level, but the segments are different on all replicas unless we copy them over, which takes a huge amount of time. With index sealing we basically mark the replicas as identical on an operation / document level since we prevent all writes on the shards. That allows us to side-step the entire shard copying and start up replicas immediately.

Luckily, implementing fast recovery on top of the sealing is very straightforward. Basically what we need is an extension of the RecoverySourceHandler or Engine.RecoveryHandler that can sidestep the entire recovery process if the seal ID matches on both the replica and the primary and if the recovery source has no operations in its translog. If this applies we can simply skip the entire heavy part of the recovery procedure and start up the replica immediately. We only need to be careful not to remove the seal ID from the indices by issuing commits at the end of the recovery.

For safety reasons, if any operations exist in the transaction log we can't utilize the seal ID for fast recovery. Any operation in the translog indicates an illegal state in the context of the seal ID; in other words, it breaks the seal. For instance, if an old replica that was sealed before is started but the primary is already accepting writes again, we could in theory recover from the transaction log only, but for the initial iteration we should skip this optimization. In the future we might even be able to extend this process to issue seal commits on a per-shard level while accepting writes.
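
A minimal sketch of the shortcut decision described in this section, with hypothetical names (the real logic would live in RecoverySourceHandler / Engine.RecoveryHandler): the file-copy phase can only be skipped when both copies carry the same seal ID and the recovery source has an empty translog.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the "skip the file copy" decision. sourceCommitData and
// targetCommitData stand for the Lucene commit user data of primary and replica;
// translogOps is the number of operations in the source translog.
class FastRecoveryCheck {
    static boolean canSkipFileCopy(Map<String, String> sourceCommitData,
                                   Map<String, String> targetCommitData,
                                   int translogOps) {
        String sourceSealId = sourceCommitData.get("seal_id");
        String targetSealId = targetCommitData.get("seal_id");
        return sourceSealId != null                            // the source was actually sealed
                && Objects.equals(sourceSealId, targetSealId)  // both copies carry the same seal
                && translogOps == 0;                           // any translog operation breaks the seal
    }
}
```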

Optimizing / Force Merge on a Sealed index

For the time-based indices use case it's important to run force-merge / optimize on these indices even if they are sealed. As a later extension we can allow users to run these operations even on a read-only engine. This is a feature that can be implemented at a later stage.

Proposed work items

  • add ref-counting to IndexShard and use it in index-modifying operations, e.g. in TransportShardReplicationOperationAction. This should be nicely testable and can be implemented entirely stand-alone.
  • implement an Engine#seal(String sealId) that flushes the translog and writes the seal ID to the Lucene index. This operation should fail if there is a recovery in progress and should maybe even close the engine forcefully. The later integration of this action should be straightforward since we can open a read-only engine next to the InternalEngine, swap the read-write engine out, seal it and refresh the read-only engine.
  • implement a seal action on IndexShard that utilizes Engine#seal as stated above.
  • implement a TransportShardReplicationOperationAction-like class that runs seal commands on all replicas of a shard
  • implement sealing on the master
  • implement un-sealing on the master
  • use the seal ID in recovery. In the first iteration we only use it if there are no operations in the translog. Future iterations might allow recovering from the translog as well if the seal ID is the same in the commit points on primary and replica.

I hope I covered all the moving parts, at least on a high level. If there are any questions feel free to ask. Once we basically agree I will move this to the issue itself.

andrassy commented

+1

bleskes (Contributor) commented Apr 16, 2015

Discussing this, we came up with a new and simpler plan, which works independently of the cluster update. The gist of it is to have a best-effort operation that syncs the commit points on both primaries and replicas. This "synced flush" is guaranteed to succeed if there are no concurrent indexing operations but will fail gracefully if there are. The result is a marker (sync id) on the Lucene commit points which allows us to shortcut phase 1 of recoveries, which will give us the desired speed-up. Since this is a best-effort approach, we can trigger it whenever a shard becomes inactive, at regular, longish intervals (say 30m), or at any other time (TBD).

Solution sketch (this is a shard operation; a rough code sketch of the per-copy check follows the list):

  1. Pre-synced commit phase:
    1. reaches out to all assigned shard copies and flushes them.
    2. returns the Lucene commit ID resulting from this flush.
  2. Validate there are no in-flight operations (if there are, we abort). This can be done using the request ref count described above.
  3. Synced commit phase:
    1. Generate a sync id (a GUID).
    2. On primary:
      1. If there are pending writes or the Lucene commit ID is not identical to the one retrieved by the pre-sync phase, abort.
      2. Create a new commit point with the sync id (while blocking writes to make sure nothing slips in).
    3. On replicas, we repeat the steps done on the primary. Note that if a replica doesn't participate in the sync it's OK.
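
Here is a rough sketch of the per-copy decision in the synced commit phase, with hypothetical names (this is not the feature-branch code): a sync id is only written if nothing was committed since the pre-sync flush and no operations are in flight.

```java
import java.util.Arrays;
import java.util.UUID;

// Hypothetical sketch of the synced-flush decision on a single shard copy.
// preSyncCommitId is the Lucene commit id captured in the pre-sync phase;
// currentCommitId and inFlightOps describe the copy's state when the sync phase runs.
class SyncedFlushSketch {
    static String trySyncedFlush(byte[] preSyncCommitId, byte[] currentCommitId, int inFlightOps) {
        if (inFlightOps > 0) {
            return null; // abort: concurrent indexing, this is a best-effort operation
        }
        if (!Arrays.equals(preSyncCommitId, currentCommitId)) {
            return null; // abort: something was committed since the pre-sync flush
        }
        String syncId = UUID.randomUUID().toString();
        // ... write syncId into a new commit point while briefly blocking writes ...
        return syncId;
    }
}
```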

TODOs:

[x] -> in feature branch https://github.com/elastic/elasticsearch/tree/feature/synced_flush

s1monw (Contributor, Author) commented Apr 16, 2015

thx for updating @bleskes

brwe added a commit that referenced this issue May 4, 2015
Skip phase 1 of recovery in case an identical sync id was found on primary
and replica. Relates to #10032

closes #10775
karmi added a commit to elastic/elasticsearch-ruby that referenced this issue May 19, 2015
s1monw (Contributor, Author) commented May 22, 2015

@brwe can we close this one?

bleskes added a commit to bleskes/elasticsearch that referenced this issue May 25, 2015
elastic#10032 introduced the notion of sealing an index by marking it with a special read-only marker, allowing a couple of optimizations to happen. The most important one was to speed up recoveries of shards where we know nothing has changed since they were online, by skipping the file-based sync phase. During the implementation we came up with a lighter notion which achieves the same recovery benefits but without the read-only aspects, which we dubbed synced flush. The fact that it was lightweight and didn't put the index in read-only mode allowed us to do it automatically in the background, which is a great advantage. However we also felt the need to allow users to manually trigger this operation.

 The implementation at elastic#11179 added the synced flush internal logic and the manual REST API. The name of the API was modeled after the sealing terminology, which may end up being confusing. This commit changes the API name to match the internal synced flush naming, namely `{index}/_flush/synced`.

  On top of that it contains a couple of other changes:
   - Remove all Java client API. This feature is not supposed to be called programmatically by applications but rather by admins.
   - Improve REST responses, making the structure similar to the other (flush) API
   - Change IndexShard#getOperationsCount to exclude the internal +1 on an open shard. It's confusing to get 1 while there are actually no ongoing operations
   - Some minor other clean-ups
brwe closed this as completed May 26, 2015
brwe removed the :Distributed/Recovery, v1.6.0 and v2.0.0-beta1 labels May 26, 2015
bleskes added the v2.0.0-beta1, v1.6.0, :Analytics/Aggregations and :Distributed/Recovery labels and removed the v1.6.0, v2.0.0-beta1 and :Analytics/Aggregations labels May 26, 2015
bleskes (Contributor) commented May 26, 2015

A short note that this has been implemented as synced flush - see #11179 & #11336 for more info.

dnhatn added a commit to dnhatn/elasticsearch that referenced this issue Jan 31, 2018
Today the correctness of synced-flush is guaranteed by ensuring that there are no ongoing indexing operations on the primary. Unfortunately, this might not be guaranteed in some cases. This commit hardens the correctness of the synced-flush by making sure the number of docs on all shards (with the same shardId) is equal before issuing the sync_id.

Relates elastic#10032
dnhatn added a commit that referenced this issue Feb 2, 2018
Today the correctness of synced-flush is guaranteed by ensuring that there are no ongoing indexing operations on the primary. Unfortunately, a replica might fall out of sync with the primary even if that condition is met. Moreover, if synced-flush mistakenly issues a sync_id for an out-of-sync replica, then that replica would not be able to recover from the primary. ES prevents that peer recovery because it detects that the indexes of the primary and the replica were sealed with the same sync_id but have different content. This commit modifies the synced-flush to not issue a sync_id for out-of-sync replicas. This change will report the divergence issue earlier to users and also prevent replicas from getting into the "unrecoverable" state.

Relates #10032
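
The hardening described in this commit can be sketched roughly as follows, with a hypothetical helper (not the actual change): before a sync_id is issued, the doc counts reported by all copies of the shard are compared and the operation aborts if they diverge.

```java
import java.util.List;

// Hypothetical sketch of the doc-count check added to synced flush:
// only issue a sync_id if every copy of the shard reports the same number of docs.
class SyncedFlushDocCountCheck {
    static boolean docCountsMatch(List<Long> numDocsPerCopy) {
        return numDocsPerCopy.stream().distinct().count() <= 1;
    }
}
```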
dnhatn added a commit that referenced this issue Feb 2, 2018
dnhatn added a commit that referenced this issue Feb 14, 2018
dnhatn added a commit that referenced this issue Feb 14, 2018
dnhatn added a commit that referenced this issue Mar 20, 2018
dnhatn added a commit that referenced this issue Mar 20, 2018