Skip to content

Commit

Permalink
Properly set lastModifiedEpoch on multistep operations
Browse files Browse the repository at this point in the history
Patch by Marcus Eriksson; reviewed by Sam Tunnicliffe for
CASSANDRA-19538
  • Loading branch information
krummas authored and beobal committed Apr 18, 2024
1 parent d919274 commit 8097170
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/tcm/MultiStepOperation.java
Expand Up @@ -155,7 +155,7 @@ protected MultiStepOperation(int currentStep, Epoch latestModification)
public static Transformation.Result applyMultipleTransformations(ClusterMetadata metadata, Transformation.Kind next, List<Transformation> transformations)
{
ImmutableSet.Builder<MetadataKey> modifiedKeys = ImmutableSet.builder();
Epoch lastModifiedEpoch = metadata.epoch;
Epoch lastModifiedEpoch = metadata.epoch.nextEpoch();
boolean foundStart = false;
for (Transformation nextTransformation : transformations)
{
Expand All @@ -169,7 +169,7 @@ public static Transformation.Result applyMultipleTransformations(ClusterMetadata
modifiedKeys.addAll(result.success().affectedMetadata);
}
}
return new Transformation.Success(metadata.forceEpoch(lastModifiedEpoch.nextEpoch()), LockedRanges.AffectedRanges.EMPTY, modifiedKeys.build());
return new Transformation.Success(metadata, LockedRanges.AffectedRanges.EMPTY, modifiedKeys.build());
}

/**
Expand Down
Expand Up @@ -155,6 +155,12 @@ public DataPlacement build()
}
}

public DataPlacement withCappedLastModified(Epoch lastModified)
{
return new DataPlacement(reads.withCappedLastModified(lastModified),
writes.withCappedLastModified(lastModified));
}

@Override
public String toString()
{
Expand Down
10 changes: 9 additions & 1 deletion src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.function.BiConsumer;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

Expand Down Expand Up @@ -122,7 +123,7 @@ public DataPlacements combineReplicaGroups(DataPlacements end)
@Override
public DataPlacements withLastModified(Epoch epoch)
{
return new DataPlacements(epoch, asMap());
return new DataPlacements(epoch, capLastModified(epoch, map));
}

@Override
Expand Down Expand Up @@ -255,6 +256,13 @@ public long serializedSize(DataPlacements t, Version version)
}
}

public static ImmutableMap<ReplicationParams, DataPlacement> capLastModified(Epoch lastModified, Map<ReplicationParams, DataPlacement> placements)
{
ImmutableMap.Builder<ReplicationParams, DataPlacement> builder = ImmutableMap.builder();
placements.forEach((params, placement) -> builder.put(params, placement.withCappedLastModified(lastModified)));
return builder.build();
}

public void dumpDiff(DataPlacements other)
{
if (!map.equals(other.map))
Expand Down
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java
Expand Up @@ -152,6 +152,20 @@ private RangesByEndpoint diff(RangesByEndpoint left, RangesByEndpoint right)
return builder.build();
}

public PlacementForRange withCappedLastModified(Epoch lastModified)
{
SortedMap<Range<Token>, VersionedEndpoints.ForRange> copy = new TreeMap<>();
for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : replicaGroups.entrySet())
{
Range<Token> range = entry.getKey();
VersionedEndpoints.ForRange forRange = entry.getValue();
if (forRange.lastModified().isAfter(lastModified))
forRange = forRange.withLastModified(lastModified);
copy.put(range, forRange);
}
return new PlacementForRange(copy);
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -63,7 +63,7 @@ private ForRange(Epoch lastModified, EndpointsForRange endpointsForRange)

public ForRange withLastModified(Epoch epoch)
{
return new ForRange(lastModified, endpointsForRange);
return new ForRange(epoch, endpointsForRange);
}

public Epoch lastModified()
Expand Down Expand Up @@ -151,21 +151,19 @@ private ForToken(Epoch lastModified, EndpointsForToken endpointsForRange)

public ForToken withLastModified(Epoch epoch)
{
return new ForToken(lastModified, endpointsForToken);
return new ForToken(epoch, endpointsForToken);
}

public ForToken map(Function<EndpointsForToken, EndpointsForToken> fn)
{
return new ForToken(lastModified, fn.apply(endpointsForToken));
}


public ForToken without(Set<InetAddressAndPort> remove)
{
return map(e -> e.without(remove));
}


public Epoch lastModified()
{
return lastModified;
Expand Down

0 comments on commit 8097170

Please sign in to comment.