Skip to content

Commit

Permalink
Gossip thread slows down when using batch commit log
Browse files Browse the repository at this point in the history
patch by jasobrown; reviewed by spodkowinski for CASSANDRA-12966
  • Loading branch information
jasobrown committed Aug 21, 2017
1 parent dc32ed8 commit ec85b4a
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
3.0.15
* Gossip thread slows down when using batch commit log (CASSANDRA-12966)
* Randomize batchlog endpoint selection with only 1 or 2 racks (CASSANDRA-12884)
* Fix digest calculation for counter cells (CASSANDRA-13750)
* Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)
Expand Down
22 changes: 14 additions & 8 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import java.util.concurrent.Future;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -36,6 +38,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.Futures;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
Expand Down Expand Up @@ -687,29 +693,29 @@ private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer by
/**
* Record tokens being used by another node
*/
public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
public static Future<?> updateTokens(final InetAddress ep, final Collection<Token> tokens, ExecutorService executorService)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
return;
return Futures.immediateFuture(null);

String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens)));
}

public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
public static void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
{
String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
executeInternal(String.format(req, PEERS), ep, preferred_ip);
forceBlockingFlush(PEERS);
}

public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
public static Future<?> updatePeerInfo(final InetAddress ep, final String columnName, final Object value, ExecutorService executorService)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
return;
return Futures.immediateFuture(null);

String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
executeInternal(String.format(req, PEERS, columnName), ep, value);
return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS, columnName), ep, value));
}

public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
Expand Down Expand Up @@ -748,7 +754,7 @@ private static Collection<Token> deserializeTokens(Collection<String> tokensStri
/**
* Remove stored tokens being used by another node
*/
public static synchronized void removeEndpoint(InetAddress ep)
public static void removeEndpoint(InetAddress ep)
{
String req = "DELETE FROM system.%s WHERE peer = ?";
executeInternal(String.format(req, PEERS), ep);
Expand Down
28 changes: 15 additions & 13 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1711,35 +1711,36 @@ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValu

if (getTokenMetadata().isMember(endpoint))
{
final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
switch (state)
{
case RELEASE_VERSION:
SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value, executor);
break;
case DC:
updateTopology(endpoint);
SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value, executor);
break;
case RACK:
updateTopology(endpoint);
SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value, executor);
break;
case RPC_ADDRESS:
try
{
SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value), executor);
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
break;
case SCHEMA:
SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor);
MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
break;
case HOST_ID:
SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor);
break;
case RPC_READY:
notifyRpcChange(endpoint, epState.isRpcReady());
Expand Down Expand Up @@ -1785,34 +1786,35 @@ public void updateTopology()
private void updatePeerInfo(InetAddress endpoint)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
{
switch (entry.getKey())
{
case RELEASE_VERSION:
SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value);
SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value, executor);
break;
case DC:
SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value);
SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value, executor);
break;
case RACK:
SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value);
SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value, executor);
break;
case RPC_ADDRESS:
try
{
SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value), executor);
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
break;
case SCHEMA:
SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value));
SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value), executor);
break;
case HOST_ID:
SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value));
SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value), executor);
break;
}
}
Expand Down Expand Up @@ -2118,7 +2120,7 @@ else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
}
if (!tokensToUpdateInSystemKeyspace.isEmpty())
SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));

if (isMoving || operationMode == Mode.MOVING)
{
Expand Down
6 changes: 5 additions & 1 deletion test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.Future;

import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
Expand Down Expand Up @@ -84,7 +87,8 @@ public void testNonLocalToken() throws UnknownHostException
{
BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
InetAddress address = InetAddress.getByName("127.0.0.2");
SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token));
Future<?> future = SystemKeyspace.updateTokens(address, Collections.singletonList(token), StageManager.getStage(Stage.MUTATION));
FBUtilities.waitOnFuture(future);
assert SystemKeyspace.loadTokens().get(address).contains(token);
SystemKeyspace.removeEndpoint(address);
assert !SystemKeyspace.loadTokens().containsValue(token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public static void setup()
{
// slow unit tests can cause problems with FailureDetector's GC pause handling
System.setProperty("cassandra.max_local_pause_in_ms", "20000");

DatabaseDescriptor.setDaemonInitialized();
DatabaseDescriptor.createAllDirectories();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
Expand All @@ -33,6 +35,8 @@
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.Util.PartitionerSwitcher;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
Expand All @@ -46,6 +50,7 @@
import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.utils.FBUtilities;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -675,8 +680,9 @@ public void testStateChangeOnRemovedNode() throws UnknownHostException
Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2);

InetAddress toRemove = hosts.get(1);
SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42");
SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42");
final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42", executor));
FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42", executor));
assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));

// mark the node as removed
Expand Down

0 comments on commit ec85b4a

Please sign in to comment.