From e21ce2d424936d531db0707ee2ecf65efdfa612a Mon Sep 17 00:00:00 2001
From: Gaurav Bafna
Date: Tue, 22 Nov 2022 14:38:05 +0530
Subject: [PATCH] Remote translog: skip upload on close when already synced,
 notify the transfer listener on empty uploads, and port translog tests to
 RemoteFSTranslogTests

---
 .../index/translog/RemoteFsTranslog.java      |   4 +-
 .../transfer/TranslogTransferManager.java     |   1 +
 .../index/translog/RemoteFSTranslogTests.java | 863 +++++++++++++++++-
 .../index/translog/TranslogTests.java         |   2 +-
 4 files changed, 843 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
index b9d6d4e9effa3..942f41dfa0aad 100644
--- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
+++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -387,7 +387,9 @@ public void close() throws IOException {
             : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
         if (closed.compareAndSet(false, true)) {
             try (ReleasableLock lock = writeLock.acquire()) {
-                prepareAndUpload(primaryTermSupplier.getAsLong(), null);
+                if (current.syncNeeded()) {
+                    prepareAndUpload(primaryTermSupplier.getAsLong(), null);
+                }
             } finally {
                 logger.debug("translog closed");
                 closeFilesIfNoPendingRetentionLocks();
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
index 7755803192cec..12ca2a98473a3 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
@@ -82,6 +82,7 @@ public boolean uploadTranslog(TransferSnapshot translogCheckpointTransferSnapsho
         toUpload.addAll(exclusionFilter.apply(translogCheckpointTransferSnapshot.getCheckpointFileSnapshots()));
         if (toUpload.isEmpty()) {
             logger.warn("Nothing to upload for transfer size {}", translogCheckpointTransferSnapshot.getTransferSize());
+            translogTransferListener.onUploadComplete(translogCheckpointTransferSnapshot);
             return true;
         }
         final CountDownLatch latch = new CountDownLatch(toUpload.size());
diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java
index 66fd781c17d45..186019e2b727d 100644
--- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java
@@ -8,7 +8,12 @@
 package org.opensearch.index.translog;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.ByteArrayDataOutput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.tests.mockfile.FilterFileChannel;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.junit.After;
 import org.junit.Before;
@@ -16,15 +21,21 @@
 import org.opensearch.cluster.metadata.RepositoryMetadata;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.blobstore.BlobPath;
-import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.bytes.BytesArray;
+import org.opensearch.common.bytes.ReleasableBytesReference;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.ByteSizeUnit;
 import org.opensearch.common.unit.ByteSizeValue;
+import org.opensearch.common.util.concurrent.AbstractRunnable;
+import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.core.internal.io.IOUtils;
 import org.opensearch.env.Environment;
 import org.opensearch.env.TestEnvironment;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.engine.MissingHistoryOperationsException;
+import org.opensearch.index.seqno.LocalCheckpointTracker;
+import org.opensearch.index.seqno.LocalCheckpointTrackerTests;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.index.translog.transfer.BlobStoreTransferService;
@@ -38,25 +49,43 @@
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongConsumer;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
 import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
 import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
@@ -245,25 +274,6 @@ public void testSimpleOperations() throws IOException {
         final long noopTerm = randomLongBetween(1, primaryTerm.get());
         addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, noopTerm, reason));
 
-        try (Translog.Snapshot snapshot = translog.newSnapshot()) {
-            Translog.Index index = (Translog.Index) snapshot.next();
-            assertNotNull(index);
-            assertThat(BytesReference.toBytes(index.source()), equalTo(new byte[] { 1 }));
-
-            Translog.Delete delete = (Translog.Delete) snapshot.next();
-            assertNotNull(delete);
-            assertThat(delete.id(), equalTo("2"));
-
-
-            Translog.NoOp noOp = (Translog.NoOp) snapshot.next();
-            assertNotNull(noOp);
-            assertThat(noOp.seqNo(), equalTo(seqNo));
-            assertThat(noOp.primaryTerm(), equalTo(noopTerm));
-            assertThat(noOp.reason(), equalTo(reason));
-
-            assertNull(snapshot.next());
-        }
-
         try (Translog.Snapshot snapshot = translog.newSnapshot()) {
             assertThat(snapshot, containsOperationsInAnyOrder(ops));
             assertThat(snapshot.totalOperations(), equalTo(ops.size()));
@@ -415,24 +425,827 @@ public void testSimpleOperationsUpload() throws IOException {
         // assert content of ckp and tlog files
         BlobPath path = repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get()));
-        BlobPath mdPath = repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata");
         for (TranslogReader reader : translog.readers) {
             final long readerGeneration = reader.getGeneration();
             logger.error("Asserting content of {}", readerGeneration);
             Path translogPath = reader.path();
-            try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(translogPath.toFile()), new CRC32())) {
+            try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(translogPath.toFile()), new CRC32());
+                 InputStream tlogStream = blobStoreTransferService.readFile(path, Translog.getFilename(readerGeneration))) {
                 byte[] content = stream.readAllBytes();
-                byte[] tlog = blobStoreTransferService.readFile(path, Translog.getFilename(readerGeneration)).readAllBytes();
+                byte[] tlog = tlogStream.readAllBytes();
                 assertArrayEquals(tlog, content);
             }
             Path checkpointPath = translog.location().resolve(Translog.getCommitCheckpointFileName(readerGeneration));
-            try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(checkpointPath.toFile()), new CRC32())) {
+            try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(checkpointPath.toFile()), new CRC32());
+                 InputStream ckpStream = blobStoreTransferService.readFile(path, Translog.getCommitCheckpointFileName(readerGeneration))) {
                 byte[] content = stream.readAllBytes();
-                byte[] ckp = blobStoreTransferService.readFile(mdPath, Translog.getCommitCheckpointFileName(readerGeneration)).readAllBytes();
+                byte[] ckp = ckpStream.readAllBytes();
                 assertArrayEquals(ckp, content);
             }
         }
     }
 
+    private Long populateTranslogOps(boolean withMissingOps) throws IOException {
+        long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        final int generations = between(2, 20);
+        long currentSeqNo = 0L;
+        List<Translog.Operation> firstGenOps = null;
+        Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
+        for (int gen = 0; gen < generations; gen++) {
+            List<Long> seqNos = new ArrayList<>();
+            int numOps = randomIntBetween(4, 10);
+            for (int i = 0; i < numOps; i++, currentSeqNo++) {
+                minSeqNo = SequenceNumbers.min(minSeqNo, currentSeqNo);
+                maxSeqNo = SequenceNumbers.max(maxSeqNo, currentSeqNo);
+                seqNos.add(currentSeqNo);
+            }
+            Collections.shuffle(seqNos, new Random(100));
+            List<Translog.Operation> ops = new ArrayList<>(seqNos.size());
+            for (long seqNo : seqNos) {
+                Translog.Index op = new Translog.Index(randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[] { randomByte() });
+                boolean shouldAdd = !withMissingOps || seqNo % 4 != 0;
+                if (shouldAdd) {
+                    translog.add(op);
+                    ops.add(op);
+                }
+            }
+            operationsByGen.put(translog.currentFileGeneration(), ops);
+            if (firstGenOps == null) {
+                firstGenOps = ops;
+            }
+            translog.rollGeneration();
+            if (rarely()) {
+                translog.rollGeneration(); // empty generation
+            }
+        }
+        return currentSeqNo;
+    }
+
+    public void testFullRangeSnapshot() throws Exception {
+        // Successful snapshot
+        long nextSeqNo = populateTranslogOps(false);
+        long fromSeqNo = 0L;
+        long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
+        try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
+            int totOps = 0;
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                totOps++;
+            }
+            assertEquals(totOps, toSeqNo - fromSeqNo + 1);
+        }
+    }
+
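+    /**
+     * populateTranslogOps(true) skips every seqNo that is a multiple of 4, so the requested
+     * range below is guaranteed to contain gaps and a snapshot opened with
+     * requiredFullRange == true must fail with MissingHistoryOperationsException while iterating.
+     */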
+    public void testFullRangeSnapshotWithFailures() throws Exception {
+        long nextSeqNo = populateTranslogOps(true);
+        long fromSeqNo = 0L;
+        long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
+        try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
+            int totOps = 0;
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                totOps++;
+            }
+            fail("Should throw exception for missing operations");
+        } catch (MissingHistoryOperationsException e) {
+            assertTrue(e.getMessage().contains("Not all operations between from_seqno"));
+        }
+    }
+
+    public void testConcurrentWritesWithVaryingSize() throws Throwable {
+        final int opsPerThread = randomIntBetween(10, 200);
+        int threadCount = 2 + randomInt(5);
+
+        logger.info("testing with [{}] threads, each doing [{}] ops", threadCount, opsPerThread);
+        final BlockingQueue<TranslogTests.LocationOperation> writtenOperations = new ArrayBlockingQueue<>(threadCount * opsPerThread);
+
+        Thread[] threads = new Thread[threadCount];
+        final Exception[] threadExceptions = new Exception[threadCount];
+        final AtomicLong seqNoGenerator = new AtomicLong();
+        final CountDownLatch downLatch = new CountDownLatch(1);
+        for (int i = 0; i < threadCount; i++) {
+            final int threadId = i;
+            threads[i] = new TranslogThread(
+                translog,
+                downLatch,
+                opsPerThread,
+                threadId,
+                writtenOperations,
+                seqNoGenerator,
+                threadExceptions
+            );
+            threads[i].setDaemon(true);
+            threads[i].start();
+        }
+
+        downLatch.countDown();
+
+        for (int i = 0; i < threadCount; i++) {
+            // join first so the exception slot for this thread is fully populated before we inspect it
+            threads[i].join(60 * 1000);
+            if (threadExceptions[i] != null) {
+                throw threadExceptions[i];
+            }
+        }
+
+        List<TranslogTests.LocationOperation> collect = new ArrayList<>(writtenOperations);
+        collect.sort(Comparator.comparing(op -> op.operation.seqNo()));
+
+        List<Translog.Operation> opsList = new ArrayList<>(threadCount * opsPerThread);
+        try (Translog.Snapshot snapshot = translog.newSnapshot()) {
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                opsList.add(op);
+            }
+        }
+        opsList.sort(Comparator.comparing(op -> op.seqNo()));
+
+        for (int i = 0; i < threadCount * opsPerThread; i++) {
+            assertEquals(opsList.get(i), collect.get(i).operation);
+        }
+    }
+
+    /**
+     * Tests that concurrent readers and writes maintain view and snapshot semantics
+     */
+    public void testConcurrentWriteViewsAndSnapshot() throws Throwable {
+        final Thread[] writers = new Thread[randomIntBetween(1, 3)];
+        final Thread[] readers = new Thread[randomIntBetween(1, 3)];
+        final int flushEveryOps = randomIntBetween(5, 100);
+        final int maxOps = randomIntBetween(200, 1000);
+        final Object signalReaderSomeDataWasIndexed = new Object();
+        final AtomicLong idGenerator = new AtomicLong();
+        final CyclicBarrier barrier = new CyclicBarrier(writers.length + readers.length + 1);
+
+        // a map of all written ops and their returned location.
+        final Map<Translog.Operation, Translog.Location> writtenOps = ConcurrentCollections.newConcurrentMap();
+
+        // a signal for all threads to stop
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        final Object flushMutex = new Object();
+        final AtomicLong lastCommittedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        final LocalCheckpointTracker tracker = LocalCheckpointTrackerTests.createEmptyTracker();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        // any errors on threads
+        final List<Exception> errors = new CopyOnWriteArrayList<>();
flushing every ~[{}] ops.", readers.length, writers.length, flushEveryOps); + for (int i = 0; i < writers.length; i++) { + final String threadName = "writer_" + i; + final int threadId = i; + writers[i] = new Thread(new AbstractRunnable() { + @Override + public void doRun() throws BrokenBarrierException, InterruptedException, IOException { + barrier.await(); + int counter = 0; + while (run.get() && idGenerator.get() < maxOps) { + long id = idGenerator.getAndIncrement(); + final Translog.Operation op; + final Translog.Operation.Type type = Translog.Operation.Type.values()[((int) (id % Translog.Operation.Type + .values().length))]; + switch (type) { + case CREATE: + case INDEX: + op = new Translog.Index("" + id, id, primaryTerm.get(), new byte[] { (byte) id }); + break; + case DELETE: + op = new Translog.Delete(Long.toString(id), id, primaryTerm.get()); + break; + case NO_OP: + op = new Translog.NoOp(id, 1, Long.toString(id)); + break; + default: + throw new AssertionError("unsupported operation type [" + type + "]"); + } + Translog.Location location = translog.add(op); + tracker.markSeqNoAsProcessed(id); + Translog.Location existing = writtenOps.put(op, location); + if (existing != null) { + fail("duplicate op [" + op + "], old entry at " + location); + } + if (id % writers.length == threadId) { + translog.ensureSynced(location); + } + if (id % flushEveryOps == 0) { + synchronized (flushMutex) { + // we need not do this concurrently as we need to make sure that the generation + // we're committing - is still present when we're committing + long localCheckpoint = tracker.getProcessedCheckpoint(); + translog.rollGeneration(); + // expose the new checkpoint (simulating a commit), before we trim the translog + lastCommittedLocalCheckpoint.set(localCheckpoint); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + translog.trimUnreferencedReaders(); + } + } + if (id % 7 == 0) { + synchronized (signalReaderSomeDataWasIndexed) { + signalReaderSomeDataWasIndexed.notifyAll(); + } + } + counter++; + } + logger.info("--> [{}] done. 
wrote [{}] ops.", threadName, counter); + } + + @Override + public void onFailure(Exception e) { + logger.error(() -> new ParameterizedMessage("--> writer [{}] had an error", threadName), e); + errors.add(e); + } + }, threadName); + writers[i].start(); + } + + for (int i = 0; i < readers.length; i++) { + final String threadId = "reader_" + i; + readers[i] = new Thread(new AbstractRunnable() { + Closeable retentionLock = null; + long committedLocalCheckpointAtView; + + @Override + public void onFailure(Exception e) { + logger.error(() -> new ParameterizedMessage("--> reader [{}] had an error", threadId), e); + errors.add(e); + try { + closeRetentionLock(); + } catch (IOException inner) { + inner.addSuppressed(e); + logger.error("unexpected error while closing view, after failure", inner); + } + } + + void closeRetentionLock() throws IOException { + if (retentionLock != null) { + retentionLock.close(); + } + } + + void acquireRetentionLock() throws IOException { + closeRetentionLock(); + retentionLock = translog.acquireRetentionLock(); + // captures the last committed checkpoint, while holding the view, simulating + // recovery logic which captures a view and gets a lucene commit + committedLocalCheckpointAtView = lastCommittedLocalCheckpoint.get(); + logger.info("--> [{}] min gen after acquiring lock [{}]", threadId, translog.getMinFileGeneration()); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + int iter = 0; + while (idGenerator.get() < maxOps) { + if (iter++ % 10 == 0) { + acquireRetentionLock(); + } + + // captures al views that are written since the view was created (with a small caveat see bellow) + // these are what we expect the snapshot to return (and potentially some more). + Set expectedOps = new HashSet<>(writtenOps.keySet()); + expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); + try (Translog.Snapshot snapshot = translog.newSnapshot(committedLocalCheckpointAtView + 1L, Long.MAX_VALUE)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + expectedOps.remove(op); + } + } + if (expectedOps.isEmpty() == false) { + StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size()) + .append(" operations from [") + .append(committedLocalCheckpointAtView + 1L) + .append("]"); + boolean failed = false; + for (Translog.Operation expectedOp : expectedOps) { + final Translog.Location loc = writtenOps.get(expectedOp); + failed = true; + missed.append("\n --> [").append(expectedOp).append("] written at ").append(loc); + } + if (failed) { + fail(missed.toString()); + } + } + // slow down things a bit and spread out testing.. + synchronized (signalReaderSomeDataWasIndexed) { + if (idGenerator.get() < maxOps) { + signalReaderSomeDataWasIndexed.wait(); + } + } + } + closeRetentionLock(); + logger.info("--> [{}] done. tested [{}] snapshots", threadId, iter); + } + }, threadId); + readers[i].start(); + } + + barrier.await(); + logger.debug("--> waiting for threads to stop"); + for (Thread thread : writers) { + thread.join(); + } + logger.debug("--> waiting for readers to stop"); + // force stopping, if all writers crashed + synchronized (signalReaderSomeDataWasIndexed) { + idGenerator.set(Long.MAX_VALUE); + signalReaderSomeDataWasIndexed.notifyAll(); + } + for (Thread thread : readers) { + thread.join(); + } + if (errors.size() > 0) { + Throwable e = errors.get(0); + for (Throwable suppress : errors.subList(1, errors.size())) { + e.addSuppressed(suppress); + } + throw e; + } + logger.info("--> test done. 
total ops written [{}]", writtenOps.size()); + } + + public void testSyncUpTo() throws IOException { + int translogOperations = randomIntBetween(10, 100); + int count = 0; + for (int op = 0; op < translogOperations; op++) { + int seqNo = ++count; + final Translog.Location location = translog.add( + new Translog.Index("" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))) + ); + if (randomBoolean()) { + assertTrue("at least one operation pending", translog.syncNeeded()); + assertTrue("this operation has not been synced", translog.ensureSynced(location)); + // we are the last location so everything should be synced + assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); + seqNo = ++count; + translog.add( + new Translog.Index("" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))) + ); + assertTrue("one pending operation", translog.syncNeeded()); + assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now + assertTrue("we only synced a previous operation yet", translog.syncNeeded()); + } + if (rarely()) { + translog.rollGeneration(); + assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now + assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); + } + + if (randomBoolean()) { + translog.sync(); + assertFalse("translog has been synced already", translog.ensureSynced(location)); + } + } + } + public void testSyncUpToStream() throws IOException { + int iters = randomIntBetween(5, 10); + for (int i = 0; i < iters; i++) { + int translogOperations = randomIntBetween(10, 100); + int count = 0; + ArrayList locations = new ArrayList<>(); + for (int op = 0; op < translogOperations; op++) { + if (rarely()) { + translog.rollGeneration(); + } + final Translog.Location location = translog.add( + new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8"))) + ); + locations.add(location); + } + Collections.shuffle(locations, random()); + if (randomBoolean()) { + assertTrue("at least one operation pending", translog.syncNeeded()); + assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream())); + // we are the last location so everything should be synced + assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); + } else if (rarely()) { + translog.rollGeneration(); + // not syncing now + assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); + assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); + } else { + translog.sync(); + assertFalse("translog has been synced already", translog.ensureSynced(locations.stream())); + } + for (Translog.Location location : locations) { + assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location)); + } + } + } + + public void testLocationComparison() throws IOException { + List locations = new ArrayList<>(); + int translogOperations = randomIntBetween(10, 100); + int count = 0; + for (int op = 0; op < translogOperations; op++) { + locations.add( + translog.add( + new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8"))) + ) + ); + if (randomBoolean()) { + translog.ensureSynced(locations.get(op)); + } + if 
(rarely() && translogOperations > op + 1) { + translog.rollGeneration(); + } + } + Collections.shuffle(locations, random()); + Translog.Location max = locations.get(0); + for (Translog.Location location : locations) { + max = max(max, location); + } + + assertEquals(max.generation, translog.currentFileGeneration()); + try (Translog.Snapshot snap = new TranslogTests.SortedSnapshot(translog.newSnapshot())) { + Translog.Operation next; + Translog.Operation maxOp = null; + while ((next = snap.next()) != null) { + maxOp = next; + } + assertNotNull(maxOp); + assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count)); + } + } + + + public static Translog.Location max(Translog.Location a, Translog.Location b) { + if (a.compareTo(b) > 0) { + return a; + } + return b; + } + + public void testTranslogWriter() throws IOException { + final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1); + final Set persistedSeqNos = new HashSet<>(); + persistedSeqNoConsumer.set(persistedSeqNos::add); + final int numOps = scaledRandomIntBetween(8, 250000); + final Set seenSeqNos = new HashSet<>(); + boolean opsHaveValidSequenceNumbers = randomBoolean(); + for (int i = 0; i < numOps; i++) { + byte[] bytes = new byte[4]; + DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); + out.writeInt(i); + long seqNo; + do { + seqNo = opsHaveValidSequenceNumbers ? randomNonNegativeLong() : SequenceNumbers.UNASSIGNED_SEQ_NO; + opsHaveValidSequenceNumbers = opsHaveValidSequenceNumbers || !rarely(); + } while (seenSeqNos.contains(seqNo)); + if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + seenSeqNos.add(seqNo); + } + writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo); + } + assertThat(persistedSeqNos, empty()); + writer.sync(); + persistedSeqNos.remove(SequenceNumbers.UNASSIGNED_SEQ_NO); + assertEquals(seenSeqNos, persistedSeqNos); + + final BaseTranslogReader reader = randomBoolean() + ? writer + : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME))); + for (int i = 0; i < numOps; i++) { + ByteBuffer buffer = ByteBuffer.allocate(4); + reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); + buffer.flip(); + final int value = buffer.getInt(); + assertEquals(i, value); + } + final long minSeqNo = seenSeqNos.stream().min(Long::compareTo).orElse(SequenceNumbers.NO_OPS_PERFORMED); + final long maxSeqNo = seenSeqNos.stream().max(Long::compareTo).orElse(SequenceNumbers.NO_OPS_PERFORMED); + assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo)); + assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo)); + + byte[] bytes = new byte[4]; + DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); + out.writeInt(2048); + writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + + if (reader instanceof TranslogReader) { + ByteBuffer buffer = ByteBuffer.allocate(4); + try { + reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * numOps); + fail("read past EOF?"); + } catch (EOFException ex) { + // expected + } + ((TranslogReader) reader).close(); + } else { + // live reader! 
+            ByteBuffer buffer = ByteBuffer.allocate(4);
+            final long pos = reader.getFirstOperationOffset() + 4 * numOps;
+            reader.readBytes(buffer, pos);
+            buffer.flip();
+            final int value = buffer.getInt();
+            assertEquals(2048, value);
+        }
+        IOUtils.close(writer);
+    }
+
+    public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
+        Path tempDir = createTempDir();
+        final TranslogConfig temp = getTranslogConfig(tempDir);
+        final TranslogConfig config = new TranslogConfig(
+            temp.getShardId(),
+            temp.getTranslogPath(),
+            temp.getIndexSettings(),
+            temp.getBigArrays(),
+            new ByteSizeValue(1, ByteSizeUnit.KB)
+        );
+
+        final Set<Long> persistedSeqNos = new HashSet<>();
+        final AtomicInteger writeCalls = new AtomicInteger();
+
+        final ChannelFactory channelFactory = (file, openOption) -> {
+            FileChannel delegate = FileChannel.open(file, openOption);
+            boolean success = false;
+            try {
+                // don't do partial writes for checkpoints; we rely on the fact that the bytes are written as an atomic operation
+                final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");
+
+                final FileChannel channel;
+                if (isCkpFile) {
+                    channel = delegate;
+                } else {
+                    channel = new FilterFileChannel(delegate) {
+
+                        @Override
+                        public int write(ByteBuffer src) throws IOException {
+                            writeCalls.incrementAndGet();
+                            return super.write(src);
+                        }
+                    };
+                }
+                success = true;
+                return channel;
+            } finally {
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(delegate);
+                }
+            }
+        };
+
+        String translogUUID = Translog.createEmptyTranslog(
+            config.getTranslogPath(),
+            SequenceNumbers.NO_OPS_PERFORMED,
+            shardId,
+            channelFactory,
+            primaryTerm.get()
+        );
+
+        try (
+            Translog translog = new RemoteFsTranslog(
+                config,
+                translogUUID,
+                new DefaultTranslogDeletionPolicy(-1, -1, 0),
+                () -> SequenceNumbers.NO_OPS_PERFORMED,
+                primaryTerm::get,
+                persistedSeqNos::add,
+                repository,
+                threadPool
+            ) {
+                @Override
+                ChannelFactory getChannelFactory() {
+                    return channelFactory;
+                }
+            }
+        ) {
+            TranslogWriter writer = translog.getCurrent();
+            int initialWriteCalls = writeCalls.get();
+            byte[] bytes = new byte[256];
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3);
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
+            assertThat(persistedSeqNos, empty());
+            assertEquals(initialWriteCalls, writeCalls.get());
+
+            if (randomBoolean()) {
+                // Since the buffer is full, this will flush before performing the add.
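+                // (the writer buffer is 1 KB here and the four 256-byte adds above already fill it)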
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
+                assertThat(persistedSeqNos, empty());
+                assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
+            } else {
+                // Will flush on read
+                writer.readBytes(ByteBuffer.allocate(256), 0);
+                assertThat(persistedSeqNos, empty());
+                assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
+
+                // Add after the read flushed the buffer
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
+            }
+
+            writer.sync();
+
+            // Sequence numbers are marked as persisted after sync
+            assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L, 5L));
+        }
+    }
+
+    public void testCloseIntoReader() throws IOException {
+        try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
+            final int numOps = randomIntBetween(8, 128);
+            for (int i = 0; i < numOps; i++) {
+                final byte[] bytes = new byte[4];
+                final DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes));
+                out.writeInt(i);
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
+            }
+            writer.sync();
+            final Checkpoint writerCheckpoint = writer.getCheckpoint();
+            TranslogReader reader = writer.closeIntoReader();
+            try {
+                if (randomBoolean()) {
+                    reader.close();
+                    reader = translog.openReader(reader.path(), writerCheckpoint);
+                }
+                for (int i = 0; i < numOps; i++) {
+                    final ByteBuffer buffer = ByteBuffer.allocate(4);
+                    reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
+                    buffer.flip();
+                    final int value = buffer.getInt();
+                    assertEquals(i, value);
+                }
+                final Checkpoint readerCheckpoint = reader.getCheckpoint();
+                assertThat(readerCheckpoint, equalTo(writerCheckpoint));
+            } finally {
+                IOUtils.close(reader);
+            }
+        }
+    }
+
+    public void testRecoveryUncommitted() throws IOException {
+        List<Translog.Location> locations = new ArrayList<>();
+        int translogOperations = randomIntBetween(10, 100);
+        final int prepareOp = randomIntBetween(0, translogOperations - 1);
+        Translog.TranslogGeneration translogGeneration = null;
+        final boolean sync = randomBoolean();
+        for (int op = 0; op < translogOperations; op++) {
+            locations.add(
+                translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))
+            );
+            if (op == prepareOp) {
+                translogGeneration = translog.getGeneration();
+                translog.rollGeneration();
+                assertEquals(
+                    "expected this to be the first roll (1 gen is on creation, 2 when opened)",
+                    2L,
+                    translogGeneration.translogFileGeneration
+                );
+                assertNotNull(translogGeneration.translogUUID);
+            }
+        }
+        if (sync) {
+            translog.sync();
+        }
+        // we intentionally don't close the tlog that is in the prepareCommit stage since we try to recover the uncommitted
+        // translog here as well.
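+        // each reopen below starts writing into a fresh generation on top of the recovered ones,
+        // so currentFileGeneration() keeps moving ahead of the captured translogGeneration; the
+        // assertions below track the expected offsets for one and two recoveries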
+        TranslogConfig config = translog.getConfig();
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        try (
+            Translog translog = new RemoteFsTranslog(
+                config,
+                translogUUID,
+                deletionPolicy,
+                () -> globalCheckpoint.get(),
+                primaryTerm::get,
+                getPersistedSeqNoConsumer(),
+                repository,
+                threadPool
+            )
+        ) {
+            assertNotNull(translogGeneration);
+            assertEquals(
+                "lastCommitted must be 1 less than current - we never finished the commit",
+                translogGeneration.translogFileGeneration + 1,
+                translog.currentFileGeneration()
+            );
+            assertFalse(translog.syncNeeded());
+            try (Translog.Snapshot snapshot = new TranslogTests.SortedSnapshot(translog.newSnapshot())) {
+                int upTo = sync ? translogOperations : prepareOp;
+                for (int i = 0; i < upTo; i++) {
+                    Translog.Operation next = snapshot.next();
+                    assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
+                    assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
+                }
+            }
+        }
+        if (randomBoolean()) { // recover twice
+            try (
+                Translog translog = new RemoteFsTranslog(
+                    config,
+                    translogUUID,
+                    deletionPolicy,
+                    () -> SequenceNumbers.NO_OPS_PERFORMED,
+                    primaryTerm::get,
+                    seqNo -> {},
+                    repository,
+                    threadPool
+                )
+            ) {
+                assertNotNull(translogGeneration);
+                assertEquals(
+                    "lastCommitted must be 3 less than current - we never finished the commit and run recovery twice",
+                    translogGeneration.translogFileGeneration + 3,
+                    translog.currentFileGeneration()
+                );
+                assertFalse(translog.syncNeeded());
+                try (Translog.Snapshot snapshot = new TranslogTests.SortedSnapshot(translog.newSnapshot())) {
+                    int upTo = sync ? translogOperations : prepareOp;
+                    for (int i = 0; i < upTo; i++) {
+                        Translog.Operation next = snapshot.next();
+                        assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
+                        assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
+                    }
+                }
+            }
+        }
+    }
+
+    class TranslogThread extends Thread {
+        private final CountDownLatch downLatch;
+        private final int opsPerThread;
+        private final int threadId;
+        private final Collection<TranslogTests.LocationOperation> writtenOperations;
+        private final Exception[] threadExceptions;
+        private final Translog translog;
+        private final AtomicLong seqNoGenerator;
+
+        TranslogThread(
+            Translog translog,
+            CountDownLatch downLatch,
+            int opsPerThread,
+            int threadId,
+            Collection<TranslogTests.LocationOperation> writtenOperations,
+            AtomicLong seqNoGenerator,
+            Exception[] threadExceptions
+        ) {
+            this.translog = translog;
+            this.downLatch = downLatch;
+            this.opsPerThread = opsPerThread;
+            this.threadId = threadId;
+            this.writtenOperations = writtenOperations;
+            this.seqNoGenerator = seqNoGenerator;
+            this.threadExceptions = threadExceptions;
+        }
+
+        @Override
+        public void run() {
+            try {
+                downLatch.await();
+                for (int opCount = 0; opCount < opsPerThread; opCount++) {
+                    Translog.Operation op;
+                    final Translog.Operation.Type type = randomFrom(Translog.Operation.Type.values());
+                    switch (type) {
+                        case CREATE:
+                        case INDEX:
+                            op = new Translog.Index(
+                                threadId + "_" + opCount,
+                                seqNoGenerator.getAndIncrement(),
+                                primaryTerm.get(),
+                                randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")
+                            );
+                            break;
+                        case DELETE:
+                            op = new Translog.Delete(
+                                threadId + "_" + opCount,
+                                seqNoGenerator.getAndIncrement(),
+                                primaryTerm.get(),
+                                1 + randomInt(100000)
+                            );
+                            break;
+                        case NO_OP:
+                            op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), primaryTerm.get(), randomAlphaOfLength(16));
+                            break;
+                        default:
+                            throw new AssertionError("unsupported operation type [" + type + "]");
+                    }
+
+                    Translog.Location loc = add(op);
+                    writtenOperations.add(new TranslogTests.LocationOperation(op, loc));
+                    if (rarely()) { // lets verify we can concurrently read this
+                        assertEquals(op, translog.readOperation(loc));
+                    }
+                    afterAdd();
+                }
+            } catch (Exception t) {
+                threadExceptions[threadId] = t;
+            }
+        }
+
+        protected Translog.Location add(Translog.Operation op) throws IOException {
+            Translog.Location location = translog.add(op);
+            if (randomBoolean()) {
+                translog.ensureSynced(location);
+            }
+            return location;
+        }
+
+        protected void afterAdd() {
+        }
+    }
+}
diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java
index cce19cc35341a..f7741031f4d6e 100644
--- a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java
@@ -2373,7 +2373,7 @@ public void testCloseConcurrently() throws Throwable {
         }
     }
 
-    private class TranslogThread extends Thread {
+    class TranslogThread extends Thread {
         private final CountDownLatch downLatch;
         private final int opsPerThread;
         private final int threadId;