In [1]:
import dspy
from typing import List, Any, Callable, Optional, Tuple
from pydantic import BaseModel, Field
from dataclasses import dataclass

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from dotenv import load_dotenv
from pathlib import Path
def load_parent_env(start: Optional[Path] = None, include_start: bool = False) -> Optional[Path]:
    """Load the nearest ``.env`` file found by walking up the directory tree.

    Args:
        start: directory to begin the search from; defaults to the current
            working directory.
        include_start: when True, ``start`` itself is checked before its
            ancestors. Defaults to False, so only parent directories are searched.

    Returns:
        The path of the ``.env`` file that was loaded, or None when no ancestor
        contains one (in which case no environment variables are loaded).
    """
    current_dir = Path.cwd() if start is None else Path(start).resolve()
    candidates = [current_dir, *current_dir.parents] if include_start else current_dir.parents

    # Closest directory first, so the innermost .env wins.
    for directory in candidates:
        env_path = directory / '.env'
        if env_path.exists():
            load_dotenv(env_path)
            return env_path
    return None

In [3]:
# Load environment variables from the first .env found in an ancestor of the cwd
# (presumably the OpenAI credentials needed by dspy.OpenAI below — confirm).
load_parent_env()

In [4]:
# Language model for this notebook: gpt-4o, capped at 1000 tokens per completion.
omni = dspy.OpenAI(model='gpt-4o', max_tokens=1000)
# Register it as dspy's default LM so predictors created later use it implicitly.
dspy.settings.configure(lm=omni)

In [5]:
@dataclass
class Tool:
    """A callable wrapper that carries a name, a description and an input name.

    Invoking a ``Tool`` instance passes every positional and keyword argument
    straight through to ``function`` and hands back its result unchanged.
    """

    name: str
    desc: str
    input_variable: str
    # The wrapped callable; __call__ delegates to it.
    function: Callable

    def __call__(self, *args, **kwargs) -> Tuple[str, str]:
        # NOTE(review): the Tuple[str, str] return annotation is not enforced;
        # whatever `function` returns is passed back as-is.
        wrapped = self.function
        result = wrapped(*args, **kwargs)
        return result

In [6]:
# Directory holding the traces the generated code analyses; created on demand by prelude().
WORKING_DIRECTORY = Path('../playground/multi_trace/dsph_fL1/')


def prelude(directory: Optional[Path] = None) -> str:
    """Make sure the working directory exists and describe its contents.

    Args:
        directory: directory to inspect; defaults to ``WORKING_DIRECTORY``.
            Strings are accepted and converted to ``Path``.

    Returns:
        ``"No files written"`` when the directory is empty (or cannot be listed),
        otherwise one ``" - <relative path>"`` line per entry (files and
        sub-directories alike), sorted so the text is identical across runs.
    """
    directory = WORKING_DIRECTORY if directory is None else Path(directory)
    if not directory.exists():
        # parents=True: the intermediate ../playground/... folders may not exist yet.
        directory.mkdir(parents=True)

    try:
        written_files = sorted(f.relative_to(directory) for f in directory.rglob("*"))
    except OSError:
        # Best-effort listing: an unreadable tree is reported like an empty one.
        written_files = []

    if not written_files:
        return "No files written"
    return "\n".join(f" - {f}" for f in written_files)
    

