In [1]:
from __future__ import annotations

import re
from typing import Tuple, Optional

import requests


DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    )
}


def fetch_page(url: str, timeout: float = 15.0) -> Tuple[Optional[str], str]:
    """Fetches a URL and returns (title, text). Best-effort, no external services.

    - Uses requests with a friendly UA and timeout.
    - Tries BeautifulSoup if available for better parsing; otherwise falls back to regex strip.
    """
    resp = requests.get(url, headers=DEFAULT_HEADERS, timeout=timeout)
    resp.raise_for_status()
    html = resp.text

    title = None
    text_content = None

    try:
        from bs4 import BeautifulSoup  # type: ignore

        soup = BeautifulSoup(html, "html.parser")
        title = soup.title.get_text(strip=True) if soup.title else None
        # Remove script/style
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        text_content = soup.get_text(" ", strip=True)
    except Exception:
        # Fallback: naive tag removal
        title_match = re.search(r"<title>(.*?)</title>", html, flags=re.I | re.S)
        if title_match:
            title = re.sub(r"\s+", " ", title_match.group(1)).strip()
        # Remove scripts/styles
        html = re.sub(r"<script[\s\S]*?</script>", " ", html, flags=re.I)
        html = re.sub(r"<style[\s\S]*?</style>", " ", html, flags=re.I)
        # Strip tags
        text_only = re.sub(r"<[^>]+>", " ", html)
        text_content = re.sub(r"\s+", " ", text_only).strip()

    return title, text_content or ""



In [2]:
url = 'https://www.cnbc.com/2025/10/31/cre-companies-ai-goals.html'
fetch_page(url)

("Few CRE companies have achieved their AI goals. Here's why",
 'Few CRE companies have achieved their AI goals. Here\'s why Skip Navigation Markets Pre-Markets U.S. Markets Europe Markets China Markets Asia Markets World Markets Currencies Cryptocurrency Futures & Commodities Bonds Funds & ETFs Business Economy Finance Health & Science Media Real Estate Energy Climate Transportation Industrials Retail Wealth Sports Life Small Business Investing Personal Finance Fintech Financial Advisors Options Action ETF Street Buffett Archive Earnings Trader Talk Tech Cybersecurity AI Enterprise Internet Media Mobile Social Media CNBC Disruptor 50 Tech Guide Politics White House Policy Defense Congress Expanding Opportunity Europe Politics China Politics Asia Politics World Politics Video Latest Video Full Episodes Livestream Top Video Live Audio Europe TV Asia TV CNBC Podcasts CEO Interviews Digital Originals Watchlist Investing Club Trust Portfolio Analysis Trade Alerts Meeting Videos Homestretch

In [None]:
title = "Few CRE companies have achieved their AI goals. Here's why"
text_content = '''Few CRE companies have achieved their AI goals. Here\'s why Skip Navigation Markets Pre-Markets U.S. Markets Europe Markets China Markets Asia Markets World Markets Currencies Cryptocurrency Futures & Commodities Bonds Funds & ETFs Business Economy Finance Health & Science Media Real Estate Energy Climate Transportation Industrials Retail Wealth Sports Life Small Business Investing Personal Finance Fintech Financial Advisors Options Action ETF Street Buffett Archive Earnings Trader Talk Tech Cybersecurity AI Enterprise Internet Media Mobile Social Media CNBC Disruptor 50 Tech Guide Politics White House Policy Defense Congress Expanding Opportunity Europe Politics China Politics Asia Politics World Politics Video Latest Video Full Episodes Livestream Top Video Live Audio Europe TV Asia TV CNBC Podcasts CEO Interviews Digital Originals Watchlist Investing Club Trust Portfolio Analysis Trade Alerts Meeting Videos Homestretch Jim\'s Columns Education Subscribe PRO Pro News Josh Brown Mike Santoli Calls of the Day My Portfolio Livestream Full Episodes Stock Screener Market Forecast Options Investing Chart Investing Subscribe Livestream Menu Make It select USA INTL Livestream Search quotes, news & videos Livestream Watchlist SIGN IN Create free account Markets Business Investing Tech Politics Video Watchlist Investing Club PRO Livestream Menu Real Estate Housing Construction REITs Rising Risks Newsletter Sign-up CNBC Property Play Just 5% of CRE companies have achieved their AI goals. Here\'s why Published Fri, Oct 31 2025 8:00 AM EDT Updated Fri, Oct 31 2025 8:11 AM EDT Diana Olick @in/dianaolick @DianaOlickCNBC @DianaOlick WATCH LIVE Key Points Real estate companies are moving beyond initial testing and exploration of AI into more targeted applications that aim to redefine value, according to a new survey from JLL. JLL found that 88% of investors, owners and landlords said they have started piloting AI, with most pursuing an average of five use cases simultaneously. Just 5% of respondents said they have achieved all their program goals, while close to half said they have achieved two to three goals. Diminishing perspective of downtown London skyscrapers Chunyip Wong | Istock | Getty Images A version of this article first appeared in the CNBC Property Play newsletter with Diana Olick. Property Play covers new and evolving opportunities for the real estate investor, from individuals to venture capitalists, private equity funds, family offices, institutional investors and large public companies. Sign up to receive future editions, straight to your inbox. The commercial real estate market has been historically slow to modernize, and yet it appears to be accelerating its adoption of artificial intelligence. Companies are moving beyond initial testing and exploration into more targeted applications that aim to redefine value, according to a new survey from JLL. The survey of more than 1,500 senior CRE investor and occupier decision-makers across various industries found that, while still in the early stages, organizations are making AI a priority in their technology budgets. They are also moving from using it just for efficiency to focusing on how it can grow their businesses. JLL found that 88% of investors, owners and landlords said they have started piloting AI, with most pursuing an average of five use cases simultaneously. And more than 90% of occupiers are running corporate real estate AI pilots, according to the report. Compare that with just 5% starting AI pilots two years ago. The adoption is fast, but not entirely easy. Just 5% of respondents said they have achieved all their program goals, while close to half said they have achieved two to three goals. Much of the efforts are still experimental, without much growth. "If you think about commercial real estate, traditionally, it is not a quick technology adopter, and it\'s usually skeptical," said Yao Morin, chief technology officer at JLL. "So the high number of adoptions is actually quite surprising to me. What is not surprising on the flip side is that only 5% actually thinks that they have achieved all the goals. This is pretty aligned with a lot of other industries as well." Get Property Play directly to your inbox CNBC\'s Property Play with Diana Olick covers new and evolving opportunities for the real estate investor, delivered weekly to your inbox. Subscribe here to get access today . The reason they\'re not hitting their goals is because the goal line has moved. Companies have gone beyond just wanting to do certain tasks faster, or so-called operational efficiencies. Now they are tying AI to their revenue goals. For example, some are using it to help them improve their investment risk models, making investment and portfolio decisions based on the output of AI. That will require big changes to the fundamental way they operate. "When you really start moving towards the revenue side, the margin expansion side, then it\'s going to require a lot more than just using a technology," Morin explained. "You can\'t just say, \'Well, I\'m saving you 10% to do this particular thing.\' Companies need to actually rethink their operating model, to rethink how they organize to actually achieve the savings." And so companies are investing heavily in AI, despite economic headwinds. More than half of investors surveyed by JLL have been able to get significant budget growth over the past two years in the space. Their No. 1 spend is on strategic advisory on technology or AI, and most report their budgets have increased solely due to AI. After that, the spending goes to upgrading both cyber- and data-security measures and infrastructure for AI integration. Morin said what she found really surprising is that while most think companies will start using AI for simple tasks, or, low-risk, low-hanging fruit, that was not at all the case. "Our survey showed the opposite. We are getting to a point of sophistication, beyond this initial skeptical phase, where companies are really focusing on the competitive advantage to pressing business problems, using AI to solve instead of [just] those simple low-risk operations." More In CNBC Property Play Why global investment firm Nuveen is betting on this niche real estate subsector Diana Olick Commercial real estate is finally embracing blockchain. Here\'s what investors should know Diana Olick Major real estate developers are fast becoming power brokers Diana Olick Read More Subscribe to CNBC PRO Subscribe to Investing Club Licensing & Reprints CNBC Councils Supply Chain Values CNBC on Peacock Join the CNBC Panel Digital Products News Releases Closed Captioning Corrections About CNBC Internships Site Map Ad Choices Careers Help Contact News Tips Got a confidential news tip? We want to hear from you. Get In Touch CNBC Newsletters Sign up for free newsletters and get more CNBC delivered to your inbox Sign Up Now Get this delivered to your inbox, and more info about our products and services. Advertise With Us Please Contact Us Privacy Policy Your Privacy Choices CA Notice Terms of Service © 2025 CNBC LLC. All Rights Reserved. A Division of NBCUniversal Data is a real-time snapshot *Data is delayed at least 15 minutes.\n      Global Business and Financial News, Stock Quotes, and Market Data\n      and Analysis. Market Data Terms of Use and Disclaimers Data also provided by'''

In [19]:
from bs4 import BeautifulSoup

ModuleNotFoundError: No module named 'bs4'

In [21]:
import re, json, requests
from typing import Optional, Tuple, List
from bs4 import BeautifulSoup

DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    )
}

