Skip to content

Commit

Permalink
Do not go to disk for reading hints file sizes
Browse files Browse the repository at this point in the history
patch by Stefan Miklosovic; reviewed by Aleksey Yeschenko, Jon Haddad, Chris Lohfink for CASSANDRA-19477
  • Loading branch information
smiklosovic committed Mar 25, 2024
1 parent 96692d7 commit 3840893
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.1.5
* Do not go to disk for reading hints file sizes (CASSANDRA-19477)
* Fix system_views.settings to handle array types (CASSANDRA-19475)
Merged from 4.0:
* Push LocalSessions info logs to debug (CASSANDRA-18335)
Expand Down
24 changes: 22 additions & 2 deletions src/java/org/apache/cassandra/hints/HintsDescriptor.java
Expand Up @@ -77,6 +77,8 @@ final class HintsDescriptor
final UUID hostId;
final int version;
final long timestamp;
final String hintsFileName;
final String crc32FileName;

final ImmutableMap<String, Object> parameters;
final ParameterizedClass compressionConfig;
Expand All @@ -89,6 +91,8 @@ final class HintsDescriptor
this.hostId = hostId;
this.version = version;
this.timestamp = timestamp;
hintsFileName = hostId + "-" + timestamp + '-' + version + ".hints";
crc32FileName = hostId + "-" + timestamp + '-' + version + ".crc32";
compressionConfig = createCompressionConfig(parameters);

EncryptionData encryption = createEncryption(parameters);
Expand Down Expand Up @@ -201,12 +205,12 @@ private EncryptionData(Cipher cipher, ICompressor compressor, ImmutableMap<Strin

String fileName()
{
return String.format("%s-%s-%s.hints", hostId, timestamp, version);
return hintsFileName;
}

String checksumFileName()
{
return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
return crc32FileName;
}

File file(File hintsDirectory)
Expand All @@ -219,6 +223,22 @@ File checksumFile(File hintsDirectory)
return new File(hintsDirectory, checksumFileName());
}

/**
 * Last known size of the hints file this descriptor represents;
 * {@code -1} means no size has been looked up or set yet.
 */
private transient volatile long hintsFileSize = -1L;

/**
 * Returns the size of the represented hints file, consulting the file system only
 * when no size has been cached yet.
 *
 * @param hintsDirectory directory against which the hints file is resolved
 * @return the cached size, or the file length read (and cached) by this call
 */
long hintsFileSize(File hintsDirectory)
{
    long cached = hintsFileSize;
    if (cached != -1L)
        return cached;

    // Nothing cached yet. Callers racing on the first query may each perform the
    // lookup and each store the result; the duplication is accepted.
    long onDisk = file(hintsDirectory).length();
    hintsFileSize = onDisk;
    return onDisk;
}

/**
 * Overwrites the cached size of the represented hints file.
 * Any value other than {@code -1} is subsequently returned by
 * {@link #hintsFileSize(File)} without consulting the file system.
 *
 * @param value the size to cache
 */
void hintsFileSize(long value)
{
    this.hintsFileSize = value;
}

int messagingVersion()
{
return messagingVersion(version);
Expand Down
13 changes: 8 additions & 5 deletions src/java/org/apache/cassandra/hints/HintsStore.java
Expand Up @@ -194,8 +194,8 @@ private void deleteHints(Predicate<HintsDescriptor> predicate)
if (predicate.test(descriptor))
{
cleanUp(descriptor);
delete(descriptor);
removeSet.add(descriptor);
delete(descriptor);
}
}
}
Expand Down Expand Up @@ -235,17 +235,20 @@ void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
dispatchPositions.put(descriptor, inputPosition);
}


/**
* @return the total size of all files belonging to the hints store, in bytes.
*/
@SuppressWarnings({ "resource" })
long getTotalFileSize()
{
long total = 0;
for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, corruptedFiles))
{
total += descriptor.file(hintsDirectory).length();
}
total += descriptor.hintsFileSize(hintsDirectory);

HintsWriter currentWriter = getWriter();
if (null != currentWriter)
total += currentWriter.descriptor().hintsFileSize(hintsDirectory);

return total;
}

Expand Down
11 changes: 6 additions & 5 deletions src/java/org/apache/cassandra/hints/HintsWriter.java
Expand Up @@ -81,6 +81,7 @@ static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOE
ByteBuffer descriptorBytes = dob.buffer();
updateChecksum(crc, descriptorBytes);
channel.write(descriptorBytes);
descriptor.hintsFileSize(channel.position());

if (descriptor.isEncrypted())
return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc);
Expand Down Expand Up @@ -113,6 +114,7 @@ private void writeChecksum()
}
}

