69 changes: 64 additions & 5 deletions datadog_sync/utils/configuration.py
@@ -41,7 +41,7 @@
from datadog_sync.utils.custom_client import CustomClient
from datadog_sync.utils.base_resource import BaseResource
from datadog_sync.utils.log import Log
from datadog_sync.utils.filter import Filter, process_filters
from datadog_sync.utils.filter import Filter, process_filters, EXACT_MATCH_OPERATOR
from datadog_sync.utils.resource_utils import CustomClientHTTPError
from datadog_sync.utils.state import State
from datadog_sync.utils.storage.storage_types import StorageType
@@ -135,6 +135,52 @@ async def exit_async(self):
await self.destination_client._end_session()


def _unwrap_exact_match_pattern(pattern: str) -> str:
"""Extract the raw ID value from an ExactMatch ^...$-wrapped regex pattern.

Defensive check: ExactMatch always produces ^...$, so ValueError should not
fire in practice. Raises ValueError so callers can detect unexpected patterns.
"""
if not (pattern.startswith("^") and pattern.endswith("$")):
raise ValueError(f"Expected ExactMatch regex ^...$, got: {pattern!r}")
return pattern[1:-1]
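
A quick sanity check of the helper, using a made-up ID (the commented call shows the defensive path):

assert _unwrap_exact_match_pattern("^abc-123$") == "abc-123"
# _unwrap_exact_match_pattern("abc-123")  # raises ValueError -> caller falls back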


def extract_exact_id_filters(
filters: Dict[str, list],
filter_operator: str,
resource_types: list,
) -> Optional[Dict[str, list]]:
"""Return {type: [id1, id2, ...]} when all conditions allow ID-targeted loading.

Conditions (all must be true):
- filter_operator is OR (case-insensitive)
- Every resource type in resource_types has at least one filter
- All filters for each type use Name=id + ExactMatch operator

Returns None if any condition fails → caller falls back to type-scoped loading.
"""
if filter_operator.lower() != "or":
return None
result = {}
for rt in resource_types:
rt_filters = filters.get(rt, [])
if not rt_filters:
return None # No filters for this type — can't use ID-targeted
# All filters must be id-field ExactMatch
if not all(f.attr_name == ["id"] and f.operator == EXACT_MATCH_OPERATOR for f in rt_filters):
return None
# Extract raw IDs from ^...$-wrapped regex patterns.
# Defensive: ExactMatch guarantees ^...$, so ValueError should not fire.
# Kept as a safety net in case filter construction changes upstream.
try:
ids = [_unwrap_exact_match_pattern(f.attr_re.pattern) for f in rt_filters]
except ValueError:
return None # Pattern wasn't ^...$-wrapped — fall back gracefully
result[rt] = ids
return result
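
A hedged illustration of the eligibility rules (SimpleNamespace stands in for the real Filter objects; the attr_name/operator/attr_re attributes match those used above, and the IDs are made up):

import re
from types import SimpleNamespace

from datadog_sync.utils.filter import EXACT_MATCH_OPERATOR

f1 = SimpleNamespace(attr_name=["id"], operator=EXACT_MATCH_OPERATOR, attr_re=re.compile(r"^abc-123$"))
f2 = SimpleNamespace(attr_name=["id"], operator=EXACT_MATCH_OPERATOR, attr_re=re.compile(r"^def-456$"))

# All filters are id+ExactMatch and the operator is OR: fast path applies
assert extract_exact_id_filters({"dashboards": [f1, f2]}, "OR", ["dashboards"]) == {
    "dashboards": ["abc-123", "def-456"]
}
# AND operator, or a requested type with no filters, disqualifies the fast path
assert extract_exact_id_filters({"dashboards": [f1, f2]}, "AND", ["dashboards"]) is None
assert extract_exact_id_filters({"dashboards": [f1]}, "OR", ["dashboards", "monitors"]) is None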


def build_config(cmd: Command, **kwargs: Optional[Any]) -> Configuration:
# configure logger — in JSON mode, Log writes NDJSON to stdout and silences stderr
emit_json = kwargs.get("emit_json", False)
@@ -318,9 +364,18 @@ def build_config(cmd: Command, **kwargs: Optional[Any]) -> Configuration:
raise click.UsageError("--minimize-reads cannot be combined with --cleanup")

# Determine loading strategy for minimize-reads
_state_resource_types = None # None = full load (existing behavior)
_state_resource_types = None # type-scoped; None = full load (existing behavior)
_state_exact_ids = None # ID-targeted; None = not using ID-targeted
if minimize_reads and (rs := kwargs.get("resources", None)):
_state_resource_types = [r.strip().lower() for r in rs.split(",") if r.strip()]
raw_types = [r.strip().lower() for r in rs.split(",") if r.strip()]
# Try ID-targeted strategy first (fast path: exact IDs from filters)
early_filters = process_filters(kwargs.get("filter"))
filter_operator = kwargs.get("filter_operator", "or")
_state_exact_ids = extract_exact_id_filters(early_filters, filter_operator, raw_types)
if _state_exact_ids is None:
# Fall back to type-scoped loading
logger.debug("minimize-reads: ID-targeted not eligible — filters are not all id+ExactMatch+OR")
_state_resource_types = raw_types
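
For reference, a sketch of an invocation that should qualify for the fast path; --minimize-reads, --resource-per-file, and --resources are taken from this diff, while the Type=/Name=/Value=/Operator= filter syntax is assumed from existing datadog-sync filter usage:

# Both filters are Name=id + ExactMatch with OR, so extract_exact_id_filters()
# returns {"dashboards": ["abc-123", "def-456"]} and ID-targeted loading is used.
datadog-sync sync --minimize-reads --resource-per-file \
  --resources dashboards \
  --filter 'Type=dashboards;Name=id;Value=abc-123;Operator=ExactMatch' \
  --filter 'Type=dashboards;Name=id;Value=def-456;Operator=ExactMatch' \
  --filter-operator OR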

# Initialize state
state = State(
@@ -329,10 +384,14 @@
destination_resources_path=destination_resources_path,
config=config,
resource_per_file=resource_per_file,
resource_types=_state_resource_types, # None = full load
resource_types=_state_resource_types, # None = full load or ID-targeted
exact_ids=_state_exact_ids, # None = not using ID-targeted
)

if _state_resource_types is not None:
if _state_exact_ids is not None:
total = sum(len(v) for v in _state_exact_ids.values())
logger.info(f"minimize-reads: ID-targeted loading for {total} resources across {list(_state_exact_ids.keys())}")
elif _state_resource_types is not None:
logger.info(f"minimize-reads: type-scoped loading for {_state_resource_types}")

# Initialize Configuration
14 changes: 14 additions & 0 deletions datadog_sync/utils/resources_handler.py
@@ -625,7 +625,21 @@ def _resource_connections(self, resource_type: str, _id: str) -> Tuple[Set[Tuple
# After retrieving all of the failed connections, we check if
# the resources are imported. Otherwise append to missing with its type.
for f_id in failed:
# With --minimize-reads, dependency types may not be in the
# initial scoped load. Lazily load this specific dependency
# (source+destination) so the source check below is accurate,
# and so connect_resources() in _apply_resource_cb() can
# successfully remap the ID in the destination.
self.config.state.ensure_resource_loaded(resource_to_connect, f_id)
Review comment (Collaborator, Author):
The PR description calls out a known limitation: monitors.py and restriction_policies.py have connect_id() overrides that access hardcoded destination types beyond what appears in resource_connections. Those types won't be in the scoped load and ensure_resource_loaded won't be called for them, so their IDs will silently not be remapped.

Silent ID-remapping failure is very hard to detect in production — the sync appears to succeed but references in the destination resource are stale. Even if fixing the root cause is deferred, the failure should be surfaced:

# After ensure_resource_loaded + source check:
if f_id not in self.config.state.source[resource_to_connect]:
    if self.config.state._minimize_reads:
        log.warning(
            'minimize-reads: dependency %s.%s not found in storage; '
            'ID remapping may be incomplete', resource_to_connect, f_id
        )
    missing_resources.add((resource_to_connect, f_id))
At minimum, the test plan should include a case that syncs a monitor widget in a dashboard (which exercises the monitors override path) to confirm it doesn't silently break.

Reply (Collaborator, Author):

Fixed. Added a self.config.logger.warning call after the source check in _resource_connections. Noted in the comment that the monitors.py override (which accesses synthetics_tests directly in connect_id) won't be caught here since that path bypasses _resource_connections — fixing the root cause is a separate refactor.


if f_id not in self.config.state.source[resource_to_connect]:
if self.config.state._minimize_reads:
self.config.logger.warning(
"minimize-reads: dependency %s.%s not found in storage; "
"ID remapping may be incomplete",
resource_to_connect,
f_id,
)
missing_resources.add((resource_to_connect, f_id))

failed_connections.add((resource_to_connect, f_id))
50 changes: 46 additions & 4 deletions datadog_sync/utils/state.py
@@ -2,12 +2,14 @@
# under the 3-clause BSD style license (see LICENSE).
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
import logging
from typing import Any, Dict, List, Tuple

from datadog_sync.constants import (
Origin,
DESTINATION_PATH_DEFAULT,
DESTINATION_PATH_PARAM,
LOGGER_NAME,
RESOURCE_PER_FILE,
SOURCE_PATH_DEFAULT,
SOURCE_PATH_PARAM,
@@ -19,11 +21,15 @@
from datadog_sync.utils.storage.local_file import LocalFile
from datadog_sync.utils.storage.storage_types import StorageType

log = logging.getLogger(LOGGER_NAME)


class State:
def __init__(self, type_: StorageType = StorageType.LOCAL_FILE, **kwargs: object) -> None:
self._resource_types = kwargs.get("resource_types", None)
self._minimize_reads = self._resource_types is not None
self._resource_types = kwargs.get("resource_types", None) # type-scoped loading
self._exact_ids = kwargs.get("exact_ids", None) # ID-targeted loading
self._minimize_reads = self._resource_types is not None or self._exact_ids is not None
self._ensure_attempted: set = set() # tracks IDs attempted by ensure_resource_loaded
resource_per_file = kwargs.get(RESOURCE_PER_FILE, False)
source_resources_path = kwargs.get(SOURCE_PATH_PARAM, SOURCE_PATH_DEFAULT)
destination_resources_path = kwargs.get(DESTINATION_PATH_PARAM, DESTINATION_PATH_DEFAULT)
@@ -78,8 +84,44 @@ def destination(self):
return self._data.destination

def load_state(self, origin: Origin = Origin.ALL) -> None:
# resource_types=None → load all types (default behavior)
self._data = self._storage.get(origin, resource_types=self._resource_types)
if self._exact_ids is not None:
# ID-targeted: fetch only specified resources by constructing keys directly
self._data = self._storage.get_by_ids(origin, self._exact_ids)
else:
# Type-scoped (resource_types set) or full load (resource_types=None)
self._data = self._storage.get(origin, resource_types=self._resource_types)

def ensure_resource_loaded(self, resource_type: str, resource_id: str) -> None:
"""Lazily load source+destination state for one dependency resource.

Called from _resource_connections() in resources_handler.py when a
cross-type dependency is encountered that may not be in the initial
(scoped) load. Loads both source and destination state so that
connect_id() in _apply_resource_cb() can remap IDs correctly.

Note: requires resource_per_file=True in the storage backend.
get_single constructs per-resource filenames; monolithic layout
will silently return (None, None) for every dependency.

Contract:
- Idempotent: no-op if (resource_type, resource_id) already attempted
- No-op when not in minimize-reads mode (_minimize_reads=False)
- Appends to state: never replaces existing entries
- Missing file: (None, None) → resource stays absent (correct behavior)
- asyncio-safe: fully synchronous, no await points
"""
if not self._minimize_reads:
return
key = (resource_type, resource_id)
if key in self._ensure_attempted:
return
self._ensure_attempted.add(key)
log.debug(f"minimize-reads: lazy-loading dep {resource_type}.{resource_id}")
src, dst = self._storage.get_single(resource_type, resource_id)
if src is not None:
self._data.source[resource_type][resource_id] = src
if dst is not None:
self._data.destination[resource_type][resource_id] = dst
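
A short usage sketch of the contract above (state here is a hypothetical State constructed with exact_ids or resource_types set, i.e. minimize-reads mode):

state.ensure_resource_loaded("monitors", "123")  # first call: one storage.get_single() hit
state.ensure_resource_loaded("monitors", "123")  # repeat call: no-op via _ensure_attempted
# If the per-resource file is missing, get_single() returns (None, None), the
# entry stays absent, and _resource_connections() reports it as missing.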

def dump_state(self, origin: Origin = Origin.ALL) -> None:
self._storage.put(origin, self._data)
15 changes: 12 additions & 3 deletions datadog_sync/utils/storage/_base_storage.py
@@ -9,7 +9,7 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

from datadog_sync.constants import LOGGER_NAME
from datadog_sync.constants import LOGGER_NAME, Origin


log = logging.getLogger(LOGGER_NAME)
@@ -80,7 +80,6 @@ def get(self, origin, resource_types=None) -> StorageData:
"""
pass

@abstractmethod
def get_by_ids(self, origin, exact_ids: Dict[str, List[str]]) -> StorageData:
"""Load specific resources by ID, constructing keys directly. No listing needed.

@@ -91,7 +90,17 @@ def get_by_ids(self, origin, exact_ids: Dict[str, List[str]]) -> StorageData:
Returns StorageData with only the requested resources. Missing resources
are silently skipped (no exception raised for NotFound).
"""
pass
if not getattr(self, "resource_per_file", True):
raise ValueError("get_by_ids() requires --resource-per-file. Re-run with --resource-per-file enabled.")
data = StorageData()
for resource_type, ids in exact_ids.items():
for resource_id in ids:
src, dst = self.get_single(resource_type, resource_id)
if origin in [Origin.SOURCE, Origin.ALL] and src is not None:
data.source[resource_type][resource_id] = src
if origin in [Origin.DESTINATION, Origin.ALL] and dst is not None:
data.destination[resource_type][resource_id] = dst
return data
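
Pulling this default up removes the four near-identical copies deleted in the backend diffs below; a new backend now only needs get_single(). A hypothetical minimal backend, assuming the abstract base is named BaseStorage (other abstract methods such as get()/put() omitted for brevity):

class InMemoryStorage(BaseStorage):
    # Toy backend: get_by_ids() is inherited unchanged from the base class.
    def __init__(self, blobs):
        self.resource_per_file = True  # required by the inherited get_by_ids()
        self._blobs = blobs  # {(resource_type, resource_id): (src, dst)}

    def get_single(self, resource_type, resource_id):
        return self._blobs.get((resource_type, resource_id), (None, None))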

@abstractmethod
def get_single(self, resource_type: str, resource_id: str) -> Tuple[Optional[Dict], Optional[Dict]]:
16 changes: 1 addition & 15 deletions datadog_sync/utils/storage/aws_s3_bucket.py
@@ -6,7 +6,7 @@
import json
import logging
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from typing import Dict, Optional, Tuple

import boto3
from botocore.exceptions import ClientError
@@ -175,20 +175,6 @@ def _try_get_object(self, key: str) -> Optional[Dict]:
log.warning(f"invalid json in aws resource file: {key}")
return None

def get_by_ids(self, origin: Origin, exact_ids: Dict[str, List[str]]) -> StorageData:
"""Load specific resources by ID without listing. Constructs keys directly."""
if not self.resource_per_file:
raise ValueError("get_by_ids() requires --resource-per-file. " "Re-run with --resource-per-file enabled.")
data = StorageData()
for resource_type, ids in exact_ids.items():
for resource_id in ids:
src, dst = self.get_single(resource_type, resource_id)
if origin in [Origin.SOURCE, Origin.ALL] and src is not None:
data.source[resource_type][resource_id] = src
if origin in [Origin.DESTINATION, Origin.ALL] and dst is not None:
data.destination[resource_type][resource_id] = dst
return data

def get_single(self, resource_type: str, resource_id: str) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Load one resource's source and destination state by ID.
16 changes: 1 addition & 15 deletions datadog_sync/utils/storage/azure_blob_container.py
@@ -6,7 +6,7 @@
import json
import logging
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from typing import Dict, Optional, Tuple

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient, ContainerClient
@@ -138,20 +138,6 @@ def _try_get_blob(self, key: str) -> Optional[Dict]:
log.warning(f"invalid json in azure resource file: {key}")
return None

def get_by_ids(self, origin: Origin, exact_ids: Dict[str, List[str]]) -> StorageData:
"""Load specific resources by ID without listing. Constructs keys directly."""
if not self.resource_per_file:
raise ValueError("get_by_ids() requires --resource-per-file. " "Re-run with --resource-per-file enabled.")
data = StorageData()
for resource_type, ids in exact_ids.items():
for resource_id in ids:
src, dst = self.get_single(resource_type, resource_id)
if origin in [Origin.SOURCE, Origin.ALL] and src is not None:
data.source[resource_type][resource_id] = src
if origin in [Origin.DESTINATION, Origin.ALL] and dst is not None:
data.destination[resource_type][resource_id] = dst
return data

def get_single(self, resource_type: str, resource_id: str) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Load one resource's source and destination state by ID."""
safe_id = self._sanitize_id_for_filename(resource_id)
16 changes: 1 addition & 15 deletions datadog_sync/utils/storage/gcs_bucket.py
@@ -6,7 +6,7 @@
import json
import logging
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from typing import Dict, Optional, Tuple

from google.api_core.exceptions import NotFound
from google.cloud import storage as gcs_storage
@@ -130,20 +130,6 @@ def _try_get_blob(self, key: str) -> Optional[Dict]:
log.warning(f"invalid json in gcs resource file: {key}")
return None

def get_by_ids(self, origin: Origin, exact_ids: Dict[str, List[str]]) -> StorageData:
"""Load specific resources by ID without listing. Constructs keys directly."""
if not self.resource_per_file:
raise ValueError("get_by_ids() requires --resource-per-file. " "Re-run with --resource-per-file enabled.")
data = StorageData()
for resource_type, ids in exact_ids.items():
for resource_id in ids:
src, dst = self.get_single(resource_type, resource_id)
if origin in [Origin.SOURCE, Origin.ALL] and src is not None:
data.source[resource_type][resource_id] = src
if origin in [Origin.DESTINATION, Origin.ALL] and dst is not None:
data.destination[resource_type][resource_id] = dst
return data

def get_single(self, resource_type: str, resource_id: str) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Load one resource's source and destination state by ID."""
safe_id = self._sanitize_id_for_filename(resource_id)
16 changes: 1 addition & 15 deletions datadog_sync/utils/storage/local_file.py
@@ -6,7 +6,7 @@
import json
import logging
import os
from typing import Dict, List, Optional, Tuple
from typing import Dict, Optional, Tuple

from datadog_sync.constants import (
Origin,
@@ -112,20 +112,3 @@ def write_resources_file(self, origin: Origin, data: StorageData) -> None:
with open(filename, "w+", encoding="utf-8") as out_file:
json.dump(value, out_file)

def get_by_ids(self, origin: Origin, exact_ids: Dict[str, List[str]]) -> StorageData:
"""Load specific resources by ID. Constructs filenames directly without listing."""
if not self.resource_per_file:
raise ValueError("get_by_ids() requires --resource-per-file. " "Re-run with --resource-per-file enabled.")
data = StorageData()
for resource_type, ids in exact_ids.items():
for resource_id in ids:
src, dst = self.get_single(resource_type, resource_id)
if origin in [Origin.SOURCE, Origin.ALL] and src is not None:
data.source[resource_type][resource_id] = src
if origin in [Origin.DESTINATION, Origin.ALL] and dst is not None:
data.destination[resource_type][resource_id] = dst
return data

def get_single(self, resource_type: str, resource_id: str) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Load one resource's source and destination state by ID.