-
Notifications
You must be signed in to change notification settings - Fork 24.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce translog generation rolling #23606
Changes from 14 commits
cabd994
152cae7
91d5f85
c88e788
7e69ae8
fb00dd3
0d5e6e2
bbab126
421b336
8f6b609
70ade35
d9fbc42
9076d79
b79053e
b4ed67d
1669224
35d0119
32afd9e
518f12e
7480faf
dd14ba8
f59233c
3c78802
192a7ce
6c2adb6
1a31061
07fc092
2e7f722
a14937d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,6 @@ | |
import org.apache.lucene.index.MergePolicy; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.common.logging.DeprecationLogger; | ||
import org.elasticsearch.common.logging.Loggers; | ||
import org.elasticsearch.common.settings.IndexScopedSettings; | ||
import org.elasticsearch.common.settings.Setting; | ||
|
@@ -112,6 +111,16 @@ public final class IndexSettings { | |
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, | ||
Property.IndexScope); | ||
|
||
/** | ||
* The maximum size of a translog generation. This is independent of the maximum size of | ||
* translog operations that have not been flushed. | ||
*/ | ||
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING = | ||
Setting.byteSizeSetting( | ||
"index.translog.generation_threshold_size", | ||
new ByteSizeValue(64, ByteSizeUnit.MB), | ||
new Property[]{Property.Dynamic, Property.IndexScope}); | ||
|
||
public static final Setting<TimeValue> INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL = | ||
Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS), | ||
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope); | ||
|
@@ -156,6 +165,7 @@ public final class IndexSettings { | |
private volatile TimeValue refreshInterval; | ||
private final TimeValue globalCheckpointInterval; | ||
private volatile ByteSizeValue flushThresholdSize; | ||
private volatile ByteSizeValue generationThresholdSize; | ||
private final MergeSchedulerConfig mergeSchedulerConfig; | ||
private final MergePolicyConfig mergePolicyConfig; | ||
private final IndexScopedSettings scopedSettings; | ||
|
@@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti | |
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); | ||
globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL); | ||
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); | ||
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); | ||
mergeSchedulerConfig = new MergeSchedulerConfig(this); | ||
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); | ||
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); | ||
|
@@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti | |
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); | ||
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); | ||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize); | ||
scopedSettings.addSettingsUpdateConsumer( | ||
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, | ||
this::setGenerationThresholdSize); | ||
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); | ||
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); | ||
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); | ||
|
@@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { | |
this.flushThresholdSize = byteSizeValue; | ||
} | ||
|
||
private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { | ||
this.generationThresholdSize = generationThresholdSize; | ||
} | ||
|
||
private void setGCDeletes(TimeValue timeValue) { | ||
this.gcDeletesInMillis = timeValue.getMillis(); | ||
} | ||
|
@@ -461,6 +479,10 @@ public TimeValue getGlobalCheckpointInterval() { | |
*/ | ||
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } | ||
|
||
public ByteSizeValue getGenerationThresholdSize() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. javadocs please There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 35d0119. |
||
return generationThresholdSize; | ||
} | ||
|
||
/** | ||
* Returns the {@link MergeSchedulerConfig} | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,7 @@ | |
import java.nio.file.StandardOpenOption; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
@@ -127,6 +128,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC | |
private final TranslogConfig config; | ||
private final LongSupplier globalCheckpointSupplier; | ||
private final String translogUUID; | ||
private final AtomicBoolean rollingGeneration = new AtomicBoolean(); | ||
|
||
/** | ||
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is | ||
|
@@ -329,7 +331,7 @@ public Path location() { | |
* Returns the generation of the current transaction log. | ||
*/ | ||
public long currentFileGeneration() { | ||
try (ReleasableLock lock = readLock.acquire()) { | ||
try (ReleasableLock ignored = readLock.acquire()) { | ||
return current.getGeneration(); | ||
} | ||
} | ||
|
@@ -409,20 +411,40 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { | |
public Location add(final Operation operation) throws IOException { | ||
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); | ||
try { | ||
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); | ||
final long start = out.position(); | ||
out.skip(Integer.BYTES); | ||
writeOperationNoSize(checksumStreamOutput, operation); | ||
writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation); | ||
final long end = out.position(); | ||
final int operationSize = (int) (end - Integer.BYTES - start); | ||
out.seek(start); | ||
out.writeInt(operationSize); | ||
out.seek(end); | ||
final ReleasablePagedBytesReference bytes = out.bytes(); | ||
final Location location; | ||
final boolean shouldRollGeneration; | ||
try (ReleasableLock ignored = readLock.acquire()) { | ||
ensureOpen(); | ||
return current.add(bytes, operation.seqNo()); | ||
location = current.add(bytes, operation.seqNo()); | ||
// check if we should roll under the read lock | ||
shouldRollGeneration = | ||
shouldRollGeneration() && rollingGeneration.compareAndSet(false, true); | ||
} | ||
if (shouldRollGeneration) { | ||
try (ReleasableLock ignored = writeLock.acquire()) { | ||
/* | ||
* We have to check the condition again lest we could roll twice if another | ||
* thread committed the translog (which rolls the generation) between us | ||
* releasing the read lock and acquiring the write lock. | ||
*/ | ||
if (shouldRollGeneration()) { | ||
this.rollGeneration(); | ||
} | ||
} finally { | ||
final boolean wasRolling = rollingGeneration.getAndSet(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have a similar logic in |
||
assert wasRolling; | ||
} | ||
} | ||
return location; | ||
} catch (final AlreadyClosedException | IOException ex) { | ||
try { | ||
closeOnTragicEvent(ex); | ||
|
@@ -442,6 +464,19 @@ public Location add(final Operation operation) throws IOException { | |
} | ||
} | ||
|
||
/** | ||
* Tests whether or not the current generation of the translog should be rolled into a new | ||
* generation. This test is based on the size of the current generation compared to the | ||
* configured generation threshold size. | ||
* | ||
* @return {@code true} if the current generation should be rolled into a new generation | ||
*/ | ||
private boolean shouldRollGeneration() { | ||
final long size = this.current.sizeInBytes(); | ||
final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes(); | ||
return size > threshold; | ||
} | ||
|
||
/** | ||
* The a {@linkplain Location} that will sort after the {@linkplain Location} returned by the last write but before any locations which | ||
* can be returned by the next write. | ||
|
@@ -1322,44 +1357,62 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl | |
out.writeInt((int) checksum); | ||
} | ||
|
||
/** | ||
* Roll the current translog generation into a new generation. This does not commit the | ||
* translog. The translog write lock must be held by the current thread. | ||
* | ||
* @throws IOException if an I/O exception occurred during any file operations | ||
*/ | ||
void rollGeneration() throws IOException { | ||
assert writeLock.isHeldByCurrentThread(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe allow this assert to give us a message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed 32afd9e. |
||
try { | ||
final TranslogReader reader = current.closeIntoReader(); | ||
readers.add(reader); | ||
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); | ||
assert Checkpoint.read(checkpoint).generation == current.getGeneration(); | ||
final Path generationCheckpoint = | ||
location.resolve(getCommitCheckpointFileName(current.getGeneration())); | ||
Files.copy(checkpoint, generationCheckpoint); | ||
IOUtils.fsync(generationCheckpoint, false); | ||
IOUtils.fsync(generationCheckpoint.getParent(), true); | ||
// create a new translog file; this will sync it and update the checkpoint data; | ||
current = createWriter(current.getGeneration() + 1); | ||
logger.trace("current translog set to [{}]", current.getGeneration()); | ||
} catch (final Exception e) { | ||
IOUtils.closeWhileHandlingException(this); // tragic event | ||
throw e; | ||
} | ||
} | ||
|
||
@Override | ||
public long prepareCommit() throws IOException { | ||
try (ReleasableLock lock = writeLock.acquire()) { | ||
try (ReleasableLock ignored = writeLock.acquire()) { | ||
ensureOpen(); | ||
if (currentCommittingGeneration != NOT_SET_GENERATION) { | ||
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration); | ||
final String message = String.format( | ||
Locale.ROOT, | ||
"already committing a translog with generation [%d]", | ||
currentCommittingGeneration); | ||
throw new IllegalStateException(message); | ||
} | ||
currentCommittingGeneration = current.getGeneration(); | ||
TranslogReader currentCommittingTranslog = current.closeIntoReader(); | ||
readers.add(currentCommittingTranslog); | ||
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); | ||
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration(); | ||
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration())); | ||
Files.copy(checkpoint, commitCheckpoint); | ||
IOUtils.fsync(commitCheckpoint, false); | ||
IOUtils.fsync(commitCheckpoint.getParent(), true); | ||
// create a new translog file - this will sync it and update the checkpoint data; | ||
current = createWriter(current.getGeneration() + 1); | ||
logger.trace("current translog set to [{}]", current.getGeneration()); | ||
|
||
} catch (Exception e) { | ||
IOUtils.closeWhileHandlingException(this); // tragic event | ||
throw e; | ||
rollGeneration(); | ||
} | ||
return 0L; | ||
return 0; | ||
} | ||
|
||
@Override | ||
public long commit() throws IOException { | ||
try (ReleasableLock lock = writeLock.acquire()) { | ||
try (ReleasableLock ignored = writeLock.acquire()) { | ||
ensureOpen(); | ||
if (currentCommittingGeneration == NOT_SET_GENERATION) { | ||
prepareCommit(); | ||
} | ||
assert currentCommittingGeneration != NOT_SET_GENERATION; | ||
assert readers.stream().filter(r -> r.getGeneration() == currentCommittingGeneration).findFirst().isPresent() | ||
: "reader list doesn't contain committing generation [" + currentCommittingGeneration + "]"; | ||
lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up | ||
assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) | ||
: "readers missing committing generation [" + currentCommittingGeneration + "]"; | ||
// set the last committed generation otherwise old files will not be cleaned up | ||
lastCommittedTranslogFileGeneration = current.getGeneration(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is wrong - if we end up rolling during a commit we need to set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch (adding multiple rolls between pre-commit and commit catches it too)! I pushed 1669224. |
||
currentCommittingGeneration = NOT_SET_GENERATION; | ||
trimUnreferencedReaders(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -370,6 +370,27 @@ public void testTranslogFlushSizeThreshold() { | |
assertEquals(actualNewTranslogFlushThresholdSize, settings.getFlushThresholdSize()); | ||
} | ||
|
||
public void testTranslogGenerationSizeThreshold() { | ||
final ByteSizeValue size = new ByteSizeValue(Math.abs(randomInt())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we protect against 0? Thinking about it more, we should at least fit one op in there. So I wonder if we should have a decent lower bound. The problem in making it too big is testing - how about making it 1kb? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary? Even at zero one operation can squeeze into the generation before rolling (we do not check until after an operation is added to the translog). I like it small for testing, it's easier to think about. If you feel strongly that we should lower bound it, I'm fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm on the fence. Ideally we should protect from abusing a production system. Let's keep things as they are and not over engineer this. Thanks for pointing out that a single op will always go in there. |
||
final String key = IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(); | ||
final ByteSizeValue actualValue = | ||
ByteSizeValue.parseBytesSizeValue(size.toString(), key); | ||
final IndexMetaData metaData = | ||
newIndexMeta( | ||
"index", | ||
Settings.builder() | ||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) | ||
.put(key, size.toString()) | ||
.build()); | ||
final IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY); | ||
assertEquals(actualValue, settings.getGenerationThresholdSize()); | ||
final ByteSizeValue newSize = new ByteSizeValue(Math.abs(randomInt())); | ||
final ByteSizeValue actual = ByteSizeValue.parseBytesSizeValue(newSize.toString(), key); | ||
settings.updateIndexMetaData( | ||
newIndexMeta("index", Settings.builder().put(key, newSize.toString()).build())); | ||
assertEquals(actual, settings.getGenerationThresholdSize()); | ||
} | ||
|
||
public void testArchiveBrokenIndexSettings() { | ||
Settings settings = | ||
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS.archiveUnknownOrInvalidSettings( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: wondering if this should just be
index.translog.generation_size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I picked
index.translog.generation_threshold_size
to be consistent with index.translog.flush_threshold_size
. Do you still wonder if it should be changed?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can keep as is then. I don't mind much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you maybe document what happens if that size is exceeded or add a link to the explain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanna understand why this is
64MB
why can't we useINDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING
or it's default instead rather than an arbitrary value? I would love to get some insight that we also can document? Then there is also the question why we can't simply flush this from the outside viaIndexShard#maybeFlush
, that would simplify the translog potentially, one property that I liked about theTranslog#add()
operation was that it would never acquire a write lock. We are now entering potentially dangerous territory here, locking can be a beast.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jason can say why he chose 64MB - I think any value we choose now will be arbitrary so I was good with it for now and tweak later if needed.
We want a number that's much smaller than the flush size. We're heading towards keeping the generations that are needed to recover all seq# ops after a certain point. That means that we won't always clean up previous generations when flushing. If we use the flush size for generations we can end up in a poisonous cycle where we repeatedly try to flush but the translog is not trimmed.
Do you mean doing both flushing and potentially opening a new generation from the same method that is called after indexing? i.e., call it maybeFlushAndRoll ( :) ) ? I would be good with that (though prefer the current approach).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was looking into having a
Translog#rollover()
method we can from themaybeFlush
we can renamemaybeFlush
toonAfterOpteration
to not necessarily yield impl details. I am a bit concerned about the write lock, which might not be a problem today but maybe tomorrow.makes sense. lets document this. it's not clear from reviewing the PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed dd14ba8 to move the control of rolling to index shard. Would @nik9000, @bleskes, and @s1monw please take another look?