Skip to content
Permalink
Browse files
Call the start method of CassandraAdaptorDelegate to start periodic h…
…ealth checl

patch by Saranya Krishnakumar; reviewed by Yifan Cai, Dinesh Joshi for CASSANDRASC-32
  • Loading branch information
sarankk authored and yifan-c committed Nov 3, 2021
1 parent 4d88af6 commit 01d8b9c1893a37ae9c905f1844ca911c128ef6f4
Showing 3 changed files with 54 additions and 17 deletions.
@@ -101,11 +101,12 @@ public synchronized Session getLocalCql()
.withNettyOptions(nettyOptions)
.build();
localSession = cluster.connect();
logger.info("Successfully connected to Casssandra instance!");
}
}
catch (Exception e)
{
logger.debug("Failed to reach Cassandra", e);
logger.error("Failed to reach Cassandra", e);
if (cluster != null)
{
try
@@ -114,7 +115,7 @@ public synchronized Session getLocalCql()
}
catch (Exception ex)
{
logger.debug("Failed to close cluster in cleanup", ex);
logger.error("Failed to close cluster in cleanup", ex);
}
}
}
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
@@ -32,6 +33,7 @@
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.sun.istack.internal.NotNull;


/**
@@ -57,6 +59,7 @@ public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateLi

private static final Logger logger = LoggerFactory.getLogger(CassandraAdapterDelegate.class);
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> healthCheckRoutine;
private boolean registered = false;

public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession)
@@ -74,19 +77,31 @@ public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cq
public synchronized void start()
{
logger.info("Starting health check");
executor.scheduleWithFixedDelay(this::healthCheck, 0, refreshRate, TimeUnit.MILLISECONDS);
maybeRegisterHostListener();
// only schedule the health check once.
if (healthCheckRoutine == null)
{
healthCheckRoutine = executor.scheduleWithFixedDelay(this::healthCheck,
0,
refreshRate,
TimeUnit.MILLISECONDS);
}
}

private synchronized void maybeRegisterHostListener()
private synchronized void maybeRegisterHostListener(@NotNull Session session)
{
if (!registered)
{
checkSession();
if (session != null)
{
session.getCluster().register(this);
}
session.getCluster().register(this);
registered = true;
}
}

private synchronized void maybeUnregisterHostListener(@NotNull Session session)
{
if (registered)
{
session.getCluster().unregister(this);
registered = false;
}
}

@@ -97,15 +112,16 @@ public synchronized void stop()
}

/**
* Need to be called before routing the request to the adapter
* Make an attempt to obtain the session object.
*
* It needs to be called before routing the request to the adapter
* We might end up swapping the adapter out because of a server upgrade
*/
public synchronized void checkSession()
{
if (session == null)
{
session = cqlSession.getLocalCql();
start();
}
}

@@ -117,7 +133,17 @@ public synchronized void checkSession()
*/
public synchronized void healthCheck()
{
Preconditions.checkNotNull(session);
checkSession();

if (session == null)
{
logger.info("No local CQL session is available. Cassandra is down presumably.");
isUp = false;
return;
}

maybeRegisterHostListener(session);

try
{
String version = session.execute("select release_version from system.local")
@@ -128,15 +154,20 @@ public synchronized void healthCheck()
SimpleCassandraVersion newVersion = SimpleCassandraVersion.create(version);
if (!newVersion.equals(currentVersion))
{
currentVersion = SimpleCassandraVersion.create(version);
currentVersion = newVersion;
adapter = versionProvider.getCassandra(version).create(cqlSession);
logger.info("Cassandra version change detected. New adapter loaded: {}", adapter);
logger.info("Cassandra version change detected. New adapter loaded: {}", adapter);
}
logger.info("Cassandra version {}");
logger.info("Cassandra version {}", version);
}
catch (NoHostAvailableException e)
{
logger.error("Unexpected error connecting to Cassandra instance.", e);
// The cassandra node is down.
// Unregister the host listener and nullify the session in order to get a new object.
isUp = false;
maybeUnregisterHostListener(session);
session = null;
}
}

@@ -27,6 +27,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.http.HttpServer;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.utils.SslUtils;

/**
@@ -36,12 +37,14 @@
public class CassandraSidecarDaemon
{
private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
private final CassandraAdapterDelegate delegate;
private final HttpServer server;
private final Configuration config;

@Inject
public CassandraSidecarDaemon(HttpServer server, Configuration config)
public CassandraSidecarDaemon(CassandraAdapterDelegate delegate, HttpServer server, Configuration config)
{
this.delegate = delegate;
this.server = server;
this.config = config;
}
@@ -50,13 +53,15 @@ public void start()
{
banner(System.out);
validate();
delegate.start();
logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
server.listen(config.getPort(), config.getHost());
}

public void stop()
{
logger.info("Stopping Cassandra Sidecar");
delegate.stop();
server.close();
}

0 comments on commit 01d8b9c

Please sign in to comment.