From bd49317cf8d406824e8be0b3a7c676a0a6bb95f9 Mon Sep 17 00:00:00 2001 From: Alwyn Davis Date: Fri, 30 Sep 2016 00:44:22 +0000 Subject: [PATCH 01/34] Test commit --- INSTACLUSTR-README.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 INSTACLUSTR-README.md diff --git a/INSTACLUSTR-README.md b/INSTACLUSTR-README.md new file mode 100644 index 000000000000..8e3695edabd7 --- /dev/null +++ b/INSTACLUSTR-README.md @@ -0,0 +1,2 @@ +# Test +This is a *test*. From 2f45b53ee590fbefd3d15382765466fe716675d0 Mon Sep 17 00:00:00 2001 From: Sylvain Lebresne Date: Thu, 23 Jun 2016 10:58:14 +0200 Subject: [PATCH 02/34] Fixed conflicts --- CHANGES.txt | 1 + .../statements/ModificationStatement.java | 10 +++++ .../cassandra/net/OutboundTcpConnection.java | 3 +- .../cassandra/utils/ByteBufferUtil.java | 6 ++- .../org/apache/cassandra/cql3/CQLTester.java | 2 + .../validation/operations/InsertTest.java | 37 ++++++++++++++----- .../validation/operations/SelectTest.java | 21 ++++++++++- 7 files changed, 66 insertions(+), 14 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 3605752b7247..27b2da4c97c8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -37,6 +37,7 @@ Merged from 2.1: * Do not consider local node a valid source during replace (CASSANDRA-11848) * Add message dropped tasks to nodetool netstats (CASSANDRA-11855) * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) + * Prevent select statements with clustering key > 64k (CASSANDRA-11882) 3.6 diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index af89ba83f041..a3f766c0b65c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -673,7 +673,17 @@ final void addUpdates(UpdatesCollector collector, else { for (Clustering clustering : clusterings) + { + for (ByteBuffer c : clustering.getRawValues()) + { + if (c != null && c.remaining() > FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format("Key length of %d is longer than maximum of %d", + clustering.dataSize(), + FBUtilities.MAX_UNSIGNED_SHORT)); + } + addUpdateForKey(upd, clustering, params); + } } } } diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index c8599bb8be54..c2d10fd31cd2 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -301,8 +301,9 @@ private void writeConnected(QueuedMessage qm, boolean flush) if (flush) out.flush(); } - catch (Exception e) + catch (Throwable e) { + JVMStabilityInspector.inspectThrowable(e); disconnect(); if (e instanceof IOException || e.getCause() instanceof IOException) { diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 87e8b9ddc953..cb4fc1dbd979 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -305,7 +305,8 @@ public static void writeWithLength(byte[] bytes, DataOutput out) throws IOExcept public static void writeWithShortLength(ByteBuffer buffer, DataOutputPlus out) throws IOException { int length = buffer.remaining(); - assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length; + assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT + : String.format("Attempted serializing to buffer exceeded maximum of %s bytes: %s", FBUtilities.MAX_UNSIGNED_SHORT, length); out.writeShort(length); out.write(buffer); } @@ -313,7 +314,8 @@ public static void writeWithShortLength(ByteBuffer buffer, DataOutputPlus out) t public static void writeWithShortLength(byte[] buffer, DataOutput out) throws IOException { int length = buffer.length; - assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length; + assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT + : String.format("Attempted serializing to buffer exceeded maximum of %s bytes: %s", FBUtilities.MAX_UNSIGNED_SHORT, length); out.writeShort(length); out.write(buffer); } diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 1e7d05f290e7..49c7a7a51de4 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -64,6 +64,7 @@ import org.apache.cassandra.transport.Server; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import static junit.framework.Assert.assertNotNull; @@ -80,6 +81,7 @@ public abstract class CQLTester protected static final boolean REUSE_PREPARED = Boolean.valueOf(System.getProperty("cassandra.test.reuse_prepared", "true")); protected static final long ROW_CACHE_SIZE_IN_MB = Integer.valueOf(System.getProperty("cassandra.test.row_cache_size_in_mb", "0")); private static final AtomicInteger seqNumber = new AtomicInteger(); + protected static final ByteBuffer TOO_BIG = ByteBuffer.allocate(FBUtilities.MAX_UNSIGNED_SHORT + 1024); private static org.apache.cassandra.transport.Server server; protected static final int nativePort; diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java index aa738bb60b6c..671dbb8072ed 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java @@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.UntypedResultSet.Row; +import org.apache.cassandra.exceptions.InvalidRequestException; public class InsertTest extends CQLTester { @@ -162,10 +163,10 @@ public void testInsertWithTwoClusteringColumns() throws Throwable private void testInsertWithTwoClusteringColumns(boolean forceFlush) throws Throwable { createTable("CREATE TABLE %s (partitionKey int," + - "clustering_1 int," + - "clustering_2 int," + - "value int," + - " PRIMARY KEY (partitionKey, clustering_1, clustering_2))"); + "clustering_1 int," + + "clustering_2 int," + + "value int," + + " PRIMARY KEY (partitionKey, clustering_1, clustering_2))"); execute("INSERT INTO %s (partitionKey, clustering_1, clustering_2) VALUES (0, 0, 0)"); execute("INSERT INTO %s (partitionKey, clustering_1, clustering_2, value) VALUES (0, 0, 1, 1)"); @@ -260,11 +261,11 @@ public void testInsertWithAStaticColumn() throws Throwable private void testInsertWithAStaticColumn(boolean forceFlush) throws Throwable { createTable("CREATE TABLE %s (partitionKey int," + - "clustering_1 int," + - "clustering_2 int," + - "value int," + - "staticValue text static," + - " PRIMARY KEY (partitionKey, clustering_1, clustering_2))"); + "clustering_1 int," + + "clustering_2 int," + + "value int," + + "staticValue text static," + + " PRIMARY KEY (partitionKey, clustering_1, clustering_2))"); execute("INSERT INTO %s (partitionKey, clustering_1, clustering_2, staticValue) VALUES (0, 0, 0, 'A')"); execute("INSERT INTO %s (partitionKey, staticValue) VALUES (1, 'B')"); @@ -314,4 +315,22 @@ public void testInsertWithDefaultTtl() throws Throwable row = resultSet.one(); Assert.assertTrue(row.getInt("ttl(b)") >= (9 * secondsPerMinute)); } + + @Test + public void testPKInsertWithValueOver64K() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, PRIMARY KEY (a, b))"); + + assertInvalidThrow(InvalidRequestException.class, + "INSERT INTO %s (a, b) VALUES (?, 'foo')", new String(TOO_BIG.array())); + } + + @Test + public void testCKInsertWithValueOver64K() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, PRIMARY KEY (a, b))"); + + assertInvalidThrow(InvalidRequestException.class, + "INSERT INTO %s (a, b) VALUES ('foo', ?)", new String(TOO_BIG.array())); + } } diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java index f91ec5a0c0d9..dc757f3d3b97 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java @@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.restrictions.StatementRestrictions; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.assertEquals; @@ -36,7 +37,6 @@ */ public class SelectTest extends CQLTester { - private static final ByteBuffer TOO_BIG = ByteBuffer.allocate(1024 * 65); @Test public void testSingleClustering() throws Throwable @@ -1272,7 +1272,7 @@ public void testSelectDistinctWithWhereClause() throws Throwable { "SELECT DISTINCT k FROM %s WHERE k IN (1, 2, 3) AND a = 10"); assertInvalidMessage(distinctQueryErrorMsg, - "SELECT DISTINCT k FROM %s WHERE b = 5"); + "SELECT DISTINCT k FROM %s WHERE b = 5"); assertRows(execute("SELECT DISTINCT k FROM %s WHERE k = 1"), row(1)); @@ -2264,6 +2264,23 @@ public void testIndexQueryWithValueOver64K() throws Throwable assertEmpty(execute("SELECT * FROM %s WHERE c = ? ALLOW FILTERING", TOO_BIG)); } + @Test + public void testPKQueryWithValueOver64K() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, PRIMARY KEY (a, b))"); + + assertInvalidThrow(InvalidRequestException.class, + "SELECT * FROM %s WHERE a = ?", new String(TOO_BIG.array())); + } + + @Test + public void testCKQueryWithValueOver64K() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, PRIMARY KEY (a, b))"); + + execute("SELECT * FROM %s WHERE a = 'foo' AND b = ?", new String(TOO_BIG.array())); + } + @Test public void testFilteringOnCompactTablesWithoutIndicesAndWithMaps() throws Throwable { From c64c6a80a5a56f517625197fb30154f2ad99c808 Mon Sep 17 00:00:00 2001 From: Yuki Morishita Date: Wed, 3 Aug 2016 09:34:27 -0500 Subject: [PATCH 03/34] Release sstables of failed stream sessions only when outgoing transfers are finished Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11345 --- CHANGES.txt | 1 + .../cassandra/streaming/StreamSession.java | 4 +- .../streaming/StreamTransferTask.java | 4 +- .../streaming/messages/FileMessageHeader.java | 21 +++-- .../messages/OutgoingFileMessage.java | 38 ++++++++- .../streaming/StreamTransferTaskTest.java | 85 +++++++++++++++++-- 6 files changed, 137 insertions(+), 16 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 27b2da4c97c8..daf761729e50 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -29,6 +29,7 @@ Merged from 2.2: * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395) * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) Merged from 2.1: * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: apply current keyspace to source command (CASSANDRA-11152) diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 8dc306dee3cf..beea577e3560 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -25,6 +25,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -133,7 +134,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber // stream requests to send to the peer protected final Set requests = Sets.newConcurrentHashSet(); // streaming tasks are created and managed per ColumnFamily ID - private final ConcurrentHashMap transfers = new ConcurrentHashMap<>(); + @VisibleForTesting + protected final ConcurrentHashMap transfers = new ConcurrentHashMap<>(); // data receivers, filled after receiving prepare message private final Map receivers = new ConcurrentHashMap<>(); private final StreamingMetrics metrics; diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index e8d0cae5581f..4f313c36fbf0 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import org.apache.cassandra.concurrent.NamedThreadFactory; @@ -40,7 +41,8 @@ public class StreamTransferTask extends StreamTask private final AtomicInteger sequenceNumber = new AtomicInteger(0); private boolean aborted = false; - private final Map files = new HashMap<>(); + @VisibleForTesting + protected final Map files = new HashMap<>(); private final Map timeoutTasks = new HashMap<>(); private long totalSize; diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index 2b5047d76630..32d814bd1586 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -25,7 +25,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.DataInputPlus; @@ -63,6 +62,9 @@ public class FileMessageHeader public final int sstableLevel; public final SerializationHeader.Component header; + /* cached size value */ + private transient final long size; + public FileMessageHeader(UUID cfId, int sequenceNumber, Version version, @@ -85,6 +87,7 @@ public FileMessageHeader(UUID cfId, this.repairedAt = repairedAt; this.sstableLevel = sstableLevel; this.header = header; + this.size = calculateSize(); } public FileMessageHeader(UUID cfId, @@ -109,6 +112,7 @@ public FileMessageHeader(UUID cfId, this.repairedAt = repairedAt; this.sstableLevel = sstableLevel; this.header = header; + this.size = calculateSize(); } public boolean isCompressed() @@ -121,23 +125,28 @@ public boolean isCompressed() */ public long size() { - long size = 0; + return size; + } + + private long calculateSize() + { + long transferSize = 0; if (compressionInfo != null) { // calculate total length of transferring chunks for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) - size += chunk.length + 4; // 4 bytes for CRC + transferSize += chunk.length + 4; // 4 bytes for CRC } else if (compressionMetadata != null) { - size = compressionMetadata.getTotalSizeForSections(sections); + transferSize = compressionMetadata.getTotalSizeForSections(sections); } else { for (Pair section : sections) - size += section.right - section.left; + transferSize += section.right - section.left; } - return size; + return transferSize; } @Override diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index f10b42e1d8be..ae006c864661 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -21,6 +21,8 @@ import java.nio.channels.ReadableByteChannel; import java.util.List; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputStreamPlus; @@ -45,8 +47,16 @@ public OutgoingFileMessage deserialize(ReadableByteChannel in, int version, Stre public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { - message.serialize(out, version, session); - session.fileSent(message.header); + message.startTransfer(); + try + { + message.serialize(out, version, session); + session.fileSent(message.header); + } + finally + { + message.finishTransfer(); + } } }; @@ -54,6 +64,7 @@ public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int private final Ref ref; private final String filename; private boolean completed = false; + private boolean transferring = false; public OutgoingFileMessage(Ref ref, int sequenceNumber, long estimatedKeys, List> sections, long repairedAt, boolean keepSSTableLevel) { @@ -91,12 +102,33 @@ public synchronized void serialize(DataOutputStreamPlus out, int version, Stream writer.write(out); } + @VisibleForTesting + public synchronized void finishTransfer() + { + transferring = false; + //session was aborted mid-transfer, now it's safe to release + if (completed) + { + ref.release(); + } + } + + @VisibleForTesting + public synchronized void startTransfer() + { + transferring = true; + } + public synchronized void complete() { if (!completed) { completed = true; - ref.release(); + //release only if not transferring + if (!transferring) + { + ref.release(); + } } } diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 95725524d3fd..2e3eac115bbf 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -19,12 +19,17 @@ import java.net.InetAddress; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.junit.BeforeClass; +import org.junit.After; import org.junit.Test; import junit.framework.Assert; @@ -36,7 +41,9 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Ref; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -55,20 +62,24 @@ public static void defineSchema() throws ConfigurationException SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); } + @After + public void tearDown() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); + cfs.clearUnsafe(); + } + @Test public void testScheduleTimeout() throws Exception { - String ks = KEYSPACE1; - String cf = "Standard1"; - InetAddress peer = FBUtilities.getBroadcastAddress(); StreamSession session = new StreamSession(peer, peer, null, 0, true, false); - ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables for (int i = 0; i < 2; i++) { - SchemaLoader.insertData(ks, cf, i, 1); + SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1); cfs.forceBlockingFlush(); } @@ -103,4 +114,68 @@ public void testScheduleTimeout() throws Exception // when all streaming are done, time out task should not be scheduled. assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS)); } + + @Test + public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception + { + InetAddress peer = FBUtilities.getBroadcastAddress(); + StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false); + StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.emptyList(), streamCoordinator); + StreamSession session = new StreamSession(peer, peer, null, 0, true, false); + session.init(future); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); + + // create two sstables + for (int i = 0; i < 2; i++) + { + SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1); + cfs.forceBlockingFlush(); + } + + // create streaming task that streams those two sstables + StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId); + List> refs = new ArrayList<>(cfs.getSSTables().size()); + for (SSTableReader sstable : cfs.getSSTables()) + { + List> ranges = new ArrayList<>(); + ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); + Ref ref = sstable.selfRef(); + refs.add(ref); + task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0); + } + assertEquals(2, task.getTotalNumberOfFiles()); + + //add task to stream session, so it is aborted when stream session fails + session.transfers.put(UUID.randomUUID(), task); + + //make a copy of outgoing file messages, since task is cleared when it's aborted + Collection files = new LinkedList<>(task.files.values()); + + //simulate start transfer + for (OutgoingFileMessage file : files) + { + file.startTransfer(); + } + + //fail stream session mid-transfer + session.onError(new Exception("Fake exception")); + + //make sure reference was not released + for (Ref ref : refs) + { + assertEquals(1, ref.globalCount()); + } + + //simulate finish transfer + for (OutgoingFileMessage file : files) + { + file.finishTransfer(); + } + + //now reference should be released + for (Ref ref : refs) + { + assertEquals(0, ref.globalCount()); + } + } } From 7a7c219024128063fee1bac382b68b659f93ea66 Mon Sep 17 00:00:00 2001 From: Paulo Motta Date: Fri, 12 Aug 2016 22:06:27 -0300 Subject: [PATCH 04/34] Throw RuntimeException if starting transfer of already completed OutgoingFileMessage --- .../cassandra/streaming/messages/OutgoingFileMessage.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index ae006c864661..b2621f3dc5d4 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -116,6 +116,9 @@ public synchronized void finishTransfer() @VisibleForTesting public synchronized void startTransfer() { + if (completed) + throw new RuntimeException(String.format("Transfer of file %s already completed or aborted (perhaps session failed?).", + filename)); transferring = true; } From 05e9f07723ef53eb4b31ea3543699d6260797e3f Mon Sep 17 00:00:00 2001 From: Marcus Eriksson Date: Mon, 13 Jun 2016 15:29:08 +0200 Subject: [PATCH 05/34] Avoid missing sstables when getting the canonical sstables Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-11996 --- CHANGES.txt | 1 + .../cassandra/db/ColumnFamilyStore.java | 6 +-- .../cassandra/db/SizeEstimatesRecorder.java | 8 +++- .../cassandra/db/lifecycle/Tracker.java | 2 +- .../apache/cassandra/db/lifecycle/View.java | 41 ++++++++++++------ .../apache/cassandra/db/view/ViewBuilder.java | 4 +- .../index/SecondaryIndexManager.java | 2 +- .../index/internal/CassandraIndex.java | 2 +- .../io/sstable/IndexSummaryManager.java | 2 +- .../cassandra/streaming/StreamSession.java | 2 +- .../cassandra/db/lifecycle/ViewTest.java | 6 +-- .../index/internal/CustomCassandraIndex.java | 2 +- .../io/sstable/SSTableRewriterTest.java | 43 ++++++++++++++++++- 13 files changed, 91 insertions(+), 30 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index daf761729e50..e425a0faf172 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,7 @@ Merged from 3.0: * Allow compaction strategies to disable early open (CASSANDRA-11754) * Refactor Materialized View code (CASSANDRA-11475) * Update Java Driver (CASSANDRA-11615) + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) Merged from 2.2: * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index f004ae056d14..41b7ce62866a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1592,7 +1592,7 @@ public Set getLiveSSTables() public Iterable getSSTables(SSTableSet sstableSet) { - return data.getView().sstables(sstableSet); + return data.getView().select(sstableSet); } public Iterable getUncompactingSSTables() @@ -2049,7 +2049,7 @@ public static Iterable all() public Iterable keySamples(Range range) { - try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL))) + try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL))) { Iterable[] samples = new Iterable[view.sstables.size()]; int i = 0; @@ -2063,7 +2063,7 @@ public Iterable keySamples(Range range) public long estimatedKeysForRange(Range range) { - try (RefViewFragment view = selectAndReference(View.select(SSTableSet.CANONICAL))) + try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL))) { long count = 0; for (SSTableReader sstable : view.sstables) diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java index 2a74ea9d1e1f..0b31b872f232 100644 --- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java +++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; @@ -103,8 +104,11 @@ private void recordSizeEstimates(ColumnFamilyStore table, Collection sstables = table.getTracker().getView().select(SSTableSet.CANONICAL); + SSTableIntervalTree tree = SSTableIntervalTree.build(sstables); + Range r = Range.makeRowRange(range); + Iterable canonicalSSTables = View.sstablesInBounds(r.left, r.right, tree); + refs = Refs.tryRef(canonicalSSTables); } // calculate the estimates. diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index be1436c3ab48..3e861f80e182 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -405,7 +405,7 @@ public Set getCompacting() public Iterable getUncompacting() { - return view.get().sstables(SSTableSet.NONCOMPACTING); + return view.get().select(SSTableSet.NONCOMPACTING); } public Iterable getUncompacting(Iterable candidates) diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index cde6363ca7a8..d813710227f7 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -118,14 +118,9 @@ public Set liveSSTables() return sstables; } - public Iterable sstables(SSTableSet sstableSet) - { - return select(sstableSet, sstables); - } - public Iterable sstables(SSTableSet sstableSet, Predicate filter) { - return select(sstableSet, filter(sstables, filter)); + return filter(select(sstableSet), filter); } // any sstable known by this tracker in any form; we have a special method here since it's only used for testing/debug @@ -136,7 +131,7 @@ public Iterable allKnownSSTables() return Iterables.concat(sstables, filterOut(compacting, sstables)); } - private Iterable select(SSTableSet sstableSet, Iterable sstables) + public Iterable select(SSTableSet sstableSet) { switch (sstableSet) { @@ -145,9 +140,18 @@ private Iterable select(SSTableSet sstableSet, Iterable !compacting.contains(s)); case CANONICAL: - return transform(filter(sstables, - (s) -> s.openReason != SSTableReader.OpenReason.EARLY), - (s) -> s.openReason != SSTableReader.OpenReason.MOVED_START ? s : compactingMap.get(s)); + Set canonicalSSTables = new HashSet<>(); + for (SSTableReader sstable : compacting) + if (sstable.openReason != SSTableReader.OpenReason.EARLY) + canonicalSSTables.add(sstable); + // reason for checking if compacting contains the sstable is that if compacting has an EARLY version + // of a NORMAL sstable, we still have the canonical version of that sstable in sstables. + // note that the EARLY version is equal, but not == since it is a different instance of the same sstable. + for (SSTableReader sstable : sstables) + if (!compacting.contains(sstable) && sstable.openReason != SSTableReader.OpenReason.EARLY) + canonicalSSTables.add(sstable); + + return canonicalSSTables; default: throw new IllegalStateException(); } @@ -190,12 +194,23 @@ public Iterable sstablesInBounds(SSTableSet sstableSet, Partition return Collections.emptyList(); PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; - return select(sstableSet, intervalTree.search(Interval.create(left, stopInTree))); + return intervalTree.search(Interval.create(left, stopInTree)); + } + + public static List sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree) + { + assert !AbstractBounds.strictlyWrapsAround(left, right); + + if (intervalTree.isEmpty()) + return Collections.emptyList(); + + PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right; + return intervalTree.search(Interval.create(left, stopInTree)); } - public static Function> select(SSTableSet sstableSet) + public static Function> selectFunction(SSTableSet sstableSet) { - return (view) -> view.sstables(sstableSet); + return (view) -> view.select(sstableSet); } public static Function> select(SSTableSet sstableSet, Predicate filter) diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index 65e26e216598..70ff05328321 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -115,7 +115,7 @@ public void run() if (buildStatus == null) { baseCfs.forceBlockingFlush(); - function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL); + function = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL); int generation = Integer.MIN_VALUE; try (Refs temp = baseCfs.selectAndReference(function).refs) @@ -136,7 +136,7 @@ public void run() @Nullable public Iterable apply(org.apache.cassandra.db.lifecycle.View view) { - Iterable readers = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL).apply(view); + Iterable readers = org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL).apply(view); if (readers != null) return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left); return null; diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index b5ebd582ac7d..d6d3a951c37c 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -270,7 +270,7 @@ public void buildIndexBlocking(Index index) { if (index.shouldBuildBlocking()) { - try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); Refs sstables = viewFragment.refs) { buildIndexesBlocking(sstables, Collections.singleton(index)); diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 26184c337aee..1834bb927e2e 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -671,7 +671,7 @@ private void buildBlocking() { baseCfs.forceBlockingFlush(); - try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); Refs sstables = viewFragment.refs) { if (sstables.isEmpty()) diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index aed35c952808..ddda430b8fb3 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -208,7 +208,7 @@ private Pair, Map> getCompacting do { View view = cfStore.getTracker().getView(); - allSSTables = ImmutableSet.copyOf(view.sstables(SSTableSet.CANONICAL)); + allSSTables = ImmutableSet.copyOf(view.select(SSTableSet.CANONICAL)); nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables)); } while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN))); diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index beea577e3560..bdd1fb0666bb 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -343,7 +343,7 @@ private List getSSTableSectionsForRanges(Collection nonCompacting = ImmutableSet.copyOf(cur.sstables(SSTableSet.NONCOMPACTING)); + Set nonCompacting = ImmutableSet.copyOf(cur.select(SSTableSet.NONCOMPACTING)); Assert.assertTrue(nonCompacting.containsAll(readers.subList(2, 5))); Assert.assertTrue(nonCompacting.containsAll(readers.subList(0, 1))); Assert.assertEquals(4, nonCompacting.size()); diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java index 51457c9359a3..c3923f5536c9 100644 --- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java +++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java @@ -603,7 +603,7 @@ private void buildBlocking() { baseCfs.forceBlockingFlush(); - try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)); + try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)); Refs sstables = viewFragment.refs) { if (sstables.isEmpty()) diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 7dbc45e6f9db..90cb1cb938a3 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -765,7 +767,7 @@ public void testCanonicalView() throws Exception if (!checked && writer.currentWriter().getFilePointer() > 15000000) { checked = true; - ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.select(SSTableSet.CANONICAL)); + ColumnFamilyStore.ViewFragment viewFragment = cfs.select(View.selectFunction(SSTableSet.CANONICAL)); // canonical view should have only one SSTable which is not opened early. assertEquals(1, viewFragment.sstables.size()); SSTableReader sstable = viewFragment.sstables.get(0); @@ -819,6 +821,45 @@ public void testTwoWriters() throws IOException validateCFS(cfs); } + @Test + public void testCanonicalSSTables() throws ExecutionException, InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + truncate(cfs); + + cfs.addSSTable(writeFile(cfs, 100)); + Collection allSSTables = cfs.getSSTables(); + assertEquals(1, allSSTables.size()); + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean failed = new AtomicBoolean(false); + Runnable r = () -> { + while (!done.get()) + { + Iterable sstables = cfs.getSSTables(SSTableSet.CANONICAL); + if (Iterables.size(sstables) != 1) + { + failed.set(true); + return; + } + } + }; + Thread t = new Thread(r); + try + { + t.start(); + cfs.forceMajorCompaction(); + } + finally + { + done.set(true); + t.join(20); + } + assertFalse(failed.get()); + + + } + private void validateKeys(Keyspace ks) { for (int i = 0; i < 100; i++) From f258bab2954be32c7637c1ba936a11f1d500d52e Mon Sep 17 00:00:00 2001 From: Sylvain Lebresne Date: Mon, 25 Jul 2016 16:35:33 +0200 Subject: [PATCH 06/34] AssertionError with MVs on updating a row that isn't indexed due to a null value patch by Sylvain Lebresne; reviewed by Carl Yeksigian for CASSANDRA-12247 --- CHANGES.txt | 1 + .../db/view/ViewUpdateGenerator.java | 4 +- .../org/apache/cassandra/cql3/ViewTest.java | 60 +++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e425a0faf172..c4e19286ff5c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ Merged from 3.0: * Refactor Materialized View code (CASSANDRA-11475) * Update Java Driver (CASSANDRA-11615) * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247) Merged from 2.2: * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index 4c6dbb7abc3d..a8af37bc3b72 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@ -191,7 +191,7 @@ private UpdateAction updateAction(Row existingBaseRow, Row mergedBaseRow) // If the update didn't modified this column, the cells will be the same object so it's worth checking if (before == after) - return before == null ? UpdateAction.NONE : UpdateAction.UPDATE_EXISTING; + return isLive(before) ? UpdateAction.UPDATE_EXISTING : UpdateAction.NONE; if (!isLive(before)) return isLive(after) ? UpdateAction.NEW_ENTRY : UpdateAction.NONE; @@ -452,7 +452,7 @@ private LivenessInfo computeLivenessInfoForEntry(Row baseRow) ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0); Cell cell = baseRow.getCell(baseColumn); - assert isLive(cell) : "We shouldn't have got there is the base row had no associated entry"; + assert isLive(cell) : "We shouldn't have got there if the base row had no associated entry"; long timestamp = Math.max(baseLiveness.timestamp(), cell.timestamp()); return LivenessInfo.withExpirationTime(timestamp, cell.ttl(), cell.localDeletionTime()); diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index 4a1dc0761010..c9ef401e234f 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -1077,4 +1077,64 @@ public void testCollectionInView() throws Throwable mvRows = executeNet(protocolVersion, "SELECT a, b FROM mvmap WHERE b = ?", 0); assertRowsNet(protocolVersion, mvRows, row(0, 0)); } + + @Test + public void testMultipleNonPrimaryKeysInView() throws Throwable + { + createTable("CREATE TABLE %s (" + + "a int," + + "b int," + + "c int," + + "d int," + + "e int," + + "PRIMARY KEY ((a, b), c))"); + + try + { + createView("mv_de", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND d IS NOT NULL AND e IS NOT NULL PRIMARY KEY ((d, a), b, e, c)"); + Assert.fail("Should have rejected a query including multiple non-primary key base columns"); + } + catch (Exception e) + { + } + + try + { + createView("mv_de", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND d IS NOT NULL AND e IS NOT NULL PRIMARY KEY ((a, b), c, d, e)"); + Assert.fail("Should have rejected a query including multiple non-primary key base columns"); + } + catch (Exception e) + { + } + + } + + @Test + public void testNullInClusteringColumns() throws Throwable + { + createTable("CREATE TABLE %s (id1 int, id2 int, v1 text, v2 text, PRIMARY KEY (id1, id2))"); + + executeNet(protocolVersion, "USE " + keyspace()); + + createView("mv", + "CREATE MATERIALIZED VIEW %s AS" + + " SELECT id1, v1, id2, v2" + + " FROM %%s" + + " WHERE id1 IS NOT NULL AND v1 IS NOT NULL AND id2 IS NOT NULL" + + " PRIMARY KEY (id1, v1, id2)" + + " WITH CLUSTERING ORDER BY (v1 DESC, id2 ASC)"); + + execute("INSERT INTO %s (id1, id2, v1, v2) VALUES (?, ?, ?, ?)", 0, 1, "foo", "bar"); + + assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM %s"), row(0, 1, "foo", "bar")); + assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM mv"), row(0, "foo", 1, "bar")); + + executeNet(protocolVersion, "UPDATE %s SET v1=? WHERE id1=? AND id2=?", null, 0, 1); + assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM %s"), row(0, 1, null, "bar")); + assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM mv")); + + executeNet(protocolVersion, "UPDATE %s SET v2=? WHERE id1=? AND id2=?", "rab", 0, 1); + assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM %s"), row(0, 1, null, "rab")); + assertRowsNet(protocolVersion, executeNet(protocolVersion, "SELECT * FROM mv")); + } } From abdb8224a04a56a12a4b8ea6984d68f99234b2c8 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Mon, 9 May 2016 11:06:43 +0200 Subject: [PATCH 07/34] Allow updating UDT nested in non-frozen map after ALTERing the UDT Patch by Alex Petrov; reviewed by jknighton for CASSANDRA-11604 --- .../validation/entities/UserTypesTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java index 2bffb092069b..9e9d0e258e66 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java @@ -268,6 +268,34 @@ public void testAlteringUserTypeNestedWithinMap() throws Throwable } } + @Test + public void testAlteringUserTypeNestedWithinNonFrozenMap() throws Throwable + { + String ut1 = createType("CREATE TYPE %s (a int)"); + String columnType = KEYSPACE + "." + ut1; + + createTable("CREATE TABLE %s (x int PRIMARY KEY, y map>)"); + + execute("INSERT INTO %s (x, y) VALUES(1, {'firstValue': {a: 1}})"); + assertRows(execute("SELECT * FROM %s"), + row(1, map("firstValue", userType(1)))); + + flush(); + + execute("ALTER TYPE " + columnType + " ADD b int"); + execute("UPDATE %s SET y['secondValue'] = {a: 2, b: 2} WHERE x = 1"); + + assertRows(execute("SELECT * FROM %s"), + row(1, map("firstValue", userType(1), + "secondValue", userType(2, 2)))); + + flush(); + + assertRows(execute("SELECT * FROM %s"), + row(1, map("firstValue", userType(1), + "secondValue", userType(2, 2)))); + } + @Test public void testAlteringUserTypeNestedWithinSet() throws Throwable { From 909dfa82a5576a4ff2274511009b569ce9e50cc9 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Fri, 10 Jun 2016 10:45:57 -0500 Subject: [PATCH 08/34] StorageService shutdown hook should use a volatile variable patch by Ed Capriolo; reviewed by Stefania Alborghetti for CASSANDRA-11984 --- CHANGES.txt | 3 ++- src/java/org/apache/cassandra/service/StorageService.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c4e19286ff5c..1f72c3560844 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -31,7 +31,8 @@ Merged from 2.2: * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395) * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) - * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) + * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) Merged from 2.1: * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: apply current keyspace to source command (CASSANDRA-11152) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2ac7ad633aae..bc15ece61e40 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -214,7 +214,7 @@ private static int getRingDelay() public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(tokenMetadata.partitioner); private Thread drainOnShutdown = null; - private boolean inShutdownHook = false; + private volatile boolean inShutdownHook = false; public static final StorageService instance = new StorageService(); From 63c6e9b8efaf91f6782f674cf33a8db13dc40f57 Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Fri, 24 Jun 2016 11:47:25 +0100 Subject: [PATCH 09/34] Ensure new CFS is initialized before adding to schema Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-12083 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Schema.java | 8 +++----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 1f72c3560844..4dacc82cda6d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ Merged from 3.0: * Update Java Driver (CASSANDRA-11615) * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247) + * Fix potential race in schema during new table creation (CASSANDRA-12083) Merged from 2.2: * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index 03d8e8b96e42..ee1f1373ecf1 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -644,15 +644,13 @@ public void addTable(CFMetaData cfm) assert getCFMetaData(cfm.ksName, cfm.cfName) == null; // Make sure the keyspace is initialized - Keyspace.open(cfm.ksName); + // and init the new CF before switching the KSM to the new one + // to avoid races as in CASSANDRA-10761 + Keyspace.open(cfm.ksName).initCf(cfm, true); // Update the keyspaces map with the updated metadata update(cfm.ksName, ks -> ks.withSwapped(ks.tables.with(cfm))); // Update the table ID <-> table name map (cfIdMap) load(cfm); - - // init the new CF before switching the KSM to the new one - // to avoid races as in CASSANDRA-10761 - Keyspace.open(cfm.ksName).initCf(cfm, true); MigrationManager.instance.notifyCreateColumnFamily(cfm); } From 97d9b149c1189b82f68216be8eeac5f67f92b711 Mon Sep 17 00:00:00 2001 From: Alwyn Date: Thu, 6 Oct 2016 15:24:00 +1100 Subject: [PATCH 10/34] Fix for incorrect test case in CASSANDRA-11604 --- .../cql3/validation/entities/UserTypesTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java index 9e9d0e258e66..7eae03939dfe 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UserTypesTest.java @@ -278,7 +278,7 @@ public void testAlteringUserTypeNestedWithinNonFrozenMap() throws Throwable execute("INSERT INTO %s (x, y) VALUES(1, {'firstValue': {a: 1}})"); assertRows(execute("SELECT * FROM %s"), - row(1, map("firstValue", userType(1)))); + row(1, map("firstValue", userType("a", 1)))); flush(); @@ -286,14 +286,14 @@ public void testAlteringUserTypeNestedWithinNonFrozenMap() throws Throwable execute("UPDATE %s SET y['secondValue'] = {a: 2, b: 2} WHERE x = 1"); assertRows(execute("SELECT * FROM %s"), - row(1, map("firstValue", userType(1), - "secondValue", userType(2, 2)))); + row(1, map("firstValue", userType("a", 1), + "secondValue", userType("a", 2, "b", 2)))); flush(); assertRows(execute("SELECT * FROM %s"), - row(1, map("firstValue", userType(1), - "secondValue", userType(2, 2)))); + row(1, map("firstValue", userType("a", 1), + "secondValue", userType("a", 2, "b", 2)))); } @Test From 34a71bc0ea1fd8f3378a7fa9a286010c09f44956 Mon Sep 17 00:00:00 2001 From: Alwyn Davis Date: Sat, 15 Oct 2016 23:17:01 +0000 Subject: [PATCH 11/34] Bumped version number --- build.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.xml b/build.xml index df9de85aeea2..f5040cd46eb9 100644 --- a/build.xml +++ b/build.xml @@ -25,7 +25,7 @@ - + From 9e922d358a6ce8175b6f303b69b3c84a229514db Mon Sep 17 00:00:00 2001 From: benbromhead Date: Tue, 18 Oct 2016 23:57:52 -0700 Subject: [PATCH 12/34] Update README.asc Changed README to be our FAQ and text. Includes link to actual readme for apache cassandra --- README.asc | 119 +++++++++++++++++++---------------------------------- 1 file changed, 42 insertions(+), 77 deletions(-) diff --git a/README.asc b/README.asc index 8ae90e898f51..d74c6ca2e4c7 100644 --- a/README.asc +++ b/README.asc @@ -1,92 +1,57 @@ -Executive summary ------------------ += Cassandra 3.7 LTS -Cassandra is a partitioned row store. Rows are organized into tables with a required primary key. +This is repository contains Apache Cassandra 3.7 with a number of bugfixes from later versions of Apache Cassandra we have backported to 3.7. If you want to know more and how to build and install see the official https://github.com/apache/cassandra/blob/cassandra-3.7/README.asc[Apache Cassandra README] -http://wiki.apache.org/cassandra/Partitioners[Partitioning] means that Cassandra can distribute your data across multiple machines in an application-transparent matter. Cassandra will automatically repartition as machines are added and removed from the cluster. += FAQ +=== Is this a fork? +No, This is just Cassandra with a different release cadence for those who want 3.x features but are slightly more risk adverse than the current schedule allows. -http://wiki.apache.org/cassandra/DataModel[Row store] means that like relational databases, Cassandra organizes data by rows and columns. The Cassandra Query Language (CQL) is a close relative of SQL. +=== Why backport? +At Instaclustr we support and run a number of different versions of Apache Cassandra on behalf of our customers. Over the course of managing Cassandra for our customers we often encounter bugs. Some there are existing patches, others we patch ourselves. Generally, if we can, we try to wait for the next official Apache Cassandra release, however in the need to ensure our customers remain stable and running we will sometimes backport bugs and write our own hotfixes (which are also submitted back to the community). -For more information, see http://cassandra.apache.org/[the Apache Cassandra web site]. +=== Why not just use the official release? +With the 3.x tick-tock branch we have encountered more instability than with the previous release cadence. We feel that releasing new features every other release makes it very hard for operators to stabilize their production environment without bringing in brand new features that are not battle tested. With the dual release of Cassandra 3.8 and 3.9 simultaneously the bug fix branch included new and real-world untested features, specifically CDC. We have decided to stick with Cassandra 3.7 and instead backport critical issues and maintain it ourselves rather than trying to stick with the current Apache Cassandra release cadence. -Requirements ------------- -. Java >= 1.8 (OpenJDK and Oracle JVMS have been tested) -. Python 2.7 (for cqlsh) +=== Why release it? +A number of our customers and people in the community have asked if we would make this available, which we are more than happy to do so. This repository represents what we at Instaclustr run in production for Cassandra 3.7 and this is our way of helping the community get a similar level of stability as what you would get from our managed service. -Getting started ---------------- +=== How long will you support this? +We will be aiming to maintain our 3.7 release for the next 6 months or so. The lifetime will primarily be driven by the needs of our paying customers. There has been some initial discussions on the Cassandra mailing list about abandoning the current release cadence, should this happen it is likely we will start to follow the official Apache Cassandra releases more closely, however we will probably still maintain this repository with just a much shorter list of backported bugfixes. As we backport patches to the base 3.7 release, we will update the list below. -This short guide will walk you through getting a basic one node cluster up -and running, and demonstrate some simple reads and writes. +=== Where are the debs at? +We will just be maintaining a repository and not distributing or maintaining build artefacts and is this is provided as is without warranty. -First, we'll unpack our archive: +=== Will you help me? +We will keep an eye on the issues page of this github repository and try to help there, however if you want commercial support for our release mailto:info@instaclustr.com[contact us] for details. - $ tar -zxvf apache-cassandra-$VERSION.tar.gz - $ cd apache-cassandra-$VERSION +=== Current patches backported +* https://issues.apache.org/jira/browse/CASSANDRA-11882[11882: Clustering Key with ByteBuffer size > 64k throws Assertion Error] +* https://issues.apache.org/jira/browse/CASSANDRA-11345[11345: Assertion Errors "Memory was freed" during streaming] +* https://issues.apache.org/jira/browse/CASSANDRA-11996[11996: SSTableSet.CANONICAL can miss sstables] +* https://issues.apache.org/jira/browse/CASSANDRA-12247[12247: AssertionError with MVs on updating a row that isn't indexed due to a null value] +* https://issues.apache.org/jira/browse/CASSANDRA-11604[11604: select on table fails after changing user defined type in map] +* https://issues.apache.org/jira/browse/CASSANDRA-11984[11984: StorageService shutdown hook should use a volatile variable] +* https://issues.apache.org/jira/browse/CASSANDRA-12083[12083: Race condition during system.roles column family creation] -After that we start the server. Running the startup script with the -f argument will cause -Cassandra to remain in the foreground and log to standard out; it can be stopped with ctrl-C. +We are also working to backport additional patches that we see as required, however this is an ongoing process and there will always be fixes that "need" to be backported that we haven't done yet. We will tag commits with a minor version number for releases that we are ourselves running in production clusters. E.g. 3.7.1. Build artefacts will not be release builds and as such you will see this in the version numbers reported in various places by Cassandra. - $ bin/cassandra -f +=== How are you labeling these versions? +Currently we are putting using a minor version appended to the major version (e.g. for 3.7 this release is 3.7.1). This could conflict with official versioning if they release a patch for 3.7, so we are open to suggestions if that happens. -**** -Note for Windows users: to install Cassandra as a service, download -http://commons.apache.org/daemon/procrun.html[Procrun], set the -PRUNSRV environment variable to the full path of prunsrv (e.g., -C:\procrun\prunsrv.exe), and run "bin\cassandra.bat install". -Similarly, "uninstall" will remove the service. -**** +=== Is this tested? +Yes it sure is! We rerun all associated unit tests, dtests and our own internal tests. Tests for each backported patch are also run. We also run this version ourselves internally for our own monitoring and metrics system. If you want super duper stability, then look at the 2.x branches. -Now let's try to read and write some data using the Cassandra Query Language: +=== I want to contribute! +Awesome! We will be maintaining this repository under the following general rules (though we may make well explained exceptions): - $ bin/cqlsh +* All code and backported fixes in this repository can be found in some form in various branches in the ASF project repository. +* We will not accept pull requests for patches that do not have a corresponding Jira ticket in the ASF Cassandra jira repository. +* The only exception to this will be if there is a bug with the backport itself. +* We will only backport bugfixes, not features. +* Bugfixes will generally be for major issues (or issues that annoy us). +* We prioritise fixing issues our customers are experiencing, then fixing issues in the official Cassandra project and only then backporting issues listed here. +* If you think you've found a bug in Cassandra, reproduce in an official release then raise a Jira. +* If you think you've found a bug specific to our implementation, raise an issue in this github. -The command line client is interactive so if everything worked you should -be sitting in front of a prompt: - ----- -Connected to Test Cluster at localhost:9160. -[cqlsh 2.2.0 | Cassandra 1.2.0 | CQL spec 3.0.0 | Thrift protocol 19.35.0] -Use HELP for help. -cqlsh> ----- - -As the banner says, you can use 'help;' or '?' to see what CQL has to -offer, and 'quit;' or 'exit;' when you've had enough fun. But lets try -something slightly more interesting: - ----- -cqlsh> CREATE SCHEMA schema1 - WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; -cqlsh> USE schema1; -cqlsh:Schema1> CREATE TABLE users ( - user_id varchar PRIMARY KEY, - first varchar, - last varchar, - age int - ); -cqlsh:Schema1> INSERT INTO users (user_id, first, last, age) - VALUES ('jsmith', 'John', 'Smith', 42); -cqlsh:Schema1> SELECT * FROM users; - user_id | age | first | last ----------+-----+-------+------- - jsmith | 42 | john | smith - cqlsh:Schema1> ----- - -If your session looks similar to what's above, congrats, your single node -cluster is operational! - -For more on what commands are supported by CQL, see -https://github.com/apache/cassandra/blob/trunk/doc/cql3/CQL.textile[the CQL reference]. A -reasonable way to think of it is as, "SQL minus joins and subqueries, plus collections." - -Wondering where to go from here? - - * Getting started: http://wiki.apache.org/cassandra/GettingStarted - * Join us in #cassandra on irc.freenode.net and ask questions - * Subscribe to the Users mailing list by sending a mail to - user-subscribe@cassandra.apache.org - * Planet Cassandra aggregates Cassandra articles and news: - http://planetcassandra.org/ +=== This is terrible why don't you include XYZ? +Feel free to raise an issue and suggest something that is missing, we will be very friendly! There are definitely some things that need to be included here e.g. Time Window Compaction Strategy (TWCS), which we may include soon. From e36f435a5901a185c196ffb5a69bdebca5540444 Mon Sep 17 00:00:00 2001 From: benbromhead Date: Wed, 19 Oct 2016 00:21:43 -0700 Subject: [PATCH 13/34] Update README.asc Reworded a few things, fixed some typos --- README.asc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.asc b/README.asc index d74c6ca2e4c7..64157e9b515f 100644 --- a/README.asc +++ b/README.asc @@ -6,12 +6,12 @@ This is repository contains Apache Cassandra 3.7 with a number of bugfixes from === Is this a fork? No, This is just Cassandra with a different release cadence for those who want 3.x features but are slightly more risk adverse than the current schedule allows. -=== Why backport? -At Instaclustr we support and run a number of different versions of Apache Cassandra on behalf of our customers. Over the course of managing Cassandra for our customers we often encounter bugs. Some there are existing patches, others we patch ourselves. Generally, if we can, we try to wait for the next official Apache Cassandra release, however in the need to ensure our customers remain stable and running we will sometimes backport bugs and write our own hotfixes (which are also submitted back to the community). - === Why not just use the official release? With the 3.x tick-tock branch we have encountered more instability than with the previous release cadence. We feel that releasing new features every other release makes it very hard for operators to stabilize their production environment without bringing in brand new features that are not battle tested. With the dual release of Cassandra 3.8 and 3.9 simultaneously the bug fix branch included new and real-world untested features, specifically CDC. We have decided to stick with Cassandra 3.7 and instead backport critical issues and maintain it ourselves rather than trying to stick with the current Apache Cassandra release cadence. +=== Why backport? +At Instaclustr we support and run a number of different versions of Apache Cassandra on behalf of our customers. Over the course of managing Cassandra for our customers we often encounter bugs. There are existing patches for some of them, others we patch ourselves. Generally, if we can, we try to wait for the next official Apache Cassandra release, however in the need to ensure our customers remain stable and running we will sometimes backport bugs and write our own hotfixes (which are also submitted back to the community). + === Why release it? A number of our customers and people in the community have asked if we would make this available, which we are more than happy to do so. This repository represents what we at Instaclustr run in production for Cassandra 3.7 and this is our way of helping the community get a similar level of stability as what you would get from our managed service. @@ -36,7 +36,7 @@ We will keep an eye on the issues page of this github repository and try to help We are also working to backport additional patches that we see as required, however this is an ongoing process and there will always be fixes that "need" to be backported that we haven't done yet. We will tag commits with a minor version number for releases that we are ourselves running in production clusters. E.g. 3.7.1. Build artefacts will not be release builds and as such you will see this in the version numbers reported in various places by Cassandra. === How are you labeling these versions? -Currently we are putting using a minor version appended to the major version (e.g. for 3.7 this release is 3.7.1). This could conflict with official versioning if they release a patch for 3.7, so we are open to suggestions if that happens. +Currently we are putting using a minor version appended to the major version (e.g. for 3.7 this release is 3.7.1). If in the future this conflicts with official versioning we will change our versioning to remove any confusion. === Is this tested? Yes it sure is! We rerun all associated unit tests, dtests and our own internal tests. Tests for each backported patch are also run. We also run this version ourselves internally for our own monitoring and metrics system. If you want super duper stability, then look at the 2.x branches. From ca1d94eec089694998d71869246d19516a1ef487 Mon Sep 17 00:00:00 2001 From: benbromhead Date: Wed, 19 Oct 2016 00:28:01 -0700 Subject: [PATCH 14/34] Update README.asc added link to Instaclustr.com --- README.asc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.asc b/README.asc index 64157e9b515f..302b742f9b5c 100644 --- a/README.asc +++ b/README.asc @@ -10,7 +10,7 @@ No, This is just Cassandra with a different release cadence for those who want 3 With the 3.x tick-tock branch we have encountered more instability than with the previous release cadence. We feel that releasing new features every other release makes it very hard for operators to stabilize their production environment without bringing in brand new features that are not battle tested. With the dual release of Cassandra 3.8 and 3.9 simultaneously the bug fix branch included new and real-world untested features, specifically CDC. We have decided to stick with Cassandra 3.7 and instead backport critical issues and maintain it ourselves rather than trying to stick with the current Apache Cassandra release cadence. === Why backport? -At Instaclustr we support and run a number of different versions of Apache Cassandra on behalf of our customers. Over the course of managing Cassandra for our customers we often encounter bugs. There are existing patches for some of them, others we patch ourselves. Generally, if we can, we try to wait for the next official Apache Cassandra release, however in the need to ensure our customers remain stable and running we will sometimes backport bugs and write our own hotfixes (which are also submitted back to the community). +At https://www.instaclustr.com[Instaclustr] we support and run a number of different versions of Apache Cassandra on behalf of our customers. Over the course of managing Cassandra for our customers we often encounter bugs. There are existing patches for some of them, others we patch ourselves. Generally, if we can, we try to wait for the next official Apache Cassandra release, however in the need to ensure our customers remain stable and running we will sometimes backport bugs and write our own hotfixes (which are also submitted back to the community). === Why release it? A number of our customers and people in the community have asked if we would make this available, which we are more than happy to do so. This repository represents what we at Instaclustr run in production for Cassandra 3.7 and this is our way of helping the community get a similar level of stability as what you would get from our managed service. From 2d1c3f5ae2000899b74af91aaf883bf5690585c9 Mon Sep 17 00:00:00 2001 From: benbromhead Date: Wed, 19 Oct 2016 10:38:16 -0700 Subject: [PATCH 15/34] Update README.asc --- README.asc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.asc b/README.asc index 302b742f9b5c..fb32fa2820ca 100644 --- a/README.asc +++ b/README.asc @@ -39,7 +39,7 @@ We are also working to backport additional patches that we see as required, howe Currently we are putting using a minor version appended to the major version (e.g. for 3.7 this release is 3.7.1). If in the future this conflicts with official versioning we will change our versioning to remove any confusion. === Is this tested? -Yes it sure is! We rerun all associated unit tests, dtests and our own internal tests. Tests for each backported patch are also run. We also run this version ourselves internally for our own monitoring and metrics system. If you want super duper stability, then look at the 2.x branches. +Yes it sure is! We rerun all associated unit tests, dtests and our own internal tests. Tests for each backported patch are also run. We also run this version ourselves internally for our own monitoring and metrics system. Having said that, there will always still be bugs and there will also be bugs with patches we have yet to backport. If you want super duper stability, then look at the 2.x branches. === I want to contribute! Awesome! We will be maintaining this repository under the following general rules (though we may make well explained exceptions): From 488d07eaae3489aaf5468aeffde7859957292f3f Mon Sep 17 00:00:00 2001 From: benbromhead Date: Wed, 19 Oct 2016 11:28:45 -0700 Subject: [PATCH 16/34] Update README.asc a word --- README.asc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.asc b/README.asc index fb32fa2820ca..a8d0e200b975 100644 --- a/README.asc +++ b/README.asc @@ -4,7 +4,7 @@ This is repository contains Apache Cassandra 3.7 with a number of bugfixes from = FAQ === Is this a fork? -No, This is just Cassandra with a different release cadence for those who want 3.x features but are slightly more risk adverse than the current schedule allows. +No, This is just Cassandra with a different release cadence for those who want 3.x features but are slightly more risk averse than the current schedule allows. === Why not just use the official release? With the 3.x tick-tock branch we have encountered more instability than with the previous release cadence. We feel that releasing new features every other release makes it very hard for operators to stabilize their production environment without bringing in brand new features that are not battle tested. With the dual release of Cassandra 3.8 and 3.9 simultaneously the bug fix branch included new and real-world untested features, specifically CDC. We have decided to stick with Cassandra 3.7 and instead backport critical issues and maintain it ourselves rather than trying to stick with the current Apache Cassandra release cadence. From deb53f468fe27aeb65f9729937233ea8e954f123 Mon Sep 17 00:00:00 2001 From: Yuki Morishita Date: Thu, 29 Sep 2016 15:05:12 -0500 Subject: [PATCH 17/34] Merge branch 'cassandra-3.0' into cassandra-3.7-instaclustr Fix merkle tree depth calculation Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12580 --- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 6 +- .../apache/cassandra/repair/Validator.java | 9 +- .../apache/cassandra/utils/MerkleTree.java | 10 ++ .../apache/cassandra/utils/MerkleTrees.java | 10 ++ .../db/compaction/CompactionsTest.java | 2 +- .../cassandra/repair/ValidatorTest.java | 167 ++++++++++++------ 7 files changed, 145 insertions(+), 60 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 4dacc82cda6d..0c967aff86e4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -22,6 +22,7 @@ Merged from 3.0: * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247) * Fix potential race in schema during new table creation (CASSANDRA-12083) Merged from 2.2: + * Fix merkle tree depth calculation (CASSANDRA-12580) * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index dca48aa1d2a1..b4a4e320b10e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1223,7 +1223,6 @@ private void doValidationCompaction(ColumnFamilyStore cfs, Validator validator) // Create Merkle trees suitable to hold estimated partitions for the given ranges. // We blindly assume that a partition is evenly distributed on all sstables for now. - // determine tree depth from number of partitions, but cap at 20 to prevent large tree. MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs); long start = System.nanoTime(); try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges); @@ -1284,8 +1283,11 @@ private static MerkleTrees createMerkleTrees(Iterable sstables, C { long numPartitions = rangePartitionCounts.get(range); double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0; + // determine max tree depth proportional to range size to avoid blowing up memory with multiple tress, + // capping at 20 to prevent large tree (CASSANDRA-11390) int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0; - int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0; + // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263) + int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0; tree.addMerkleTree((int) Math.pow(2, depth), range); } if (logger.isDebugEnabled()) diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index e51dc0ee55ec..d05092b4fe4a 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -58,6 +58,7 @@ public class Validator implements Runnable public final RepairJobDesc desc; public final InetAddress initiator; public final int gcBefore; + private final boolean evenTreeDistribution; // null when all rows with the min token have been consumed private long validated; @@ -70,6 +71,11 @@ public class Validator implements Runnable private DecoratedKey lastKey; public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore) + { + this(desc, initiator, gcBefore, false); + } + + public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution) { this.desc = desc; this.initiator = initiator; @@ -77,13 +83,14 @@ public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore) validated = 0; range = null; ranges = null; + this.evenTreeDistribution = evenTreeDistribution; } public void prepare(ColumnFamilyStore cfs, MerkleTrees tree) { this.trees = tree; - if (!tree.partitioner().preservesOrder()) + if (!tree.partitioner().preservesOrder() || evenTreeDistribution) { // You can't beat an even tree distribution for md5 tree.init(); diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java index bc39b91554bb..ba36345f89c3 100644 --- a/src/java/org/apache/cassandra/utils/MerkleTree.java +++ b/src/java/org/apache/cassandra/utils/MerkleTree.java @@ -535,6 +535,16 @@ public EstimatedHistogram histogramOfRowCountPerLeaf() return histbuild.buildWithStdevRangesAroundMean(); } + public long rowCount() + { + long count = 0; + for (TreeRange range : new TreeRangeIterator(this)) + { + count += range.hashable.rowsInRange; + } + return count; + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/utils/MerkleTrees.java b/src/java/org/apache/cassandra/utils/MerkleTrees.java index b950b3b79e70..4ae55ab85bfe 100644 --- a/src/java/org/apache/cassandra/utils/MerkleTrees.java +++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java @@ -319,6 +319,16 @@ public Iterator, MerkleTree>> iterator() return merkleTrees.entrySet().iterator(); } + public long rowCount() + { + long totalCount = 0; + for (MerkleTree tree : merkleTrees.values()) + { + totalCount += tree.rowCount(); + } + return totalCount; + } + public class TreeRangeIterator extends AbstractIterator implements Iterable, PeekingIterator diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 198b01b94f0d..0ce81d33e946 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -115,7 +115,7 @@ public ColumnFamilyStore testSingleSSTableCompaction(String strategyClassName) t return store; } - private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl) + public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl) { long timestamp = System.currentTimeMillis(); CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata; diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index 14f5707710c8..9c32cef7ab81 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -15,13 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.CompactionsTest; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@ -42,9 +52,12 @@ import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; -import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -54,6 +67,8 @@ public class ValidatorTest { + private static final long TEST_TIMEOUT = 60; //seconds + private static final String keyspace = "ValidatorTest"; private static final String columnFamily = "Standard1"; private static IPartitioner partitioner; @@ -80,34 +95,7 @@ public void testValidatorComplete() throws Throwable Range range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range)); - final SimpleCondition lock = new SimpleCondition(); - MessagingService.instance().addMessageSink(new IMessageSink() - { - public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) - { - try - { - if (message.verb == MessagingService.Verb.REPAIR_MESSAGE) - { - RepairMessage m = (RepairMessage) message.payload; - assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); - assertEquals(desc, m.desc); - assertTrue(((ValidationComplete) m).success()); - assertNotNull(((ValidationComplete) m).trees); - } - } - finally - { - lock.signalAll(); - } - return false; - } - - public boolean allowIncomingMessage(MessageIn message, int id) - { - return false; - } - }); + final CompletableFuture outgoingMessageSink = registerOutgoingMessageSink(); InetAddress remote = InetAddress.getByName("127.0.0.2"); @@ -130,8 +118,13 @@ public boolean allowIncomingMessage(MessageIn message, int id) Token min = tree.partitioner().getMinimumToken(); assertNotNull(tree.hash(new Range<>(min, min))); - if (!lock.isSignaled()) - lock.await(); + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); + assertTrue(((ValidationComplete) m).success()); + assertNotNull(((ValidationComplete) m).trees); } @@ -141,26 +134,95 @@ public void testValidatorFailed() throws Throwable Range range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range)); - final SimpleCondition lock = new SimpleCondition(); + final CompletableFuture outgoingMessageSink = registerOutgoingMessageSink(); + + InetAddress remote = InetAddress.getByName("127.0.0.2"); + + Validator validator = new Validator(desc, remote, 0); + validator.fail(); + + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); + assertFalse(((ValidationComplete) m).success()); + assertNull(((ValidationComplete) m).trees); + } + + @Test + public void simpleValidationTest128() throws Exception + { + simpleValidationTest(128); + } + + @Test + public void simpleValidationTest1500() throws Exception + { + simpleValidationTest(1500); + } + + /** + * Test for CASSANDRA-5263 + * 1. Create N rows + * 2. Run validation compaction + * 3. Expect merkle tree with size 2^(log2(n)) + */ + public void simpleValidationTest(int n) throws Exception + { + Keyspace ks = Keyspace.open(keyspace); + ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily); + cfs.clearUnsafe(); + + // disable compaction while flushing + cfs.disableAutoCompaction(); + + CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s + + cfs.forceBlockingFlush(); + assertEquals(1, cfs.getLiveSSTables().size()); + + // wait enough to force single compaction + TimeUnit.SECONDS.sleep(5); + + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + UUID repairSessionId = UUIDGen.getTimeUUID(); + final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(), + cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(), + sstable.last.getToken()))); + + ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(), + Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE, + false); + + final CompletableFuture outgoingMessageSink = registerOutgoingMessageSink(); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true); + CompactionManager.instance.submitValidation(cfs, validator); + + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); + assertTrue(((ValidationComplete) m).success()); + MerkleTrees trees = ((ValidationComplete) m).trees; + + Iterator, MerkleTree>> iterator = trees.iterator(); + while (iterator.hasNext()) + { + assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0); + } + assertEquals(trees.rowCount(), n); + } + + private CompletableFuture registerOutgoingMessageSink() + { + final CompletableFuture future = new CompletableFuture<>(); MessagingService.instance().addMessageSink(new IMessageSink() { public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) { - try - { - if (message.verb == MessagingService.Verb.REPAIR_MESSAGE) - { - RepairMessage m = (RepairMessage) message.payload; - assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); - assertEquals(desc, m.desc); - assertFalse(((ValidationComplete) m).success()); - assertNull(((ValidationComplete) m).trees); - } - } - finally - { - lock.signalAll(); - } + future.complete(message); return false; } @@ -169,13 +231,6 @@ public boolean allowIncomingMessage(MessageIn message, int id) return false; } }); - - InetAddress remote = InetAddress.getByName("127.0.0.2"); - - Validator validator = new Validator(desc, remote, 0); - validator.fail(); - - if (!lock.isSignaled()) - lock.await(); + return future; } } From e09f1abd7261d3372081b72713fa7e57ccc9d3cd Mon Sep 17 00:00:00 2001 From: Yuki Morishita Date: Thu, 20 Oct 2016 09:47:36 -0500 Subject: [PATCH 18/34] Fix unreleased resource sockets patch by Arunkumar M; reviewed by yukim for CASSANDRA-12330 --- CHANGES.txt | 1 + .../cassandra/streaming/DefaultConnectionFactory.java | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0c967aff86e4..ad1bc29b7f81 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.0: + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330) * Fix legacy serialization of Thrift-generated non-compound range tombstones when communicating with 2.x nodes (CASSANDRA-11930) * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849) diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java index 5c27ff346595..7e9dfd342c35 100644 --- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java +++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java @@ -47,15 +47,20 @@ public Socket createConnection(InetAddress peer) throws IOException int attempts = 0; while (true) { + Socket socket = null; try { - Socket socket = OutboundTcpConnectionPool.newSocket(peer); + socket = OutboundTcpConnectionPool.newSocket(peer); socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); socket.setKeepAlive(true); return socket; } catch (IOException e) { + if (socket != null) + { + socket.close(); + } if (++attempts >= MAX_CONNECT_ATTEMPTS) throw e; From 8405b187d5bbd7951fd7de409b9370bfe3f668cd Mon Sep 17 00:00:00 2001 From: Yuki Morishita Date: Thu, 20 Oct 2016 09:45:28 -0500 Subject: [PATCH 19/34] Fix unreleased resource sockets patch by Arunkumar M; reviewed by yukim for CASSANDRA-12329 --- .../apache/cassandra/security/SSLFactory.java | 53 +++++++++++++++---- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java index 2e59b06f64f8..7216e2c7b3f9 100644 --- a/src/java/org/apache/cassandra/security/SSLFactory.java +++ b/src/java/org/apache/cassandra/security/SSLFactory.java @@ -60,11 +60,18 @@ public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAdd { SSLContext ctx = createSSLContext(options, true); SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket(); - serverSocket.setReuseAddress(true); - prepareSocket(serverSocket, options); - serverSocket.bind(new InetSocketAddress(address, port), 500); - - return serverSocket; + try + { + serverSocket.setReuseAddress(true); + prepareSocket(serverSocket, options); + serverSocket.bind(new InetSocketAddress(address, port), 500); + return serverSocket; + } + catch (IllegalArgumentException | SecurityException | IOException e) + { + serverSocket.close(); + throw e; + } } /** Create a socket and connect */ @@ -72,8 +79,16 @@ public static SSLSocket getSocket(EncryptionOptions options, InetAddress address { SSLContext ctx = createSSLContext(options, true); SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort); - prepareSocket(socket, options); - return socket; + try + { + prepareSocket(socket, options); + return socket; + } + catch (IllegalArgumentException e) + { + socket.close(); + throw e; + } } /** Create a socket and connect, using any local address */ @@ -81,8 +96,16 @@ public static SSLSocket getSocket(EncryptionOptions options, InetAddress address { SSLContext ctx = createSSLContext(options, true); SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port); - prepareSocket(socket, options); - return socket; + try + { + prepareSocket(socket, options); + return socket; + } + catch (IllegalArgumentException e) + { + socket.close(); + throw e; + } } /** Just create a socket */ @@ -90,8 +113,16 @@ public static SSLSocket getSocket(EncryptionOptions options) throws IOException { SSLContext ctx = createSSLContext(options, true); SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(); - prepareSocket(socket, options); - return socket; + try + { + prepareSocket(socket, options); + return socket; + } + catch (IllegalArgumentException e) + { + socket.close(); + throw e; + } } /** Sets relevant socket options specified in encryption settings */ From 53ebca6850b58f9f8ff8f3e03b50d2c604fdfc3a Mon Sep 17 00:00:00 2001 From: Jeff Jirsa Date: Tue, 18 Oct 2016 18:11:17 -0700 Subject: [PATCH 20/34] Correct log message for statistics of offheap memtable flush Patch by Kurt Greaves; Reviewed by Jeff Jirsa for CASSANDRA-12776 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index ad1bc29b7f81..0dab61667bba 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.0: + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330) * Fix legacy serialization of Thrift-generated non-compound range tombstones when communicating with 2.x nodes (CASSANDRA-11930) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 41b7ce62866a..42464dc66a94 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1295,7 +1295,7 @@ public void run() float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); - float thisOffHeap = largest.getAllocator().onHeap().ownershipRatio(); + float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}", largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); From ab0b88621071ee820699af742eeef12b3dd9e75c Mon Sep 17 00:00:00 2001 From: Marcus Eriksson Date: Fri, 21 Oct 2016 09:03:31 +0200 Subject: [PATCH 21/34] Don't skip sstables based on maxLocalDeletionTime Patch by Cameron Zemek; reviewed by marcuse for CASSANDRA-12765 --- CHANGES.txt | 1 + .../db/SinglePartitionReadCommand.java | 3 +- .../io/sstable/format/SSTableReader.java | 2 +- .../db/SinglePartitionReadCommandCQLTest.java | 41 +++++++++++++++++++ 4 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 test/unit/org/apache/cassandra/db/SinglePartitionReadCommandCQLTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 0dab61667bba..c9fb14b5a130 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -38,6 +38,7 @@ Merged from 2.2: * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) Merged from 2.1: + * Don't skip sstables based on maxLocalDeletionTime (CASSANDRA-12765) * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: apply current keyspace to source command (CASSANDRA-11152) * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824) diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4d508853dfb8..4caaec2ea5c2 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -572,6 +572,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs if (skippedSSTablesWithTombstones == null) skippedSSTablesWithTombstones = new ArrayList<>(); skippedSSTablesWithTombstones.add(sstable); + } continue; } @@ -762,7 +763,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam sstable.incrementReadCount(); try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), Slices.ALL, columnFilter(), filter.isReversed(), isForThrift())) { - if (iter.partitionLevelDeletion().isLive()) + if (!iter.partitionLevelDeletion().isLive()) { sstablesIterated++; result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired()); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index db69f2f37f8e..8bd8925b1d16 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -1859,7 +1859,7 @@ public int getMaxLocalDeletionTime() return sstableMetadata.maxLocalDeletionTime; } - /** sstable contains no tombstones if maxLocalDeletionTime == Integer.MAX_VALUE */ + /** sstable contains no tombstones if minLocalDeletionTime == Integer.MAX_VALUE */ public boolean hasTombstones() { // sstable contains no tombstone if minLocalDeletionTime is still set to the default value Integer.MAX_VALUE diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionReadCommandCQLTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionReadCommandCQLTest.java new file mode 100644 index 000000000000..1c891ec2b2c6 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/SinglePartitionReadCommandCQLTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.UntypedResultSet; +import static org.junit.Assert.assertTrue; + +public class SinglePartitionReadCommandCQLTest extends CQLTester +{ + @Test + public void partitionLevelDeletionTest() throws Throwable + { + createTable("CREATE TABLE %s (bucket_id TEXT,name TEXT,data TEXT,PRIMARY KEY (bucket_id, name))"); + execute("insert into %s (bucket_id, name, data) values ('8772618c9009cf8f5a5e0c18', 'test', 'hello')"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + execute("insert into %s (bucket_id, name, data) values ('8772618c9009cf8f5a5e0c19', 'test2', 'hello');"); + execute("delete from %s where bucket_id = '8772618c9009cf8f5a5e0c18'"); + getCurrentColumnFamilyStore().forceBlockingFlush(); + UntypedResultSet res = execute("select * from %s where bucket_id = '8772618c9009cf8f5a5e0c18' and name = 'test'"); + assertTrue(res.isEmpty()); + } +} From 6e694e76a02fdd30ac73a46d07c55689ba5f93f5 Mon Sep 17 00:00:00 2001 From: Jeff Jirsa Date: Fri, 21 Oct 2016 19:04:04 -0700 Subject: [PATCH 22/34] Split consistent range movement flag correction Patch by Sankalp Kohli; Reviewed by Jeff Jirsa for CASSANDRA-12786 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/service/StorageService.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index c9fb14b5a130..e81ea61ba1ac 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -37,6 +37,7 @@ Merged from 2.2: * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) + * Split consistent range movement flag correction (CASSANDRA-12786) Merged from 2.1: * Don't skip sstables based on maxLocalDeletionTime (CASSANDRA-12765) * Run CommitLog tests with different compression settings (CASSANDRA-9039) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index bc15ece61e40..8e979f380058 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -945,7 +945,7 @@ private void joinTokenRing(int delay) throws ConfigurationException logger.debug("... got ring + schema info"); - if (useStrictConsistency && + if (useStrictConsistency && !allowSimultaneousMoves() && ( tokenMetadata.getBootstrapTokens().valueSet().size() > 0 || tokenMetadata.getLeavingEndpoints().size() > 0 || From 86e6e4ba1ed414e4ace38c3e3c51a151e74c45bd Mon Sep 17 00:00:00 2001 From: Sylvain Lebresne Date: Wed, 20 Jul 2016 14:29:16 +0200 Subject: [PATCH 23/34] NullPointerExpception when reading/compacting table patch by Sylvain Lebresne; reviewed by Carl Yeksigian for CASSANDRA-11988 --- CHANGES.txt | 2 ++ .../db/partitions/AbstractBTreePartition.java | 2 +- .../db/partitions/PurgeFunction.java | 15 ++++++++----- .../cassandra/db/rows/BaseRowIterator.java | 2 +- .../org/apache/cassandra/db/rows/Row.java | 13 ++++++++--- .../cassandra/db/transform/BaseRows.java | 2 +- .../apache/cassandra/db/transform/Filter.java | 12 ++++++---- .../entities/StaticColumnsTest.java | 22 +++++++++++++++++++ 8 files changed, 55 insertions(+), 15 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e81ea61ba1ac..2f8127c3c349 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,8 @@ 3.7 * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) +Merged from 3.x: + * NullPointerExpception when reading/compacting table (CASSANDRA-11988) Merged from 3.0: * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330) diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index 1c05f3c682ff..04004021de96 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -60,7 +60,7 @@ protected static final class Holder this.columns = columns; this.tree = tree; this.deletionInfo = deletionInfo; - this.staticRow = staticRow; + this.staticRow = staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow; this.stats = stats; } } diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java index 492bab1c7d57..d3255d303f27 100644 --- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java +++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java @@ -55,7 +55,8 @@ protected void updateProgress() { } - public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { onNewPartition(partition.partitionKey()); @@ -71,24 +72,28 @@ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) return purged; } - public DeletionTime applyToDeletion(DeletionTime deletionTime) + @Override + protected DeletionTime applyToDeletion(DeletionTime deletionTime) { return purger.shouldPurge(deletionTime) ? DeletionTime.LIVE : deletionTime; } - public Row applyToStatic(Row row) + @Override + protected Row applyToStatic(Row row) { updateProgress(); return row.purge(purger, nowInSec); } - public Row applyToRow(Row row) + @Override + protected Row applyToRow(Row row) { updateProgress(); return row.purge(purger, nowInSec); } - public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + @Override + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) { updateProgress(); boolean reversed = isReverseOrder; diff --git a/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java index fb9e9083eefb..ce372975dc25 100644 --- a/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java @@ -53,7 +53,7 @@ public interface BaseRowIterator extends CloseableIterator /** * The static part corresponding to this partition (this can be an empty - * row). + * row but cannot be {@code null}). */ public Row staticRow(); diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 4fc3e224f93e..53b0eb33048e 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -200,7 +200,8 @@ public interface Row extends Unfiltered, Collection * * @param purger the {@code DeletionPurger} to use to decide what can be purged. * @param nowInSec the current time to decide what is deleted and what isn't (in the case of expired cells). - * @return this row but without any deletion info purged by {@code purger}. + * @return this row but without any deletion info purged by {@code purger}. If the purged row is empty, returns + * {@code null}. */ public Row purge(DeletionPurger purger, int nowInSec); @@ -220,8 +221,14 @@ public interface Row extends Unfiltered, Collection public Row markCounterLocalToBeCleared(); /** - * returns a copy of this row where all live timestamp have been replaced by {@code newTimestamp} and every deletion timestamp - * by {@code newTimestamp - 1}. See {@link Commit} for why we need this. + * Returns a copy of this row where all live timestamp have been replaced by {@code newTimestamp} and every deletion + * timestamp by {@code newTimestamp - 1}. + * + * @param newTimestamp the timestamp to use for all live data in the returned row. + * @param a copy of this row with timestamp updated using {@code newTimestamp}. This can return {@code null} in the + * rare where the row only as a shadowable row deletion and the new timestamp supersedes it. + * + * @see Commit for why we need this. */ public Row updateAllTimestamp(long newTimestamp); diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java b/src/java/org/apache/cassandra/db/transform/BaseRows.java index 825db44a089f..10d707b801a9 100644 --- a/src/java/org/apache/cassandra/db/transform/BaseRows.java +++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java @@ -52,7 +52,7 @@ public DecoratedKey partitionKey() public Row staticRow() { - return staticRow; + return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow; } diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java index 3bf831f03c12..14e57c274313 100644 --- a/src/java/org/apache/cassandra/db/transform/Filter.java +++ b/src/java/org/apache/cassandra/db/transform/Filter.java @@ -13,7 +13,8 @@ public Filter(boolean filterEmpty, int nowInSec) this.nowInSec = nowInSec; } - public RowIterator applyToPartition(BaseRowIterator iterator) + @Override + protected RowIterator applyToPartition(BaseRowIterator iterator) { RowIterator filtered = iterator instanceof UnfilteredRows ? new FilteredRows(this, (UnfilteredRows) iterator) @@ -25,7 +26,8 @@ public RowIterator applyToPartition(BaseRowIterator iterator) return filtered; } - public Row applyToStatic(Row row) + @Override + protected Row applyToStatic(Row row) { if (row.isEmpty()) return Rows.EMPTY_STATIC_ROW; @@ -34,12 +36,14 @@ public Row applyToStatic(Row row) return row == null ? Rows.EMPTY_STATIC_ROW : row; } - public Row applyToRow(Row row) + @Override + protected Row applyToRow(Row row) { return row.purge(DeletionPurger.PURGE_ALL, nowInSec); } - public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + @Override + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) { return null; } diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java index db7487e3a313..ecffbf0ddbb9 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java @@ -278,4 +278,26 @@ public void testAlterClusteringAndStatic() throws Throwable // We shouldn 't allow static when there is not clustering columns assertInvalid("ALTER TABLE %s ADD bar2 text static"); } + + /** + * Ensure that deleting and compacting a static row that should be purged doesn't throw. + * This is a test for #11988. + */ + @Test + public void testStaticColumnPurging() throws Throwable + { + createTable("CREATE TABLE %s (pkey text, ckey text, value text, static_value text static, PRIMARY KEY(pkey, ckey)) WITH gc_grace_seconds = 0"); + + execute("INSERT INTO %s (pkey, ckey, static_value, value) VALUES (?, ?, ?, ?)", "k1", "c1", "s1", "v1"); + + flush(); + + execute("DELETE static_value FROM %s WHERE pkey = ?", "k1"); + + flush(); + + compact(); + + assertRows(execute("SELECT * FROM %s"), row("k1", "c1", null, "v1")); + } } From 0576db93629790247fcb8cf281e413635b656b1f Mon Sep 17 00:00:00 2001 From: Sylvain Lebresne Date: Fri, 29 Jul 2016 12:36:40 +0200 Subject: [PATCH 24/34] NullPointerException during compaction on table with static columns patch by Sylvain Lebresne; reviewed by Carl Yeksigian for CASSANDRA-12336 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/transform/BaseRows.java | 3 ++- .../cassandra/cql3/validation/entities/StaticColumnsTest.java | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index 2f8127c3c349..4ad7042b3502 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.x: + * NullPointerException during compaction on table with static columns (CASSANDRA-12336) * NullPointerExpception when reading/compacting table (CASSANDRA-11988) Merged from 3.0: * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java b/src/java/org/apache/cassandra/db/transform/BaseRows.java index 10d707b801a9..648879df71eb 100644 --- a/src/java/org/apache/cassandra/db/transform/BaseRows.java +++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java @@ -85,7 +85,8 @@ void add(Transformation transformation) super.add(transformation); // transform any existing data - staticRow = transformation.applyToStatic(staticRow); + if (staticRow != null) + staticRow = transformation.applyToStatic(staticRow); next = applyOne(next, transformation); partitionKey = transformation.applyToPartitionKey(partitionKey); } diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java index ecffbf0ddbb9..74fed6904568 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java @@ -296,6 +296,8 @@ public void testStaticColumnPurging() throws Throwable flush(); + Thread.sleep(1000); + compact(); assertRows(execute("SELECT * FROM %s"), row("k1", "c1", null, "v1")); From 030cc592e8cf1ddc4079fab4570407cb24b058f6 Mon Sep 17 00:00:00 2001 From: Stefania Alborghetti Date: Fri, 23 Sep 2016 13:52:02 +0800 Subject: [PATCH 25/34] Avoid sstable corrupt exception due to dropped static column Patch by Stefania Alborghetti; reviewed by Carl Yeksigian for CASSANDRA-12582 --- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 14 ++++++++++++-- .../apache/cassandra/db/SerializationHeader.java | 3 ++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 4ad7042b3502..6037f95111d4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.x: + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582) * NullPointerException during compaction on table with static columns (CASSANDRA-12336) * NullPointerExpception when reading/compacting table (CASSANDRA-11988) Merged from 3.0: diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 689e61a32978..00b16b5f0927 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -674,11 +674,19 @@ public Map getDroppedColumns() return droppedColumns; } + public ColumnDefinition getDroppedColumnDefinition(ByteBuffer name) + { + return getDroppedColumnDefinition(name, false); + } + /** * Returns a "fake" ColumnDefinition corresponding to the dropped column {@code name} * of {@code null} if there is no such dropped column. + * + * @param name - the column name + * @param isStatic - whether the column was a static column, if known */ - public ColumnDefinition getDroppedColumnDefinition(ByteBuffer name) + public ColumnDefinition getDroppedColumnDefinition(ByteBuffer name, boolean isStatic) { DroppedColumn dropped = droppedColumns.get(name); if (dropped == null) @@ -688,7 +696,9 @@ public ColumnDefinition getDroppedColumnDefinition(ByteBuffer name) // it means that it's a dropped column from before 3.0, and in that case using // BytesType is fine for what we'll be using it for, even if that's a hack. AbstractType type = dropped.type == null ? BytesType.instance : dropped.type; - return ColumnDefinition.regularDef(this, name, type); + return isStatic + ? ColumnDefinition.staticDef(this, name, type) + : ColumnDefinition.regularDef(this, name, type); } @Override diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index af2d43415f03..64accf5d6103 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -311,7 +311,8 @@ public SerializationHeader toHeader(CFMetaData metadata) // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper // deserialization. The column will be ignore later on anyway. - column = metadata.getDroppedColumnDefinition(name); + boolean isStatic = staticColumns.containsKey(name); + column = metadata.getDroppedColumnDefinition(name, isStatic); if (column == null) throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); } From 2cd54679d0cb1f2e2ef7b8de7a16c13fd374461e Mon Sep 17 00:00:00 2001 From: Marcus Eriksson Date: Thu, 23 Jun 2016 09:46:00 +0200 Subject: [PATCH 26/34] Don't try to get sstables for non-repairing column families Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-12077 --- CHANGES.txt | 1 + .../apache/cassandra/service/ActiveRepairService.java | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6037f95111d4..f2da0dc9dcdc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.x: + * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582) * NullPointerException during compaction on table with static columns (CASSANDRA-12336) * NullPointerExpception when reading/compacting table (CASSANDRA-11988) diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 9f249e4a7481..3521e649d736 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -476,7 +476,11 @@ public ParentRepairSession(InetAddress coordinator, List colu public synchronized Refs getActiveRepairedSSTableRefs(UUID cfId) { ImmutableMap.Builder> references = ImmutableMap.builder(); - for (SSTableReader sstable : getActiveSSTables(cfId)) + Iterable sstables = getActiveSSTables(cfId); + if (sstables == null) + throw new RuntimeException("Not possible to get sstables for anticompaction for " + cfId); + + for (SSTableReader sstable : sstables) { Ref ref = sstable.tryRef(); if (ref == null) @@ -489,6 +493,9 @@ public synchronized Refs getActiveRepairedSSTableRefs(UUID cfId) private Set getActiveSSTables(UUID cfId) { + if (!columnFamilyStores.containsKey(cfId)) + return null; + Set repairedSSTables = sstableMap.get(cfId); Set activeSSTables = new HashSet<>(); Set activeSSTableNames = new HashSet<>(); From f866272e2e9ba2f39bd480c986e4c0711c689044 Mon Sep 17 00:00:00 2001 From: Kurt Date: Mon, 21 Nov 2016 01:49:11 +0000 Subject: [PATCH 27/34] Bump version to 3.7.2 --- build.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.xml b/build.xml index f5040cd46eb9..bb8561970ab4 100644 --- a/build.xml +++ b/build.xml @@ -25,7 +25,7 @@ - + From b5587f41e711b07a9392309c179c4d5746db1405 Mon Sep 17 00:00:00 2001 From: Kurt Date: Wed, 23 Nov 2016 00:35:54 +0000 Subject: [PATCH 28/34] Updated list of backports --- README.asc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.asc b/README.asc index a8d0e200b975..5de01dee8e28 100644 --- a/README.asc +++ b/README.asc @@ -32,6 +32,14 @@ We will keep an eye on the issues page of this github repository and try to help * https://issues.apache.org/jira/browse/CASSANDRA-11604[11604: select on table fails after changing user defined type in map] * https://issues.apache.org/jira/browse/CASSANDRA-11984[11984: StorageService shutdown hook should use a volatile variable] * https://issues.apache.org/jira/browse/CASSANDRA-12083[12083: Race condition during system.roles column family creation] +* https://issues.apache.org/jira/browse/CASSANDRA-11988[11988: NullPointerExpception when reading/compacting table] +* https://issues.apache.org/jira/browse/CASSANDRA-12336[12336: NullPointerException during compaction on table with static columns] +* https://issues.apache.org/jira/browse/CASSANDRA-12582[12582: Avoid sstable corrupt exception due to dropped static column] +* https://issues.apache.org/jira/browse/CASSANDRA-12786[12786: Split consistent range movement flag correction] +* https://issues.apache.org/jira/browse/CASSANDRA-12765[12765: Don't skip sstables based on maxLocalDeletionTime] +* https://issues.apache.org/jira/browse/CASSANDRA-12776[12776: Correct log message for statistics of offheap memtable flush] +* https://issues.apache.org/jira/browse/CASSANDRA-12329[12329: Fix unreleased resource sockets] +* https://issues.apache.org/jira/browse/CASSANDRA-12330[12330: Fix unreleased resource sockets] We are also working to backport additional patches that we see as required, however this is an ongoing process and there will always be fixes that "need" to be backported that we haven't done yet. We will tag commits with a minor version number for releases that we are ourselves running in production clusters. E.g. 3.7.1. Build artefacts will not be release builds and as such you will see this in the version numbers reported in various places by Cassandra. From a37f622adf866d48ceb1018b44c64ed6de112d2e Mon Sep 17 00:00:00 2001 From: Mahdi Mohammadi Date: Sun, 5 Jun 2016 05:21:32 +0800 Subject: [PATCH 29/34] Cache local ranges when calculating repair neighbors patch by Mahdi Mohammadi; reviewed by Paulo Motta for CASSANDRA-11933 --- CHANGES.txt | 1 + .../cassandra/repair/RepairRunnable.java | 7 ++++++- .../cassandra/service/ActiveRepairService.java | 7 +++++-- .../service/ActiveRepairServiceTest.java | 18 ++++++++++-------- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index f2da0dc9dcdc..1b92a81aeb07 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.x: + * Cache local ranges when calculating repair neighbors (CASSANDRA-11933) * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582) * NullPointerException during compaction on table with static columns (CASSANDRA-12336) diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 26c9f5a77675..3cb850429f7b 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -148,11 +148,16 @@ protected void runMayThrow() throws Exception final Set allNeighbors = new HashSet<>(); List, ? extends Collection>>> commonRanges = new ArrayList<>(); + + //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent + //calculation multiple times + Collection> keyspaceLocalRanges = storageService.getLocalRanges(keyspace); + try { for (Range range : options.getRanges()) { - Set neighbors = ActiveRepairService.getNeighbors(keyspace, range, + Set neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, options.getDataCenters(), options.getHosts()); diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 3521e649d736..a3a1511590b6 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -177,17 +177,20 @@ public synchronized void terminateSessions() * Return all of the neighbors with whom we share the provided range. * * @param keyspaceName keyspace to repair + * @param keyspaceLocalRanges local-range for given keyspaceName * @param toRepair token to repair * @param dataCenters the data centers to involve in the repair * * @return neighbors with whom we share the provided range */ - public static Set getNeighbors(String keyspaceName, Range toRepair, Collection dataCenters, Collection hosts) + public static Set getNeighbors(String keyspaceName, Collection> keyspaceLocalRanges, + Range toRepair, Collection dataCenters, + Collection hosts) { StorageService ss = StorageService.instance; Map, List> replicaSets = ss.getRangeToAddressMap(keyspaceName); Range rangeSuperSet = null; - for (Range range : ss.getLocalRanges(keyspaceName)) + for (Range range : keyspaceLocalRanges) { if (range.contains(toRepair)) { diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index db751cf5b05b..da067fdd6318 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -97,7 +97,7 @@ public void testGetNeighborsPlusOne() throws Throwable Set neighbors = new HashSet<>(); for (Range range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null)); } assertEquals(expected, neighbors); } @@ -120,7 +120,7 @@ public void testGetNeighborsTimesTwo() throws Throwable Set neighbors = new HashSet<>(); for (Range range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null)); } assertEquals(expected, neighbors); } @@ -142,7 +142,7 @@ public void testGetNeighborsPlusOneInLocalDC() throws Throwable Set neighbors = new HashSet<>(); for (Range range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); } assertEquals(expected, neighbors); } @@ -170,7 +170,7 @@ public void testGetNeighborsTimesTwoInLocalDC() throws Throwable Set neighbors = new HashSet<>(); for (Range range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); } assertEquals(expected, neighbors); } @@ -191,10 +191,11 @@ public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable expected.remove(FBUtilities.getBroadcastAddress()); Collection hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName()); + Collection> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); - assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, - StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), - null, hosts).iterator().next()); + assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges, + ranges.iterator().next(), + null, hosts).iterator().next()); } @Test(expected = IllegalArgumentException.class) @@ -203,7 +204,8 @@ public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); //Dont give local endpoint Collection hosts = Arrays.asList("127.0.0.3"); - ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, hosts); + Collection> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts); } Set addTokens(int max) throws Throwable From 1c8ee0fed79234a8c8d1140f484b7e34bec7bd6f Mon Sep 17 00:00:00 2001 From: Alex Lourie Date: Mon, 20 Mar 2017 12:36:17 +1030 Subject: [PATCH 30/34] Exception when computing read-repair for range tombstones (#5) patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-12263 updated by Alex Lourie: * changed 'Slice.Bound' calls to 'ClusteringBound' * update to DataResolverTest.java to use 'ClusteringBound.Kind' instead of ''RangeTombstone.Bound.Kind'; --- CHANGES.txt | 1 + .../db/rows/UnfilteredRowIterators.java | 45 +++++- .../cassandra/service/DataResolver.java | 102 ++++++++++++-- .../cassandra/service/DataResolverTest.java | 128 +++++++++++++++++- 4 files changed, 260 insertions(+), 16 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 1b92a81aeb07..0914edd708d9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765) * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922) Merged from 3.x: + * Exception when computing read-repair for range tombstones (CASSANDRA-12263) * Cache local ranges when calculating repair neighbors (CASSANDRA-11933) * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582) diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index f6bf21844deb..6fcf2cf66260 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -46,11 +46,52 @@ public abstract class UnfilteredRowIterators private UnfilteredRowIterators() {} + /** + * Interface for a listener interested in the result of merging multiple versions of a given row. + *

