In [1]:
import json
import re
import sqlite3
import time
from textwrap import dedent
from typing import List, Optional, Dict, Tuple

import matplotlib.pyplot as plt
import networkx as nx
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.tools import WikipediaQueryRun
from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_core.messages import ToolMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel, Field

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [2]:
# Base URL of the OpenAI-compatible endpoint; passed as `base_url` to ChatOpenAI.
LMSTUDIO_URL = "http://localhost:1234/v1"
# Name of a general chat model. NOTE(review): not referenced in the visible cells — confirm it is still needed.
LLM_NAME = "gemma-3-27b-it"
# Model name used for the structured-output entity extractor.
ENTITY_EXTRACTOR = "qwen/qwen3-next-80b"
# Sampling temperature intended for the entity-extraction model.
EXTRACTION_TEMPERATURE = 0
# Default pause (seconds) after each per-entity fetch; default for `throttle` in expand_graph.
FETCH_DELAY_SECONDS = 1.5
# Default SQLite file used by save_graph_to_sqlite.
SQLITE_DB_PATH = "kg_store.db"

In [3]:

# Shared tool instances; fetch_entity_info tries them in this order (Yahoo -> Wikipedia -> DuckDuckGo).
y_finance = YahooFinanceNewsTool()
wiki = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
ddgo = DuckDuckGoSearchResults()

In [4]:
def extract_messages_by_type(result, message_type, index=0):
    """Return the ``index``-th message of ``message_type`` from a LangChain agent result.

    Args:
        result: Mapping with a ``'messages'`` key holding the message sequence.
        message_type: Class (or tuple of classes) matched with ``isinstance``.
        index: Position within the matching messages. Negative values count
            from the end, as with normal list indexing.

    Returns:
        The selected message, or ``None`` when there is no matching message at
        that position.
    """
    matching = [msg for msg in result['messages'] if isinstance(msg, message_type)]
    try:
        return matching[index]
    except IndexError:
        # Covers positions that are too large AND too negative; a plain
        # `len(matching) > index` guard lets out-of-range negatives through.
        return None


In [5]:
# Pydantic schema for a single named entity; element type of NERResponse.entities.
# `#` comments are used instead of a class docstring so the schema handed to the
# structured-output model stays exactly as declared by the Field descriptions.
class Entity(BaseModel):
    # Surface text of the entity; used as the node name when added to the graph.
    text: str = Field(description="The extracted entity text")
    # Entity category; stored as the node's `label` attribute in the graph.
    label: str = Field(description="Entity type (PERSON, ORGANIZATION, STOCK_SYMBOL, etc.)")
    # Optional score; defaults to None when the model does not supply one.
    confidence: Optional[float] = Field(description="Confidence score", default=None)


In [6]:
# Top-level structured-output schema: the extraction model is bound to this type
# via `with_structured_output(NERResponse)`.
class NERResponse(BaseModel):
    # All entities found in one piece of text.
    entities: List[Entity] = Field(description="List of extracted named entities")


In [7]:
# Chat model bound to the NERResponse schema; every invoke() returns a validated
# NERResponse. All tunables come from the configuration cell so there is a single
# place to change the endpoint, model and temperature.
extraction_model = ChatOpenAI(
    base_url=LMSTUDIO_URL,
    api_key="dummy",  # placeholder value; presumably ignored by the local endpoint — confirm
    model_name=ENTITY_EXTRACTOR,
    temperature=EXTRACTION_TEMPERATURE,
).with_structured_output(NERResponse)

