In [2]:
import sys
!{sys.executable} -m pip install -r requirements.txt

Defaulting to user installation because normal site-packages is not writeable


In [3]:
import os
import yfinance as yf
from dotenv import load_dotenv
from typing import List, TypedDict, Annotated
import operator

import mlflow
from langgraph.graph import StateGraph, START, END

from langchain_community.tools import DuckDuckGoSearchRun
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel, Field
from typing import Dict, Any, List

# --- Load Environment Variables ---
#  .env file with Azure OpenAI credentials ( used creds that I had access to)
load_dotenv(".env")
AZURE_OPENAI_KEY = os.getenv("AZURE_OPENAI_KEY")
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o")
AZURE_OPENAI_VERSION = os.getenv("OPENAI_API_VERSION")


USER_AGENT environment variable not set, consider setting it to identify your requests.


In [4]:
print(AZURE_OPENAI_ENDPOINT)

https://eastus.api.cognitive.microsoft.com/


In [10]:
mlflow.set_tracking_uri("file:///tmp/mlruns")  # logs to local folder
mlflow.set_experiment("market-sentiment-analyzer")

# ---------------------------
# Pydantic schema
# ---------------------------
class SentimentProfile(BaseModel):
    company_name: str = Field(..., description="Name of the company")
    stock_code: str = Field(..., description="Ticker symbol of the company")
    newsdesc: str = Field(..., description="Concise news description")
    sentiment: str = Field(..., description="Positive/Negative/Neutral")
    people_names: List[str] = []
    places_names: List[str] = []
    other_companies_referred: List[str] = []
    related_industries: List[str] = []
    market_implications: str = ""
    confidence_score: float = Field(..., description="Confidence score between 0-1")

parser = PydanticOutputParser(pydantic_object=SentimentProfile)

# ---------------------------
# 3) Stock code mapping
# ---------------------------
STOCK_CODE_MAP = {
    "Microsoft": "MSFT",
    "Apple": "AAPL",
    "Tesla": "TSLA",
    "Google": "GOOGL",
    "Amazon": "AMZN",
}

def get_stock_code(company_name: str) -> str:
    return STOCK_CODE_MAP.get(company_name, company_name)  # fallback = same string

# ---------------------------
# 4) Fetch news
# ---------------------------
def fetch_news_for_ticker(ticker: str, max_items: int = 5) -> List[str]:
    try:
        news_items = yf.Ticker(ticker).news[:max_items]
        return [item.get("title", "") for item in news_items if item.get("title")]
    except Exception as e:
        print(f"[WARN] Could not fetch news for {ticker}: {e}")
        return []

# ---------------------------
# 5) Build prompt
# ---------------------------
def build_parser_and_prompt():
    template = """
    You are a financial analyst.
    Based on the following news headlines about {company_name} ({stock_code}),
    provide a structured sentiment analysis in JSON.

    Headlines:
    {news_items}

    {format_instructions}
    """
    return PromptTemplate(
        template=template,
        input_variables=["company_name", "stock_code", "news_items"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )

# ---------------------------
# 6) Pipeline
# ---------------------------
def run_pipeline(company_name: str) -> Dict[str, Any]:
    with mlflow.start_run():
        stock_code = get_stock_code(company_name)
        news_items = fetch_news_for_ticker(stock_code)

        news_blob = "\n".join(f"- {n}" for n in news_items) if news_items else "No recent news available."

        prompt = build_parser_and_prompt()
        
        
        llm = AzureChatOpenAI(
        azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
        openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
        openai_api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        openai_api_type=os.getenv("OPENAI_API_TYPE"),
        temperature=0)


        chain = prompt | llm | parser
        result = chain.invoke({
            "company_name": company_name,
            "stock_code": stock_code,
            "news_items": news_blob,
        })

        mlflow.log_params({"company_name": company_name, "stock_code": stock_code})
        mlflow.log_text(news_blob, "news.txt")
        mlflow.log_dict(result.dict(), "result.json")

        return result.dict()


In [12]:
import json
result = run_pipeline("Microsoft")
print(json.dumps(result, indent=2))

{
  "company_name": "Microsoft",
  "stock_code": "MSFT",
  "newsdesc": "No recent news available.",
  "sentiment": "Neutral",
  "people_names": [],
  "places_names": [],
  "other_companies_referred": [],
  "related_industries": [],
  "market_implications": "",
  "confidence_score": 0.5
}


/tmp/ipykernel_215652/1179239222.py:97: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  mlflow.log_dict(result.dict(), "result.json")
/tmp/ipykernel_215652/1179239222.py:99: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  return result.dict()
