Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@

detector_search_config = SearchConfig.create_from(
default_config,
text_operator_keys={"name", "type"},
allowed_keys={"name", "type", "assignee"},
text_operator_keys={"name", "type", "workflow"},
allowed_keys={"name", "type", "assignee", "workflow"},
Comment thread
malwilley marked this conversation as resolved.
allow_boolean=False,
free_text_key="query",
)
Expand Down Expand Up @@ -218,6 +218,24 @@ def filter_detectors(self, request: Request, organization: Any) -> QuerySet[Dete
queryset = queryset.exclude(assignee_q)
else:
queryset = queryset.filter(assignee_q)
case SearchFilter(
key=SearchKey("workflow"),
operator=("=" | "IN" | "!=" | "NOT IN"),
):
workflow_ids = (
filter.value.value
if isinstance(filter.value.value, list)
else [filter.value.value]
)
workflow_ids = to_valid_int_id_list("workflow", workflow_ids)
if filter.operator in ("!=", "NOT IN"):
queryset = queryset.exclude(
detectorworkflow__workflow_id__in=workflow_ids
)
else:
queryset = queryset.filter(
detectorworkflow__workflow_id__in=workflow_ids
).distinct()
case SearchFilter(key=SearchKey("query"), operator="="):
# 'query' is our free text key; all free text gets returned here
# as '=', and we search any relevant fields for it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,73 @@ def test_query_invalid_search_key(self) -> None:
assert "query" in response.data
assert "Invalid key for this search: tpe" in str(response.data["query"])

def test_query_by_workflow(self) -> None:
workflow = self.create_workflow(organization_id=self.organization.id)
workflow_2 = self.create_workflow(organization_id=self.organization.id)
detector_a = self.create_detector(
project=self.project, name="Detector A", type=MetricIssue.slug
)
detector_b = self.create_detector(
project=self.project, name="Detector B", type=MetricIssue.slug
)
self.create_detector(project=self.project, name="Detector C", type=MetricIssue.slug)
self.create_detector_workflow(detector=detector_a, workflow=workflow)
self.create_detector_workflow(detector=detector_b, workflow=workflow)
self.create_detector_workflow(detector=detector_b, workflow=workflow_2)

# Filter by single workflow
response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": f"workflow:{workflow.id}"},
)
assert {d["name"] for d in response.data} == {detector_a.name, detector_b.name}

# Filter by a different workflow
response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": f"workflow:{workflow_2.id}"},
)
assert {d["name"] for d in response.data} == {detector_b.name}

# Filter by multiple workflows (IN)
response = self.get_success_response(
self.organization.slug,
qs_params={
"project": self.project.id,
"query": f"workflow:[{workflow.id}, {workflow_2.id}]",
},
)
assert [d["name"] for d in response.data].count(detector_b.name) == 1
assert {d["name"] for d in response.data} == {detector_a.name, detector_b.name}

# Negation
response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": f"!workflow:{workflow.id}"},
)
returned_names = {d["name"] for d in response.data}
assert detector_a.name not in returned_names
assert detector_b.name not in returned_names

# Negation with list (!IN)
response = self.get_success_response(
self.organization.slug,
qs_params={
"project": self.project.id,
"query": f"!workflow:[{workflow.id}, {workflow_2.id}]",
},
)
returned_names = {d["name"] for d in response.data}
assert detector_a.name not in returned_names
assert detector_b.name not in returned_names

def test_query_by_workflow_invalid_value(self) -> None:
self.get_error_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": "workflow:abc"},
status_code=400,
)

def test_query_by_assignee_user_email(self) -> None:
user = self.create_user(email="assignee@example.com")
self.create_member(organization=self.organization, user=user)
Expand Down
Loading