In [21]:
import random
# Draw a random 128-bit integer (16 bytes * 8 bits per byte); 16 bytes is the
# trace id width noted in the Tracer.trace_id docstring.
random.getrandbits(16 * 8)

158040279988997565257440694439896516614

In [3]:
"""Span types.

These are roughly equivalent to RecordAppCall but abstract away specific method
information into type of call related to types of components.
"""

from __future__ import annotations

import contextvars
import datetime
from enum import Enum
import functools
import random
import time
from typing import (ClassVar, Dict, Iterator, List, Mapping, Optional,
                    Sequence, Tuple, Union)

import opentelemetry
from opentelemetry.trace import status as trace_status
import opentelemetry.trace as ot_trace
import opentelemetry.trace.span as ot_span
from opentelemetry.util import types as ot_types
from opentelemetry.util._decorator import _agnosticcontextmanager
import pydantic
from logging import getLogger

logger = getLogger(__name__)

# import trulens_eval

# Plain-int aliases for the OpenTelemetry identifier/timestamp types used below.
TTimestamp = int # uint64, nanoseconds since epoch, as per OpenTelemetry
TSpanID = int # as per OpenTelemetry (generated below as 8 random bytes)
TTraceID = int # as per OpenTelemetry (generated below as 16 random bytes)

# TODO: look into the open telemetry tracer/traceprovider api, ignoring for now.

