Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a NoopEngine implementation #31163

Merged
merged 23 commits into from Jun 15, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c345e54
Add a NoopEngine implementation
dakrone Jun 6, 2018
7d469cc
Remove isNoopEngine() method
dakrone Jun 7, 2018
5c0ed00
Remove test using isNoopEngine
dakrone Jun 7, 2018
2985692
Remove additional EngineFactory method
dakrone Jun 7, 2018
19b50b0
Remove newline
dakrone Jun 7, 2018
7d8e9de
Move translog deletion policy creation into try block
dakrone Jun 7, 2018
21de3a4
Make lastCommittedSegmentInfos final
dakrone Jun 7, 2018
ce634e9
Make NoopEngine final and package private
dakrone Jun 7, 2018
e49a854
Remove translog flushing
dakrone Jun 8, 2018
2629329
Fix checkstyle
dakrone Jun 8, 2018
191eae0
Throw UOE from ensureTranslogSynced
dakrone Jun 8, 2018
5ed1bae
Reduce visibility of getTranslog
dakrone Jun 8, 2018
6931981
Merge remote-tracking branch 'origin/master' into add-noop-engine
dakrone Jun 11, 2018
8f59a7e
Get rid of LocalCheckpointTracker and Translog in NoopEngine
dakrone Jun 11, 2018
de63a5e
Remove unused lastGen, validate Translog UUID
dakrone Jun 11, 2018
c2c820a
Read local checkpoint and max seq no out of commit data
dakrone Jun 11, 2018
ba56f25
Add check for translog operations and test
dakrone Jun 11, 2018
023effb
Fix checkstyle
dakrone Jun 12, 2018
a4d2e3b
Use try-with-resources for translog opening
dakrone Jun 12, 2018
78ae62d
Formatting nit
dakrone Jun 13, 2018
13c06b9
Add a test for validating translog UUID
dakrone Jun 13, 2018
4e428d2
Merge remote-tracking branch 'origin/master' into add-noop-engine
dakrone Jun 13, 2018
37398ac
Enhance comment about two noop engine test
dakrone Jun 14, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -838,7 +838,7 @@ private void fillSegmentInfo(SegmentReader segmentReader, boolean verbose, boole
*/
public abstract List<Segment> segments(boolean verbose);

public final boolean refreshNeeded() {
public boolean refreshNeeded() {
if (store.tryIncRef()) {
/*
we need to inc the store here since we acquire a searcher and that might keep a file open on the
Expand Down Expand Up @@ -1631,4 +1631,5 @@ public boolean isRecovering() {
* Tries to prune buffered deletes from the version map.
*/
public abstract void maybePruneDeletes();

}
334 changes: 334 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/NoopEngine.java
@@ -0,0 +1,334 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

/**
* NoopEngine is an engine implementation that does nothing but the bare minimum
* required in order to have an engine. All attempts to do something (search,
* index, get), throw {@link UnsupportedOperationException}. This does maintain
* a translog with a deletion policy so that when flushing, no translog is
* retained on disk (setting a retention size and age of 0).
*
* It's also important to notice that this does list the commits of the Store's
* Directory so that the last commit's user data can be read for the historyUUID
* and last committed segment info.
*/
public class NoopEngine extends Engine {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this class be final and maybe pkg private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, though I'll un-final it when I work on the second half of this, but it can be final for now :)


private final Translog translog;
private final IndexCommit lastCommit;
private final LocalCheckpointTracker localCheckpointTracker;
private final String historyUUID;
private SegmentInfos lastCommittedSegmentInfos;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be final no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep


public NoopEngine(EngineConfig engineConfig) {
super(engineConfig);

store.incRef();
boolean success = false;
Translog translog = null;

// The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this into the try block please?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly


try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a translog? all we want is to validate that the translog has a the right uuid and that it's empty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the translog all over the place in other methods in Engine. We can try to encapsulate all of these into different methods (similar to #31213), but I'm not sure what benefit we'd actually get from that. In the next iteration we'll need the translog because we'll need to sync and flush it on engine opening so that there are no operations to be replayed during peer recovery (which I think will still need a translog to retrieve stats about # of ops).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what benefit we'd actually get from that

I looked at these methods and I agree it's on the edge - there are quite a few. I would still prefer we replace them with unsupported operations or noop returns (ex empty sanpshots). This will lock things down and prevent things that aren't supposed to happen - I think that's good. An alternative is to implement a NoopTranslog but that's another rabbit hole.

we'll need to sync and flush it on engine opening

Why is that? I think it's good to only close indices that have no ongoing indexing (like our plan for frozen index). Regardless - why can't we do the flush / trim when we close the open engine and convert it to a noop engine?

I can see one thing down the road because we may close an index on recovery where it has broken settings (TBD). In that case I would still prefer to make utilities methods like Store#associateIndexWithNewTranslog that work on the folder. Note that this problem isn't solved even if we keep the translog open as we can't index operations from it into the lucene index with the NoopEngine nor ship to a recovering shard with NoopEngine in it. We assume those don't exist. I think we should discuss this separately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good to only close indices that have no ongoing indexing (like our plan for frozen index).

Yes, but even with no ongoing indexing, a translog still remains (due to retention policy)

Regardless - why can't we do the flush / trim when we close the open engine and convert it to a noop engine?

That would require setting a new retention policy on an existing engine (making a part of InternalEngine mutable which makes me :(). In the future though, we could do the retention policy, sync, and flush/trim when the NoopEngine is opened, and then immediately close the translog.

In order to do this though, we'll have to remove the getTranslog method from Engine, is that something you want me to do as a precursor to this?

assert translog.getGeneration() != null;
this.translog = translog;
List<IndexCommit> indexCommits = DirectoryReader.listCommits(store.directory());
lastCommit = indexCommits.get(indexCommits.size()-1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting nit: indexCommits.size()-1 -> indexCommits.size() - 1

historyUUID = lastCommit.getUserData().get(HISTORY_UUID_KEY);
// We don't want any translogs hanging around for recovery, so we need to set these accordingly
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not used?

Also - I think it's to validate integrity - i.e. open the translog, see that it's uuid matches, see that it's empty and shut it down?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I pushed a commit removing the unused lastGen and open-closing the translog for validation purposes

translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastGen);

localCheckpointTracker = createLocalCheckpointTracker();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't really need a local checkpoint tracker here, instead can we work on master to not expose the localCheckpointTracker out of engine (similar to how we don't expose the translog) and then we can avoid creating it? We should assert that maxSeq == localCheckpoint when opening lucene and otherwise fail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean something like this: #31213

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. thanks.

success = true;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(translog);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
}
}
}
logger.trace("created new NoopEngine");
}

private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) throws IOException {
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier());
}

