In [None]:
# https://pypi.org/project/azure-cosmos/
# https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0/azure.cosmos.html
pip install azure-cosmos

# Using the following test data: 
# https://raw.githubusercontent.com/AzureCosmosDB/labs/master/dotnet/setup/NutritionData.json

In [None]:
#
# 1. Loop and delete
# https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0/azure.cosmos.html#azure.cosmos.ContainerProxy.query_items
#
# Fetch the ids of every item in one logical partition (one foodGroup value) and delete them one by one.
from azure.cosmos import CosmosClient, exceptions
import json
import os

# Read the endpoint and key from the environment rather than hardcoding secrets in the notebook.
# The placeholder values are only a fallback that shows what is expected.
url = os.environ.get('COSMOS_URL', 'https://<account>.documents.azure.com:443/')
key = os.environ.get('COSMOS_KEY', '<key>')
client = CosmosClient(url, credential=key)
database_name = 'FoodTestDb'
container_name = 'FoodCollection'

database = client.get_database_client(database_name)
container = database.get_container_client(container_name)

# Partition key value (foodGroup) whose items will be deleted
query_foodGroup = 'Pork Products'

items = container.query_items(
    query='SELECT f.id, f.foodGroup FROM FoodCollection f WHERE f.foodGroup = @foodGroup',
    parameters=[
        dict(name='@foodGroup', value=query_foodGroup)
    ],
    # Cross-partition querying is disabled, so scope the query explicitly to the single
    # logical partition being deleted instead of relying on the WHERE clause alone.
    partition_key=query_foodGroup,
    enable_cross_partition_query=False
)
# Materialize all (id, partition key) pairs up front, so the whole query result has been read
# before any item is deleted.
document_ids = [(item['id'], item['foodGroup']) for item in items]

# Delete each fetched item, reporting failures per item without stopping the loop.
for document_id, partition_key in document_ids:
    try:
        container.delete_item(item=document_id, partition_key=partition_key)
        print(f"Deleted item with ID: {document_id} and Partition Key: {partition_key}.")
    except exceptions.CosmosHttpResponseError as e:
        print(f"Failed to delete item {document_id}. Status code: {e.status_code}. Error: {e.message}")
    except Exception as e:
        print(f"An error occurred while deleting item {document_id}: {e}")

In [None]:
#
# 2. Loop and delete using ThreadPoolExecutor to parallelize deletions 
#
# Same selection as cell 1, but the individual delete requests are issued concurrently.
from azure.cosmos import CosmosClient, exceptions
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os

# Read the endpoint and key from the environment rather than hardcoding secrets in the notebook.
# The placeholder values are only a fallback that shows what is expected.
url = os.environ.get('COSMOS_URL', 'https://<account>.documents.azure.com:443/')
key = os.environ.get('COSMOS_KEY', '<key>')
client = CosmosClient(url, credential=key)
database_name = 'FoodTestDb'
container_name = 'FoodCollection'

database = client.get_database_client(database_name)
container = database.get_container_client(container_name)

# Partition key value (foodGroup) whose items will be deleted
query_foodGroup = 'Snacks'

items = container.query_items(
    query='SELECT f.id, f.foodGroup FROM FoodCollection f WHERE f.foodGroup = @foodGroup',
    parameters=[
        dict(name='@foodGroup', value=query_foodGroup)
    ],
    # Scope the query explicitly to the single logical partition being deleted.
    partition_key=query_foodGroup,
    enable_cross_partition_query=False
)
document_ids = [(item['id'], item['foodGroup']) for item in items]

# Number of delete requests in flight at once. More workers finish sooner but consume
# request units faster, which may lead to throttling.
max_workers = 10


def delete_document(doc_id, partition_key):
    """Delete a single item and report the outcome instead of raising.

    Args:
        doc_id: The item's ``id``.
        partition_key: The item's partition key value (its ``foodGroup``).

    Returns:
        ``{"id": doc_id, "status": "Deleted"}`` on success, otherwise
        ``{"id": doc_id, "status": "Failed to delete: <error>"}``.
    """
    try:
        container.delete_item(item=doc_id, partition_key=partition_key)
        return {"id": doc_id, "status": "Deleted"}
    except Exception as e:
        # Catch everything, not just CosmosHttpResponseError. Otherwise an unexpected error
        # (e.g. a connection failure) is re-raised by future.result() below, and the list of
        # results for every other item is lost.
        return {"id": doc_id, "status": f"Failed to delete: {str(e)}"}


# Submit one delete request per item (these are independent requests, not a transactional batch).
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [
        executor.submit(delete_document, doc_id, partition_key_value)
        for doc_id, partition_key_value in document_ids
    ]
    # as_completed yields futures as they finish, so results are in completion order, not query order.
    results = [future.result() for future in as_completed(futures)]

# Check results
print(results)

In [None]:
#
# 3. Loop and tag TTL to 1 second 
#
# Instead of deleting items directly, set a per-item "ttl" so the service expires them itself.
# NOTE(review): per the Cosmos DB TTL documentation, an item's "ttl" is only honored when
# time-to-live is enabled on the container (DefaultTimeToLive set, e.g. to -1). Otherwise the
# property is stored but ignored. Confirm the container setting before relying on this cell.
from azure.cosmos import CosmosClient, exceptions
import json
import os

