feat(Datasets): delete records by query (#1721)
Closes #1714
frascuchon committed Sep 21, 2022
1 parent 7272d60 commit be4e92c
Showing 13 changed files with 450 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src/rubrix/__init__.py
@@ -32,6 +32,7 @@
from rubrix.client.api import (
copy,
delete,
delete_records,
get_workspace,
init,
load,
@@ -68,6 +69,7 @@
"get_workspace",
"init",
"load",
"delete_records",
"log",
"log_async",
"set_workspace",
58 changes: 57 additions & 1 deletion src/rubrix/client/api.py
@@ -20,7 +20,7 @@
from asyncio import Future
from functools import wraps
from inspect import signature
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from tqdm.auto import tqdm

@@ -401,6 +401,57 @@ async def log_async(
# Creating a composite BulkResponse with the total processed and failed
return BulkResponse(dataset=name, processed=processed, failed=failed)

def delete_records(
self,
name: str,
query: Optional[str] = None,
ids: Optional[List[Union[str, int]]] = None,
discard_only: bool = False,
discard_when_forbidden: bool = True,
) -> Tuple[int, int]:
"""Delete records from a Rubrix dataset.
Parameters:
-----------
name:
The dataset name.
query:
An ElasticSearch query with the
`query string syntax <https://rubrix.readthedocs.io/en/stable/guides/queries.html>`_
ids:
If provided, deletes dataset records with given ids.
discard_only:
If `True`, matched records won't be deleted. Instead, they will be marked
as `Discarded`.
discard_when_forbidden:
Only a super-user or the dataset creator can delete records from a dataset, so
a "hard" deletion run by any other user will raise a `ForbiddenApiError`.
If this parameter is `True`, the client will automatically try to mark the
records as ``Discarded`` instead. Defaults to `True`.
Returns:
--------
The number of matched records and the number of records actually processed. These
numbers may differ if data conflicts are found during the operation (i.e. some
matched records change while being deleted).
Examples:
**Delete by id**:
>>> import rubrix as rb
>>> rb.delete_records(name="example-dataset", ids=[1,3,5])
**Discard records by query**:
>>> import rubrix as rb
>>> rb.delete_records(name="example-dataset", query="metadata.code=33", discard_only=True)
"""
return self.datasets.delete_records(
name=name,
query=query,
ids=ids,
mark_as_discarded=discard_only,
discard_when_forbidden=discard_when_forbidden,
)

def load(
self,
name: str,
@@ -649,5 +700,10 @@ def load(*args, **kwargs):
return active_api().load(*args, **kwargs)


@api_wrapper(Api.delete_records)
def delete_records(*args, **kwargs):
return active_api().delete_records(*args, **kwargs)


class InputValueError(RubrixClientError):
pass
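A minimal usage sketch of the new module-level helper (not part of the commit; the server URL, API key, dataset name, and query string are assumptions):

import rubrix as rb

rb.init(api_url="http://localhost:6900", api_key="rubrix.apikey")  # assumption: local instance
matched, processed = rb.delete_records(
    name="example-dataset",
    query="status:Validated",     # hypothetical query
    discard_when_forbidden=True,  # fall back to discarding if hard deletion is forbidden
)
print(f"matched={matched}, processed={processed}")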
61 changes: 58 additions & 3 deletions src/rubrix/client/apis/datasets.py
@@ -1,11 +1,16 @@
import warnings
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from pydantic import BaseModel, Field

from rubrix.client.apis import AbstractApi, api_compatibility
from rubrix.client.sdk.commons.errors import AlreadyExistsApiError, NotFoundApiError
from rubrix.client.sdk.commons.errors import (
AlreadyExistsApiError,
ForbiddenApiError,
NotFoundApiError,
)
from rubrix.client.sdk.datasets.api import get_dataset
from rubrix.client.sdk.datasets.models import TaskType

@@ -126,8 +131,58 @@ def configure(self, name: str, settings: Settings):
ds = self.find_by_name(name)
self.__save_settings__(dataset=ds, settings=settings)

def __save_settings__(self, dataset: _DatasetApiModel, settings: Settings):
def delete_records(
self,
name: str,
query: Optional[str] = None,
ids: Optional[List[Union[str, int]]] = None,
mark_as_discarded: bool = False,
discard_when_forbidden: bool = True,
) -> Tuple[int, int]:
"""
Tries to delete dataset records matching a given query or list of ids.
Args:
name: The dataset name
query: The query matching records
ids: A list of record ids. If provided, the query param will be ignored
mark_as_discarded: If `True`, the matched records will be marked as `Discarded` instead
of being deleted
discard_when_forbidden: Only a super-user or the dataset creator can delete records from
a dataset, so a "hard" deletion run by any other user will raise a `ForbiddenApiError`.
If this parameter is `True`, the client will automatically try to mark the records
as ``Discarded`` instead.
Returns:
The number of matched records and the number of records actually processed. These
numbers may differ if data conflicts are found during the operation (i.e. some
matched records change while being deleted).
"""
with api_compatibility(self, min_version="0.18"):
try:
response = self.__client__.delete(
path=f"{self._API_PREFIX}/{name}/data?mark_as_discarded={mark_as_discarded}",
json={"ids": ids} if ids else {"query_text": query},
)
return response["matched"], response["processed"]
except ForbiddenApiError as faer:
if discard_when_forbidden:
warnings.warn(
message=f"{faer}. Records will be discarded instead",
category=UserWarning,
)
return self.delete_records(
name=name,
query=query,
ids=ids,
mark_as_discarded=True,
discard_when_forbidden=False, # Next time will raise the error
)
else:
raise faer

def __save_settings__(self, dataset: _DatasetApiModel, settings: Settings):
if __TASK_TO_SETTINGS__.get(dataset.task) != type(settings):
raise ValueError(
f"The provided settings type {type(settings)} cannot be applied to dataset."
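As a quick illustration, these are the two request body shapes the method above can send (values are hypothetical):

body_by_ids = {"ids": [1, 3, 5]}                    # ids take precedence; the query is ignored
body_by_query = {"query_text": "metadata.code=33"}  # otherwise, delete/discard by query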
11 changes: 11 additions & 0 deletions src/rubrix/client/sdk/client.py
@@ -120,6 +120,17 @@ def put(self, path: str, *args, **kwargs):
)
return build_raw_response(response).parsed

@with_httpx_error_handler
def delete(self, path: str, *args, **kwargs):
path = self._normalize_path(path)
response = self.__httpx__.delete(
url=path,
headers=self.get_headers(),
*args,
**kwargs,
)
return build_raw_response(response).parsed

@with_httpx_error_handler
def stream(self, path: str, *args, **kwargs):
return self.__httpx__.stream(
61 changes: 61 additions & 0 deletions src/rubrix/server/apis/v0/handlers/records_deletion.py
@@ -0,0 +1,61 @@
from typing import Optional, Union

from fastapi import APIRouter, Depends, Query, Security
from pydantic import BaseModel

from rubrix.client.sdk.token_classification.models import TokenClassificationQuery
from rubrix.server.apis.v0.models.commons.params import CommonTaskHandlerDependencies
from rubrix.server.apis.v0.models.text2text import Text2TextQuery
from rubrix.server.apis.v0.models.text_classification import TextClassificationQuery
from rubrix.server.security import auth
from rubrix.server.security.model import User
from rubrix.server.services.datasets import DatasetsService
from rubrix.server.services.storage.service import RecordsStorageService


def configure_router(router: APIRouter):
QueryType = Union[TextClassificationQuery, TokenClassificationQuery, Text2TextQuery]

class DeleteRecordsResponse(BaseModel):
matched: int
processed: int

@router.delete(
"/{name}/data",
operation_id="delete_dataset_records",
response_model=DeleteRecordsResponse,
)
async def delete_dataset_records(
name: str,
query: Optional[QueryType] = None,
mark_as_discarded: bool = Query(
default=False,
title="If True, matched records won't be deleted."
" Instead of that, the record status will be changed to `Discarded`",
),
request_deps: CommonTaskHandlerDependencies = Depends(),
service: DatasetsService = Depends(DatasetsService.get_instance),
storage: RecordsStorageService = Depends(RecordsStorageService.get_instance),
current_user: User = Security(auth.get_user, scopes=[]),
):
found = service.find_by_name(
user=current_user,
name=name,
workspace=request_deps.workspace,
)

result = await storage.delete_records(
user=current_user,
dataset=found,
query=query,
mark_as_discarded=mark_as_discarded,
)

return DeleteRecordsResponse(
matched=result.processed,
processed=result.deleted or result.discarded,
)


router = APIRouter(tags=["datasets"], prefix="/datasets")
configure_router(router)
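A hedged sketch (not part of the commit) of calling the new endpoint directly over HTTP; the /api prefix, port, and bearer credentials are assumptions about a default Rubrix deployment:

import httpx

response = httpx.request(
    "DELETE",
    "http://localhost:6900/api/datasets/example-dataset/data",
    params={"mark_as_discarded": "true"},
    json={"query_text": "metadata.code=33"},  # or {"ids": [1, 3, 5]}
    headers={"Authorization": "Bearer rubrix.apikey"},  # assumption: API key auth
)
print(response.json())  # {"matched": ..., "processed": ...}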
61 changes: 58 additions & 3 deletions src/rubrix/server/daos/backend/elasticsearch.py
@@ -13,11 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
import warnings
from typing import Any, Dict, Iterable, List, Optional, Tuple

from opensearchpy import NotFoundError, OpenSearch, OpenSearchException, RequestError
from opensearchpy.exceptions import OpenSearchWarning
from opensearchpy.helpers import bulk as es_bulk
from opensearchpy.helpers import scan as es_scan

from rubrix.logging import LoggingMixin
from rubrix.server.commons.models import TaskType
@@ -77,6 +78,8 @@ def __init__(self, index: str):
self._index = index

def __enter__(self):
# Disable all OpenSearch client warnings
warnings.filterwarnings("ignore", category=OpenSearchWarning)
pass

def __exit__(self, exception_type, exception_value, traceback):
@@ -722,6 +725,58 @@ def get_mappings(self, id: str) -> Dict[str, Any]:
response = response.get(index)
return response

async def update_records_content(
self,
id: str,
content: Dict[str, Any],
query: Optional[BaseDatasetsQuery],
) -> Tuple[int, int]:
index = dataset_records_index(id)
with backend_error_handler(index=index):
es_query = self.query_builder.map_2_es_query(
schema=self.get_mappings(id),
query=query,
)
response = self.client.update_by_query(
index=index,
body={
"query": es_query["query"],
"script": {
"lang": "painless",
"inline": ";".join(
[f"ctx._source.{k}='{v}'" for k, v in content.items()]
),
},
},
slices="auto",
wait_for_completion=True,
conflicts="proceed",
)
total, updated = response["total"], response["updated"]
return total, updated
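For illustration, a minimal sketch of the painless script string built above; the status field/value pair is an assumption mirroring how a discard could be expressed:

content = {"status": "Discarded"}  # hypothetical fields to update
script = ";".join(f"ctx._source.{k}='{v}'" for k, v in content.items())
assert script == "ctx._source.status='Discarded'"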

async def delete_records_by_query(
self,
id: str,
query: Optional[BaseDatasetsQuery],
) -> Tuple[int, int]:

index = dataset_records_index(id)
with backend_error_handler(index=index):
es_query = self.query_builder.map_2_es_query(
schema=self.get_mappings(id),
query=query,
)
response = self.client.delete_by_query(
index=index,
body={"query": es_query["query"]},
slices="auto",
wait_for_completion=True,
# conflicts="proceed", # If document version conflict -> continue
)
total, deleted = response["total"], response["deleted"]
return total, deleted

def search_records(
self,
id: str,
@@ -732,8 +787,8 @@ def search_records(
exclude_fields: List[str] = None,
enable_highlight: bool = True,
) -> Tuple[int, List[Dict[str, Any]]]:
index = dataset_records_index(id)

index = dataset_records_index(id)
with backend_error_handler(index=index):
if not sort.sort_by and sort.shuffle is False:
sort.sort_by = [SortableField(id="id")] # Default sort by id
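A standalone sketch of the underlying delete-by-query call using opensearch-py directly (not part of the commit; the cluster address, index name, and query are assumptions):

from opensearchpy import OpenSearch

client = OpenSearch(hosts=["http://localhost:9200"])  # assumption: local single-node cluster
response = client.delete_by_query(
    index="rubrix.example-dataset.records",  # assumption: illustrative records index name
    body={"query": {"term": {"metadata.code": 33}}},
    slices="auto",             # let the cluster parallelize across shards
    wait_for_completion=True,  # block until the deletion finishes
    conflicts="proceed",       # skip documents whose version changed mid-operation
)
print(response["total"], response["deleted"])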
23 changes: 22 additions & 1 deletion src/rubrix/server/daos/records.py
@@ -14,7 +14,7 @@
# limitations under the License.

import datetime
from typing import Any, Dict, Iterable, List, Optional, Type
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type

from fastapi import Depends

@@ -200,3 +200,24 @@ def get_dataset_schema(self, dataset: DatasetDB) -> Dict[str, Any]:
"""Return inner elasticsearch index configuration"""
schema = self._es.get_mappings(id=dataset.id)
return schema

async def delete_records_by_query(
self,
dataset: DatasetDB,
query: Optional[BackendRecordsQuery] = None,
) -> Tuple[int, int]:
total, deleted = await self._es.delete_records_by_query(
id=dataset.id, query=query
)
return total, deleted

async def update_records_by_query(
self,
dataset: DatasetDB,
query: Optional[BackendRecordsQuery] = None,
**content,
) -> Tuple[int, int]:
total, updated = await self._es.update_records_content(
id=dataset.id, content=content, query=query
)
return total, updated
