Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,29 @@ native_transport_allow_older_protocols: true
# native_transport_rate_limiting_enabled: false
# native_transport_max_requests_per_second: 1000000

# When enabled, nodes will signal connected clients before shutting down,
# allowing in-flight requests to complete without client-visible timeouts.
# This applies to intentional shutdowns (nodetool drain, rolling restarts,
# controlled JVM shutdown). Clients must subscribe to the GRACEFUL_DISCONNECT
# event via REGISTER to benefit from this behavior.
#
# Requires driver support for the GRACEFUL_DISCONNECT event type.
# See: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619103

# Enable or disable graceful disconnect. When false, shutdown behavior is
# unchanged from previous versions.
# graceful_disconnect_enabled: false

# Time given to clients to stop sending new requests after the
# GRACEFUL_DISCONNECT event is emitted. Must not exceed graceful_disconnect_max_drain_ms.
# graceful_disconnect_grace_period_ms: 5000

# Hard timeout for draining connections. Once this limit is reached,
# remaining connections are force-closed regardless of in-flight requests.
# Must be greater than 0 when graceful_disconnect_enabled is true.
# graceful_disconnect_max_drain_ms: 30000


# The address or interface to bind the native transport server to.
#
# Set rpc_address OR rpc_interface, not both.
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ public static Set<String> splitCommaDelimited(String src)

public volatile DurationSpec.LongMillisecondsBound accord_preaccept_timeout = new DurationSpec.LongMillisecondsBound("1s");

public boolean graceful_disconnect_enabled = false;

public volatile DurationSpec.LongMillisecondsBound graceful_disconnect_grace_period_ms = new DurationSpec.LongMillisecondsBound("5s");

public volatile DurationSpec.LongMillisecondsBound graceful_disconnect_max_drain_ms = new DurationSpec.LongMillisecondsBound("30s");

@Replaces(oldName = "truncate_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true)
public volatile DurationSpec.LongMillisecondsBound truncate_request_timeout = new DurationSpec.LongMillisecondsBound("60000ms");

Expand Down
36 changes: 36 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,17 @@ else if (DiskAccessMode.direct == conf.compaction_read_disk_access_mode)
throw new ConfigurationException("phi_convict_threshold must be between 5 and 16, but was " + conf.phi_convict_threshold, false);
}

if (conf.graceful_disconnect_enabled && conf.graceful_disconnect_max_drain_ms.toMilliseconds() <= 0)
{
throw new ConfigurationException("graceful_disconnect_max_drain_ms must be greater than 0 when graceful_disconnect_enabled is set to true.", false);
}

if (conf.graceful_disconnect_enabled &&
conf.graceful_disconnect_grace_period_ms.toMilliseconds() > conf.graceful_disconnect_max_drain_ms.toMilliseconds())
{
throw new ConfigurationException("graceful_disconnect_grace_period_ms cannot exceed graceful_disconnect_max_drain_ms.", false);
}

/* Thread per pool */
if (conf.concurrent_reads < 2)
{
Expand Down Expand Up @@ -2571,6 +2582,31 @@ public static void setRpcTimeout(long timeOutInMillis)
conf.request_timeout = new DurationSpec.LongMillisecondsBound(timeOutInMillis);
}

public static long getGracefulDisconnectGracePeriodMs()
{
return conf.graceful_disconnect_grace_period_ms.toMilliseconds();
}

public static void setGracefulDisconnectGracePeriodMs(long gracefulDisconnectGracePeriodMs)
{
conf.graceful_disconnect_grace_period_ms = new DurationSpec.LongMillisecondsBound(gracefulDisconnectGracePeriodMs);
}

public static long getGracefulDisconnectMaxDrainMs()
{
return conf.graceful_disconnect_max_drain_ms.toMilliseconds();
}

public static void setGracefulDisconnectMaxDrainMs(long gracefulDisconnectMaxDrainMs)
{
conf.graceful_disconnect_max_drain_ms = new DurationSpec.LongMillisecondsBound(gracefulDisconnectMaxDrainMs);
}

public static boolean getGracefulDisconnectEnabled()
{
return conf.graceful_disconnect_enabled;
}

public static long getReadRpcTimeout(TimeUnit unit)
{
return conf.read_request_timeout.to(unit);
Expand Down
42 changes: 42 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,48 @@ public long getRpcTimeout()
return DatabaseDescriptor.getRpcTimeout(MILLISECONDS);
}

@Override
public void setGracefulDisconnectGracePeriodMs(long value)
{
if (value > DatabaseDescriptor.getGracefulDisconnectMaxDrainMs())
throw new IllegalArgumentException("graceful_disconnect_grace_period_ms cannot exceed graceful_disconnect_max_drain_ms.");

DatabaseDescriptor.setGracefulDisconnectGracePeriodMs(value);
logger.info("set graceful disconnect grace period to {} ms", value);
}

@Override
public long getGracefulDisconnectGracePeriodMs()
{
return DatabaseDescriptor.getGracefulDisconnectGracePeriodMs();
}

