In [None]:
# Connection settings shared by the DataFrame reader/writer cells.
# The account key is a secret: never hardcode it in the notebook, where it
# would leak through saved copies and version control. Read it (and,
# optionally, the endpoint) from the environment. When the variables are
# unset, the values fall back to what the notebook used before (the public
# endpoint and an empty key).
import os

config = {
    "spark.cosmos.accountEndpoint": os.environ.get(
        "COSMOS_ENDPOINT", "https://it-step-cosmos-db2.documents.azure.com:443/"
    ),
    "spark.cosmos.accountKey": os.environ.get("COSMOS_KEY", ""),
    "spark.cosmos.database": "cosmicworks",
    "spark.cosmos.container": "products",
}


In [None]:
# Register the Cosmos DB catalog with Spark SQL so the account can be
# addressed as `cosmosCatalog.<database>.<container>`. The catalog uses the
# same endpoint and key as the DataFrame reader/writer `config`.
_catalog_prefix = "spark.sql.catalog.cosmosCatalog"
spark.conf.set(_catalog_prefix, "com.azure.cosmos.spark.CosmosCatalog")
for _setting in ("spark.cosmos.accountEndpoint", "spark.cosmos.accountKey"):
    spark.conf.set(f"{_catalog_prefix}.{_setting}", config[_setting])


In [None]:
# Create the `cosmicworks` database through the Cosmos catalog.
# IF NOT EXISTS makes the cell safe to re-run; the empty DataFrame that
# spark.sql returns is the cell's displayed output.
create_database_sql = "CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;"
spark.sql(create_database_sql)


DataFrame[]

In [None]:
# Create the `products` container via the catalog. It is partitioned on
# /category and uses an autoscale max throughput of 1000. IF NOT EXISTS
# keeps the cell re-runnable.
spark.sql(
    "CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products "
    "USING cosmos.oltp "
    "TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"
)


DataFrame[]

In [None]:
# Create the `employees` container via the catalog. Its partition key lists
# three comma-separated paths (organization, department, team), i.e. a
# multi-level key, and the container uses manual throughput of 400.
spark.sql(
    "CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees "
    "USING cosmos.oltp "
    "TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"
)


DataFrame[]

In [None]:
# Sample product rows, one tuple per item, laid out as
# (id, category, name, quantity, price, clearance).
# `category` is the partition key of the products container.
products = tuple([
    ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.0, False),
    ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.0, True),
])

In [None]:
# Ingest sample data: give the positional tuple fields their column names,
# then append the rows to the container described by `config`.
product_columns = ["id", "category", "name", "quantity", "price", "clearance"]
(
    spark.createDataFrame(products)
    .toDF(*product_columns)
    .write.format("cosmos.oltp")
    .options(**config)
    .mode("APPEND")
    .save()
)


In [None]:
# Load the products container into a DataFrame, with the connector's
# schema inference explicitly switched on.
cosmos_reader = spark.read.format("cosmos.oltp").options(**config)
df = cosmos_reader.option("spark.cosmos.read.inferSchema.enabled", "true").load()


In [None]:
# Show the schema Spark inferred for the loaded container.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- clearance: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- id: string (nullable = false)
 |-- category: string (nullable = true)



In [None]:
# Show (at most) one product that is on clearance.
# `clearance` is a boolean column, so it serves directly as the filter
# predicate; comparing it with `== True` is redundant.
df.filter(df.clearance).show(1)

+--------------------+--------+---------+-----+-----------+--------------------+
|                name|quantity|clearance|price|         id|            category|
+--------------------+--------+---------+-----+-----------+--------------------+
|Kiama Classic Sur...|      25|     true|790.0|68719518371|gear-surf-surfboards|
+--------------------+--------+---------+-----+-----------+--------------------+



In [None]:
# Query the catalog-registered container with plain Spark SQL and render
# every product priced above 700.
rawQuery = (
    "SELECT * FROM cosmosCatalog.cosmicworks.products "
    "WHERE price > 700"
)
rawDF = spark.sql(rawQuery)
rawDF.show()

+--------------------+--------+---------+-----+-----------+--------------------+
|                name|quantity|clearance|price|         id|            category|
+--------------------+--------+---------+-----+-----------+--------------------+
|     Yamba Surfboard|      12|    false|850.0|68719518391|gear-surf-surfboards|
|Kiama Classic Sur...|      25|     true|790.0|68719518371|gear-surf-surfboards|
+--------------------+--------+---------+-----+-----------+--------------------+



In [None]:
# Build a patch-specific write configuration on top of `config`. The
# original dict is left untouched. The overrides select the ItemPatch write
# strategy, disable bulk writes, and apply a `set` operation to the `name`
# column.
configPatch = {
    **config,
    "spark.cosmos.write.strategy": "ItemPatch",
    "spark.cosmos.write.bulk.enabled": "false",
    "spark.cosmos.write.patch.defaultOperationType": "Set",
    "spark.cosmos.write.patch.columnConfigs": "[col(name).op(set)]",
}


In [None]:
# Identify the item to patch by its id and its partition-key value
# (the products container is partitioned on /category).
targetItemId, targetItemPartitionKey = "68719518391", "gear-surf-surfboards"


In [None]:
# One-row patch payload: `id` plus the partition key (`category`) locate the
# item, and `name` carries the new value. The ids are stored as strings
# (see the inferred schema), hence the explicit str().
patchProducts = [
    dict(
        id=str(targetItemId),
        category=str(targetItemPartitionKey),
        name="Yamba New Surfboard",
    )
]

In [None]:
# Apply the patch: write the payload with `configPatch`, which selects the
# ItemPatch strategy, so the existing item is patched rather than replaced.
patch_writer = spark.createDataFrame(patchProducts).write.format("cosmos.oltp")
patch_writer.options(**configPatch).mode("APPEND").save()


In [None]:
# Read the patched item back through the catalog to verify the new name.
# The id and partition key come from the notebook's own constants, not from
# external input.
patchQuery = (
    "SELECT * FROM cosmosCatalog.cosmicworks.products "
    f"WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}' "
)
patchDf = spark.sql(patchQuery)
patchDf.show(5)


+-------------------+--------+---------+-----+-----------+--------------------+
|               name|quantity|clearance|price|         id|            category|
+-------------------+--------+---------+-----+-----------+--------------------+
|Yamba New Surfboard|      12|    false|850.0|68719518391|gear-surf-surfboards|
+-------------------+--------+---------+-----+-----------+--------------------+

