In [None]:
# Load IPython's autoreload extension and switch it to mode 2, so edited
# modules are re-imported before each cell runs (no kernel restart needed).
%load_ext autoreload
%autoreload 2

In [None]:
import random
import string
import uuid
from datetime import datetime, timedelta

from intelligence_layer.core.tracer.tracer import (
    Context,
    Event,
    ExportedSpan,
    ExportedSpanList,
    SpanAttributes,
    SpanStatus,
    TaskSpanAttributes,
)


def generate_random_dict(max_key_value_length=1000, max_dict_size=5):
    """Build a dictionary of random alphanumeric keys and values.

    Args:
        max_key_value_length: Inclusive upper bound for the length of every
            generated key and value; each length is drawn from
            ``1..max_key_value_length``.
        max_dict_size: Inclusive upper bound for the number of entries that
            are generated; the count is drawn from ``1..max_dict_size``.

    Returns:
        A ``dict`` mapping random strings to random strings. If two generated
        keys collide, the later value overwrites the earlier one, so the
        result may hold fewer entries than were drawn.
    """
    alphabet = string.ascii_letters + string.digits

    def random_text(length):
        # One random.choice call per character, in order.
        return "".join(random.choice(alphabet) for _ in range(length))

    entries = {}
    for _ in range(random.randint(1, max_dict_size)):
        # Both lengths are drawn before any characters are generated.
        key_len = random.randint(1, max_key_value_length)
        value_len = random.randint(1, max_key_value_length)
        key = random_text(key_len)
        entries[key] = random_text(value_len)
    return entries


def _random_span_attributes():
    """Pick plain ``SpanAttributes`` or ``TaskSpanAttributes`` with random input/output."""
    return random.choice(
        [
            SpanAttributes(),
            TaskSpanAttributes(
                input=generate_random_dict(), output=generate_random_dict()
            ),
        ]
    )


def _random_events():
    """Create between 1 and 10 events, each carrying a random dict as body."""
    return [
        Event(
            name=f"event_{i}",
            message=f"message_{i}",
            body=generate_random_dict(),
        )
        for i in range(random.randint(1, 10))
    ]


def _random_status():
    """Pick ``SpanStatus.OK`` or ``SpanStatus.ERROR`` at random."""
    return random.choice([SpanStatus.OK, SpanStatus.ERROR])


def generate_exported_span_list(byte_size: int) -> ExportedSpanList:
    """Generate a synthetic single-trace span list of roughly ``byte_size`` bytes.

    The first span is a root span (``parent_id=None``); every further span is
    a child of a randomly chosen, already generated span and shares its
    trace id. Spans are added until the sum of their UTF-8 encoded JSON sizes
    reaches ``byte_size`` or the list holds more than 1000 spans, whichever
    happens first.

    Args:
        byte_size: Target size in bytes. A value ``<= 0`` yields an empty list.

    Returns:
        An ``ExportedSpanList`` wrapping the generated spans.
    """
    exported_spans = []
    current_size = 0

    # The span-count guard avoids excessive memory usage for large targets;
    # "<= 1000" allows the list to reach 1001 spans before generation stops.
    while current_size < byte_size and len(exported_spans) <= 1000:
        if not exported_spans:
            # Create the root span.
            # NOTE(review): datetime.utcnow() yields naive datetimes and is
            # deprecated since Python 3.12 - confirm whether timezone-aware
            # values are expected by ExportedSpan before switching.
            span = ExportedSpan(
                context=Context(trace_id=uuid.uuid4(), span_id=uuid.uuid4()),
                name="root_span",
                parent_id=None,
                start_time=datetime.utcnow() - timedelta(minutes=random.randint(0, 60)),
                end_time=datetime.utcnow(),
                attributes=_random_span_attributes(),
                events=_random_events(),
                status=_random_status(),
            )
        else:
            # Create a child span below a random existing span.
            # NOTE(review): start and end offsets are drawn independently, so
            # a child may end up with start_time > end_time when the parent's
            # duration is under one second - confirm this is acceptable.
            parent_span = random.choice(exported_spans)
            span = ExportedSpan(
                context=Context(
                    trace_id=parent_span.context.trace_id, span_id=uuid.uuid4()
                ),
                name=f"child_span_{len(exported_spans)}",
                parent_id=parent_span.context.span_id,
                start_time=parent_span.start_time
                + timedelta(milliseconds=random.randint(0, 1000)),
                end_time=parent_span.end_time
                + timedelta(milliseconds=random.randint(0, 1000)),
                attributes=_random_span_attributes(),
                events=_random_events(),
                status=_random_status(),
            )

        exported_spans.append(span)
        # model_dump_json() matches the serialization used by the size checks
        # elsewhere in this file (the pydantic v1-style .json() is deprecated).
        current_size += len(span.model_dump_json().encode("utf-8"))

    return ExportedSpanList(root=exported_spans)

