Permalink
Browse files

Astyanax testing and implementation tweaks

* Added "Internal" flavors of the existing "ExternalAstyanax..." test
  classes.  These start a Cassandra instance in the JVM, like the
  "InternalCassandra..." test classes.  These pass on my machine, but
  only from maven.  Eclipse mysteriously stops printing any console
  output midway through test execution and later produces a bizarre
  Astyanax connection failure.  The InternalCassandra* tests do pass.
  It's just the InternalAstyanax* tests failing.  So, there may be
  some gross oversight in the way I'm setting the Astyanax environment
  up that Eclipse exposes.  I'm not sure.

* Removed the AstyanaxStorageManager#clearKeyspaces() hack that I
  wrote in earlier commits.  There's no longer any static concurrent
  hash map for sharing Astyanax contexts among storage managers, with
  the upshot that this method is unnecessary.
  • Loading branch information...
1 parent 521e116 commit 1fb7205d25cadc109c8dced23ae67f059750d7bf @dalaro dalaro committed with mbroecheler Jun 24, 2012
@@ -2,9 +2,10 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
@@ -46,13 +47,10 @@
public static final String CLUSTER_DEFAULT = "Test Cluster";
public static final String CLUSTER_KEY = "cluster";
- private static final ConcurrentHashMap<String, AstyanaxContext<Keyspace>> keyspaces =
- new ConcurrentHashMap<String, AstyanaxContext<Keyspace>>();
-
- private static final ConcurrentHashMap<String, AstyanaxContext<Cluster>> clusters =
- new ConcurrentHashMap<String, AstyanaxContext<Cluster>>();
-
- private final AstyanaxContext<Keyspace> ks;
+ private final AstyanaxContext<Keyspace> ksctx;
+ private final AstyanaxContext<Cluster> clctx;
+ private final AstyanaxContext.Builder ctxbuilder;
+
private final String ksName;
private final String clusterName;
@@ -64,14 +62,14 @@
private final String llmPrefix;
+ private static final Logger log = LoggerFactory.getLogger(AstyanaxStorageManager.class);
+
public AstyanaxStorageManager(Configuration config) {
this.clusterName = config.getString(CLUSTER_KEY, CLUSTER_DEFAULT);
this.ksName = config.getString(KEYSPACE_KEY, KEYSPACE_DEFAULT);
- this.ks = getOrCreateKeyspace();
-
this.rid = ConfigHelper.getRid(config);
this.llmPrefix =
@@ -93,8 +91,17 @@ public AstyanaxStorageManager(Configuration config) {
config.getLong(
GraphDatabaseConfiguration.LOCK_EXPIRE_MS,
GraphDatabaseConfiguration.LOCK_EXPIRE_MS_DEFAULT);
+
+ this.ctxbuilder = getContextBuilder(config);
+ this.clctx = getOrCreateCluster();
+
+ ensureKeyspaceExists(clctx.getEntity());
+
+ this.ksctx = getOrCreateKeyspace();
+
idmanager = new OrderedKeyColumnValueIDManager(
+
openDatabase("titan_ids", null), rid, config);
}
@@ -106,6 +113,9 @@ public AstyanaxStorageManager(Configuration config) {
@Override
public OrderedKeyColumnValueStore openDatabase(String name)
throws GraphStorageException {
+
+ getOrCreateKeyspace();
+
AstyanaxOrderedKeyColumnValueStore lockStore =
openDatabase(name + "_locks", null);
LocalLockMediator llm = LocalLockMediators.INSTANCE.get(llmPrefix + ":" + ksName + ":" + name);
@@ -116,18 +126,14 @@ public OrderedKeyColumnValueStore openDatabase(String name)
@Override
public TransactionHandle beginTransaction() {
return new AstyanaxTransaction();
+
}
@Override
public void close() {
- // TODO Auto-generated method stub
-
- }
-
-
- // TODO remove
- public static void clearKeyspaces() {
- keyspaces.clear();
+ // Shutdown the Astyanax contexts
+ ksctx.shutdown();
+ clctx.shutdown();
}
private SimpleLockConfig.Builder makeLockConfigBuilder(AstyanaxOrderedKeyColumnValueStore lockStore, LocalLockMediator llm) {
@@ -144,28 +150,25 @@ private AstyanaxOrderedKeyColumnValueStore openDatabase(String name, SimpleLockC
ensureColumnFamilyExists(name);
- return new AstyanaxOrderedKeyColumnValueStore(ks.getEntity(), name, lcb);
+ return new AstyanaxOrderedKeyColumnValueStore(ksctx.getEntity(), name, lcb);
}
private void ensureColumnFamilyExists(String name) {
-
- Cluster cl = clusters.get(clusterName).getEntity();
-
+ Cluster cl = clctx.getEntity();
try {
KeyspaceDefinition ksDef = cl.describeKeyspace(ksName);
-
boolean found = false;
-
if (null != ksDef) {
for (ColumnFamilyDefinition cfDef : ksDef.getColumnFamilyList()) {
found |= cfDef.getName().equals(name);
}
}
-
if (!found) {
- ColumnFamilyDefinition cfDef = cl.makeColumnFamilyDefinition()
- .setName(name).setKeyspace(ksName).setComparatorType("org.apache.cassandra.db.marshal.BytesType");
-
+ ColumnFamilyDefinition cfDef =
+ cl.makeColumnFamilyDefinition()
+ .setName(name)
+ .setKeyspace(ksName)
+ .setComparatorType("org.apache.cassandra.db.marshal.BytesType");
cl.addColumnFamily(cfDef);
}
} catch (ConnectionException e) {
@@ -174,11 +177,32 @@ private void ensureColumnFamilyExists(String name) {
}
private AstyanaxContext<Keyspace> getOrCreateKeyspace() {
- AstyanaxContext<Keyspace> ks = keyspaces.get(ksName);
+ AstyanaxContext<Keyspace> ksctx =
+ ctxbuilder.buildKeyspace(ThriftFamilyFactory.getInstance());
+ ksctx.start();
- if (null != ks)
- return ks;
-
+ return ksctx;
+ }
+
+// private AstyanaxContext<Cluster> getOrCreateCluster(String clusterName, AstyanaxContext.Builder builder) {
+// AstyanaxContext<Cluster> ctx =
+// builder.buildCluster(ThriftFamilyFactory.getInstance());
+// ctx.start();
+// if (null != clusters.putIfAbsent(clusterName, ctx)) {
+// ctx.shutdown();
+// }
+// return clusters.get(clusterName);
+// }
+
+ private AstyanaxContext<Cluster> getOrCreateCluster() {
+ AstyanaxContext<Cluster> clusterCtx =
+ ctxbuilder.buildCluster(ThriftFamilyFactory.getInstance());
+ clusterCtx.start();
+
+ return clusterCtx;
+ }
+
+ private AstyanaxContext.Builder getContextBuilder(Configuration config) {
// TODO actual configuration
AstyanaxContext.Builder builder =
new AstyanaxContext.Builder()
@@ -193,39 +217,38 @@ private void ensureColumnFamilyExists(String name) {
.setSeeds("127.0.0.1:9160"))
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor());
- ks = builder.buildKeyspace(ThriftFamilyFactory.getInstance());
-
- ks.start();
+ return builder;
+ }
+
+ private void ensureKeyspaceExists(Cluster cl) {
+ KeyspaceDefinition ksDef;
- if (null != keyspaces.putIfAbsent(ksName, ks)) {
- ks.shutdown();
- } else {
- AstyanaxContext<Cluster> clusterCtx = builder.buildCluster(ThriftFamilyFactory.getInstance());
-
- clusterCtx.start();
-
- clusters.putIfAbsent(clusterName, clusterCtx);
-
- clusterCtx = clusters.get(clusterName);
-
- Cluster cl = clusterCtx.getEntity();
-
- Map<String, String> stratops = new HashMap<String, String>();
- stratops.put("replication_factor", "1"); //TODO
-
- KeyspaceDefinition ksDef = cl.makeKeyspaceDefinition()
- .setName(ksName)
- .setStrategyClass("org.apache.cassandra.locator.SimpleStrategy")
- .setStrategyOptions(stratops);
+ try {
+ ksDef = cl.describeKeyspace(ksName);
- try {
- cl.addKeyspace(ksDef);
- } catch (ConnectionException e) {
- throw new GraphStorageException(e);
+ if (null != ksDef && ksDef.getName().equals(ksName)) {
+ log.debug("Found keyspace {}", ksName);
+ return;
}
+ } catch (ConnectionException e) {
+ log.debug("Failed to describe keyspace {}", ksName);
}
- return keyspaces.get(ksName);
+ log.debug("Creating keyspace {}...", ksName);
+ try {
+ Map<String, String> stratops = new HashMap<String, String>();
+ stratops.put("replication_factor", "1"); //TODO config
+ ksDef = cl.makeKeyspaceDefinition()
+ .setName(ksName)
+ .setStrategyClass("org.apache.cassandra.locator.SimpleStrategy")
+ .setStrategyOptions(stratops);
+ cl.addKeyspace(ksDef);
+
+ log.debug("Created keyspace {}", ksName);
+ } catch (ConnectionException e) {
+ log.debug("Failed to create keyspace {}, ksName");
+ throw new GraphStorageException(e);
+ }
}
}
@@ -1,53 +1,19 @@
package com.thinkaurelius.titan.diskstorage.astyanax;
-import org.junit.BeforeClass;
-
-import com.netflix.astyanax.AstyanaxContext;
-import com.netflix.astyanax.Cluster;
-import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
-import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
-import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
-import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import com.thinkaurelius.titan.StorageSetup;
import com.thinkaurelius.titan.diskstorage.KeyColumnValueStoreTest;
import com.thinkaurelius.titan.diskstorage.StorageManager;
+import com.thinkaurelius.titan.testutil.CassandraUtil;
public class ExternalAstyanaxKeyColumnValueTest extends KeyColumnValueStoreTest {
- private static Cluster cluster;
-
- @BeforeClass
- public static void connectToClusterForCleanup() {
- AstyanaxContext<Cluster> ctx = new AstyanaxContext.Builder()
- .forCluster(AstyanaxStorageManager.CLUSTER_DEFAULT)
- .withAstyanaxConfiguration(
- new AstyanaxConfigurationImpl()
- .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE))
- .withConnectionPoolConfiguration(
- new ConnectionPoolConfigurationImpl("MyConnectionPool")
- .setPort(9160).setMaxConnsPerHost(16)
- .setSeeds("localhost"))
- .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
- .buildCluster(ThriftFamilyFactory.getInstance());
-
- cluster = ctx.getEntity();
- ctx.start();
- }
-
@Override
public StorageManager openStorageManager() {
return new AstyanaxStorageManager(StorageSetup.getLocalStorageConfiguration());
}
@Override
public void cleanUp() {
- try {
- cluster.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
- AstyanaxStorageManager.clearKeyspaces();
- } catch (ConnectionException e) {
-// throw new RuntimeException(e);
- }
+ CassandraUtil.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
}
}
@@ -1,44 +1,16 @@
package com.thinkaurelius.titan.diskstorage.astyanax;
import org.apache.commons.configuration.Configuration;
-import org.junit.BeforeClass;
-import com.netflix.astyanax.AstyanaxContext;
-import com.netflix.astyanax.Cluster;
-import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
-import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
-import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
-import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
import com.thinkaurelius.titan.diskstorage.StorageManager;
import com.thinkaurelius.titan.diskstorage.cassandra.CassandraLocalhostHelper;
import com.thinkaurelius.titan.diskstorage.cassandra.CassandraThriftStorageManager;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.testutil.CassandraUtil;
public class ExternalAstyanaxLockKeyColumnValueStoreTest extends LockKeyColumnValueStoreTest {
- private static Cluster cluster;
-
- @BeforeClass
- public static void connectToClusterForCleanup() {
- AstyanaxContext<Cluster> ctx = new AstyanaxContext.Builder()
- .forCluster(AstyanaxStorageManager.CLUSTER_DEFAULT)
- .withAstyanaxConfiguration(
- new AstyanaxConfigurationImpl()
- .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE))
- .withConnectionPoolConfiguration(
- new ConnectionPoolConfigurationImpl("MyConnectionPool")
- .setPort(9160).setMaxConnsPerHost(16)
- .setSeeds("localhost"))
- .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
- .buildCluster(ThriftFamilyFactory.getInstance());
-
- cluster = ctx.getEntity();
- ctx.start();
- }
-
@Override
public StorageManager openStorageManager(short idx) {
Configuration sc = CassandraLocalhostHelper.getLocalStorageConfiguration();
@@ -50,11 +22,6 @@ public StorageManager openStorageManager(short idx) {
@Override
public void cleanUp() {
- try {
- cluster.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
- AstyanaxStorageManager.clearKeyspaces();
- } catch (ConnectionException e) {
-// throw new RuntimeException(e);
- }
+ CassandraUtil.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
}
}
Oops, something went wrong. Retry.

0 comments on commit 1fb7205

Please sign in to comment.