# Read the endpoint and key from the environment rather than hardcoding secrets in the notebook.
# The placeholder values are only a fallback that shows what is expected.
url = os.environ.get('COSMOS_URL', 'https://<account>.documents.azure.com:443/')
key = os.environ.get('COSMOS_KEY', '<key>')
client = CosmosClient(url, credential=key)
database_name = 'FoodTestDb'
container_name = 'FoodCollection'

database = client.get_database_client(database_name)
container = database.get_container_client(container_name)

# Partition key value (foodGroup) whose items will be expired
query_foodGroup = 'Baby Foods'

items = container.query_items(
    query='SELECT f.id, f.foodGroup FROM FoodCollection f WHERE f.foodGroup = @foodGroup',
    parameters=[
        dict(name='@foodGroup', value=query_foodGroup)
    ],
    # Scope the query explicitly to the single logical partition being updated.
    partition_key=query_foodGroup,
    enable_cross_partition_query=False
)
document_ids = [(item['id'], item['foodGroup']) for item in items]

# New TTL in seconds. Per the TTL docs, expiry is measured from the item's last-modified
# time, so the countdown starts when the replace below is written.
new_ttl = 1

for doc_id, part_key in document_ids:
    try:
        # Fetch the full document; replace_item overwrites the whole item, so every property must be sent back.
        document = container.read_item(item=doc_id, partition_key=part_key)

        document["ttl"] = new_ttl

        # The body still carries its foodGroup, which identifies the item's partition.
        container.replace_item(item=doc_id, body=document)
        print(f"Successfully updated TTL for document ID {doc_id} to {new_ttl} seconds.")

    except Exception as e:
        print(f"Error updating document ID {doc_id}: {str(e)}")

**Using the Analytical Store**

The cells below require Analytical Store to be enabled for the Cosmos DB container.
* What is it?: https://learn.microsoft.com/en-us/azure/cosmos-db/analytical-store-introduction
* Get started: https://learn.microsoft.com/en-us/azure/cosmos-db/configure-synapse-link

Enabling this feature in the test setup created a linked service in Synapse named "CosmosDBLSFoodTestDb".  
The first step is to create a Spark table over the analytical store so the data can be queried directly with Spark SQL (alternatively, all the data could be loaded into a DataFrame).

In [None]:
%%sql
-- Register a Spark table backed by the container's analytical store, reached through the Synapse linked service.
-- IF NOT EXISTS keeps the cell idempotent: re-running the notebook no longer fails with "table already exists".
create table if not exists foodCollection using cosmos.olap options (
    spark.synapse.linkedService 'CosmosDBLSFoodTestDb',
    spark.cosmos.container 'FoodCollection'
)

In [None]:
# The goal here is to keep the latest `keep_latest` records for every foodGroup and select the rest for deletion.
# ROW_NUMBER() numbers each group's rows newest-first, so rows numbered above the threshold are the surplus.
# Groups with at most `keep_latest` rows yield no such rows, so no separate GROUP BY/HAVING pre-filter is needed.
# "Latest" is taken from _ts (the item's last-modified timestamp); id is a deterministic tie-breaker for equal _ts.
# Rows without a foodGroup are excluded, as the previous join on foodGroup also excluded them.
# NOTE(review): the analytical store is synced from the transactional store asynchronously, so very
# recent writes may not be visible here yet.
keep_latest = 100

result_df = spark.sql(f"""
    WITH RankedFoodGroups AS (
        SELECT
            fd.id,
            fd.foodGroup,
            ROW_NUMBER() OVER (PARTITION BY fd.foodGroup ORDER BY fd._ts DESC, fd.id DESC) AS row_num
        FROM
            foodCollection fd
        WHERE
            fd.foodGroup IS NOT NULL
    )
    SELECT id, foodGroup
    FROM RankedFoodGroups
    WHERE row_num > {int(keep_latest)}
    """)

# Collect the IDs and foodGroups to the driver as a list of Rows (acceptable for this test data set;
# a very large result would not fit comfortably in driver memory).
ids_foodgroups_to_delete = result_df.collect()

In [None]:

# Delete, from the operational (transactional) Cosmos DB store, every item selected by the previous cell.
# Requires `ids_foodgroups_to_delete` (Rows with `id` and `foodGroup` fields) from the cell above.
from azure.cosmos import CosmosClient, exceptions
import json
import os

# Read the endpoint and key from the environment rather than hardcoding secrets in the notebook.
# The placeholder values are only a fallback that shows what is expected.
url = os.environ.get('COSMOS_URL', 'https://<account>.documents.azure.com:443/')
key = os.environ.get('COSMOS_KEY', '<key>')
client = CosmosClient(url, credential=key)
database_name = 'FoodTestDb'
container_name = 'FoodCollection'

database = client.get_database_client(database_name)
container = database.get_container_client(container_name)

# Remove the IDs from the operational Cosmos DB data store
for record in ids_foodgroups_to_delete:
    try:
        container.delete_item(
            item=record.id,
            partition_key=record.foodGroup
        )
        print(f"Deleted item with ID: {record.id} and Partition Key: {record.foodGroup}.")
    except exceptions.CosmosResourceNotFoundError:
        # The analytical store lags behind the transactional store, so a re-run can still list
        # items that were already deleted. Treat 404 as "already gone" rather than a failure.
        print(f"Item {record.id} was already deleted.")
    except exceptions.CosmosHttpResponseError as e:
        print(f"Failed to delete item {record.id}. Status code: {e.status_code}. Error: {e.message}")
    except Exception as e:
        print(f"An error occurred while deleting item {record.id}: {e}")
