From 38408938ccfe5b8c051e25c645bdcd71b45fa66e Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Wed, 20 Mar 2024 12:24:37 +0100 Subject: [PATCH] Do not go to disk for reading hints file sizes patch by Stefan Miklosovic; reviewed by Aleksey Yeschenko, Jon Haddad, Chris Lohfink for CASSANDRA-19477 --- CHANGES.txt | 1 + .../cassandra/hints/HintsDescriptor.java | 24 +++- .../apache/cassandra/hints/HintsStore.java | 13 +- .../apache/cassandra/hints/HintsWriter.java | 11 +- .../cassandra/service/StorageProxy.java | 14 +- .../distributed/test/HintsMaxSizeTest.java | 128 ++++++++++++++++++ .../apache/cassandra/hints/AlteredHints.java | 3 + .../cassandra/hints/HintsCatalogTest.java | 17 ++- .../cassandra/hints/HintsReaderTest.java | 5 + .../cassandra/hints/HintsStoreTest.java | 3 + 10 files changed, 194 insertions(+), 25 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java diff --git a/CHANGES.txt b/CHANGES.txt index ab2346b2bfc8..50a55b60d66a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1.5 + * Do not go to disk for reading hints file sizes (CASSANDRA-19477) * Fix system_views.settings to handle array types (CASSANDRA-19475) Merged from 4.0: * Push LocalSessions info logs to debug (CASSANDRA-18335) diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java index 8e1f782f1d6c..d021dec983fd 100644 --- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java +++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java @@ -77,6 +77,8 @@ final class HintsDescriptor final UUID hostId; final int version; final long timestamp; + final String hintsFileName; + final String crc32FileName; final ImmutableMap parameters; final ParameterizedClass compressionConfig; @@ -89,6 +91,8 @@ final class HintsDescriptor this.hostId = hostId; this.version = version; this.timestamp = timestamp; + hintsFileName = hostId + "-" + timestamp + '-' + version + ".hints"; + crc32FileName = hostId + "-" + timestamp + '-' + version + ".crc32"; compressionConfig = createCompressionConfig(parameters); EncryptionData encryption = createEncryption(parameters); @@ -201,12 +205,12 @@ private EncryptionData(Cipher cipher, ICompressor compressor, ImmutableMap predicate) if (predicate.test(descriptor)) { cleanUp(descriptor); - delete(descriptor); removeSet.add(descriptor); + delete(descriptor); } } } @@ -235,17 +235,20 @@ void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition) dispatchPositions.put(descriptor, inputPosition); } - /** * @return the total size of all files belonging to the hints store, in bytes. */ + @SuppressWarnings({ "resource" }) long getTotalFileSize() { long total = 0; for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, corruptedFiles)) - { - total += descriptor.file(hintsDirectory).length(); - } + total += descriptor.hintsFileSize(hintsDirectory); + + HintsWriter currentWriter = getWriter(); + if (null != currentWriter) + total += currentWriter.descriptor().hintsFileSize(hintsDirectory); + return total; } diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java index 663427a51b51..f48288cb1d56 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriter.java +++ b/src/java/org/apache/cassandra/hints/HintsWriter.java @@ -81,6 +81,7 @@ static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOE ByteBuffer descriptorBytes = dob.buffer(); updateChecksum(crc, descriptorBytes); channel.write(descriptorBytes); + descriptor.hintsFileSize(channel.position()); if (descriptor.isEncrypted()) return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc); @@ -113,6 +114,7 @@ private void writeChecksum() } } + @Override public void close() { perform(file, Throwables.FileOpType.WRITE, this::doFsync, channel::close); @@ -194,7 +196,6 @@ long position() * writes to the underlying channel when the buffer is overflown. * * @param hint the serialized hint (with CRC included) - * @throws IOException */ void append(ByteBuffer hint) throws IOException { @@ -223,11 +224,10 @@ void append(ByteBuffer hint) throws IOException /** * Serializes and appends the hint (with CRC included) to this session's aggregation buffer, * writes to the underlying channel when the buffer is overflown. - * - * Used mainly by tests and {@link LegacyHintsMigrator} + *

+ * Used mainly by tests * * @param hint the unserialized hint - * @throws IOException */ void append(Hint hint) throws IOException { @@ -261,13 +261,14 @@ void append(Hint hint) throws IOException /** * Closes the session - flushes the aggregation buffer (if not empty), does page aligning, and potentially fsyncs. - * @throws IOException */ + @Override public void close() throws IOException { flushBuffer(); maybeFsync(); maybeSkipCache(); + descriptor.hintsFileSize(position()); } private void flushBuffer() throws IOException diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 0b45db5f2d11..d647aa44b68a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -2413,13 +2413,15 @@ public static boolean shouldHint(Replica replica, boolean tryEnablePersistentWin } long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost(); - long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint); - boolean hasHintsReachedMaxSize = maxHintsSize > 0 && actualTotalHintsSize > maxHintsSize; - if (hasHintsReachedMaxSize) + if (maxHintsSize > 0) { - Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}", - endpoint, maxHintsSize, actualTotalHintsSize); - return false; + long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint); + if (actualTotalHintsSize > maxHintsSize) + { + Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}", + endpoint, maxHintsSize, actualTotalHintsSize); + return false; + } } return true; diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java new file mode 100644 index 000000000000..7e25f4311ddc --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test; + +import java.util.UUID; + +import org.junit.Test; + +import org.apache.cassandra.auth.CassandraRoleManager; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.hints.HintsService; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.service.StorageService; + +import static java.lang.String.format; +import static java.lang.String.valueOf; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@SuppressWarnings("Convert2MethodRef") +public class HintsMaxSizeTest extends TestBaseImpl +{ + @Test + public void testMaxHintedHandoffSize() throws Exception + { + try (Cluster cluster = init(Cluster.build(2) + .withDataDirCount(1) + .withConfig(config -> config.with(NETWORK, GOSSIP) + .set("hinted_handoff_enabled", true) + .set("max_hints_delivery_threads", "1") + .set("hints_flush_period", "1s") + .set("max_hints_size_per_host", "2MiB") + .set("max_hints_file_size", "1MiB")) + .start(), 2)) + { + final IInvokableInstance node1 = cluster.get(1); + final IInvokableInstance node2 = cluster.get(2); + + waitForExistingRoles(cluster); + + String createTableStatement = format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " + + "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} ", KEYSPACE); + cluster.schemaChange(createTableStatement); + + UUID node2UUID = node2.callOnInstance((IIsolatedExecutor.SerializableCallable) () -> StorageService.instance.getLocalHostUUID()); + + // shutdown the second node in a blocking manner + node2.shutdown().get(); + + // insert data and sleep every 10k to have a chance to flush hints + for (int i = 0; i < 70000; i++) + { + cluster.coordinator(1) + .execute(withKeyspace("INSERT INTO %s.cf (k, c1) VALUES (?, ?);"), + ONE, valueOf(i), UUID.randomUUID().toString()); + + if (i % 10000 == 0) + await().atLeast(2, SECONDS).pollDelay(2, SECONDS).until(() -> true); + } + + await().atLeast(3, SECONDS).pollDelay(3, SECONDS).until(() -> true); + + // we see that metrics are updated + await().until(() -> node1.callOnInstance(() -> StorageMetrics.totalHints.getCount()) > 0); + // we do not have 100k of hints because it violated max_hints_size_per_host + assertThat(node1.callOnInstance(() -> StorageMetrics.totalHints.getCount())).isLessThan(70000); + + assertHintsSizes(node1, node2UUID); + + // restart the first node and check hints sizes again + // to be sure sizes were picked up correctly + node1.shutdown().get(); + node1.startup(); + + assertHintsSizes(node1, node2UUID); + } + } + + private void assertHintsSizes(IInvokableInstance node, UUID node2UUID) + { + // we indeed have some hints in its dir + File hintsDir = new File(node.config().getString("hints_directory")); + assertThat(FileUtils.folderSize(hintsDir)).isPositive(); + + // and there is positive size of hints on the disk for the second node + long totalHintsSize = node.appliesOnInstance((IIsolatedExecutor.SerializableFunction) secondNode -> { + return HintsService.instance.getTotalHintsSize(secondNode); + }).apply(node2UUID); + + assertThat(totalHintsSize).isPositive(); + // there might be small overflow in general, depending on hints flushing etc + assertThat(totalHintsSize).isLessThan(4 * 1000 * 1000); + assertThat(totalHintsSize).isGreaterThan(2 * 1000 * 1000); + } + + private static void waitForExistingRoles(Cluster cluster) + { + cluster.forEach(instance -> { + await().pollDelay(1, SECONDS) + .pollInterval(1, SECONDS) + .atMost(60, SECONDS) + .until(() -> instance.callOnInstance(CassandraRoleManager::hasExistingRoles)); + }); + } +} diff --git a/test/unit/org/apache/cassandra/hints/AlteredHints.java b/test/unit/org/apache/cassandra/hints/AlteredHints.java index 4bb6178df45a..84fa5e46f0c3 100644 --- a/test/unit/org/apache/cassandra/hints/AlteredHints.java +++ b/test/unit/org/apache/cassandra/hints/AlteredHints.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.schema.KeyspaceParams; +import org.hamcrest.Matchers; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -100,6 +101,8 @@ public void multiFlushAndDeserializeTest() throws Exception hintNum++; } } + + Assert.assertThat(descriptor.hintsFileSize(dir), Matchers.greaterThan(0L)); } try (HintsReader reader = HintsReader.open(descriptor.file(dir))) diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java index af73d1b00930..1406e25ed981 100644 --- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java @@ -29,15 +29,18 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; + import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.apache.cassandra.Util.dk; @@ -146,22 +149,20 @@ public void exciseHintFiles() throws IOException } @Test - public void hintsTotalSizeTest() throws IOException + public void emptyHintsTotalSizeTest() throws IOException { File directory = new File(testFolder.newFolder()); UUID hostId = UUID.randomUUID(); - long now = Clock.Global.currentTimeMillis(); long totalSize = 0; HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of()); HintsStore store = catalog.get(hostId); assertEquals(totalSize, store.getTotalFileSize()); for (int i = 0; i < 3; i++) { - HintsDescriptor descriptor = new HintsDescriptor(hostId, now + i); - writeDescriptor(directory, descriptor); - store.offerLast(descriptor); - assertTrue("Total file size should increase after writing more hints", store.getTotalFileSize() > totalSize); - totalSize = store.getTotalFileSize(); + store.getOrOpenWriter(); + store.closeWriter(); + assertTrue("Even empty hint files should occupy some space as descriptor is written", + store.getTotalFileSize() > 0); } } @@ -243,5 +244,7 @@ private static void createHintFile(File directory, HintsDescriptor descriptor) t session.append(hint); } } + + assertThat(descriptor.hintsFileSize(directory), greaterThan(0L)); } } diff --git a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java index 3e3c64953b0c..56154fd95026 100644 --- a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java @@ -29,6 +29,8 @@ import com.google.common.collect.Iterables; import org.apache.cassandra.io.util.File; + +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -46,6 +48,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaTestUtil; import org.apache.cassandra.schema.TableMetadata; +import org.hamcrest.Matchers; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -96,6 +99,8 @@ private void generateHints(int num, String ks) throws IOException } FileUtils.clean(buffer); } + + Assert.assertThat(descriptor.hintsFileSize(directory), Matchers.greaterThan(0L)); } private void readHints(int num, int numTable) diff --git a/test/unit/org/apache/cassandra/hints/HintsStoreTest.java b/test/unit/org/apache/cassandra/hints/HintsStoreTest.java index 93e6a1197f05..3e9d03a6dd45 100644 --- a/test/unit/org/apache/cassandra/hints/HintsStoreTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsStoreTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -42,6 +43,7 @@ import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.KeyspaceParams; +import org.hamcrest.Matchers; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -172,6 +174,7 @@ private long writeHints(File directory, HintsDescriptor descriptor, int hintsCou } FileUtils.clean(buffer); } + Assert.assertThat(descriptor.hintsFileSize(directory), Matchers.greaterThan(0L)); return new File(directory, descriptor.fileName()).lastModified(); // hint file last modified time }