class OTSpan(pydantic.BaseModel, ot_span.Span):
    """Implementation of OpenTelemetry Span requirements.

    All span state (name, status, timestamps, events, links, attributes) is
    stored in pydantic fields so that a span can be dumped with `model_dump`.

    See also [OpenTelemetry Span](https://opentelemetry.io/docs/specs/otel/trace/api/#span).
    """

    # Prefix used to namespace vendor-specific attribute and event names.
    _vendor: ClassVar[str] = "trulens_eval"

    @classmethod
    def _attr(cls, name):
        """Return `name` qualified with the vendor prefix, e.g. `trulens_eval@tags`."""
        return f"{cls._vendor}@{name}"

    model_config = {
        'arbitrary_types_allowed': True,
        'use_attribute_docstrings': True
    }
    """Pydantic configuration."""

    name: str
    """Name of span."""

    kind: ot_trace.SpanKind = ot_trace.SpanKind.INTERNAL
    """Kind of span."""

    status: trace_status.StatusCode = trace_status.StatusCode.UNSET
    """Status of the span as per OpenTelemetry Span requirements."""

    status_description: Optional[str] = None
    """Status description as per OpenTelemetry Span requirements."""

    start_timestamp: TTimestamp = pydantic.Field(default_factory=time.time_ns)
    """Timestamp when the span's activity started in nanoseconds since epoch."""

    end_timestamp: Optional[TTimestamp] = None
    """Timestamp when the span's activity ended in nanoseconds since epoch.

    None if not yet ended.
    """

    context: ot_span.SpanContext
    """Unique immutable identifier for the span."""

    @staticmethod
    def hash_context(context: ot_span.SpanContext) -> Tuple[int, int]:
        """Return the hashable `(trace_id, span_id)` key for a span context."""
        return (context.trace_id, context.span_id)

    def _context_hash(self) -> Tuple[int, int]:
        """Return the `(trace_id, span_id)` key of this span's own context."""
        return self.hash_context(self.context)

    events: List[Tuple[str, ot_types.Attributes, TTimestamp]] = pydantic.Field(default_factory=list)
    """Events recorded in the span."""

    links: Dict[Tuple[int, int], Mapping[str, ot_types.AttributeValue]] = pydantic.Field(default_factory=dict)
    """Relationships to other spans with attributes on each link."""

    attributes: Dict[str, ot_types.AttributeValue] = pydantic.Field(default_factory=dict)
    """Attributes of span."""

    def __init__(self, name: str, context: ot_span.SpanContext, **kwargs):
        """Create a span.

        Args:
            name: Name of the span.
            context: Span context identifying the span.
            **kwargs: Any other field of this model. `attributes` and `links`
                may be given as None and are then replaced by empty dicts.
        """
        kwargs['name'] = name
        kwargs['context'] = context
        # Callers (e.g. a tracer) may pass None for these; normalize to dicts.
        kwargs['attributes'] = kwargs.get('attributes', {}) or {}
        kwargs['links'] = kwargs.get('links', {}) or {}

        super().__init__(**kwargs)

    def end(self, end_time: Optional[TTimestamp] = None):
        """See [end][opentelemetry.trace.span.Span.end]

        Args:
            end_time: End time in nanoseconds since epoch. The current time is
                used if None.

        Calls after the first are ignored. The status becomes OK only if no
        status was set before; an ERROR recorded earlier is preserved.
        """
        if self.end_timestamp is not None:
            # Already ended; the OpenTelemetry API asks that repeated calls be ignored.
            return

        # Compare against None so that an explicit timestamp of 0 is honored.
        self.end_timestamp = end_time if end_time is not None else time.time_ns()

        if self.status == trace_status.StatusCode.UNSET:
            self.status = trace_status.StatusCode.OK

    def get_span_context(self) -> ot_span.SpanContext:
        """See [get_span_context][opentelemetry.trace.span.Span.get_span_context]"""

        return self.context

    def set_attributes(self, attributes: Dict[str, ot_types.AttributeValue]) -> None:
        """See [set_attributes][opentelemetry.trace.span.Span.set_attributes]"""

        self.attributes.update(attributes)

    def set_attribute(self, key: str, value: ot_types.AttributeValue) -> None:
        """See [set_attribute][opentelemetry.trace.span.Span.set_attribute]"""

        self.attributes[key] = value

    def add_event(
        self,
        name: str,
        attributes: ot_types.Attributes = None,
        timestamp: Optional[int] = None
    ) -> None:
        """See [add_event][opentelemetry.trace.span.Span.add_event]

        The event is stored as a `(name, attributes, timestamp)` tuple; the
        current time is used when `timestamp` is None.
        """
        if timestamp is None:
            timestamp = time.time_ns()

        self.events.append((name, attributes, timestamp))

    def add_link(
        self,
        context: ot_span.SpanContext,
        attributes: ot_types.Attributes = None
    ) -> None:
        """See [add_link][opentelemetry.trace.span.Span.add_link]

        Links are keyed by the `(trace_id, span_id)` of the linked context, so
        linking the same context again replaces the earlier attributes.
        """

        if attributes is None:
            attributes = {}

        self.links[self.hash_context(context)] = attributes

    def update_name(self, name: str) -> None:
        """See [update_name][opentelemetry.trace.span.Span.update_name]."""

        self.name = name

    def is_recording(self) -> bool:
        """See [is_recording][opentelemetry.trace.span.Span.is_recording].

        A span records until it is ended, independent of its status.
        """

        return self.end_timestamp is None

    def set_status(
        self,
        status: Union[ot_span.Status, ot_span.StatusCode],
        description: Optional[str] = None
    ) -> None:
        """See [set_status][opentelemetry.trace.span.Span.set_status]

        Raises:
            ValueError: If `status` is a `Status` object and `description` is
                also given.
        """

        if isinstance(status, ot_span.Status):
            if description is not None:
                raise ValueError("Ambiguous status description provided both in `status.description` and in `description`.")

            self.status = status.status_code
            self.status_description = status.description
        else:
            self.status = status
            self.status_description = description

    def record_exception(
        self,
        exception: Exception,
        attributes: ot_types.Attributes = None,
        timestamp: Optional[TTimestamp] = None,
        escaped: bool = False
    ) -> None:
        """See [record_exception][opentelemetry.trace.span.Span.record_exception]

        Sets the status to ERROR and adds a vendor-prefixed "exception" event
        whose attributes describe the exception. Caller-provided `attributes`
        take precedence over the generated ones.
        """

        self.status = trace_status.StatusCode.ERROR

        # Keys follow the OpenTelemetry semantic conventions for exceptions.
        event_attributes = {
            "exception.type": type(exception).__qualname__,
            "exception.message": str(exception),
            "exception.escaped": str(escaped),
        }
        if attributes:
            event_attributes.update(attributes)

        self.add_event(
            self._attr("exception"),
            event_attributes,
            timestamp
        )

