Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-19477 trunk don't go to disk for hints sizes #3180

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 22 additions & 2 deletions src/java/org/apache/cassandra/hints/HintsDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ final class HintsDescriptor
final UUID hostId;
final int version;
final long timestamp;
final String hintsFileName;
final String crc32FileName;

final ImmutableMap<String, Object> parameters;
final ParameterizedClass compressionConfig;
Expand All @@ -93,6 +95,8 @@ final class HintsDescriptor
this.hostId = hostId;
this.version = version;
this.timestamp = timestamp;
hintsFileName = hostId + "-" + timestamp + '-' + version + ".hints";
crc32FileName = hostId + "-" + timestamp + '-' + version + ".crc32";
compressionConfig = createCompressionConfig(parameters);

EncryptionData encryption = createEncryption(parameters);
Expand Down Expand Up @@ -205,12 +209,12 @@ private EncryptionData(Cipher cipher, ICompressor compressor, ImmutableMap<Strin

String fileName()
{
return String.format("%s-%s-%s.hints", hostId, timestamp, version);
return hintsFileName;
}

String checksumFileName()
{
return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
return crc32FileName;
}

File file(File hintsDirectory)
Expand All @@ -223,6 +227,22 @@ File checksumFile(File hintsDirectory)
return new File(hintsDirectory, checksumFileName());
}

/** cached size of the represented hints file */
private transient volatile long hintsFileSize = -1L;

/**
 * Returns the size of the hints file this descriptor represents, consulting the
 * file system only while no size has been cached yet.
 *
 * @param hintsDirectory directory in which the represented hints file is resolved
 * @return the cached size if one is known, otherwise the length reported for the file
 */
long hintsFileSize(File hintsDirectory)
{
    long cached = hintsFileSize;
    if (cached != -1L)
        return cached;

    // Nothing cached yet: look the length up and remember it. Concurrent first-time
    // callers may each perform this lookup, which is harmless duplicated work.
    long looked = file(hintsDirectory).length();
    hintsFileSize = looked;
    return looked;
}

/**
 * Overwrites the cached size of the represented hints file.
 *
 * @param value the size to cache; note that {@code -1} is the sentinel the lookup
 *              overload treats as "not cached yet"
 */
void hintsFileSize(long value)
{
    this.hintsFileSize = value;
}

int messagingVersion()
{
return messagingVersion(version);
Expand Down
12 changes: 7 additions & 5 deletions src/java/org/apache/cassandra/hints/HintsStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ private void deleteHints(Predicate<HintsDescriptor> predicate)
if (predicate.test(descriptor))
{
cleanUp(descriptor);
delete(descriptor);
removeSet.add(descriptor);
delete(descriptor);
}
}
}
Expand Down Expand Up @@ -235,17 +235,19 @@ void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
dispatchPositions.put(descriptor, inputPosition);
}


/**
* @return the total size of all files belonging to the hints store, in bytes.
*/
long getTotalFileSize()
{
long total = 0;
for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, corruptedFiles))
{
total += descriptor.file(hintsDirectory).length();
}
total += descriptor.hintsFileSize(hintsDirectory);

HintsWriter currentWriter = getWriter();
if (null != currentWriter)
total += currentWriter.descriptor().hintsFileSize(hintsDirectory);

return total;
}

Expand Down
13 changes: 7 additions & 6 deletions src/java/org/apache/cassandra/hints/HintsWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ static HintsWriter create(File directory, HintsDescriptor descriptor) throws IOE
ByteBuffer descriptorBytes = dob.unsafeGetBufferAndFlip();
updateChecksum(crc, descriptorBytes);
channel.write(descriptorBytes);
descriptor.hintsFileSize(channel.position());

if (descriptor.isEncrypted())
return new EncryptedHintsWriter(directory, descriptor, file, channel, fd, crc);
Expand Down Expand Up @@ -113,6 +114,7 @@ private void writeChecksum()
}
}