In [8]:
def extract_named_entities(text: str) -> NERResponse:
    """
    Run NER on ``text`` using the structured-output LLM.

    Args:
        text: Free text to analyse.

    Returns:
        A NERResponse (pydantic validated). Empty or whitespace-only input
        returns an empty NERResponse without calling the model.
    """
    if not text or not text.strip():
        # Nothing to analyse; skip the (slow) model call entirely.
        return NERResponse(entities=[])

    # Dedent the fixed instructions BEFORE appending the text. Dedenting after
    # interpolation lets any unindented line inside `text` cancel the dedent,
    # leaving the instructions indented in the final prompt.
    instructions = dedent("""
    Extract named entities from this text.

    Include these entity types:
    PERSON, ORGANIZATION, STOCK_SYMBOL, MONEY, DATE, LOCATION, PRODUCT, METRIC, EVENT.

    Output must be a JSON object containing a single key "entities" which is a list
    of objects with "text", "label", and optional "confidence".

    Text:
    """)
    prompt = f"{instructions}{text}\n"
    print("Calling extraction_model for NER")
    result: NERResponse = extraction_model.invoke(prompt)
    return result

In [18]:
def _is_useful_result(res, min_chars: int = 30) -> bool:
    """Return True when a tool result looks like content rather than a failure notice."""
    if not res:
        return False
    if isinstance(res, str):
        text = res.strip()
        # Very short strings are status messages, not descriptive content.
        if len(text) < min_chars:
            return False
        # Known "nothing found" replies that the tools return as ordinary strings.
        if "No news found" in text:
            return False
        if text.startswith("Company ticker ") and text.endswith("not found."):
            return False
        if text.startswith(("No good Wikipedia Search Result",
                            "No good DuckDuckGo Search Result")):
            return False
    return True


def fetch_entity_info(entity_name: str, max_length_chars: int = 20_000) -> str:
    """
    Try multiple sources in order and return the first useful text result.

    Sources, in order: Yahoo Finance news, Wikipedia, DuckDuckGo (fallback).
    A source is skipped when it raises, returns nothing, or returns one of the
    tools' "nothing found" messages, so those notices are never passed on as if
    they described the entity.

    Args:
        entity_name: Query string handed to each tool's ``run``.
        max_length_chars: Maximum number of characters of the result to return.

    Returns:
        The (truncated) text of the first useful result, or
        ``"No data found for <entity_name>"`` when every source fails.
    """
    # (name used in the "Trying ..." message, name used in failure messages, tool)
    sources = (
        ("YahooNews", "Yahoo", y_finance),
        ("Wikipedia", "Wikipedia", wiki),
        ("DuckDuckGo", "DuckDuckGo", ddgo),
    )

    for display_name, short_name, source_tool in sources:
        try:
            print(f"Trying {display_name} for '{entity_name}'")
            res = source_tool.run(entity_name)
        except Exception as e:
            # Best-effort lookup: a failing source must not abort the fallback chain.
            print(f"{short_name} tool failed for {entity_name}: {e}")
            continue

        if _is_useful_result(res):
            return str(res)[:max_length_chars]
        print(f"{short_name} returned no useful result")

    return f"No data found for {entity_name}"

In [19]:
def add_entities_to_graph(G: nx.Graph, seed_entity: str, seed_label: str, discovered_entities: List[Entity], source_text_snippet: Optional[str] = None):
    """
    Add a node for ``seed_entity`` plus nodes/edges for ``discovered_entities``.

    Args:
        G: Graph to mutate in place.
        seed_entity: Name of the entity whose fetched text was analysed.
        seed_label: Label stored on the seed node when the node is first created.
        discovered_entities: Entities found in the seed's text.
        source_text_snippet: Optional source text; its first 400 characters are
            stored on each newly created edge.

    Behaviour:
        - Existing nodes keep their original ``label``.
        - New edges get ``relation="mentioned_with"`` and ``weight=1``;
          repeated mentions increment ``weight``.
        - Entities with empty text, or whose text equals the seed itself, are
          skipped so the graph gets no blank nodes or self-loops.
    """
    if not G.has_node(seed_entity):
        G.add_node(seed_entity, label=seed_label)

    # Computed once: the same snippet is attached to every new edge.
    snippet = (source_text_snippet or "")[:400]

    for ent in discovered_entities:
        name = ent.text
        if not name or name == seed_entity:
            # Text about X nearly always mentions X; linking X to itself adds nothing.
            continue

        if not G.has_node(name):
            G.add_node(name, label=ent.label)

        if G.has_edge(seed_entity, name):
            # Increment weight if the edge already exists.
            edge_data = G[seed_entity][name]
            edge_data["weight"] = edge_data.get("weight", 1) + 1
        else:
            G.add_edge(seed_entity, name, relation="mentioned_with", weight=1, snippet=snippet)

In [20]:
def relation_heuristic(seed_entity: str, seed_text: str, discovered_text: str) -> str:
    """
    Guess a relation name from keywords in ``discovered_text``.

    Placeholder heuristic; it can be replaced with LLM-based relation extraction.

    Args:
        seed_entity: Unused by the heuristic; kept for interface compatibility.
        seed_text: Unused by the heuristic; kept for interface compatibility.
        discovered_text: Text to scan (case-insensitive). ``None`` or an empty
            string yields the default relation.

    Returns:
        One of ``"acquired"``, ``"has_leader"``, ``"subsidiary"`` or the default
        ``"mentioned_with"``. Checks run in that order; the first match wins.
    """
    text = (discovered_text or "").lower()

    # The stem "acquir" covers acquire / acquired / acquiring / acquirer.
    if "acquir" in text:
        return "acquired"
    # "ceo" must match as a whole word: as a bare substring it also hits
    # unrelated words such as "Cretaceous" or "herbaceous".
    if re.search(r"\bceos?\b", text) or "chairman" in text:
        return "has_leader"
    # The stem "subsidiar" covers both subsidiary and subsidiaries.
    if "subsidiar" in text:
        return "subsidiary"
    return "mentioned_with"

In [21]:
def expand_graph(seed_text: str, depth: int = 2, throttle: float = FETCH_DELAY_SECONDS, max_entities_per_layer: int = 8) -> nx.Graph:
    """
    Build a knowledge graph starting from ``seed_text``.

    NER is first run on ``seed_text``. Then, for each layer, up to
    ``max_entities_per_layer`` not-yet-expanded entities are processed: text is
    fetched with ``fetch_entity_info``, NER is run on it, the entity is linked to
    everything found, and the newly found entities become the next layer's
    candidates.

    depth=1 -> expand only the entities found in seed_text
    depth=2 -> also expand the entities discovered while processing layer 1
    depth=3 -> repeat once more (third layer)

    Args:
        seed_text: Text (for example a ticker symbol) from which the first entities are extracted.
        depth: Number of expansion layers; values below 1 return an empty graph.
        throttle: Seconds to pause after each fetch; non-positive disables the pause.
        max_entities_per_layer: Cap on entities expanded per layer.

    Returns:
        The populated graph. Entities for which no info could be fetched are
        marked visited but only appear in the graph if another entity's text
        mentions them.
    """
    G = nx.Graph()
    visited: Dict[str, int] = {}  # entity_text -> layer in which it was expanded
    # Initial NER on seed_text
    print("Extracting initial entities (layer 1)")
    current_entities: List[Entity] = extract_named_entities(seed_text).entities

    for layer in range(1, depth + 1):
        # Drop duplicates and already-expanded entities BEFORE applying the cap;
        # otherwise repeated mentions of the same entity use up the per-layer
        # budget and genuinely new entities are never expanded.
        candidates: Dict[str, Entity] = {}
        for ent in current_entities:
            if ent.text not in visited:
                candidates.setdefault(ent.text, ent)
        to_process = list(candidates.values())[:max_entities_per_layer]

        print(f"=== LAYER {layer} (entities to process: {len(to_process)}) ===")
        if not to_process:
            print("No new entities left to expand; stopping early.")
            break

        next_layer_entities: List[Entity] = []

        for ent in to_process:
            entity_key = ent.text
            visited[entity_key] = layer
            print(f"Layer {layer}: fetching info for '{entity_key}' (label={ent.label})")
            info_text = fetch_entity_info(entity_key)
            if throttle > 0:
                # Be polite to the remote sources; time.sleep rejects negative values.
                time.sleep(throttle)

            if not info_text or info_text.startswith("No data found"):
                print(f"No useful info found for {entity_key}")
                continue

            # Run NER on the fetched info to discover new entities
            try:
                discovered = extract_named_entities(info_text).entities
            except Exception as e:
                # Best-effort: one failed extraction must not abort the whole crawl.
                print(f"NER on fetched info failed for {entity_key}: {e}")
                discovered = []

            # Add edges between this entity and the entities discovered in its text
            add_entities_to_graph(G, seed_entity=entity_key, seed_label=ent.label, discovered_entities=discovered, source_text_snippet=info_text)

            # Queue discovered entities for the next layer (entity_key itself is already visited)
            next_layer_entities.extend(d for d in discovered if d.text not in visited)

        # Prepare for next iteration
        current_entities = next_layer_entities

    return G

