Skip to content

Latest commit

 

History

History
142 lines (109 loc) · 4.75 KB

cassandra-spark-create-ops.md

File metadata and controls

142 lines (109 loc) · 4.75 KB
title description author ms.author ms.reviewer ms.service ms.subservice ms.topic ms.date
Create or insert data into Azure Cosmos DB Cassandra API from Spark
This article details how to insert sample data into Azure Cosmos DB Cassandra API tables
kanshiG
govindk
sngun
cosmos-db
cosmosdb-cassandra
how-to
09/24/2018

Create/Insert data into Azure Cosmos DB Cassandra API from Spark

This article describes how to insert sample data into a table in Azure Cosmos DB Cassandra API from Spark.

Cassandra API configuration

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra

//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")

Dataframe API

Create a Dataframe with sample data

// Generate a dataframe containing five records
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
).toDF("book_id", "book_author", "book_name", "book_pub_year")

//Review schema
booksDF.printSchema

//Print
booksDF.show

Note

"Create if not exists" functionality, at a row level, is not yet supported.

Persist to Azure Cosmos DB Cassandra API

When saving data, you can also set time-to-live and consistency policy settings as shown in the following example:

//Persist
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Note

Column-level TTL is not supported yet.

Validate in cqlsh

use books_ks;
select * from books;

Resilient Distributed Database (RDD) API

Create a RDD with sample data

//Delete records created in the previous section 
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("delete from books_ks.books where book_id in ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999');"))

//Create RDD
val booksRDD = sc.parallelize(Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901)
))

//Review
booksRDD.take(2).foreach(println)

Note

Create if not exists functionality is not yet supported.

Persist to Azure Cosmos DB Cassandra API

When saving data to Cassandra API, you can also set time-to-live and consistency policy settings as shown in the following example:

import com.datastax.spark.connector.writer._

//Persist
booksRDD.saveToCassandra("books_ks", "books", SomeColumns("book_id", "book_author", "book_name", "book_pub_year"),writeConf = WriteConf(ttl = TTLOption.constant(900000),consistencyLevel = ConsistencyLevel.ALL))

Validate in cqlsh

use books_ks;
select * from books;

Next steps

After inserting data into the Azure Cosmos DB Cassandra API table, proceed to the following articles to perform other operations on the data stored in Cosmos DB Cassandra API: