Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
912 changes: 556 additions & 356 deletions .circleci/config.yml

Large diffs are not rendered by default.

35 changes: 18 additions & 17 deletions src/java/org/apache/cassandra/net/ParamType.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Int32Serializer;
import org.apache.cassandra.utils.Int64Serializer;
import org.apache.cassandra.utils.RangesSerializer;
import org.apache.cassandra.utils.TimeUUID;

import static java.lang.Math.max;
Expand All @@ -42,30 +43,30 @@
*/
public enum ParamType
{
FORWARD_TO (0, "FWD_TO", ForwardingInfo.serializer),
RESPOND_TO (1, "FWD_FRM", fwdFrmSerializer),
FORWARD_TO (0, "FWD_TO", ForwardingInfo.serializer),
RESPOND_TO (1, "FWD_FRM", fwdFrmSerializer),

@Deprecated
FAILURE_RESPONSE (2, "FAIL", LegacyFlag.serializer),
FAILURE_RESPONSE (2, "FAIL", LegacyFlag.serializer),
@Deprecated
FAILURE_REASON (3, "FAIL_REASON", RequestFailureReason.serializer),
FAILURE_REASON (3, "FAIL_REASON", RequestFailureReason.serializer),
@Deprecated
FAILURE_CALLBACK (4, "CAL_BAC", LegacyFlag.serializer),
FAILURE_CALLBACK (4, "CAL_BAC", LegacyFlag.serializer),

TRACE_SESSION (5, "TraceSession", TimeUUID.Serializer.instance),
TRACE_TYPE (6, "TraceType", Tracing.traceTypeSerializer),
TRACE_SESSION (5, "TraceSession", TimeUUID.Serializer.instance),
TRACE_TYPE (6, "TraceType", Tracing.traceTypeSerializer),

@Deprecated
TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),

TOMBSTONE_FAIL(8, "TSF", Int32Serializer.serializer),
TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer),
LOCAL_READ_SIZE_FAIL(10, "LRSF", Int64Serializer.serializer),
LOCAL_READ_SIZE_WARN(11, "LRSW", Int64Serializer.serializer),
ROW_INDEX_READ_SIZE_FAIL(12, "RIRSF", Int64Serializer.serializer),
ROW_INDEX_READ_SIZE_WARN(13, "RIRSW", Int64Serializer.serializer),

CUSTOM_MAP (14, "CUSTOM", CustomParamsSerializer.serializer);
TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),

TOMBSTONE_FAIL (8, "TSF", Int32Serializer.serializer),
TOMBSTONE_WARNING (9, "TSW", Int32Serializer.serializer),
LOCAL_READ_SIZE_FAIL (10, "LRSF", Int64Serializer.serializer),
LOCAL_READ_SIZE_WARN (11, "LRSW", Int64Serializer.serializer),
ROW_INDEX_READ_SIZE_FAIL (12, "RIRSF", Int64Serializer.serializer),
ROW_INDEX_READ_SIZE_WARN (13, "RIRSW", Int64Serializer.serializer),
CUSTOM_MAP (14, "CUSTOM", CustomParamsSerializer.serializer),
SNAPSHOT_RANGES (15, "SNAPSHOT_RANGES", RangesSerializer.serializer);

