In [None]:
import json
import os
import random
import re
from datetime import UTC, datetime, timedelta

import dbnl
import numpy as np
import pandas as pd

# Print the installed SDK version so it can be compared against the version the
# docs at https://docs.dbnl.com/ are written for.
print("dbnl version:", dbnl.__version__)

In [None]:
# Login to DBNL (using default Sandbox url).
# The API token is read from the DBNL_API_TOKEN environment variable so that it
# never has to be pasted into (and accidentally committed with) the notebook.
# If the variable is not set, replace the "<DBNL_API_KEY>" placeholder with a
# token from http://localhost:8080/tokens.
dbnl.login(
    api_url="http://localhost:8080/api",
    api_token=os.environ.get("DBNL_API_TOKEN", "<DBNL_API_KEY>"),
)

In [None]:
# Look the project up by name, creating it on first run.
project = dbnl.get_or_create_project(
    default_llm_model_name="quickstart_demo",  # From step (2) in quickstart
    schedule="daily",  # How often DBNL analyzes new data
    name="ADK Calc Traces SDK via OTEL Two Week Simulation",
)

In [None]:
# dbnl_otel_converter must be importable from the notebook's environment.
from dbnl_otel_converter import dbnl_df_from_otel_file

# Load the exported traces from traces.jsonl into a dataframe.
# NOTE(review): later cells read the "input", "output" and "traces_data" columns,
# so the converter presumably yields one row per trace with those fields — confirm.
df = dbnl_df_from_otel_file("traces.jsonl")

print(f"Loaded {len(df)} traces.")

In [None]:
# Helper functions for augmentation

def break_into_days(df, start_day, num_days, variation=0.10):
    """
    Split a dataframe into consecutive daily chunks with ±variation in size.

    Each day's share of the rows is jittered by up to ±variation around the
    mean, then the sizes are rescaled so they add up to len(df). Every row is
    used exactly once and rows keep their original order. Rows of chunk i get
    a "timestamp" on calendar day start_day + i with a randomized
    hour/minute/second (see randomize_timestamps_within_day).

    Args:
        df: DataFrame to split. It is not modified; chunks are copies.
        start_day: datetime of the first simulated day.
        num_days: number of daily chunks to produce (must be >= 1).
        variation: maximum fractional deviation of a day's size from the mean.

    Returns:
        A list of num_days DataFrames, each with a fresh 0..n-1 index. Chunks
        can be empty when df has fewer rows than days (or is empty).

    Raises:
        ValueError: if num_days is smaller than 1.
    """
    if num_days < 1:
        raise ValueError(f"num_days must be >= 1, got {num_days}")

    total_rows = len(df)
    base = total_rows / num_days
    sizes = [
        int(base * (1 + np.random.uniform(-variation, variation)))
        for _ in range(num_days)
    ]

    # Rescale so the sizes sum to (at most) total_rows. When every jittered size
    # truncated to 0 (empty df, or fewer rows than days) there is nothing to
    # rescale -- dividing by the zero sum is what used to crash here.
    raw_total = sum(sizes)
    if raw_total > 0:
        scale = total_rows / raw_total
        sizes = [int(s * scale) for s in sizes]
    # The last day absorbs whatever the truncations above left over.
    sizes[-1] = total_rows - sum(sizes[:-1])

    day_dfs = []
    idx = 0
    for day_idx, size in enumerate(sizes):
        # Reset the index *before* writing timestamps so the .at lookups below
        # address exactly one row even if df carried duplicate index labels.
        chunk = df.iloc[idx : idx + size].copy().reset_index(drop=True)
        idx += size

        # Seed every row with noon of the target day; the helper then replaces
        # the hour/minute/second while keeping the date.
        chunk["timestamp"] = start_day.replace(hour=12) + timedelta(days=day_idx)

        for row in chunk.itertuples(index=True):
            chunk.at[row.Index, "timestamp"] = randomize_timestamps_within_day(row)

        day_dfs.append(chunk)

    return day_dfs

def randomize_timestamps_within_day(row):
    """
    Return row.timestamp moved to a random time of day.

    The hour, minute and second are each drawn uniformly with the `random`
    module; every other field of the timestamp is kept as it was.
    """
    # Dict values are evaluated top to bottom: hour first, then minute, then second.
    time_of_day = {
        "hour": random.randint(0, 23),
        "minute": random.randint(0, 59),
        "second": random.randint(0, 59),
    }
    return row.timestamp.replace(**time_of_day)

