In [26]:
!pip install langchain langchain-core langchain-community\
     langchain-experimental langchain-openai yfinance
!pip install yfinance



In [36]:
from langchain.chains import SequentialChain, TransformChain
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List
import yfinance as yf
import json
from datetime import datetime
from langfuse import Langfuse
from langfuse.callback import CallbackHandler

# Initialize Langfuse
langfuse = Langfuse()
handler = CallbackHandler()

# Define Pydantic model for structured output
class SentimentAnalysisResult(BaseModel):
    company_name: str = Field(..., description="Name of the company being analyzed")
    stock_code: str = Field(..., description="Stock ticker symbol")
    newsdesc: str = Field(..., description="Summary of news analyzed")
    sentiment: str = Field(..., description="Overall sentiment: Positive, Negative, or Neutral")
    people_names: List[str] = Field(default_factory=list, description="List of people mentioned")
    places_names: List[str] = Field(default_factory=list, description="List of places mentioned")
    other_companies_referred: List[str] = Field(default_factory=list, description="List of other companies mentioned")
    related_industries: List[str] = Field(default_factory=list, description="List of related industries")
    market_implications: str = Field(..., description="Potential market impact analysis")
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Confidence score of the analysis (0.0 to 1.0)")

# Step 1: Stock code extraction
def get_stock_code(inputs: dict) -> dict:
    handler.trace(
        name="stock_code_extraction",
        input=inputs,
        metadata={"step": "ticker_lookup"}
    )
    
    company_name = inputs["company_name"].lower()
    ticker_map = {
        "apple": "AAPL", "microsoft": "MSFT", "google": "GOOGL",
        "amazon": "AMZN", "meta": "META", "tesla": "TSLA",
        "nvidia": "NVDA", "netflix": "NFLX", "intel": "INTC",
        "amd": "AMD", "ibm": "IBM", "oracle": "ORCL"
    }
    
    ticker = ticker_map.get(company_name)
    if not ticker:
        try:
            search_result = yf.Ticker(company_name)
            if search_result.info:
                ticker = search_result.info.get('symbol', 'UNKNOWN')
        except Exception as e:
            handler.trace(
                name="stock_code_error",
                input=inputs,
                metadata={"error": str(e)}
            )
            raise ValueError(f"Could not find ticker for company: {company_name}")
    
    return {"ticker": ticker}

stock_code_chain = TransformChain(
    input_variables=["company_name"],
    output_variables=["ticker"],
    transform=get_stock_code
)

# Step 2: News fetching
def fetch_news(inputs: dict) -> dict:
    handler.trace(
        name="news_fetching",
        input=inputs,
        metadata={"step": "news_retrieval"}
    )
    
    ticker = inputs["ticker"]
    try:
        news_data = yf.Ticker(ticker).news
        news_items = []
        
        for item in news_data[:3]:  # Get top 3 news items
            news_items.append({
                "title": item.get("title", ""),
                "publisher": item.get("publisher", ""),
                "summary": item.get("description", ""),
                "link": item.get("link", ""),
                "published": item.get("providerPublishTime", "")
            })
        
        if not news_items:
            raise ValueError("No news articles found")
        
        return {"news": news_items}
    except Exception as e:
        handler.trace(
            name="news_fetch_error",
            input=inputs,
            metadata={"error": str(e)}
        )
        raise ValueError(f"Failed to fetch news: {str(e)}")

news_fetch_chain = TransformChain(
    input_variables=["ticker"],
    output_variables=["news"],
    transform=fetch_news
)

# Step 3: Sentiment analysis
def analyze_sentiment(inputs: dict) -> dict:
    handler.trace(
        name="sentiment_analysis_start",
        input=inputs,
        metadata={"step": "sentiment_analysis"}
    )
    
    # Initialize parser
    parser = PydanticOutputParser(pydantic_object=SentimentAnalysisResult)
    
    # Initialize Azure OpenAI - CORRECTED initialization
    # llm = AzureChatOpenAI(
    #     azure_deployment="your-deployment-name",  # Replace with your actual deployment name
    #     openai_api_version="2023-05-15",         # Use your API version
    #     temperature=0.3
    # )
    llm = AzureChatOpenAI(model='myllm')
    
    # Create prompt with format instructions
    prompt = ChatPromptTemplate.from_template("""
    Analyze the following news articles about {company_name} ({ticker}) and provide a structured sentiment analysis.
    
    News Articles:
    {news}
    
    {format_instructions}
    
    Provide a concise newsdesc summarizing the key points from all articles.
    For market implications, analyze potential impact on stock performance.
    """)
    
    # Create chain
    chain = prompt | llm | parser
    
    # Format news for prompt
    formatted_news = "\n".join(
        [f"Title: {item['title']}\nSummary: {item.get('summary', 'N/A')}" 
         for item in inputs["news"]]
    )
    
    try:
        result = chain.invoke({
            "company_name": inputs["company_name"],
            "ticker": inputs["ticker"],
            "news": formatted_news,
            "format_instructions": parser.get_format_instructions()
        }, config={"callbacks": [handler]})
        
        handler.trace(
            name="sentiment_analysis_complete",
            output=result.dict(),
            metadata={"status": "success"}
        )
        
        return {"sentiment_analysis": result.dict()}
    except Exception as e:
        handler.trace(
            name="sentiment_analysis_error",
            metadata={"error": str(e)}
        )
        raise ValueError(f"Sentiment analysis failed: {str(e)}")

sentiment_analysis_chain = TransformChain(
    input_variables=["company_name", "ticker", "news"],
    output_variables=["sentiment_analysis"],
    transform=analyze_sentiment
)

# Complete pipeline
pipeline = SequentialChain(
    chains=[stock_code_chain, news_fetch_chain, sentiment_analysis_chain],
    input_variables=["company_name"],
    output_variables=["sentiment_analysis"],
    verbose=True
)

# Main function with proper Langfuse tracing
def analyze_company_sentiment(company_name: str) -> dict:
    trace = langfuse.trace(
        name="market_sentiment_analysis",
        input={"company_name": company_name}
    )
    
    try:
        result = pipeline.invoke(
            {"company_name": company_name},
            config={"callbacks": [handler]}
        )
        
        # Update trace with successful completion
        trace.update(
            output=result["sentiment_analysis"],
            metadata={"status": "success"}
        )
        
        return result["sentiment_analysis"]
    except Exception as e:
        # Update trace with error information
        trace.update(
            metadata={
                "status": "failed",
                "error": str(e)
            }
        )
        return {
            "error": str(e),
            "company_name": company_name,
            "stock_code": "N/A",
            "sentiment": "Unknown"
        }

# Example usage
if __name__ == "__main__":
    analysis = analyze_company_sentiment("apple")
    print(json.dumps(analysis, indent=2))



