---
title: Read Cassandra API table data using Spark
titleSuffix: Azure Cosmos DB
description: This article describes how to read data from Cassandra API tables in Azure Cosmos DB.
author: TheovanKraay
ms.author: thvankra
ms.reviewer: sngun
ms.service: cosmos-db
ms.subservice: cosmosdb-cassandra
ms.topic: how-to
ms.date: 06/02/2020
ms.custom: seodec18
---
This article describes how to read data stored in the Azure Cosmos DB Cassandra API from Spark.

Start by importing the Spark Cassandra connector and the Azure Cosmos DB helper library:

```scala
import org.apache.spark.sql.cassandra._

// Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

// Azure Cosmos DB library for multiple retries
import com.microsoft.azure.cosmosdb.cassandra
```
Next, set the connection and throughput configuration for your Cassandra API account. Replace `YOUR_ACCOUNT_NAME` and `YOUR_ACCOUNT_KEY` with the values from your Azure Cosmos DB account:

```scala
// Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

// Throughput-related. Adjust as needed.
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")
```
Read the Cassandra API table into a dataframe by passing the table and keyspace as options:

```scala
val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show
```
The same read can be expressed more concisely with the connector's `cassandraFormat` helper:

```scala
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
```
To read only specific columns, select them after loading:

```scala
val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name", "book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
```
You can push down predicates to the database to allow for better-optimized Spark queries. A predicate is a condition on a query that returns true or false, typically located in the WHERE clause. A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance. By default, the Spark Dataset API automatically pushes down valid WHERE clauses to the database.
For example, filtering on `book_pub_year` produces a push-down predicate. Compare the physical plans before and after applying the filter:

```scala
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain

val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
dfWithPushdown.show
```
The `PushedFilters` section of the physical plan includes the `GreaterThan` push-down filter.
:::image type="content" source="./media/cassandra-spark-read-ops/pushdown-predicates.png" alt-text="Physical plan showing the pushed-down predicate":::
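For contrast, here's a small sketch (my addition, not from the original article): a filter on a computed expression generally can't be translated into a push-down predicate, so Spark applies it after scanning the table.

```scala
// A filter on a computed expression is typically not pushed down;
// explain() should show an empty PushedFilters list in that case.
val dfNoPushdown = df.filter(df("book_pub_year") + 1 > 1892)
dfNoPushdown.explain
```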
You can also read table data with the RDD API:

```scala
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
```
To read specific columns and cache the result:

```scala
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id", "book_name").cache
booksRDD.take(5).foreach(println)
```
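The RDD API also supports server-side filtering through the connector's `where` method, which appends a CQL WHERE clause to the scan. A minimal sketch, not from the original article; whether the predicate is accepted depends on the column being filterable in CQL:

```scala
// where() pushes the CQL predicate to the server side of the scan.
// The filtered column must be queryable in CQL (for example, a
// clustering column), or the query may be rejected.
val filteredRDD = sc
  .cassandraTable("books_ks", "books")
  .where("book_pub_year > ?", 1891)
filteredRDD.take(5).foreach(println)
```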
Create a temporary view over the table:

```scala
spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "books", "keyspace" -> "books_ks"))
  .load
  .createOrReplaceTempView("books_vw")
```

You can then query the view with SQL:

```sql
select * from books_vw where book_pub_year > 1891
```
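Equivalently, you can run the same query from Scala through `spark.sql` (a small usage sketch, not part of the original article):

```scala
// Run the SQL query against the temp view and display the results.
val booksAfter1891 = spark.sql("select * from books_vw where book_pub_year > 1891")
booksAfter1891.show
```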
The following articles cover other ways of working with the Azure Cosmos DB Cassandra API from Spark: