# Example: Spark over Cosmos DB

A notebook is composed of a sequence of cells. A cell contains either Markdown text or code; code cells use the notebook's default programming language.
Text cells in Markdown must start with ```%md```.

This first example shows how to run Spark programs that access a Cosmos DB database. The program computes the number of auctions for each user (**note**: this kind of computation would be better performed using the CosmosDB query interface; Spark is used here only for demonstration).

We will have several versions:
* first version uses the dataframes interface and displays the result in the notebook;
* second version uses the dataframes SQL interface and displays the result in the notebook;
* last version adds writing the results back into CosmosDB.

To connect to CosmosDB, you need to install the CosmosDB Spark library on the cluster. The build linked below targets Spark 3.2 and Scala 2.12, so the cluster runtime should match. The library is available here:
[https://search.maven.org/remotecontent?filepath=com/azure/cosmos/spark/azure-cosmos-spark_3-2_2-12/4.14.1/azure-cosmos-spark_3-2_2-12-4.14.1.jar](https://search.maven.org/remotecontent?filepath=com/azure/cosmos/spark/azure-cosmos-spark_3-2_2-12/4.14.1/azure-cosmos-spark_3-2_2-12-4.14.1.jar).

## Program using DataFrames interface

The environment has a number of variables defined by default:
* **spark** : SparkSession [https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html#pyspark.sql.SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html#pyspark.sql.SparkSession)
* **sc** : SparkContext [https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.SparkContext.html](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.SparkContext.html)

The following program reads data from the auctions container and computes how many auctions each user owns.

In [0]:
from pyspark.sql.functions import *

try:
  # Configuration for accessing CosmosDB
  readConfig = {
    "spark.cosmos.accountEndpoint": "https://scc23nmp.documents.azure.com:443/",
    "spark.cosmos.accountKey": "3Stlau3NbZIGs97x5KOW8J9byciE0GxuiN3B3P8ykzivdccUzRhrp6xMUQ4SOHmQBGZWcJqK7WsPDalVZwXQcQ==",
    "spark.cosmos.database": "scc23dbnmp",
    "spark.cosmos.container": "auctions",
    # Get only the id and owner of each auction
    "spark.cosmos.read.customQuery": "SELECT a.id, a.owner FROM auctions a"
  }

  # Connect via azure-cosmos-spark to create a Spark DataFrame
  # Infer the schema automatically
  auctions = spark.read.format("cosmos.oltp").options(**readConfig) \
                    .option("spark.cosmos.read.inferSchema.enabled", "true") \
                    .load()

  display(auctions)
  # Count the number of auctions for each owner
  result = auctions.groupBy("owner") \
              .count()
  display(result)
except Exception as e:
  print(e)

id,owner
21332,nmp


owner,count
nmp,1
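
With a single auction in the container the output above is not very telling. As a minimal sketch of what `groupBy("owner").count()` computes, the same aggregation can be written in plain Python over a few rows. The sample rows and the helper name `count_by_owner` are hypothetical and only illustrate the logic; this is not how Spark executes the query.

```python
from collections import Counter

# Hypothetical rows, shaped like the result of the custom query (id, owner)
auctions = [
    {"id": "1", "owner": "alice"},
    {"id": "2", "owner": "bob"},
    {"id": "3", "owner": "alice"},
]

def count_by_owner(rows):
    """Group rows by their owner and count the rows in each group."""
    return Counter(row["owner"] for row in rows)

counts = count_by_owner(auctions)

# One line per owner, similar to the owner,count table shown by display(result)
for owner, count in sorted(counts.items()):
    print(f"{owner},{count}")
```

Spark performs the same grouping in parallel across the partitions of the DataFrame, which only pays off when the data is much larger than in this example.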


## Program using SQL interface

This program uses the SQL interface to perform the same computation as before.

In [0]:
from pyspark.sql.functions import *

try:
  # Configuration for accessing CosmosDB
  readConfig = {
    "spark.cosmos.accountEndpoint": "https://scc23nmp.documents.azure.com:443/",
    "spark.cosmos.accountKey": "3Stlau3NbZIGs97x5KOW8J9byciE0GxuiN3B3P8ykzivdccUzRhrp6xMUQ4SOHmQBGZWcJqK7WsPDalVZwXQcQ==",
    "spark.cosmos.database": "scc23dbnmp",
    "spark.cosmos.container": "auctions",
    # Get only the id and owner of each auction
    "spark.cosmos.read.customQuery": "SELECT a.id, a.owner FROM auctions a"
  }

  # Connect via azure-cosmos-spark to create a Spark DataFrame
  # Infer the schema automatically
  auctions = spark.read.format("cosmos.oltp").options(**readConfig) \
                    .option("spark.cosmos.read.inferSchema.enabled", "true") \
                    .load()
  # Let's register the dataframe as a view
  auctions.createOrReplaceTempView("auctions")

  # Count the number of auctions for each owner
  result = spark.sql("""SELECT owner, count(*) AS count FROM auctions
                          GROUP BY owner""")
  display(result)
except Exception as e:
  print(e)

owner,count
nmp,1
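
The programs in this notebook repeat the same connection options and embed the account key in the code. One possible alternative, sketched here under assumptions, is a small helper that builds the options dictionary from environment variables. The helper name `cosmos_config` and the variable names `COSMOS_ENDPOINT`, `COSMOS_KEY` and `COSMOS_DATABASE` are hypothetical; only the `spark.cosmos.*` option keys come from the programs in this notebook.

```python
import os

def cosmos_config(container, query=None, env=os.environ):
    """Build the dictionary passed to .options(**config).

    The environment variable names are assumptions made for this sketch.
    """
    config = {
        "spark.cosmos.accountEndpoint": env["COSMOS_ENDPOINT"],
        "spark.cosmos.accountKey": env["COSMOS_KEY"],
        "spark.cosmos.database": env["COSMOS_DATABASE"],
        "spark.cosmos.container": container,
    }
    # The custom query is only meaningful when reading
    if query is not None:
        config["spark.cosmos.read.customQuery"] = query
    return config

# Example with an explicit mapping instead of the real environment
example_env = {
    "COSMOS_ENDPOINT": "https://example.documents.azure.com:443/",
    "COSMOS_KEY": "not-a-real-key",
    "COSMOS_DATABASE": "exampledb",
}
read_config = cosmos_config(
    "auctions", "SELECT a.id, a.owner FROM auctions a", env=example_env
)
write_config = cosmos_config("auctionsFreq", env=example_env)
```

The resulting dictionaries can be passed to `spark.read.format("cosmos.oltp").options(**read_config)` in the same way as `readConfig`.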


## Execute computation and dump result

This program runs the same computation and writes the result to the **auctionsFreq** container.

In [0]:
from pyspark.sql.functions import *

try:
  # Configuration for accessing CosmosDB
  readConfig = {
    "spark.cosmos.accountEndpoint": "https://scc23nmp.documents.azure.com:443/",
    "spark.cosmos.accountKey": "3Stlau3NbZIGs97x5KOW8J9byciE0GxuiN3B3P8ykzivdccUzRhrp6xMUQ4SOHmQBGZWcJqK7WsPDalVZwXQcQ==",
    "spark.cosmos.database": "scc23dbnmp",
    "spark.cosmos.container": "auctions",
    # Get only the id and owner of each auction
    "spark.cosmos.read.customQuery": "SELECT a.id, a.owner FROM auctions a"
  }

  # Connect via azure-cosmos-spark to create a Spark DataFrame
  # Infer the schema automatically
  auctions = spark.read.format("cosmos.oltp").options(**readConfig) \
                    .option("spark.cosmos.read.inferSchema.enabled", "true") \
                    .load()
  # Let's register the dataframe as a view
  auctions.createOrReplaceTempView("auctions")

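  # Count the number of auctions for each owner.
  # Items written to Cosmos DB need a string "id" property; the owner is reused
  # as the id here, assuming owner values are strings.
  result = spark.sql("""SELECT owner AS id, owner, count(*) AS count FROM auctions
                          GROUP BY owner""")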

  # Write configuration
  writeConfig = {
    "spark.cosmos.accountEndpoint": "https://scc23nmp.documents.azure.com:443/",
    "spark.cosmos.accountKey": "3Stlau3NbZIGs97x5KOW8J9byciE0GxuiN3B3P8ykzivdccUzRhrp6xMUQ4SOHmQBGZWcJqK7WsPDalVZwXQcQ==",
    "spark.cosmos.database": "scc23dbnmp",
    "spark.cosmos.container": "auctionsFreq",
  }

  # Write to Cosmos DB from the result DataFrame
  result.write.format("cosmos.oltp").options(**writeConfig) \
              .mode("append") \
              .save()
except Exception as e:
  print(e)
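
Every item stored in Cosmos DB carries an `id` property, so the rows written to **auctionsFreq** need one as well. The following plain-Python sketch shows one way to shape the aggregated rows into items, assuming the owner name is unique per row and can be reused as the id. The helper name `to_cosmos_items` and the sample rows are hypothetical.

```python
def to_cosmos_items(rows):
    """Turn (owner, count) pairs into dictionaries with a string id."""
    return [
        {"id": str(owner), "owner": owner, "count": count}
        for owner, count in rows
    ]

# Hypothetical aggregated rows, one per owner
items = to_cosmos_items([("alice", 2), ("bob", 1)])
for item in items:
    print(item)
```

Reusing the owner as the id keeps one item per owner, since `GROUP BY owner` already guarantees that each owner appears once in the result.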