# MongoDB Advisor

In [None]:
# TODO Set cluster name, database and collection name
# These three values parameterize the notebook: `cluster_name` is interpolated into the
# Atlas CLI calls, while `database_name` and `collection_name` select the collection that
# is analysed and together form the namespace used to filter advisor results.
cluster_name = "<cluster name on Atlas, eg.: Cluster0>"
database_name = "<placeholder>"
collection_name = "<placeholder>"

#### Context

In [None]:
# Imports
import json
import math
import os

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
from IPython.core.display import Markdown, HTML
from IPython.display import display
from anthropic import Anthropic
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pymongo.mongo_client import MongoClient

# Load env variables for
# - MongoDB Atlas CLI,
# - MongoDB Client
# - Anthropic client
load_dotenv()

# Create MongoDB Client, point to a specific database/collection
# NOTE(review): os.getenv returns None when MONGO_URI is not set - confirm that the
# resulting MongoClient(None) behaviour is what you want in that case.
uri = os.getenv("MONGO_URI")
mongodb_client = MongoClient(uri)
# "<database>.<collection>", used to filter Atlas Performance Advisor results
namespace = database_name + "." + collection_name
# Despite the name, `db` is the handle of the analysed collection (database -> collection)
db = mongodb_client[database_name][collection_name]
# Collection "queryStats" in database "observability": stores query statistics snapshots
stats_db = mongodb_client.get_database("observability").get_collection("queryStats")

# Create Anthropic client
anthropic_client = Anthropic()
# Chat messages shared by every LLM request; filled with documentation pages further down
base_context = []

In [None]:
# MongoDB context

