Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
_Undefined,
_Empty,
_PartitionKeyKind,
_PartitionKeyType,
PartitionKeyType,
_SequentialPartitionKeyType,
_return_undefined_or_empty_partition_key,
)
Expand Down Expand Up @@ -1054,7 +1054,7 @@ def DeletePermission(
def read_items(
self,
collection_link: str,
items: Sequence[Tuple[str, _PartitionKeyType]],
items: Sequence[Tuple[str, PartitionKeyType]],
options: Optional[Mapping[str, Any]] = None,
*,
executor: Optional[ThreadPoolExecutor] = None,
Expand Down Expand Up @@ -1119,7 +1119,7 @@ def QueryItems(
database_or_container_link: str,
query: Optional[Union[str, Dict[str, Any]]],
options: Optional[Mapping[str, Any]] = None,
partition_key: Optional[_PartitionKeyType] = None,
partition_key: Optional[PartitionKeyType] = None,
response_hook: Optional[Callable[[Mapping[str, Any], Dict[str, Any]], None]] = None,
**kwargs: Any
) -> ItemPaged[Dict[str, Any]]:
Expand Down Expand Up @@ -3330,7 +3330,8 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]:

return __GetBodiesFromQueryResult(result), last_response_headers

def _GetQueryPlanThroughGateway(self, query: str, resource_link: str, excluded_locations: Optional[str] = None,
def _GetQueryPlanThroughGateway(self, query: str, resource_link: str,
excluded_locations: Optional[Sequence[str]] = None,
**kwargs: Any) -> List[Dict[str, Any]]:
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,15 @@ def get_unhealthy_locations(
request: RequestObject,
pk_range_wrapper: PartitionKeyRangeWrapper
) -> List[str]:
excluded_locations = []
unhealthy_locations = []
if pk_range_wrapper in self.pk_range_wrapper_to_health_info:
for location, partition_health_info in self.pk_range_wrapper_to_health_info[pk_range_wrapper].items():
if (partition_health_info.unavailability_info and
not (request.healthy_tentative_location and request.healthy_tentative_location == location)):
health_status = partition_health_info.unavailability_info[HEALTH_STATUS]
if health_status in (UNHEALTHY_TENTATIVE, UNHEALTHY) :
excluded_locations.append(location)
return excluded_locations
unhealthy_locations.append(location)
return unhealthy_locations

def add_failure(
self,
Expand Down
12 changes: 6 additions & 6 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from azure.cosmos.partition_key import _Undefined, _Empty, NonePartitionKeyValue
if TYPE_CHECKING:
from azure.cosmos._cosmos_client_connection import _PartitionKeyType
from azure.cosmos._cosmos_client_connection import PartitionKeyType


class _QueryBuilder:
Expand All @@ -49,7 +49,7 @@ def _get_field_expression(path: str) -> str:

@staticmethod
def is_id_partition_key_query(
items: Sequence[Tuple[str, "_PartitionKeyType"]],
items: Sequence[Tuple[str, "PartitionKeyType"]],
partition_key_definition: Dict[str, Any]
) -> bool:
"""Check if we can use the optimized ID IN query.
Expand All @@ -71,7 +71,7 @@ def is_id_partition_key_query(

@staticmethod
def is_single_logical_partition_query(
items: Sequence[Tuple[str, "_PartitionKeyType"]]
items: Sequence[Tuple[str, "PartitionKeyType"]]
) -> bool:
"""Check if all items in a chunk belong to the same logical partition.

Expand All @@ -88,7 +88,7 @@ def is_single_logical_partition_query(

@staticmethod
def build_pk_and_id_in_query(
items: Sequence[Tuple[str, "_PartitionKeyType"]],
items: Sequence[Tuple[str, "PartitionKeyType"]],
partition_key_definition: Dict[str, Any]
) -> Dict[str, Any]:
"""Build a query for items in a single logical partition using an IN clause for IDs.
Expand All @@ -114,7 +114,7 @@ def build_pk_and_id_in_query(
return {"query": query_text, "parameters": parameters}

@staticmethod
def build_id_in_query(items: Sequence[Tuple[str, "_PartitionKeyType"]]) -> Dict[str, Any]:
def build_id_in_query(items: Sequence[Tuple[str, "PartitionKeyType"]]) -> Dict[str, Any]:
"""Build optimized query using ID IN clause when ID equals partition key.

:param Sequence[tuple[str, any]] items: The list of items to build the query for.
Expand All @@ -131,7 +131,7 @@ def build_id_in_query(items: Sequence[Tuple[str, "_PartitionKeyType"]]) -> Dict[

@staticmethod
def build_parameterized_query_for_items(
items_by_partition: Dict[str, Sequence[Tuple[str, "_PartitionKeyType"]]],
items_by_partition: Dict[str, Sequence[Tuple[str, "PartitionKeyType"]]],
partition_key_definition: Dict[str, Any]
) -> Dict[str, Any]:
"""Builds a parameterized SQL query for reading multiple items.
Expand Down
22 changes: 11 additions & 11 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_read_items_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from azure.cosmos.partition_key import _get_partition_key_from_partition_key_definition
from azure.cosmos import CosmosList
if TYPE_CHECKING:
from azure.cosmos._cosmos_client_connection import _PartitionKeyType , CosmosClientConnection
from azure.cosmos._cosmos_client_connection import PartitionKeyType , CosmosClientConnection



Expand All @@ -43,7 +43,7 @@ def __init__(
self,
client: 'CosmosClientConnection',
collection_link: str,
items: Sequence[Tuple[str, "_PartitionKeyType"]],
items: Sequence[Tuple[str, "PartitionKeyType"]],
options: Optional[Mapping[str, Any]],
partition_key_definition: Dict[str, Any],
*,
Expand Down Expand Up @@ -87,7 +87,7 @@ def read_items(self) -> CosmosList:
def _execute_with_executor(
self,
executor: ThreadPoolExecutor,
query_chunks: List[Dict[str, List[Tuple[int, str, "_PartitionKeyType"]]]]
query_chunks: List[Dict[str, List[Tuple[int, str, "PartitionKeyType"]]]]
) -> CosmosList:
"""Execute the queries using the provided executor with improved error handling.

Expand Down Expand Up @@ -141,7 +141,7 @@ def _execute_with_executor(

return cosmos_list

def _partition_items_by_range(self) -> Dict[str, List[Tuple[int, str, "_PartitionKeyType"]]]:
def _partition_items_by_range(self) -> Dict[str, List[Tuple[int, str, "PartitionKeyType"]]]:
# pylint: disable=protected-access
"""Groups items by their partition key range ID efficiently while preserving original order.

Expand All @@ -150,10 +150,10 @@ def _partition_items_by_range(self) -> Dict[str, List[Tuple[int, str, "_Partitio
"""
collection_rid = _base.GetResourceIdOrFullNameFromLink(self.collection_link)
partition_key = _get_partition_key_from_partition_key_definition(self.partition_key_definition)
items_by_partition: Dict[str, List[Tuple[int, str, "_PartitionKeyType"]]] = {}
items_by_partition: Dict[str, List[Tuple[int, str, "PartitionKeyType"]]] = {}

# Group items by logical partition key first to avoid redundant range lookups
items_by_pk_value: Dict[Any, List[Tuple[int, str, "_PartitionKeyType"]]] = {}
items_by_pk_value: Dict[Any, List[Tuple[int, str, "PartitionKeyType"]]] = {}
for idx, (item_id, pk_value) in enumerate(self.items):
# Convert list to tuple to use as a dictionary key, as lists are unhashable
key = tuple(pk_value) if isinstance(pk_value, list) else pk_value
Expand All @@ -180,8 +180,8 @@ def _partition_items_by_range(self) -> Dict[str, List[Tuple[int, str, "_Partitio

def _create_query_chunks(
self,
items_by_partition: Dict[str, List[Tuple[int, str, "_PartitionKeyType"]]]
) -> List[Dict[str, List[Tuple[int, str, "_PartitionKeyType"]]]]:
items_by_partition: Dict[str, List[Tuple[int, str, "PartitionKeyType"]]]
) -> List[Dict[str, List[Tuple[int, str, "PartitionKeyType"]]]]:
"""Create query chunks for concurrency control while preserving original indices.

:param items_by_partition: A dictionary mapping partition key range IDs to lists of items with indices.
Expand All @@ -198,7 +198,7 @@ def _create_query_chunks(
return query_chunks

def _execute_query_chunk_worker(
self, partition_id: str, chunk_partition_items: Sequence[Tuple[int, str, "_PartitionKeyType"]]
self, partition_id: str, chunk_partition_items: Sequence[Tuple[int, str, "PartitionKeyType"]]
) -> Tuple[List[Tuple[int, Dict[str, Any]]], float]:
"""Synchronous worker to build and execute a query for a chunk of items.

Expand Down Expand Up @@ -231,7 +231,7 @@ def _execute_query_chunk_worker(
def _execute_query(
self,
partition_id: str,
items_for_query: Sequence[Tuple[str, "_PartitionKeyType"]],
items_for_query: Sequence[Tuple[str, "PartitionKeyType"]],
id_to_idx: Dict[str, int],
request_kwargs: Dict[str, Any]
) -> Tuple[List[Tuple[int, Any]], CaseInsensitiveDict]:
Expand Down Expand Up @@ -281,7 +281,7 @@ def local_response_hook(hook_headers, _):
def _execute_point_read(
self,
item_id: str,
pk_value: "_PartitionKeyType",
pk_value: "PartitionKeyType",
request_kwargs: Dict[str, Any]
) -> Tuple[Optional[Any], CaseInsensitiveDict]:
"""
Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin
not result[0]['Offers'] and request.method == 'POST':
# Grab the link used for getting throughput properties to add to message.
link = json.loads(request.body)["parameters"][0]["value"]
response = exceptions.InternalException(status_code=StatusCodes.NOT_FOUND,
headers={HttpHeaders.SubStatus:
response = exceptions._InternalCosmosException(status_code=StatusCodes.NOT_FOUND,
headers={HttpHeaders.SubStatus:
SubStatusCodes.THROUGHPUT_OFFER_NOT_FOUND})
e_offer = exceptions.CosmosResourceNotFoundError(
status_code=StatusCodes.NOT_FOUND,
Expand Down
Loading
Loading