Skip to content

Commit

Permalink
Store called queryids within the QueryCallsCache
Browse files Browse the repository at this point in the history
  • Loading branch information
amw-zero committed May 13, 2024
1 parent fdc049e commit f7b78bd
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
8 changes: 6 additions & 2 deletions postgres/datadog_checks/postgres/query_calls_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ class QueryCallsCache:
def __init__(self):
self.cache = {}
self.next_cache = {}
self.called_queryids = []
self.next_called_queryids = set()

def end_query_call_snapshot(self):
"""To prevent evicted statements from building up in the cache we
replace the cache outright after each sampling of pg_stat_statements."""
self.cache = self.next_cache
self.next_cache = {}
self.called_queryids = self.next_called_queryids
self.next_called_queryids = set()

def set_calls(self, queryid, calls):
"""Updates the cache of calls per query id.
Expand All @@ -34,5 +38,5 @@ def set_calls(self, queryid, calls):
calls_changed = True

self.next_cache[queryid] = calls

return calls_changed
if calls_changed:
self.next_called_queryids.add(queryid)
18 changes: 8 additions & 10 deletions postgres/datadog_checks/postgres/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def _check_called_queries(self):

with self._check._get_main_db() as conn:
with conn.cursor(cursor_factory=CommenterCursor) as cursor:
called_queryids = []
query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text)
rows = self._execute_query(cursor, query, params=(self._config.dbname,))

Expand All @@ -223,18 +222,17 @@ def _check_called_queries(self):
continue

calls = row[1]
calls_changed = self._query_calls_cache.set_calls(queryid, calls)
if calls_changed:
called_queryids.append(queryid)
self._query_calls_cache.set_calls(queryid, calls)

self._query_calls_cache.end_query_call_snapshot()
self._check.gauge(
"dd.postgresql.pg_stat_statements.calls_changed",
len(called_queryids),
len(self._query_calls_cache.called_queryids),
tags=self.tags,
hostname=self._check.resolved_hostname,
)

return called_queryids
return self._query_calls_cache.called_queryids

def run_job(self):
# do not emit any dd.internal metrics for DBM specific check code
Expand Down Expand Up @@ -272,7 +270,7 @@ def collect_per_statement_metrics(self):
@tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
def _load_pg_stat_statements(self):
try:
called_queryids = self._check_called_queries()
self._check_called_queries()
available_columns = set(self._get_pg_stat_statements_columns())
missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns
if len(missing_columns) > 0:
Expand Down Expand Up @@ -347,7 +345,7 @@ def _load_pg_stat_statements(self):
pg_stat_statements_view=self._config.pg_stat_statements_view,
filters=filters,
extra_clauses="",
called_queryids=', '.join([str(i) for i in called_queryids]),
called_queryids=', '.join([str(i) for i in self._query_calls_cache.called_queryids]),
),
params=params,
)
Expand Down Expand Up @@ -478,7 +476,7 @@ def _apply_deltas(self, rows):
query_text = {row['query_signature']: row['query'] for row in rows}
applied_rows = []
for query_signature, query_sig_metrics in self._baseline_metrics.items():
for queryid, row in query_sig_metrics.items():
for row in query_sig_metrics.values():
if query_signature in query_text:
applied_rows.append({**row, 'query': query_text[query_signature]})
else:
Expand All @@ -490,7 +488,7 @@ def _apply_deltas(self, rows):
# pg_stat_statements eviction), we clear it out periodically to force a full refetch.
def _check_baseline_metrics_expiry(self):
if (
self._last_baseline_metrics_expiry == None
self._last_baseline_metrics_expiry is None
or self._last_baseline_metrics_expiry + BASELINE_METRICS_EXPIRY < time.time()
):
self._baseline_metrics = {}
Expand Down
13 changes: 7 additions & 6 deletions postgres/tests/test_query_calls_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,28 @@ def test_statement_queryid_cache_same_calls_does_not_change():
cache = QueryCallsCache()
cache.set_calls(123, 1)
cache.end_query_call_snapshot()
changed = cache.set_calls(123, 1)
cache.set_calls(123, 1)
cache.end_query_call_snapshot()

assert changed is False
assert cache.called_queryids == set()


def test_statement_queryid_cache_multiple_calls_change():
cache = QueryCallsCache()
cache.set_calls(123, 1)
cache.end_query_call_snapshot()
changed = cache.set_calls(123, 2)
cache.set_calls(123, 2)

assert changed is True
assert cache.called_queryids == {123}


def test_statement_queryid_cache_after_pg_stat_statement_eviction():
cache = QueryCallsCache()
cache.set_calls(123, 100)
cache.end_query_call_snapshot()
changed = cache.set_calls(123, 5)
cache.set_calls(123, 5)

assert changed is True
assert cache.called_queryids == {123}


def test_statement_queryid_cache_snapshot_eviction():
Expand Down

0 comments on commit f7b78bd

Please sign in to comment.