final int id;
@Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string
Expand Down
17 changes: 13 additions & 4 deletions src/java/org/apache/cassandra/repair/PreviewRepairTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.RepairMetrics;
import org.apache.cassandra.repair.consistent.SyncStatSummary;
Expand Down Expand Up @@ -104,14 +106,18 @@ private void maybeSnapshotReplicas(TimeUUID parentSession, String keyspace, List
{
Set<String> mismatchingTables = new HashSet<>();
Set<InetAddressAndPort> nodes = new HashSet<>();
Set<Range<Token>> ranges = new HashSet<>();
for (RepairSessionResult sessionResult : results)
{
for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
{
for (SyncStat stat : emptyIfNull(repairResult.stats))
{
if (stat.numberOfDifferences > 0)
if (!stat.differences.isEmpty())
{
mismatchingTables.add(repairResult.desc.columnFamily);
ranges.addAll(stat.differences);
}
// snapshot all replicas, even if they don't have any differences
nodes.add(stat.nodes.coordinator);
nodes.add(stat.nodes.peer);
Expand All @@ -125,10 +131,13 @@ private void maybeSnapshotReplicas(TimeUUID parentSession, String keyspace, List
// we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
{
logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
logger.info("{} Snapshotting {}.{} for preview repair mismatch for ranges {} with tag {} on instances {}",
options.getPreviewKind().logPrefix(parentSession),
keyspace, table, snapshotName, nodes);
DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes);
keyspace, table, normalizedRanges, snapshotName, nodes);
DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(),
nodes,
normalizedRanges);
}
else
{
Expand Down
15 changes: 9 additions & 6 deletions src/java/org/apache/cassandra/repair/SyncStat.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
*/
package org.apache.cassandra.repair;

import java.util.Collection;
import java.util.List;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.streaming.SessionSummary;

/**
Expand All @@ -27,23 +30,23 @@
public class SyncStat
{
public final SyncNodePair nodes;
public final long numberOfDifferences; // TODO: revert to Range<Token>
public final Collection<Range<Token>> differences;
public final List<SessionSummary> summaries;

public SyncStat(SyncNodePair nodes, long numberOfDifferences)
public SyncStat(SyncNodePair nodes, Collection<Range<Token>> differences)
{
this(nodes, numberOfDifferences, null);
this(nodes, differences, null);
}

public SyncStat(SyncNodePair nodes, long numberOfDifferences, List<SessionSummary> summaries)
public SyncStat(SyncNodePair nodes, Collection<Range<Token>> differences, List<SessionSummary> summaries)
{
this.nodes = nodes;
this.numberOfDifferences = numberOfDifferences;
this.summaries = summaries;
this.differences = differences;
}

public SyncStat withSummaries(List<SessionSummary> summaries)
{
return new SyncStat(nodes, numberOfDifferences, summaries);
return new SyncStat(nodes, differences, summaries);
}
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/repair/SyncTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected SyncTask(RepairJobDesc desc, InetAddressAndPort primaryEndpoint, InetA
this.rangesToSync = rangesToSync;
this.nodePair = new SyncNodePair(primaryEndpoint, peer);
this.previewKind = previewKind;
this.stat = new SyncStat(nodePair, rangesToSync.size());
this.stat = new SyncStat(nodePair, rangesToSync);
}

protected abstract void startSync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import com.google.common.collect.Lists;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.repair.RepairResult;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.SyncStat;
Expand All @@ -50,7 +55,7 @@ private static class Session

int files = 0;
long bytes = 0;
long ranges = 0;
Set<Range<Token>> ranges = new HashSet<>();

Session(InetSocketAddress src, InetSocketAddress dst)
{
Expand All @@ -64,15 +69,15 @@ void consumeSummary(StreamSummary summary)
bytes += summary.totalSize;
}

void consumeSummaries(Collection<StreamSummary> summaries, long numRanges)
void consumeSummaries(Collection<StreamSummary> summaries, Collection<Range<Token>> ranges)
{
summaries.forEach(this::consumeSummary);
ranges += numRanges;
this.ranges.addAll(ranges);
}

public String toString()
{
return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", src, dst, ranges, files, FBUtilities.prettyPrintMemory(bytes));
return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", src, dst, ranges.size(), files, FBUtilities.prettyPrintMemory(bytes));
}
}

Expand All @@ -84,7 +89,7 @@ private static class Table

int files = -1;
long bytes = -1;
int ranges = -1;
Collection<Range<Token>> ranges = new HashSet<>();
boolean totalsCalculated = false;

final Map<Pair<InetSocketAddress, InetSocketAddress>, Session> sessions = new HashMap<>();
Expand All @@ -109,8 +114,8 @@ void consumeStat(SyncStat stat)
{
for (SessionSummary summary: stat.summaries)
{
getOrCreate(summary.coordinator, summary.peer).consumeSummaries(summary.sendingSummaries, stat.numberOfDifferences);
getOrCreate(summary.peer, summary.coordinator).consumeSummaries(summary.receivingSummaries, stat.numberOfDifferences);
getOrCreate(summary.coordinator, summary.peer).consumeSummaries(summary.sendingSummaries, stat.differences);
getOrCreate(summary.peer, summary.coordinator).consumeSummaries(summary.receivingSummaries, stat.differences);
}
}

Expand All @@ -123,12 +128,12 @@ void calculateTotals()
{
files = 0;
bytes = 0;
ranges = 0;
ranges = new HashSet<>();
for (Session session: sessions.values())
{
files += session.files;
bytes += session.bytes;
ranges += session.ranges;
ranges.addAll(session.ranges);
}
totalsCalculated = true;
}
Expand All @@ -147,21 +152,36 @@ public String toString()
}
StringBuilder output = new StringBuilder();

output.append(String.format("%s.%s - %s ranges, %s sstables, %s bytes\n", keyspace, table, ranges, files, FBUtilities.prettyPrintMemory(bytes)));
output.append(String.format("%s.%s - %s ranges, %s sstables, %s bytes\n", keyspace, table, ranges.size(), files, FBUtilities.prettyPrintMemory(bytes)));
if (ranges.size() > 0)
{
output.append(" Mismatching ranges: ");
int i = 0;
Iterator<Range<Token>> rangeIterator = ranges.iterator();
while (rangeIterator.hasNext() && i < 30)
{
Range<Token> r = rangeIterator.next();
output.append('(').append(r.left).append(',').append(r.right).append("],");
i++;
}
if (i == 30)
output.append("...");
output.append(System.lineSeparator());
}
for (Session session: sessions.values())
{
output.append(" ").append(session.toString()).append('\n');
output.append(" ").append(session.toString()).append(System.lineSeparator());
}
return output.toString();
}
}

private Map<Pair<String, String>, Table> summaries = new HashMap<>();
private final Map<Pair<String, String>, Table> summaries = new HashMap<>();
private final boolean isEstimate;

private int files = -1;
private long bytes = -1;
private int ranges = -1;
private Set<Range<Token>> ranges = new HashSet<>();
private boolean totalsCalculated = false;

public SyncStatSummary(boolean isEstimate)
Expand Down Expand Up @@ -190,14 +210,14 @@ public void consumeSessionResults(Optional<List<RepairSessionResult>> results)
public boolean isEmpty()
{
calculateTotals();
return files == 0 && bytes == 0 && ranges == 0;
return files == 0 && bytes == 0 && ranges.isEmpty();
}

private void calculateTotals()
{
files = 0;
bytes = 0;
ranges = 0;
ranges = new HashSet<>();
summaries.values().forEach(Table::calculateTotals);
for (Table table: summaries.values())
{
Expand All @@ -208,7 +228,7 @@ private void calculateTotals()
table.calculateTotals();
files += table.files;
bytes += table.bytes;
ranges += table.ranges;
ranges.addAll(table.ranges);
}
totalsCalculated = true;
}
Expand All @@ -228,11 +248,11 @@ public String toString()

if (isEstimate)
{
output.append(String.format("Total estimated streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes)));
output.append(String.format("Total estimated streaming: %s ranges, %s sstables, %s bytes\n", ranges.size(), files, FBUtilities.prettyPrintMemory(bytes)));
}
else
{
output.append(String.format("Total streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes)));
output.append(String.format("Total streaming: %s ranges, %s sstables, %s bytes\n", ranges.size(), files, FBUtilities.prettyPrintMemory(bytes)));
}

for (Pair<String, String> tableName: tables)
Expand Down
14 changes: 12 additions & 2 deletions src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@
*/
package org.apache.cassandra.service;

import java.util.Collections;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.SnapshotCommand;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SnapshotCommand;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.DiagnosticSnapshotService;

import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES;

public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
{
public static final SnapshotVerbHandler instance = new SnapshotVerbHandler();
Expand All @@ -41,7 +48,10 @@ public void doVerb(Message<SnapshotCommand> message)
}
else if (DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command))
{
DiagnosticSnapshotService.snapshot(command, message.from());
List<Range<Token>> ranges = Collections.emptyList();
if (message.header.params().containsKey(SNAPSHOT_RANGES))
ranges = (List<Range<Token>>) message.header.params().get(SNAPSHOT_RANGES);
DiagnosticSnapshotService.snapshot(command, ranges, message.from());
}
else
{
Expand Down
Loading