class DictNamespace(Dict[str, ot_types.AttributeValue]):
    """View into a dict with keys prefixed by some `namespace` string.

    Replicates the values without the prefix in self: every write or delete
    made through this object is mirrored to `parent` under the key
    `"{namespace}.{key}"`. Changes made directly to `parent` after construction
    are not reflected here.
    """

    # Sentinel distinguishing "no default given" from an explicit None in `pop`.
    _MISSING = object()

    def __init__(self, parent: Dict, namespace: str, **kwargs):
        """Create the view.

        Args:
            parent: Dict that receives the namespaced copies of all entries.
            namespace: Prefix (without the trailing dot) for keys in `parent`.
            **kwargs: Accepted for signature compatibility and ignored.
        """
        super().__init__()
        self.parent = parent
        self.namespace = namespace

        # Mirror entries already present in the parent under this namespace.
        prefix = f"{namespace}."
        for full_key, value in parent.items():
            if isinstance(full_key, str) and full_key.startswith(prefix):
                dict.__setitem__(self, full_key[len(prefix):], value)

    def _full_key(self, key) -> str:
        """Return the key under which `key` is stored in the parent dict."""
        return f"{self.namespace}.{key}"

    def __getitem__(self, key):
        return dict.__getitem__(self, key)

    def __setitem__(self, key, value):
        dict.__setitem__(self, key, value)
        self.parent[self._full_key(key)] = value

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        # Tolerate a parent that already lost the key so both stay consistent.
        self.parent.pop(self._full_key(key), None)

    # The built-in dict implementations of the methods below do not go through
    # __setitem__/__delitem__, so they are re-routed to keep the parent in sync.

    def update(self, *args, **kwargs):
        """Like `dict.update`, mirroring every entry to the parent."""
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

    def setdefault(self, key, default=None):
        """Like `dict.setdefault`, mirroring a newly inserted entry to the parent."""
        if key not in self:
            self[key] = default
        return dict.__getitem__(self, key)

    def pop(self, key, default=_MISSING):
        """Like `dict.pop`, also removing the entry from the parent."""
        if key in self:
            value = dict.__getitem__(self, key)
            del self[key]
            return value
        if default is DictNamespace._MISSING:
            raise KeyError(key)
        return default

    def popitem(self):
        """Like `dict.popitem`, also removing the entry from the parent."""
        key, value = dict.popitem(self)
        self.parent.pop(self._full_key(key), None)
        return key, value

    def clear(self):
        """Remove all entries from this view and their copies in the parent."""
        for key in list(self):
            del self[key]

class SpanType(Enum):
    """Enumeration of the span categories known to this module.

    The value of every member is its name in lower case.
    """

    # Top of a trace.
    ROOT = "root"

    # Component-specific categories.
    RETRIEVER = "retriever"
    RERANKER = "reranker"
    LLM = "llm"
    EMBEDDING = "embedding"
    TOOL = "tool"
    AGENT = "agent"
    TASK = "task"

    # Fallback when no other category applies.
    OTHER = "other"

