Skip to content

Commit

Permalink
Since this is a feature, it has been moved to the titan0.1 branch.
Browse files Browse the repository at this point in the history
This reverts commit 0d8b54a.
  • Loading branch information
mbroecheler committed Jun 25, 2012
1 parent 11c0b78 commit 8b4caae
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 272 deletions.
Expand Up @@ -2,10 +2,9 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
Expand Down Expand Up @@ -47,10 +46,13 @@ public class AstyanaxStorageManager implements StorageManager {
public static final String CLUSTER_DEFAULT = "Test Cluster";
public static final String CLUSTER_KEY = "cluster";

private final AstyanaxContext<Keyspace> ksctx;
private final AstyanaxContext<Cluster> clctx;
private final AstyanaxContext.Builder ctxbuilder;

private static final ConcurrentHashMap<String, AstyanaxContext<Keyspace>> keyspaces =
new ConcurrentHashMap<String, AstyanaxContext<Keyspace>>();

private static final ConcurrentHashMap<String, AstyanaxContext<Cluster>> clusters =
new ConcurrentHashMap<String, AstyanaxContext<Cluster>>();

private final AstyanaxContext<Keyspace> ks;
private final String ksName;
private final String clusterName;

Expand All @@ -62,14 +64,14 @@ public class AstyanaxStorageManager implements StorageManager {

private final String llmPrefix;

private static final Logger log = LoggerFactory.getLogger(AstyanaxStorageManager.class);

public AstyanaxStorageManager(Configuration config) {

this.clusterName = config.getString(CLUSTER_KEY, CLUSTER_DEFAULT);

this.ksName = config.getString(KEYSPACE_KEY, KEYSPACE_DEFAULT);

this.ks = getOrCreateKeyspace();

this.rid = ConfigHelper.getRid(config);

this.llmPrefix =
Expand All @@ -91,17 +93,8 @@ public AstyanaxStorageManager(Configuration config) {
config.getLong(
GraphDatabaseConfiguration.LOCK_EXPIRE_MS,
GraphDatabaseConfiguration.LOCK_EXPIRE_MS_DEFAULT);

this.ctxbuilder = getContextBuilder(config);

this.clctx = getOrCreateCluster();

ensureKeyspaceExists(clctx.getEntity());

this.ksctx = getOrCreateKeyspace();

idmanager = new OrderedKeyColumnValueIDManager(

openDatabase("titan_ids", null), rid, config);
}

Expand All @@ -113,9 +106,6 @@ public long[] getIDBlock(int partition) {
@Override
public OrderedKeyColumnValueStore openDatabase(String name)
throws GraphStorageException {

getOrCreateKeyspace();

AstyanaxOrderedKeyColumnValueStore lockStore =
openDatabase(name + "_locks", null);
LocalLockMediator llm = LocalLockMediators.INSTANCE.get(llmPrefix + ":" + ksName + ":" + name);
Expand All @@ -126,14 +116,18 @@ public OrderedKeyColumnValueStore openDatabase(String name)
@Override
public TransactionHandle beginTransaction() {
return new AstyanaxTransaction();

}

@Override
public void close() {
// Shutdown the Astyanax contexts
ksctx.shutdown();
clctx.shutdown();
// TODO Auto-generated method stub

}


// TODO remove
public static void clearKeyspaces() {
keyspaces.clear();
}

private SimpleLockConfig.Builder makeLockConfigBuilder(AstyanaxOrderedKeyColumnValueStore lockStore, LocalLockMediator llm) {
Expand All @@ -150,25 +144,28 @@ private AstyanaxOrderedKeyColumnValueStore openDatabase(String name, SimpleLockC

ensureColumnFamilyExists(name);

return new AstyanaxOrderedKeyColumnValueStore(ksctx.getEntity(), name, lcb);
return new AstyanaxOrderedKeyColumnValueStore(ks.getEntity(), name, lcb);
}

private void ensureColumnFamilyExists(String name) {
Cluster cl = clctx.getEntity();

Cluster cl = clusters.get(clusterName).getEntity();

try {
KeyspaceDefinition ksDef = cl.describeKeyspace(ksName);

boolean found = false;

if (null != ksDef) {
for (ColumnFamilyDefinition cfDef : ksDef.getColumnFamilyList()) {
found |= cfDef.getName().equals(name);
}
}

if (!found) {
ColumnFamilyDefinition cfDef =
cl.makeColumnFamilyDefinition()
.setName(name)
.setKeyspace(ksName)
.setComparatorType("org.apache.cassandra.db.marshal.BytesType");
ColumnFamilyDefinition cfDef = cl.makeColumnFamilyDefinition()
.setName(name).setKeyspace(ksName).setComparatorType("org.apache.cassandra.db.marshal.BytesType");

cl.addColumnFamily(cfDef);
}
} catch (ConnectionException e) {
Expand All @@ -177,32 +174,11 @@ private void ensureColumnFamilyExists(String name) {
}

private AstyanaxContext<Keyspace> getOrCreateKeyspace() {
AstyanaxContext<Keyspace> ksctx =
ctxbuilder.buildKeyspace(ThriftFamilyFactory.getInstance());
ksctx.start();

return ksctx;
}

// private AstyanaxContext<Cluster> getOrCreateCluster(String clusterName, AstyanaxContext.Builder builder) {
// AstyanaxContext<Cluster> ctx =
// builder.buildCluster(ThriftFamilyFactory.getInstance());
// ctx.start();
// if (null != clusters.putIfAbsent(clusterName, ctx)) {
// ctx.shutdown();
// }
// return clusters.get(clusterName);
// }

private AstyanaxContext<Cluster> getOrCreateCluster() {
AstyanaxContext<Cluster> clusterCtx =
ctxbuilder.buildCluster(ThriftFamilyFactory.getInstance());
clusterCtx.start();
AstyanaxContext<Keyspace> ks = keyspaces.get(ksName);

return clusterCtx;
}

private AstyanaxContext.Builder getContextBuilder(Configuration config) {
if (null != ks)
return ks;

// TODO actual configuration
AstyanaxContext.Builder builder =
new AstyanaxContext.Builder()
Expand All @@ -217,38 +193,39 @@ private AstyanaxContext.Builder getContextBuilder(Configuration config) {
.setSeeds("127.0.0.1:9160"))
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor());

return builder;
}

private void ensureKeyspaceExists(Cluster cl) {
KeyspaceDefinition ksDef;
ks = builder.buildKeyspace(ThriftFamilyFactory.getInstance());

ks.start();

try {
ksDef = cl.describeKeyspace(ksName);
if (null != keyspaces.putIfAbsent(ksName, ks)) {
ks.shutdown();
} else {
AstyanaxContext<Cluster> clusterCtx = builder.buildCluster(ThriftFamilyFactory.getInstance());

clusterCtx.start();

clusters.putIfAbsent(clusterName, clusterCtx);

clusterCtx = clusters.get(clusterName);

Cluster cl = clusterCtx.getEntity();

if (null != ksDef && ksDef.getName().equals(ksName)) {
log.debug("Found keyspace {}", ksName);
return;
}
} catch (ConnectionException e) {
log.debug("Failed to describe keyspace {}", ksName);
}

log.debug("Creating keyspace {}...", ksName);
try {
Map<String, String> stratops = new HashMap<String, String>();
stratops.put("replication_factor", "1"); //TODO config
ksDef = cl.makeKeyspaceDefinition()
.setName(ksName)
.setStrategyClass("org.apache.cassandra.locator.SimpleStrategy")
.setStrategyOptions(stratops);
cl.addKeyspace(ksDef);
stratops.put("replication_factor", "1"); //TODO

log.debug("Created keyspace {}", ksName);
} catch (ConnectionException e) {
log.debug("Failed to create keyspace {}, ksName");
throw new GraphStorageException(e);
KeyspaceDefinition ksDef = cl.makeKeyspaceDefinition()
.setName(ksName)
.setStrategyClass("org.apache.cassandra.locator.SimpleStrategy")
.setStrategyOptions(stratops);

try {
cl.addKeyspace(ksDef);
} catch (ConnectionException e) {
throw new GraphStorageException(e);
}
}

return keyspaces.get(ksName);
}
}

Expand Down
@@ -1,19 +1,53 @@
package com.thinkaurelius.titan.diskstorage.astyanax;

import org.junit.BeforeClass;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import com.thinkaurelius.titan.StorageSetup;
import com.thinkaurelius.titan.diskstorage.KeyColumnValueStoreTest;
import com.thinkaurelius.titan.diskstorage.StorageManager;
import com.thinkaurelius.titan.testutil.CassandraUtil;

public class ExternalAstyanaxKeyColumnValueTest extends KeyColumnValueStoreTest {

private static Cluster cluster;

@BeforeClass
public static void connectToClusterForCleanup() {
AstyanaxContext<Cluster> ctx = new AstyanaxContext.Builder()
.forCluster(AstyanaxStorageManager.CLUSTER_DEFAULT)
.withAstyanaxConfiguration(
new AstyanaxConfigurationImpl()
.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl("MyConnectionPool")
.setPort(9160).setMaxConnsPerHost(16)
.setSeeds("localhost"))
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildCluster(ThriftFamilyFactory.getInstance());

cluster = ctx.getEntity();
ctx.start();
}

@Override
public StorageManager openStorageManager() {
return new AstyanaxStorageManager(StorageSetup.getLocalStorageConfiguration());
}

@Override
public void cleanUp() {
CassandraUtil.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
try {
cluster.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
AstyanaxStorageManager.clearKeyspaces();
} catch (ConnectionException e) {
// throw new RuntimeException(e);
}
}
}
@@ -1,16 +1,44 @@
package com.thinkaurelius.titan.diskstorage.astyanax;

import org.apache.commons.configuration.Configuration;
import org.junit.BeforeClass;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
import com.thinkaurelius.titan.diskstorage.StorageManager;
import com.thinkaurelius.titan.diskstorage.cassandra.CassandraLocalhostHelper;
import com.thinkaurelius.titan.diskstorage.cassandra.CassandraThriftStorageManager;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.testutil.CassandraUtil;

public class ExternalAstyanaxLockKeyColumnValueStoreTest extends LockKeyColumnValueStoreTest {

private static Cluster cluster;

@BeforeClass
public static void connectToClusterForCleanup() {
AstyanaxContext<Cluster> ctx = new AstyanaxContext.Builder()
.forCluster(AstyanaxStorageManager.CLUSTER_DEFAULT)
.withAstyanaxConfiguration(
new AstyanaxConfigurationImpl()
.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl("MyConnectionPool")
.setPort(9160).setMaxConnsPerHost(16)
.setSeeds("localhost"))
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildCluster(ThriftFamilyFactory.getInstance());

cluster = ctx.getEntity();
ctx.start();
}

@Override
public StorageManager openStorageManager(short idx) {
Configuration sc = CassandraLocalhostHelper.getLocalStorageConfiguration();
Expand All @@ -22,6 +50,11 @@ public StorageManager openStorageManager(short idx) {

@Override
public void cleanUp() {
CassandraUtil.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
try {
cluster.dropKeyspace(AstyanaxStorageManager.KEYSPACE_DEFAULT);
AstyanaxStorageManager.clearKeyspaces();
} catch (ConnectionException e) {
// throw new RuntimeException(e);
}
}
}

0 comments on commit 8b4caae

Please sign in to comment.