def _first(s: List[Optional[str]]) -> Optional[str]:
    for v in s:
        if v and v.strip():
            return v.strip()
    return None

def _get_meta(soup: BeautifulSoup, names_props: List[Tuple[str, str]]) -> Optional[str]:
    # names_props: list of (attr_name, attr_value) pairs to try, e.g. ("property","og:title")
    for attr, val in names_props:
        tag = soup.find("meta", attrs={attr: val})
        if tag and tag.get("content"):
            return tag["content"].strip()
    return None

def _parse_jsonld_all(soup: BeautifulSoup) -> List[dict]:
    items = []
    for tag in soup.find_all("script", type="application/ld+json"):
        try:
            data = json.loads(tag.string or tag.text or "")
            if isinstance(data, list):
                items.extend([x for x in data if isinstance(x, dict)])
            elif isinstance(data, dict):
                items.append(data)
        except Exception:
            continue
    return items

def _jsonld_pick_article(jsonlds: List[dict]) -> Optional[dict]:
    # Prefer NewsArticle/Article nodes
    for node in jsonlds:
        t = node.get("@type")
        if t == "NewsArticle" or t == "Article" or (isinstance(t, list) and any(x in ("NewsArticle","Article") for x in t)):
            return node
    # Sometimes the article is nested
    for node in jsonlds:
        for k, v in node.items():
            if isinstance(v, dict) and v.get("@type") in ("NewsArticle","Article"):
                return v
            if isinstance(v, list):
                for it in v:
                    if isinstance(it, dict) and it.get("@type") in ("NewsArticle","Article"):
                        return it
    return None

def extract_cnbc(url: str, timeout: float = 20.0) -> Tuple[Optional[str], str]:
    r = requests.get(url, headers=DEFAULT_HEADERS, timeout=timeout)
    r.raise_for_status()
    html = r.text
    soup = BeautifulSoup(html, "html.parser")

    # ---------- TITLE ----------
    # 1) OpenGraph / Twitter
    title = _get_meta(soup, [
        ("property", "og:title"),
        ("name", "twitter:title"),
        ("name", "parsely-title"),
    ])
    # 2) JSON-LD headline
    if not title:
        jsonlds = _parse_jsonld_all(soup)
        node = _jsonld_pick_article(jsonlds)
        if node:
            title = _first([node.get("headline"), node.get("name")])
    # 3) H1 fallbacks
    if not title:
        h1 = soup.find("h1")
        if h1:
            title = h1.get_text(" ", strip=True)
    # 4) <title> tag last
    if not title and soup.title:
        title = soup.title.get_text(" ", strip=True)

    # ---------- BODY ----------
    # A) JSON-LD articleBody (best: avoids nav/footers)
    body_text = ""
    if 'jsonlds' not in locals():
        jsonlds = _parse_jsonld_all(soup)
    node = _jsonld_pick_article(jsonlds)
    if node:
        # CNBC often provides articleBody as a string or list of paragraphs
        ab = node.get("articleBody")
        if isinstance(ab, str) and ab.strip():
            body_text = ab.strip()
        elif isinstance(ab, list):
            body_text = "\n\n".join([x for x in ab if isinstance(x, str) and x.strip()])

    # B) DOM fallbacks (scoped to main article region only)
    if not body_text:
        # Try common CNBC containers (several variants over time)
        containers = []
        # data-testid variants
        containers += soup.select('main [data-testid="ArticleBody"], article [data-testid="ArticleBody"]')
        # class name patterns historically used
        containers += soup.select('div[class*="ArticleBody-"], article div[class*="ArticleBody-"]')
        # generic article paragraphs
        if not containers:
            containers += soup.select("main article")
        # collect paragraphs
        paras = []
        for c in containers[:2]:  # don’t over-collect in case of duplicates
            for p in c.find_all(["p","h2","li"]):
                # skip bylines/captions if marked
                txt = p.get_text(" ", strip=True)
                if txt:
                    paras.append(txt)
            if paras:
                break
        body_text = "\n\n".join(paras)

    # C) Very last resort: strip everything (you already tried this—kept here as fallback)
    if not body_text:
        # Remove scripts/styles/noscript
        for t in soup(["script","style","noscript"]):
            t.decompose()
        # Try to confine to <main>
        main = soup.find("main")
        if main:
            body_text = main.get_text(" ", strip=True)
        else:
            body_text = soup.get_text(" ", strip=True)

    return title, body_text

# Example:
# t, content = extract_cnbc("https://www.cnbc.com/2025/10/30/government-shutdown-delta-air-traffic-controllers.html")
# print(t)
# print(content[:1000])


# Test flow

In [None]:
import os
import sys
import re
import logging
from typing import Tuple, Optional
import requests

import google.generativeai as genai
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.providers.google import GoogleProvider

# Create and configure logger
logging.basicConfig(filename="./logs/newfile.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')
logger = logging.getLogger()

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
from dotenv import load_dotenv
load_dotenv()

GEMINI_KEY = os.getenv('GOOGLE_API_KEY')
DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    )
}

# Import pydant
provider = GoogleProvider(api_key=GEMINI_KEY)
model = GoogleModel('gemini-2.5-flash', provider=provider)

In [None]:
from __future__ import annotations

import asyncio
import logging
import os
import re
from threading import Lock
from typing import Optional, Tuple

import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel, GoogleProvider

from models import ClassificationResult

load_dotenv()

logger = logging.getLogger(__name__)

DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
}
FETCH_TIMEOUT_SECONDS = 10.0  # Reduced from 20s
LLM_TIMEOUT_SECONDS = 30.0  # Reduced from 45s
MAX_INPUT_CHARACTERS = 8_000  # Reduced from 12,000 for faster LLM processing
LLM_MODEL = "gemini-2.5-flash-lite"  # Faster experimental model with better performance
BATCH_LIMIT = 500
MAX_CONCURRENT_REQUESTS = 10  # Limit concurrent HTTP requests

SYSTEM_PROMPT = """You are a professional news analyst specialising in financial and business reporting.
Interpret the supplied article title and body, then populate the output schema exactly.

For each article determine:
1. is_financial: True if the piece has a financial or business focus, otherwise False.
2. sector: Industry or market sectors referenced (list of strings, empty if none).
3. companies: 
   - Include only named operating companies or subsidiaries that are materially involved or affected in the article.
   - Exclude unless the article is about them:
     • Media outlets (Reuters, CNBC, etc.)
     • Data/survey/benchmark providers (S&P Global, Markit, PMI compilers)
     • Government agencies, regulators, NGOs, think tanks
     • Stock indices or ETFs (S&P 500, MSCI, etc.)
     • Generic groups with no explicit company names (“Chinese automakers”)
   - Use canonical company names, no duplicates.
   - If no target companies appear, return an empty list.
4. country: Countries or regions mentioned (list of strings, empty if none).
5. sentiment: One of Negative, Neutral, Positive describing the overall tone.
6. confident_score: Numeric confidence between 0.0 and 10.0.
7. summary_en: Complete 2-3 sentence English summary (50-100 words). Always finish complete sentences.
8. summary_tr: Complete 2-3 sentence Turkish summary (50-100 words). Always finish complete sentences.

Always respond in JSON compatible with the provided schema. Do not include additional commentary."""