# Canned feedback messages (customize as you like).
# compute_feedback picks one at random: a complaint accompanies a score of 1.
complaints = [
    "That’s not right!",
    "Wrong result again!",
    "Calculator failed.",
    "Off by a mile.",
    "Bad math output!",
    "Totally incorrect!",
    "Oops, wrong calc.",
    "Computation error.",
    "Answer is wrong.",
    "Incorrect result.",
    "Math seems broken.",
    "Calculation flaw.",
    "Miscalculated that.",
    "Wrong total shown.",
    "Completely off!",
    "This seems buggy.",
    "Bad arithmetic!",
    "The math is wrong.",
    "Way off the mark.",
    "Error in result!",
]
# A praise accompanies a score of 5 (agent output equals the expected answer).
praises = [
    "Perfect result!",
    "Nice work!",
    "Correct again!",
    "Spot on!",
    "You nailed it!",
    "Looks good!",
    "Math checks out!",
    "Well done!",
    "Accurate answer!",
    "Exactly right!",
    "All good here!",
    "Bang on target!",
    "That’s correct!",
    "Great calculation!",
    "Awesome result!",
    "Nice precision!",
    "Flawless math!",
    "Right on point!",
    "Excellent job!",
    "Spotless result!",
]

COST = {  # per token (Gemini 2.5 Flash pricing)
    "gen_ai.usage.input_tokens": 0.000000075,
    "gen_ai.usage.output_tokens": 0.00000030,
}


def _token_count(val):
    """Coerce a span attribute value to an int token count, or None if it is not numeric."""
    if val is None or isinstance(val, bool):
        return None
    if isinstance(val, str):
        # Values may arrive as strings wrapped in literal double quotes, e.g. '"123"'.
        val = val.strip().strip('"')
    try:
        return int(val)
    except (TypeError, ValueError, OverflowError):
        return None  # ignore non-numeric weirdness


def est_cost_from_gen_ai_tokens(spans):
    """
    Estimate the cost of one trace from its gen_ai token-usage attributes.

    For every span, each gen_ai.usage.input_tokens / gen_ai.usage.output_tokens
    attribute is multiplied by its per-token rate in COST and the products are
    summed across all spans.

    Args:
        spans: iterable of span mappings, or None. Each span's "attributes"
            entry is a list of (key, value) pairs (a dict is accepted too).
            Token counts may be ints or (optionally quoted) numeric strings.

    Returns:
        The summed cost as a number; 0 when spans is None or nothing is priced.
    """
    if spans is None:
        return 0

    total = 0
    for span in spans:
        # A missing or explicitly-None "attributes" entry counts as no attributes.
        attrs = span.get("attributes") or []
        pairs = attrs.items() if isinstance(attrs, dict) else attrs
        for key, val in pairs:
            rate = COST.get(key)
            if rate is None:
                continue  # not a priced token-usage attribute
            count = _token_count(val)
            if count is not None:
                total += count * rate
    return total


def get_math_output_from_row(row):
    """
    Pull the agent's numeric answer out of row["output"].

    The output is decoded as JSON twice (a JSON string that itself holds JSON),
    then the text of the first content part is converted to float. Any failure
    along the way -- missing key, bad JSON, non-numeric text -- yields None.
    """
    try:
        inner_json = json.loads(row["output"])
        message = json.loads(inner_json)
        answer_text = message["content"]["parts"][0]["text"]
        return float(answer_text)
    except Exception:
        return None


def compute_feedback(row, p_keep=0.11):
    """
    Simulate sparse user feedback for one trace row.

    With probability 1 - p_keep no feedback is left. Otherwise the agent's
    output is compared with row["output_expected"]: an exact match yields a
    score of 5 with a random praise, anything else (including an unparseable
    output or a missing expected answer) yields a score of 1 with a random
    complaint.

    Args:
        row: mapping / Series with "output" and "output_expected".
        p_keep: probability that the row receives feedback.

    Returns:
        pd.Series with "feedback_score" and "feedback_text" (both None when no
        feedback is left).
    """
    # 89% chance (with the default p_keep) to leave both None
    if random.random() > p_keep:
        return pd.Series({"feedback_score": None, "feedback_text": None})

    out = get_math_output_from_row(row)
    try:
        exp = float(row["output_expected"])
    except (TypeError, ValueError):
        # The expected answer could not be computed for this row (None) or is
        # not numeric; float() used to raise here and abort the whole apply.
        exp = None

    if out is not None and exp is not None and out == exp:
        score = 5
        text = random.choice(praises)
    else:
        score = 1
        text = random.choice(complaints)

    # JSON-safe primitives
    return pd.Series({"feedback_score": int(score), "feedback_text": str(text)})


