diff --git a/postgres/datadog_checks/postgres/query_calls_cache.py b/postgres/datadog_checks/postgres/query_calls_cache.py index ce8b1a031de92..30e2b0d929cbb 100644 --- a/postgres/datadog_checks/postgres/query_calls_cache.py +++ b/postgres/datadog_checks/postgres/query_calls_cache.py @@ -7,12 +7,16 @@ class QueryCallsCache: def __init__(self): self.cache = {} self.next_cache = {} + self.called_queryids = [] + self.next_called_queryids = set() def end_query_call_snapshot(self): """To prevent evicted statements from building up in the cache we replace the cache outright after each sampling of pg_stat_statements.""" self.cache = self.next_cache self.next_cache = {} + self.called_queryids = self.next_called_queryids + self.next_called_queryids = set() def set_calls(self, queryid, calls): """Updates the cache of calls per query id. @@ -34,5 +38,5 @@ def set_calls(self, queryid, calls): calls_changed = True self.next_cache[queryid] = calls - - return calls_changed + if calls_changed: + self.next_called_queryids.add(queryid) diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 7eb2b856edc43..ed8a07d898744 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -213,7 +213,6 @@ def _check_called_queries(self): with self._check._get_main_db() as conn: with conn.cursor(cursor_factory=CommenterCursor) as cursor: - called_queryids = [] query = QUERYID_TO_CALLS_QUERY.format(pg_stat_statements_view=pgss_view_without_query_text) rows = self._execute_query(cursor, query, params=(self._config.dbname,)) @@ -223,18 +222,17 @@ def _check_called_queries(self): continue calls = row[1] - calls_changed = self._query_calls_cache.set_calls(queryid, calls) - if calls_changed: - called_queryids.append(queryid) + self._query_calls_cache.set_calls(queryid, calls) + self._query_calls_cache.end_query_call_snapshot() self._check.gauge( "dd.postgresql.pg_stat_statements.calls_changed", - len(called_queryids), + len(self._query_calls_cache.called_queryids), tags=self.tags, hostname=self._check.resolved_hostname, ) - return called_queryids + return self._query_calls_cache.called_queryids def run_job(self): # do not emit any dd.internal metrics for DBM specific check code @@ -272,7 +270,7 @@ def collect_per_statement_metrics(self): @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _load_pg_stat_statements(self): try: - called_queryids = self._check_called_queries() + self._check_called_queries() available_columns = set(self._get_pg_stat_statements_columns()) missing_columns = PG_STAT_STATEMENTS_REQUIRED_COLUMNS - available_columns if len(missing_columns) > 0: @@ -347,7 +345,7 @@ def _load_pg_stat_statements(self): pg_stat_statements_view=self._config.pg_stat_statements_view, filters=filters, extra_clauses="", - called_queryids=', '.join([str(i) for i in called_queryids]), + called_queryids=', '.join([str(i) for i in self._query_calls_cache.called_queryids]), ), params=params, ) @@ -478,7 +476,7 @@ def _apply_deltas(self, rows): query_text = {row['query_signature']: row['query'] for row in rows} applied_rows = [] for query_signature, query_sig_metrics in self._baseline_metrics.items(): - for queryid, row in query_sig_metrics.items(): + for row in query_sig_metrics.values(): if query_signature in query_text: applied_rows.append({**row, 'query': query_text[query_signature]}) else: @@ -490,7 +488,7 @@ def _apply_deltas(self, rows): # pg_stat_statements eviction), we clear it out periodically to force a full refetch. def _check_baseline_metrics_expiry(self): if ( - self._last_baseline_metrics_expiry == None + self._last_baseline_metrics_expiry is None or self._last_baseline_metrics_expiry + BASELINE_METRICS_EXPIRY < time.time() ): self._baseline_metrics = {} diff --git a/postgres/tests/test_query_calls_cache.py b/postgres/tests/test_query_calls_cache.py index 81e5e34f251cc..864aabcae979d 100644 --- a/postgres/tests/test_query_calls_cache.py +++ b/postgres/tests/test_query_calls_cache.py @@ -12,27 +12,28 @@ def test_statement_queryid_cache_same_calls_does_not_change(): cache = QueryCallsCache() cache.set_calls(123, 1) cache.end_query_call_snapshot() - changed = cache.set_calls(123, 1) + cache.set_calls(123, 1) + cache.end_query_call_snapshot() - assert changed is False + assert cache.called_queryids == set() def test_statement_queryid_cache_multiple_calls_change(): cache = QueryCallsCache() cache.set_calls(123, 1) cache.end_query_call_snapshot() - changed = cache.set_calls(123, 2) + cache.set_calls(123, 2) - assert changed is True + assert cache.called_queryids == {123} def test_statement_queryid_cache_after_pg_stat_statement_eviction(): cache = QueryCallsCache() cache.set_calls(123, 100) cache.end_query_call_snapshot() - changed = cache.set_calls(123, 5) + cache.set_calls(123, 5) - assert changed is True + assert cache.called_queryids == {123} def test_statement_queryid_cache_snapshot_eviction():