@Override
public void setGracefulDisconnectMaxDrainMs(long value)
{
if (value <= 0)
throw new IllegalArgumentException("graceful_disconnect_max_drain_ms must be greater than 0 when graceful_disconnect_enabled is set to true.");

if (value < DatabaseDescriptor.getGracefulDisconnectGracePeriodMs())
throw new IllegalArgumentException("graceful_disconnect_max_drain_ms cannot be less than graceful_disconnect_grace_period_ms.");

DatabaseDescriptor.setGracefulDisconnectMaxDrainMs(value);
logger.info("set graceful disconnect max drain to {} ms", value);
}


@Override
public long getGracefulDisconnectMaxDrainMs()
{
return DatabaseDescriptor.getGracefulDisconnectMaxDrainMs();
}

@Override
public boolean getGracefulDisconnectEnabled()
{
return DatabaseDescriptor.getGracefulDisconnectEnabled();
}

public void setReadRpcTimeout(long value)
{
DatabaseDescriptor.setReadRpcTimeout(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,14 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion,
public void setRpcTimeout(long value);
public long getRpcTimeout();

public void setGracefulDisconnectGracePeriodMs(long value);
public long getGracefulDisconnectGracePeriodMs();

public void setGracefulDisconnectMaxDrainMs(long value);
public long getGracefulDisconnectMaxDrainMs();

public boolean getGracefulDisconnectEnabled();

public void setReadRpcTimeout(long value);
public long getReadRpcTimeout();

Expand Down
28 changes: 28 additions & 0 deletions test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,34 @@ public void testRowIndexSizeWarnEnabledAbortDisabled()
DatabaseDescriptor.applyReadThresholdsValidations(conf);
}

@Test
public void testGracefulDisconnectEnabled()
{
Config config = new Config();
boolean originalValue = config.graceful_disconnect_enabled;
Assert.assertFalse("Default value of graceful_disconnect_enabled must be false", originalValue);
}

@Test
public void testGracefulDisconnectGracePeriodMs()
{
long originalValue = DatabaseDescriptor.getGracefulDisconnectGracePeriodMs();
Assert.assertEquals("Default value of graceful_disconnect_grace_period_ms must be 5000", 5000, originalValue);
DatabaseDescriptor.setGracefulDisconnectGracePeriodMs(3000);
Assert.assertEquals("graceful_disconnect_grace_period_ms should be updated to 3000", 3000, DatabaseDescriptor.getGracefulDisconnectGracePeriodMs());
DatabaseDescriptor.setGracefulDisconnectGracePeriodMs(originalValue);
}

@Test
public void testGracefulDisconnectMaxDrainMs()
{
long originalValue = DatabaseDescriptor.getGracefulDisconnectMaxDrainMs();
Assert.assertEquals("Default value of graceful_disconnect_max_drain_ms must be 30000", 30000, originalValue);
DatabaseDescriptor.setGracefulDisconnectMaxDrainMs(45000);
Assert.assertEquals("graceful_disconnect_max_drain_ms should be updated to 45000", 45000, DatabaseDescriptor.getGracefulDisconnectMaxDrainMs());
DatabaseDescriptor.setGracefulDisconnectMaxDrainMs(originalValue);
}

@Test
public void testRowIndexSizeAbortEnabledWarnDisabled()
{
Expand Down
57 changes: 57 additions & 0 deletions test/unit/org/apache/cassandra/service/StorageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,63 @@ public void testColumnIndexSizeInKiB()
}
}

@Test
public void testGracefulDisconnectMaxDrainMs()
{
StorageService storageService = StorageService.instance;
long originalMaxDrainMs = storageService.getGracefulDisconnectMaxDrainMs();
long originalGracePeriodMs = storageService.getGracefulDisconnectGracePeriodMs();
try
{
storageService.setGracefulDisconnectGracePeriodMs(1000);

storageService.setGracefulDisconnectMaxDrainMs(10000);
assertEquals(10000, storageService.getGracefulDisconnectMaxDrainMs());
try
{
storageService.setGracefulDisconnectMaxDrainMs(0);
fail("Should have received an IllegalArgumentException for max_drain_ms of 0");
}
catch (IllegalArgumentException ignored)
{
}
assertEquals(10000, storageService.getGracefulDisconnectMaxDrainMs());
}
finally
{
storageService.setGracefulDisconnectMaxDrainMs(originalMaxDrainMs);
storageService.setGracefulDisconnectGracePeriodMs(originalGracePeriodMs);
}
}

@Test
public void testGracefulDisconnectGracePeriodMs()
{
StorageService storageService = StorageService.instance;
long originalMaxDrainMs = storageService.getGracefulDisconnectMaxDrainMs();
long originalGracePeriodMs = storageService.getGracefulDisconnectGracePeriodMs();
try
{
storageService.setGracefulDisconnectMaxDrainMs(20000);

storageService.setGracefulDisconnectGracePeriodMs(5000);
assertEquals(5000, storageService.getGracefulDisconnectGracePeriodMs());

try
{
storageService.setGracefulDisconnectGracePeriodMs(30000);
fail("Should have received an IllegalArgumentException when grace_period_ms exceeds max_drain_ms");
}
catch (IllegalArgumentException ignored) {}
assertEquals(5000, storageService.getGracefulDisconnectGracePeriodMs());
}
finally
{
storageService.setGracefulDisconnectMaxDrainMs(originalMaxDrainMs);
storageService.setGracefulDisconnectGracePeriodMs(originalGracePeriodMs);
}
}

@Test
public void testColumnIndexCacheSizeInKiB()
{
Expand Down