From 4b4375a1c22c647d2c7841388e5621c307793763 Mon Sep 17 00:00:00 2001
From: Karthik Kalyanaraman
Date: Fri, 19 Jul 2024 13:26:53 -0700
Subject: [PATCH 1/3] Bugfix DSPy instrumentation

---
Review notes (text in this area is not part of the commit):
- config["max_errors"] is guarded by hasattr(instance, 'max_errors'), so the
  attribute that is checked is the attribute that is read.
- dspy.evaluate.display is taken from instance.display, matching the removed
  line it replaces.
- The metric name is only recorded when instance.metric is not None, because
  `.__name__` is dereferenced on it.

 .../instrumentation/dspy/instrumentation.py   |  4 +-
 .../instrumentation/dspy/patch.py             | 72 +++++++++----------
 2 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/src/langtrace_python_sdk/instrumentation/dspy/instrumentation.py b/src/langtrace_python_sdk/instrumentation/dspy/instrumentation.py
index 375989ef..a9cc587e 100644
--- a/src/langtrace_python_sdk/instrumentation/dspy/instrumentation.py
+++ b/src/langtrace_python_sdk/instrumentation/dspy/instrumentation.py
@@ -27,12 +27,12 @@ class DspyInstrumentation(BaseInstrumentor):
     The DspyInstrumentor class represents the DSPy instrumentation"""
 
     def instrumentation_dependencies(self) -> Collection[str]:
-        return ["dspy >= 0.1.5"]
+        return ["dspy-ai >= 2.0.0"]
 
     def _instrument(self, **kwargs):
         tracer_provider = kwargs.get("tracer_provider")
         tracer = get_tracer(__name__, "", tracer_provider)
-        version = v("dspy")
+        version = v("dspy-ai")
         _W(
             "dspy.teleprompt.bootstrap",
             "BootstrapFewShot.compile",
diff --git a/src/langtrace_python_sdk/instrumentation/dspy/patch.py b/src/langtrace_python_sdk/instrumentation/dspy/patch.py
index 3427eba9..181b276d 100644
--- a/src/langtrace_python_sdk/instrumentation/dspy/patch.py
+++ b/src/langtrace_python_sdk/instrumentation/dspy/patch.py
@@ -39,25 +39,25 @@ def traced_method(wrapped, instance, args, kwargs):
                     ),
                 }
                 span_attributes["dspy.optimizer.module.prog"] = json.dumps(prog)
-        if "metric" in instance and instance.metric:
-            span_attributes["dspy.optimizer.metric"] = instance.metric.__name__
+        if getattr(instance, 'metric', None) is not None:
+            span_attributes["dspy.optimizer.metric"] = getattr(instance, 'metric').__name__
         if kwargs.get("trainset") and len(kwargs.get("trainset")) > 0:
             span_attributes["dspy.optimizer.trainset"] = str(kwargs.get("trainset"))
         config = {}
-        if "metric_threshold" in instance and instance.metric_threshold:
-            config["metric_threshold"] = instance.metric_threshold
-        if "teacher_settings" in instance and instance.teacher_settings:
-            config["teacher_settings"] = instance.teacher_settings
-        if "max_bootstrapped_demos" in instance and instance.max_bootstrapped_demos:
-            config["max_bootstrapped_demos"] = instance.max_bootstrapped_demos
-        if "max_labeled_demos" in instance and instance.max_labeled_demos:
-            config["max_labeled_demos"] = instance.max_labeled_demos
-        if "max_rounds" in instance and instance.max_rounds:
-            config["max_rounds"] = instance.max_rounds
-        if "max_errors" in instance and instance.max_errors:
-            config["max_errors"] = instance.max_errors
-        if "error_count" in instance and instance.error_count:
-            config["error_count"] = instance.error_count
+        if hasattr(instance, 'metric_threshold'):
+            config["metric_threshold"] = getattr(instance, 'metric_threshold')
+        if hasattr(instance, 'teacher_settings'):
+            config["teacher_settings"] = getattr(instance, 'teacher_settings')
+        if hasattr(instance, 'max_bootstrapped_demos'):
+            config["max_bootstrapped_demos"] = getattr(instance, 'max_bootstrapped_demos')
+        if hasattr(instance, 'max_labeled_demos'):
+            config["max_labeled_demos"] = getattr(instance, 'max_labeled_demos')
+        if hasattr(instance, 'max_rounds'):
+            config["max_rounds"] = getattr(instance, 'max_rounds')
+        if hasattr(instance, 'max_errors'):
+            config["max_errors"] = getattr(instance, 'max_errors')
+        if hasattr(instance, 'error_count'):
+            config["error_count"] = getattr(instance, 'error_count')
 
         if config and len(config) > 0:
             span_attributes["dspy.optimizer.config"] = json.dumps(config)
@@ -147,30 +147,30 @@ def traced_method(wrapped, instance, args, kwargs):
             **(extra_attributes if extra_attributes is not None else {}),
         }
 
-        if "devset" in instance and instance.devset is not None:
-            span_attributes["dspy.evaluate.devset"] = str(instance.devset)
-        if "display" in instance and instance.display is not None:
-            span_attributes["dspy.evaluate.display"] = str(instance.display)
-        if "num_threads" in instance and instance.num_threads is not None:
-            span_attributes["dspy.evaluate.num_threads"] = str(instance.num_threads)
-        if "return_outputs" in instance and instance.return_outputs is not None:
+        if hasattr(instance, "devset"):
+            span_attributes["dspy.evaluate.devset"] = str(getattr(instance, "devset"))
+        if hasattr(instance, "display"):
+            span_attributes["dspy.evaluate.display"] = str(getattr(instance, "display"))
+        if hasattr(instance, "num_threads"):
+            span_attributes["dspy.evaluate.num_threads"] = str(getattr(instance, "num_threads"))
+        if hasattr(instance, "return_outputs"):
             span_attributes["dspy.evaluate.return_outputs"] = str(
-                instance.return_outputs
+                getattr(instance, "return_outputs")
             )
-        if "display_table" in instance and instance.display_table is not None:
-            span_attributes["dspy.evaluate.display_table"] = str(instance.display_table)
-        if "display_progress" in instance and instance.display_progress is not None:
+        if hasattr(instance, "display_table"):
+            span_attributes["dspy.evaluate.display_table"] = str(getattr(instance, "display_table"))
+        if hasattr(instance, "display_progress"):
             span_attributes["dspy.evaluate.display_progress"] = str(
-                instance.display_progress
+                getattr(instance, "display_progress")
             )
-        if "metric" in instance and instance.metric is not None:
-            span_attributes["dspy.evaluate.metric"] = instance.metric.__name__
-        if "error_count" in instance and instance.error_count is not None:
-            span_attributes["dspy.evaluate.error_count"] = str(instance.error_count)
-        if "error_lock" in instance and instance.error_lock is not None:
-            span_attributes["dspy.evaluate.error_lock"] = str(instance.error_lock)
-        if "max_errors" in instance and instance.max_errors is not None:
-            span_attributes["dspy.evaluate.max_errors"] = str(instance.max_errors)
+        if getattr(instance, "metric", None) is not None:
+            span_attributes["dspy.evaluate.metric"] = getattr(instance, "metric").__name__
+        if hasattr(instance, "error_count"):
+            span_attributes["dspy.evaluate.error_count"] = str(getattr(instance, "error_count"))
+        if hasattr(instance, "error_lock"):
+            span_attributes["dspy.evaluate.error_lock"] = str(getattr(instance, "error_lock"))
+        if hasattr(instance, "max_errors"):
+            span_attributes["dspy.evaluate.max_errors"] = str(getattr(instance, "max_errors"))
 
         if args and len(args) > 0:
             span_attributes["dspy.evaluate.args"] = str(args)

From 907baca864cb55357a0385d0f4ecac44baa86a7b Mon Sep 17 00:00:00 2001
From: Karthik Kalyanaraman
Date: Fri, 19 Jul 2024 13:59:39 -0700
Subject: [PATCH 2/3] Add example for parallel execution

---
 .../math_problems_cot_parallel.py             | 59 +++++++++++++++++++
 1 file changed, 59 insertions(+)
 create mode 100644 src/examples/dspy_example/math_problems_cot_parallel.py

diff --git a/src/examples/dspy_example/math_problems_cot_parallel.py b/src/examples/dspy_example/math_problems_cot_parallel.py
new file mode 100644
index 00000000..8c5fabf7
--- /dev/null
+++ b/src/examples/dspy_example/math_problems_cot_parallel.py
@@ -0,0 +1,59 @@
+import dspy
+from dspy.datasets.gsm8k import GSM8K, gsm8k_metric
+from dspy.teleprompt import BootstrapFewShot
+from concurrent.futures import ThreadPoolExecutor
+from opentelemetry.context import get_current, attach, detach
+
+# flake8: noqa
+from langtrace_python_sdk import langtrace, with_langtrace_root_span
+
+langtrace.init()
+
+turbo = dspy.OpenAI(model="gpt-3.5-turbo", max_tokens=250)
+dspy.settings.configure(lm=turbo)
+
+# Load math questions from the GSM8K dataset
+gsm8k = GSM8K()
+gsm8k_trainset, gsm8k_devset = gsm8k.train[:10], gsm8k.dev[:10]
+
+class CoT(dspy.Module):
+    def __init__(self):
+        super().__init__()
+        self.prog = dspy.ChainOfThought("question -> answer")
+
+    def forward(self, question):
+        return self.prog(question=question)
+
+@with_langtrace_root_span(name="parallel_example")
+def example():
+    # Set up the optimizer: we want to "bootstrap" (i.e., self-generate) 4-shot examples of our CoT program.
+    config = dict(max_bootstrapped_demos=4, max_labeled_demos=4)
+
+    # Optimize! Use the `gsm8k_metric` here. In general, the metric is going to tell the optimizer how well it's doing.
+    teleprompter = BootstrapFewShot(metric=gsm8k_metric, **config)
+    optimized_cot = teleprompter.compile(CoT(), trainset=gsm8k_trainset)
+
+    questions = [
+        "What is the cosine of 0?",
+        "What is the tangent of 0?",
+    ]
+
+    current_context = get_current()
+
+    def run_with_context(context, func, *args, **kwargs):
+        token = attach(context)
+        try:
+            return func(*args, **kwargs)
+        finally:
+            detach(token)
+
+    with ThreadPoolExecutor(max_workers=2) as executor:
+        futures = [executor.submit(run_with_context, current_context, optimized_cot, question=q) for q in questions]
+
+        for future in futures:
+            ans = future.result()
+            print(ans)
+
+
+if __name__ == "__main__":
+    example()

From a791b3c327e762bdea88be4c3fc580d68d9d5c6e Mon Sep 17 00:00:00 2001
From: Karthik Kalyanaraman
Date: Fri, 19 Jul 2024 14:01:01 -0700
Subject: [PATCH 3/3] Bump version

---
 src/langtrace_python_sdk/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py
index b19ee4b7..ba51cedf 100644
--- a/src/langtrace_python_sdk/version.py
+++ b/src/langtrace_python_sdk/version.py
@@ -1 +1 @@
-__version__ = "2.2.1"
+__version__ = "2.2.2"