WHITESPACE_RE = re.compile(r"\s+")


class NewsAnalyzer:
    """High level orchestrator that extracts article contents and queries Gemini."""

    def __init__(
        self,
        gemini_key: str,
        *,
        max_input_chars: int = MAX_INPUT_CHARACTERS,
        fetch_timeout: float = FETCH_TIMEOUT_SECONDS,
        llm_timeout: float = LLM_TIMEOUT_SECONDS,
    ) -> None:
        self.fetch_timeout = fetch_timeout
        self.llm_timeout = llm_timeout
        self.max_input_chars = max_input_chars

        self.provider = GoogleProvider(api_key=gemini_key)
        self.model = GoogleModel(LLM_MODEL, provider=self.provider)
        self.agent = Agent(
            self.model,
            output_type=ClassificationResult,
            system_prompt=SYSTEM_PROMPT,
            model_settings={
                "max_tokens": 2048,  # Reduced from 2048 for faster responses
                "temperature": 0.3,  # Lower temperature = faster, more deterministic
            },
        )

        self._client: Optional[httpx.AsyncClient] = None
        self._client_lock = asyncio.Lock()

        logger.info("NewsAnalyzer ready with model %s", LLM_MODEL)

    async def start(self) -> None:
        """Warm the HTTP client ahead of serving requests."""
        await self._get_http_client()

    async def shutdown(self) -> None:
        """Tear down the shared HTTP client."""
        async with self._client_lock:
            if self._client is not None:
                await self._client.aclose()
                self._client = None

    async def _get_http_client(self) -> httpx.AsyncClient:
        """Create (or return) a shared async HTTP client."""
        if self._client is None:
            async with self._client_lock:
                if self._client is None:
                    timeout = httpx.Timeout(self.fetch_timeout)
                    self._client = httpx.AsyncClient(
                        headers=DEFAULT_HEADERS,
                        timeout=timeout,
                    )
        return self._client

    async def extract_url(
        self, url: str, fetch_timeout: Optional[float] = None
    ) -> Tuple[str, str]:
        """Download, clean, and return the article title and body."""
        client = await self._get_http_client()
        timeout = fetch_timeout or self.fetch_timeout

        try:
            response = await client.get(
                url,
                timeout=timeout,
                follow_redirects=True,
            )
            response.raise_for_status()
        except httpx.TimeoutException as exc:
            logger.warning("Timed out fetching %s after %.1fs", url, timeout)
            raise TimeoutError(
                f"Timed out fetching article after {timeout} seconds."
            ) from exc
        except httpx.HTTPStatusError as exc:
            raise ValueError(
                f"HTTP {exc.response.status_code} error while fetching article."
            ) from exc
        except httpx.RequestError as exc:
            raise ValueError(
                f"Network error while fetching article: {exc}"
            ) from exc

        title, body = self._parse_article(response.text)
        if not body:
            raise ValueError("Could not extract readable text from the article.")

        return title or str(response.url), body

    def _parse_article(self, html: str) -> Tuple[Optional[str], str]:
        soup = BeautifulSoup(html, "html.parser")

        title: Optional[str]
        if soup.title and soup.title.string:
            title = soup.title.string.strip()
        else:
            title = None

        for tag in soup(
            [
                "script",
                "style",
                "noscript",
                "header",
                "footer",
                "svg",
                "form",
                "iframe",
                "nav",
                "aside",
            ]
        ):
            tag.decompose()

        text = soup.get_text(separator=" ", strip=True)
        cleaned_text = self._clean_text(text)

        return title, cleaned_text

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace and trim to the configured maximum."""
        collapsed = WHITESPACE_RE.sub(" ", (text or "")).strip()
        if len(collapsed) > self.max_input_chars:
            logger.debug(
                "Truncating article from %d to %d characters for cost control.",
                len(collapsed),
                self.max_input_chars,
            )
            return collapsed[: self.max_input_chars]
        return collapsed

    async def llm_analyzer(
        self, contents: str, title: str, llm_timeout: Optional[float] = None
    ) -> ClassificationResult:
        """Send the cleaned article to Gemini and return a structured result."""
        if not contents:
            raise ValueError("Article contents are empty after preprocessing.")

        payload = f"- Title: {title}\n- Contents: {contents}"
        timeout = llm_timeout or self.llm_timeout

        try:
            response = await asyncio.wait_for(
                self.agent.run(payload),
                timeout=timeout,
            )
        except asyncio.TimeoutError as exc:
            logger.error("LLM analysis timed out after %.1fs", timeout)
            raise TimeoutError(
                f"LLM analysis exceeded timeout of {timeout} seconds."
            ) from exc
        except Exception as exc:
            logger.exception("Unexpected error during LLM analysis")
            raise

        result_data = response.output.model_dump()
        result_data.setdefault("page_title", title)
        result_data["page_title"] = result_data.get("page_title") or title
        result_data["extracted_characters"] = len(contents)

        return ClassificationResult.model_validate(result_data)

    async def analyze_with_url(
        self,
        url: str,
        *,
        fetch_timeout: Optional[float] = None,
        llm_timeout: Optional[float] = None,
    ) -> ClassificationResult:
        """Fetch and analyse a remote article."""
        title, text = await self.extract_url(
            url,
            fetch_timeout=fetch_timeout,
        )
        result = await self.analyze_with_contents(
            text=text,
            title=title or str(url),
            llm_timeout=llm_timeout,
        )
        result_payload = result.model_dump()
        result_payload.update(
            {
                "source_url": url,
                "page_title": result_payload.get("page_title") or title or str(url),
            }
        )
        return ClassificationResult.model_validate(result_payload)

    async def analyze_with_contents(
        self,
        text: str,
        title: str,
        *,
        llm_timeout: Optional[float] = None,
    ) -> ClassificationResult:
        """Analyse raw article text supplied by the caller."""
        cleaned_title = title.strip() or "Untitled article"
        cleaned_text = self._clean_text(text)

        if not cleaned_text:
            raise ValueError("Article text is empty or unreadable after cleaning.")

        return await self.llm_analyzer(
            cleaned_text,
            cleaned_title,
            llm_timeout=llm_timeout,
        )


_cached_analyzer: Optional[NewsAnalyzer] = None
_cached_analyzer_lock = Lock()


def get_analyzer() -> NewsAnalyzer:
    """Return a singleton NewsAnalyzer instance, creating it on first use."""
    global _cached_analyzer
    with _cached_analyzer_lock:
        if _cached_analyzer is None:
            api_key = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")
            if not api_key:
                raise ValueError(
                    "Missing API key. Set GOOGLE_API_KEY or GEMINI_API_KEY environment variable."
                )
            _cached_analyzer = NewsAnalyzer(gemini_key=api_key)
        return _cached_analyzer


async def shutdown_analyzer() -> None:
    """Close the cached analyzer and reset the singleton."""
    global _cached_analyzer
    with _cached_analyzer_lock:
        analyzer = _cached_analyzer
        _cached_analyzer = None

    if analyzer is not None:
        await analyzer.shutdown()


In [2]:
analyzer = get_analyzer()



In [4]:
title = "Few CRE companies have achieved their AI goals. Here's why"
text_content = '''Few CRE companies have achieved their AI goals. Here\'s why Skip Navigation Markets Pre-Markets U.S. Markets Europe Markets China Markets Asia Markets World Markets Currencies Cryptocurrency Futures & Commodities Bonds Funds & ETFs Business Economy Finance Health & Science Media Real Estate Energy Climate Transportation Industrials Retail Wealth Sports Life Small Business Investing Personal Finance Fintech Financial Advisors Options Action ETF Street Buffett Archive Earnings Trader Talk Tech Cybersecurity AI Enterprise Internet Media Mobile Social Media CNBC Disruptor 50 Tech Guide Politics White House Policy Defense Congress Expanding Opportunity Europe Politics China Politics Asia Politics World Politics Video Latest Video Full Episodes Livestream Top Video Live Audio Europe TV Asia TV CNBC Podcasts CEO Interviews Digital Originals Watchlist Investing Club Trust Portfolio Analysis Trade Alerts Meeting Videos Homestretch Jim\'s Columns Education Subscribe PRO Pro News Josh Brown Mike Santoli Calls of the Day My Portfolio Livestream Full Episodes Stock Screener Market Forecast Options Investing Chart Investing Subscribe Livestream Menu Make It select USA INTL Livestream Search quotes, news & videos Livestream Watchlist SIGN IN Create free account Markets Business Investing Tech Politics Video Watchlist Investing Club PRO Livestream Menu Real Estate Housing Construction REITs Rising Risks Newsletter Sign-up CNBC Property Play Just 5% of CRE companies have achieved their AI goals. Here\'s why Published Fri, Oct 31 2025 8:00 AM EDT Updated Fri, Oct 31 2025 8:11 AM EDT Diana Olick @in/dianaolick @DianaOlickCNBC @DianaOlick WATCH LIVE Key Points Real estate companies are moving beyond initial testing and exploration of AI into more targeted applications that aim to redefine value, according to a new survey from JLL. JLL found that 88% of investors, owners and landlords said they have started piloting AI, with most pursuing an average of five use cases simultaneously. Just 5% of respondents said they have achieved all their program goals, while close to half said they have achieved two to three goals. Diminishing perspective of downtown London skyscrapers Chunyip Wong | Istock | Getty Images A version of this article first appeared in the CNBC Property Play newsletter with Diana Olick. Property Play covers new and evolving opportunities for the real estate investor, from individuals to venture capitalists, private equity funds, family offices, institutional investors and large public companies. Sign up to receive future editions, straight to your inbox. The commercial real estate market has been historically slow to modernize, and yet it appears to be accelerating its adoption of artificial intelligence. Companies are moving beyond initial testing and exploration into more targeted applications that aim to redefine value, according to a new survey from JLL. The survey of more than 1,500 senior CRE investor and occupier decision-makers across various industries found that, while still in the early stages, organizations are making AI a priority in their technology budgets. They are also moving from using it just for efficiency to focusing on how it can grow their businesses. JLL found that 88% of investors, owners and landlords said they have started piloting AI, with most pursuing an average of five use cases simultaneously. And more than 90% of occupiers are running corporate real estate AI pilots, according to the report. Compare that with just 5% starting AI pilots two years ago. The adoption is fast, but not entirely easy. Just 5% of respondents said they have achieved all their program goals, while close to half said they have achieved two to three goals. Much of the efforts are still experimental, without much growth. "If you think about commercial real estate, traditionally, it is not a quick technology adopter, and it\'s usually skeptical," said Yao Morin, chief technology officer at JLL. "So the high number of adoptions is actually quite surprising to me. What is not surprising on the flip side is that only 5% actually thinks that they have achieved all the goals. This is pretty aligned with a lot of other industries as well." Get Property Play directly to your inbox CNBC\'s Property Play with Diana Olick covers new and evolving opportunities for the real estate investor, delivered weekly to your inbox. Subscribe here to get access today . The reason they\'re not hitting their goals is because the goal line has moved. Companies have gone beyond just wanting to do certain tasks faster, or so-called operational efficiencies. Now they are tying AI to their revenue goals. For example, some are using it to help them improve their investment risk models, making investment and portfolio decisions based on the output of AI. That will require big changes to the fundamental way they operate. "When you really start moving towards the revenue side, the margin expansion side, then it\'s going to require a lot more than just using a technology," Morin explained. "You can\'t just say, \'Well, I\'m saving you 10% to do this particular thing.\' Companies need to actually rethink their operating model, to rethink how they organize to actually achieve the savings." And so companies are investing heavily in AI, despite economic headwinds. More than half of investors surveyed by JLL have been able to get significant budget growth over the past two years in the space. Their No. 1 spend is on strategic advisory on technology or AI, and most report their budgets have increased solely due to AI. After that, the spending goes to upgrading both cyber- and data-security measures and infrastructure for AI integration. Morin said what she found really surprising is that while most think companies will start using AI for simple tasks, or, low-risk, low-hanging fruit, that was not at all the case. "Our survey showed the opposite. We are getting to a point of sophistication, beyond this initial skeptical phase, where companies are really focusing on the competitive advantage to pressing business problems, using AI to solve instead of [just] those simple low-risk operations." More In CNBC Property Play Why global investment firm Nuveen is betting on this niche real estate subsector Diana Olick Commercial real estate is finally embracing blockchain. Here\'s what investors should know Diana Olick Major real estate developers are fast becoming power brokers Diana Olick Read More Subscribe to CNBC PRO Subscribe to Investing Club Licensing & Reprints CNBC Councils Supply Chain Values CNBC on Peacock Join the CNBC Panel Digital Products News Releases Closed Captioning Corrections About CNBC Internships Site Map Ad Choices Careers Help Contact News Tips Got a confidential news tip? We want to hear from you. Get In Touch CNBC Newsletters Sign up for free newsletters and get more CNBC delivered to your inbox Sign Up Now Get this delivered to your inbox, and more info about our products and services. Advertise With Us Please Contact Us Privacy Policy Your Privacy Choices CA Notice Terms of Service © 2025 CNBC LLC. All Rights Reserved. A Division of NBCUniversal Data is a real-time snapshot *Data is delayed at least 15 minutes.\n      Global Business and Financial News, Stock Quotes, and Market Data\n      and Analysis. Market Data Terms of Use and Disclaimers Data also provided by'''
test = await analyzer.analyze_with_contents(text=text_content, title=title)

In [5]:
output = test.model_dump()
output

{'page_title': "Few CRE companies have achieved their AI goals. Here's why",
 'is_financial': 'Yes',
 'country': ['USA'],
 'sector': ['Real Estate', 'Technology', 'Financial Services'],
 'companies': ['JLL', 'CNBC'],
 'confident_score': 9.5,
 'sentiment': 'Neutral',
 'summary_en': 'Despite a significant increase in AI adoption within the commercial real estate (CRE) sector, with 88% of companies piloting AI and pursuing an average of five use cases, only 5% have fully achieved their AI goals. This is attributed to the evolving nature of AI objectives, moving from operational efficiencies to revenue generation and requiring fundamental business model changes. Companies are heavily investing in AI, particularly in strategic advisory, cybersecurity, and infrastructure, focusing on complex business problems rather than simple tasks.',
 'summary_tr': "Ticari gayrimenkul (CRE) sektöründe yapay zeka (AI) benimsenmesinde önemli bir artış olmasına rağmen, şirketlerin %88'i yapay zeka pilot uygu

In [11]:
url = 'https://www.cnbc.com/2025/10/30/government-shutdown-delta-air-traffic-controllers.html'
title, contents = await analyzer.extract_url(url=url)

In [16]:
contents

'Delta, United and American call on Congress to end government shutdown Skip Navigation Markets Pre-Markets U.S. Markets Europe Markets China Markets Asia Markets World Markets Currencies Cryptocurrency Futures &amp; Commodities Bonds Funds &amp; ETFs Business Economy Finance Health &amp; Science Media Real Estate Energy Climate Transportation Industrials Retail Wealth Sports Life Small Business Investing Personal Finance Fintech Financial Advisors Options Action ETF Street Buffett Archive Earnings Trader Talk Tech Cybersecurity AI Enterprise Internet Media Mobile Social Media CNBC Disruptor 50 Tech Guide Politics White House Policy Defense Congress Expanding Opportunity Europe Politics China Politics Asia Politics World Politics Video Latest Video Full Episodes Livestream Top Video Live Audio Europe TV Asia TV CNBC Podcasts CEO Interviews Digital Originals Watchlist Investing Club Trust Portfolio Analysis Trade Alerts Meeting Videos Homestretch Jim&#x27;s Columns Education Subscribe P

In [None]:
from dataclasses import dataclass
from typing import List, Literal, Optional

Plan = Literal["batch", "standard"]

# === PRICES: text-only (from your screenshots) ===
# Standard:  Input $0.30/M, Output $2.50/M, Cache write $0.03/M, Storage $1.00/M/hour
# Batch:     Input $0.15/M, Output $1.25/M, Cache write $0.03/M, Storage $1.00/M/hour

@dataclass(frozen=True)
class TextPrices:
    input_per_million: float
    output_per_million: float
    cache_write_per_million: float
    cache_storage_per_million_per_hour: float

PRICES_TEXT = {
    "standard": TextPrices(
        input_per_million=0.30,
        output_per_million=2.50,
        cache_write_per_million=0.03,
        cache_storage_per_million_per_hour=1.00,
    ),
    "batch": TextPrices(
        input_per_million=0.15,
        output_per_million=1.25,
        cache_write_per_million=0.03,
        cache_storage_per_million_per_hour=1.00,
    ),
}

GROUNDING_SEARCH_USD_PER_1000 = 35.00  # both plans; batch maps not available
TOKEN_SCALE = 1_000_000.0


def tokens_to_usd(tokens: int, usd_per_million: float) -> float:
    return (tokens / TOKEN_SCALE) * usd_per_million


def calc_text_cost_usd(
    *,
    plan: Plan,                 # "batch" (your use-case) or "standard"
    prompt_tokens: int,         # total input tokens for ONE request (system + title + content + etc.)
    output_tokens: int,         # output tokens for ONE request
    cache_write_tokens: int = 0,
    cache_storage_tokens: int = 0,
    cache_storage_hours: float = 0.0,
    grounded_search_prompts_paid: int = 0,  # after subtracting free quota
) -> dict:
    """
    Cost for a single **text-only** request.
    """
    p = PRICES_TEXT[plan]

    input_cost = tokens_to_usd(prompt_tokens, p.input_per_million)
    output_cost = tokens_to_usd(output_tokens, p.output_per_million)
    cache_write_cost = tokens_to_usd(cache_write_tokens, p.cache_write_per_million) if cache_write_tokens else 0.0

    cache_storage_cost = 0.0
    if cache_storage_tokens and cache_storage_hours:
        per_hour = tokens_to_usd(cache_storage_tokens, p.cache_storage_per_million_per_hour)
        cache_storage_cost = per_hour * cache_storage_hours

    grounding_search_cost = (grounded_search_prompts_paid / 1000.0) * GROUNDING_SEARCH_USD_PER_1000

    total = round(input_cost + output_cost + cache_write_cost + cache_storage_cost + grounding_search_cost, 6)

    return {
        "plan": plan,
        "input_tokens": prompt_tokens,
        "output_tokens": output_tokens,
        "costs": {
            "input_usd": round(input_cost, 6),
            "output_usd": round(output_cost, 6),
            "cache_write_usd": round(cache_write_cost, 6),
            "cache_storage_usd": round(cache_storage_cost, 6),
            "grounding_search_usd": round(grounding_search_cost, 6),
        },
        "total_usd": total,
        "rates_per_1M": {
            "input": p.input_per_million,
            "output": p.output_per_million,
            "cache_write": p.cache_write_per_million,
            "cache_storage_per_hour": p.cache_storage_per_million_per_hour,
            "grounding_search_per_1000": GROUNDING_SEARCH_USD_PER_1000,
        },
    }


def count_total_characters(data) -> int:
    """
    Recursively count total characters across all string values in a nested dict/list structure.
    Non-string fields (numbers, booleans, None) are ignored.

    Example:
        count_total_characters({"a": "Hello", "b": ["World", 123]})  # -> 10
    """
    total_chars = 0

    if isinstance(data, str):
        total_chars += len(data)

    elif isinstance(data, dict):
        for value in data.values():
            total_chars += count_total_characters(value)

    elif isinstance(data, list):
        for item in data:
            total_chars += count_total_characters(item)

    return total_chars


count_total_characters(output)

648

In [None]:
test_link = ['https://www.cnbc.com/2025/11/03/china-factory-activity-october-pmi-ratingdog-private-survey-shows.html',
    "https://www.reuters.com/world/asia-pacific/dollar-flirts-with-three-month-peak-investors-look-us-data-releases-2025-11-03/?utm_source=chatgpt.com",
    "https://www.reuters.com/world/asia-pacific/chinas-finance-ministry-sets-up-new-debt-management-department-2025-11-03/?utm_source=chatgpt.com",
    "https://www.fxstreet.com/news/japanese-yen-languishes-near-multi-month-low-against-bullish-usd-amid-boj-uncertainty-202511030249?utm_source=chatgpt.com",
    "https://www.fxstreet.com/news/eur-usd-holds-losses-below-11550-as-fed-rate-cut-bets-decrease-202511030104?utm_source=chatgpt.com",
    "https://finance.yahoo.com/news/3-brilliant-growth-stocks-buy-002300772.html?utm_source=chatgpt.com",
    "https://www.ft.com/content/a281b378-687e-498e-acfd-ebeebd1e7cf8?utm_source=chatgpt.com",
    "https://www.reuters.com/world/middle-east/major-gulf-bourses-fall-weak-earnings-us-rate-cut-uncertainty-2025-11-03/?utm_source=chatgpt.com",
    "https://www.reuters.com/business/us-bancorp-quarterly-profit-jumps-higher-fee-income-2025-10-16/?utm_source=chatgpt.com",
    "https://www.reuters.com/business/finance/bank-america-picks-manelski-zuberi-run-global-markets-unit-memo-shows-2025-10-31/?utm_source=chatgpt.com",
    "https://www.reuters.com/business/finance/us-banks-reap-bigger-profits-deals-rebound-third-quarter-2025-10-09/?utm_source=chatgpt.com",
    "https://www.reuters.com/business/finance/us-banking-giants-expect-dealmaking-spree-continue-profits-climb-2025-10-14/?utm_source=chatgpt.com",
    "https://www.barrons.com/articles/stock-futures-trading-november-f84e36e0?utm_source=chatgpt.com",
    "https://www.tradingview.com/news/tradingview%3A426d631d60a9c%3A0-key-facts-analysts-predict-dogecoin-breakout-targets-up-to-48-rsi-stable/?utm_source=chatgpt.com",
    "https://www.reuters.com/business/finance/australian-lender-westpacs-annual-profit-falls-2-2025-11-02/?utm_source=chatgpt.com",
    "https://www.reuters.com/business/finance/us-bank-profits-climb-regulator-adjusts-problem-bank-tracking-2025-02-25/?utm_source=chatgpt.com",
    "https://www.cnbc.com/2025/11/03/why-the-us-dollar-has-been-strong-lately.html",
    "https://www.cnbc.com/2025/11/02/bank-of-america-q3-earnings-what-to-watch.html",
    "https://www.cnbc.com/2025/11/01/fed-rate-cut-bets-and-what-it-means-for-markets.html",
    "https://www.cnbc.com/2025/10/31/china-local-government-debt-what-investors-need-to-know.html",
    "https://www.cnbc.com/2025/10/30/how-big-tech-earnings-are-influencing-broader-market-sentiment.html"
]


# Test with 20 contents

In [None]:
values = []
for i in range(0, 20):
    tmp = {
        'id': f'id_{i}',
        'title': f'test_{i}',
        'contents': contents
    }
    values.append(tmp)


In [29]:
values

[{'id': 'id_0',
  'title': 'test_0',
  'contents': 'Delta, United and American call on Congress to end government shutdown Skip Navigation Markets Pre-Markets U.S. Markets Europe Markets China Markets Asia Markets World Markets Currencies Cryptocurrency Futures &amp; Commodities Bonds Funds &amp; ETFs Business Economy Finance Health &amp; Science Media Real Estate Energy Climate Transportation Industrials Retail Wealth Sports Life Small Business Investing Personal Finance Fintech Financial Advisors Options Action ETF Street Buffett Archive Earnings Trader Talk Tech Cybersecurity AI Enterprise Internet Media Mobile Social Media CNBC Disruptor 50 Tech Guide Politics White House Policy Defense Congress Expanding Opportunity Europe Politics China Politics Asia Politics World Politics Video Latest Video Full Episodes Livestream Top Video Live Audio Europe TV Asia TV CNBC Podcasts CEO Interviews Digital Originals Watchlist Investing Club Trust Portfolio Analysis Trade Alerts Meeting Videos H

In [None]:
import os
from batch_processor import BatchProcessor
import json
from dotenv import load_dotenv
load_dotenv()

GEMINI_KEY = os.getenv('GOOGLE_API_KEY')

# Load content
with open('batch_contents.json') as f:
    contents = json.load(f)

# Initialize processor
processor = BatchProcessor(gemini_key=GEMINI_KEY)

# Prepare and submit batch
batch_file = processor.prepare_batch_from_contents(contents, "my_batch")
job_id = processor.submit_batch(batch_file)

# Wait for completion
processor.wait_for_completion(job_id)

# Get results
results = processor.retrieve_results(job_id, "my_batch")

  from .autonotebook import tqdm as notebook_tqdm


NameError: name 'os' is not defined

# Check API Rate Limit

In [None]:
from collections import Counter
from google.generativeai import BatchJob

ACTIVE_STATES = {
    "JOB_STATE_VALIDATING",
    "JOB_STATE_QUEUED",
    "JOB_STATE_RUNNING",
}

class BatchProcessor:
    ...
    def summarize_batch_capacity(self, page_size: int = 50) -> dict:
        """
        Give a quick snapshot of Gemini batch usage.

        Returns:
            {
                "max_requests_per_batch": int,
                "concurrent_job_limit": int,
                "total_jobs": int,
                "processing_jobs": int,
                "waiting_jobs": int,
                "completed_jobs": int,
            }
        """
        counts = Counter()
        for job in BatchJob.list(page_size=page_size):     # auto-paginates
            counts[str(job.state)] += 1

        processing = sum(counts[s] for s in ACTIVE_STATES & counts.keys())
        completed = counts.get("JOB_STATE_SUCCEEDED", 0) + counts.get("JOB_STATE_FAILED", 0)
        waiting = counts.get("JOB_STATE_PENDING", 0) + counts.get("JOB_STATE_PAUSED", 0)

        return {
            "max_requests_per_batch": self.max_batch_size,  # BATCH_LIMIT from news_analyzer
            "concurrent_job_limit": 5,                      # Gemini Batch API quota today
            "total_jobs": sum(counts.values()),
            "processing_jobs": processing,
            "waiting_jobs": waiting,
            "completed_jobs": completed,
        }


In [None]:
processor = BatchProcessor(gemini_key=os.environ["GEMINI_API_KEY"])
stats = processor.summarize_batch_capacity()
print(
    f"{stats['processing_jobs']} running out of {stats['concurrent_job_limit']} slots; "
    f"{stats['waiting_jobs']} queued; batch size limit {stats['max_requests_per_batch']}"
)


# Test with CSV input?

In [None]:
from __future__ import annotations

import asyncio
import logging
import os
import re
import time
from threading import Lock
from typing import Optional, Tuple

import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel, GoogleProvider

from models import ClassificationResult

load_dotenv()

logger = logging.getLogger(__name__)

DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
}
FETCH_TIMEOUT_SECONDS = 10.0  # Reduced from 20s
LLM_TIMEOUT_SECONDS = 30.0  # Reduced from 45s
MAX_INPUT_CHARACTERS = 8000  # Reduced from 12,000 for faster LLM processing
LLM_MODEL = "gemini-2.5-flash-lite"  # Faster experimental model with better performance
MAX_CONCURRENT_REQUESTS = 2  # Conservative for free tier (15 RPM limit)
MIN_REQUEST_INTERVAL = 5.0  # Minimum seconds between requests (for free tier: 15 RPM = 4s interval)

SYSTEM_PROMPT = """
You are a professional financial and business news analyst. 
Analyze each provided article title and body, then produce output strictly matching the schema below.

