Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delete all items by partition key functionality #29186

Merged
merged 10 commits into from
Apr 10, 2023
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
### 4.4.0b1 (Unreleased)

#### Features Added
- Added **preview** partial document update (Patch API) functionality and container methods for patching items with operations. See [PR 29497](https://github.com/Azure/azure-sdk-for-python/pull/29497). For more information on Patch, please see [Azure Cosmos DB Partial Document Update](https://learn.microsoft.com/azure/cosmos-db/partial-document-update).
- Added **preview** delete all items by partition key functionality. See [PR 29186](https://github.com/Azure/azure-sdk-for-python/pull/29186).
- Added **preview** partial document update (Patch API) functionality and container methods for patching items with operations. See [PR 29497](https://github.com/Azure/azure-sdk-for-python/pull/29497). For more information on Patch, please see [Azure Cosmos DB Partial Document Update](https://learn.microsoft.com/azure/cosmos-db/partial-document-update).

#### Breaking Changes

Expand Down
11 changes: 11 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
headers[http_constants.HttpHeaders.XDate] = datetime.datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")

if cosmos_client_connection.master_key or cosmos_client_connection.resource_tokens:
resource_type = _internal_resourcetype(resource_type)
authorization = auth._get_authorization_header(
cosmos_client_connection, verb, path, resource_id, IsNameBased(resource_id), resource_type, headers
)
Expand Down Expand Up @@ -737,3 +738,13 @@ def _replace_throughput(throughput: Union[int, ThroughputProperties], new_throug
new_throughput_properties["content"]["offerThroughput"] = throughput
else:
raise TypeError("offer_throughput must be int or an instance of ThroughputProperties")


def _internal_resourcetype(resource_type: str) -> str:
"""Partitionkey is used as the resource type for deleting all items by partition key in
other SDKs, but the colls resource type needs to be sent for the feature to work. In order to keep it consistent
with other SDKs, we switch it here.
"""
if resource_type.lower() == "partitionkey":
return "colls"
return resource_type
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,39 @@ def DeleteItem(self, document_link, options=None, **kwargs):
document_id = base.GetResourceIdOrFullNameFromLink(document_link)
return self.DeleteResource(path, "docs", document_id, None, options, **kwargs)

def DeleteAllItemsByPartitionKey(
    self,
    collection_link,
    options=None,
    **kwargs
) -> None:
    """Delete all items that share a single partition key with one request, so the caller
    does not have to call delete on each record in the partition key.

    :param str collection_link:
        The link to the document collection.
    :param dict options:
        The request options for the request. An empty dict is used when None is passed.

    :return:
        Whatever the underlying POST yields as its result (annotated as None).
    :rtype:
        None
    """
    request_options = {} if options is None else options

    # Specified url to perform background operation to delete all items by partition key:
    # the collection path followed by "operations/partitionkeydelete".
    collection_path = base.GetPathFromLink(collection_link)
    operation_path = '{}{}/{}'.format(collection_path, "operations", "partitionkeydelete")
    resource_id = base.GetResourceIdOrFullNameFromLink(collection_link)

    # Headers are built from a copy so the connection's default headers are not shared.
    request_headers = base.GetHeaders(
        self, dict(self.default_headers), "post", operation_path, resource_id, "partitionkey", request_options
    )
    delete_request = _request_object.RequestObject("partitionkey", documents._OperationType.Delete)

    response, self.last_response_headers = self.__Post(
        path=operation_path,
        request_params=delete_request,
        req_headers=request_headers,
        body=None,
        **kwargs
    )
    return response

def ReplaceTrigger(self, trigger_link, trigger, options=None, **kwargs):
"""Replaces a trigger and returns it.

Expand Down
23 changes: 23 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,3 +804,26 @@ async def delete_conflict(
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)

# NOTE(review): this wraps a coroutine with the synchronous tracing decorator; confirm whether
# the async variant (distributed_trace_async) is what the rest of this module uses.
@distributed_trace
async def delete_all_items_by_partition_key(
    self,
    partition_key: Union[str, int, float, bool],
    **kwargs: Any
) -> None:
    """Delete all items that share a single partition key, so the caller does not have to
    call delete on each record in the partition key.

    :param partition_key: Partition key for the items to be deleted.
    :type partition_key: Union[str, int, float, bool]
    :keyword Callable response_hook: Optional callable invoked after the request with the
        connection's last response headers and None.
    :rtype: None
    """
    options = _build_options(kwargs)
    # The hook is removed from kwargs only after the options were built from them.
    on_response = kwargs.pop('response_hook', None)

    # The partition key option is always set, whatever value was passed; per the original
    # note, invalid partition keys end up as a default empty value.
    options["partitionKey"] = self._set_partition_key(partition_key)

    await self.client_connection.DeleteAllItemsByPartitionKey(
        collection_link=self.container_link,
        options=options,
        **kwargs
    )

    if not on_response:
        return
    on_response(self.client_connection.last_response_headers, None)
Original file line number Diff line number Diff line change
Expand Up @@ -2597,3 +2597,37 @@ def __ValidateResource(resource):
raise ValueError("Id ends with a space.")
except AttributeError:
raise_with_traceback(TypeError, message="Id type must be a string.")

async def DeleteAllItemsByPartitionKey(
    self,
    collection_link,
    options=None,
    **kwargs
) -> None:
    """Delete all items that share a single partition key with one request, so the caller
    does not have to call delete on each record in the partition key.

    :param str collection_link:
        The link to the document collection.
    :param dict options:
        The request options for the request. An empty dict is used when None is passed.

    :return:
        Whatever the underlying POST yields as its result (annotated as None).
    :rtype:
        None

    """
    request_options = {} if options is None else options

    # Specified url to perform background operation to delete all items by partition key:
    # the collection path followed by "operations/partitionkeydelete".
    collection_path = base.GetPathFromLink(collection_link)
    operation_path = '{}{}/{}'.format(collection_path, "operations", "partitionkeydelete")
    resource_id = base.GetResourceIdOrFullNameFromLink(collection_link)

    # Headers are built from a copy so the connection's default headers are not shared.
    request_headers = base.GetHeaders(
        self, dict(self.default_headers), "post", operation_path, resource_id, "partitionkey", request_options
    )
    delete_request = _request_object.RequestObject("partitionkey", documents._OperationType.Delete)

    response, self.last_response_headers = await self.__Post(
        path=operation_path,
        request_params=delete_request,
        req_headers=request_headers,
        body=None,
        **kwargs
    )
    return response
23 changes: 23 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,3 +859,26 @@ def delete_conflict(self, conflict, partition_key, **kwargs):
)
if response_hook:
response_hook(self.client_connection.last_response_headers, result)

@distributed_trace
def delete_all_items_by_partition_key(
    self,
    partition_key: Union[str, int, float, bool],
    **kwargs: Any
) -> None:
    """Delete all items that share a single partition key, so the caller does not have to
    call delete on each record in the partition key.

    :param partition_key: Partition key for the items to be deleted.
    :type partition_key: Union[str, int, float, bool]
    :keyword Callable response_hook: Optional callable invoked after the request with the
        connection's last response headers and the result returned by the connection.
    :rtype: None
    """
    options = build_options(kwargs)
    # The hook is removed from kwargs only after the options were built from them.
    on_response = kwargs.pop('response_hook', None)

    # The partition key option is always set, whatever value was passed; per the original
    # note, invalid partition keys end up as a default empty value.
    options["partitionKey"] = self._set_partition_key(partition_key)

    outcome = self.client_connection.DeleteAllItemsByPartitionKey(
        collection_link=self.container_link,
        options=options,
        **kwargs
    )

    if not on_response:
        return
    on_response(self.client_connection.last_response_headers, outcome)
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ class ResourceType(object):
Offer = "offers"
Topology = "topology"
DatabaseAccount = "databaseaccount"
PartitionKey = "partitionkey"

@staticmethod
def IsCollectionChild(resourceType):
Expand All @@ -448,4 +449,5 @@ def IsCollectionChild(resourceType):
ResourceType.UserDefinedFunction,
ResourceType.Trigger,
ResourceType.StoredProcedure,
ResourceType.PartitionKey,
)
42 changes: 42 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/document_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,47 @@ def delete_item(container, doc_id):
print('Deleted item\'s Id is {0}'.format(doc_id))


def delete_all_items_by_partition_key(db, partitionkey):
    """Show deleting every item of one partition key while items of another key survive.

    Creates (if needed) a container partitioned on /company, upserts two sales orders under
    `partitionkey` and one under "companyB", lists the container, deletes all items for
    `partitionkey`, and lists the container again.

    :param db: Database client used to create or fetch the sample container.
    :param partitionkey: Value stored in the "company" property of the items to be deleted.
    """
    print('\n1.8 Deleting all Items by Partition Key\n')

    def print_items(found_items):
        # Same listing is printed before and after the delete.
        print('Found {0} items'.format(len(found_items)))

        for doc in found_items:
            print('Item Id: {0}; Partition Key: {1}'.format(doc.get('id'), doc.get("company")))

    # A container with a partition key that is different from id is needed
    container = db.create_container_if_not_exists(id="Partition Key Delete Container",
                                                  partition_key=PartitionKey(path='/company'))

    # Two orders go under the partition key that will be deleted; the third one goes under a
    # different key so the second listing shows it was left alone.
    orders_to_create = (
        ("SalesOrderCompanyA1", partitionkey),
        ("SalesOrderCompanyA2", partitionkey),
        ("SalesOrderCompanyB1", "companyB"),
    )
    for order_id, company in orders_to_create:
        sales_order = get_sales_order(order_id)
        sales_order["company"] = company
        container.upsert_item(sales_order)

        print("\nUpserted Item is {} with Partition Key: {}".format(sales_order["id"], company))

    print_items(list(container.read_all_items(max_item_count=10)))

    print("\nDelete all items for Partition Key: {}\n".format(partitionkey))

    container.delete_all_items_by_partition_key(partitionkey)

    print_items(list(container.read_all_items()))

def get_sales_order(item_id):
order1 = {'id' : item_id,
'account_number' : 'Account1',
Expand Down Expand Up @@ -195,6 +236,7 @@ def run_sample():
upsert_item(container, 'SalesOrder1')
patch_item(container, 'SalesOrder1')
delete_item(container, 'SalesOrder1')
delete_all_items_by_partition_key(db, "CompanyA")

# cleanup database after sample
try:
Expand Down
42 changes: 42 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/document_management_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,47 @@ async def delete_item(container, doc_id):
print('Deleted item\'s Id is {0}'.format(doc_id))


async def delete_all_items_by_partition_key(db, partitionkey):
    """Show deleting every item of one partition key while items of another key survive.

    Creates (if needed) a container partitioned on /company, upserts two sales orders under
    `partitionkey` and one under "companyB", lists the container, deletes all items for
    `partitionkey`, and lists the container again.

    :param db: Async database client used to create or fetch the sample container.
    :param partitionkey: Value stored in the "company" property of the items to be deleted.
    """
    print('\n1.8 Deleting all Items by Partition Key\n')

    def print_items(found_items):
        # Same listing is printed before and after the delete.
        print('Found {0} items'.format(len(found_items)))

        for doc in found_items:
            print('Item Id: {0}; Partition Key: {1}'.format(doc.get('id'), doc.get("company")))

    # A container with a partition key that is different from id is needed
    container = await db.create_container_if_not_exists(id="Partition Key Delete Container",
                                                        partition_key=PartitionKey(path='/company'))

    # Two orders go under the partition key that will be deleted; the third one goes under a
    # different key so the second listing shows it was left alone.
    orders_to_create = (
        ("SalesOrderCompanyA1", partitionkey),
        ("SalesOrderCompanyA2", partitionkey),
        ("SalesOrderCompanyB1", "companyB"),
    )
    for order_id, company in orders_to_create:
        sales_order = get_sales_order(order_id)
        sales_order["company"] = company
        await container.upsert_item(sales_order)

        print("\nUpserted Item is {} with Partition Key: {}".format(sales_order["id"], company))

    # read_all_items is consumed with `async for`; it is not awaited.
    print_items([item async for item in container.read_all_items()])

    print("\nDelete all items for Partition Key: {}\n".format(partitionkey))

    await container.delete_all_items_by_partition_key(partitionkey)

    print_items([item async for item in container.read_all_items()])

def get_sales_order(item_id):
order1 = {'id' : item_id,
'account_number' : 'Account1',
Expand Down Expand Up @@ -215,6 +256,7 @@ async def run_sample():
await upsert_item(container, 'SalesOrder1')
await patch_item(container, 'SalesOrder1')
await delete_item(container, 'SalesOrder1')
await delete_all_items_by_partition_key(db, "CompanyA")

# cleanup database after sample
try:
Expand Down
17 changes: 17 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,20 @@
except exceptions.CosmosHttpResponseError as failure:
print("Failed to create user. Status code:{}".format(failure.status_code))
# [END create_user]

# [START delete_all_items_by_partition_key]
container_name = "products"
container = database.get_container_client(container_name)

# Upsert item1 .. item9, all carrying the same productName value.
for i in range(1, 10):
    container.upsert_item(
        {
            "id": "item{}".format(i),
            "productName": "Gadget",
            "productModel": "Model {}".format(i),
        }
    )

# Show the container contents before the delete.
items = container.read_all_items()
for item in items:
    print(json.dumps(item, indent=True))

container.delete_all_items_by_partition_key("Gadget")
print("All items in partition {} deleted.".format("Gadget"))

# Show the container contents again after the delete.
items = container.read_all_items()
for item in items:
    print(json.dumps(item, indent=True))
# [END delete_all_items_by_partition_key]
18 changes: 18 additions & 0 deletions sdk/cosmos/azure-cosmos/samples/examples_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,24 @@ async def examples_async():
print("Failed to create user. Status code:{}".format(failure.status_code))
# [END create_user]

# delete all items in a given partition key
# [START delete_all_items_by_partition_key]
container_name = "products"
container = database.get_container_client(container_name)
for i in range(1, 10):
await container.upsert_item(
dict(id="item{}".format(i), productName="Gadget", productModel="Model {}".format(i))
)
items = await container.read_all_items()
async for item in items:
print(json.dumps(item, indent=True))
await container.delete_all_items_by_partition_key("Gadget")
print("All items in partition {} deleted.".format("Gadget"))
items = await container.read_all_items()
async for item in items:
print(json.dumps(item, indent=True))
# [END delete_all_items_by_partition_key]

await client.delete_database(database_name)
print("Sample done running!")

Expand Down
45 changes: 45 additions & 0 deletions sdk/cosmos/azure-cosmos/test/test_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


"""End to end test.
"""

Expand Down Expand Up @@ -2561,6 +2562,50 @@ def test_get_resource_with_dictionary_and_object(self):
read_permission = created_user.get_permission(created_permission.properties)
self.assertEqual(read_permission.id, created_permission.id)

def test_delete_all_items_by_partition_key(self):
    """Deleting by partition key removes only that key's items; deleting with None removes nothing."""
    # create database
    created_db = self.databaseForTest

    # create container
    created_collection = created_db.create_container(
        id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()),
        partition_key=PartitionKey(path='/pk', kind='Hash')
    )
    # The container is removed in `finally` so a failed assertion does not leave it behind.
    try:
        # Create two partition keys
        partition_key1 = "{}-{}".format("Partition Key 1", str(uuid.uuid4()))
        partition_key2 = "{}-{}".format("Partition Key 2", str(uuid.uuid4()))

        # add items for partition key 1
        for i in range(1, 3):
            created_collection.upsert_item(
                dict(id="item{}".format(i), pk=partition_key1)
            )

        # add items for partition key 2
        pk2_item = created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2))

        # delete all items for partition key 1
        created_collection.delete_all_items_by_partition_key(partition_key1)

        # check that only items from partition key 1 have been deleted
        items = list(created_collection.read_all_items())

        # items should only have 1 item and it should equal pk2_item; the length is asserted
        # first so an empty result is reported as a failure rather than an IndexError
        self.assertEqual(len(items), 1)
        self.assertDictEqual(pk2_item, items[0])

        # attempting to delete a non-existent partition key or passing none should not delete
        # anything and leave things unchanged
        created_collection.delete_all_items_by_partition_key(None)

        # check that no changes were made by checking if the only item is still there
        items = list(created_collection.read_all_items())

        # items should only have 1 item and it should equal pk2_item
        self.assertEqual(len(items), 1)
        self.assertDictEqual(pk2_item, items[0])
    finally:
        created_db.delete_container(created_collection)

def test_patch_operations(self):
created_container = self.databaseForTest.create_container_if_not_exists(id="patch_container", partition_key=PartitionKey(path="/pk"))

Expand Down
Loading