@Override
public void close()
{
perform(file, Throwables.FileOpType.WRITE, this::doFsync, channel::close);
Expand Down Expand Up @@ -194,7 +196,6 @@ long position()
* writes to the underlying channel when the buffer is overflown.
*
* @param hint the serialized hint (with CRC included)
* @throws IOException
*/
void append(ByteBuffer hint) throws IOException
{
Expand Down Expand Up @@ -223,11 +224,10 @@ void append(ByteBuffer hint) throws IOException
/**
* Serializes and appends the hint (with CRC included) to this session's aggregation buffer,
* writes to the underlying channel when the buffer is overflown.
*
* Used mainly by tests and {@link LegacyHintsMigrator}
* <p>
* Used mainly by tests
*
* @param hint the unserialized hint
* @throws IOException
*/
void append(Hint hint) throws IOException
{
Expand Down Expand Up @@ -261,13 +261,14 @@ void append(Hint hint) throws IOException

/**
* Closes the session - flushes the aggregation buffer (if not empty), does page aligning, and potentially fsyncs.
* @throws IOException
*/
@Override
public void close() throws IOException
{
// push out whatever is still sitting in the aggregation buffer before anything else
flushBuffer();
maybeFsync();
maybeSkipCache();
// Store the size on the descriptor so later size queries can use the cached value
// instead of asking the file system.
// NOTE(review): presumably position() reflects the bytes flushed above — confirm.
descriptor.hintsFileSize(position());
}

private void flushBuffer() throws IOException
Expand Down
14 changes: 8 additions & 6 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Expand Up @@ -2413,13 +2413,15 @@ public static boolean shouldHint(Replica replica, boolean tryEnablePersistentWin
}

long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost();
long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
boolean hasHintsReachedMaxSize = maxHintsSize > 0 && actualTotalHintsSize > maxHintsSize;
if (hasHintsReachedMaxSize)
if (maxHintsSize > 0)
{
Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}",
endpoint, maxHintsSize, actualTotalHintsSize);
return false;
long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
if (actualTotalHintsSize > maxHintsSize)
{
Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}",
endpoint, maxHintsSize, actualTotalHintsSize);
return false;
}
}

return true;
Expand Down
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;

import java.util.UUID;

import org.junit.Test;

import org.apache.cassandra.auth.CassandraRoleManager;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;

import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
 * In-JVM distributed test checking that the total size of hints stored for an unreachable
 * node stays close to {@code max_hints_size_per_host}, and that the size reported by
 * {@link HintsService#getTotalHintsSize(UUID)} is still sensible after the hinting node restarts.
 */
@SuppressWarnings("Convert2MethodRef")
public class HintsMaxSizeTest extends TestBaseImpl
{
    /** Number of rows written through node 1 while node 2 is down. */
    private static final int INSERTED_ROWS = 70_000;

    /** A pause is taken every this many inserts to give hints a chance to be flushed. */
    private static final int ROWS_PER_PAUSE = 10_000;

    @Test
    public void testMaxHintedHandoffSize() throws Exception
    {
        try (Cluster cluster = init(Cluster.build(2)
                                           .withDataDirCount(1)
                                           .withConfig(config -> config.with(NETWORK, GOSSIP)
                                                                       .set("hinted_handoff_enabled", true)
                                                                       .set("max_hints_delivery_threads", "1")
                                                                       .set("hints_flush_period", "1s")
                                                                       .set("max_hints_size_per_host", "2MiB")
                                                                       .set("max_hints_file_size", "1MiB"))
                                           .start(), 2))
        {
            final IInvokableInstance node1 = cluster.get(1);
            final IInvokableInstance node2 = cluster.get(2);

            waitForExistingRoles(cluster);

            String createTableStatement = format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
                                                 "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} ", KEYSPACE);
            cluster.schemaChange(createTableStatement);

            UUID node2UUID = node2.callOnInstance((IIsolatedExecutor.SerializableCallable<UUID>) () -> StorageService.instance.getLocalHostUUID());

            // shutdown the second node in a blocking manner
            node2.shutdown().get();

            // insert data, pausing every ROWS_PER_PAUSE rows (including the very first one)
            // to have a chance to flush hints
            for (int i = 0; i < INSERTED_ROWS; i++)
            {
                cluster.coordinator(1)
                       .execute(withKeyspace("INSERT INTO %s.cf (k, c1) VALUES (?, ?);"),
                                ONE, valueOf(i), UUID.randomUUID().toString());

                if (i % ROWS_PER_PAUSE == 0)
                    pause(2);
            }

            pause(3);

            // we see that metrics are updated
            await().until(() -> node1.callOnInstance(() -> StorageMetrics.totalHints.getCount()) > 0);
            // fewer hints than inserted rows were created, because hinting stops
            // once max_hints_size_per_host is exceeded
            assertThat(node1.callOnInstance(() -> StorageMetrics.totalHints.getCount())).isLessThan(INSERTED_ROWS);

            assertHintsSizes(node1, node2UUID);

            // restart the first node and check hints sizes again
            // to be sure sizes were picked up correctly
            node1.shutdown().get();
            node1.startup();

            assertHintsSizes(node1, node2UUID);
        }
    }

    /**
     * Asserts that the hints directory of {@code node} is not empty and that the total hints size
     * reported for the host {@code node2UUID} lies within the expected bounds.
     *
     * @param node      the instance holding the hints
     * @param node2UUID host id of the node the hints are destined for
     */
    private void assertHintsSizes(IInvokableInstance node, UUID node2UUID)
    {
        // we indeed have some hints in its dir
        File hintsDir = new File(node.config().getString("hints_directory"));
        assertThat(FileUtils.folderSize(hintsDir)).isPositive();

        // and there is positive size of hints on the disk for the second node
        long totalHintsSize = node.appliesOnInstance((IIsolatedExecutor.SerializableFunction<UUID, Long>) secondNode -> {
            return HintsService.instance.getTotalHintsSize(secondNode);
        }).apply(node2UUID);

        assertThat(totalHintsSize).isPositive();
        // there might be small overflow in general, depending on hints flushing etc
        assertThat(totalHintsSize).isLessThan(4 * 1000 * 1000);
        assertThat(totalHintsSize).isGreaterThan(2 * 1000 * 1000);
    }

    /**
     * Blocks until every instance of the cluster reports that its roles exist,
     * polling once per second for at most a minute per instance.
     */
    private static void waitForExistingRoles(Cluster cluster)
    {
        cluster.forEach(instance -> {
            await().pollDelay(1, SECONDS)
                   .pollInterval(1, SECONDS)
                   .atMost(60, SECONDS)
                   .until(() -> instance.callOnInstance(CassandraRoleManager::hasExistingRoles));
        });
    }

    /**
     * Waits for at least the given number of seconds by awaiting an always-true condition
     * whose first poll is delayed by that amount.
     */
    private static void pause(int seconds)
    {
        await().atLeast(seconds, SECONDS).pollDelay(seconds, SECONDS).until(() -> true);
    }
}
3 changes: 3 additions & 0 deletions test/unit/org/apache/cassandra/hints/AlteredHints.java
Expand Up @@ -37,6 +37,7 @@
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.schema.KeyspaceParams;
import org.hamcrest.Matchers;

