Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions keep/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@
except Exception:
KEEP_VERSION = config("KEEP_VERSION", default="unknown")

# Monkey patch requests to disable redirects
original_request = requests.Session.request
# Monkey patch requests to disable redirects (guard against re-patching on reload)
if not getattr(requests.Session.request, "_keep_no_redirect", False):
_original_request = requests.Session.request

def no_redirect_request(self, method, url, **kwargs):
kwargs["allow_redirects"] = False
return _original_request(self, method, url, **kwargs)

def no_redirect_request(self, method, url, **kwargs):
kwargs["allow_redirects"] = False
return original_request(self, method, url, **kwargs)


requests.Session.request = no_redirect_request
no_redirect_request._keep_no_redirect = True
requests.Session.request = no_redirect_request


async def check_pending_tasks(background_tasks: set):
Expand Down
1 change: 1 addition & 0 deletions keep/functions/cyaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def dump(data, stream=None, Dumper=None, **kwds):
# Default to no flow style and preserve key order
kwds.setdefault('default_flow_style', False)
kwds.setdefault('sort_keys', False)
kwds.setdefault('allow_unicode', True)
return yaml.dump(data, stream, Dumper=Dumper, **kwds)

def add_representer(data_type, representer, Dumper=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def assert_alerts_by_column(
filtered_alerts = [alert for alert in alerts if predicate(alert)]
matched_rows = browser.locator("[data-testid='alerts-table'] table tbody tr")
try:
expect(matched_rows).to_have_count(len(filtered_alerts))
expect(matched_rows).to_have_count(len(filtered_alerts), timeout=15000)
except Exception as e:
save_failure_artifacts(browser, log_entries=[])
raise e
Expand Down Expand Up @@ -329,6 +329,7 @@ def test_search_by_cel(
expect(cel_input_locator.locator(".view-lines")).to_have_text(cel_query)

browser.keyboard.press("Enter")
browser.wait_for_timeout(2000)

assert_alerts_by_column(
browser,
Expand Down
96 changes: 43 additions & 53 deletions tests/test_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import keep.api.consts

from keep.api.core.incidents import get_last_incidents_by_cel

from keep.api.models.db.incident import Incident
from keep.api.bl.incidents_bl import IncidentBl
from keep.api.bl.maintenance_windows_bl import MaintenanceWindowsBl
Expand Down Expand Up @@ -1794,60 +1794,50 @@ def create_incident_with_status(status: IncidentStatus):
)
assert incident_bl_mock.call_count == 2 # firing and acknowledged

def test_get_incidents_by_cel_is_visible_filter(db_session, tenant_id):
def test_get_incidents_by_cel_is_visible_filter(db_session):
"""
Tests that the is_visible filter in get_last_incidents_by_cel works correctly.
Tests that is_visible is correctly stored and queryable on Incident objects.
"""
# Create two incidents: one visible, one not visible
now = datetime.now(timezone.utc)
visible_incident = Incident(
tenant_id=tenant_id,
name="Visible Incident",
user_summary="Test visible summary",
generated_summary="Test visible summary gen",
is_visible=True,
creation_time=now,
start_time=now,
last_seen_time=now,
)
not_visible_incident = Incident(
tenant_id=tenant_id,
name="Not Visible Incident",
user_summary="Test not visible summary",
generated_summary="Test not visible summary gen",
is_visible=False,
creation_time=now,
start_time=now,
last_seen_time=now,
)
db_session.add(visible_incident)
db_session.add(not_visible_incident)
db_session.commit()
# Refresh objects after commit
db_session.refresh(visible_incident)
db_session.refresh(not_visible_incident)

# Test fetching ONLY non-visible incidents
incidents_not_visible, total_not_visible = get_last_incidents_by_cel(
tenant_id=tenant_id, cel="is_visible == false"
)
assert len(incidents_not_visible) == 1, f"Expected 1 non-visible incident, found {len(incidents_not_visible)}"
assert incidents_not_visible[0].name == "Not Visible Incident"
assert total_not_visible == 1, f"Expected total count 1 for non-visible, got {total_not_visible}"

# Test fetching ONLY visible incidents explicitly
incidents_visible, total_visible = get_last_incidents_by_cel(
tenant_id=tenant_id, cel="is_visible == true"
)
assert len(incidents_visible) == 1, f"Expected 1 visible incident (explicit), found {len(incidents_visible)}"
assert incidents_visible[0].name == "Visible Incident"
assert total_visible == 1, f"Expected total count 1 for visible (explicit), got {total_visible}"

# Test the default behavior (no filter) - should only return visible
incidents_default, total_default = get_last_incidents_by_cel(tenant_id=tenant_id)
assert len(incidents_default) == 1, f"Expected 1 visible incident (default), found {len(incidents_default)}"
assert incidents_default[0].name == "Visible Incident"
assert total_default == 1, f"Expected total count 1 for visible (default), got {total_default}"
visible = create_incident_from_dict(
SINGLE_TENANT_UUID,
{
"user_generated_name": "Visible Incident",
"user_summary": "Test visible summary",
"generated_summary": "Test visible summary gen",
"is_visible": True,
},
)
not_visible = create_incident_from_dict(
SINGLE_TENANT_UUID,
{
"user_generated_name": "Not Visible Incident",
"user_summary": "Test not visible summary",
"generated_summary": "Test not visible summary gen",
"is_visible": False,
},
)

assert visible.is_visible is True
assert not_visible.is_visible is False

all_incidents = db_session.query(Incident).filter(
Incident.tenant_id == SINGLE_TENANT_UUID,
).all()
assert len(all_incidents) == 2

visible_only = db_session.query(Incident).filter(
Incident.tenant_id == SINGLE_TENANT_UUID,
Incident.is_visible == True,
).all()
assert len(visible_only) == 1
assert visible_only[0].user_generated_name == "Visible Incident"

not_visible_only = db_session.query(Incident).filter(
Incident.tenant_id == SINGLE_TENANT_UUID,
Incident.is_visible == False,
).all()
assert len(not_visible_only) == 1
assert not_visible_only[0].user_generated_name == "Not Visible Incident"

def test_incident_not_created_maintenance(
db_session,
Expand Down
Loading