Skip to content
Permalink
Browse files
Add Stream SSTable API to Sidecar to stream SSTable components throug…
…h zero copy streaming

Patch by Saranya Krishnakumar; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRASC-28
  • Loading branch information
sarankk authored and yifan-c committed Sep 29, 2021
1 parent 29485e0 commit 4e4b8eac699606ad09e2f9debc8ef71285d61af3
Showing 21 changed files with 1,653 additions and 14 deletions.
@@ -5,7 +5,6 @@ build/
src/gen-java/
src/resources/org/apache/cassandra/config/
logs/
data/
conf/hotspot_compiler
doc/cql3/CQL.html

@@ -19,6 +19,11 @@ Apache Cassandra running on the host & port specified in `conf/sidecar.yaml`.

$ ./gradlew run

Configuring Cassandra Instance
------------------------------

While setting up cassandra instance, make sure the data directories of cassandra are in the path stored in sidecar.yaml file, else modify data directories path to point to the correct directories for stream APIs to work.

Testing
---------

@@ -152,7 +152,7 @@ dependencies {
compile 'io.vertx:vertx-web-client:3.8.5'

compile 'io.swagger.core.v3:swagger-jaxrs2:2.1.0'
compile 'org.jboss.resteasy:resteasy-vertx:3.1.0.Final'
compile 'org.jboss.resteasy:resteasy-vertx:3.1.2.Final'
compile group: 'org.jboss.spec.javax.servlet', name: 'jboss-servlet-api_4.0_spec', version: '2.0.0.Final'

// Trying to be exactly compatible with Cassandra's deps
@@ -170,6 +170,7 @@ dependencies {

testCompile "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
testCompile "org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
testCompile "org.assertj:assertj-core:3.14.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}"

testCompile group: 'org.cassandraunit', name: 'cassandra-unit-shaded', version: '3.11.2.0'
@@ -5,10 +5,15 @@
cassandra:
- host: localhost
- port: 9042
- data_dirs: /cassandra/d1/data, /cassandra/d2/data

sidecar:
- host: 0.0.0.0
- port: 9043
- throttle:
- stream_requests_per_sec: 5000
- delay_sec: 5
- timeout_sec: 10
#
# Enable SSL configuration (Disabled by default)
#
@@ -0,0 +1,36 @@
package com.google.common.util.concurrent;

/**
* Wrapper class over guava Rate Limiter, uses SmoothBursty Ratelimiter. This class mainly exists to expose
* package protected method queryEarliestAvailable of guava RateLimiter
*/
public class SidecarRateLimiter
{
private final RateLimiter rateLimiter;

private SidecarRateLimiter(final double permitsPerSecond)
{
this.rateLimiter = RateLimiter.create(permitsPerSecond);
}

public static SidecarRateLimiter create(final double permitsPerSecond)
{
return new SidecarRateLimiter(permitsPerSecond);
}

/**
* Returns earliest time permits will become available
*/
public long queryEarliestAvailable(final long nowMicros)
{
return this.rateLimiter.queryEarliestAvailable(nowMicros);
}

/**
* Tries to reserve 1 permit, if not available immediately returns false
*/
public boolean tryAcquire()
{
return this.rateLimiter.tryAcquire();
}
}
@@ -18,6 +18,8 @@

package org.apache.cassandra.sidecar;

import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;

/**
@@ -31,6 +33,9 @@
/* Cassandra Port */
private final Integer cassandraPort;

/* Cassandra Data Dirs */
private Collection<String> cassandraDataDirs;

/* Sidecar's HTTP REST API port */
private final Integer port;

@@ -55,15 +60,22 @@

private final boolean isSslEnabled;

public Configuration(String cassandraHost, Integer cassandraPort, String host, Integer port,
Integer healthCheckFrequencyMillis, boolean isSslEnabled,
@Nullable String keyStorePath,
@Nullable String keyStorePassword,
@Nullable String trustStorePath,
@Nullable String trustStorePassword)
private final long rateLimitStreamRequestsPerSecond;

private final long throttleTimeoutInSeconds;

private final long throttleDelayInSeconds;

public Configuration(String cassandraHost, Integer cassandraPort, List<String> cassandraDataDirs, String host,
Integer port, Integer healthCheckFrequencyMillis, boolean isSslEnabled,
@Nullable String keyStorePath, @Nullable String keyStorePassword,
@Nullable String trustStorePath, @Nullable String trustStorePassword,
long rateLimitStreamRequestsPerSecond, long throttleTimeoutInSeconds,
long throttleDelayInSeconds)
{
this.cassandraHost = cassandraHost;
this.cassandraPort = cassandraPort;
this.cassandraDataDirs = cassandraDataDirs;
this.host = host;
this.port = port;
this.healthCheckFrequencyMillis = healthCheckFrequencyMillis;
@@ -73,6 +85,9 @@ public Configuration(String cassandraHost, Integer cassandraPort, String host, I
this.trustStorePath = trustStorePath;
this.trustStorePassword = trustStorePassword;
this.isSslEnabled = isSslEnabled;
this.rateLimitStreamRequestsPerSecond = rateLimitStreamRequestsPerSecond;
this.throttleTimeoutInSeconds = throttleTimeoutInSeconds;
this.throttleDelayInSeconds = throttleDelayInSeconds;
}

/**
@@ -95,6 +110,16 @@ public Integer getCassandraPort()
return cassandraPort;
}

/**
* Get Cassandra data dirs
*
* @return
*/
public Collection<String> getCassandraDataDirs()
{
return cassandraDataDirs;
}

/**
* Sidecar's listen address
*
@@ -179,13 +204,34 @@ public String getTruststorePassword()
return trustStorePassword;
}

/**
* Get number of stream requests accepted per second
*
* @return
*/
public long getRateLimitStreamRequestsPerSecond()
{
return rateLimitStreamRequestsPerSecond;
}

public long getThrottleTimeoutInSeconds()
{
return throttleTimeoutInSeconds;
}

public long getThrottleDelayInSeconds()
{
return throttleDelayInSeconds;
}

/**
* Configuration Builder
*/
public static class Builder
{
private String cassandraHost;
private Integer cassandraPort;
private List<String> cassandraDataDirs;
private String host;
private Integer port;
private Integer healthCheckFrequencyMillis;
@@ -194,6 +240,9 @@ public String getTruststorePassword()
private String trustStorePath;
private String trustStorePassword;
private boolean isSslEnabled;
private long rateLimitStreamRequestsPerSecond;
private long throttleTimeoutInSeconds;
private long throttleDelayInSeconds;

public Builder setCassandraHost(String host)
{
@@ -207,6 +256,12 @@ public Builder setCassandraPort(Integer port)
return this;
}

public Builder setCassandraDataDirs(List<String> dataDirs)
{
this.cassandraDataDirs = dataDirs;
return this;
}

public Builder setHost(String host)
{
this.host = host;
@@ -255,10 +310,30 @@ public Builder setSslEnabled(boolean enabled)
return this;
}

public Builder setRateLimitStreamRequestsPerSecond(long rateLimitStreamRequestsPerSecond)
{
this.rateLimitStreamRequestsPerSecond = rateLimitStreamRequestsPerSecond;
return this;
}

public Builder setThrottleTimeoutInSeconds(long throttleTimeoutInSeconds)
{
this.throttleTimeoutInSeconds = throttleTimeoutInSeconds;
return this;
}

public Builder setThrottleDelayInSeconds(long throttleDelayInSeconds)
{
this.throttleDelayInSeconds = throttleDelayInSeconds;
return this;
}

public Configuration build()
{
return new Configuration(cassandraHost, cassandraPort, host, port, healthCheckFrequencyMillis, isSslEnabled,
keyStorePath, keyStorePassword, trustStorePath, trustStorePassword);
return new Configuration(cassandraHost, cassandraPort, cassandraDataDirs, host, port,
healthCheckFrequencyMillis, isSslEnabled, keyStorePath, keyStorePassword,
trustStorePath, trustStorePassword, rateLimitStreamRequestsPerSecond,
throttleTimeoutInSeconds, throttleDelayInSeconds);
}
}
}
@@ -23,6 +23,7 @@
import java.net.MalformedURLException;
import java.net.URL;

import com.google.common.util.concurrent.SidecarRateLimiter;
import org.apache.commons.configuration2.YAMLConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.slf4j.Logger;
@@ -45,7 +46,10 @@
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
import org.apache.cassandra.sidecar.routes.HealthService;
import org.apache.cassandra.sidecar.routes.StreamSSTableComponent;
import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource;
import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
import org.apache.cassandra.sidecar.utils.FilePathBuilder;
import org.jboss.resteasy.plugins.server.vertx.VertxRegistry;
import org.jboss.resteasy.plugins.server.vertx.VertxRequestHandler;
import org.jboss.resteasy.plugins.server.vertx.VertxResteasyDeployment;
@@ -96,14 +100,17 @@ public HttpServer vertxServer(Vertx vertx, Configuration conf, Router router, Ve

@Provides
@Singleton
private VertxRequestHandler configureServices(Vertx vertx, HealthService healthService)
private VertxRequestHandler configureServices(Vertx vertx,
HealthService healthService,
StreamSSTableComponent ssTableComponent)
{
VertxResteasyDeployment deployment = new VertxResteasyDeployment();
deployment.start();
VertxRegistry r = deployment.getRegistry();

r.addPerInstanceResource(SwaggerOpenApiResource.class);
r.addSingletonResource(healthService);
r.addSingletonResource(ssTableComponent);

return new VertxRequestHandler(vertx, deployment);
}
@@ -122,7 +129,6 @@ public Router vertxRouter(Vertx vertx)
// Docs index.html page
StaticHandler docs = StaticHandler.create("docs");
router.route().path("/docs/*").handler(docs);

return router;
}

@@ -143,6 +149,7 @@ public Configuration configuration() throws ConfigurationException, IOException
return new Configuration.Builder()
.setCassandraHost(yamlConf.get(String.class, "cassandra.host"))
.setCassandraPort(yamlConf.get(Integer.class, "cassandra.port"))
.setCassandraDataDirs(yamlConf.getList(String.class, "cassandra.data_dirs"))
.setHost(yamlConf.get(String.class, "sidecar.host"))
.setPort(yamlConf.get(Integer.class, "sidecar.port"))
.setHealthCheckFrequency(yamlConf.get(Integer.class, "healthcheck.poll_freq_millis"))
@@ -151,6 +158,9 @@ public Configuration configuration() throws ConfigurationException, IOException
.setTrustStorePath(yamlConf.get(String.class, "sidecar.ssl.truststore.path", null))
.setTrustStorePassword(yamlConf.get(String.class, "sidecar.ssl.truststore.password", null))
.setSslEnabled(yamlConf.get(Boolean.class, "sidecar.ssl.enabled", false))
.setRateLimitStreamRequestsPerSecond(yamlConf.getLong("sidecar.throttle.stream_requests_per_sec"))
.setThrottleTimeoutInSeconds(yamlConf.getLong("sidecar.throttle.timeout_sec"))
.setThrottleDelayInSeconds(yamlConf.getLong("sidecar.throttle.delay_sec"))
.build();
}
catch (MalformedURLException e)
@@ -184,4 +194,17 @@ public CassandraAdapterDelegate cassandraAdapterDelegate(CassandraVersionProvide
{
return new CassandraAdapterDelegate(provider, session, config.getHealthCheckFrequencyMillis());
}

@Provides
public SidecarRateLimiter rateLimiter(Configuration config)
{
return SidecarRateLimiter.create(config.getRateLimitStreamRequestsPerSecond());
}

@Provides
@Singleton
public FilePathBuilder filePathBuilder(Configuration config)
{
return new CachedFilePathBuilder(config.getCassandraDataDirs());
}
}
@@ -0,0 +1,12 @@
package org.apache.cassandra.sidecar.exceptions;

/**
* Custom exception
*/
public class RangeException extends RuntimeException
{
public RangeException(String msg)
{
super(msg);
}
}

0 comments on commit 4e4b8ea

Please sign in to comment.