def compute_abs_error(row):
    """
    Absolute difference between the agent's output and the expected answer.

    The expected answer is taken from row["output_expected"] when that field is
    present, otherwise it is computed from row["input"].

    Returns:
        abs(output - expected), or None when either side is unavailable
        (unparseable output, or an expected answer that is None / NaN /
        non-numeric).
    """
    out = get_math_output_from_row(row)
    if "output_expected" in row:
        raw_expected = row["output_expected"]
    else:
        raw_expected = compute_expected_answer(row["input"])

    try:
        exp = float(raw_expected)
    except (TypeError, ValueError):
        exp = None  # float(None) used to raise here and abort the whole apply
    if exp is not None and exp != exp:
        # NaN is how pandas stores a missing expected answer in a float column.
        exp = None

    if exp is not None and out is not None:
        return abs(out - exp)
    else:
        return None


def compute_expected_answer(input_string: str) -> float | None:
    """
    Parse a math question and compute the expected answer.

    Handles various question formats like "1+2", "40-4*6", etc

    Returns:
        The computed answer as a float, or None if the question couldn't be parsed.
    """
    try:
        question = json.loads(json.loads(input_string))["new_message"]["parts"][0]["text"]
        return eval(question)
    except Exception:
        return None


In [None]:
# Augment the traces in place. Order matters: compute_feedback and
# compute_abs_error both read the output_expected column created first.
df["output_expected"] = df["input"].apply(compute_expected_answer)
# Simulated user feedback; random, and most rows get none (see compute_feedback).
df[["feedback_score", "feedback_text"]] = df.apply(compute_feedback, axis=1)
df["absolute_error"] = df.apply(compute_abs_error, axis=1)
# Estimate a per-trace cost from the token usage recorded on the converted spans.
raw_spans = df["traces_data"]
dbnl_spans = dbnl.convert_otlp_traces_data(data=raw_spans)
df["total_cost"] = dbnl_spans.apply(est_cost_from_gen_ai_tokens)

In [None]:
# Wrapper for sending multiple days of dataframes to DBNL

def upload_to_dbnl(day_dfs, app_url=None):
    """
    Upload daily dataframes to DBNL, one dbnl.log call per day.

    Each dataframe is logged to the notebook's `project` with a window running
    from midnight of its earliest "timestamp" to midnight of the next day.
    Days whose data was already logged are reported and skipped; empty
    dataframes are skipped as well since they have no timestamp to anchor on.

    Args:
        day_dfs: list of DataFrames, each with a "timestamp" column.
        app_url: base URL of the DBNL web app, used only for the printed links.
            Defaults to a notebook-level `app_url` variable when one exists,
            otherwise to the local sandbox at http://localhost:8080.

    Raises:
        Exception: whatever dbnl.log raises, except "Data already exists".
    """
    if app_url is None:
        # No cell of this notebook defines app_url, which made the prints below
        # fail with NameError; honour a global if present, else use the sandbox UI.
        app_url = globals().get("app_url", "http://localhost:8080")
    project_url = f"{app_url}/ns/{project.namespace_id}/projects/{project.id}"

    print(f"Status: {project_url}/status")
    total_days = len(day_dfs)
    for idx, day_df in enumerate(day_dfs):
        if len(day_df) == 0:
            print(f"{idx + 1}/{total_days} no traces for this day, skipping...")
            continue

        first_ts = day_df["timestamp"].min()
        print(f"{idx + 1}/{total_days} uploading {first_ts.date()} : {len(day_df)} traces.")
        # The logging window is the full calendar day containing the earliest row.
        data_start = first_ts.replace(hour=0, minute=0, second=0, microsecond=0)
        data_end = data_start + timedelta(days=1)
        try:
            dbnl.log(
                project_id=project.id,
                data_start_time=data_start,
                data_end_time=data_end,
                data=day_df,
            )
        except Exception as e:
            if "Data already exists" in str(e):
                print("  Data already exists, skipping...")
                continue
            raise
    print(f"\nExplore: {project_url}")

In [None]:
# Spread the traces over two simulated days (two days ago and yesterday, UTC)
# and upload them one day at a time.
# NOTE(review): the project name says "Two Week Simulation" but num_days=2 here — confirm which is intended.
day_dfs = break_into_days(df, start_day=datetime.now(tz=UTC) - timedelta(days=2), num_days=2, variation=0.10)
upload_to_dbnl(day_dfs)