Skip to content
Permalink
Browse files
Reject oversized mutations on client and internode connection
Patch by Alex Sorokoumov; reviewed by Andres de la Peña and Josh McKenzie for CASSANDRA-17456
  • Loading branch information
Gerrrr committed May 9, 2022
1 parent 596daeb commit 9f3bc657273dfa9e20d233636adf662904f01f34
Showing 8 changed files with 81 additions and 5 deletions.
@@ -216,6 +216,8 @@ Upgrading
and do not do, as well as the edge cases associated with their use.
NOTE: ANY SCRIPTS THAT RELY ON sstableverify OR nodetool verify WILL STOP WORKING UNTIL MODIFIED.
Please see CASSANDRA-17017 for details: https://issues.apache.org/jira/browse/CASSANDRA-17017
- `MutationExceededMaxSizeException` thrown when a mutation exceeds `max_mutation_size` inherits
from `InvalidRequestException` instead of `RuntimeException`. See CASSANDRA-17456 for details.

Deprecation
-----------
@@ -24,7 +24,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
@@ -131,6 +133,7 @@ public List<IMutation> toMutations()
{
IMutation mutation = builder.build();
mutation.validateIndexedColumns();
mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
ms.add(mutation);
}
}
@@ -31,8 +31,10 @@
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;

/**
@@ -105,6 +107,7 @@ else if (metadata.isCounter())
mutation = new Mutation(builder.build());

mutation.validateIndexedColumns();
mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
ms.add(mutation);
}

@@ -25,10 +25,11 @@
import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.InvalidRequestException;

import static org.apache.cassandra.db.IMutation.MAX_MUTATION_SIZE;

public class MutationExceededMaxSizeException extends RuntimeException
public class MutationExceededMaxSizeException extends InvalidRequestException
{
public static final int PARTITION_MESSAGE_LIMIT = 1024;

@@ -52,7 +53,7 @@ private static String prepareMessage(IMutation mutation, int version, long total
.collect(Collectors.toList());

String topKeys = makeTopKeysString(topPartitions, PARTITION_MESSAGE_LIMIT);
return String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s. Top keys are: %s",
return String.format("Rejected an oversized mutation (%d/%d) for keyspace: %s. Top keys are: %s",
totalSize,
MAX_MUTATION_SIZE,
mutation.getKeyspaceName(),
@@ -22,6 +22,8 @@
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;

import static org.apache.cassandra.db.commitlog.CommitLogSegment.ENTRY_OVERHEAD_SIZE;

public class MutationVerbHandler implements IVerbHandler<Mutation>
{
public static final MutationVerbHandler instance = new MutationVerbHandler();
@@ -39,6 +41,8 @@ private void failed()

public void doVerb(Message<Mutation> message)
{
message.payload.validateSize(MessagingService.current_version, ENTRY_OVERHEAD_SIZE);

// Check if there were any forwarding headers in this message
ForwardingInfo forwardTo = message.forwardTo();
if (forwardTo != null)
@@ -42,7 +42,6 @@
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.PathUtils;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import org.apache.commons.lang3.StringUtils;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.assertj.core.api.Assertions;

import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;

public class OversizedMutationTest extends TestBaseImpl
{
@Test
public void testSingleOversizedMutation() throws Throwable
{
try (Cluster cluster = init(builder().withNodes(1).withConfig(c -> c.set("max_mutation_size", "48KiB"))
.start()))
{
cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (key int PRIMARY KEY, val blob)"));
String payload = StringUtils.repeat('1', 1024 * 49);
String query = "INSERT INTO %s.t (key, val) VALUES (1, textAsBlob('" + payload + "'))";
Assertions.assertThatThrownBy(() -> cluster.coordinator(1).execute(withKeyspace(query), ALL))
.hasMessageContaining("Rejected an oversized mutation (")
.hasMessageContaining("/49152) for keyspace: distributed_test_keyspace. Top keys are: t.1");
}
}

@Test
public void testOversizedBatch() throws Throwable
{
try (Cluster cluster = init(builder().withNodes(1).withConfig(c -> c.set("max_mutation_size", "48KiB"))
.start()))
{
cluster.schemaChange(withKeyspace("CREATE KEYSPACE ks1 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"));
cluster.schemaChange(withKeyspace("CREATE TABLE ks1.t (key int PRIMARY KEY, val blob)"));
String payload = StringUtils.repeat('1', 1024 * 48);
String query = "BEGIN BATCH\n" +
"INSERT INTO ks1.t (key, val) VALUES (1, textAsBlob('" + payload + "'))\n" +
"INSERT INTO ks1.t (key, val) VALUES (2, textAsBlob('222'))\n" +
"APPLY BATCH";
Assertions.assertThatThrownBy(() -> cluster.coordinator(1).execute(withKeyspace(query), ALL))
.hasMessageContaining("Rejected an oversized mutation (")
.hasMessageContaining("/49152) for keyspace: ks1. Top keys are: t.1");
}
}
}
@@ -452,7 +452,7 @@ private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String
.add("val", ByteBuffer.allocate(allocSize)).build();

int max = DatabaseDescriptor.getMaxMutationSize();
max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
max -= ENTRY_OVERHEAD_SIZE; // log entry overhead

// Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size
int mutationOverhead = rm.serializedSize(MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
@@ -536,7 +536,7 @@ public void testExceedRecordLimitWithMultiplePartitions() throws Exception
String message = exception.getMessage();

long mutationSize = mutation.serializedSize(MessagingService.current_version) + ENTRY_OVERHEAD_SIZE;
final String expectedMessagePrefix = String.format("Encountered an oversized mutation (%d/%d) for keyspace: %s.",
final String expectedMessagePrefix = String.format("Rejected an oversized mutation (%d/%d) for keyspace: %s.",
mutationSize,
DatabaseDescriptor.getMaxMutationSize(),
KEYSPACE1);

0 comments on commit 9f3bc65

Please sign in to comment.