Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 107 additions & 107 deletions .circleci/config.yml

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
<<<<<<< HEAD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge conflict

5.1
* Add support for the BETWEEN operator in WHERE clauses (CASSANDRA-19604)
* Replace Stream iteration with for-loop for SimpleRestriction::bindAndGetClusteringElements (CASSANDRA-19679)
Expand Down Expand Up @@ -46,6 +47,7 @@
* Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
* Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
Merged from 5.0:
* Refresh stale paxos commit (CASSANDRA-19617)
* Replace Stream iteration with for-loop for StorageProxy::updateCoordinatorWriteLatencyTableMetric (CASSANDRA-19676)
* Enforce metric naming contract if scope is used in a metric name (CASSANDRA-19619)
* Avoid reading of the same IndexInfo from disk many times for a large partition (CASSANDRA-19557)
Expand Down
15 changes: 9 additions & 6 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3315,13 +3315,16 @@ public static ColumnFamilyStore getIfExists(TableId id)
if (metadata == null)
return null;

Keyspace keyspace = Keyspace.open(metadata.keyspace);
if (keyspace == null)
return null;
return getIfExists(metadata);
}

return keyspace.hasColumnFamilyStore(id)
? keyspace.getColumnFamilyStore(id)
: null;
/**
* Returns a ColumnFamilyStore by metadata if it exists, null otherwise
* Differently from others, this method does not throw exception if the table does not exist.
*/
public static ColumnFamilyStore getIfExists(TableMetadata table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add @Nullable?

{
return Keyspace.openAndGetStoreIfExists(table);
}

/**
Expand Down
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ public static ColumnFamilyStore openAndGetStore(TableMetadata table)
return open(table.keyspace).getColumnFamilyStore(table.id);
}

public static ColumnFamilyStore openAndGetStoreIfExists(TableMetadata table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add @Nullable?

{
Keyspace keyspace = open(table.keyspace);
if (keyspace == null)
return null;
return keyspace.getIfExists(table.id);
}

/**
* Removes every SSTable in the directory from the appropriate Tracker's view.
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
Expand Down Expand Up @@ -202,6 +210,11 @@ public ColumnFamilyStore getColumnFamilyStore(TableId id)
return cfs;
}

public ColumnFamilyStore getIfExists(TableId id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add @Nullable?

{
return columnFamilyStores.get(id);
}

public boolean hasColumnFamilyStore(TableId id)
{
return columnFamilyStores.containsKey(id);
Expand Down
31 changes: 29 additions & 2 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.cassandra.config.Config.PaxosStatePurging.legacy;
import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
Expand Down Expand Up @@ -1362,6 +1363,8 @@ public static Set<String> allKnownDatacenters()

/**
* Load the current paxos state for the table and key
*
* NOTE: nowInSec is typically provided as zero, and should not be assumed to be definitive, as the cache may apply different nowInSec filters
*/
public static PaxosState.Snapshot loadPaxosState(DecoratedKey partitionKey, TableMetadata metadata, long nowInSec)
{
Expand All @@ -1373,14 +1376,38 @@ public static PaxosState.Snapshot loadPaxosState(DecoratedKey partitionKey, Tabl
return new PaxosState.Snapshot(Ballot.none(), Ballot.none(), null, noneCommitted);
}

long purgeBefore = 0;
long overrideTtlSeconds = 0;
switch (paxosStatePurging())
{
default: throw new AssertionError();
case legacy:
case gc_grace:
overrideTtlSeconds = metadata.params.gcGraceSeconds;
if (nowInSec > 0)
purgeBefore = TimeUnit.SECONDS.toMicros(nowInSec - overrideTtlSeconds);
break;

case repaired:
ColumnFamilyStore cfs = Keyspace.openAndGetStoreIfExists(metadata);
if (cfs != null)
{
long paxosPurgeGraceMicros = DatabaseDescriptor.getPaxosPurgeGrace(MICROSECONDS);
purgeBefore = cfs.getPaxosRepairLowBound(partitionKey).uuidTimestamp() - paxosPurgeGraceMicros;
}
}


Row row = results.get(0);

Ballot promisedWrite = PaxosRows.getWritePromise(row);
if (promisedWrite.uuidTimestamp() < purgeBefore) promisedWrite = Ballot.none();
Ballot promised = latest(promisedWrite, PaxosRows.getPromise(row));
if (promised.uuidTimestamp() < purgeBefore) promised = Ballot.none();

// either we have both a recently accepted ballot and update or we have neither
Accepted accepted = PaxosRows.getAccepted(row);
Committed committed = PaxosRows.getCommitted(metadata, partitionKey, row);
Accepted accepted = PaxosRows.getAccepted(row, purgeBefore, overrideTtlSeconds);
Committed committed = PaxosRows.getCommitted(metadata, partitionKey, row, purgeBefore, overrideTtlSeconds);
// fix a race with TTL/deletion resolution, where TTL expires after equal deletion is inserted; TTL wins the resolution, and is read using an old ballot's nowInSec
if (accepted != null && !accepted.isAfter(committed))
accepted = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition

private class PaxosPurger extends Transformation<UnfilteredRowIterator>
{

private final long nowInSec;
private final long paxosPurgeGraceMicros = DatabaseDescriptor.getPaxosPurgeGrace(MICROSECONDS);
private final Map<TableId, PaxosRepairHistory.Searcher> tableIdToHistory = new HashMap<>();
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/service/paxos/Commit.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ public static Committed latestCommitted(Committed a, Committed b)
return c > 0 ? a : b;
return a instanceof CommittedWithTTL ? ((CommittedWithTTL)a).lastDeleted(b) : a;
}

public boolean isNone()
{
return ballot.equals(Ballot.none()) && update.isEmpty();
}
}

public static class CommittedWithTTL extends Committed
Expand Down
22 changes: 19 additions & 3 deletions src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.slf4j.Logger;
Expand Down Expand Up @@ -286,8 +287,8 @@ private ElectorateMismatch(Participants participants, Ballot ballot)
private final List<Message<ReadResponse>> readResponses;
private boolean haveReadResponseWithLatest;
private boolean haveQuorumOfPermissions; // permissions => SUCCESS or READ_SUCCESS
private List<InetAddressAndPort> withLatest; // promised and have latest commit
private List<InetAddressAndPort> needLatest; // promised without having witnessed latest commit, nor yet been refreshed by us
private @Nonnull List<InetAddressAndPort> withLatest; // promised and have latest commit
private @Nullable List<InetAddressAndPort> needLatest; // promised without having witnessed latest commit, nor yet been refreshed by us
private int failures; // failed either on initial request or on refresh
private boolean hasProposalStability = true; // no successful modifying proposal could have raced with us and not been seen
private boolean hasOnlyPromises = true;
Expand Down Expand Up @@ -506,11 +507,26 @@ private void permitted(Permitted permitted, InetAddressAndPort from)
}