class Span(OTSpan):
    """Base Span type.

    Smallest unit of recorded activity. Adds convenience accessors (ids, tags,
    span type, metadata) that are stored in the span's `attributes` under
    vendor-prefixed keys.
    """

    @property
    def span_id(self) -> TSpanID:
        """Identifier for the span."""

        return self.context.span_id

    @property
    def trace_id(self) -> TTraceID:
        """Identifier for the trace this span belongs to."""

        return self.context.trace_id

    # Plain property rather than cached_property: links can be added after
    # construction, and a cached None would hide a parent link added later.
    @property
    def parent_span_id(self) -> Optional[TSpanID]:
        """Id of parent span if any.

        The parent is the linked span in the same trace whose link attributes
        carry the vendor-prefixed `relationship` key with value `"parent"`.

        None if this is a root span.
        """

        relationship_key = self._attr("relationship")

        # `links` maps (trace_id, span_id) -> link attributes.
        for (link_trace_id, link_span_id), link_attributes in self.links.items():
            if link_trace_id != self.trace_id:
                continue
            if (link_attributes or {}).get(relationship_key) == "parent":
                return link_span_id

        return None

    @property
    def tags(self) -> List[str]:
        """Tags associated with the span."""

        return self.attributes.get(self._attr("tags"), [])
    @tags.setter
    def tags(self, value: List[str]):
        self.attributes[self._attr("tags")] = value

    @property
    def span_type(self) -> SpanType:
        """Type of span."""

        return self.attributes.get(self._attr("span_type"), SpanType.OTHER)
    @span_type.setter
    def span_type(self, value: SpanType):
        self.attributes[self._attr("span_type")] = value

    attributes_metadata: DictNamespace[str, ot_types.AttributeValue]
    # will be set as a DictNamespace indexing elements in attributes
    @property
    def metadata(self) -> DictNamespace[str, ot_types.AttributeValue]:
        """Metadata of the span, mirrored into `attributes` under a vendor-prefixed namespace."""
        return self.attributes_metadata

    @metadata.setter
    def metadata(self, value: Dict[str, str]):
        # Entries are added/overwritten one by one; existing keys not present
        # in `value` are kept.
        for k, v in value.items():
            self.attributes_metadata[k] = v

    # input: Dict[str, str] = pydantic.Field(default_factory=dict)
    # Make property

    # output: Dict[str, str] = pydantic.Field(default_factory=dict)
    # Make property

    def __init__(self, **kwargs):
        """Create a span; accepts the same keyword arguments as `OTSpan`."""
        kwargs['attributes_metadata'] = DictNamespace(parent={}, namespace="temp")
        # Temporary fake for validation in super.__init__ below.

        super().__init__(**kwargs)

        # Actual. This is needed as pydantic will copy attributes dict in init.
        self.attributes_metadata = DictNamespace(
            parent=self.attributes,
            namespace=self._attr("metadata")
        )

# The classes below are empty subclasses of Span: they add no fields or
# behavior of their own. Their names parallel the members of SpanType
# (presumably one class per span type — TODO confirm intended use).

class SpanRoot(Span):
    pass

class SpanRetriever(Span):
    pass

class SpanReranker(Span):
    pass

class SpanLLM(Span):
    pass

class SpanEmbedding(Span):
    pass

class SpanTool(Span):
    pass

class SpanAgent(Span):
    pass

class SpanTask(Span):
    pass

class SpanOther(Span):
    pass