In [None]:
def get_size(data):
    """Print and return the UTF-8 byte length of ``data`` serialized as JSON.

    ``data`` must provide a ``model_dump_json()`` method returning a string.
    """
    encoded = data.model_dump_json().encode("utf-8")
    size = len(encoded)
    print(size)
    return size

In [None]:
from typing import Any

from intelligence_layer.core.tracer.tracer import SpanType


def _upload_trace(trace: ExportedSpanList, max_size=100) -> None:
    """Shrink an oversized trace and report its final serialized size.

    If the serialized trace exceeds ``MAX_TRACE_SIZE`` bytes, spans are
    replaced by slimmed copies, largest first, until the estimated size fits
    or every span has been slimmed. Slimming truncates long strings in task
    inputs/outputs and event bodies and keeps only the first and last event
    of a span. The trace may still exceed the limit afterwards.

    No upload is performed here; the function only prints the size before
    trimming (via ``get_size``) and the final size.

    Args:
        trace: The spans to check and, if necessary, trim. The passed object
            itself is not modified; a new list is built when trimming.
        max_size: Currently unused; kept so existing callers keep working.
            NOTE(review): possibly intended as the truncation length -
            confirm before wiring it in.
    """
    MAX_TRACE_SIZE = 1_000_000  # 1MB
    TRUNCATED_PLACEHOLDER = "[TRUNCATED]"

    def _truncate_str(value, max_length: int):
        """Cut ``value`` (str or bytes) to ``max_length`` and append the placeholder."""
        if len(value) <= max_length:
            return value
        if isinstance(value, bytes):
            # bytes cannot be concatenated with str; use a bytes placeholder.
            return value[:max_length] + TRUNCATED_PLACEHOLDER.encode("utf-8")
        return value[:max_length] + TRUNCATED_PLACEHOLDER

    def _truncate_value(value: Any, limit: int = 1000) -> Any:
        """Recursively truncate large values while maintaining JSON serializability."""
        if isinstance(value, (str, bytes)):
            return _truncate_str(value, limit)
        if isinstance(value, dict):
            # Only values are truncated; keys are kept as they are.
            return {k: _truncate_value(v, limit) for k, v in value.items()}
        if isinstance(value, list):
            # Keep first 10 elements
            return [_truncate_value(v, limit) for v in value[:10]]
        return value

    def slim_span(span: ExportedSpan) -> ExportedSpan:
        """Create a trimmed version of a span with reduced payload size."""
        span_data = span.model_dump()

        # Trim attributes
        if isinstance(span.attributes, TaskSpanAttributes):
            span_data["attributes"] = TaskSpanAttributes(
                type=SpanType.TASK_SPAN,
                input=_truncate_value(span.attributes.input),
                output=_truncate_value(span.attributes.output),
            )

        # Trim events while keeping first/last for context
        if span.events:
            kept_events = (
                [span.events[0], span.events[-1]]
                if len(span.events) > 2
                else span.events
            )
            span_data["events"] = [
                Event(
                    name=event.name,
                    message=_truncate_str(event.message, 200),
                    body=_truncate_value(event.body),
                    timestamp=event.timestamp,
                )
                for event in kept_events
            ]

        return ExportedSpan(**span_data)

    def _span_size(span: ExportedSpan) -> int:
        """Return the UTF-8 byte length of the span's JSON form."""
        return len(span.model_dump_json().encode("utf-8"))

    # Check initial size
    estimated_size = get_size(trace)
    if estimated_size > MAX_TRACE_SIZE:
        spans = list(trace.root)
        sizes = [_span_size(s) for s in spans]

        # Slim the biggest spans first. Spans are addressed by index so that
        # no deep model equality comparison is needed and equal-looking spans
        # are never confused with each other.
        by_size_desc = sorted(range(len(spans)), key=sizes.__getitem__, reverse=True)
        for index in by_size_desc:
            if estimated_size <= MAX_TRACE_SIZE:
                break
            slimmed = slim_span(spans[index])
            # Estimate the new total from the per-span size difference
            # instead of re-serializing the whole trace on every step.
            estimated_size -= sizes[index] - _span_size(slimmed)
            spans[index] = slimmed

        trace = ExportedSpanList(root=spans)

    # Proceed with upload
    print("final length:", get_size(trace))

In [None]:
# Request synthetic spans with a target size of 50,000,000 bytes.
data = generate_exported_span_list(byte_size=50_000_000)

In [None]:
# Run the size check / trimming on the generated spans.
_upload_trace(trace=data)

In [None]:
# `trace` is a required argument; calling without it raises TypeError.
_upload_trace(data)