def webpage_as_context(url, timeout=30):
    """Download a documentation page and wrap its text as a chat message.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A ``{"role": "user", "content": ...}`` dict whose content is the page
        text prefixed with the source URL.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # A timeout keeps "Run All" from hanging forever on a stalled connection.
    response = requests.get(url, timeout=timeout)
    # Fail loudly instead of passing an error page to the model as documentation.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    # Prefer the main content container; fall back to <body>, then to the whole
    # document, when the page has no `div.body` element.
    body = soup.select_one('div.body')
    if body is None:
        body = soup.body if soup.body is not None else soup
    text = body.get_text(separator=" ", strip=True)
    return {
        "role": "user",
        "content": f"MongoDB documentation from {url}\n{text}",
    }


# Documentation pages that are sent to the model as background knowledge.
docs = [
    "https://www.mongodb.com/docs/manual/core/query-optimization/",
    "https://www.mongodb.com/docs/manual/tutorial/equality-sort-range-rule/",
    "https://www.mongodb.com/docs/manual/core/index-partial/",
    "https://www.mongodb.com/docs/manual/reference/explain-results/",
    "https://www.mongodb.com/docs/manual/tutorial/analyze-query-plan/",
    "https://www.mongodb.com/docs/manual/tutorial/optimize-query-performance-with-indexes-and-projections/",
    "https://www.mongodb.com/docs/atlas/performance-advisor/",
    "https://www.mongodb.com/docs/atlas/performance-advisor/index-ranking/",
    "https://www.mongodb.com/docs/manual/reference/operator/aggregation/indexStats/",
    "https://www.mongodb.com/docs/cloud-manager/reference/api/performance-advisor/get-suggested-indexes/",
    "https://www.mongodb.com/docs/manual/reference/operator/aggregation/queryStats/",
    "https://www.mongodb.com/docs/manual/core/query-shapes/"
]

# Empty the shared list first so that re-running this cell does not append every
# page a second time (which would double the context sent with each LLM request).
base_context.clear()
for url in docs:
    base_context.append(webpage_as_context(url))

In [None]:
# Current indexes
# https://www.mongodb.com/docs/manual/reference/operator/aggregation/indexStats/
index_stats = list(db.aggregate([{"$indexStats": {}}]))

# On a sharded cluster $indexStats reports every index once per shard. Collapse the
# duplicates so each index definition appears once in the summary and in the
# current-vs-suggested diff. Definitions are kept as strings to make them comparable
# with the suggested / drop index lists.
current_indexes = sorted({str(idx["key"]) for idx in index_stats})

# Designed indexes - placeholder
designed_indexes = [{'_id': 1}]

In [None]:
# Suggested indexes
# https://www.mongodb.com/docs/atlas/cli/current/command/atlas-api-performanceAdvisor-listClusterSuggestedIndexes/
#
# Collects the Performance Advisor index suggestions for `namespace` into:
# - suggested_indexes: stringified index definitions,
# - suggested_indexes_details: one record per (suggestion, matching sample query shape),
# - query_hashes_with_indexes: short query hash -> suggested / current index.
suggested_indexes = []
suggested_indexes_details = []
query_hashes_with_indexes = {}

# Run the Atlas CLI and parse its captured output as JSON
index_suggestions_raw = !atlas api performanceAdvisor listClusterSuggestedIndexes --clusterName {cluster_name} --output json --version "2024-08-05" --namespaces {namespace}
index_suggestions = json.loads(index_suggestions_raw.s)

for suggestion in index_suggestions["content"]["suggestedIndexes"]:

    # Convert to json string index definition
    # (suggestion["index"] is iterated as a list of partial dicts that are merged into one)
    index_definition = {}
    for record in suggestion["index"]:
        index_definition.update(record)

    # Capture index recommendations with a high impact, if 100Mb+ scanned bytes can be saved with an index.
    # if suggestion["weight"] > 1024 * 1024 * 100:
    #     suggested_indexes.append(str(index_definition))

    # From here on the definition is handled as a string so it can be compared with
    # the other stringified index lists
    index_definition = str(index_definition)
    suggested_indexes.append(index_definition)

    # Attach every sample query shape whose id is listed in the suggestion's "impact"
    for sample_shape in index_suggestions["content"]["shapes"]:
        if sample_shape["id"] in suggestion["impact"]:

            # Use first sample query
            operation = sample_shape["operations"][0]

            # Note: mutates the parsed API record in place - adds the shape's count
            # and removes the "ts" entry from the stats
            operation["stats"].update({"count": sample_shape["count"]})
            if "ts" in operation["stats"]:
                del operation["stats"]["ts"]

            # Mapping for query hash -> recommended and current indexes
            query_hash = "n/a"
            current_index = "n/a"
            if "raw" in operation:
                # "raw" holds a JSON string; presumably the original log line - TODO confirm
                sample_query_raw = json.loads(operation["raw"])

                # Short hash: "#" + first 6 characters; queryShapeHash wins when both are present
                if "queryHash" in sample_query_raw["attr"]:
                    query_hash = "#" + sample_query_raw["attr"]["queryHash"][0:6]
                if "queryShapeHash" in sample_query_raw["attr"]:
                    query_hash = "#" + sample_query_raw["attr"]["queryShapeHash"][0:6]

                # The plan summary of the sample query is reported as its "current index"
                current_index = sample_query_raw["attr"]["planSummary"]
                query_hashes_with_indexes.update(
                    {query_hash: {"suggested index": index_definition, "current index": current_index}}
                )

            suggested_indexes_details.append({
                "suggested index": index_definition,
                "current index": current_index,
                "sample query": json.dumps(operation["predicates"]),
                "stats": json.dumps(operation["stats"]),
                # suggestion weight converted from bytes to whole MB (rounded up)
                "scanned MB": math.ceil(suggestion["weight"] / (1024 * 1024)),
                "query hash": query_hash,
            })

In [None]:
# Redundant indexes
# https://www.mongodb.com/docs/atlas/cli/current/command/atlas-api-performanceAdvisor-listDropIndexes/
drop_indexes_raw = !atlas api performanceAdvisor listDropIndexes --clusterName {cluster_name} --version "2024-08-05"
drop_indexes_raw = json.loads(drop_indexes_raw.s)["content"]
drop_indexes_details = {
    "redundantIndexes": [],
    "unusedIndexes": [],
    "hiddenIndexes": [],
}
drop_indexes = []

for category in drop_indexes_raw.keys():
    for idx in drop_indexes_raw[category]:
        if idx["namespace"] == namespace:
            index_definition = {}
            for field in idx["index"]:
                index_definition.update(field)
            idx["index"] = index_definition
            drop_indexes.append(str(index_definition))
            drop_indexes_details[category].append(idx)

# -----------------

In [None]:
# Schema advice
# https://www.mongodb.com/docs/atlas/cli/current/command/atlas-api-performanceAdvisor-listSchemaAdvice/
# Run the Atlas CLI for the cluster and keep the "content" part of its JSON output;
# the result is later passed verbatim to the LLM prompt.
schema_advise_raw = !atlas api performanceAdvisor listSchemaAdvice --clusterName {cluster_name} --version "2024-08-05"
schema_advise = json.loads(schema_advise_raw.s)["content"]

In [None]:
# Query statistics
# https://www.mongodb.com/docs/manual/reference/operator/aggregation/queryStats/
#
# Reads $queryStats for the analysed collection, derives per-query timings in ms,
# stores a snapshot in "observability.queryStats" and loads the stored history.
query_stats = list(mongodb_client.get_database("admin").aggregate([
    {"$queryStats": {}},
    {"$match": {
        "key.queryShape.cmdNs.db": database_name,
        "key.queryShape.cmdNs.coll": collection_name,
        "metrics.totalExecMicros.sum": {"$gt": 10000}
    }},
    {"$project": {
        "db": "$key.queryShape.cmdNs.db",
        "coll": "$key.queryShape.cmdNs.coll",
        "hash": {"$ifNull": ["$queryShapeHash", "$queryHash", "000000"]},
        "shape": "$key.queryShape",
        "sum": "$metrics.totalExecMicros.sum",
        "count": "$metrics.execCount",
        "min": "$metrics.totalExecMicros.min",
        "max": "$metrics.totalExecMicros.max",
        "timestamp": "$asOf"
    }},
    {"$project": {
        "shape.cmdNs": 0,
    }},
]))

# Columns produced by the pipeline plus the derived ones. Used to build an empty frame
# with the expected columns when there is nothing to report, so later cells can still
# reference them.
stats_columns = [
    "db", "coll", "hash", "shape", "sum", "count", "min", "max", "timestamp",
    "avg", "err+", "err-",
]
# Internal / housekeeping commands that should not show up in the report
exclude_shapes = r"collStats|queryStats|indexStats|listSearchIndexes"

if query_stats:
    df = pd.DataFrame.from_dict(query_stats)

    # Short hash: "#" + first 6 characters (doubles as a hex color in the plots)
    df["hash"] = "#" + df["hash"].str.slice(0, 6)
    # Microseconds -> whole milliseconds
    df["sum"] = (df["sum"] / 1000).astype(int)
    df["min"] = (df["min"] / 1000).astype(int)
    df["max"] = (df["max"] / 1000).astype(int)
    df["avg"] = (df["sum"] / df["count"]).round(2)
    # Distance from the average to max / min, used as asymmetric error bars
    df["err+"] = (df["max"] - df["avg"]).round(2)
    df["err-"] = (df["avg"] - df["min"]).round(2)
    df["shape"] = df["shape"].astype(str)

    df = df[~df["shape"].str.contains(exclude_shapes)]
    df = df.reset_index(drop=True)
    df = df.sort_values("hash")
else:
    # No statistics matched: keep the expected columns instead of failing with KeyError
    df = pd.DataFrame(columns=stats_columns)

# Save statistics to "observability.queryStats" collection for a historical view
# Only query hash is saved, shape is discarded
df_documents = json.loads(df.drop(['shape'], axis=1).to_json(orient="records", date_format='iso'))
# insert_many rejects an empty list, so skip the write when nothing is left to store
res = stats_db.insert_many(df_documents) if df_documents else None

# Get historical stats (a single query; `_id` is excluded as it is not part of the stats)
query_stats_history = list(stats_db.find(
    {"db": database_name, "coll": collection_name},
    {"_id": 0},
))
df_history = pd.DataFrame.from_dict(query_stats_history)
if df_history.empty:
    df_history = pd.DataFrame(columns=[column for column in stats_columns if column != "shape"])
df_history = df_history.sort_values("hash")
df_history['timestamp'] = pd.to_datetime(df_history['timestamp'])

### Query Statistics

In [None]:
# Plot current stats
# One marker per query hash: position = average execution time, error bars reach to
# min / max, marker size = total execution time. The "#xxxxxx" hashes are also used
# as the marker colors.
hash_palette = df["hash"].sort_values().unique()
fig = px.scatter(
    df,
    x="avg",
    y="hash",
    error_x="err+",
    error_x_minus="err-",
    size="sum",
    color="hash",
    color_discrete_sequence=hash_palette,
)
fig.update_yaxes(visible=False)
fig.update_layout(
    xaxis_title="Query execution time, ms",
    legend_title_text='query hash',
)
# fig.update_traces(marker_size=8)

fig.show()

# Same data as a table, including the query shape
stats_table_columns = ['hash', 'count', 'avg', 'sum', 'min', 'max', 'shape']
display(HTML(df.to_html(index=False, columns=stats_table_columns)))

# Join query stats data with index recommendations
# df['index_recommendations'] = df['hash'].map(query_hashes_with_indexes)
# df = (
#     df
#     .join(pd.json_normalize(df['index_recommendations']))
#     .drop('index_recommendations', axis=1)
# )
# display(HTML(df.to_html(index=False, na_rep="n/a", columns=['hash', "current index", 'suggested index'])))


In [None]:
# Historical stats
# One facet row per query hash; each row plots the average execution time (with
# min / max error bars) against the time the snapshot was taken.
history_hashes = df_history["hash"].sort_values().unique()
fig = px.scatter(
    df_history,
    x="timestamp",
    y="avg",
    error_y="err+",
    error_y_minus="err-",
    facet_row="hash",
    color="hash",
    # The "#xxxxxx" hashes double as hex colors, one per query
    color_discrete_sequence=history_hashes,
    # 300 px per facet row; never ask for a zero-height figure
    height=300 * max(1, len(history_hashes)),
    title="Average query execution time, ms",
)
# Independent y scale per query and no repeated "avg" label on every row
fig.update_yaxes(matches=None, title_text='')
# Show the time ticks under every facet row, not only the bottom one
fig.update_xaxes(showticklabels=True)
# The x axis is the snapshot time; the measured quantity is named in the figure title
fig.update_layout(xaxis_title="Timestamp")
fig.update_traces(marker_size=8)

fig.show()

### Query List

#### `#000000` Short description of a query purpose
Section for more details

In [None]:
# Short hash ("#" + first 6 characters) of the query analysed in this section
query_hash = "#000000"
# Aggregation pipeline that is timed and explained below
pipeline = [
    {"$match": {
        "_id": "test",
    }}
]

# TODO design an index for the query
index = {'_id': 1}
# Register the index only once so that re-running this cell (or reusing a definition
# that is already registered) does not add duplicates to the summary and the LLM prompt.
if index not in designed_indexes:
    designed_indexes.append(index)

### Query performance

In [None]:
# Historical stats
# Same view as the full history plot, restricted to the query selected by `query_hash`.
df_query = df_history[df_history["hash"] == query_hash]
# JSON records of this query's history (previously built from `df`, i.e. the current
# stats of all queries). Columns that cannot / need not be serialized are dropped
# when present.
df_query_json = json.loads(
    df_query
    .drop(columns=["shape", "_id"], errors="ignore")
    .to_json(orient="records", date_format='iso')
)
fig = px.scatter(
    df_query,
    x="timestamp",
    y="avg",
    error_y="err+",
    error_y_minus="err-",
    facet_row="hash",
    color="hash",
    # Draw the query in its own color: with the full history palette the single
    # trace would get the color of the first hash in the history instead.
    color_discrete_sequence=[query_hash],
    height=300,
    title="Average query execution time, ms",
)
# No repeated "avg" label; the measured quantity is named in the figure title
fig.update_yaxes(matches=None, title_text='')
fig.update_xaxes(showticklabels=True)
# The x axis is the snapshot time
fig.update_layout(xaxis_title="Timestamp")
fig.update_traces(marker_size=8)

fig.show()

In [None]:
# Measure end-to-end execution time
# Runs the pipeline against the collection and fully consumes the cursor; `-o` makes
# %timeit return its result object, which is kept in `time` and later passed to the
# LLM prompt as the client-side execution time.
# NOTE(review): the name `time` would shadow the stdlib module if it were ever imported.
time = %timeit -o list(db.aggregate(pipeline=pipeline))

In [None]:
# Explain query
# https://www.mongodb.com/docs/manual/reference/explain-results/
explain_command = {
    'aggregate': collection_name,
    'pipeline': pipeline,
    'cursor': {}
}
explain = mongodb_client.get_database(database_name).command(
    'explain',
    explain_command,
    verbosity='allPlansExecution'
)

# Generate summary
# The request consists of the shared documentation context followed by one user
# message per prompt part: the instructions first, then the collected facts.
prompt_parts = [
    "Act as a MongoDB advisor. You will be provided with information on indexes, example query that is being evaluated and a query explain plan. Use text graphics to visualise Execution Stages in Query Explain. Provide recommendations for query performance and indexes. Use markdown format for output, escape $ $ character sequence in markdown if it's not in the code block. Include original query in the beginning for a reference",
    f"Current indexes: {current_indexes}, designed indexes: {designed_indexes}, suggested indexes: {suggested_indexes_details}, redundant indexes: {drop_indexes}",
    f"Index stats: {index_stats}",
    f"Schema advise: {schema_advise}",
    f"Query Explain raw: {explain}",
    f"Execution time on the client side: {time}",
]
context = base_context.copy()
context += [{"role": "user", "content": part} for part in prompt_parts]

response = anthropic_client.messages.create(
    model="claude-3-7-sonnet-latest",
    max_tokens=1024,
    messages=context,
)

# Render the advice and keep it in the conversation for possible follow-up requests
advice_markdown = response.content[0].text
display(Markdown(advice_markdown))
context.append({
    "role": "assistant",
    "content": advice_markdown,
})

### Indexes

In [None]:
# Organize indexes

# Convert designed indexes to strings
designed_indexes_str = [str(index_definition) for index_definition in designed_indexes]

message = ""
# Index summary
message += "Designed indexes:\n"
for idx in designed_indexes_str:
    status = "\033[91mx\033[0m "
    if idx in current_indexes:
        status = "\033[92m√\033[0m "
    if idx in suggested_indexes:
        status = "\033[91m+\033[0m "
    if idx in drop_indexes:
        status = "\033[91m-\033[0m "
    message += status + str(idx) + "\n"

if len(suggested_indexes) > 0:
    message += "\n"
    message += "Suggested indexes:\n"
    for si in suggested_indexes_details:
        message += "\033[92m+\033[0m " + str(si["suggested index"]) + "\n"
        message += "    current index: " + str(si["current index"]) + "\n"
        message += "    stats: " + str(si["stats"]) + "\n"
        message += "    query_hash: " + si["query hash"] + "\n"
        message += "    query: " + str(si["sample query"]) + "\n"

if len(drop_indexes) > 0:
    message += "\n"
    message += "Drop indexes:\n"
    for category in drop_indexes_details.keys():
        message += f"{category}\n"
        for idx in drop_indexes_details[category]:
            message += "    index: " + str(idx["index"]) + "\n"
            message += "    access count: " + str(idx["accessCount"]) + "\n\n"
            message += "    size: " + str(math.ceil(idx["sizeBytes"] / (1024 * 1024))) + " MB\n"

# Index diff
final_state_indexes = set()
final_state_indexes.update(designed_indexes_str)
final_state_indexes.update(current_indexes)
final_state_indexes.update(suggested_indexes)
final_state_indexes.difference_update(drop_indexes)
final_state_indexes = sorted(final_state_indexes)



!echo "| Current state                                          | Suggested state                                       |"
!echo "------------------------------------------------------------------------------------------------------------------"
!diff <(echo "{'\n'.join(current_indexes)}") <(echo "{'\n'.join(final_state_indexes)}") -yt --width=115
!clear
print(message)