In [7]:
class PersistentProgramOfThought(dspy.ProgramOfThought):
    """ProgramOfThought whose ``forward`` also returns the final code and its output.

    Code is generated for the signature, executed, and regenerated with the
    previous code and error as feedback until it runs cleanly or ``max_iters``
    regenerations have been tried.
    """

    def __init__(self, signature, max_iters=3, import_white_list=None):
        super().__init__(signature, max_iters, import_white_list)

    def _parse_and_execute(self, code_data) -> Tuple[str, Optional[str], Optional[str]]:
        """Parse generated code and run it only if parsing succeeded.

        Returns:
            ``(code, output, error)``; ``output`` is None whenever ``error`` is set.
        """
        parsed_code, parse_error = self.parse_code(code_data)
        if parse_error:
            # Running code that failed to parse would only replace the parse error
            # with a less informative execution error.
            return parsed_code, None, parse_error
        return self.execute_code(parsed_code)

    def forward(self, **kwargs) -> Optional[Tuple[Any, str, Optional[str]]]:
        """Generate and run code for the inputs, retrying on errors, then answer.

        Returns:
            ``(answer_prediction, final_code, code_output)`` on success, or None
            when the last attempt (after up to ``max_iters`` regenerations) still
            produced an error.
        """
        input_kwargs = {
            field_name: kwargs[field_name] for field_name in self.input_fields
        }
        code, output, error = self._parse_and_execute(self.code_generate(**input_kwargs))

        hop = 0
        while error and hop < self.max_iters:
            print("Error in code execution")
            input_kwargs.update({"previous_code": code, "error": error})
            print(code)
            print(error)
            code, output, error = self._parse_and_execute(
                self.code_regenerate(**input_kwargs)
            )
            hop += 1

        # Only give up if the *last* attempt failed; a success on the final
        # regeneration must still produce an answer.
        if error:
            print("Max hops reached. Error persists.")
            return None

        input_kwargs.update({"final_generated_code": code, "code_output": output})
        answer_gen_result = self.generate_answer(**input_kwargs)
        return answer_gen_result, code, output

In [8]:
# formulation 1 -- generalist planning optimizations

class Input(BaseModel):
    """Input to planner predictor"""

    # pydantic's keyword is `description=` (as used by Plan below); the text then
    # appears as the standard field description in the model's JSON schema.
    objective: str = Field(description="Final desired outcome for the plan")
    files: str = Field(description="A text description of the included files and their headers")
    context: str = Field(description="Information that is necessary for solving the issue")

class Plan(BaseModel):
    """Plan to follow in future"""

    # Ordered list of plan steps; parse_plan joins them one per line before the
    # plan is handed to the code-writing actor.
    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )
    
def parse_plan(plan: "Plan") -> str:
    """Render a plan's steps as newline-separated text, one step per line.

    Steps are emitted verbatim, so any numbering must already be part of the
    step text. A plan with no steps yields an empty string.
    """
    return "\n".join(map(str, plan.steps))

class Objective2Plan(dspy.Signature):
    """Come up with an ordered step-by-step plan that, if sequentially executed, will yield the correct answer based on the objective. Make sure all information (be specific) is provided and that no superfluous steps are included. The result of the final step should be the final answer."""
    # The docstring above is the instruction text for this signature.
    # Structured planner input: objective, file description and incident context.
    input: Input = dspy.InputField()
    # Typed as Plan so the TypedPredictor returns a Plan object with `.steps`.
    output: Plan = dspy.OutputField()


# Formulation 2: Domain-Specific Optimization.
class SLOV2Plan(dspy.Signature):
    """
    Come up with an ordered step-by-step plan that, if sequentially executed, will yield the correct answer based on the objective. Ensure that all information is provided and that no superfluous steps are included. The result of the final step should be the final answer.
    Objective: Produce a report on anomalies in microservice application traces and metrics based on an SLO violation.
    Context:
    - You have access to a working directory containing:
      1. main_traces.csv: A summary of Jaeger traces for all calls in the workload, including headers for traceID, duration-ms, startTime, endTime, rpcErrors, operation, and services_involved.
      2. Individual trace tree JSON files, each named after its respective traceID.
    - Anomalies may include errors, extreme bottlenecks, recursions, or other unusual patterns.
    - The plan will be executed by an agent, not by you.
    """
    # Domain-specific variant: objective and file context are baked into the
    # instructions above, so only the incident description is an input.
    # NOTE(review): not currently wired into PlanningFlow, which uses Objective2Plan.
    slo_violation: str = dspy.InputField(desc="A description of the service level objective violation incident")
    output: Plan = dspy.OutputField()


