Permalink
Browse files

Merge pull request #489 from opuneet/issue382

adding createKeyspaceIfNotExists()
  • Loading branch information...
2 parents 1cc21b2 + 6083901 commit b9ab12712607734f8ff0d4ac692c04da01c571c7 @opuneet opuneet committed Feb 25, 2014
@@ -268,6 +268,14 @@ SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) thro
OperationResult<SchemaChangeResult> createKeyspace(Map<String, Object> options) throws ConnectionException ;
/**
+ * Create the keyspace in cassandra if it does not exist. This call will only create the keyspace and not
+ * any column families. Once the keyspace has been created then call createColumnFamily
+ * for each CF you want to create.
+ * @param options - For list of options see http://www.datastax.com/docs/1.0/configuration/storage_configuration
+ */
+ OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Map<String, Object> options) throws ConnectionException ;
+
+ /**
* Create the keyspace in cassandra. This call will create the keyspace and any column families.
* @param properties
* @return
@@ -276,6 +284,14 @@ SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) thro
OperationResult<SchemaChangeResult> createKeyspace(Properties properties) throws ConnectionException;
/**
+ * Create a keyspace if it does not exist.
+ * @param properties
+ * @return
+ * @throws ConnectionException
+ */
+ OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Properties properties) throws ConnectionException;
+
+ /**
* Bulk create for a keyspace and a bunch of column famlies
* @param options
* @param cfs
@@ -284,6 +300,14 @@ SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) thro
OperationResult<SchemaChangeResult> createKeyspace(Map<String, Object> options, Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException ;
/**
+ * Bulk create for a keyspace if it does not exist and a bunch of column famlies
+ * @param options
+ * @param cfs
+ * @throws ConnectionException
+ */
+ OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Map<String, Object> options, Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException ;
+
+ /**
* Update the keyspace in cassandra.
* @param options - For list of options see http://www.datastax.com/docs/1.0/configuration/storage_configuration
*/
@@ -276,4 +276,27 @@ public Properties getColumnFamilyProperties(String columnFamily)
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ Map<String, Object> options) throws ConnectionException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ Properties properties) throws ConnectionException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ Map<String, Object> options,
+ Map<ColumnFamily, Map<String, Object>> cfs)
+ throws ConnectionException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
@@ -20,6 +20,9 @@
private static final String WHY_UNCONFIGURED_COLUMNFAMILY = "unconfigured columnfamily";
+ private static final String KEYSPACE = "Keyspace";
+ private static final String DOES_NOT_EXIST = "does not exist";
+
public BadRequestException(String message) {
super(message);
}
@@ -35,4 +38,9 @@ public BadRequestException(String message, Throwable cause) {
public boolean isUnconfiguredColumnFamilyError() {
return getMessage().contains(WHY_UNCONFIGURED_COLUMNFAMILY);
}
+
+ public boolean isKeyspaceDoestNotExist() {
+ String message = getMessage();
+ return message.contains(KEYSPACE) && message.contains(DOES_NOT_EXIST);
+ }
}
@@ -25,31 +25,32 @@
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.CounterColumn;
import org.apache.cassandra.thrift.KsDef;
-import org.apache.cassandra.thrift.Cassandra.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.astyanax.AstyanaxConfiguration;
+import com.netflix.astyanax.CassandraOperationType;
import com.netflix.astyanax.ColumnMutation;
import com.netflix.astyanax.Execution;
-import com.netflix.astyanax.CassandraOperationType;
+import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.KeyspaceTracerFactory;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.SerializerPackage;
import com.netflix.astyanax.WriteAheadEntry;
import com.netflix.astyanax.WriteAheadLog;
-import com.netflix.astyanax.SerializerPackage;
-import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.ConnectionContext;
+import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.Operation;
import com.netflix.astyanax.connectionpool.OperationResult;
@@ -60,21 +61,22 @@
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.connectionpool.exceptions.OperationException;
import com.netflix.astyanax.connectionpool.exceptions.SchemaDisagreementException;
+import com.netflix.astyanax.connectionpool.impl.OperationResultImpl;
import com.netflix.astyanax.connectionpool.impl.TokenRangeImpl;
import com.netflix.astyanax.cql.CqlStatement;
import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.ddl.SchemaChangeResult;
import com.netflix.astyanax.ddl.impl.SchemaChangeResponseImpl;
-import com.netflix.astyanax.model.*;
-import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.partitioner.Partitioner;
import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.retry.RunOnce;
import com.netflix.astyanax.serializers.SerializerPackageImpl;
import com.netflix.astyanax.serializers.UnknownComparatorException;
-import com.netflix.astyanax.thrift.ddl.*;
+import com.netflix.astyanax.thrift.ddl.ThriftColumnFamilyDefinitionImpl;
+import com.netflix.astyanax.thrift.ddl.ThriftKeyspaceDefinitionImpl;
/**
*
@@ -253,14 +255,18 @@ public Object call() throws Exception {
@Override
public KeyspaceDefinition describeKeyspace() throws ConnectionException {
+ return internalDescribeKeyspace().getResult();
+ }
+
+ public OperationResult<KeyspaceDefinition> internalDescribeKeyspace() throws ConnectionException {
return executeOperation(
new AbstractKeyspaceOperationImpl<KeyspaceDefinition>(
tracerFactory.newTracer(CassandraOperationType.DESCRIBE_KEYSPACE), getKeyspaceName()) {
@Override
public KeyspaceDefinition internalExecute(Cassandra.Client client, ConnectionContext context) throws Exception {
return new ThriftKeyspaceDefinitionImpl(client.describe_keyspace(getKeyspaceName()));
}
- }, getConfig().getRetryPolicy().duplicate()).getResult();
+ }, getConfig().getRetryPolicy().duplicate());
}
@Override
@@ -553,6 +559,17 @@ public String internalExecute(Client client, ConnectionContext context) throws E
}
@Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(final Map<String, Object> options) throws ConnectionException {
+
+ return createKeyspaceIfNotExists(new Callable<OperationResult<SchemaChangeResult>>() {
+ @Override
+ public OperationResult<SchemaChangeResult> call() throws Exception {
+ return createKeyspace(options);
+ }
+ });
+ }
+
+ @Override
public OperationResult<SchemaChangeResult> createKeyspace(
final Map<String, Object> options,
final Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException {
@@ -568,6 +585,19 @@ public String internalExecute(Client client, ConnectionContext context) throws E
}
@Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ final Map<String, Object> options,
+ final Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException {
+
+ return createKeyspaceIfNotExists(new Callable<OperationResult<SchemaChangeResult>>() {
+ @Override
+ public OperationResult<SchemaChangeResult> call() throws Exception {
+ return createKeyspace(options, cfs);
+ }
+ });
+ }
+
+ @Override
public OperationResult<SchemaChangeResult> createKeyspace(final Properties props) throws ConnectionException {
if (props.containsKey("name") && !props.get("name").equals(getKeyspaceName())) {
throw new BadRequestException(
@@ -587,6 +617,55 @@ public String internalExecute(Client client, ConnectionContext context) throws E
@Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(final Properties props) throws ConnectionException {
+
+ return createKeyspaceIfNotExists(new Callable<OperationResult<SchemaChangeResult>>() {
+ @Override
+ public OperationResult<SchemaChangeResult> call() throws Exception {
+ return createKeyspace(props);
+ }
+ });
+ }
+
+ private OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Callable<OperationResult<SchemaChangeResult>> createKeyspace) throws ConnectionException {
+
+ boolean shouldCreateKeyspace = false;
+
+ try {
+
+ OperationResult<KeyspaceDefinition> opResult = this.internalDescribeKeyspace();
+
+ if (opResult != null && opResult.getResult() != null) {
+ return new OperationResultImpl<SchemaChangeResult>(opResult.getHost(),
+ new SchemaChangeResponseImpl().setSchemaId("no-op"),
+ opResult.getLatency());
+
+ } else {
+ shouldCreateKeyspace = true;
+ }
+ } catch (BadRequestException e) {
+ if (e.isKeyspaceDoestNotExist()) {
+ shouldCreateKeyspace = true;
+ } else {
+ throw e;
+ }
+ }
+
+ if (shouldCreateKeyspace) {
+ try {
+ return createKeyspace.call();
+ } catch (ConnectionException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+
+ @Override
public <K, C> OperationResult<SchemaChangeResult> createColumnFamily(final ColumnFamily<K, C> columnFamily, final Map<String, Object> options) throws ConnectionException {
final CfDef cfDef = toThriftColumnFamilyDefinition(options, columnFamily).getThriftColumnFamilyDefinition();
return internalCreateColumnFamily(cfDef);
@@ -1909,6 +1909,65 @@ public void testKeyspaceDoesntExist() {
keyspaceContext.shutdown();
}
}
+
+
+ @Test
+ public void testCreateKeyspaceThatAlreadyExists() {
+
+ String keyspaceName = TEST_KEYSPACE_NAME + "_ksAlreadyExists";
+
+ AstyanaxContext<Keyspace> keyspaceContext = new AstyanaxContext.Builder()
+ .forCluster(TEST_CLUSTER_NAME)
+ .forKeyspace(keyspaceName)
+ .withAstyanaxConfiguration(
+ new AstyanaxConfigurationImpl()
+ .setDiscoveryType(NodeDiscoveryType.NONE))
+ .withConnectionPoolConfiguration(
+ new ConnectionPoolConfigurationImpl(keyspaceName)
+ .setMaxConnsPerHost(1).setSeeds(SEEDS))
+ .buildKeyspace(ThriftFamilyFactory.getInstance());
+
+ Keyspace ks = null;
+ try {
+ keyspaceContext.start();
+ ks = keyspaceContext.getClient();
+
+ Properties props = new Properties();
+ props.setProperty("name", keyspaceName);
+ props.setProperty("strategy_class", "SimpleStrategy");
+ props.setProperty("strategy_options.replication_factor", "1");
+
+ try {
+ ks.createKeyspaceIfNotExists(props);
+
+ KeyspaceDefinition ksDef = ks.describeKeyspace();
+ Assert.assertNotNull(ksDef);
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+
+
+ // NOW create is again.
+ try {
+ ks.createKeyspaceIfNotExists(props);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ } finally {
+
+ try {
+ if (ks != null) {
+ ks.dropKeyspace();
+ }
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+
+ keyspaceContext.shutdown();
+ }
+ }
+
@Test
public void testGetSingleColumnNotExists() {

0 comments on commit b9ab127

Please sign in to comment.