@Override
public void close()
{
perform(file, Throwables.FileOpType.WRITE, this::doFsync, channel::close);
Expand Down Expand Up @@ -194,7 +196,6 @@ long position()
* writes to the underlying channel when the buffer is overflown.
*
* @param hint the serialized hint (with CRC included)
* @throws IOException
*/
void append(ByteBuffer hint) throws IOException
{
Expand Down Expand Up @@ -223,11 +224,10 @@ void append(ByteBuffer hint) throws IOException
/**
* Serializes and appends the hint (with CRC included) to this session's aggregation buffer,
* writes to the underlying channel when the buffer is overflown.
*
* Used mainly by tests and {@link LegacyHintsMigrator}
smiklosovic marked this conversation as resolved.
Show resolved Hide resolved
* <p>
* Used mainly by tests
*
* @param hint the unserialized hint
* @throws IOException
*/
void append(Hint hint) throws IOException
{
Expand Down Expand Up @@ -259,18 +259,19 @@ void append(Hint hint) throws IOException
if (hintBuffer == buffer)
bytesWritten += totalSize;
else
append((ByteBuffer) hintBuffer.flip());
smiklosovic marked this conversation as resolved.
Show resolved Hide resolved
append(hintBuffer.flip());
}

/**
* Closes the session - flushes the aggregation buffer (if not empty), does page aligning, and potentially fsyncs.
* @throws IOException
*/
@Override
public void close() throws IOException
{
// push out anything still sitting in the aggregation buffer before finishing the session
flushBuffer();
maybeFsync();
maybeSkipCache();
// store the writer's current position as the descriptor's cached hints file size, so that
// later size queries can be answered from the descriptor
// NOTE(review): assumes position() reflects all bytes written once the buffer is flushed - confirm
descriptor.hintsFileSize(position());
}

private void flushBuffer() throws IOException
Expand Down
14 changes: 8 additions & 6 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -2406,13 +2406,15 @@ public static boolean shouldHint(Replica replica, boolean tryEnablePersistentWin
}

long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost();
long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
boolean hasHintsReachedMaxSize = maxHintsSize > 0 && actualTotalHintsSize > maxHintsSize;
if (hasHintsReachedMaxSize)
if (maxHintsSize > 0)
{
Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}",
endpoint, maxHintsSize, actualTotalHintsSize);
return false;
long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
if (actualTotalHintsSize > maxHintsSize)
{
Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}",
endpoint, maxHintsSize, actualTotalHintsSize);
return false;
}
}

return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;

import java.util.UUID;

import org.junit.Test;

import org.apache.cassandra.auth.CassandraRoleManager;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;

import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@SuppressWarnings("Convert2MethodRef")
public class HintsMaxSizeTest extends TestBaseImpl
{
    /** Number of writes issued through the coordinator while the second node is down. */
    private static final int TOTAL_WRITES = 70000;

    /** The write loop pauses whenever the write index is a multiple of this value. */
    private static final int WRITES_PER_PAUSE = 10000;

    /**
     * Starts a two-node cluster with a 2MiB per-host hints limit, stops the second node,
     * writes through the first node and verifies both the hints metric and the reported
     * total hints size - before and after restarting the first node.
     */
    @Test
    public void testMaxHintedHandoffSize() throws Exception
    {
        try (Cluster cluster = init(Cluster.build(2)
                                           .withDataDirCount(1)
                                           .withConfig(config -> config.with(NETWORK, GOSSIP)
                                                                       .set("hinted_handoff_enabled", true)
                                                                       .set("max_hints_delivery_threads", "1")
                                                                       .set("hints_flush_period", "1s")
                                                                       .set("max_hints_size_per_host", "2MiB")
                                                                       .set("max_hints_file_size", "1MiB"))
                                           .start(), 2))
        {
            final IInvokableInstance node1 = cluster.get(1);
            final IInvokableInstance node2 = cluster.get(2);

            waitForExistingRoles(cluster);

            cluster.schemaChange(format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
                                        "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} ", KEYSPACE));

            UUID node2UUID = node2.callOnInstance((IIsolatedExecutor.SerializableCallable<UUID>) () -> StorageService.instance.getLocalHostUUID());

            // stop the second node and block until it is down
            node2.shutdown().get();

            // write rows one by one, pausing periodically so hints get a chance to be flushed
            for (int i = 0; i < TOTAL_WRITES; i++)
            {
                String key = valueOf(i);
                String value = UUID.randomUUID().toString();
                cluster.coordinator(1)
                       .execute(withKeyspace("INSERT INTO %s.cf (k, c1) VALUES (?, ?);"), ONE, key, value);

                if (i % WRITES_PER_PAUSE == 0)
                    pause(2);
            }

            pause(3);

            // the hints metric has been updated
            await().until(() -> node1.callOnInstance(() -> StorageMetrics.totalHints.getCount()) > 0);
            // not every write produced a hint, because max_hints_size_per_host was exceeded
            assertThat(node1.callOnInstance(() -> StorageMetrics.totalHints.getCount())).isLessThan(TOTAL_WRITES);

            assertHintsSizes(node1, node2UUID);

            // restart the first node and verify the sizes once more,
            // to be sure they are picked up correctly after startup
            node1.shutdown().get();
            node1.startup();

            assertHintsSizes(node1, node2UUID);
        }
    }

    /**
     * Waits for the given number of seconds by awaiting an always-true condition
     * with an equal poll delay and minimum wait time.
     */
    private static void pause(long seconds)
    {
        await().atLeast(seconds, SECONDS).pollDelay(seconds, SECONDS).until(() -> true);
    }

    /**
     * Asserts that the node's hints directory is not empty and that the total hints size
     * it reports for the given target host lies strictly between 2 and 4 million bytes.
     */
    private void assertHintsSizes(IInvokableInstance node, UUID targetHostId)
    {
        // the hints directory must contain something
        File hintsDir = new File(node.config().getString("hints_directory"));
        assertThat(FileUtils.folderSize(hintsDir)).isPositive();

        // total hints size for the target host, as reported from inside the instance
        long totalHintsSize = node.appliesOnInstance((IIsolatedExecutor.SerializableFunction<UUID, Long>) secondNode -> HintsService.instance.getTotalHintsSize(secondNode))
                                  .apply(targetHostId);

        // a small overflow above the configured limit is possible, depending on hints flushing etc
        assertThat(totalHintsSize).isPositive()
                                  .isLessThan(4 * 1000 * 1000)
                                  .isGreaterThan(2 * 1000 * 1000);
    }

    /**
     * Polls every instance of the cluster, once per second for at most a minute,
     * until it reports that roles exist.
     */
    private static void waitForExistingRoles(Cluster cluster)
    {
        cluster.forEach(instance -> {
            await().pollDelay(1, SECONDS)
                   .pollInterval(1, SECONDS)
                   .atMost(60, SECONDS)
                   .until(() -> instance.callOnInstance(CassandraRoleManager::hasExistingRoles));
        });
    }
}
3 changes: 3 additions & 0 deletions test/unit/org/apache/cassandra/hints/AlteredHints.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.schema.KeyspaceParams;
import org.hamcrest.Matchers;

