title | description | author | ms.author | ms.reviewer | ms.service | ms.subservice | ms.topic | ms.date |
---|---|---|---|---|---|---|---|---|
DDL operations in Azure Cosmos DB Cassandra API from Spark |
This article details keyspace and table DDL operations against Azure Cosmos DB Cassandra API from Spark. |
TheovanKraay |
thvankra |
sngun |
cosmos-db |
cosmosdb-cassandra |
how-to |
10/07/2020 |
This article details keyspace and table DDL operations against Azure Cosmos DB Cassandra API from Spark.
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Run the following command in cqlsh and you should see the keyspace you created earlier.
DESCRIBE keyspaces;
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
DESCRIBE keyspaces;
Considerations:
- Throughput can be assigned at the table level by using the create table statement.
- One partition key can store 20 GB of data.
- One record can store a maximum of 2 MB of data.
- One partition key range can store multiple partition keys.
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks1.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Run the following command in cqlsh and you should see the table named “books:
USE books_ks;
DESCRIBE books;
Provisioned throughput and default TTL values are not shown in the output of the previous command, you can get these values from the portal.
You can alter the following values by using the alter table command:
- provisioned throughput
- time-to-live value
Column changes are currently not supported.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Run the following command in cqlsh and you should see that the “books” table is no longer available:
USE books_ks;
DESCRIBE tables;
After creating the keyspace and the table, proceed to the following articles for CRUD operations and more: