
# Investor Ownership & News Pipeline (Gemini + Fintel + Yahoo + Weaviate + RDF)

This notebook implements an end-to-end pipeline to extract investor–company ownership relationships and supporting articles, normalize majority/minority semantics, serialize to RDF, and ingest into Weaviate for Graph-RAG.

**Features**
- spaCy NER to extract target **Company (ORG)** from user query
- **Gemini** search for recent articles with **URLs** (stored as `source_url`)
- **Fintel** extraction via **cloudscraper** + **markdownify** with **13D/G** priority; falls back to **13F**
- **Yahoo Finance** institutional holders (with `source_url`)
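- Relationship normalization → `majority` if `percentage >= 50`, otherwise `minority`; when no percentage is available, the tag is inferred from alias phrases (e.g. "controlling interest" → `majority`), else `unknown`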
- In-memory **RDF** graph for KG
- **Weaviate** single-class schema (including `source_url` and `investmentDate` properties) plus ingestion helpers

> Notes:  
> - Set your Gemini API key in the environment variable `GEMINI_API_KEY`.  
> - Set your Weaviate endpoint in `WEAVIATE_URL`.  
> - This notebook includes graceful fallbacks so it can run without external access for demos.


In [None]:

# Optional installs (uncomment if needed)
# %pip install google-generativeai spacy yfinance cloudscraper markdownify rdflib weaviate-client fastapi uvicorn nbformat
# %pip install git+https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl
# or: python -m spacy download en_core_web_sm


In [None]:

import os, re, uuid, json, datetime, math
from typing import List, Dict, Any, Optional, Tuple

# --- LLM / NLP ---
import spacy

# --- Web / scraping ---
import requests
import cloudscraper
from markdownify import markdownify as md

# --- Finance data ---
import yfinance as yf

# --- KG / RDF ---
from rdflib import Graph, Namespace, URIRef, Literal
from rdflib.namespace import RDF, XSD

# --- Weaviate ---
# Optional dependency: the Weaviate helpers later in this notebook check
# WEAVIATE_AVAILABLE and skip schema setup / ingestion when the client
# library cannot be imported.
try:
    import weaviate
    WEAVIATE_AVAILABLE = True
except Exception as e:
    print("Weaviate client not available:", e)
    WEAVIATE_AVAILABLE = False

# --- Gemini ---
# GEMINI_OK becomes True only when the SDK imports AND GEMINI_API_KEY is set;
# otherwise gemini_search_news() returns canned demo articles.
GEMINI_OK = False
try:
    import google.generativeai as genai
    # A missing key is not an exception: GEMINI_OK silently stays False.
    if os.getenv("GEMINI_API_KEY"):
        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        GEMINI_OK = True
except Exception as e:
    print("Gemini SDK not available or not configured:", e)
    GEMINI_OK = False

# --- spaCy model ---
# Load order: installed en_core_web_sm -> download it now (at import time,
# needs network access) -> blank English pipeline. The blank pipeline
# presumably has no NER component, so extract_target_company() would then
# fall back to its non-NER heuristics.
try:
    nlp = spacy.load("en_core_web_sm")
except Exception:
    try:
        from spacy.cli import download
        download("en_core_web_sm")
        nlp = spacy.load("en_core_web_sm")
    except Exception as e:
        print("spaCy model load failed; using blank English model as fallback:", e)
        nlp = spacy.blank("en")

In [None]:

# Alias phrases mapped to the relationship tag they imply. Both "principle"
# and "principal stakeholder" are listed, presumably to catch the common
# misspelling. Order matters: normalize_tag() scans the tags in this order.
RELATIONSHIP_SYNONYMS = {
    "majority": [
        "principle stakeholder",
        "principal stakeholder",
        "major stakeholder",
        "majority stakeholder",
        "voting control",
        "voting majority",
        "buyout",
        "acquisition",
        "acquired",
        "controlling interest",
        "equity control",
    ],
    "minority": [
        "equity stake",
        "invest along with",
        "small shareholding",
        "minority interest",
        "limited influence",
        "passive investment as minority",
    ],
}

# Every alias flattened into a single lower-cased list, in dictionary order;
# detect_relationship_synonyms() searches text for these substrings.
ALL_SYNONYMS = [
    alias.lower()
    for alias_group in RELATIONSHIP_SYNONYMS.values()
    for alias in alias_group
]

def detect_relationship_synonyms(text: str) -> List[str]:
    """Return the alias phrases from ALL_SYNONYMS that occur in *text*.

    Matching is a case-insensitive substring test, so "buyout" also matches
    "buyouts". Results follow ALL_SYNONYMS order with duplicates removed.
    Empty or ``None`` text yields an empty list.
    """
    if not text:
        return []
    haystack = text.lower()
    hits = (phrase for phrase in ALL_SYNONYMS if phrase in haystack)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(hits))

def normalize_tag(synonyms: List[str], percentage: Optional[float] = None) -> str:
    """Classify a relationship as ``"majority"``, ``"minority"`` or ``"unknown"``.

    A usable numeric percentage wins: ``>= 50`` is majority and anything lower
    is minority. Non-numeric or NaN percentages are ignored. In that case the
    first synonym belonging to a RELATIONSHIP_SYNONYMS group decides the tag.
    With neither signal the result is ``"unknown"``.
    """
    numeric = isinstance(percentage, (int, float)) and not math.isnan(float(percentage))
    if numeric:
        return "minority" if float(percentage) < 50.0 else "majority"
    for phrase in synonyms:
        for tag, group in RELATIONSHIP_SYNONYMS.items():
            if any(phrase == alias.lower() for alias in group):
                return tag
    return "unknown"

In [None]:

# Hand-maintained lookup from company display name to exchange ticker.
# get_ticker() consults it before treating short all-caps input as a ticker.
# "Company B" is also the placeholder name extract_target_company() returns
# when it cannot find a company in the query.
COMPANY_TICKER_MAP: Dict[str, str] = {
    "BlackRock": "BLK",
    "Apple":     "AAPL",
    "Microsoft": "MSFT",
    "Tesla":     "TSLA",
    "Amazon":    "AMZN",
    "Dropbox":   "DBX",
    "Company B": "B",
}

def extract_target_company(query: str) -> str:
    doc = nlp(query)
    for ent in getattr(doc, "ents", []):
        if getattr(ent, "label_", None) == "ORG":
            return ent.text
    m = re.search(r"[A-Z][\w&.,()\- ]{2,}", query)
    return m.group(0).strip() if m else "Company B"

def get_ticker(company_name: str, mapping: Optional[Dict[str, str]] = None) -> Optional[str]:
    """Resolve *company_name* to an exchange ticker.

    Args:
        company_name: company display name or a ticker-like string.
        mapping: name -> ticker table; defaults to COMPANY_TICKER_MAP.

    Returns:
        The mapped ticker (exact match first, then case-insensitive), the
        stripped input itself if it is 1-5 upper-case characters, else ``None``.
    """
    table = COMPANY_TICKER_MAP if mapping is None else mapping
    name = company_name.strip()
    ticker = table.get(name)
    if ticker:
        return ticker
    # Case-insensitive second pass so "dropbox" / "DROPBOX" still resolve.
    lowered = name.lower()
    for known, symbol in table.items():
        if symbol and known.lower() == lowered:
            return symbol
    # Short all-caps input ("DBX") is taken to already be a ticker. Checking the
    # stripped name avoids returning surrounding whitespace as part of it.
    if name.isupper() and 1 <= len(name) <= 5:
        return name
    return None

In [None]:

def gemini_search_news(query: str, max_items: int = 5) -> List[Dict[str, Any]]:
    if not GEMINI_OK:
        return [
            {"title": f"{query} majority buyout reported","url": "https://example.com/article-1","summary": f"Report suggests a controlling interest related to {query}.","date": "2024-05-15"},
            {"title": f"{query} minority investments update","url": "https://example.com/article-2","summary": f"Smaller equity stakes discussed for {query}.","date": "2024-02-10"}
        ]
    model = genai.GenerativeModel("gemini-1.5-pro")
    prompt = (
        "Search the web and return a STRICT JSON array of objects with keys: "
        "title, url, summary, date (YYYY-MM-DD). "
        "Focus on institutional ownership, 13D/13G filings, control/majority, buyouts. "
        f"Query: {query}"
    )
    try:
        resp = model.generate_content(prompt)
        text = resp.text.strip()
        data = None
        try:
            data = json.loads(text)
            if isinstance(data, dict) and 'results' in data:
                data = data['results']
        except Exception:
            import re as _re
            m = _re.search(r"\[\s*{[\s\S]*?}\s*\]", text)
            if m:
                data = json.loads(m.group(0))
        if not data or not isinstance(data, list):
            raise ValueError("Non-JSON or empty response")
        out = []
        for item in data[:max_items]:
            out.append({
                "title": str(item.get("title","")).strip(),
                "url": str(item.get("url","")).strip(),
                "summary": str(item.get("summary","")).strip(),
                "date": str(item.get("date","")).strip() or None
            })
        return [x for x in out if x.get("url")]
    except Exception as e:
        return [{"title": f"{query} majority buyout reported","url": "https://example.com/article-1","summary": f"Report suggests a controlling interest related to {query}.","date": "2024-05-15"}]

In [None]:

def _extract_markdown_tables(markdown_text: str) -> List[List[str]]:
    lines = markdown_text.splitlines()
    tables = []
    current = []
    for line in lines:
        if "|" in line:
            current.append(line)
        elif current:
            if any('---' in ln for ln in current[:3]):
                tables.append(current)
            current = []
    if current and any('---' in ln for ln in current[:3]):
        tables.append(current)
    return tables

def _parse_md_table(table_lines: List[str]) -> List[Dict[str, str]]:
    if not table_lines: return []
    header = [h.strip() for h in table_lines[0].split("|") if h.strip()]
    rows = []
    for row_line in table_lines[2:]:
        cols = [c.strip() for c in row_line.split("|") if c.strip()]
        if len(cols) != len(header):
            continue
        rows.append(dict(zip(header, cols)))
    return rows

def fetch_fintel_ownership(ticker: str) -> List[Dict[str, Any]]:
    url = f"https://fintel.io/so/us/{ticker.lower()}"
    try:
        scraper = cloudscraper.create_scraper(browser={
            'browser': 'chrome',
            'platform': 'windows',
            'mobile': False
        })
        html = scraper.get(url).text
        markdown_text = md(html)
        tables = _extract_markdown_tables(markdown_text)

        chosen_rows: List[Dict[str,str]] = []
        for tbl in tables:
            rows = _parse_md_table(tbl)
            if rows:
                keys = [k.lower() for k in rows[0].keys()]
                if any(("13d" in k) or ("13g" in k) or ("schedule" in k) or ("file" in k and "date" in k) for k in keys):
                    chosen_rows = rows
                    break
        if not chosen_rows and tables:
            chosen_rows = _parse_md_table(tables[0])
        if not chosen_rows:
            return []

        results = []
        for row in chosen_rows:
            investor = (
                row.get("Investor") or row.get("Holder") or row.get("Filer") or
                row.get("Institution") or next(iter(row.values()), None)
            )
            pct_text = (
                row.get("% Ownership") or row.get("% Out") or row.get("Ownership (%)") or
                row.get("% O/S") or row.get("Percent") or ""
            )
            date = (
                row.get("Effective Date") or row.get("File Date") or
                row.get("Filing Date") or row.get("Reported Date") or None
            )
            pct_val = None
            if pct_text:
                try:
                    pct_val = float(pct_text.replace("%", "").replace(",","").strip())
                except Exception:
                    pct_val = None

            results.append({
                "investor": investor,
                "company": ticker.upper(),
                "percentage": pct_val,
                "source": "Fintel (13D/G preferred; 13F fallback)",
                "investment_date": date,
                "source_url": url,
                "raw_text": " | ".join([f"{k}: {v}" for k, v in row.items()])
            })
        return results
    except Exception as e:
        print(f"Fintel fetch error for {ticker}: {e}")
        return []

In [None]:

def fetch_yahoo_ownership(ticker: str) -> List[Dict[str, Any]]:
    url = f"https://finance.yahoo.com/quote/{ticker.upper()}/holders"
    try:
        t = yf.Ticker(ticker.upper())
        df = t.institutional_holders
        if df is None or df.empty:
            return []
        results = []
        for _, row in df.iterrows():
            investor = row.get("Holder")
            pct = row.get("% Out")
            pct_val = None
            if pct is not None:
                try:
                    pct_val = float(str(pct).replace("%","").replace(",","").strip())
                except Exception:
                    pct_val = None
            results.append({
                "investor": investor,
                "company": ticker.upper(),
                "percentage": pct_val,
                "source": "Yahoo Finance (Institutional Holders)",
                "investment_date": None,
                "source_url": url,
                "raw_text": f"{investor} | %Out: {pct}"
            })
        return results
    except Exception as e:
        print(f"Yahoo ownership fetch error for {ticker}: {e}")
        return []

In [None]:

def merge_events(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    by_key: Dict[Tuple[str,str], Dict[str, Any]] = {}
    for e in events:
        if not e.get("investor"):
            continue
        key = (e["investor"].strip().lower(), e["company"].strip().lower())
        if key not in by_key:
            by_key[key] = dict(e)
            by_key[key]["source_url"] = [e.get("source_url")] if e.get("source_url") else []
        else:
            p_existing = by_key[key].get("percentage")
            p_new = e.get("percentage")
            if p_new is not None and (p_existing is None or p_new > p_existing):
                by_key[key]["percentage"] = p_new
            by_key[key]["raw_text"] = (by_key[key].get("raw_text","") + " || " + e.get("raw_text","")).strip(" |")
            if e.get("source_url"):
                by_key[key]["source_url"].append(e["source_url"])
        if not by_key[key].get("investment_date") and e.get("investment_date"):
            by_key[key]["investment_date"] = e["investment_date"]
        if e.get("source") and "Yahoo" in e["source"]:
            by_key[key]["source"] = e["source"]
        elif e.get("source") and "Fintel" in e["source"]:
            by_key[key]["source"] = e["source"]
    out = []
    for e in by_key.values():
        syns = detect_relationship_synonyms(e.get("raw_text",""))
        tag = normalize_tag(syns, e.get("percentage"))
        e["relationship_tag"] = tag
        e["relationship_synonyms"] = syns
        urls = [u for u in (e.get("source_url") or []) if u]
        e["source_url"] = sorted(set(urls))
        out.append(e)
    return out

In [None]:

# RDF namespaces: EX holds ontology terms (classes such as ex:Investor and
# predicates such as ex:percentage); EXR holds the minted resource IRIs.
EX, EXR = (
    Namespace("http://example.com/onto/"),
    Namespace("http://example.com/resource/"),
)

def event_to_rdf(event: Dict[str, Any]) -> Graph:
    g = Graph()
    g.bind("ex", EX); g.bind("exr", EXR)
    inv = (event.get("investor") or "unknown-investor").lower()
    co = (event.get("company") or "unknown-company").lower()
    actor_uri = URIRef(EXR + re.sub(r'\\W+','-', inv))
    target_uri = URIRef(EXR + re.sub(r'\\W+','-', co))
    rel_uri = URIRef(EXR + f"OwnershipRelationship-{uuid.uuid4().hex[:8]}")
    g.add((actor_uri, RDF.type, EX.Investor))
    g.add((target_uri, RDF.type, EX.Company))
    g.add((rel_uri, RDF.type, EX.OwnershipRelationship))
    g.add((rel_uri, EX.investor, actor_uri))
    g.add((rel_uri, EX.company, target_uri))
    pct = event.get("percentage")
    if pct is not None:
        try:
            g.add((rel_uri, EX.percentage, Literal(float(pct), datatype=XSD.decimal)))
        except Exception:
            pass
    g.add((rel_uri, EX.relationshipTag, Literal(event.get("relationship_tag","unknown"))))
    for syn in event.get("relationship_synonyms", []):
        g.add((rel_uri, EX.relationshipSynonym, Literal(syn)))
    for u in event.get("source_url", []):
        try:
            g.add((rel_uri, EX.sourceURL, URIRef(u)))
        except Exception:
            g.add((rel_uri, EX.sourceURL, Literal(u)))
    if event.get("investment_date"):
        try:
            g.add((rel_uri, EX.investmentDate, Literal(event["investment_date"], datatype=XSD.date)))
        except Exception:
            g.add((rel_uri, EX.investmentDate, Literal(event["investment_date"])))
    return g

In [None]:

WEAVIATE_URL = os.getenv("WEAVIATE_URL", "http://localhost:8080")

def ensure_weaviate_schema(recreate: bool = False) -> None:
    """Make sure the ``OwnershipRelation`` class exists in Weaviate.

    Args:
        recreate: when True, drop and recreate an existing class. This deletes
            every object stored in it. By default an existing class is left
            untouched, so repeated pipeline runs keep previously ingested data.
            NOTE(review): an existing class is not compared with the schema
            below; pass ``recreate=True`` after changing the schema.

    Connection or schema errors are printed instead of raised, so the rest of
    the pipeline can still run without a Weaviate server.
    """
    if not WEAVIATE_AVAILABLE:
        print("Weaviate client not installed; skipping schema setup.")
        return
    class_name = "OwnershipRelation"
    schema_def = {
        "class": class_name,
        "description": "Investor->Company ownership/investment relation",
        "vectorizer": "none",
        "properties": [
            {"name":"investor", "dataType":["text"], "description":"Investor entity"},
            {"name":"company", "dataType":["text"], "description":"Target company"},
            {"name":"percentage", "dataType":["number"], "description":"Ownership %"},
            {"name":"relationshipType", "dataType":["text"], "description":"majority/minority/unknown"},
            {"name":"investmentDate", "dataType":["date"], "description":"Date from filing/news"},
            {"name":"source", "dataType":["text"], "description":"Source descriptor (Fintel/Yahoo/News)"},
            {"name":"source_url", "dataType":["text[]"], "description":"URLs used to derive this record"},
            {"name":"raw_text", "dataType":["text"], "description":"Raw extracted text blob"}
        ]
    }
    try:
        client = weaviate.Client(WEAVIATE_URL)
        schema = client.schema.get()
        existing = [c["class"] for c in (schema.get("classes") or [])]
        if class_name in existing:
            if not recreate:
                print("✅ Weaviate schema already present for class:", class_name)
                return
            client.schema.delete_class(class_name)
        client.schema.create_class(schema_def)
    except Exception as e:
        print(f"Weaviate schema setup failed at {WEAVIATE_URL}: {e}")
        return
    print("✅ Weaviate schema ensured for class:", class_name)

def weaviate_upsert_events(events: List[Dict[str, Any]]):
    if not WEAVIATE_AVAILABLE:
        print("Weaviate client not installed; skipping ingestion.")
        return
    client = weaviate.Client(WEAVIATE_URL)
    batch = client.batch
    batch.configure(batch_size=50)
    with batch as b:
        for e in events:
            props = {
                "investor": e.get("investor"),
                "company": e.get("company"),
                "percentage": e.get("percentage"),
                "relationshipType": e.get("relationship_tag"),
                "investmentDate": e.get("investment_date"),
                "source": e.get("source"),
                "source_url": e.get("source_url"),
                "raw_text": e.get("raw_text")
            }
            b.add_data_object(props, class_name="OwnershipRelation")
    print(f"✅ Ingested {len(events)} events into Weaviate")

In [None]:

def run_pipeline(query: str) -> Dict[str, Any]:
    """Run the end-to-end ownership pipeline for a natural-language *query*.

    Steps:
      1. Extract the target company and resolve its ticker (falling back to
         the company string).
      2. Fetch Yahoo and Fintel holders and Gemini news.
      3. Merge the holder events and attach every news URL to each event as
         supporting evidence.
      4. Build a combined RDF graph.
      5. Ensure the Weaviate schema and ingest the events.

    Returns:
        Dict with ``company``, ``ticker``, ``events`` (merged events),
        ``news``, ``rdf_turtle`` (Turtle string) and ``summary``
        (majority/minority/total counts).
    """
    company = extract_target_company(query)
    ticker = get_ticker(company) or company
    yahoo = fetch_yahoo_ownership(ticker)
    fintel = fetch_fintel_ownership(ticker)
    news = gemini_search_news(f"{company} institutional ownership OR 13D OR 13G OR controlling interest")
    # News items carry no investor, so they cannot become ownership events of
    # their own; their URLs are attached to every event as supporting sources.
    news_urls = [n["url"] for n in news if n.get("url")]
    merged = merge_events(yahoo + fintel)
    g_all = Graph()
    g_all.bind("ex", EX)
    g_all.bind("exr", EXR)
    for e in merged:
        e["source_url"] = sorted(set((e.get("source_url") or []) + news_urls))
        g_all += event_to_rdf(e)
    rdf_turtle = g_all.serialize(format="turtle")
    # Older rdflib versions return bytes from serialize().
    if hasattr(rdf_turtle, "decode"):
        rdf_turtle = rdf_turtle.decode("utf-8")
    ensure_weaviate_schema()
    weaviate_upsert_events(merged)
    maj = [e for e in merged if e.get("relationship_tag") == "majority"]
    mino = [e for e in merged if e.get("relationship_tag") == "minority"]
    summary = {
        "company": company,
        "ticker": ticker,
        "counts": {"majority": len(maj), "minority": len(mino), "total": len(merged)},
        "note": "Percentages >= 50 are tagged as 'majority'; otherwise 'minority'."
    }
    return {
        "company": company,
        "ticker": ticker,
        "events": merged,
        "news": news,
        "rdf_turtle": rdf_turtle,
        "summary": summary
    }

In [None]:

# --- DEMO ---
# Runs the full pipeline once and prints a short preview of each output.
# Depending on configuration this may call Yahoo Finance, Fintel, Gemini and
# Weaviate.
result = run_pipeline("Who are the largest institutional holders of Dropbox?")
print(json.dumps(result["summary"], indent=2))
print("\nTop 3 events (preview):")
for e in result["events"][:3]:
    print(json.dumps(e, indent=2))
# Only the first 600 characters of the Turtle serialisation are shown.
print("\nRDF Turtle (first 600 chars):\n", result["rdf_turtle"][:600], "...")
print("\nNews items (preview):")
for n in result["news"][:3]:
    print("-", n.get("title"), n.get("url"))