Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ datu-sqlserver/
test_schema_cache.json

.cache/
site/
site/
.coverage*
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/* && \
pip install --upgrade pip

RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
RUN curl https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor > /etc/apt/trusted.gpg.d/microsoft.gpg \
&& curl https://packages.microsoft.com/config/debian/11/prod.list -o /etc/apt/sources.list.d/mssql-release.list

RUN apt-get update
RUN env ACCEPT_EULA=Y apt-get install -y msodbcsql18
Expand All @@ -21,4 +21,4 @@ RUN uv sync --extra postgres --extra sqldb
ENV PATH="/app/.venv/bin:$PATH"
# Reset the entrypoint, don't invoke `uv`
ENTRYPOINT []
CMD ["uvicorn", "datu.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
CMD ["uvicorn", "datu.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
1 change: 1 addition & 0 deletions changelog.d/+7413cd44.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Enable product Telemetry
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"fastmcp>=2.10.5",
"mcp-use[search]>=1.3.7",
"onnxruntime==1.19.2 ; sys_platform == 'darwin' and platform_machine == 'x86_64'",
"posthog>=6.5.0",
]

[project.urls]
Expand Down
14 changes: 12 additions & 2 deletions src/datu/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from datu.integrations.config import IntegrationConfigs
from datu.mcp.config import MCPConfig
from datu.services.config import SchemaRAGConfig
from datu.telemetry.config import TelemetryConfig


class Environment(Enum):
Expand Down Expand Up @@ -56,6 +57,13 @@ class DatuConfig(BaseSettings):
schema_sample_limit (int): The maximum number of rows to sample from the schema.
schema_categorical_threshold (int): The threshold for categorical columns in the schema.
enable_schema_rag (bool): Enable RAG for schema extraction.
enable_anonymized_telemetry (bool): Enable anonymized telemetry. Default is True.
app_environment (str): The application environment (e.g., "dev", "test", "prod"). Default is "dev".
telemetry (TelemetryConfig | None): Configuration settings for telemetry.
enable_mcp (bool): Whether to enable MCP integration. Default is False.
mcp (MCPConfig | None): Configuration settings for MCP integration.
enable_schema_rag (bool): Enable RAG for schema extraction.
schema_rag (SchemaRAGConfig | None): Configuration settings for schema RAG.

Attributes:
host (str): The host address for the application.
Expand All @@ -77,7 +85,8 @@ class DatuConfig(BaseSettings):
mcp (MCPConfig | None): Configuration settings for MCP integration.
enable_schema_rag (bool): Enable RAG for schema extraction.
schema_rag (SchemaRAGConfig | None): Configuration settings for schema RAG.

enable_anonymized_telemetry (bool): Enable anonymized telemetry.
telemetry (TelemetryConfig | None): Configuration settings for telemetry.

"""

Expand Down Expand Up @@ -110,7 +119,8 @@ class DatuConfig(BaseSettings):
description="Configuration settings for schema RAG (Retrieval-Augmented Generation).",
)
enable_anonymization: bool = False

enable_anonymized_telemetry: bool = True
telemetry: TelemetryConfig | None = Field(default_factory=TelemetryConfig)
model_config = SettingsConfigDict(
env_prefix="datu_",
env_nested_delimiter="__",
Expand Down
13 changes: 12 additions & 1 deletion src/datu/factory/llm_client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from typing import Literal

from datu.llm_clients.openai_client import OpenAIClient
from datu.telemetry.product.events import MCPClientEvent
from datu.telemetry.product.posthog import get_posthog_client


def get_llm_client(provider: Literal["openai"] | None = None) -> OpenAIClient | None:
Expand All @@ -20,6 +22,15 @@ def get_llm_client(provider: Literal["openai"] | None = None) -> OpenAIClient |
ValueError: If the specified provider is not supported.
"""
if provider == "openai":
return OpenAIClient()
openai_client = OpenAIClient()
if openai_client.agent:
posthog_client = get_posthog_client()
if openai_client.mcp_client and getattr(openai_client.mcp_client, "config", None):
servers = openai_client.mcp_client.config.get("mcpServers", {})
server_names = list(servers.keys()) if servers else []
else:
server_names = []
posthog_client.capture(MCPClientEvent(server_names=server_names))
return openai_client
else:
raise ValueError("Invalid LLM provider specified in configuration.")
4 changes: 4 additions & 0 deletions src/datu/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from datu.app_config import get_logger, settings
from datu.routers import chat, metadata, transformations
from datu.schema_extractor.schema_cache import load_schema_cache
from datu.telemetry.product.events import OpenAIEvent
from datu.telemetry.product.posthog import get_posthog_client

logger = get_logger(__name__)

Expand Down Expand Up @@ -56,6 +58,8 @@ def start_app() -> None:
It also sets the logging level based on the configuration settings.
"""
logger.info("Starting the FastAPI application...")
posthog_client = get_posthog_client()
posthog_client.capture(OpenAIEvent())
uvicorn.run(app, host=settings.host, port=settings.port)


Expand Down
File renamed without changes.
19 changes: 19 additions & 0 deletions src/datu/telemetry/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Telemetry configuration settings."""

from pydantic_settings import BaseSettings, SettingsConfigDict


class TelemetryConfig(BaseSettings):
"""Telemetry configuration settings."""

api_key: str = "phc_m74dfR9nLpm2nipvkL2swyFDtNuQNC9o2FL2CSbh6Je"
package_name: str = "datu-core"

model_config = SettingsConfigDict(
env_prefix="telemetry_",
env_nested_delimiter="__",
)


def get_telemetry_settings() -> TelemetryConfig:
return TelemetryConfig()
Empty file.
50 changes: 50 additions & 0 deletions src/datu/telemetry/product/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Telemetry events for product usage."""

from typing import Any, ClassVar, Dict

from datu.app_config import settings


class ProductTelemetryEvent:
"""Base class for all telemetry events."""

max_batch_size: ClassVar[int] = 1

def __init__(self, **kwargs):
self._props = kwargs
self.batch_size = 1

@property
def name(self) -> str:
return self.__class__.__name__

@property
def properties(self) -> Dict[str, Any]:
return self._props

@property
def batch_key(self) -> str:
return self.name

def batch(self, other: "ProductTelemetryEvent") -> "ProductTelemetryEvent":
"""Simple batch: append counts together."""
if self.name != other.name:
raise ValueError("Cannot batch different event types")
self.batch_size += other.batch_size
return self


class MCPClientEvent(ProductTelemetryEvent):
"""Event for when the MCP client starts."""

def __init__(self, server_names: list[str]):
super().__init__()
self._props["mcp_server_names"] = server_names


class OpenAIEvent(ProductTelemetryEvent):
"""Event for OpenAI-related telemetry."""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._props["openai_model"] = settings.openai_model
124 changes: 124 additions & 0 deletions src/datu/telemetry/product/posthog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""PostHog telemetry client for product usage tracking."""

import importlib
import logging
import platform
import sys
import uuid
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, Optional

import posthog

from datu.app_config import Environment, get_logger, settings
from datu.telemetry.config import TelemetryConfig as TelemetrySettings
from datu.telemetry.product.events import ProductTelemetryEvent

logger = get_logger(__name__)

POSTHOG_EVENT_SETTINGS = {"$process_person_profile": False}


class PostHogClient:
"""Telemetry client with basic batching + config via Pydantic."""

UNKNOWN_USER_ID = "UNKNOWN"
USER_ID_PATH = Path.home() / ".cache" / "datu-core" / "telemetry_user_id"

def __init__(self, telemetry_settings: Optional[TelemetrySettings]) -> None:
self.settings = telemetry_settings or TelemetrySettings()
self._batched_events: Dict[str, ProductTelemetryEvent] = {}
self._user_id: str = ""
self._user_id_path: Path = self.USER_ID_PATH
self.session_id = str(uuid.uuid4())

if (
not settings.enable_anonymized_telemetry
or "pytest" in sys.modules
or settings.app_environment in [Environment.TEST.value]
):
posthog.disabled = True
else:
logger.info("Enabled anonymized telemetry. See https://docs.datu.fi for more information.")
posthog.api_key = self.settings.api_key
posthog_logger = logging.getLogger("posthog")
posthog_logger.disabled = True

@property
def user_id(self) -> str:
if self._user_id:
return self._user_id

try:
if not self._user_id_path.exists():
self._user_id_path.parent.mkdir(parents=True, exist_ok=True)
new_id = str(uuid.uuid4())
self._user_id_path.write_text(new_id)
self._user_id = new_id
else:
self._user_id = self._user_id_path.read_text().strip()
except Exception:
self._user_id = self.UNKNOWN_USER_ID

return self._user_id

def _base_context(self) -> Dict[str, Any]:
try:
pkg_version = importlib.metadata.version(self.settings.package_name)
except importlib.metadata.PackageNotFoundError:
pkg_version = "unknown"

extras_installed: Dict[str, bool] = {}
try:
dist = importlib.metadata.distribution(self.settings.package_name)
extras = dist.metadata.get_all("Provides-Extra") or []
for extra in extras:
extras_installed[extra] = True
except importlib.metadata.PackageNotFoundError:
extras_installed = {}

return {
"python_version": sys.version.split()[0],
"os": platform.system(),
"os_version": platform.release(),
"package_version": pkg_version,
"extras_installed": extras_installed,
}

def capture(self, event: ProductTelemetryEvent) -> None:
"""Capture an event (with simple batching)."""
if not settings.enable_anonymized_telemetry or not self.settings.api_key:
return

if event.max_batch_size == 1:
self._send(event)
return

batch_key = event.batch_key
if batch_key not in self._batched_events:
self._batched_events[batch_key] = event
return

batched = self._batched_events[batch_key].batch(event)
self._batched_events[batch_key] = batched

if batched.batch_size >= batched.max_batch_size:
self._send(batched)
del self._batched_events[batch_key]

def _send(self, event: ProductTelemetryEvent) -> None:
try:
posthog.capture(
distinct_id=self.user_id,
event=event.name,
properties={**self._base_context(), **POSTHOG_EVENT_SETTINGS, **event.properties},
)
except Exception:
logger.debug("Failed to send telemetry event", exc_info=True)


@lru_cache(maxsize=1)
def get_posthog_client() -> PostHogClient:
"""Get the PostHog telemetry client."""
return PostHogClient(settings.telemetry)
2 changes: 1 addition & 1 deletion tests/integrations/sql_server/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Common fixtures for tests in integrations sql_server module ."""

# pylint: disable=redefined-outer-name
# pylint: disable=redefined-outer-name disable=unused-argument disable=import-outside-toplevel
import pytest


Expand Down
Empty file added tests/llm_clients/__init__.py
Empty file.
10 changes: 9 additions & 1 deletion tests/routers/test_chat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# tests/routers/test_chat.py
"""Test suite for the chat router."""

import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
Expand All @@ -8,6 +9,7 @@


def _client() -> TestClient:
"""Create a test client for the chat router."""
app = FastAPI()
app.include_router(chat.router, prefix="/chat/sql")
return TestClient(app)
Expand All @@ -18,6 +20,8 @@ def _client() -> TestClient:

@pytest.mark.asyncio
async def test_happy_path_with_mcp(monkeypatch):
"""Test that the happy path with MCP enabled works correctly."""

async def fake_generate_response(_msgs, _sys):
return "Query name: Sales\n```sql\nSELECT 1;\n```"

Expand All @@ -36,6 +40,8 @@ async def fake_generate_response(_msgs, _sys):

@pytest.mark.asyncio
async def test_error_path_with_mcp(monkeypatch):
"""Test that the error path with MCP enabled works correctly."""

async def boom(*_a, **_k):
raise RuntimeError("LLM down")

Expand All @@ -55,6 +61,8 @@ async def boom(*_a, **_k):

@pytest.mark.asyncio
async def test_passthrough_to_generate_sql_core(monkeypatch):
"""Test that the passthrough to generate_sql_core works correctly."""

async def fake_generate_sql_core(request: ChatRequest):
return {"assistant_response": "core path", "queries": []}

Expand Down
Empty file added tests/services/__init__.py
Empty file.
Loading
Loading