import static org.apache.cassandra.utils.ByteBufferUtil.bytes;

Expand Down Expand Up @@ -100,6 +101,8 @@ public void multiFlushAndDeserializeTest() throws Exception
hintNum++;
}
}

Assert.assertThat(descriptor.hintsFileSize(dir), Matchers.greaterThan(0L));
}

try (HintsReader reader = HintsReader.open(descriptor.file(dir)))
Expand Down
17 changes: 10 additions & 7 deletions test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;

import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import static org.apache.cassandra.Util.dk;
Expand Down Expand Up @@ -146,22 +149,20 @@ public void exciseHintFiles() throws IOException
}

@Test
public void hintsTotalSizeTest() throws IOException
public void emptyHintsTotalSizeTest() throws IOException
{
File directory = new File(testFolder.newFolder());
UUID hostId = UUID.randomUUID();
long now = Clock.Global.currentTimeMillis();
long totalSize = 0;
HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
HintsStore store = catalog.get(hostId);
assertEquals(totalSize, store.getTotalFileSize());
for (int i = 0; i < 3; i++)
{
HintsDescriptor descriptor = new HintsDescriptor(hostId, now + i);
writeDescriptor(directory, descriptor);
store.offerLast(descriptor);
assertTrue("Total file size should increase after writing more hints", store.getTotalFileSize() > totalSize);
totalSize = store.getTotalFileSize();
store.getOrOpenWriter();
store.closeWriter();
assertTrue("Even empty hint files should occupy some space as descriptor is written",
store.getTotalFileSize() > 0);
}
}

Expand Down Expand Up @@ -243,5 +244,7 @@ private static void createHintFile(File directory, HintsDescriptor descriptor) t
session.append(hint);
}
}

assertThat(descriptor.hintsFileSize(directory), greaterThan(0L));
}
}
5 changes: 5 additions & 0 deletions test/unit/org/apache/cassandra/hints/HintsReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

import org.apache.cassandra.exceptions.CoordinatorBehindException;
import org.apache.cassandra.io.util.File;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -48,6 +50,7 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaTestUtil;
import org.apache.cassandra.schema.TableMetadata;
import org.hamcrest.Matchers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -98,6 +101,8 @@ private void generateHints(int num, String ks) throws IOException
}
FileUtils.clean(buffer);
}

Assert.assertThat(descriptor.hintsFileSize(directory), Matchers.greaterThan(0L));
}

private void readHints(int num, int numTable)
Expand Down