Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions src/examples/dspy_example/math_problems_cot_parallel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import contextvars
import dspy
from dspy.datasets.gsm8k import GSM8K, gsm8k_metric
from dspy.teleprompt import BootstrapFewShot
from concurrent.futures import ThreadPoolExecutor
from opentelemetry.context import get_current, attach, detach

# flake8: noqa
from langtrace_python_sdk import langtrace, with_langtrace_root_span
Expand All @@ -22,7 +22,8 @@ def __init__(self):
self.prog = dspy.ChainOfThought("question -> answer")

def forward(self, question):
return self.prog(question=question)
result = inject_additional_attributes(lambda: self.prog(question=question), {'langtrace.span.name': 'MathProblemsCotParallel'})
return result

@with_langtrace_root_span(name="parallel_example")
def example():
Expand All @@ -34,21 +35,12 @@ def example():
optimized_cot = teleprompter.compile(CoT(), trainset=gsm8k_trainset)

questions = [
"What is the cosine of 0?",
"What is the tangent of 0?",
"What is the sine of 0?",
"What is the tangent of 100?",
]

current_context = get_current()

def run_with_context(context, func, *args, **kwargs):
token = attach(context)
try:
return func(*args, **kwargs)
finally:
detach(token)

with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(run_with_context, current_context, optimized_cot, question=q) for q in questions]
futures = [executor.submit(contextvars.copy_context().run, optimized_cot, question=q) for q in questions]

for future in futures:
ans = future.result()
Expand Down
24 changes: 21 additions & 3 deletions src/langtrace_python_sdk/instrumentation/dspy/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,14 @@ def traced_method(wrapped, instance, args, kwargs):
if config and len(config) > 0:
span_attributes["dspy.optimizer.config"] = json.dumps(config)

# passed operation name
opname = operation_name
if extra_attributes is not None and "langtrace.span.name" in extra_attributes:
# append the operation name to the span name
opname = f"{operation_name}-{extra_attributes['langtrace.span.name']}"

attributes = FrameworkSpanAttributes(**span_attributes)
with tracer.start_as_current_span(operation_name, kind=SpanKind.CLIENT) as span:
with tracer.start_as_current_span(opname, kind=SpanKind.CLIENT) as span:
_set_input_attributes(span, kwargs, attributes)

try:
Expand Down Expand Up @@ -100,6 +106,12 @@ def traced_method(wrapped, instance, args, kwargs):
**(extra_attributes if extra_attributes is not None else {}),
}

# passed operation name
opname = operation_name
if extra_attributes is not None and "langtrace.span.name" in extra_attributes:
# append the operation name to the span name
opname = f"{operation_name}-{extra_attributes['langtrace.span.name']}"

if instance.__class__.__name__:
span_attributes["dspy.signature.name"] = instance.__class__.__name__
span_attributes["dspy.signature"] = str(instance)
Expand All @@ -108,7 +120,7 @@ def traced_method(wrapped, instance, args, kwargs):
span_attributes["dspy.signature.args"] = str(kwargs)

attributes = FrameworkSpanAttributes(**span_attributes)
with tracer.start_as_current_span(operation_name, kind=SpanKind.CLIENT) as span:
with tracer.start_as_current_span(opname, kind=SpanKind.CLIENT) as span:
_set_input_attributes(span, kwargs, attributes)

try:
Expand Down Expand Up @@ -147,6 +159,12 @@ def traced_method(wrapped, instance, args, kwargs):
**(extra_attributes if extra_attributes is not None else {}),
}

# passed operation name
opname = operation_name
if extra_attributes is not None and "langtrace.span.name" in extra_attributes:
# append the operation name to the span name
opname = f"{operation_name}-{extra_attributes['langtrace.span.name']}"

if hasattr(instance, "devset"):
span_attributes["dspy.evaluate.devset"] = str(getattr(instance, "devset"))
if hasattr(instance, "trainset"):
Expand Down Expand Up @@ -175,7 +193,7 @@ def traced_method(wrapped, instance, args, kwargs):
span_attributes["dspy.evaluate.args"] = str(args)

attributes = FrameworkSpanAttributes(**span_attributes)
with tracer.start_as_current_span(operation_name, kind=SpanKind.CLIENT) as span:
with tracer.start_as_current_span(opname, kind=SpanKind.CLIENT) as span:
_set_input_attributes(span, kwargs, attributes)

try:
Expand Down
2 changes: 1 addition & 1 deletion src/langtrace_python_sdk/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.2.2"
__version__ = "2.2.3"