In [None]:
import dspy
import mlflow
from dspy.teleprompt import MIPROv2
import os

lm = dspy.LM(
    model='openai/gpt-oss-20b:free',
    api_base='https://openrouter.ai/api/v1',
    api_key='sk-or-v1-XXXXXXXXXXXXXXXX',
    cache=False
    max_tokens=2000
)

dspy.settings.configure(lm=lm)

In [None]:
lm("Say this is a test!", temperature=0.7)  # => ['This is a test!']
lm(messages=[{"role": "user", "content": "Say this is a test!"}])  # => ['This is a test!']

In [None]:
mlflow.set_experiment("Autonomous_Market_Analyst_v1")
# mlflow.dspy.autolog()

2026/02/06 22:31:23 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2026/02/06 22:31:23 INFO mlflow.store.db.utils: Updating database tables
2026/02/06 22:31:23 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2026/02/06 22:31:23 INFO alembic.runtime.migration: Will assume non-transactional DDL.
2026/02/06 22:31:23 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2026/02/06 22:31:23 INFO alembic.runtime.migration: Will assume non-transactional DDL.


<Experiment: artifact_location='file:a:/Portfolio/mlflow/abdo/mlruns/1', creation_time=1770403493005, experiment_id='1', last_update_time=1770403493005, lifecycle_stage='active', name='Autonomous_Market_Analyst_v1', tags={}>

### DSPy Signatures

In [None]:
class ResearchTask(dspy.Signature):
    """
    Research product information and analyze competitors in the market.
    """
    product_name = dspy.InputField(desc="Name of the product or market domain")
    research_notes = dspy.OutputField(
        desc="Detailed report about market conditions and competitors"
    )


class MarketingStrategy(dspy.Signature):
    """
    Transform research insights into a complete marketing strategy.
    """
    research_notes = dspy.InputField()
    strategy = dspy.OutputField(
        desc="Marketing plan including distribution channels and competitive advantage"
    )
    

### Search Tool (Retriever)

In [None]:
from tavily import TavilyClient

tavily_client = TavilyClient(api_key="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")

def search_wikipedia(query: str) -> list[str]:
    """
    Web search retriever using Tavily.
    Compatible with DSPy ReAct agents.
    """
    response = tavily_client.search(
        query=query,
        search_depth="basic",
        max_results=3
    )

    return [r["content"] for r in response["results"]]

### Market Intelligence System
#####    (ReAct + CoT + Self-Reflection)

In [None]:
class MarketIntelligenceSystem(dspy.Module):
    def __init__(self):
        super().__init__()

        # ReAct agent for research
        self.researcher = dspy.ReAct(
            ResearchTask,
            tools=[search_wikipedia]
        )

        # Chain-of-Thought agent for strategy generation
        self.strategist = dspy.ChainOfThought(
            MarketingStrategy
        )
        
    def forward(self, product_name):
        # Market research
        research = self.researcher(
            product_name=product_name
        )

        # Initial strategy
        result = self.strategist(
            research_notes=research.research_notes
        )
        strategy = result.strategy

        # Final output
        return dspy.Prediction(
            research=research.research_notes,
            strategy=strategy
        )

### Training Data

In [None]:
trainset = [
    dspy.Example(
        product_name="Electric Cars",
        strategy="Focus on battery longevity, charging infrastructure, and eco-friendly branding."
    ).with_inputs("product_name"),

    dspy.Example(
        product_name="AI Writing Tools",
        strategy="Target content creators with speed, SEO optimization, and workflow integration."
    ).with_inputs("product_name"),
]

### Evaluation Metric

In [None]:
def simple_metric(gold ,  pred , trace = None):
    """
    Very simple quality heuristic:
    the strategy must be sufficiently detailed.
    """
    return len(pred.strategy) > 80

###  Optimizer (Prompt + Reasoning)

In [None]:
optimizer = MIPROv2(
    metric=simple_metric,
    auto=None,
    num_candidates=1, 
    num_threads=1
)

with mlflow.start_run(run_name="Market_System_Optimization"):
    optimized_system = optimizer.compile(
    MarketIntelligenceSystem(),
    trainset=trainset,
    valset=trainset,
    num_trials=1,
    minibatch=False,
    data_aware_proposer=False 
)

###  DSPy â†’ MLflow Trace Logger

In [None]:
def log_dspy_trace(trace, name:str):
    """
    Store DSPy execution trace as a JSON artifact in MLflow.
    """
    trace_dict = trace.to_dict()
    path = f"{name}_dspy_trace.json"
    
    with open(path, "w") as f:
        json.dump(trace_dict, f, indent=2)
        
    mlflow.log_artifact(path)

### Run + Trace Visualization

In [None]:
with mlflow.start_run(run_name="DSPy_Full_Trace_Run"):

    mlflow.log_param("system", "MarketIntelligenceSystem")
    mlflow.log_param("reflection_loops", 2)
    mlflow.set_tag("framework", "DSPy")
    
    result = optimized_system(
        product_name="Sustainable Fashion Brands",
        trace=True
    )
    
    mlflow.log_text(result.strategy, "final_strategy.txt")
    mlflow.log_text(result.research, "research_notes.txt")
    
    log_dspy_trace(
        trace=result.trace,
        name="full_dspy_trace"
    )

    print("Final Strategy:\n")
    print(result.strategy)    
    

###  Save Optimized System

In [None]:
optimized_system.save("market_intelligence_system.json")