/**
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromLastCommit() {
final Map<String, String> commitUserData = lastCommittedSegmentInfos.getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}

private LocalCheckpointTracker createLocalCheckpointTracker() {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return new LocalCheckpointTracker(maxSeqNo, localCheckpoint);
}

@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}

@Override
public String getHistoryUUID() {
return historyUUID;
}

@Override
public long getWritingBytes() {
return 0;
}

@Override
public long getIndexThrottleTimeInMillis() {
return 0;
}

@Override
public boolean isThrottled() {
return false;
}

@Override
public IndexResult index(Index index) {
throw new UnsupportedOperationException("indexing is not supported on a noop engine");
}

@Override
public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException("deletion is not supported on a noop engine");
}

@Override
public NoOpResult noOp(NoOp noOp) {
throw new UnsupportedOperationException("noop is not supported on a noop engine");
}

@Override
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
throw new UnsupportedOperationException("synced flush is not supported on a noop engine");
}

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
throw new UnsupportedOperationException("gets are not supported on a noop engine");
}

@Override
public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
throw new UnsupportedOperationException("searching is not supported on a noop engine");
}

@Override
public Translog getTranslog() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this increases visibilty

return translog;
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can throw an unsupported operation exception too?

}

@Override
public void syncTranslog() {
}

@Override
public LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

@Override
public long getIndexBufferRAMBytesUsed() {
return 0;
}

@Override
public List<Segment> segments(boolean verbose) {
return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose));
}

@Override
public void refresh(String source) throws EngineException {
}

// Override the refreshNeeded method so that we don't attempt to acquire a searcher checking if we need to refresh
@Override
public boolean refreshNeeded() {
// We never need to refresh a noop engine so always return false
return false;
}

@Override
public void writeIndexingBuffer() throws EngineException {
}

@Override
public boolean shouldPeriodicallyFlush() {
return false;
}

@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
return new CommitId(lastCommittedSegmentInfos.getId());
}

@Override
public CommitId flush() throws EngineException {
try {
translog.rollGeneration();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks wrong. We don't write anything here why do we need to modify the translog? I think this should be read-only

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order for flushing to clear existing translogs (because we don't want any translog operations to be replayed for peer or store recovery) we want the flush method to remove the translog, this was added so that flushing the new engine would ensure that we don't have any translog operations around that could cause UOEs during recovery

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can remove this for now and re-introduce it later when adding the state transition part, if that makes it better, but we still need to be able to completely remove translog ops before doing recovery since we have no way to do operation-based recovery.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I'd like to remove it for now I can 't see in this change why it's needed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I've removed that change from this PR

translog.trimUnreferencedReaders();
} catch (IOException e) {
maybeFailEngine("flush", e);
throw new FlushFailedEngineException(shardId, e);
}
return new CommitId(lastCommittedSegmentInfos.getId());
}

@Override
public void trimTranslog() throws EngineException {
}

@Override
public void rollTranslogGeneration() throws EngineException {
}

@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade,
boolean upgradeOnlyAncientSegments) throws EngineException {
}

@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
return new Engine.IndexCommitRef(lastCommit, () -> {});
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
return acquireLastIndexCommit(false);
}

/**
* Closes the engine without acquiring the write lock. This should only be
* called while the write lock is hold or in a disaster condition ie. if the engine
* is failed.
*/
@Override
protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() :
"Either the write lock must be held or the engine must be currently be failing itself";
try {
IOUtils.close(translog);
} catch (Exception e) {
logger.warn("Failed to close translog", e);
} finally {
try {
store.decRef();
logger.debug("engine closed [{}]", reason);
} finally {
closedLatch.countDown();
}
}
}
}

@Override
public void activateThrottling() {
throw new UnsupportedOperationException("closed engine can't throttle");
}

@Override
public void deactivateThrottling() {
throw new UnsupportedOperationException("closed engine can't throttle");
}

@Override
public void restoreLocalCheckpointFromTranslog() {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra NL

}

@Override
public int fillSeqNoGaps(long primaryTerm) {
return 0;
}

@Override
public Engine recoverFromTranslog() {
return this;
}

@Override
public void skipTranslogRecovery() {
}

@Override
public void maybePruneDeletes() {
}
}