# What's in this exercise
Basics of how to work with Azure Cosmos DB - Cassandra API from Spark on HDInsight <B>in batch</B>.<BR>
Section 07: Aggregation operations<BR>
  
**NOTE:**<br>
1) Server-side (Cassandra) filtering of non-partition key columns is not yet supported.<BR>
2) Server-side (Cassandra) aggregation operations are not yet supported.<BR>
The samples below therefore perform the filtering and aggregation on the Spark side.<br>

### Prerequisites
The DataStax connector for Cassandra requires the Azure Cosmos DB Cassandra API connection details to be initialized as part of the Spark context. When you launch a Jupyter notebook, the Spark session and context are already initialized, and it is not advisable to stop and reinitialize the Spark context unless you also restore every configuration set by the HDInsight default Jupyter notebook start-up. One workaround is to add the Cassandra instance details directly to the Spark2 service configuration in Ambari. This is a one-time activity that requires a Spark2 service restart.<BR>

1.  In Ambari, go to the Spark2 service and click Configs.
2.  Go to Custom spark2-defaults, add the following properties, and restart the Spark2 service:<br>
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br>
spark.cassandra.connection.port=10350<br>
spark.cassandra.connection.ssl.enabled=true<br>
spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br>
spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
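
For reference, the five properties above can also be expressed as a Scala map, which may be handy when building a session outside of Ambari (for example in a standalone application). This is only a sketch: `cosmosCassandraProps` is a hypothetical helper name, and the account name and key are the same placeholders used above.

```scala
// Hypothetical helper (not part of any library): builds the five connector
// properties listed above from an account name and key.
def cosmosCassandraProps(accountName: String, accountKey: String): Map[String, String] = Map(
  "spark.cassandra.connection.host"        -> s"$accountName.cassandra.cosmosdb.azure.com",
  "spark.cassandra.connection.port"        -> "10350",
  "spark.cassandra.connection.ssl.enabled" -> "true",
  "spark.cassandra.auth.username"          -> accountName,
  "spark.cassandra.auth.password"          -> accountKey
)

// Placeholders, as in the property list above
val props = cosmosCassandraProps("YOUR_COSMOSDB_ACCOUNT_NAME", "YOUR_COSMOSDB_KEY")

// Print the property names only, to avoid echoing the key into notebook output
props.keys.toSeq.sorted.foreach(println)
```

In a standalone application, a map like this could be passed to `SparkConf.setAll` before the session is created; in the HDInsight notebook the Ambari route described above remains the simpler option.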

---------
## 1.0. Cassandra API connection

### 1.0.1. Configure dependencies

In [None]:
%%configure -f
{ "conf": {"spark.jars.packages": "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0" }}

### 1.0.2. Cassandra API configuration

In [None]:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType}
import org.apache.spark.sql.cassandra._

//DataStax Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.{ConsistencyLevel, DataType}
import com.datastax.spark.connector.writer.WriteConf

//Azure Cosmos DB library for multiple retries
import com.microsoft.azure.cosmosdb.cassandra

// Specify connection factory for Cassandra
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

// Parallelism and throughput configs
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
spark.conf.set("spark.cassandra.output.ignoreNulls","true")

## 2.0. Data generator

In [None]:
//Delete data from prior runs
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("delete from books_ks.books where book_id in ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');"))

//Generate a few rows
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")

//Persist
booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save()

---
## 3.0. Count

### 3.0.1. RDD API

In [None]:
sc.cassandraTable("books_ks", "books").count

In [None]:
//count on server side - NOT SUPPORTED YET
//sc.cassandraTable("books_ks", "books").cassandraCount

### 3.0.2. Dataframe API

Count does not currently work with the dataframe API.<BR>
Until count support is released, the sample below shows how to run counts using dataframe caching as a workaround.<br>
  
**Options for storage level**<br>
https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#which-storage-level-to-choose<br>
(1) MEMORY_ONLY:	<br>
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.<br>
(2) MEMORY_AND_DISK:	<br>
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.<br>
(3) MEMORY_ONLY_SER: Java/Scala<br>
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.<br>
(4) MEMORY_AND_DISK_SER:  Java/Scala<br>
Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.<br>
(5) DISK_ONLY:	<br>
Store the RDD partitions only on disk.<br>
(6) MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.	<br>
Same as the levels above, but replicate each partition on two cluster nodes.<br>
(7) OFF_HEAP (experimental):<br>
Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.<br>
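
As a rough illustration of choosing one of the levels above, the sketch below persists a dataframe with `MEMORY_AND_DISK` and checks the level in effect. It assumes a small in-memory dataframe as a stand-in for the Cassandra table so that it runs without a Cosmos DB account; `sketchSpark` and `sketchDF` are illustrative names, and in the notebook the existing `spark` session would be used instead of building one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Assumption: a local session for illustration only; getOrCreate returns the
// existing session when one is already running (as in the notebook).
val sketchSpark = SparkSession.builder.master("local[1]").appName("storage-level-sketch").getOrCreate()
import sketchSpark.implicits._

// Stand-in for the Cassandra-backed dataframe
val sketchDF = Seq(
  ("b00001", 11.33),
  ("b00023", 22.45),
  ("b01001", 19.83)
).toDF("book_id", "book_price")

// persist only marks the dataframe for caching; nothing is computed yet
sketchDF.persist(StorageLevel.MEMORY_AND_DISK)

// The first action materializes the cache
val rowCount = sketchDF.count

// Inspect the storage level currently assigned to the dataframe
val levelInUse = sketchDF.storageLevel
println(s"rows=$rowCount, level=$levelInUse")

// Release the cached data once it is no longer needed
sketchDF.unpersist()
```

`MEMORY_AND_DISK` is a reasonable choice when the table may not fit in executor memory, since partitions that do not fit are spilled to disk rather than re-read from the source.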

In [None]:
//Workaround
import org.apache.spark.storage.StorageLevel

//Read from source
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

//Explain plan
readBooksDF.explain

In [None]:
//Mark the dataframe for caching (lazy; nothing is read yet)
readBooksDF.persist(StorageLevel.MEMORY_ONLY)

//The first action materializes the cache; subsequent actions against this DF hit the cache
readBooksDF.count

In [None]:
//Register as temporary view
readBooksDF.createOrReplaceTempView("books_vw")

In [None]:
%%sql
--select * from books_vw
--select count(*) from books_vw where book_pub_year > 1900
--select count(book_id) from books_vw
select book_author, count(*) as count from books_vw group by book_author


## 4.0. Average

### 4.0.1. RDD API

In [None]:
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

### 4.0.2. Dataframe API

In [None]:
spark.read.cassandraFormat("books", "books_ks", "").load().select("book_price").agg(avg("book_price")).show

### 4.0.3. SQL 

In [None]:
%%sql
select avg(book_price) from books_vw

## 5.0. Min

### 5.0.1. RDD API

In [None]:
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

### 5.0.2. Dataframe API

In [None]:
spark.read.cassandraFormat("books", "books_ks", "").load().select("book_id","book_price").agg(min("book_price")).show

### 5.0.3. SQL

In [None]:
%%sql
select min(book_price) from books_vw

## 6.0. Max

### 6.0.1. RDD API

In [None]:
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

### 6.0.2. Dataframe API

In [None]:
spark.read.cassandraFormat("books", "books_ks", "").load().select("book_price").agg(max("book_price")).show

### 6.0.3. SQL

In [None]:
%%sql
select max(book_price) from books_vw

## 7.0. Sum

### 7.0.1. RDD API

In [None]:
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

### 7.0.2. Dataframe API

In [None]:
spark.read.cassandraFormat("books", "books_ks", "").load().select("book_price").agg(sum("book_price")).show

### 7.0.3. SQL

In [None]:
%%sql
select sum(book_price) from books_vw

## 8.0. Top or comparable

### 8.0.1. RDD API

In [None]:
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//Returns the top n rows; only those n rows, not the whole RDD, are collected to the driver.
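
An alternative worth considering for top-n on an RDD is `RDD.top`, which keeps only the n largest elements per partition and merges them on the driver, so it avoids the full sort that `sortBy` performs. The sketch below uses a small in-memory RDD with the sample book prices as a stand-in for the Cassandra table; `topNSpark` and `pricesRDD` are illustrative names, not part of the original notebook.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only; in the notebook the
// existing `sc` would be used instead.
val topNSpark = SparkSession.builder.master("local[1]").appName("top-n-sketch").getOrCreate()

// Stand-in for (book_name, book_price) pairs read from the table
val pricesRDD = topNSpark.sparkContext.parallelize(Seq(
  ("A study in scarlet", 11.33),
  ("A sign of four", 22.45),
  ("The adventures of Sherlock Holmes", 19.83),
  ("The memoirs of Sherlock Holmes", 14.22),
  ("The hounds of Baskerville", 12.25)
))

// top(n) returns the n largest elements under the given ordering, in descending order
val top3 = pricesRDD.top(3)(Ordering.by[(String, Double), Double](_._2))
top3.foreach(println)
```

Against the Cassandra table, the same idea would presumably apply after mapping each row to a `(book_name, book_price)` pair, for example with `getString` and `getFloat` on the row.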

### 8.0.2. Dataframe API

In [None]:
import org.apache.spark.sql.functions._

val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.select("book_name","book_price").orderBy(desc("book_price")).limit(3)

//Explain plan
readBooksDF.explain

In [None]:
//Top 3
readBooksDF.show

### 8.0.3. SQL

In [None]:
%%sql
select book_name,book_price from books_vw order by book_price desc limit 3