In [22]:
def save_graph_to_sqlite(G: nx.Graph, sqlite_path: str = SQLITE_DB_PATH):
    """
    Save nodes and edges into a SQLite DB (simple schema).

    Tables (created if missing):
      - nodes(id INTEGER PRIMARY KEY, name TEXT UNIQUE, label TEXT)
      - edges(id INTEGER PRIMARY KEY, source TEXT, target TEXT, relation TEXT, weight INTEGER, snippet TEXT)

    Args:
        G: Graph whose nodes (``label`` attribute) and edges (``relation``,
            ``weight``, ``snippet`` attributes) are written.
        sqlite_path: Path of the SQLite database file.

    Notes:
        - Nodes use INSERT OR IGNORE, so an existing name keeps its stored label.
        - Edges are plain INSERTs into a table without a uniqueness constraint,
          so saving the same graph twice stores its edges twice.
        - Individual row failures are reported and skipped (best-effort).
        - The connection is always closed, even when table creation or the
          commit raises.
    """
    print(f"Saving graph to SQLite DB at {sqlite_path}")
    conn = sqlite3.connect(sqlite_path)
    try:
        c = conn.cursor()
        c.execute("""CREATE TABLE IF NOT EXISTS nodes (
                     id INTEGER PRIMARY KEY,
                     name TEXT UNIQUE,
                     label TEXT
                     )""")
        c.execute("""CREATE TABLE IF NOT EXISTS edges (
                     id INTEGER PRIMARY KEY,
                     source TEXT,
                     target TEXT,
                     relation TEXT,
                     weight INTEGER,
                     snippet TEXT
                     )""")

        # Insert nodes
        for n, data in G.nodes(data=True):
            label = data.get("label", "")
            try:
                c.execute("INSERT OR IGNORE INTO nodes (name, label) VALUES (?, ?)", (n, label))
            except Exception as e:
                print(f"Failed to insert node {n}: {e}")

        # Insert edges
        for u, v, data in G.edges(data=True):
            relation = data.get("relation", "mentioned_with")
            weight = data.get("weight", 1)
            snippet = data.get("snippet", "")
            try:
                c.execute("INSERT INTO edges (source, target, relation, weight, snippet) VALUES (?, ?, ?, ?, ?)",
                          (u, v, relation, weight, snippet))
            except Exception as e:
                print(f"Failed to insert edge {u}-{v}: {e}")

        conn.commit()
    finally:
        # Release the file handle/lock on every exit path.
        conn.close()
    print("Graph saved to SQLite.")