if (permitted.lowBound > maxLowBound)
{
maxLowBound = permitted.lowBound;
if (!latestCommitted.isNone() && latestCommitted.ballot.uuidTimestamp() < maxLowBound)
{
latestCommitted = Committed.none(request.partitionKey, request.table);
haveReadResponseWithLatest = !readResponses.isEmpty();
if (needLatest != null)
{
withLatest.addAll(needLatest);
needLatest.clear();
}
}
}

if (!haveQuorumOfPermissions)
{
CompareResult compareLatest = permitted.latestCommitted.compareWith(latestCommitted);
Committed newLatestCommitted = permitted.latestCommitted;
if (newLatestCommitted.ballot.uuidTimestamp() < maxLowBound) newLatestCommitted = Committed.none(request.partitionKey, request.table);
CompareResult compareLatest = newLatestCommitted.compareWith(latestCommitted);

switch (compareLatest)
{
default: throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -88,32 +89,38 @@ public static Ballot getWritePromise(Row row)
return getBallot(row, WRITE_PROMISE, Ballot.none());
}

public static Accepted getAccepted(Row row)
public static Accepted getAccepted(Row row, long purgeBefore, long overrideTtlSeconds)
{
Cell ballotCell = row.getCell(PROPOSAL);
if (ballotCell == null)
return null;

Ballot ballot = ballotCell.accessor().toBallot(ballotCell.value());
if (ballot.uuidTimestamp() < purgeBefore)
return null;

int version = getInt(row, PROPOSAL_VERSION, MessagingService.VERSION_40);
PartitionUpdate update = getUpdate(row, PROPOSAL_UPDATE, version);
return ballotCell.isExpiring()
? new AcceptedWithTTL(ballot, update, ballotCell.localDeletionTime())
: new Accepted(ballot, update);
if (overrideTtlSeconds > 0) return new AcceptedWithTTL(ballot, update, TimeUnit.MICROSECONDS.toSeconds(ballotCell.timestamp()) + overrideTtlSeconds);
else if (ballotCell.isExpiring()) return new AcceptedWithTTL(ballot, update, ballotCell.localDeletionTime());
else return new Accepted(ballot, update);
}

public static Committed getCommitted(TableMetadata metadata, DecoratedKey partitionKey, Row row)
public static Committed getCommitted(TableMetadata metadata, DecoratedKey partitionKey, Row row, long purgeBefore, long overrideTtlSeconds)
{
Cell ballotCell = row.getCell(COMMIT);
if (ballotCell == null)
return Committed.none(partitionKey, metadata);

Ballot ballot = ballotCell.accessor().toBallot(ballotCell.value());
if (ballot.uuidTimestamp() < purgeBefore)
return Committed.none(partitionKey, metadata);

int version = getInt(row, COMMIT_VERSION, MessagingService.VERSION_40);
PartitionUpdate update = getUpdate(row, COMMIT_UPDATE, version);
return ballotCell.isExpiring()
? new CommittedWithTTL(ballot, update, ballotCell.localDeletionTime())
: new Committed(ballot, update);
if (overrideTtlSeconds > 0) return new CommittedWithTTL(ballot, update, TimeUnit.MICROSECONDS.toSeconds(ballotCell.timestamp()) + overrideTtlSeconds);
else if (ballotCell.isExpiring()) return new CommittedWithTTL(ballot, update, ballotCell.localDeletionTime());
else return new Committed(ballot, update);
}

public static TableId getTableId(Row row)
Expand Down
Loading