Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DML for idiomatic + necessary changes around #238

Merged
merged 10 commits into from
Mar 6, 2024
82 changes: 72 additions & 10 deletions astrapy/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,9 @@ def find_one_and_replace(
self,
replacement: Dict[str, Any],
*,
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
sort: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Expand All @@ -517,6 +518,7 @@ def find_one_and_replace(
json_query = make_payload(
top_level="findOneAndReplace",
filter=filter,
projection=projection,
replacement=replacement,
options=options,
sort=sort,
Expand Down Expand Up @@ -547,7 +549,7 @@ def vector_find_one_and_replace(
dict or None: either the matched document or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
convert_vector_to_floats(vector),
fields=fields,
)
Expand All @@ -556,6 +558,7 @@ def vector_find_one_and_replace(
raw_find_result = self.find_one_and_replace(
replacement=replacement,
filter=filter,
projection=projection,
sort=sort,
)

Expand All @@ -567,6 +570,7 @@ def find_one_and_update(
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Find a single document and update it.
Expand All @@ -584,6 +588,7 @@ def find_one_and_update(
update=update,
options=options,
sort=sort,
projection=projection,
)

response = self._request(
Expand Down Expand Up @@ -614,7 +619,7 @@ def vector_find_one_and_update(
update operation, or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
convert_vector_to_floats(vector),
fields=fields,
)
Expand All @@ -624,6 +629,7 @@ def vector_find_one_and_update(
update=update,
filter=filter,
sort=sort,
projection=projection,
)

return cast(Union[API_DOC, None], raw_find_result["data"]["document"])
Expand Down Expand Up @@ -873,7 +879,10 @@ def update_one(
return response

def update_many(
self, filter: Dict[str, Any], update: Dict[str, Any]
self,
filter: Dict[str, Any],
update: Dict[str, Any],
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Updates multiple documents in the collection.
Expand All @@ -883,7 +892,12 @@ def update_many(
Returns:
dict: The response from the database after the update operation.
"""
json_query = make_payload(top_level="updateMany", filter=filter, update=update)
json_query = make_payload(
top_level="updateMany",
filter=filter,
update=update,
options=options,
)

response = self._request(
method=http_methods.POST,
Expand Down Expand Up @@ -973,6 +987,23 @@ def delete_many(self, filter: Dict[str, Any]) -> API_RESPONSE:

return response

def chunked_delete_many(self, filter: Dict[str, Any]) -> List[API_RESPONSE]:
"""
Delete many documents from the collection based on a filter condition,
chaining several API calls until exhaustion of the documents to delete.
Args:
filter (dict): Criteria to identify the documents to delete.
Returns:
List[dict]: The responses from the database from all the calls
"""
responses = []
must_proceed = True
while must_proceed:
dm_response = self.delete_many(filter=filter)
responses.append(dm_response)
must_proceed = dm_response.get("status", {}).get("moreData", False)
return responses

def clear(self) -> API_RESPONSE:
"""
Clear the collection, deleting all documents
Expand Down Expand Up @@ -1534,8 +1565,9 @@ async def find_one_and_replace(
self,
replacement: Dict[str, Any],
*,
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
sort: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Expand All @@ -1551,6 +1583,7 @@ async def find_one_and_replace(
json_query = make_payload(
top_level="findOneAndReplace",
filter=filter,
projection=projection,
replacement=replacement,
options=options,
sort=sort,
Expand Down Expand Up @@ -1581,7 +1614,7 @@ async def vector_find_one_and_replace(
dict or None: either the matched document or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
vector,
fields=fields,
)
Expand All @@ -1590,6 +1623,7 @@ async def vector_find_one_and_replace(
raw_find_result = await self.find_one_and_replace(
replacement=replacement,
filter=filter,
projection=projection,
sort=sort,
)

Expand All @@ -1601,6 +1635,7 @@ async def find_one_and_update(
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Find a single document and update it.
Expand All @@ -1618,6 +1653,7 @@ async def find_one_and_update(
update=update,
options=options,
sort=sort,
projection=projection,
)

response = await self._request(
Expand Down Expand Up @@ -1648,7 +1684,7 @@ async def vector_find_one_and_update(
update operation, or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
vector,
fields=fields,
)
Expand All @@ -1658,6 +1694,7 @@ async def vector_find_one_and_update(
update=update,
filter=filter,
sort=sort,
projection=projection,
)

return cast(Union[API_DOC, None], raw_find_result["data"]["document"])
Expand Down Expand Up @@ -1881,7 +1918,10 @@ async def update_one(
return response

async def update_many(
self, filter: Dict[str, Any], update: Dict[str, Any]
self,
filter: Dict[str, Any],
update: Dict[str, Any],
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Updates multiple documents in the collection.
Expand All @@ -1891,7 +1931,12 @@ async def update_many(
Returns:
dict: The response from the database after the update operation.
"""
json_query = make_payload(top_level="updateMany", filter=filter, update=update)
json_query = make_payload(
top_level="updateMany",
filter=filter,
update=update,
options=options,
)

response = await self._request(
method=http_methods.POST,
Expand Down Expand Up @@ -1972,6 +2017,23 @@ async def delete_many(self, filter: Dict[str, Any]) -> API_RESPONSE:

return response

async def chunked_delete_many(self, filter: Dict[str, Any]) -> List[API_RESPONSE]:
"""
Delete many documents from the collection based on a filter condition,
chaining several API calls until exhaustion of the documents to delete.
Args:
filter (dict): Criteria to identify the documents to delete.
Returns:
List[dict]: The responses from the database from all the calls
"""
responses = []
must_proceed = True
while must_proceed:
dm_response = await self.delete_many(filter=filter)
responses.append(dm_response)
must_proceed = dm_response.get("status", {}).get("moreData", False)
return responses

async def clear(self) -> API_RESPONSE:
"""
Clear the collection, deleting all documents
Expand Down
Loading
Loading