In [23]:
def visualize_graph(G: nx.Graph, figsize: Tuple[int, int] = (10, 10), max_nodes: int = 100):
    """
    Draw the graph with a spring layout using networkx + matplotlib.

    When the graph has more than `max_nodes` nodes, only the `max_nodes`
    highest-degree nodes (and the edges among them) are drawn. Node names
    longer than 20 characters are shortened to 17 characters plus "...".
    An empty graph prints a notice and draws nothing.
    """
    if not G.number_of_nodes():
        print("Graph is empty — nothing to visualize.")
        return

    drawn = G.copy()
    if drawn.number_of_nodes() > max_nodes:
        # Rank (node, degree) pairs by degree, highest first, and keep the top slice.
        ranked = sorted(drawn.degree, key=lambda pair: pair[1], reverse=True)
        top_nodes = {node for node, _degree in ranked[:max_nodes]}
        drawn = drawn.subgraph(top_nodes).copy()

    # Fixed seed keeps the layout stable between runs.
    layout = nx.spring_layout(drawn, seed=42)

    node_labels = {}
    for node in drawn.nodes():
        if len(node) <= 20:
            node_labels[node] = node
        else:
            node_labels[node] = node[:17] + "..."

    plt.figure(figsize=figsize)
    nx.draw_networkx_nodes(drawn, layout, node_size=600, alpha=0.9)
    nx.draw_networkx_edges(drawn, layout, alpha=0.6)
    nx.draw_networkx_labels(drawn, layout, node_labels, font_size=8)
    plt.axis("off")
    plt.show()

In [24]:
def fetch_financial_data(entity_name: str) -> Optional[dict]:
    """
    Attempt to fetch financial data for a stock/company symbol via ``y_finance.run``.

    NOTE(review): ``y_finance`` is a YahooFinanceNewsTool, which presumably
    returns news text rather than quote data — confirm. If so, the usual result
    is the ``{"raw_response": ...}`` form. The structured shape this function
    was written for looks like:
    {
      "symbol": "RELIANCE.NS",
      "price": 2850.35,
      "currency": "INR",
      "previous_close": 2840.00,
      "market_cap": 2000000000000
    }

    Args:
        entity_name: Symbol or name passed to the tool.

    Returns:
        - the tool's result when it is a non-empty dict;
        - the parsed object when the result is a string containing a JSON object;
        - ``{"raw_response": <string>}`` for any other string longer than 10
          characters (including JSON that is not an object, so the declared
          return type always holds);
        - ``None`` when the lookup fails or returns nothing usable.
    """
    try:
        data = y_finance.run(entity_name)
        if isinstance(data, dict):
            return data if len(data) > 0 else None
        if isinstance(data, str) and len(data.strip()) > 10:
            # Sometimes the tool returns a stringified JSON document.
            try:
                parsed = json.loads(data)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return parsed
            # Valid JSON that is a list/number/string is not "financial data";
            # hand back the raw text instead of a non-dict value.
            return {"raw_response": data}
    except Exception as e:
        # Best-effort lookup: report and fall through to None.
        print(f"Finance data lookup failed for {entity_name}: {e}")
    return None

In [25]:
# Seed text for the crawl; passed to expand_graph as `seed_text`.
stock_symbol = "MSFT"

In [26]:
# Three expansion layers, at most 6 entities per layer. The pause between
# fetches comes from the config cell rather than a repeated literal.
graph = expand_graph(stock_symbol, depth=3, throttle=FETCH_DELAY_SECONDS, max_entities_per_layer=6)

Extracting initial entities (layer 1)
Calling extraction_model for NER
=== LAYER 1 (entities to process: 1) ===
Layer 1: fetching info for 'MSFT' (label=STOCK_SYMBOL)
Trying YahooNews for 'MSFT'
YFinance Results: No news found for company that searched with MSFT ticker.
Yahoo returned no useful result
Trying Wikipedia for 'MSFT'




  lis = BeautifulSoup(html).find_all('li')


Calling extraction_model for NER


KeyboardInterrupt: 

In [None]:
# Combine a tool result and an AI reply into one text block for NER.
# NOTE(review): `tool_message` and `second_ai_message` are not defined in any
# visible cell, so this cell raises NameError on a fresh kernel. Presumably they
# came from extract_messages_by_type(...) on an agent result — confirm and
# restore the defining cell.
extraction_prompt = dedent(F"""
Tool Call Results:
{tool_message.content}

AI Agent Message: 
{second_ai_message.content}
""")

In [None]:
# Run NER over the combined prompt text; `result` is a NERResponse.
result = extract_named_entities(text=extraction_prompt)

In [None]:
# Show the extracted entities as indented JSON.
print(result.model_dump_json(indent=2))