# Program of Thought
class Plan2Anom(dspy.Signature):
    """Follow the given plan in a step by step fashion so that an anomaly summary is produced at the end"""
    
    # The plan as newline-separated text (PlanningFlow passes parse_plan's output).
    plan: str = dspy.InputField(desc="A step by step plan to follow")
    # Final answer produced after the generated code has been run.
    report: str = dspy.OutputField(desc="A report on anomalies or coding errors which have prevented such a report from being produced")

In [9]:
# Alternative, generic trigger (currently unused):
# slo_violation="Please check if the traces in the provided directory are anomalous."
# Incident description supplied to the planner as the `context` field.
slo_violation = "We have observed an increase in service latency above 2 seconds for more than 20% of user requests, violating service level agreements."
# Planner input read by PlanningFlow.forward through the module-level name `query`.
# NOTE(review): `files` is hand-written text, not generated from the prelude() listing.
query = Input(
    objective="Produce a report on anomalies in microservice application traces and metrics based on an SLO violation.",
    files="You have access to a working directory containing: 1. main_traces.csv: A summary of Jaeger traces for all calls in the workload, including headers for traceID, duration-ms, startTime, endTime, rpcErrors, operation, and services_involved, 2. Individual trace tree JSON files, each named after its respective traceID. Anomalies may include errors, extreme bottlenecks, recursions, or other unusual patterns. The plan will be executed by an agent, not by you.",
    context=slo_violation
)


class PlanningFlow(dspy.Module):
    """Two-stage pipeline: an LM planner writes a step list, then a program-of-thought actor executes it.

    ``forward`` returns ``(report, code, output)`` from the actor, or
    ``(None, None, None)`` when the actor gave up after repeated code errors.
    """

    def __init__(self):
        super().__init__()
        # prelude() creates the working directory if needed and lists its contents.
        # NOTE(review): self.files is not passed to the planner; the planner relies
        # on the hand-written `files` text inside the query instead.
        self.files = prelude()
        # Domain-specific alternative: self.planner = dspy.TypedPredictor(SLOV2Plan)
        self.planner = dspy.TypedPredictor(Objective2Plan)
        # The whitelist must be a list of module names: the interpreter tests
        # `name in import_white_list`, which on a comma-joined str is a substring
        # match and would admit names such as "o" or "as".
        self.actor = PersistentProgramOfThought(
            Plan2Anom, import_white_list=["pandas", "csv", "numpy", "json", "os"]
        )

    def forward(self, query_input: Optional[Input] = None):
        """Plan for ``query_input`` (default: the module-level ``query``) and execute the plan.

        Returns:
            ``(report, code, output)``, or ``(None, None, None)`` if no code ran cleanly.
        """
        if query_input is None:
            query_input = query

        plan = self.planner(input=query_input).output
        plan_text = parse_plan(plan)
        print(plan_text)

        result = self.actor(plan=plan_text)
        if result is None:
            # The actor returns None when its code never ran cleanly; keep the
            # 3-tuple shape so callers can still unpack the result.
            return None, None, None
        report, code, out = result
        return (report, code, out)

In [10]:
flow = PlanningFlow()
# Invoke the module through __call__, dspy's entry point, rather than .forward() directly.
# The resulting (report, code, output) tuple is displayed as the cell output.
flow()

1. Load the main_traces.csv file into a data structure for analysis.
2. Filter the traces in main_traces.csv to identify those with a duration greater than 2000 milliseconds.
3. Calculate the percentage of traces with a duration greater than 2000 milliseconds to confirm the SLO violation.
4. For each trace with a duration greater than 2000 milliseconds, load the corresponding individual trace tree JSON file using the traceID.
5. Analyze each individual trace tree JSON file to identify anomalies such as rpcErrors, extreme bottlenecks, recursions, or other unusual patterns.
6. Document each identified anomaly with details including traceID, type of anomaly, and any relevant metrics or patterns observed.
7. Compile the documented anomalies into a comprehensive report.
8. Review the report to ensure all identified anomalies are clearly described and supported by data from the traces and metrics.
9. Finalize the report and prepare it for presentation or distribution.
Error in code execution

KeyboardInterrupt: 