class Tracer(pydantic.BaseModel, ot_trace.Tracer):
    """OpenTelemetry tracer that keeps every span it creates.

    A new tracer generates a fresh trace id and a root span; all spans started
    through it share that trace id and are stored in `spans`.
    """

    context: Optional[ot_span.SpanContext] = None

    instrumenting_module_name: str = "trulens_eval"
    instrumenting_library_version: Optional[str] = None#trulens_eval.__version__

    spans: Dict[Tuple[int, int], Span] = pydantic.Field(default_factory=dict)

    trace_id: int
    """Unique identifier for the trace.
    
    16 bytes as per OpenTelemetry specification.
    """

    model_config = {
        'arbitrary_types_allowed': True,
        'use_attribute_docstrings': True
    }
    """Pydantic configuration."""

    @staticmethod
    def _random_id(n_bytes: int) -> int:
        """Return a random non-zero integer of at most `n_bytes` bytes.

        Zero is excluded because OpenTelemetry reserves all-zero ids as invalid.
        """
        value = 0
        while value == 0:
            value = random.getrandbits(n_bytes * 8)
        return value

    @staticmethod
    def _links_to_dict(links) -> Dict:
        """Convert OpenTelemetry links to the dict form stored on `Span.links`.

        Accepts None, a mapping already keyed by `(trace_id, span_id)`, or a
        sequence of objects with `context` and `attributes` (such as
        `opentelemetry.trace.Link`).
        """
        if not links:
            return {}
        if isinstance(links, Mapping):
            return dict(links)
        return {
            Span.hash_context(link.context): dict(link.attributes or {})
            for link in links
        }

    def __init__(self, **kwargs):
        """Create a tracer with a new trace id and root span.

        `trace_id` and `context` in `kwargs` are always overwritten with the
        generated values.
        """
        trace_id = self._random_id(16) # 16 bytes as per OpenTelemetry specification

        root_context = ot_span.SpanContext(
            trace_id=trace_id,
            span_id=self._random_id(8), # 8 bytes as per OpenTelemetry specification
            is_remote=False
        )

        # Span derives its trace_id/span_id from the context, so they are not
        # passed separately.
        root_span = Span(
            name="root",
            context=root_context,
            kind=ot_trace.SpanKind.INTERNAL
        )

        kwargs['trace_id'] = trace_id
        kwargs['context'] = root_context

        super().__init__(**kwargs)

        self.spans[Span.hash_context(root_context)] = root_span

    def start_span(
        self,
        name: str,
        context: Optional[ot_trace.Context] = None,
        kind: ot_trace.SpanKind = ot_trace.SpanKind.INTERNAL,
        attributes: ot_trace.types.Attributes = None,
        links: ot_trace._Links = None,
        start_time: Optional[int] = None,
        record_exception: bool = True,
        set_status_on_exception: bool = True,

    ) -> Span:
        """Create a new span in this tracer's trace and register it in `spans`.

        Args:
            name: Name of the span.
            context: Parent context. If given, it also becomes the tracer's
                current `context`; otherwise the tracer's `context` is used.
            kind: Kind of span.
            attributes: Initial span attributes.
            links: Links to other spans.
            start_time: Start time in nanoseconds since epoch; the current
                time is used if None.
            record_exception: Unused here; honored by `start_as_current_span`.
            set_status_on_exception: Unused here; honored by
                `start_as_current_span`.

        Returns:
            The newly created span.
        """
        # NOTE(review): `context` is annotated as an OpenTelemetry `Context`
        # but is used as a `SpanContext` (its `trace_id` is read) — confirm
        # the intended type.
        if context is None:
            context = self.context
        else:
            self.context = context

        if context.trace_id != self.trace_id:
            logger.warning("Parent context is not being traced by this tracer.")

        # NOTE(review): no link to the parent context is added to the new
        # span, so `Span.parent_span_id` stays None — confirm whether a
        # "parent" link should be recorded here.

        span_context = ot_trace.SpanContext(
            trace_id=self.trace_id,
            span_id=self._random_id(8), # 8 bytes as per OpenTelemetry specification
            is_remote=False
        )

        span_kwargs = dict(
            name=name,
            context=span_context,
            kind=kind,
            attributes=attributes,
            links=self._links_to_dict(links),
        )
        # The Span field is `start_timestamp`; leave it out when no start time
        # is given so the field's default (current time) applies.
        if start_time is not None:
            span_kwargs['start_timestamp'] = start_time

        span = Span(**span_kwargs)

        self.spans[Span.hash_context(span_context)] = span

        return span

    @_agnosticcontextmanager
    def start_as_current_span(
        self,
        name: str,
        context: Optional[ot_trace.Context] = None,
        kind: ot_trace.SpanKind = opentelemetry.trace.SpanKind.INTERNAL,
        attributes: ot_types.Attributes = None,
        links: ot_trace._Links = None,
        start_time: Optional[int] = None,
        record_exception: bool = True,
        set_status_on_exception: bool = True,
        end_on_exit: bool = True,
    ) -> Iterator[OTSpan]:
        """Context manager that starts a span and makes it the current span.

        Arguments are as for `start_span`, plus:

        Args:
            record_exception: Whether an exception raised in the managed block
                is recorded on the span.
            set_status_on_exception: Whether such an exception sets the span
                status to ERROR.
            end_on_exit: Whether the span is ended when the block exits.

        Yields:
            The started span.
        """

        span = self.start_span(
            name,
            context=context,
            kind=kind,
            attributes=attributes,
            links=links,
            start_time=start_time,
            record_exception=record_exception,
            set_status_on_exception=set_status_on_exception
        )

        # Use `use_span` as a real context manager so that the current-span
        # context is detached on exit, exceptions raised in the block are
        # handled according to the flags, and `end_on_exit` is respected.
        with ot_trace.use_span(
            span,
            end_on_exit=end_on_exit,
            record_exception=record_exception,
            set_status_on_exception=set_status_on_exception,
        ):
            yield span

In [4]:
# Smoke test: create a tracer and record a single span named "test".
tracer = Tracer()

with tracer.start_as_current_span("test") as s:
    print("hello")

# `parent` is the dict that writes to the span's metadata view are mirrored into.
s.metadata.parent

hello


{}

In [5]:
# Dump the tracer's fields, including every span stored in `tracer.spans`.
tracer.model_dump()