import static org.apache.cassandra.utils.ByteBufferUtil.bytes;

Expand Down Expand Up @@ -100,6 +101,8 @@ public void multiFlushAndDeserializeTest() throws Exception
hintNum++;
}
}

Assert.assertThat(descriptor.hintsFileSize(dir), Matchers.greaterThan(0L));
}

try (HintsReader reader = HintsReader.open(descriptor.file(dir)))
Expand Down
17 changes: 10 additions & 7 deletions test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
Expand Up @@ -29,15 +29,18 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;

import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import static org.apache.cassandra.Util.dk;
Expand Down Expand Up @@ -146,22 +149,20 @@ public void exciseHintFiles() throws IOException
}

@Test
public void hintsTotalSizeTest() throws IOException
public void emptyHintsTotalSizeTest() throws IOException
{
File directory = new File(testFolder.newFolder());
UUID hostId = UUID.randomUUID();
long now = Clock.Global.currentTimeMillis();
long totalSize = 0;
HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
HintsStore store = catalog.get(hostId);
assertEquals(totalSize, store.getTotalFileSize());
for (int i = 0; i < 3; i++)
{
HintsDescriptor descriptor = new HintsDescriptor(hostId, now + i);
writeDescriptor(directory, descriptor);
store.offerLast(descriptor);
assertTrue("Total file size should increase after writing more hints", store.getTotalFileSize() > totalSize);
totalSize = store.getTotalFileSize();
store.getOrOpenWriter();
store.closeWriter();
assertTrue("Even empty hint files should occupy some space as descriptor is written",
store.getTotalFileSize() > 0);
}
}

Expand Down Expand Up @@ -243,5 +244,7 @@ private static void createHintFile(File directory, HintsDescriptor descriptor) t
session.append(hint);
}
}

assertThat(descriptor.hintsFileSize(directory), greaterThan(0L));
}
}
5 changes: 5 additions & 0 deletions test/unit/org/apache/cassandra/hints/HintsReaderTest.java
Expand Up @@ -29,6 +29,8 @@

import com.google.common.collect.Iterables;
import org.apache.cassandra.io.util.File;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -46,6 +48,7 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaTestUtil;
import org.apache.cassandra.schema.TableMetadata;
import org.hamcrest.Matchers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -96,6 +99,8 @@ private void generateHints(int num, String ks) throws IOException
}
FileUtils.clean(buffer);
}

Assert.assertThat(descriptor.hintsFileSize(directory), Matchers.greaterThan(0L));
}

private void readHints(int num, int numTable)
Expand Down

0 comments on commit 3840893

Please sign in to comment.