Skip to content

Commit

Permalink
skip operations from system databases
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zhengda committed May 22, 2024
1 parent a0b5879 commit cc4fad2
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 34 deletions.
5 changes: 1 addition & 4 deletions mongo/datadog_checks/mongo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ def current_op(self, session=None):
# Because the currentOp command and db.currentOp() helper method return the results in a single document,
# the total size of the currentOp result set is subject to the maximum 16MB BSON size limit for documents.
# The $currentOp stage returns a cursor over a stream of documents, each of which reports a single operation.
return self["admin"].aggregate(
[{'$currentOp': {'allUsers': True, 'idleSessions': True}}],
session=session,
)
return self["admin"].aggregate([{'$currentOp': {'allUsers': True}}], session=session)

def _is_arbiter(self, options):
cli = MongoClient(**options)
Expand Down
82 changes: 53 additions & 29 deletions mongo/datadog_checks/mongo/dbm/operation_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import time
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple

from bson import json_util

Expand Down Expand Up @@ -43,6 +43,8 @@ class MongoDbExplainExceptionBase(Exception):
"client",
}

SYSTEM_DATABASES = {"admin", "config", "local"}


def agent_check_getter(self):
return self._check
Expand Down Expand Up @@ -110,7 +112,9 @@ def _get_operation_samples(self, now):
now, operation, operation_metadata, obfuscated_command, query_signature
)

explain_plan = self._get_explain_plan(op=operation.get("op"), command=command)
explain_plan = self._get_explain_plan(
op=operation.get("op"), command=command, dbname=operation_metadata["dbname"]
)
sample = self._create_operation_sample_payload(
now, operation_metadata, obfuscated_command, query_signature, explain_plan, activity
)
Expand All @@ -126,6 +130,25 @@ def _get_current_op(self):
yield operation

def _should_include_operation(self, operation: dict) -> bool:
# Skip operations from db that are not configured to be monitored
namespace = operation.get("ns")
if not namespace:
self._check.log.debug("Skipping operation without namespace: %s", operation)
return False

db, _ = namespace.split(".", 1)
if self._check._config.db_names is not None:
if db not in self._check._config.db_names:
self._check.log.debug(
"Skipping operation for database %s because it is not configured to be monitored", db
)
return False

if db in SYSTEM_DATABASES:
self._check.log.debug("Skipping operation for system database %s", db)
return False

# Skip operations without a command
command = operation.get("command")
if not command:
self._check.log.debug("Skipping operation without command: %s", operation)
Expand All @@ -150,29 +173,23 @@ def _should_include_operation(self, operation: dict) -> bool:
return True

def _should_explain(self, op: Optional[str], command: dict) -> bool:
dbname = command.get("$db")
if not dbname or dbname in ("admin", "local"):
# Skip system operations
self._check.log.debug("Skipping explain system operation: %s", command)
return False

if not op or op == "none":
# Skip operations that are not queries
self._check.log.debug("Skipping explain operation without operation type: %s", command)
return False

if op in ("getMore", "insert", "update", "getmore", "killcursors", "remove"):
if op in ("insert", "update", "getmore", "killcursors", "remove"):
# Skip operations that are not queries
self._check.log.debug("Skipping explain operation type %s: %s", op, command)
return False

return True

def _get_explain_plan(self, op: Optional[str], command: dict) -> OperationSamplePlan:
def _get_explain_plan(self, op: Optional[str], command: dict, dbname: str) -> OperationSamplePlan:
if not self._should_explain(op, command):
return

dbname = command.pop("$db")
dbname = command.pop("$db", dbname)
try:
explain_plan = self._check.api_client[dbname].command("explain", command, verbosity="executionStats")
explain_plan = self._format_explain_plan(explain_plan)
Expand Down Expand Up @@ -201,22 +218,6 @@ def _format_explain_plan(self, explain_plan: dict) -> dict:
if key not in ("serverInfo", "serverParameters", "command", "ok", "$clusterTime", "operationTime")
}

def _get_command_collection(self, command: dict) -> Optional[str]:
for key in (
"collection",
"find",
"aggregate",
"update",
"insert",
"delete",
"findAndModify",
"distinct",
"count",
):
collection = command.get(key)
if collection and isinstance(collection, str): # edge case like {"aggregate": 1}
return collection

def _get_operation_client(self, operation: dict) -> OperationSampleClient:
client_metadata = operation.get("clientMetadata", {})

Expand All @@ -238,15 +239,38 @@ def _get_command_truncation_state(self, command: dict) -> Optional[str]:
return None
return "truncated" if command.get("$truncated") else "not_truncated"

def _get_command_collection(self, command: dict, collection_from_ns: str) -> Optional[str]:
if collection_from_ns != '$cmd':
return collection_from_ns

# If the collection name parsed from namespace is $cmd
# we try to look for the collection in the command
for key in (
"collection",
"find",
"aggregate",
"update",
"insert",
"delete",
"findAndModify",
"distinct",
"count",
):
collection = command.get(key)
if collection and isinstance(collection, str): # edge case like {"aggregate": 1}
return collection

def _get_operation_metadata(self, operation: dict) -> OperationSampleOperationMetadata:
namespace = operation.get("ns")
db, collection = namespace.split(".", 1)
command = operation.get("command", {})
return {
"type": operation.get("type"),
"op": operation.get("op"),
"shard": operation.get("shard"),
"dbname": command.get("$db"),
"dbname": command.get("$db", db),
"application": operation.get("appName"),
"collection": self._get_command_collection(command),
"collection": self._get_command_collection(command, collection),
"comment": command.get("comment"),
"truncated": self._get_command_truncation_state(command),
"client": self._get_operation_client(operation),
Expand Down
2 changes: 1 addition & 1 deletion mongo/tests/mocked_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def command(self, command, *args, **_):
return json.load(f, object_hook=json_util.object_hook)

def aggregate(self, pipeline, session=None, **kwargs):
if pipeline[0] == {'$currentOp': {'allUsers': True, 'idleSessions': True}}:
if pipeline[0] == {'$currentOp': {'allUsers': True}}:
# mock the $currentOp aggregation used for operation sampling
with open(os.path.join(HERE, "fixtures", f"$currentOp-{self.deployment}"), 'r') as f:
return json.load(f, object_hook=json_util.object_hook)
Expand Down

0 comments on commit cc4fad2

Please sign in to comment.