Handle unexpected columns due to schema races
Patch by Blake Eggleston; Reviewed by Sam Tunnicliffe for CASSANDRA-15899
bdeggleston committed Oct 5, 2020
1 parent 9ee74c9 commit 31b9078
Showing 11 changed files with 278 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
3.0.23:
+* Handle unexpected columns due to schema races (CASSANDRA-15899)
* Avoid failing compactions with very large partitions (CASSANDRA-15164)
* Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
* Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
23 changes: 23 additions & 0 deletions src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -190,6 +190,29 @@ public int compare(CellPath path1, CellPath path2)
        };
    }

+   private static class Placeholder extends ColumnDefinition
+   {
+       Placeholder(CFMetaData table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
+       {
+           super(table, name, type, position, kind);
+       }
+
+       public boolean isPlaceholder()
+       {
+           return true;
+       }
+   }
+
+   public static ColumnDefinition placeholder(CFMetaData table, ByteBuffer name, boolean isStatic)
+   {
+       return new Placeholder(table, name, EmptyType.instance, NO_POSITION, isStatic ? Kind.STATIC : Kind.REGULAR);
+   }
+
+   public boolean isPlaceholder()
+   {
+       return false;
+   }
+
    public ColumnDefinition copy()
    {
        return new ColumnDefinition(ksName, cfName, name, type, position, kind);
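The new methods implement a null-object pattern: a placeholder behaves like a real column during deserialization (EmptyType stands in for the unknown type) and is only distinguishable through isPlaceholder(). A minimal, self-contained sketch of the pattern, using a simplified stand-in type rather than the real CFMetaData/ColumnDefinition classes:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for ColumnDefinition, for illustration only.
class Column
{
    final ByteBuffer name;

    Column(ByteBuffer name)
    {
        this.name = name;
    }

    // Real columns report false; only the private subclass overrides this.
    public boolean isPlaceholder()
    {
        return false;
    }

    private static class Placeholder extends Column
    {
        Placeholder(ByteBuffer name)
        {
            super(name);
        }

        @Override
        public boolean isPlaceholder()
        {
            return true;
        }
    }

    // Mirrors ColumnDefinition.placeholder(); the isStatic/kind handling is omitted here.
    public static Column placeholder(ByteBuffer name)
    {
        return new Placeholder(name);
    }

    public static void main(String[] args)
    {
        Column real = new Column(ByteBuffer.wrap("v1".getBytes(StandardCharsets.UTF_8)));
        Column unknown = Column.placeholder(ByteBuffer.wrap("v9".getBytes(StandardCharsets.UTF_8)));
        System.out.println(real.isPlaceholder());    // false
        System.out.println(unknown.isPlaceholder()); // true
    }
}

Keeping Placeholder private forces all construction through the factory, so the only observable difference from a regular column is the isPlaceholder() answer.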
19 changes: 17 additions & 2 deletions src/java/org/apache/cassandra/db/Columns.java
@@ -425,7 +425,7 @@ public long serializedSize(Columns columns)
        return size;
    }

-   public Columns deserialize(DataInputPlus in, CFMetaData metadata) throws IOException
+   public Columns deserialize(DataInputPlus in, CFMetaData metadata, boolean isStatic) throws IOException
    {
        int length = (int)in.readUnsignedVInt();
        BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());

@@ -441,14 +441,29 @@ public Columns deserialize(DataInputPlus in, CFMetaData metadata) throws IOException
                // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensures proper
                // deserialization. The column will be ignored later on anyway.
                column = metadata.getDroppedColumnDefinition(name);
+
+               // If there's no dropped column, it may be for a column we haven't received a schema update for yet,
+               // so we create a placeholder column. If this is a read, the placeholder column will let the response
+               // serializer know we're not serializing all requested columns when it writes the row flags, but it
+               // will cause mutations that try to write values for this column to fail.
                if (column == null)
-                   throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+                   column = ColumnDefinition.placeholder(metadata, name, isStatic);
            }
            builder.add(column);
        }
        return new Columns(builder.build());
    }

+   public Columns deserializeStatics(DataInputPlus in, CFMetaData metadata) throws IOException
+   {
+       return deserialize(in, metadata, true);
+   }
+
+   public Columns deserializeRegulars(DataInputPlus in, CFMetaData metadata) throws IOException
+   {
+       return deserialize(in, metadata, false);
+   }
+
    /**
     * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
     * more efficiently. Both ends must provide the identically same set of columns.
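The lookup now degrades in three steps: a column in the current schema, then a dropped column, then a placeholder. A self-contained sketch of that chain, with plain sets standing in for the CFMetaData lookups (names here are hypothetical):

import java.util.HashSet;
import java.util.Set;

// Illustration of the three-step lookup in Columns.serializer.deserialize; the
// sets are stand-ins for CFMetaData's column and dropped-column records.
class ColumnLookup
{
    enum Resolution { CURRENT, DROPPED, PLACEHOLDER }

    final Set<String> currentColumns = new HashSet<>();
    final Set<String> droppedColumns = new HashSet<>();

    Resolution resolve(String name)
    {
        if (currentColumns.contains(name))
            return Resolution.CURRENT;      // part of our schema
        if (droppedColumns.contains(name))
            return Resolution.DROPPED;      // "fake" definition; values discarded later
        return Resolution.PLACEHOLDER;      // assume a schema race instead of throwing
    }

    public static void main(String[] args)
    {
        ColumnLookup lookup = new ColumnLookup();
        lookup.currentColumns.add("v1");
        lookup.droppedColumns.add("v2");
        System.out.println(lookup.resolve("v1")); // CURRENT
        System.out.println(lookup.resolve("v2")); // DROPPED
        System.out.println(lookup.resolve("v3")); // PLACEHOLDER (unknown column)
    }
}

Before this patch the third step threw a RuntimeException, failing the whole message; now reads degrade gracefully while writes to the unknown column still fail (see the UnfilteredSerializer changes below).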
17 changes: 15 additions & 2 deletions src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -40,6 +40,7 @@
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SearchIterator;

public class SerializationHeader
{

@@ -160,6 +161,18 @@ public boolean hasStatic()
        return !columns.statics.isEmpty();
    }

+   public boolean hasAllColumns(Row row, boolean isStatic)
+   {
+       SearchIterator<ColumnDefinition, ColumnData> rowIter = row.searchIterator();
+       Iterable<ColumnDefinition> columns = isStatic ? columns().statics : columns().regulars;
+       for (ColumnDefinition column : columns)
+       {
+           if (rowIter.next(column) == null)
+               return false;
+       }
+       return true;
+   }
+
    public boolean isForSSTable()
    {
        return isForSSTable;

@@ -442,8 +455,8 @@ public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData
            Columns statics, regulars;
            if (selection == null)
            {
-               statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
-               regulars = Columns.serializer.deserialize(in, metadata);
+               statics = hasStatic ? Columns.serializer.deserializeStatics(in, metadata) : Columns.NONE;
+               regulars = Columns.serializer.deserializeRegulars(in, metadata);
            }
            else
            {
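hasAllColumns replaces a size comparison with a membership test. That matters because the two column sets can now differ in both directions: the header may contain placeholders the row lacks, and the row may hold columns the header omits, so equal sizes no longer imply equal sets. A self-contained demonstration with plain collections (hypothetical column names):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Shows why `row.columnCount() == headerColumns.size()` is not a safe
// "has all columns" test once the two sets can diverge.
class HasAllColumnsDemo
{
    // Membership-based check, mirroring SerializationHeader.hasAllColumns.
    static boolean hasAllColumns(Set<String> rowColumns, List<String> headerColumns)
    {
        for (String column : headerColumns)
        {
            if (!rowColumns.contains(column))
                return false;
        }
        return true;
    }

    public static void main(String[] args)
    {
        List<String> header = Arrays.asList("v1", "v3");            // built from a newer remote schema
        Set<String> row = new HashSet<>(Arrays.asList("v1", "v2")); // this node's row

        System.out.println(row.size() == header.size()); // true  -- the size check is fooled
        System.out.println(hasAllColumns(row, header));  // false -- the membership check is not
    }
}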
12 changes: 9 additions & 3 deletions src/java/org/apache/cassandra/db/UnknownColumnException.java
@@ -17,6 +17,7 @@
 */
package org.apache.cassandra.db;

+import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.cassandra.config.CFMetaData;

@@ -27,16 +28,21 @@
 * Exception thrown when we read a column internally that is unknown. Note that
 * this is an internal exception and is not meant to be user facing.
 */
-public class UnknownColumnException extends Exception
+public class UnknownColumnException extends IOException
{
    public final ByteBuffer columnName;

-   public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
+   public UnknownColumnException(String ksName, String cfName, ByteBuffer columnName)
    {
-       super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName));
+       super(String.format("Unknown column %s in table %s.%s", stringify(columnName), ksName, cfName));
        this.columnName = columnName;
    }

+   public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
+   {
+       this(metadata.ksName, metadata.cfName, columnName);
+   }
+
    private static String stringify(ByteBuffer name)
    {
        try
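Switching the superclass from Exception to IOException is what lets the new throw sites in the deserialization path, whose methods already declare throws IOException, surface this exception without wrapping, while call sites that care can still catch the subtype. A minimal sketch of that flow (method names are hypothetical):

import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch: an IOException subtype travels through `throws IOException` signatures
// unchanged, yet remains individually catchable. Names here are illustrative.
class UnknownColumnFlow
{
    static class UnknownColumnException extends IOException
    {
        final ByteBuffer columnName;

        UnknownColumnException(String ks, String cf, ByteBuffer name)
        {
            super(String.format("Unknown column in table %s.%s", ks, cf));
            this.columnName = name;
        }
    }

    // Stands in for a deserializer that already declares IOException.
    static void deserialize() throws IOException
    {
        throw new UnknownColumnException("ks", "tbl", ByteBuffer.allocate(0));
    }

    public static void main(String[] args)
    {
        try
        {
            deserialize();
        }
        catch (UnknownColumnException e)
        {
            System.out.println("reject the write: " + e.getMessage()); // schema hasn't caught up yet
        }
        catch (IOException e)
        {
            System.out.println("other deserialization failure: " + e.getMessage());
        }
    }
}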
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -431,8 +431,8 @@ public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metada
        {
            if (version >= MessagingService.VERSION_3014)
            {
-               Columns statics = Columns.serializer.deserialize(in, metadata);
-               Columns regulars = Columns.serializer.deserialize(in, metadata);
+               Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+               Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
                fetched = new PartitionColumns(statics, regulars);
            }
            else

@@ -443,8 +443,8 @@ public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metada

        if (hasSelection)
        {
-           Columns statics = Columns.serializer.deserialize(in, metadata);
-           Columns regulars = Columns.serializer.deserialize(in, metadata);
+           Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+           Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
            selection = new PartitionColumns(statics, regulars);
        }
src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java

@@ -17,6 +17,7 @@
 */
package org.apache.cassandra.db.partitions;

+import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;

@@ -753,6 +754,12 @@ private static PartitionUpdate deserialize30(DataInputPlus in, int version, Seri
                    deletionBuilder.add((RangeTombstoneMarker)unfiltered);
            }
        }
+       catch (IOError e)
+       {
+           if (e.getCause() != null && e.getCause() instanceof UnknownColumnException)
+               throw (UnknownColumnException) e.getCause();
+           throw e;
+       }

        MutableDeletionInfo deletionInfo = deletionBuilder.build();
        return new PartitionUpdate(metadata,
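The catch block exists because row deserialization runs inside an iterator, and Iterator.next() cannot throw the checked UnknownColumnException; the lower layer wraps it in an unchecked IOError, and deserialize30 unwraps it back at the method boundary. A self-contained sketch of this exception-tunneling pattern, with simplified types:

import java.io.IOError;
import java.io.IOException;
import java.util.Iterator;

// Checked-exception tunneling through an iterator: next() cannot throw a checked
// exception, so it wraps one in IOError; the caller unwraps at the boundary.
class IOErrorTunneling
{
    static class UnknownColumnException extends IOException
    {
        UnknownColumnException(String msg) { super(msg); }
    }

    // Stands in for the unfiltered-row iterator used during deserialization.
    static Iterator<String> rows()
    {
        return new Iterator<String>()
        {
            public boolean hasNext() { return true; }

            public String next()
            {
                // Deep inside deserialization we hit a placeholder column...
                throw new IOError(new UnknownColumnException("v3"));
            }
        };
    }

    // Stands in for PartitionUpdate.deserialize30.
    static void deserialize() throws UnknownColumnException
    {
        try
        {
            rows().next();
        }
        catch (IOError e)
        {
            if (e.getCause() instanceof UnknownColumnException)
                throw (UnknownColumnException) e.getCause();
            throw e; // anything else is still fatal
        }
    }

    public static void main(String[] args)
    {
        try
        {
            deserialize();
        }
        catch (UnknownColumnException e)
        {
            System.out.println("caught checked exception across iterator: " + e.getMessage());
        }
    }
}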
19 changes: 14 additions & 5 deletions src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,9 +19,9 @@

import java.io.IOException;

import com.google.common.collect.Collections2;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;

@@ -133,7 +133,7 @@ private void serialize(Row row, SerializationHeader header, DataOutputPlus out,
        LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
        Row.Deletion deletion = row.deletion();
        boolean hasComplexDeletion = row.hasComplexDeletion();
-       boolean hasAllColumns = (row.columnCount() == headerColumns.size());
+       boolean hasAllColumns = header.hasAllColumns(row, isStatic);
        boolean hasExtendedFlags = hasExtendedFlags(row);

        if (isStatic)

@@ -192,7 +192,12 @@ private void serialize(Row row, SerializationHeader header, DataOutputPlus out,
            // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
            // happens if we don't do that.
            ColumnDefinition column = si.next(data.column());
-           assert column != null;
+
+           // We may have columns that the remote node isn't aware of due to in-flight schema changes.
+           // In cases where it tries to fetch all columns, it will set the "all columns" flag, but only
+           // expect a subset of columns (from this node's perspective). See CASSANDRA-15899.
+           if (column == null)
+               continue;

            if (data.column.isSimple())
                Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header);

@@ -274,7 +279,7 @@ private long serializedRowBodySize(Row row, SerializationHeader header, long pre
        LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
        Row.Deletion deletion = row.deletion();
        boolean hasComplexDeletion = row.hasComplexDeletion();
-       boolean hasAllColumns = (row.columnCount() == headerColumns.size());
+       boolean hasAllColumns = header.hasAllColumns(row, isStatic);

        if (!pkLiveness.isEmpty())
            size += header.timestampSerializedSize(pkLiveness.timestamp());

@@ -293,7 +298,8 @@
        for (ColumnData data : row)
        {
            ColumnDefinition column = si.next(data.column());
-           assert column != null;
+           if (column == null)
+               continue;

            if (data.column.isSimple())
                size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header);

@@ -484,6 +490,9 @@ public Row deserializeRowBody(DataInputPlus in,
            Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
            for (ColumnDefinition column : columns)
            {
+               // If the column is a placeholder, then it's not part of our schema and we can't deserialize it.
+               if (column.isPlaceholder())
+                   throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
                if (column.isSimple())
                    readSimpleColumn(column, in, header, helper, builder, rowLiveness);
                else
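Putting the serialize-side pieces together: the sender intersects the row's columns with the header, silently skips anything the header does not know about, and, because hasAllColumns is now a membership test, the "all columns" flag is only set when every header column is really present, so the receiver knows whether a column-subset encoding follows. A simplified, self-contained sketch of that contract; this illustrates the flag logic only, not Cassandra's actual wire format:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified serialize-side contract: drop columns missing from the header,
// and only claim "all columns" when the row truly covers the header.
class RowSerializeSketch
{
    static final int HAS_ALL_COLUMNS = 0x1; // stand-in for the real row flag

    static int flags(Set<String> rowColumns, List<String> headerColumns)
    {
        return rowColumns.containsAll(headerColumns) ? HAS_ALL_COLUMNS : 0;
    }

    // Only serialize the intersection; unknown columns are skipped, not errors.
    static List<String> columnsToWrite(Set<String> rowColumns, List<String> headerColumns)
    {
        List<String> out = new ArrayList<>();
        for (String column : headerColumns)
        {
            if (rowColumns.contains(column))
                out.add(column);
        }
        return out;
    }

    public static void main(String[] args)
    {
        List<String> header = Arrays.asList("v1", "v2", "v3");
        Set<String> row = new LinkedHashSet<>(Arrays.asList("v1", "v3", "v9")); // v9: column the header lacks

        int f = flags(row, header);
        System.out.println((f & HAS_ALL_COLUMNS) != 0); // false: v2 is missing from the row
        System.out.println(columnsToWrite(row, header)); // [v1, v3] -- v9 is skipped
    }
}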
117 changes: 117 additions & 0 deletions test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import java.util.function.Consumer;

import org.junit.Test;

import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInstanceConfig;

import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;

public class SchemaTest extends TestBaseImpl
{
    private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config -> {
        config.set("partitioner", ByteOrderedPartitioner.class.getSimpleName());
        config.set("initial_token", Integer.toString(config.num() * 1000));
    };

    @Test
    public void dropColumnMixedMode() throws Throwable
    {
        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
        {
            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int)");
            Object[][] someExpected = new Object[5][];
            Object[][] allExpected1 = new Object[5][];
            Object[][] allExpected2 = new Object[5][];
            for (int i = 0; i < 5; i++)
            {
                int v1 = i * 10, v2 = i * 100, v3 = i * 1000;
                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2, v3) VALUES (?,?,?,?)", ConsistencyLevel.ALL, i, v1, v2, v3);
                someExpected[i] = new Object[] {i, v1};
                allExpected1[i] = new Object[] {i, v1, v3};
                allExpected2[i] = new Object[] {i, v1, v2, v3};
            }
            cluster.forEach((instance) -> instance.flush(KEYSPACE));
            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
        }
    }

    @Test
    public void addColumnMixedMode() throws Throwable
    {
        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
        {
            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
            Object[][] someExpected = new Object[5][];
            Object[][] allExpected1 = new Object[5][];
            Object[][] allExpected2 = new Object[5][];
            for (int i = 0; i < 5; i++)
            {
                int v1 = i * 10, v2 = i * 100;
                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)", ConsistencyLevel.ALL, i, v1, v2);
                someExpected[i] = new Object[] {i, v1};
                allExpected1[i] = new Object[] {i, v1, v2, null};
                allExpected2[i] = new Object[] {i, v1, v2};
            }
            cluster.forEach((instance) -> instance.flush(KEYSPACE));
            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
        }
    }

    @Test
    public void addDropColumnMixedMode() throws Throwable
    {
        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
        {
            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
            Object[][] someExpected = new Object[5][];
            Object[][] allExpected1 = new Object[5][];
            Object[][] allExpected2 = new Object[5][];
            for (int i = 0; i < 5; i++)
            {
                int v1 = i * 10, v2 = i * 100;
                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)", ConsistencyLevel.ALL, i, v1, v2);
                someExpected[i] = new Object[] {i, v1};
                allExpected1[i] = new Object[] {i, v1, v2, null};
                allExpected2[i] = new Object[] {i, v1};
            }
            cluster.forEach((instance) -> instance.flush(KEYSPACE));
            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
            cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
        }
    }
}