For every article, determine the following fields:

1. is_financial (bool): 
   True if the article primarily concerns business, markets, finance, or the economy; otherwise False.
2. sector (list[str]): 
   The industry or market sectors discussed. Return an empty list if none are mentioned.
3. companies (list[str]): 
   Include only named operating companies or subsidiaries directly involved or affected.
   Exclude:
     • Media outlets (e.g., Reuters, CNBC)
     • Data/survey/benchmark providers (e.g., S&P Global, Markit)
     • Government agencies, regulators, NGOs, or think tanks
     • Stock indices or ETFs (e.g., S&P 500, MSCI)
     • Generic group labels (e.g., “Chinese automakers”)
   Use canonical company names and avoid duplicates. Return an empty list if none apply.
4. country (list[str]): 
   List all countries or regions mentioned. Return an empty list if none are mentioned.
5. sentiment (str): 
   Overall tone of the article — one of ["Positive", "Neutral", "Negative"].
6. confident_score (float): 
   Confidence level in the analysis on a scale from 0.0 to 10.0.
7. summary_en (str): 
   A concise 2–3 sentence English summary (50–100 words) that captures the key points and implications. 
   Always end with complete sentences.
8. summary_tr (str): 
   A concise 2–3 sentence Turkish summary (50–100 words) that mirrors the English version in meaning. 
   Always end with complete sentences.

