**Secrets**

The secrets below, such as the Cosmos account key, are retrieved from a secret scope. If you have not yet defined a secret scope for the Cosmos account you want to use with this sample, the following links describe how to create one:
- [Create a new secret scope](./#secrets/createScope) for the current Databricks workspace
  - How to create an [Azure Key Vault backed secret scope](https://docs.microsoft.com/azure/databricks/security/secrets/secret-scopes#--create-an-azure-key-vault-backed-secret-scope)
  - How to create a [Databricks backed secret scope](https://docs.microsoft.com/azure/databricks/security/secrets/secret-scopes#create-a-databricks-backed-secret-scope)
- How to [add secrets to your Spark configuration](https://docs.microsoft.com/azure/databricks/security/secrets/secrets#read-a-secret)

If you don't want to use secrets at all, you can assign the values in clear text below, but we recommend using secrets so that the account key is not exposed in the notebook.

In [None]:
cosmosEndpoint = spark.conf.get("spark.cosmos.accountEndpoint")
cosmosMasterKey = spark.conf.get("spark.cosmos.accountKey")
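
If one of the two settings is not present in the Spark configuration, the lookup above fails with an error that may not make the cause obvious. The following is a minimal sketch of a lookup helper that names the missing setting. The helper `read_required_setting` and its `conf_get` parameter are hypothetical and not part of the sample; the sketch assumes the getter accepts `(key, default)`, as `spark.conf.get` does, and uses a plain `dict.get` as a stand-in so that it runs outside Databricks.

```python
def read_required_setting(conf_get, key):
    """Return the value for `key`, or raise a descriptive error if it is unset."""
    value = conf_get(key, None)
    if value is None or value == "":
        raise KeyError(f"Missing required Spark configuration: {key}")
    return value


# Stand-in for the Spark configuration (assumption for this demo only).
# In the notebook, pass spark.conf.get instead of fake_conf.get.
fake_conf = {"spark.cosmos.accountEndpoint": "https://example.invalid:443/"}

endpoint = read_required_setting(fake_conf.get, "spark.cosmos.accountEndpoint")
print(endpoint)

try:
    read_required_setting(fake_conf.get, "spark.cosmos.accountKey")
except KeyError as err:
    print(err)
```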

**Preparation - creating the Cosmos DB container to ingest the data into**

Configure the Catalog API to be used

In [None]:
import uuid
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.views.repositoryPath", "/viewDefinitions" + str(uuid.uuid4()))
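
All four settings above share the prefix `spark.sql.catalog.<catalogName>`, which is how Spark associates them with the catalog named `cosmosCatalog`. As a rough sketch of that naming scheme, the settings can be built as a dictionary first. The function `cosmos_catalog_conf` is a hypothetical helper, and the endpoint and key values are placeholders.

```python
import uuid


def cosmos_catalog_conf(catalog_name, endpoint, key):
    """Build the Spark settings that register a Cosmos catalog under `catalog_name`."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "com.azure.cosmos.spark.CosmosCatalog",
        f"{prefix}.spark.cosmos.accountEndpoint": endpoint,
        f"{prefix}.spark.cosmos.accountKey": key,
        # A unique repository path per call, mirroring the uuid suffix used above.
        f"{prefix}.spark.cosmos.views.repositoryPath": "/viewDefinitions" + str(uuid.uuid4()),
    }


settings = cosmos_catalog_conf("cosmosCatalog", "https://example.invalid:443/", "<key>")
for name in settings:
    print(name)

# In the notebook the settings would then be applied with:
#   for name, value in settings.items():
#       spark.conf.set(name, value)
```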

Next, execute the commands that create the containers. The source container and the change feed sink container each use autoscale with a maximum throughput of 100,000 RU (so 10,000 - 100,000 RU depending on load), and only system properties (like /id) are indexed. A third container is created to store the metadata for global throughput control.

In [None]:
%sql
CREATE DATABASE IF NOT EXISTS cosmosCatalog.SampleDatabase;

CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecords
USING cosmos.oltp
TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');

CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSink
USING cosmos.oltp
TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');

/* NOTE: TTL must be enabled on the throughput control container. defaultTtlInSeconds = '-1' enables TTL without expiring items by default. */
CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.ThroughputControl
USING cosmos.oltp
OPTIONS(spark.cosmos.database = 'SampleDatabase')
TBLPROPERTIES(partitionKeyPath = '/groupId', autoScaleMaxThroughput = '4000', indexingPolicy = 'AllProperties', defaultTtlInSeconds = '-1');
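
With autoscale, Cosmos DB scales the provisioned throughput between 10% of the configured maximum and the maximum itself, which is where the 10,000 - 100,000 RU range for the data containers comes from. The arithmetic can be sketched as follows; the function name `autoscale_range` is hypothetical.

```python
def autoscale_range(max_throughput):
    """Return (min_ru, max_ru) for an autoscale maximum, assuming a 10% floor."""
    return max_throughput // 10, max_throughput


for container, max_ru in [("GreenTaxiRecords", 100000), ("ThroughputControl", 4000)]:
    low, high = autoscale_range(max_ru)
    print(f"{container}: {low} - {high} RU")
```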

**Preparation - create a runId that identifies the logical streaming query. Progress (bookmarks/offsets) is stored per runId, so running a query with the same runId continues from the persisted offsets.**

In [None]:
import uuid

runId = str(uuid.uuid4())
print("Run ID: ", runId)
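
The streaming cell in this notebook uses `"/tmp/" + runId + "/"` as its `checkpointLocation`, so the runId determines where the offsets are persisted. A small sketch of that relationship, with a hypothetical helper `checkpoint_location`:

```python
import uuid


def checkpoint_location(run_id, root="/tmp"):
    """Derive the checkpoint path for a run; the same run_id always maps to the same path."""
    return f"{root}/{run_id}/"


first_run = str(uuid.uuid4())
second_run = str(uuid.uuid4())

print(checkpoint_location(first_run))
# Same runId: same checkpoint path, so persisted offsets are picked up again.
print(checkpoint_location(first_run) == checkpoint_location(first_run))
# Different runId: different checkpoint path, so the query starts from scratch.
print(checkpoint_location(first_run) == checkpoint_location(second_run))
```

To resume an earlier run, assign its previously printed runId instead of generating a new one.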

**Sample - running the streaming query**

Waiting until at least one record is available in the source table

In [None]:
import time

sourceRecordCount = 0
emptyCount = 0
while (sourceRecordCount == 0):
  records_DF = spark.sql('SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecords LIMIT 1')
  sourceRecordCount = records_DF.count() 
  if (sourceRecordCount == 0):
    emptyCount += 1
    time.sleep(emptyCount * 5)
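
The loop above pauses 5 seconds longer after each empty check and waits indefinitely if the source container never receives data. A bounded variant of the same linear back-off might look like the sketch below. The function `wait_until` and its parameters are hypothetical, and the demo uses a fake condition and a recording `sleep` so that nothing actually blocks.

```python
import time


def wait_until(condition, step_seconds=5, max_attempts=10, sleep=time.sleep):
    """Poll `condition` with linearly growing pauses; return True once it holds."""
    for attempt in range(1, max_attempts + 1):
        if condition():
            return True
        sleep(attempt * step_seconds)  # 5 s, 10 s, 15 s, ... as in the loop above
    return False


# Demo: the condition holds on the third check; pauses are recorded instead of slept.
pauses = []
answers = iter([False, False, True])
found = wait_until(lambda: next(answers), sleep=pauses.append)
print(found, pauses)
```

In the notebook, the condition would be a lambda that runs the `SELECT ... LIMIT 1` query and checks whether `count()` is greater than zero.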

Run the query several times. Using `processAllAvailable` this way is intended for debugging only; the repetition is necessary because ingestion might still be happening while the query runs.

In [None]:
import datetime
import time
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

changeFeedCfg = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": "SampleDatabase",
  "spark.cosmos.container": "GreenTaxiRecords",
  "spark.cosmos.read.partitioning.strategy": "Default",
  "spark.cosmos.read.inferSchema.enabled" : "true",
  "spark.cosmos.read.inferSchema.forceNullableProperties" : "true",
  "spark.cosmos.changeFeed.startFrom" : "Beginning",
  "spark.cosmos.changeFeed.mode" : "Incremental"
  #"spark.cosmos.changeFeed.maxItemCountPerTriggerHint" : "50000"
}

writeCfg = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": "SampleDatabase",
  "spark.cosmos.container": "GreenTaxiRecordsCFSink",
  "spark.cosmos.write.strategy": "ItemOverwrite",
  "spark.cosmos.write.bulk.enabled": "true",
  "checkpointLocation": "/tmp/" + runId + "/"
}

idleCount = 0
lastProgressJson = "n/a"
while idleCount <= 5:
  print("IdleCount: ", idleCount)

  changeFeedDF = spark \
    .readStream  \
    .format("cosmos.oltp.changeFeed") \
    .options(**changeFeedCfg) \
    .load()
 
  # UDF returning the current time in epoch milliseconds, used to stamp copied records
  nowUdf = udf(lambda: int(time.time() * 1000), LongType())
  df_withTimestamps = changeFeedDF \
    .withColumnRenamed("_ts","original_ts") \
    .withColumnRenamed("insertedAt","original_insertedAt") \
    .withColumn("copiedAt", nowUdf())

  print("Starting to copy records: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))

  microBatchQuery = df_withTimestamps \
    .writeStream \
    .format("cosmos.oltp") \
    .queryName(runId) \
    .options(**writeCfg) \
    .outputMode("append") \
    .start()

  microBatchQuery.processAllAvailable()

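  # lastProgress is None if the query has not reported any progress yet
  progress = microBatchQuery.lastProgress
  if progress is None or progress["numInputRows"] == 0:
    idleCount += 1
    if idleCount < 5:
      print("No progress - sleeping for ", str(5 * idleCount), " seconds")
      time.sleep(5 * idleCount)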

  microBatchQuery.stop()  

  print("Finished copying records: ", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"))
  
print("No more activity expected - terminating loop")
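
The termination rule of the loop above is easy to misread: `idleCount` is never reset, so it counts empty runs cumulatively rather than consecutively, and the loop ends once six runs in total have reported no input rows. The following sketch simulates only that counting logic, without Spark. The function `runs_until_stop` and the sample row counts are made up for illustration.

```python
def runs_until_stop(rows_per_run, max_idle=5):
    """Count loop iterations until more than `max_idle` runs returned no rows."""
    idle_count = 0
    iterations = 0
    rows = iter(rows_per_run)
    while idle_count <= max_idle:
        iterations += 1
        if next(rows, 0) == 0:  # runs beyond the end of the list count as empty
            idle_count += 1
    return iterations


# Three runs with no rows are interleaved with two productive runs,
# then three more empty runs are needed before the loop stops.
print(runs_until_stop([1000, 0, 250, 0, 0]))
```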

**Validation - ensuring that the record count in the target container is identical to the one in the source**

In [None]:
%sql
CREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView 
  (id STRING)
USING cosmos.oltp
TBLPROPERTIES(isCosmosView = 'True')
OPTIONS (
  spark.cosmos.database = 'SampleDatabase',
  spark.cosmos.container = 'GreenTaxiRecordsCFSink',
  spark.cosmos.read.inferSchema.enabled = 'False',
  spark.cosmos.read.partitioning.strategy = 'Default');

SELECT COUNT(*) FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView

***NOTE*** The query below is inefficient because Spark 3.1 doesn't yet allow data sources to push down aggregates, so the count happens in Spark after all records have been retrieved. Aggregate push-down is planned for Spark 3.2. We are also considering support for executing custom Cosmos queries, which would provide a workaround for Spark 3.1 in these cases.

In [None]:
df_source = spark.sql('SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecords')
df_target = spark.sql('SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView')
assert df_source.count() == df_target.count()

**Cleaning up the view**

In [None]:
%sql
DROP TABLE IF EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView;
SHOW TABLES FROM cosmosCatalog.SampleDatabase