Skip to content

Commit

Permalink
Fix cluster schema creation (#219)
Browse files Browse the repository at this point in the history
* Fix wrong logger class

* HWKALERTS-166 Coordinate schema creation on clustering

* Protect CassCluster for standalone tests
  • Loading branch information
lucasponce authored and jshaughn committed Sep 30, 2016
1 parent a8be941 commit d27f962
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 27 deletions.
Expand Up @@ -30,6 +30,7 @@

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.AccessTimeout;
import javax.ejb.Singleton;
import javax.ejb.Startup;
Expand All @@ -38,6 +39,8 @@
import org.cassalog.core.Cassalog;
import org.cassalog.core.CassalogBuilder;
import org.hawkular.alerts.engine.util.TokenReplacingReader;
import org.infinispan.Cache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.jboss.logging.Logger;

import com.datastax.driver.core.Cluster;
Expand All @@ -58,12 +61,28 @@
@Startup
@Singleton
public class CassCluster {
private static final Logger log = Logger.getLogger(CassDefinitionsServiceImpl.class);
private static final Logger log = Logger.getLogger(CassCluster.class);

/*
PORT used by the Cassandra cluster
*/
private static final String ALERTS_CASSANDRA_PORT = "hawkular-alerts.cassandra-cql-port";
private static final String ALERTS_CASSANDRA_PORT_ENV = "CASSANDRA_CQL_PORT";

/*
List of nodes defined on the Cassandra cluster
*/
private static final String ALERTS_CASSANDRA_NODES = "hawkular-alerts.cassandra-nodes";
private static final String ALERTS_CASSANDRA_NODES_ENV = "CASSANDRA_NODES";

/*
Hawkular Alerts keyspace name used on Cassandra cluster
*/
private static final String ALERTS_CASSANDRA_KEYSPACE = "hawkular-alerts.cassandra-keyspace";

/*
Number of attempts when Hawkular Alerts cannot connect with Cassandra cluster to retry
*/
private static final String ALERTS_CASSANDRA_RETRY_ATTEMPTS = "hawkular-alerts.cassandra-retry-attempts";

/*
Expand Down Expand Up @@ -104,6 +123,22 @@ public class CassCluster {

private boolean initialized = false;

private boolean distributed = false;

/**
* Access to the manager of the caches used for the partition services.
*/
@Resource(lookup = "java:jboss/infinispan/container/hawkular-alerts")
private EmbeddedCacheManager cacheManager;

private static final String SCHEMA = "schema";

/**
* This cache will be used to coordinate schema creation across a cluster of nodes.
*/
@Resource(lookup = "java:jboss/infinispan/cache/hawkular-alerts/schema")
private Cache schemaCache;

private void readProperties() {
attempts = Integer.parseInt(AlertProperties.getProperty(ALERTS_CASSANDRA_RETRY_ATTEMPTS, "5"));
timeout = Integer.parseInt(AlertProperties.getProperty(ALERTS_CASSANDRA_RETRY_TIMEOUT, "2000"));
Expand All @@ -122,6 +157,10 @@ private void readProperties() {
public void initCassCluster() {
readProperties();

if (cacheManager != null) {
distributed = cacheManager.getTransport() != null;
}

int currentAttempts = attempts;

SocketOptions socketOptions = null;
Expand Down Expand Up @@ -174,7 +213,11 @@ public void initCassCluster() {
}
if (session != null) {
try {
initScheme();
if (distributed) {
initSchemeDistributed();
} else {
initScheme();
}
} catch (IOException e) {
log.error("Error on initialization of Alerts scheme", e);
}
Expand All @@ -188,45 +231,54 @@ public void initCassCluster() {
}
}

private void initSchemeDistributed() throws IOException {
schemaCache.getAdvancedCache().lock(SCHEMA);
initScheme();
}

private void initScheme() throws IOException {

if (log.isDebugEnabled()) {
log.debugf("Checking Schema existence for keyspace: %s", keyspace);
}
log.infof("Checking Schema existence for keyspace: %s", keyspace);

KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keyspace);
if (keyspaceMetadata != null) {
// If overwrite flag is true it should not check if all tables are created
if (!overwrite) {
int currentAttempts = attempts;
while(!checkSchema() && !Thread.currentThread().isInterrupted() && currentAttempts >= 0) {
log.warn("[" + currentAttempts + "] Keyspace detected but schema not fully created. " +
"Retrying in [" + timeout + "]ms...");
currentAttempts--;
try {
Thread.sleep(timeout);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
waitForSchemaCheck();
if (!checkSchema()) {
log.errorf("Keyspace detected, but failed on check phase.", keyspace);
log.errorf("Keyspace %s detected, but failed on check phase.", keyspace);
initialized = false;
return;
} else {
log.infof("Schema already exist. Skipping schema creation.");
initialized = true;
}
log.debug("Schema already exist. Skipping schema creation.");
}
} else {
log.infof("Creating Schema for keyspace %s", keyspace);
createSchema(session, keyspace, overwrite);
waitForSchemaCheck();
if (!checkSchema()) {
log.errorf("Schema %s not created correctly", keyspace);
initialized = false;
} else {
initialized = true;
return;
log.infof("Done creating Schema for keyspace: %s", keyspace);
}
}
}

log.infof("Creating Schema for keyspace %s", keyspace);

createSchema(session, keyspace, overwrite);

initialized = true;

log.infof("Done creating Schema for keyspace: %s", keyspace);
private void waitForSchemaCheck() {
int currentAttempts = attempts;
while(!checkSchema() && !Thread.currentThread().isInterrupted() && currentAttempts >= 0) {
log.warnf("[%s] Keyspace detected but schema not fully created. " +
"Retrying in [%s] ms...", currentAttempts, timeout);
currentAttempts--;
try {
Thread.sleep(timeout);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

private boolean checkSchema() {
Expand Down
3 changes: 3 additions & 0 deletions hawkular-alerts-rest-tests/src/test/resources/domain.xsl
Expand Up @@ -49,6 +49,9 @@
<replicated-cache name="publish" mode="ASYNC">
<transaction mode="BATCH"/>
</replicated-cache>
<replicated-cache name="schema" mode="SYNC">
<transaction mode="NON_XA"/>
</replicated-cache>
</cache-container>
</xsl:template>

Expand Down
Expand Up @@ -67,6 +67,9 @@
<replicated-cache name="publish" mode="ASYNC">
<transaction mode="BATCH"/>
</replicated-cache>
<replicated-cache name="schema" mode="SYNC">
<transaction mode="NON_XA"/>
</replicated-cache>
</cache-container>
</xsl:template>

Expand Down
Expand Up @@ -42,6 +42,7 @@
<local-cache name="triggers"/>
<local-cache name="data"/>
<local-cache name="publish"/>
<local-cache name="schema"/>
</cache-container>
</xsl:template>

Expand Down
Expand Up @@ -40,6 +40,7 @@
<local-cache name="triggers"/>
<local-cache name="data"/>
<local-cache name="publish"/>
<local-cache name="schema"/>
</cache-container>
</xsl:template>

Expand Down

0 comments on commit d27f962

Please sign in to comment.