Output must be valid JSON exactly matching this schema — no commentary or additional text.
"""

WHITESPACE_RE = re.compile(r"\s+")


class NewsAnalyzer:
    """High level orchestrator that extracts article contents and queries Gemini."""

    def __init__(
        self,
        gemini_key: str,
        *,
        max_input_chars: int = MAX_INPUT_CHARACTERS,
        fetch_timeout: float = FETCH_TIMEOUT_SECONDS,
        llm_timeout: float = LLM_TIMEOUT_SECONDS,
        max_concurrent: int = MAX_CONCURRENT_REQUESTS,
    ) -> None:
        self.fetch_timeout = fetch_timeout
        self.llm_timeout = llm_timeout
        self.max_input_chars = max_input_chars
        self.max_concurrent = max_concurrent

        self.provider = GoogleProvider(api_key=gemini_key)
        self.model = GoogleModel(LLM_MODEL, provider=self.provider)
        self.agent = Agent(
            self.model,
            output_type=ClassificationResult,
            system_prompt=SYSTEM_PROMPT,
            model_settings={
                "max_tokens": 2048,  # Reduced from 2048 for faster responses
                "temperature": 0.3,  # Lower temperature = faster, more deterministic
            },
        )

        self._client: Optional[httpx.AsyncClient] = None
        self._client_lock = asyncio.Lock()

        # Semaphore for concurrent request limiting
        self._llm_semaphore: Optional[asyncio.Semaphore] = None
        self._semaphore_lock = asyncio.Lock()

        # Rate limiting for free tier
        self._last_request_time: float = 0.0
        self._rate_limit_lock = asyncio.Lock()

        logger.info(
            "NewsAnalyzer ready with model %s (max_concurrent=%d, rate_limit=%.1fs)",
            LLM_MODEL,
            max_concurrent,
            MIN_REQUEST_INTERVAL
        )

    async def start(self) -> None:
        """Warm the HTTP client and semaphore ahead of serving requests."""
        await self._get_http_client()
        await self._get_semaphore()

    async def shutdown(self) -> None:
        """Tear down the shared HTTP client."""
        async with self._client_lock:
            if self._client is not None:
                await self._client.aclose()
                self._client = None

    async def _get_http_client(self) -> httpx.AsyncClient:
        """Create (or return) a shared async HTTP client."""
        if self._client is None:
            async with self._client_lock:
                if self._client is None:
                    timeout = httpx.Timeout(self.fetch_timeout)
                    self._client = httpx.AsyncClient(
                        headers=DEFAULT_HEADERS,
                        timeout=timeout,
                    )
        return self._client

    async def _get_semaphore(self) -> asyncio.Semaphore:
        """Create (or return) a shared semaphore for rate limiting."""
        if self._llm_semaphore is None:
            async with self._semaphore_lock:
                if self._llm_semaphore is None:
                    self._llm_semaphore = asyncio.Semaphore(self.max_concurrent)
        return self._llm_semaphore

    async def extract_url(
        self, url: str, fetch_timeout: Optional[float] = None
    ) -> Tuple[str, str]:
        """Download, clean, and return the article title and body."""
        client = await self._get_http_client()
        timeout = fetch_timeout or self.fetch_timeout

        try:
            response = await client.get(
                url,
                timeout=timeout,
                follow_redirects=True,
            )
            response.raise_for_status()
        except httpx.TimeoutException as exc:
            logger.warning("Timed out fetching %s after %.1fs", url, timeout)
            raise TimeoutError(
                f"Timed out fetching article after {timeout} seconds."
            ) from exc
        except httpx.HTTPStatusError as exc:
            raise ValueError(
                f"HTTP {exc.response.status_code} error while fetching article."
            ) from exc
        except httpx.RequestError as exc:
            raise ValueError(
                f"Network error while fetching article: {exc}"
            ) from exc

        title, body = self._parse_article(response.text)
        if not body:
            raise ValueError("Could not extract readable text from the article.")

        return title or str(response.url), body

    def _parse_article(self, html: str) -> Tuple[Optional[str], str]:
        soup = BeautifulSoup(html, "html.parser")

        title: Optional[str]
        if soup.title and soup.title.string:
            title = soup.title.string.strip()
        else:
            title = None

        for tag in soup(
            [
                "script",
                "style",
                "noscript",
                "header",
                "footer",
                "svg",
                "form",
                "iframe",
                "nav",
                "aside",
            ]
        ):
            tag.decompose()

        text = soup.get_text(separator=" ", strip=True)
        cleaned_text = self._clean_text(text)

        return title, cleaned_text

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace and trim to the configured maximum."""
        collapsed = WHITESPACE_RE.sub(" ", (text or "")).strip()
        if len(collapsed) > self.max_input_chars:
            logger.debug(
                "Truncating article from %d to %d characters for cost control.",
                len(collapsed),
                self.max_input_chars,
            )
            return collapsed[: self.max_input_chars]
        return collapsed

    async def llm_analyzer(
        self, contents: str, title: str, llm_timeout: Optional[float] = None
    ) -> ClassificationResult:
        """Send the cleaned article to Gemini and return a structured result."""
        if not contents:
            raise ValueError("Article contents are empty after preprocessing.")

        payload = f"- Title: {title}\n- Contents: {contents}"
        timeout = llm_timeout or self.llm_timeout

        # Use semaphore to limit concurrent API calls
        semaphore = await self._get_semaphore()

        async with semaphore:
            # Rate limiting: ensure minimum interval between requests
            async with self._rate_limit_lock:
                current_time = time.time()
                time_since_last = current_time - self._last_request_time

                if time_since_last < MIN_REQUEST_INTERVAL:
                    wait_time = MIN_REQUEST_INTERVAL - time_since_last
                    logger.debug("Rate limiting: waiting %.2fs before next request", wait_time)
                    await asyncio.sleep(wait_time)

                self._last_request_time = time.time()

            try:
                response = await asyncio.wait_for(
                    self.agent.run(payload),
                    timeout=timeout,
                )
            except asyncio.TimeoutError as exc:
                logger.error("LLM analysis timed out after %.1fs", timeout)
                raise TimeoutError(
                    f"LLM analysis exceeded timeout of {timeout} seconds."
                ) from exc
            except Exception as exc:
                logger.exception("Unexpected error during LLM analysis")
                raise

        result_data = response.output.model_dump()
        result_data.setdefault("page_title", title)
        result_data["page_title"] = result_data.get("page_title") or title
        result_data["extracted_characters"] = len(contents)

        return ClassificationResult.model_validate(result_data)

    async def analyze_with_url(
        self,
        url: str,
        *,
        fetch_timeout: Optional[float] = None,
        llm_timeout: Optional[float] = None,
    ) -> ClassificationResult:
        """Fetch and analyse a remote article."""
        title, text = await self.extract_url(
            url,
            fetch_timeout=fetch_timeout,
        )
        result = await self.analyze_with_contents(
            text=text,
            title=title or str(url),
            llm_timeout=llm_timeout,
        )
        result_payload = result.model_dump()
        result_payload.update(
            {
                "source_url": url,
                "page_title": result_payload.get("page_title") or title or str(url),
            }
        )
        return ClassificationResult.model_validate(result_payload)

    async def analyze_with_contents(
        self,
        text: str,
        title: str | None = None,
        *,
        llm_timeout: Optional[float] = None,
    ) -> ClassificationResult:
        """Analyse raw article text supplied by the caller."""
        # Validate inputs
        text_stripped = text.strip() if text else ""
        if len(text_stripped) < 20:
            raise ValueError(
                "Article text must be at least 20 characters. "
                f"Received {len(text_stripped)} characters."
            )

        # Validate and clean title
        if title:
            title_stripped = title.strip()
            if len(title_stripped) < 3:
                logger.warning(
                    "Title too short (%d chars), deriving from content",
                    len(title_stripped)
                )
                title = None

        cleaned_title = title.strip() if title else text_stripped.splitlines()[0][:120]
        if not cleaned_title or len(cleaned_title) < 3:
            cleaned_title = "Untitled article"

        cleaned_text = self._clean_text(text_stripped)

        if not cleaned_text or len(cleaned_text) < 20:
            raise ValueError(
                "Article text is empty or too short after cleaning. "
                f"Cleaned length: {len(cleaned_text)}"
            )

        return await self.llm_analyzer(
            cleaned_text,
            cleaned_title,
            llm_timeout=llm_timeout,
        )