[1m> Entering new SequentialChain chain...[0m
{
  "error": "'StatefulTraceClient' object is not callable",
  "company_name": "apple",
  "stock_code": "N/A",
  "sentiment": "Unknown"
}


In [37]:
from langchain.chains import SequentialChain, TransformChain
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List
import yfinance as yf
import json
from langfuse import Langfuse
from langfuse.callback import CallbackHandler
import certifi
import os
from time import sleep

# Selenium imports
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import time

# Configure SSL certificates
os.environ['REQUESTS_CA_BUNDLE'] = certifi.where()
os.environ['SSL_CERT_FILE'] = certifi.where()

# Initialize Langfuse
langfuse = Langfuse()
handler = CallbackHandler()

# Pydantic model for structured output
class SentimentAnalysisResult(BaseModel):
    company_name: str = Field(..., description="Name of the company being analyzed")
    stock_code: str = Field(..., description="Stock ticker symbol")
    newsdesc: str = Field(..., description="Summary of news analyzed")
    sentiment: str = Field(..., description="Overall sentiment: Positive, Negative, or Neutral")
    people_names: List[str] = Field(default_factory=list, description="List of people mentioned")
    places_names: List[str] = Field(default_factory=list, description="List of places mentioned")
    other_companies_referred: List[str] = Field(default_factory=list, description="List of other companies mentioned")
    related_industries: List[str] = Field(default_factory=list, description="List of related industries")
    market_implications: str = Field(..., description="Potential market impact analysis")
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Confidence score of the analysis (0.0 to 1.0)")

# Step 1: Stock code extraction
def get_stock_code(inputs: dict) -> dict:
    span = langfuse.span(
        name="stock_code_extraction",
        input=inputs,
        metadata={"step": "ticker_lookup"}
    )

    company_name = inputs["company_name"].lower()
    ticker_map = {
        "apple": "AAPL", "microsoft": "MSFT", "google": "GOOGL",
        "amazon": "AMZN", "meta": "META", "tesla": "TSLA",
        "nvidia": "NVDA", "netflix": "NFLX", "intel": "INTC",
        "amd": "AMD", "ibm": "IBM", "oracle": "ORCL"
    }

    ticker = ticker_map.get(company_name)
    if not ticker:
        try:
            search_result = yf.Ticker(company_name)
            if search_result.info:
                ticker = search_result.info.get('symbol', 'UNKNOWN')
        except Exception as e:
            span.end(metadata={"error": str(e)})
            raise ValueError(f"Could not find ticker for company: {company_name}")

    span.end(output={"ticker": ticker})
    return {"ticker": ticker}

stock_code_chain = TransformChain(
    input_variables=["company_name"],
    output_variables=["ticker"],
    transform=get_stock_code
)

# Step 2: Fetch news using Selenium + BeautifulSoup to handle dynamic JS content
def fetch_news_selenium(inputs: dict) -> dict:
    span = langfuse.span(
        name="news_fetching",
        input=inputs,
        metadata={"step": "news_retrieval"}
    )

    ticker = inputs["ticker"]
    url = f"https://finance.yahoo.com/quote/{ticker}/news?p={ticker}"

    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")

    # If ChromeDriver is NOT in PATH, specify executable_path in the line below:
    driver = webdriver.Chrome(options=options)

    try:
        driver.get(url)
        time.sleep(5)  # wait for page to load dynamic content
        
        html = driver.page_source
        soup = BeautifulSoup(html, "html.parser")

        news_items = []
        articles = soup.select("li.js-stream-content")
        for article in articles[:5]:  # grab top 5 news
            title_elem = article.find('h3')
            link_elem = article.find('a', href=True)
            summary_elem = article.find('p')
            
            title = title_elem.get_text(strip=True) if title_elem else ""
            link = f"https://finance.yahoo.com{link_elem['href']}" if link_elem else ""
            summary = summary_elem.get_text(strip=True) if summary_elem else ""
            
            if title:
                news_items.append({
                    "title": title,
                    "link": link,
                    "summary": summary,
                    "publisher": "Yahoo Finance",
                    "published": ""
                })

        if not news_items:
            raise ValueError("No news articles found on Yahoo Finance page")

        span.end(output={"news_count": len(news_items)})
        return {"news": news_items}

    except Exception as e:
        span.end(metadata={"error": str(e)})
        return {"news": []}
    finally:
        driver.quit()

news_fetch_chain = TransformChain(
    input_variables=["ticker"],
    output_variables=["news"],
    transform=fetch_news_selenium
)

# Step 3: Sentiment analysis
def analyze_sentiment(inputs: dict) -> dict:
    span = langfuse.span(
        name="sentiment_analysis",
        input=inputs,
        metadata={"step": "sentiment_analysis"}
    )

    parser = PydanticOutputParser(pydantic_object=SentimentAnalysisResult)
    model = AzureChatOpenAI(model='myllm')

    prompt = ChatPromptTemplate.from_template("""
Analyze the following news articles about {company_name} ({ticker}) and provide a structured sentiment analysis.

News Articles:
{news}

{format_instructions}

IMPORTANT: 
1. Your output must be valid JSON without any trailing commas.
2. If no news is available, provide a neutral analysis with appropriate market implications.
3. Include key entities when possible.
""")

    chain = prompt | model | parser

    formatted_news = "\n".join(
        [f"Title: {item['title']}\nSummary: {item.get('summary', 'N/A')}"
         for item in inputs["news"]]
    ) or "No news articles available for analysis."

    try:
        result = chain.invoke({
            "company_name": inputs["company_name"],
            "ticker": inputs["ticker"],
            "news": formatted_news,
            "format_instructions": parser.get_format_instructions()
        }, config={"callbacks": [handler]})

        validated_result = result.model_dump()  # Pydantic v2+

        span.end(output=validated_result)
        return {"sentiment_analysis": validated_result}
    except Exception as e:
        span.end(metadata={"error": str(e)})
        raise ValueError(f"Sentiment analysis failed: {str(e)}")

sentiment_analysis_chain = TransformChain(
    input_variables=["company_name", "ticker", "news"],
    output_variables=["sentiment_analysis"],
    transform=analyze_sentiment
)

# Combine chains into a pipeline
pipeline = SequentialChain(
    chains=[stock_code_chain, news_fetch_chain, sentiment_analysis_chain],
    input_variables=["company_name"],
    output_variables=["sentiment_analysis"],
    verbose=True
)

# Main runner with Langfuse tracing
def analyze_company_sentiment(company_name: str) -> dict:
    trace = langfuse.trace(
        name="market_sentiment_analysis",
        input={"company_name": company_name}
    )
    try:
        result = pipeline.invoke(
            {"company_name": company_name},
            config={"callbacks": [handler]}
        )

        trace.update(
            output=result["sentiment_analysis"],
            metadata={"status": "success"}
        )
        return result["sentiment_analysis"]
    except Exception as e:
        trace.update(
            metadata={"status": "failed", "error": str(e)}
        )
        return {
            "error": str(e),
            "company_name": company_name,
            "stock_code": "N/A",
            "sentiment": "Unknown"
        }

# Run example
if __name__ == "__main__":
    print(f"Using SSL certificates from: {certifi.where()}")
    try:
        analysis = analyze_company_sentiment("microsoft")
        print(json.dumps(analysis, indent=2))
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        print("Possible fixes:")
        print(" - Ensure ChromeDriver is installed and in your PATH")
        print(" - Check network connection")
        print(" - Verify your Azure OpenAI credentials")


Using SSL certificates from: /home/user/GAAPB03-training/gen-ai/lib/python3.12/site-packages/certifi/cacert.pem


[1m> Entering new SequentialChain chain...[0m

[1m> Finished chain.[0m
{
  "company_name": "Microsoft",
  "stock_code": "MSFT",
  "newsdesc": "No news articles available for analysis. A neutral sentiment is adopted given the absence of recent developments.",
  "sentiment": "Neutral",
  "people_names": [],
  "places_names": [],
  "other_companies_referred": [],
  "related_industries": [
    "Technology",
    "Software",
    "Cloud Computing"
  ],
  "market_implications": "The lack of recent news may suggest stability in the market perception of Microsoft. Investors may consider this as a time to maintain their positions without any immediate catalysts for action.",
  "confidence_score": 0.5
}