{'context': (62253270439664091948775267856289665795,
  15593337627810139526,
  False,
  0,
  [],
  True),
 'instrumenting_module_name': 'trulens_eval',
 'instrumenting_library_version': None,
 'spans': {(62253270439664091948775267856289665795,
   15593337627810139526): {'name': 'root', 'kind': <SpanKind.INTERNAL: 0>, 'status': <StatusCode.UNSET: 0>, 'status_description': None, 'start_timestamp': 1713920705446151000, 'end_timestamp': None, 'context': (62253270439664091948775267856289665795,
    15593337627810139526,
    False,
    0,
    [],
    True), 'events': [], 'links': {}, 'attributes': {}, 'attributes_metadata': {}},
  (62253270439664091948775267856289665795,
   12360611701822629681): {'name': 'test', 'kind': <SpanKind.INTERNAL: 0>, 'status': <StatusCode.OK: 1>, 'status_description': None, 'start_timestamp': 1713920705446222000, 'end_timestamp': 1713920705460184000, 'context': (62253270439664091948775267856289665795,
    12360611701822629681,
    False,
    0,
    [],
    True), 

In [6]:
# Trace id of the span, read from its span context.
s.trace_id

62253270439664091948775267856289665795

In [7]:
# Metadata view of the span (a DictNamespace over `s.attributes`).
s.metadata

{}

In [10]:
# NOTE: executed as In [10], i.e. after the `s.metadata['b'] = 42` cell below
# (In [9]); the displayed key only exists once that cell has run.
s.attributes

{'trulens_eval@metadata.b': 42}

In [9]:
# Writing through the metadata view also stores the value in `s.attributes`
# under the vendor-prefixed metadata namespace.
s.metadata['b'] = 42

# Dev Notebook

This notebook loads the version of trulens_eval from the enclosing repo folder. You can use this to debug or develop trulens_eval features.

In [1]:
# pip uninstall -y trulens_eval
# pip install git+https://github.com/truera/trulens@piotrm/azure_bugfixes#subdirectory=trulens_eval

# trulens_eval notebook dev

# %load_ext autoreload
# %autoreload 2
import os
from pathlib import Path
import shutil
import sys

# Walk up from the working directory to the first folder that contains a
# `trulens_eval` entry.
base = Path().cwd()
while not (base / "trulens_eval").exists():
    if base == base.parent:
        # Reached the filesystem root; without this check the loop never ends.
        raise FileNotFoundError(
            "Could not find a `trulens_eval` folder in the current working directory or any of its parents."
        )
    base = base.parent

# Start from a clean database file.
if os.path.exists("default.sqlite"):
    os.unlink("default.sqlite")

print(base)

# Seed the working database with the stored 0.19.0 release database.
shutil.copy(base / "release_dbs" / "0.19.0" / "default.sqlite", "default.sqlite")


# If running from github repo, can use this:
sys.path.append(str(base))

# Uncomment for more debugging printouts.
"""
import logging
root = logging.getLogger()
root.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
"""

# Imported only after `base` was added to sys.path above.
from trulens_eval.keys import check_keys

check_keys(
    "OPENAI_API_KEY",
    "HUGGINGFACE_API_KEY"
)

from trulens_eval import Tru
tru = Tru(database_prefix="dev")
#tru.reset_database()
# tru.run_dashboard(_dev=base, force=True)
# tru.db.migrate_database()

/Volumes/dev_new/trulens/trulens_eval
✅ Key OPENAI_API_KEY set from environment (same value found in .env file at /Volumes/dev_new/.env).
✅ Key HUGGINGFACE_API_KEY set from environment (same value found in .env file at /Volumes/dev_new/.env).
🦑 Tru initialized with db url sqlite:///default.sqlite .
🛑 Secret keys may be written to the database. See the `database_redact_keys` option of Tru` to prevent this.
Database has been reconfigured. Please update it by running `tru.migrate_database(prior_prefix="")` or reset it by running `tru.reset_database()`.


In [None]:
# tru.db.migrate_database()
# NOTE(review): the setup cell's output suggests running
# `tru.migrate_database(prior_prefix="")`; confirm that calling it without
# `prior_prefix` migrates the copied database as intended.
tru.migrate_database()

In [None]:
# Print every value held in the database ORM registry.
for t in tru.db.orm.registry.values():
    print(t)

In [None]:
from trulens_eval.database.utils import copy_database

In [None]:
# Display the database object behind `tru`.
tru.db

In [None]:
# Copy default.sqlite to default2.sqlite using the "dev" prefix on both sides
# (presumably the table prefix set via `database_prefix` above — TODO confirm).
copy_database("sqlite:///default.sqlite", "sqlite:///default2.sqlite", src_prefix="dev", tgt_prefix="dev")

In [None]:
import os
import urllib.request

from llama_index.core import SimpleDirectoryReader
from llama_index.core import VectorStoreIndex
from trulens_eval.tru_llama import TruLlama

check_keys("OPENAI_API_KEY", "HUGGINGFACE_API_KEY")

# Fetch the example essay once. urllib is used instead of shelling out to
# `wget`, which is not installed everywhere and whose failure would go
# unnoticed by `os.system`; a failed download now raises.
ESSAY_URL = 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt'
ESSAY_PATH = "data/paul_graham_essay.txt"
if not os.path.exists(ESSAY_PATH):
    os.makedirs("data", exist_ok=True)
    urllib.request.urlretrieve(ESSAY_URL, ESSAY_PATH)

# Index everything in data/ and build a query engine on top of the index.
documents = SimpleDirectoryReader("data").load_data()
index = VectorStoreIndex.from_documents(documents)

query_engine = index.as_query_engine()

# This test does not run correctly if async is used, i.e. not using
# `sync` to convert to sync.

In [None]:
from trulens_eval import Select
from trulens_eval.app import App
from trulens_eval.feedback.feedback import Feedback
from trulens_eval.feedback.provider.hugs import Dummy

# Feedback built from the Dummy provider's `language_match`, applied to the
# app input and to the context selected from the query engine.
f = (
    Feedback(Dummy().language_match)
    .on_input()
    .on(App.select_context(query_engine))
)

# Recorder wrapping the query engine with that single feedback attached.
tru_query_engine_recorder = TruLlama(query_engine, feedbacks=[f])

In [None]:
# Run one synchronous query through the recorder; `with_record` returns the
# query result together with the record, which is displayed below.
llm_response, record = tru_query_engine_recorder.with_record(
    query_engine.query, "What did the author do growing up?"
)
record

In [None]:
# Start the dashboard, passing the repo folder found in the setup cell as `_dev`.
tru.run_dashboard(_dev=base, force=True)

In [None]:
# `record_async` is never assigned in this notebook (its assignments are
# commented out), so read the feedback result from `record`, produced by the
# `with_record` call above on the recorder that has the feedback attached.
res = record.feedback_results[0].result()

In [None]:
# Display the `result` attribute of the feedback result fetched above.
res.result

In [None]:
# Async variant using `awith_record`. Note that this rebinds
# `tru_query_engine_recorder` to a recorder without feedbacks and rebinds
# `record` to the record of the async query.
tru_query_engine_recorder = TruLlama(query_engine)
#with tru_query_engine_recorder as recording:
llm_response_async, record = await tru_query_engine_recorder.awith_record(query_engine.aquery, "What did the author do growing up?")

#record_async = recording.get()

In [None]:
tru_query_engine_recorder = TruLlama(query_engine)
with tru_query_engine_recorder as recording:
    llm_response_async = query_engine.aquery("What did the author do growing up?")

#record_async = recording.get()

In [None]:
# Display the records collected by the `recording` context above.
recording.records

In [None]:
# In the `llama_index.core` package layout used by the other imports in this
# notebook, BaseQueryEngine lives under `llama_index.core.base`.
from llama_index.core.base.base_query_engine import BaseQueryEngine
isinstance(query_engine, BaseQueryEngine)

In [None]:
# Synchronous query recorded through the recorder's context manager, using a
# freshly built query engine and a recorder without feedbacks.
query_engine = index.as_query_engine()
tru_query_engine_recorder = TruLlama(query_engine)
with tru_query_engine_recorder as recording:
    llm_response_sync = query_engine.query(
        "What did the author do growing up?"
    )
# Fetch the record from the recording after the block has exited.
record_sync = recording.get()