_cached_analyzer: Optional[NewsAnalyzer] = None
_cached_analyzer_lock = Lock()


def get_analyzer() -> NewsAnalyzer:
    """Return a singleton NewsAnalyzer instance, creating it on first use."""
    global _cached_analyzer
    with _cached_analyzer_lock:
        if _cached_analyzer is None:
            api_key = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")
            if not api_key:
                raise ValueError(
                    "Missing API key. Set GOOGLE_API_KEY or GEMINI_API_KEY environment variable."
                )
            _cached_analyzer = NewsAnalyzer(gemini_key=api_key)
        return _cached_analyzer


async def shutdown_analyzer() -> None:
    """Close the cached analyzer and reset the singleton."""
    global _cached_analyzer
    with _cached_analyzer_lock:
        analyzer = _cached_analyzer
        _cached_analyzer = None

    if analyzer is not None:
        await analyzer.shutdown()


In [5]:
analyzer = get_analyzer()

In [6]:
import pandas as pd
import numpy as np

df_check = pd.read_csv('articles_rows.csv')


In [None]:
df_check.tail()[['title','content']].values[0]

array([['cơn khát trở thành tất cả mọi thứ',
        'Cách chúng ta biến mất khi đuổi theo những cuộc đời không phải của mình.Amber. Ngày 15 tháng 5 năm 2025Bài dịch từ Amber. Link bài gốc: “Ôi, được thức dậy với lòng biết ơn vì nhịp đập nơi cổ họng; được đối diện với gương mà không thấy sợ hãi. Tôi cần phải đủ—cho chính mình.”\nChúng ta lạc lối biết bao, trong thế giới ảo của cuộc sống.Họ nói chúng ta may mắn, khi được sống trong kỷ nguyên của những cánh cửa vô tận; của vô số phiên bản bản thân đang chờ được triệu hồi chỉ bằng một cái lướt, một bài đăng, hay một màn lột xác từng bước.\xa0Nhưng không ai cảnh báo chúng ta về căn bệnh mà nó mang theo. Về việc quá nhiều khả năng có thể làm rách nát các khía cạnh bản thể của một con người.Da thịt chúng ta khao khát những buổi sáng bắt đầu bằng ánh sáng ban mai chứ không phải ánh sáng màn hình. Trước khi chân chạm đất, mắt chúng ta đã lạc vào thế giới của người khác. Chúng ta không nhận ra khi nào mình ngừng sống và chỉ chăm chăm quan sát n

In [8]:
# Create list of (title, content) tuples
articles = list(zip(df_check['title'], df_check['content']))

print(f"Loaded {len(articles)} articles")

# First 3 articles
for title, content in articles[:3]:
    print(f"Title: {title[:50]}...")
    print(f"Content: {content[:100] if pd.notna(content) else 'N/A'}...")
    print()

Loaded 49 articles
Title: Nuôi dạy trẻ ở Việt Nam: Xung đột giữa Giá trị Cá ...
Content: "Tôi là một người quyết tâm không cho con 4 tuổi nghiện điện thoại. Con trai tôi rất ngoan trừ một đ...

Title: Sangdo...
Content: 4 phút đọcCÀ SA VÀ GUCCI  Điều gì khác nhau giữa một cô gái mê đầm dạ hội với một thầy tu mê cà sa? ...

Title: call.me.beth...
Content:  Công ty Cổ Phần Felizz Trực thuộc Công ty Cổ Phần Spiderum Việt Nam (Spiderum Vietnam JSC)Người chị...



In [None]:
import asyncio
import pandas as pd
# from news_analyzer import get_analyzer, shutdown_analyzer

async def analyze_with_concurrency():
    """Process multiple articles concurrently - analyzer handles rate limiting."""
    
    df = pd.read_csv('articles_rows.csv')
    articles = list(zip(df['title'], df['content']))
    
    # Analyzer already has semaphore and rate limiting built-in
    analyzer = get_analyzer()
    
    async def analyze_one(i, title, content):
        try:
            print(f"[{i}/{len(articles)}] Processing: {title[:40]}...")
            
            # No semaphore needed - analyzer handles it internally
            result = await analyzer.analyze_with_contents(
                title=title,
                text=content,
                llm_timeout=30.0
            )
            
            return {
                'index': i,
                'title': title,
                'is_financial': result.is_financial,
                'sentiment': result.sentiment,
                'companies': ', '.join(result.companies),
                'sector': ', '.join(result.sector),
                'summary_en': result.summary_en,
                'summary_tr':result.summary_tr,
                'confident_score': result.confident_score
            }
            
        except ValueError as e:
            print(f"✗ Validation error on {i}: {e}")
            return {'index': i, 'title': title, 'error': f'Validation: {str(e)}'}
        except Exception as e:
            print(f"✗ Error on {i}: {e}")
            return {'index': i, 'title': title, 'error': str(e)}
    
    # Launch all tasks - analyzer's semaphore + rate limiter handle throttling
    tasks = [
        analyze_one(i, title, content)
        for i, (title, content) in enumerate(articles, 1)
    ]
    
    # Process all concurrently (analyzer limits to max_concurrent=3)
    results = await asyncio.gather(*tasks)
    await shutdown_analyzer()
    
    # Save results
    results_df = pd.DataFrame(results).sort_values('index')
    results_df.to_csv('analysis_results.csv', index=False)
    
    # Print summary
    successful = len([r for r in results if 'error' not in r])
    print(f"\n✓ Completed: {successful}/{len(results)} articles")
    print(f"✗ Errors: {len(results) - successful}")
    
    return results_df

# Run
results = await analyze_with_concurrency()

[1/49] Processing: Nuôi dạy trẻ ở Việt Nam: Xung đột giữa G...
[2/49] Processing: Sangdo...
[3/49] Processing: call.me.beth...
[4/49] Processing: Du lịch...
[5/49] Processing: Movie...
[6/49] Processing: rc09...
[7/49] Processing: Spiderum | Mạng Xã Hội Chia Sẻ Quan Điểm...
[8/49] Processing: Nhiếp ảnh...
[9/49] Processing: The Brands...
[10/49] Processing: Người trong muôn nghề...
[11/49] Processing: Xe máy...
[12/49] Processing: Góc nhìn thời sự...
[13/49] Processing: Khái quát lịch sử & văn hóa Chămpa...
[14/49] Processing: Tâm lý học...
[15/49] Processing: Nấu ăn Ẩm thực...
[16/49] Processing: Yêu...
[17/49] Processing: Sách...
[18/49] Processing: Hải Stark...
[19/49] Processing: Lịch sử...
[20/49] Processing: Quan điểm - Tranh luận...
[21/49] Processing: Sáng tác...
[22/49] Processing: HẬU NGÔ TRƯỜNG LOẠN - QUYỂN 1: LOẠN CHÚA...
[23/49] Processing: Hành Trình tới với Tứ Cực Việt Nam: Hiểu...
[24/49] Processing: WTF...
[25/49] Processing: Spiderum | Mạng Xã Hội Chia Sẻ Quan Điểm...

Unexpected error during LLM analysis
Traceback (most recent call last):
  File "/var/folders/hj/z9f9nn5s6wgclrh9s314r5zh0000gn/T/ipykernel_52687/510245600.py", line 256, in llm_analyzer
    response = await asyncio.wait_for(
               ^^^^^^^^^^^^^^^^^^^^^^^
    ...<2 lines>...
    )
    ^
  File "/opt/miniconda3/envs/longnv/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
    return await fut
           ^^^^^^^^^
  File "/opt/miniconda3/envs/longnv/lib/python3.13/site-packages/pydantic_ai/agent/abstract.py", line 218, in run
    async for node in agent_run:
    ...<4 lines>...
                await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream)
  File "/opt/miniconda3/envs/longnv/lib/python3.13/site-packages/pydantic_ai/run.py", line 149, in __anext__
    next_node = await self._graph_run.__anext__()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/longnv/lib/python3.13/site-packages/pydantic_graph/graph.py", line 

✗ Error on 6: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.', 'status': 'RESOURCE_EXHAUSTED'}}


Unexpected error during LLM analysis
Traceback (most recent call last):
  File "/var/folders/hj/z9f9nn5s6wgclrh9s314r5zh0000gn/T/ipykernel_52687/510245600.py", line 256, in llm_analyzer
    response = await asyncio.wait_for(
               ^^^^^^^^^^^^^^^^^^^^^^^
    ...<2 lines>...
    )
    ^
  File "/opt/miniconda3/envs/longnv/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
    return await fut
           ^^^^^^^^^
  File "/opt/miniconda3/envs/longnv/lib/python3.13/site-packages/pydantic_ai/agent/abstract.py", line 218, in run
    async for node in agent_run:
    ...<4 lines>...
                await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream)
  File "/opt/miniconda3/envs/longnv/lib/python3.13/site-packages/pydantic_ai/run.py", line 149, in __anext__
    next_node = await self._graph_run.__anext__()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/longnv/lib/python3.13/site-packages/pydantic_graph/graph.py", line 

✗ Error on 23: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource exhausted. Please try again later. Please refer to https://cloud.google.com/vertex-ai/generative-ai/docs/error-code-429 for more details.', 'status': 'RESOURCE_EXHAUSTED'}}


In [None]:
results

Unnamed: 0,index,title,is_financial,sentiment,companies,sector,summary_en,summary_tr,confident_score,error
0,1,Nuôi dạy trẻ ở Việt Nam: Xung đột giữa Giá trị...,False,Neutral,,Technology,The article discusses the conflict between per...,"Makale, Vietnam'da çocuk yetiştirmede kişisel ...",7.5,
1,2,Sangdo,False,Neutral,,,The article is a collection of blog post title...,"Makale, başlıklar ve tarihlerden oluşan bir bl...",,
2,3,call.me.beth,False,Neutral,,,"This is contact information for Felizz JSC, a ...","Bu, Spiderum Vietnam JSC'nin bir yan kuruluşu ...",,
3,4,Du lịch,False,Neutral,,,The article is about tourism and provides cont...,"Makale turizm hakkında olup, Spiderum Vietnam ...",,
4,5,Movie,False,Neutral,,,This is a movie title. The content provides co...,"Bu bir film başlığıdır. İçerik, Felizz JSC.",0.0,
5,6,rc09,False,Neutral,,,The article provides contact information for F...,"Makale, Spiderum Vietnam JSC.",,
6,7,Spiderum | Mạng Xã Hội Chia Sẻ Quan Điểm - Kiế...,True,Neutral,,,The article discusses Vietnam's transition to ...,"Makale, küresel deneyimlerden alınan dersleri ...",,
7,8,Nhiếp ảnh,False,Neutral,,,The article is about photography and is publis...,Makale fotoğrafçılık hakkındadır ve Spiderum V...,,
8,9,The Brands,False,Neutral,,,The article provides contact information for F...,"Makale, Spiderum Vietnam JSC.",,
9,10,Người trong muôn nghề,False,Neutral,,,"This is a Vietnamese article titled ""People in...","Bu, ""Muôn Ngherdeki İnsanlar"" başlıklı bir Vie...",,