+ * Implementors of this interface are given enough information that they can easily reconstruct the difference + * between the merged result and each individual input. This is used when reconciling results on replias for + * instance to figure out what to send as read-repair to each source. + */ public interface MergeListener { + /** + * Called once for the merged partition. + * + * @param mergedDeletion the partition level deletion for the merged partition. Implementors can test if the + * merged partition actually has a partition level deletion or not by calling {@code mergedDeletion.isLive()}. + * @param versions the partition level deletion for the sources of the merge. Elements of the array will never + * be null, but be "live". + **/ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions); + /** + * Called once for every row participating in the merge. + *

+ * Note that this is called for every clustering where at least one of the source merged has a row. In + * particular, this may be called in cases where there is no row in the merged output (if a source has a row + * that is shadowed by another source range tombstone or partition level deletion). + * + * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a + * placeholder for when at least one source has a row, but that row is shadowed in the merged output. + * @param versions for each source, the row in that source corresponding to {@code merged}. This can be + * {@code null} for some sources if the source has not such row. + */ public void onMergedRows(Row merged, Row[] versions); + + /** + * Called once for every range tombstone marker participating in the merge. + *

+ * Note that this is called for every "clustering position" where at least one of the source merged has a range + * tombstone marker. + * + * @param merged the marker in the merged output. This can be {@code null} if there is no such marker, which + * means that at least one source has a marker in {@code versions} but the merged out has nothing corresponding + * (this basically mean the merged output has a currently open deletion that shadows whatever marker the source + * had). + * @param versions the marker for each source merged. This can be {@code null} for some source if that source + * has not such marker. + */ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions); public void close(); @@ -361,7 +402,7 @@ private static DeletionTime collectPartitionLevelDeletion(List= timestamp2)); + + // Message to peer2 contains peer1 ranges + assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six); + } + + // Forces the start to be exclusive if the condition holds + private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition) + { + if (!condition) + return rt; + + Slice slice = rt.deletedSlice(); + ClusteringBound newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start().getRawValues()); + return condition + ? new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime()) + : rt; + } + + // Forces the end to be exclusive if the condition holds + private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition) + { + if (!condition) + return rt; + + Slice slice = rt.deletedSlice(); + ClusteringBound newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end().getRawValues()); + return condition + ? new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime()) + : rt; + } + private static ByteBuffer bb(int b) { return ByteBufferUtil.bytes(b); @@ -667,7 +775,10 @@ private void assertRepairContainsDeletions(MessageOut message, int i = 0; while (ranges.hasNext()) { - assertEquals(ranges.next(), rangeTombstones[i++]); + RangeTombstone expected = rangeTombstones[i++]; + RangeTombstone actual = ranges.next(); + String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator)); + assertEquals(msg, expected, actual); } } @@ -721,8 +832,17 @@ public MessageIn readResponseMessage(InetAddress from, UnfilteredP private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime) { - return new RangeTombstone(Slice.make(cfm.comparator.make(start), cfm.comparator.make(end)), - new DeletionTime(markedForDeleteAt, localDeletionTime)); + return tombstone(start, true, end, true, markedForDeleteAt, localDeletionTime); + } + + private RangeTombstone tombstone(Object start, boolean inclusiveStart, Object end, boolean inclusiveEnd, long markedForDeleteAt, int localDeletionTime) + { + Kind startKind = inclusiveStart ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND; + Kind endKind = inclusiveEnd ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND; + + ClusteringBound startBound = ClusteringBound.create(startKind, cfm.comparator.make(start).getRawValues()); + ClusteringBound endBound = ClusteringBound.create(endKind, cfm.comparator.make(end).getRawValues()); + return new RangeTombstone(Slice.make(startBound, endBound), new DeletionTime(markedForDeleteAt, localDeletionTime)); } private UnfilteredPartitionIterator fullPartitionDelete(CFMetaData cfm, DecoratedKey dk, long timestamp, int nowInSec) From fe0f7dc42142fa7899851f17c0af3fdd71b5cdbd Mon Sep 17 00:00:00 2001 From: Alex Lourie Date: Mon, 20 Mar 2017 13:03:38 +1030 Subject: [PATCH 31/34] Bump version to 3.7.3 Signed-off-by: Alex Lourie --- build.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.xml b/build.xml index bb8561970ab4..baadd4e31392 100644 --- a/build.xml +++ b/build.xml @@ -25,7 +25,7 @@ - + From 4b5f9d2a324d0288e2bbbb171826efa1bf075537 Mon Sep 17 00:00:00 2001 From: Alex Lourie Date: Mon, 20 Mar 2017 13:08:06 +1030 Subject: [PATCH 32/34] Update README with added fixes Signed-off-by: Alex Lourie --- README.asc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.asc b/README.asc index 5de01dee8e28..a1b80de39f71 100644 --- a/README.asc +++ b/README.asc @@ -27,6 +27,7 @@ We will keep an eye on the issues page of this github repository and try to help === Current patches backported * https://issues.apache.org/jira/browse/CASSANDRA-11882[11882: Clustering Key with ByteBuffer size > 64k throws Assertion Error] * https://issues.apache.org/jira/browse/CASSANDRA-11345[11345: Assertion Errors "Memory was freed" during streaming] +* https://issues.apache.org/jira/browse/CASSANDRA-11933[11933: Cache local ranges when calculating repair neighbours] * https://issues.apache.org/jira/browse/CASSANDRA-11996[11996: SSTableSet.CANONICAL can miss sstables] * https://issues.apache.org/jira/browse/CASSANDRA-12247[12247: AssertionError with MVs on updating a row that isn't indexed due to a null value] * https://issues.apache.org/jira/browse/CASSANDRA-11604[11604: select on table fails after changing user defined type in map] @@ -40,6 +41,7 @@ We will keep an eye on the issues page of this github repository and try to help * https://issues.apache.org/jira/browse/CASSANDRA-12776[12776: Correct log message for statistics of offheap memtable flush] * https://issues.apache.org/jira/browse/CASSANDRA-12329[12329: Fix unreleased resource sockets] * https://issues.apache.org/jira/browse/CASSANDRA-12330[12330: Fix unreleased resource sockets] +* https://issues.apache.org/jira/browse/CASSANDRA-12263[12263: Exception when computing read-repair for range tombstones] We are also working to backport additional patches that we see as required, however this is an ongoing process and there will always be fixes that "need" to be backported that we haven't done yet. We will tag commits with a minor version number for releases that we are ourselves running in production clusters. E.g. 3.7.1. Build artefacts will not be release builds and as such you will see this in the version numbers reported in various places by Cassandra. From 7cd41c4ce6d259e7ce2e1befd1f7742a48d77f5f Mon Sep 17 00:00:00 2001 From: Ben Slater Date: Thu, 4 May 2017 16:53:38 +1000 Subject: [PATCH 33/34] Updated read.me with EOL for LTS release. --- README.asc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.asc b/README.asc index a1b80de39f71..f61d334666a2 100644 --- a/README.asc +++ b/README.asc @@ -2,6 +2,8 @@ This is repository contains Apache Cassandra 3.7 with a number of bugfixes from later versions of Apache Cassandra we have backported to 3.7. If you want to know more and how to build and install see the official https://github.com/apache/cassandra/blob/cassandra-3.7/README.asc[Apache Cassandra README] +*With the Apache Cassandra now focusing on stability in the 3.x release series (and releases since 3.8 being largely stability focused) we think the need for this LTS is greatly reduced and thus recommend using Apache Cassandra 3.10 or later rather than this release. We will likely make one or two more releases of this LTS and then discontinue maintenance.* + = FAQ === Is this a fork? No, This is just Cassandra with a different release cadence for those who want 3.x features but are slightly more risk averse than the current schedule allows. From e054345cb0c1057c4ac8206f648c3d18793d753a Mon Sep 17 00:00:00 2001 From: Cameron Zemek Date: Mon, 8 May 2017 10:11:50 +1000 Subject: [PATCH 34/34] Fail repair if insufficient responses received (CASSANDRA-13397) --- README.asc | 1 + .../service/ActiveRepairService.java | 21 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/README.asc b/README.asc index f61d334666a2..b6d82806fda8 100644 --- a/README.asc +++ b/README.asc @@ -44,6 +44,7 @@ We will keep an eye on the issues page of this github repository and try to help * https://issues.apache.org/jira/browse/CASSANDRA-12329[12329: Fix unreleased resource sockets] * https://issues.apache.org/jira/browse/CASSANDRA-12330[12330: Fix unreleased resource sockets] * https://issues.apache.org/jira/browse/CASSANDRA-12263[12263: Exception when computing read-repair for range tombstones] +* https://issues.apache.org/jira/browse/CASSANDRA-13397[13397: Return value of CountDownLatch.await() not being checked in Repair] We are also working to backport additional patches that we see as required, however this is an ongoing process and there will always be fixes that "need" to be backported that we haven't done yet. We will tag commits with a minor version number for releases that we are ourselves running in production clusters. E.g. 3.7.1. Build artefacts will not be release builds and as such you will see this in the version numbers reported in various places by Cassandra. diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index a3a1511590b6..f2ed17e1f440 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -297,30 +297,35 @@ public void onFailure(InetAddress from) } else { - status.set(false); - failedNodes.add(neighbour.getHostAddress()); - prepareLatch.countDown(); + // bailout early to avoid potentially waiting for a long time. + failRepair(parentRepairSession, "Endpoint not alive: " + neighbour); } } try { - prepareLatch.await(1, TimeUnit.HOURS); + // Failed repair is expensive so we wait for longer time. + if (!prepareLatch.await(1, TimeUnit.HOURS)) { + failRepair(parentRepairSession, "Did not get replies from all endpoints."); + } } catch (InterruptedException e) { - parentRepairSessions.remove(parentRepairSession); - throw new RuntimeException("Did not get replies from all endpoints. List of failed endpoint(s): " + failedNodes.toString(), e); + failRepair(parentRepairSession, "Interrupted while waiting for prepare repair response."); } if (!status.get()) { - parentRepairSessions.remove(parentRepairSession); - throw new RuntimeException("Did not get positive replies from all endpoints. List of failed endpoint(s): " + failedNodes.toString()); + failRepair(parentRepairSession, "Got negative replies from endpoints " + failedNodes); } return parentRepairSession; } + private void failRepair(UUID parentRepairSession, String errorMsg) { + removeParentRepairSession(parentRepairSession); + throw new RuntimeException(errorMsg); + } + public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List columnFamilyStores, Collection> ranges, boolean isIncremental, long timestamp, boolean isGlobal) { if (!registeredForEndpointChanges)