diff --git a/src/sentry/uptime/endpoints/validators.py b/src/sentry/uptime/endpoints/validators.py index 7fc797a2271ec3..11ca30e6547461 100644 --- a/src/sentry/uptime/endpoints/validators.py +++ b/src/sentry/uptime/endpoints/validators.py @@ -466,6 +466,30 @@ def create_source(self, validated_data: dict[str, Any]) -> UptimeSubscription: class UptimeDomainCheckFailureValidator(BaseDetectorTypeValidator): data_sources = serializers.ListField(child=UptimeMonitorDataSourceValidator(), required=False) + def validate_config(self, config: dict[str, Any]) -> dict[str, Any]: + """ + Validate that only superusers can change mode to non-MANUAL values. + """ + if "mode" not in config: + return config + + mode = config["mode"] + + # For updates, only validate if mode is being changed + if self.instance: + current_mode = self.instance.config.get("mode") + # If mode hasn't changed, no validation needed + if current_mode == mode: + return config + + # Only superusers can set/change mode to anything other than MANUAL + if mode != UptimeMonitorMode.MANUAL: + request = self.context["request"] + if not is_active_superuser(request): + raise serializers.ValidationError("Only superusers can modify `mode`") + + return config + def validate_enabled(self, value: bool) -> bool: """ Validate that enabling a detector is allowed based on seat availability. diff --git a/tests/sentry/uptime/endpoints/test_detector.py b/tests/sentry/uptime/endpoints/test_detector.py index 8bf4354dcfb49c..8cd0bba39cdecb 100644 --- a/tests/sentry/uptime/endpoints/test_detector.py +++ b/tests/sentry/uptime/endpoints/test_detector.py @@ -78,8 +78,53 @@ def setUp(self) -> None: "request": self.make_request(), } + def test_update_non_superuser_cannot_change_mode_via_endpoint(self) -> None: + """Integration test: non-superuser cannot change mode via API endpoint.""" + # Create a detector with MANUAL mode specifically for this test + manual_detector = self.create_uptime_detector( + project=self.project, + env=self.environment, + uptime_subscription=self.uptime_subscription, + name="Manual Test Detector", + mode=UptimeMonitorMode.MANUAL, + ) + + assert manual_detector.workflow_condition_group is not None + invalid_data = { + "id": manual_detector.id, + "projectId": self.project.id, + "name": "Manual Test Detector", + "type": UptimeDomainCheckFailure.slug, + "dateCreated": manual_detector.date_added, + "dateUpdated": timezone.now(), + "conditionGroup": { + "id": manual_detector.workflow_condition_group.id, + "organizationId": self.organization.id, + }, + "config": { + "environment": self.environment.name, + "mode": UptimeMonitorMode.AUTO_DETECTED_ACTIVE.value, + "recovery_threshold": 1, + "downtime_threshold": 1, + }, + } + + response = self.get_error_response( + self.organization.slug, + manual_detector.id, + **invalid_data, + status_code=status.HTTP_400_BAD_REQUEST, + method="PUT", + ) + + assert response.data["config"] == ["Only superusers can modify `mode`"] + + # Verify that mode was NOT changed + manual_detector.refresh_from_db() + assert manual_detector.config["mode"] == UptimeMonitorMode.MANUAL.value + def test_update(self) -> None: - assert self.detector.workflow_condition_group + assert self.detector.workflow_condition_group is not None valid_data = { "id": self.detector.id, "projectId": self.project.id, @@ -113,8 +158,7 @@ def test_update(self) -> None: assert updated_sub.timeout_ms == 15000 def test_update_invalid(self) -> None: - assert self.detector.workflow_condition_group - + assert self.detector.workflow_condition_group is not None valid_data = { "id": self.detector.id, "projectId": self.project.id, @@ -252,3 +296,24 @@ def test_create_detector_missing_config_property(self): assert "config" in response.data assert "downtime_threshold" in str(response.data["config"]) + + def test_create_detector_non_superuser_cannot_set_auto_detected_mode(self): + """Integration test: non-superuser cannot create with AUTO_DETECTED mode via API.""" + invalid_data = _get_valid_data( + self.project.id, + self.environment.name, + config={ + "environment": self.environment.name, + "mode": UptimeMonitorMode.AUTO_DETECTED_ACTIVE.value, + "recovery_threshold": 1, + "downtime_threshold": 1, + }, + ) + + response = self.get_error_response( + self.organization.slug, + **invalid_data, + status_code=status.HTTP_400_BAD_REQUEST, + ) + + assert response.data["config"] == ["Only superusers can modify `mode`"] diff --git a/tests/sentry/uptime/endpoints/test_validators.py b/tests/sentry/uptime/endpoints/test_validators.py index 2127aed378f1c3..adb4526372ea64 100644 --- a/tests/sentry/uptime/endpoints/test_validators.py +++ b/tests/sentry/uptime/endpoints/test_validators.py @@ -294,3 +294,83 @@ def test_update_no_enable_change_no_seat_call(self, mock_assign_seat: mock.Magic # Verify no seat operations were called mock_assign_seat.assert_not_called() + + def test_non_superuser_cannot_create_with_auto_detected_mode(self) -> None: + """Test that non-superuser cannot create detector with AUTO_DETECTED mode.""" + data = self.get_valid_data( + config={ + "mode": UptimeMonitorMode.AUTO_DETECTED_ACTIVE.value, + "environment": None, + "recovery_threshold": DEFAULT_RECOVERY_THRESHOLD, + "downtime_threshold": DEFAULT_DOWNTIME_THRESHOLD, + } + ) + + validator = UptimeDomainCheckFailureValidator(data=data, context=self.context) + assert not validator.is_valid() + assert validator.errors["config"] == ["Only superusers can modify `mode`"] + + def test_non_superuser_cannot_change_mode(self) -> None: + """Test that non-superuser cannot change mode via update.""" + # Create a detector with MANUAL mode + detector = self.create_uptime_detector(mode=UptimeMonitorMode.MANUAL) + + data = { + "config": { + "mode": UptimeMonitorMode.AUTO_DETECTED_ACTIVE.value, + "environment": None, + "recovery_threshold": DEFAULT_RECOVERY_THRESHOLD, + "downtime_threshold": DEFAULT_DOWNTIME_THRESHOLD, + } + } + + validator = UptimeDomainCheckFailureValidator( + instance=detector, data=data, context=self.context, partial=True + ) + assert not validator.is_valid() + assert validator.errors["config"] == ["Only superusers can modify `mode`"] + + # Verify mode was not changed + detector.refresh_from_db() + assert detector.config["mode"] == UptimeMonitorMode.MANUAL.value + + def test_non_superuser_can_update_with_same_mode(self) -> None: + """Test that non-superuser can pass config if mode doesn't change.""" + # Create a detector with AUTO_DETECTED_ACTIVE mode (e.g., from autodetection) + detector = self.create_uptime_detector(mode=UptimeMonitorMode.AUTO_DETECTED_ACTIVE) + + data = { + "config": { + "mode": UptimeMonitorMode.AUTO_DETECTED_ACTIVE.value, # Same mode + "environment": None, + "recovery_threshold": DEFAULT_RECOVERY_THRESHOLD, + "downtime_threshold": DEFAULT_DOWNTIME_THRESHOLD, + } + } + + validator = UptimeDomainCheckFailureValidator( + instance=detector, data=data, context=self.context, partial=True + ) + # Should be valid since mode hasn't changed + assert validator.is_valid(), validator.errors + + def test_superuser_can_create_with_auto_detected_mode(self) -> None: + """Test that superuser can create detector with AUTO_DETECTED mode.""" + superuser = self.create_user(is_superuser=True) + self.context["request"] = self.make_request(user=superuser, is_superuser=True) + + data = self.get_valid_data( + config={ + "mode": UptimeMonitorMode.AUTO_DETECTED_ACTIVE.value, + "environment": None, + "recovery_threshold": DEFAULT_RECOVERY_THRESHOLD, + "downtime_threshold": DEFAULT_DOWNTIME_THRESHOLD, + } + ) + + validator = UptimeDomainCheckFailureValidator(data=data, context=self.context) + assert validator.is_valid(), validator.errors + detector = validator.save() + + detector.refresh_from_db() + assert detector.config["mode"] == UptimeMonitorMode.AUTO_DETECTED_ACTIVE.value