Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/8080-eliminate-redundant-system-fetches.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: Changed
description: Reduce redundant System fetches per row in /system/upsert from four to one, and add per-axis change-detection logging in the system audit path.
pr: 8080
labels: []
101 changes: 84 additions & 17 deletions src/fides/api/db/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
from fideslang.validation import FidesKey
from loguru import logger as log
from sqlalchemy import select
from sqlalchemy import update as sql_update
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from starlette.status import HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND

from fides.api.db.crud import create_resource, get_resource, update_resource
from fides.api.db.crud import create_resource, get_resource
from fides.api.models.sql_models import ( # type: ignore[attr-defined]
DataCategory,
DataSubject,
Expand All @@ -25,6 +27,7 @@
System,
)
from fides.api.models.system_history import SystemHistory
from fides.api.util import errors
from fides.api.util.errors import NotFoundError


Expand Down Expand Up @@ -133,7 +136,7 @@ async def upsert_system(

for resource in resources:
try:
await get_resource(System, resource.fides_key, db)
existing_system = await get_resource(System, resource.fides_key, db)
except NotFoundError:
log.debug(
f"Upsert System with fides_key {resource.fides_key} not found, will create"
Expand All @@ -146,7 +149,14 @@ async def upsert_system(
)
inserted += 1
continue
await update_system(resource=resource, db=db, current_user_id=current_user_id)
# Pass the already-loaded System through so update_system doesn't
# re-fetch it.
await update_system(
resource=resource,
db=db,
current_user_id=current_user_id,
existing_system=existing_system,
)
updated += 1
return (inserted, updated)

Expand Down Expand Up @@ -186,12 +196,26 @@ async def upsert_privacy_declarations(


async def update_system(
resource: SystemSchema, db: AsyncSession, current_user_id: Optional[str] = None
) -> Tuple[Dict, bool]:
"""Helper function to share core system update logic for wrapping endpoint functions"""
system: System = await get_resource(
sql_model=System, fides_key=resource.fides_key, async_session=db
)
resource: SystemSchema,
db: AsyncSession,
current_user_id: Optional[str] = None,
existing_system: Optional[System] = None,
) -> Tuple[System, bool]:
"""Helper function to share core system update logic for wrapping endpoint functions.

If ``existing_system`` is supplied, it is used in place of an explicit
``get_resource`` call. Callers that already loaded the System (e.g.
``upsert_system`` after its existence check) should pass it through to
avoid a redundant fetch.
"""
system: System
if existing_system is None:
system = await get_resource(
sql_model=System, fides_key=resource.fides_key, async_session=db
)
else:
system = existing_system

existing_system_dict = copy.deepcopy(
SystemSchema.model_validate(system)
).model_dump(mode="json")
Expand All @@ -209,21 +233,40 @@ async def update_system(
resource, "privacy_declarations"
) # remove the attribute on the system since we've already updated declarations

# perform any updates on the system resource itself
updated_system = await update_resource(System, resource.model_dump(), db)
# Inline the UPDATE rather than calling ``crud.update_resource``, which
# otherwise issues two extra ``get_resource`` calls (one before the
# UPDATE and one to return the post-UPDATE row). We already hold the
# ORM object and ``db.refresh`` below picks up any DB-side coercions.
resource_dict = resource.model_dump()
async with db.begin():
log.debug(
"Updating resource",
sql_model="System",
fides_key=resource.fides_key,
)
try:
await db.execute(
sql_update(System.__table__)
.where(System.fides_key == resource.fides_key)
.values(resource_dict)
)
except SQLAlchemyError as exc:
# Mirrors the guard the prior `crud.update_resource` call had.
log.exception(f"Failed to update System with error: '{exc}'")
raise errors.QueryError()
Comment on lines +253 to +256
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines are not covered by tests; can we add a test for the exception case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude]: added in f447f65941 — test_update_system_raises_query_error_on_sqlalchemy_error patches async_session.execute to raise SQLAlchemyError only on the System UPDATE and asserts that errors.QueryError propagates.


async with db.begin():
Comment thread
adamsachs marked this conversation as resolved.
await db.refresh(updated_system)
await db.refresh(system)

system_updated: bool = _audit_system_changes(
db,
system.id,
current_user_id,
existing_system_dict,
SystemSchema.model_validate(updated_system).model_dump(mode="json"),
SystemSchema.model_validate(system).model_dump(mode="json"),
)

return updated_system, system_updated
return system, system_updated


def _audit_system_changes(
Expand Down Expand Up @@ -259,8 +302,18 @@ def _audit_system_changes(

system_updated: bool = False

# Compute each axis's diff once so we can both gate the SystemHistory
# write and emit observability around how often each axis actually
# changes. These logs help quantify the steady-state no-op rate before
# we consider skipping the UPDATE for unchanged systems.
general_diff = DeepDiff(existing_system, updated_system, ignore_order=True)
privacy_diff = DeepDiff(privacy_existing, privacy_updated, ignore_order=True)
data_flow_diff = DeepDiff(
egress_ingress_existing, egress_ingress_updated, ignore_order=True
)

# Create a SystemHistory entry for general changes
if DeepDiff(existing_system, updated_system, ignore_order=True):
if general_diff:
system_updated = True

SystemHistory(
Expand All @@ -272,7 +325,7 @@ def _audit_system_changes(
).save(db=db)

# Create a SystemHistory entry for changes to privacy_declarations
if DeepDiff(privacy_existing, privacy_updated, ignore_order=True):
if privacy_diff:
system_updated = True

SystemHistory(
Expand All @@ -284,7 +337,7 @@ def _audit_system_changes(
).save(db=db)

# Create a SystemHistory entry for changes to egress and ingress
if DeepDiff(egress_ingress_existing, egress_ingress_updated, ignore_order=True):
if data_flow_diff:
system_updated = True

SystemHistory(
Expand All @@ -295,6 +348,20 @@ def _audit_system_changes(
created_at=now,
).save(db=db)

log.debug(
"System change detection",
system_id=system_id,
general_changed=bool(general_diff),
privacy_declarations_changed=bool(privacy_diff),
data_flow_changed=bool(data_flow_diff),
any_changed=system_updated,
general_diff_keys=len(general_diff.affected_paths) if general_diff else 0,
privacy_diff_keys=len(privacy_diff.affected_paths) if privacy_diff else 0,
data_flow_diff_keys=(
len(data_flow_diff.affected_paths) if data_flow_diff else 0
),
)

return system_updated


Expand Down
190 changes: 189 additions & 1 deletion tests/ctl/core/test_system_history.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from unittest.mock import AsyncMock, patch
from uuid import uuid4

import pytest
from fideslang.models import DataFlow as DataFlowSchema
from fideslang.models import PrivacyDeclaration as PrivacyDeclarationSchema
from fideslang.models import System as SystemSchema
from sqlalchemy import delete
from sqlalchemy.exc import SQLAlchemyError

from fides.api.db.system import create_system, update_system
from fides.api.db.crud import get_resource
from fides.api.db.system import create_system, update_system, upsert_system
from fides.api.models.sql_models import System
from fides.api.models.system_history import SystemHistory
from fides.api.util import errors
from fides.config import get_config

CONFIG = get_config()
Expand Down Expand Up @@ -137,3 +141,187 @@ async def test_automatic_system_update(self, db, async_session, system: System):
).all()

assert system_histories[0].edited_by == "automatic_system_update"


class TestUpsertSystemFetchOptimization:
"""Regression coverage for the ENG-3593 fetch-count reduction in
/system/upsert. Each updated system used to fan out to 4 get_resource
calls; after the fix it should be exactly 1."""

@pytest.fixture()
async def system(self, async_session):
    """Create a throwaway System for a test and delete it afterwards.

    Yields:
        The object returned by ``create_system`` for a schema with a random
        ``fides_key`` and no privacy declarations.
    """
    resource = SystemSchema(
        fides_key=str(uuid4()),
        organization_fides_key="default_organization",
        name="upsert_fetch_opt_system",
        system_type="test",
        privacy_declarations=[],
    )

    system = await create_system(
        resource, async_session, CONFIG.security.oauth_root_client_id
    )
    # Capture the primary key before handing the object to the test, so that
    # teardown never has to read an attribute from an ORM instance the test
    # may have left expired (e.g. after a rollback).
    system_id = system.id
    yield system

    # ``delete(...)`` only *builds* a statement; it must be executed and
    # committed for the row to actually be removed.
    await async_session.execute(delete(System).where(System.id == system_id))
    await async_session.commit()
Comment thread
adamsachs marked this conversation as resolved.

async def test_upsert_passes_existing_system_to_update(
    self, async_session, system: System
):
    """Check that ``upsert_system`` forwards its loaded System to ``update_system``.

    ``update_system`` is replaced with an ``AsyncMock`` so only the call
    contract is inspected: it must be invoked exactly once, with an
    ``existing_system`` keyword whose ``fides_key`` matches the fixture's.
    Without that keyword, ``update_system`` would fall back to fetching the
    System itself.
    """
    payload = SystemSchema.model_validate(system)
    stubbed_update = AsyncMock(return_value=(system, False))

    with patch("fides.api.db.system.update_system", new=stubbed_update):
        await upsert_system(
            resources=[payload],
            db=async_session,
            current_user_id=CONFIG.security.oauth_root_client_id,
        )

    stubbed_update.assert_called_once()
    forwarded_kwargs = stubbed_update.call_args.kwargs
    assert forwarded_kwargs.get("existing_system") is not None, (
        "update_system was called without existing_system - this defeats "
        "the ENG-3593 fetch optimization."
    )
    assert forwarded_kwargs["existing_system"].fides_key == system.fides_key

async def test_update_system_with_existing_system_persists_changes(
    self, db, async_session, system: System
):
    """Exercise ``update_system`` with an explicitly supplied ``existing_system``.

    Asserts three things: the call reports that the system changed, a fresh
    fetch sees the new description, and exactly one ``SystemHistory`` row
    exists for the system afterwards.
    """
    new_description = "Modified via explicit existing_system path"

    # Load the System first so it can be handed to update_system directly.
    preloaded_system = await get_resource(System, system.fides_key, async_session)

    payload = SystemSchema.model_validate(system)
    payload.description = new_description

    _returned_system, was_updated = await update_system(
        resource=payload,
        db=async_session,
        current_user_id=CONFIG.security.oauth_root_client_id,
        existing_system=preloaded_system,
    )
    assert was_updated

    # Re-read the row to confirm the change reached the database.
    reloaded_system = await get_resource(System, system.fides_key, async_session)
    assert reloaded_system.description == new_description

    # The description change should have produced a single audit entry.
    history_query = SystemHistory.filter(
        db=db, conditions=(SystemHistory.system_id == system.id)
    )
    assert len(history_query.all()) == 1

async def test_upsert_system_emits_one_fetch_per_updated_system(
self, async_session
):
"""Regression guard for ENG-3593: each updated system on the UPDATE
path should issue exactly one `crud.get_resource` call (the existence
check in `upsert_system`). Before the fix this was four. If a future
change re-introduces a redundant call, this test fails loudly.

Uses two systems to also catch any accidental constant-cost bug
(e.g., loading systems once but counting wrong).
"""
# Stable fides_keys across both passes so the second pass hits the
# rows the first pass created. The schema instances themselves must
# be rebuilt because `update_system` mutates inputs (it does a
# `delattr(resource, "privacy_declarations")` after upserting them).
fides_keys = [f"fetch_count_{uuid4()}" for _ in range(2)]

def build_payload() -> list[SystemSchema]:
return [
SystemSchema(
fides_key=fk,
organization_fides_key="default_organization",
name=f"Fetch count test {i}",
system_type="test",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests/ctl/core/test_system_history.py:247

test_upsert_system_emits_one_fetch_per_updated_system relies on the literal string "Fetching resource" appearing in log output to count DB fetches. This is a fragile coupling to crud.get_resource's internal log message — if that string ever changes or the log level is raised, the assertion len(matching) == 1 will always pass even if the regression is re-introduced.

A more robust approach would be to patch get_resource itself and count invocations:

with patch("fides.api.db.system.get_resource", wraps=get_resource) as mock_get:
    await upsert_system(resources=build_payload(), db=async_session, ...)

# Each system should only be fetched once (existence check in upsert_system).
assert mock_get.call_count == len(fides_keys)

This directly measures what the optimization is meant to guarantee rather than inferring it through log messages.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adjusted to use the mocking in 618c8d7

privacy_declarations=[],
)
for i, fk in enumerate(fides_keys)
]

# Prime: this run creates the rows (INSERT path); not what we measure.
await upsert_system(
resources=build_payload(),
db=async_session,
current_user_id=CONFIG.security.oauth_root_client_id,
)

# Measure: every row exists, every row hits the UPDATE path. Patch
# `get_resource` at the call site (system.py) and wrap the original
# so behavior is unchanged but invocations are counted.
with patch(
"fides.api.db.system.get_resource", wraps=get_resource
) as mock_get_resource:
await upsert_system(
resources=build_payload(),
db=async_session,
current_user_id=CONFIG.security.oauth_root_client_id,
)

# One call per system: `upsert_system`'s existence check. The
# `update_system` call no longer needs to fetch because
# `existing_system` is threaded through.
assert mock_get_resource.call_count == len(fides_keys), (
f"Expected {len(fides_keys)} get_resource calls on the UPDATE "
f"path (one existence check per system), got "
f"{mock_get_resource.call_count}."
)

async def test_update_system_raises_query_error_on_sqlalchemy_error(
    self, async_session
):
    """Check that a failing System UPDATE surfaces as ``errors.QueryError``.

    ``async_session.execute`` is patched so that any statement whose
    ``table`` attribute is ``System.__table__`` raises ``SQLAlchemyError``;
    every other statement is delegated to the real ``execute``. The test
    then expects ``update_system`` to raise ``errors.QueryError``.

    The System is created inline instead of via the class-level ``system``
    fixture. NOTE(review): per the original author's note, the injected
    error rolls back the ``db.begin()`` block and expires ORM objects on
    the session, which would make the fixture's teardown access to
    ``system.id`` fail - confirm if the fixture is ever reused here.
    """
    fides_key = f"qe_test_{uuid4()}"
    seed_schema = SystemSchema(
        fides_key=fides_key,
        organization_fides_key="default_organization",
        name="query_error_test_system",
        system_type="test",
        privacy_declarations=[],
    )
    await create_system(
        seed_schema, async_session, CONFIG.security.oauth_root_client_id
    )
    preloaded_system = await get_resource(System, fides_key, async_session)

    payload = SystemSchema.model_validate(preloaded_system)
    payload.description = "Should fail"

    real_execute = async_session.execute

    async def fail_on_system_statement(stmt, *args, **kwargs):
        # Only statements aimed at the System table are sabotaged, so
        # transaction control and other session internals keep working.
        targets_system = getattr(stmt, "table", None) is System.__table__
        if not targets_system:
            return await real_execute(stmt, *args, **kwargs)
        raise SQLAlchemyError("simulated DB failure")

    execute_patch = patch.object(
        async_session, "execute", side_effect=fail_on_system_statement
    )
    with execute_patch, pytest.raises(errors.QueryError):
        await update_system(
            resource=payload,
            db=async_session,
            current_user_id=CONFIG.security.oauth_root_client_id,
            existing_system=preloaded_system,
        )
Loading