Permalink
Browse files

adding createKeyspaceIfNotExists()

  • Loading branch information...
1 parent 1cc21b2 commit 6ff5cecba0c796313e7d33f9a5f71c68e4aa1ee2 @opuneet opuneet committed Feb 25, 2014
@@ -268,6 +268,14 @@ SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) thro
OperationResult<SchemaChangeResult> createKeyspace(Map<String, Object> options) throws ConnectionException ;
/**
+ * Create the keyspace in cassandra if it does not exist. This call will only create the keyspace and not
+ * any column families. Once the keyspace has been created then call createColumnFamily
+ * for each CF you want to create.
+ * @param options - For list of options see http://www.datastax.com/docs/1.0/configuration/storage_configuration
+ */
+ OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Map<String, Object> options) throws ConnectionException ;
+
+ /**
* Create the keyspace in cassandra. This call will create the keyspace and any column families.
* @param properties
* @return
@@ -276,6 +284,14 @@ SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) thro
OperationResult<SchemaChangeResult> createKeyspace(Properties properties) throws ConnectionException;
/**
+ * Create a keyspace if it does not exist.
+ * @param properties
+ * @return
+ * @throws ConnectionException
+ */
+ OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Properties properties) throws ConnectionException;
+
+ /**
* Bulk create for a keyspace and a bunch of column famlies
* @param options
* @param cfs
@@ -284,6 +300,14 @@ SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) thro
OperationResult<SchemaChangeResult> createKeyspace(Map<String, Object> options, Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException ;
/**
+ * Bulk create for a keyspace if it does not exist and a bunch of column famlies
+ * @param options
+ * @param cfs
+ * @throws ConnectionException
+ */
+ OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Map<String, Object> options, Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException ;
+
+ /**
* Update the keyspace in cassandra.
* @param options - For list of options see http://www.datastax.com/docs/1.0/configuration/storage_configuration
*/
@@ -276,4 +276,27 @@ public Properties getColumnFamilyProperties(String columnFamily)
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ Map<String, Object> options) throws ConnectionException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ Properties properties) throws ConnectionException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ Map<String, Object> options,
+ Map<ColumnFamily, Map<String, Object>> cfs)
+ throws ConnectionException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
@@ -20,6 +20,9 @@
private static final String WHY_UNCONFIGURED_COLUMNFAMILY = "unconfigured columnfamily";
+ private static final String KEYSPACE = "Keyspace";
+ private static final String DOES_NOT_EXIST = "does not exist";
+
public BadRequestException(String message) {
super(message);
}
@@ -35,4 +38,9 @@ public BadRequestException(String message, Throwable cause) {
public boolean isUnconfiguredColumnFamilyError() {
return getMessage().contains(WHY_UNCONFIGURED_COLUMNFAMILY);
}
+
+ public boolean isKeyspaceDoestNotExist() {
+ String message = getMessage();
+ return message.contains(KEYSPACE) && message.contains(DOES_NOT_EXIST);
+ }
}
@@ -25,31 +25,32 @@
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.CounterColumn;
import org.apache.cassandra.thrift.KsDef;
-import org.apache.cassandra.thrift.Cassandra.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.astyanax.AstyanaxConfiguration;
+import com.netflix.astyanax.CassandraOperationType;
import com.netflix.astyanax.ColumnMutation;
import com.netflix.astyanax.Execution;
-import com.netflix.astyanax.CassandraOperationType;
+import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.KeyspaceTracerFactory;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.SerializerPackage;
import com.netflix.astyanax.WriteAheadEntry;
import com.netflix.astyanax.WriteAheadLog;
-import com.netflix.astyanax.SerializerPackage;
-import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.ConnectionContext;
+import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.Operation;
import com.netflix.astyanax.connectionpool.OperationResult;
@@ -60,21 +61,22 @@
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.connectionpool.exceptions.OperationException;
import com.netflix.astyanax.connectionpool.exceptions.SchemaDisagreementException;
+import com.netflix.astyanax.connectionpool.impl.OperationResultImpl;
import com.netflix.astyanax.connectionpool.impl.TokenRangeImpl;
import com.netflix.astyanax.cql.CqlStatement;
import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.ddl.SchemaChangeResult;
import com.netflix.astyanax.ddl.impl.SchemaChangeResponseImpl;
-import com.netflix.astyanax.model.*;
-import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.partitioner.Partitioner;
import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.retry.RunOnce;
import com.netflix.astyanax.serializers.SerializerPackageImpl;
import com.netflix.astyanax.serializers.UnknownComparatorException;
-import com.netflix.astyanax.thrift.ddl.*;
+import com.netflix.astyanax.thrift.ddl.ThriftColumnFamilyDefinitionImpl;
+import com.netflix.astyanax.thrift.ddl.ThriftKeyspaceDefinitionImpl;
/**
*
@@ -253,14 +255,18 @@ public Object call() throws Exception {
@Override
public KeyspaceDefinition describeKeyspace() throws ConnectionException {
+ return internalDescribeKeyspace().getResult();
+ }
+
+ public OperationResult<KeyspaceDefinition> internalDescribeKeyspace() throws ConnectionException {
return executeOperation(
new AbstractKeyspaceOperationImpl<KeyspaceDefinition>(
tracerFactory.newTracer(CassandraOperationType.DESCRIBE_KEYSPACE), getKeyspaceName()) {
@Override
public KeyspaceDefinition internalExecute(Cassandra.Client client, ConnectionContext context) throws Exception {
return new ThriftKeyspaceDefinitionImpl(client.describe_keyspace(getKeyspaceName()));
}
- }, getConfig().getRetryPolicy().duplicate()).getResult();
+ }, getConfig().getRetryPolicy().duplicate());
}
@Override
@@ -553,6 +559,17 @@ public String internalExecute(Client client, ConnectionContext context) throws E
}
@Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(final Map<String, Object> options) throws ConnectionException {
+
+ return createKeyspaceIfNotExists(new Callable<OperationResult<SchemaChangeResult>>() {
+ @Override
+ public OperationResult<SchemaChangeResult> call() throws Exception {
+ return createKeyspace(options);
+ }
+ });
+ }
+
+ @Override
public OperationResult<SchemaChangeResult> createKeyspace(
final Map<String, Object> options,
final Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException {
@@ -568,6 +585,19 @@ public String internalExecute(Client client, ConnectionContext context) throws E
}
@Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(
+ final Map<String, Object> options,
+ final Map<ColumnFamily, Map<String, Object>> cfs) throws ConnectionException {
+
+ return createKeyspaceIfNotExists(new Callable<OperationResult<SchemaChangeResult>>() {
+ @Override
+ public OperationResult<SchemaChangeResult> call() throws Exception {
+ return createKeyspace(options, cfs);
+ }
+ });
+ }
+
+ @Override
public OperationResult<SchemaChangeResult> createKeyspace(final Properties props) throws ConnectionException {
if (props.containsKey("name") && !props.get("name").equals(getKeyspaceName())) {
throw new BadRequestException(
@@ -587,6 +617,55 @@ public String internalExecute(Client client, ConnectionContext context) throws E
@Override
+ public OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(final Properties props) throws ConnectionException {
+
+ return createKeyspaceIfNotExists(new Callable<OperationResult<SchemaChangeResult>>() {
+ @Override
+ public OperationResult<SchemaChangeResult> call() throws Exception {
+ return createKeyspace(props);
+ }
+ });
+ }
+
+ private OperationResult<SchemaChangeResult> createKeyspaceIfNotExists(Callable<OperationResult<SchemaChangeResult>> createKeyspace) throws ConnectionException {
+
+ boolean shouldCreateKeyspace = false;
+
+ try {
+
+ OperationResult<KeyspaceDefinition> opResult = this.internalDescribeKeyspace();
+
+ if (opResult != null && opResult.getResult() != null) {
+ return new OperationResultImpl<SchemaChangeResult>(opResult.getHost(),
+ new SchemaChangeResponseImpl().setSchemaId("no-op"),
+ opResult.getLatency());
+
+ } else {
+ shouldCreateKeyspace = true;
+ }
+ } catch (BadRequestException e) {
+ if (e.isKeyspaceDoestNotExist()) {
+ shouldCreateKeyspace = true;
+ } else {
+ throw e;
+ }
+ }
+
+ if (shouldCreateKeyspace) {
+ try {
+ return createKeyspace.call();
+ } catch (ConnectionException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+
+ @Override
public <K, C> OperationResult<SchemaChangeResult> createColumnFamily(final ColumnFamily<K, C> columnFamily, final Map<String, Object> options) throws ConnectionException {
final CfDef cfDef = toThriftColumnFamilyDefinition(options, columnFamily).getThriftColumnFamilyDefinition();
return internalCreateColumnFamily(cfDef);

0 comments on commit 6ff5cec

Please sign in to comment.