
Introduce Local checkpoints #15111

Closed
wants to merge 29 commits into from

Conversation

@bleskes bleskes (Contributor) commented Nov 30, 2015

This PR introduces the notion of a local checkpoint on the shard level. A local checkpoint is defined as the highest sequence number for which all previous operations (i.e., those with a lower seq#) have been processed.

The current implementation is based on a fixed in-memory bit array which is used in a round-robin fashion. This introduces a limit to the spread between in-flight indexing operations. We are still discussing options to work around this, but I think we should move forward toward a working system and optimize from there (and either remove this limitation or better understand its implications).
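A minimal sketch of the mechanism described above, under the assumption of a single bit set indexed modulo its length (class and method names are hypothetical, not the PR's actual code):

```java
import java.util.BitSet;

// Hypothetical sketch: a fixed-size bit set, addressed modulo its length
// (round robin), records which sequence numbers have been processed; the
// checkpoint advances while the next sequence number's bit is set. The
// fixed size is exactly the limit on in-flight spread mentioned above.
public class LocalCheckpointSketch {
    private final BitSet processedSeqNo;
    private final int bitArraySize; // limits the spread between in-flight operations
    private long checkpoint = -1;   // highest seq# such that all lower seq# are processed

    public LocalCheckpointSketch(int bitArraySize) {
        this.bitArraySize = bitArraySize;
        this.processedSeqNo = new BitSet(bitArraySize);
    }

    /** Marks seqNo as processed and advances the checkpoint as far as possible. */
    public synchronized void markSeqNoAsProcessed(long seqNo) {
        if (seqNo - checkpoint > bitArraySize) {
            throw new IllegalStateException("in-flight spread exceeds the bit array size");
        }
        processedSeqNo.set((int) (seqNo % bitArraySize));
        // advance while the next sequence number has been processed, clearing
        // each bit as we pass it so the slot can be reused (round robin)
        while (processedSeqNo.get((int) ((checkpoint + 1) % bitArraySize))) {
            checkpoint++;
            processedSeqNo.clear((int) (checkpoint % bitArraySize));
        }
    }

    public synchronized long getCheckpoint() {
        return checkpoint;
    }
}
```

Note how an out-of-order completion (e.g. seq# 2 finishing before seq# 1) leaves the checkpoint parked until the gap is filled.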

relates to #10708

Every shard group in Elasticsearch has a selected copy called a primary. When a primary shard fails, a new primary is selected from the existing replica copies. This PR introduces `primary terms` to track the number of times this has happened. This will allow us, as follow-up work and among other things, to identify operations that come from old stale primaries. It is also the first step on the road toward sequence numbers.
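The term check described above can be sketched as follows (a hypothetical standalone class, not the PR's actual code): a shard tracks the highest primary term it has seen and refuses writes stamped with an older one.

```java
// Hypothetical sketch of primary-term enforcement: operations from a primary
// whose term is lower than the highest term this shard has seen are stale
// and must be rejected; a higher term means a newer primary was elected.
public class PrimaryTermCheckSketch {
    private long currentPrimaryTerm;

    public PrimaryTermCheckSketch(long initialTerm) {
        this.currentPrimaryTerm = initialTerm;
    }

    /** Returns true if the operation is accepted; stale-term operations are refused. */
    public synchronized boolean acceptOperation(long operationPrimaryTerm) {
        if (operationPrimaryTerm < currentPrimaryTerm) {
            return false; // write came from an old, stale primary
        }
        currentPrimaryTerm = operationPrimaryTerm;
        return true;
    }
}
```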

Relates to elastic#10708
Closes elastic#14062
Adds a counter to each write operation on a shard. This sequence number is indexed into Lucene using doc values, for now (we will probably require indexing to support range searches in the future).

On top of this, primary term semantics are enforced and shards will refuse write operations coming from an older primary.
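Combining the two ideas above, the primary's write path can be sketched as stamping every operation with the pair (primary term, next seq#); the class and method names below are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the primary stamps each write with its term and a
// monotonically increasing sequence number. In the actual PR the seq# is
// then stored with the document as a Lucene doc value; replicas refuse
// stamps carrying an older term.
public class SequenceStampSketch {
    private final long primaryTerm;
    private final AtomicLong seqNoGenerator = new AtomicLong(-1);

    public SequenceStampSketch(long primaryTerm) {
        this.primaryTerm = primaryTerm;
    }

    /** Assigns the next { term, seqNo } stamp to a write operation. */
    public long[] stamp() {
        return new long[] { primaryTerm, seqNoGenerator.incrementAndGet() };
    }
}
```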

Other notes:
- The added SequenceServiceNumber is just a skeleton and will be replaced with a much heavier one once we have all the building blocks (i.e., checkpoints).
- I completely ignored recovery - for this we will need checkpoints as well.
- A new base class is introduced for all single-doc write operations. This is handy to unify common logic (like toXContent).
- For now, we don't use seq# for versioning. We could in the future.

Relates to elastic#10708
Closes elastic#14651
* this is a temporary fix until a more permanent fix is done on master *

During primary relocation, some operations can be performed on the source primary but reach the target primary only after the relocation has completed. At the moment the new primary will have a new primary term and will therefore reject the operations from the old one, causing data loss.

This changes relocations to move the source primary to a relocated state, prevent any new operations from starting on it, and wait for ongoing operations to complete.

Long term we may also consider not incrementing the primary term on relocation.
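The hand-off described above can be sketched as a small state machine (hypothetical names, not the PR's code): the source flips to a relocated state so no new operations start, and the hand-off finishes only once the in-flight counter drains.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the relocation hand-off: once the source primary is
// RELOCATED it refuses new operations, and the caller completes the hand-off
// only after all in-flight operations have finished.
public class RelocationHandOffSketch {
    enum State { STARTED, RELOCATED }

    private State state = State.STARTED;
    private final AtomicInteger inFlightOps = new AtomicInteger();

    /** Tries to start a new operation; fails once the shard is relocated. */
    public synchronized boolean tryStartOperation() {
        if (state == State.RELOCATED) {
            return false; // new writes must go to the target primary
        }
        inFlightOps.incrementAndGet();
        return true;
    }

    public void finishOperation() {
        inFlightOps.decrementAndGet();
    }

    /** Blocks new operations and reports whether ongoing ones have drained. */
    public synchronized boolean relocate() {
        state = State.RELOCATED;
        return inFlightOps.get() == 0; // caller waits/retries until this is true
    }
}
```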
/**
 * Reads a potentially null value.
 */
public <T extends StreamableReader<T>> T readOptionalStreamableReader(StreamableReader<T> streamableReader) throws IOException {
Member:
Why not just make SeqNoStats implement Streamable and use the existing readOptionalStreamable and writeOptionalStreamable? That's consistent with most (all?) of the existing stats objects.

Contributor Author:
We are trying to move to Writeable and StreamableReader, which allow us to construct the object while reading from the stream and use final members. That said, I think with Java-8-only changes we can have something like readOptionalStreamableReader here where we pass a "factory" method which takes the stream as input and returns an object. This can typically be a public constructor. That means we can decouple writing from reading and have Writeable not inherit from StreamableReader, which will save a useless method and a prototype. @s1monw what are your thoughts here?
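The factory-method pattern discussed above can be sketched with plain java.io (not Elasticsearch's StreamInput; all names here are hypothetical): the caller passes a reader functional interface, typically a constructor reference, so reading is decoupled from writing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of an optional read driven by a "factory" that takes
// the stream and returns an object, allowing the target class to keep
// final members initialized in its reading constructor.
public class OptionalReaderSketch {
    @FunctionalInterface
    interface Reader<T> {
        T read(DataInput in) throws IOException;
    }

    /** Reads a nullable value: a presence byte followed by the value itself. */
    public static <T> T readOptional(DataInput in, Reader<T> reader) throws IOException {
        return in.readBoolean() ? reader.read(in) : null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBoolean(true); // value present
        out.writeLong(42L);

        DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Long value = readOptional(in, DataInput::readLong);
        System.out.println(value); // prints 42
    }
}
```

A real reading constructor would slot in the same way, e.g. a `SeqNoStats::new` taking the stream as its only argument.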

public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
indexLagThreshold = indexSettings.getSettings().getAsInt(SETTINGS_INDEX_LAG_THRESHOLD, DEFAULT_INDEX_LAG_THRESHOLD);
indexLagMaxWait = indexSettings.getSettings().getAsTime(SETTINGS_INDEX_LAG_MAX_WAIT, DEFAULT_INDEX_LAG_MAX_WAIT);
Member:
Validate these settings?
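The validation asked for above could look like this (a hypothetical helper, not the PR's code): reject non-positive values before the service starts using them.

```java
// Hypothetical sketch of settings validation: fail fast on construction
// rather than letting a zero or negative lag threshold corrupt the
// round-robin offset arithmetic later.
public class SettingsValidationSketch {
    static int validatePositive(String name, int value) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be positive but was [" + value + "]");
        }
        return value;
    }
}
```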

Change IndexShard counters for the new simplified ReplicationAction
do {
// clear the flag as we are making it free for future operations. do so before we expose it
// by moving the checkpoint
processedSeqNo.clear(offset);
Member:
Maybe just clear a range of bits with FixedBitSet#clear(int, int) instead of clearing bit by bit? The implementation looks to be more efficient and would just require care around the offset wrapping.
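The wrap-around care mentioned above can be sketched as follows, using java.util.BitSet as a stand-in for Lucene's FixedBitSet (the helper name is hypothetical):

```java
import java.util.BitSet;

// Hypothetical sketch of the range-clear suggestion: clear a whole run of
// slots at once, splitting the range in two when it wraps around the end
// of the fixed-size array. Note BitSet#clear(from, to) is to-exclusive.
public class RangeClearSketch {
    /** Clears `length` slots starting at `fromOffset`, wrapping modulo `size`. */
    static void clearRange(BitSet bits, int size, int fromOffset, int length) {
        int end = fromOffset + length;
        if (end <= size) {
            bits.clear(fromOffset, end);  // no wrap: one contiguous clear
        } else {
            bits.clear(fromOffset, size); // tail of the array
            bits.clear(0, end - size);    // wrapped head
        }
    }
}
```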

@bleskes bleskes (Contributor Author) commented Dec 11, 2015

@jasontedor I pushed a new approach. Can you take another look?

@bleskes bleskes (Contributor Author) commented Dec 11, 2015

closing... github made this a mess.

@bleskes bleskes closed this Dec 11, 2015
@clintongormley clintongormley added :Engine :Distributed/Engine Anything around managing Lucene and the Translog in an open shard. and removed :Sequence IDs labels Feb 14, 2018