<a href="https://colab.research.google.com/github/BodrulJalal/CS676-Algorithm-of-Data-Science/blob/main/Deliverable_3_Gradio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# @title Installations:
# Earlier, verbose variant of the install command, kept for reference
# (note that it does not include gradio_client):
# !pip install -U google-generativeai requests beautifulsoup4 tldextract python-dateutil lxml scikit-learn
# Install/upgrade all required packages quietly. stdout and stderr are both
# redirected to /dev/null, so an installation failure is NOT visible here.
!pip install -q -U google-generativeai gradio_client requests beautifulsoup4 tldextract python-dateutil lxml scikit-learn > /dev/null 2>&1

# Silence every Python warning for the rest of the session.
import warnings
warnings.filterwarnings("ignore")

In [2]:
# @title Environment API Keys:
from google.colab import userdata
import os

# Placeholder (empty) keys. NOTE: these assignments overwrite any value that is
# already present in the process environment. The SerpAPI cell reads
# SERP_API_KEY via os.getenv() and search_google() raises RuntimeError when it
# is empty, so real keys must be supplied before searching.
os.environ["SERP_API_KEY"] = ""
os.environ["GEMINI_API_KEY"] = ""

# To load the real keys through the imported `userdata` helper, uncomment:
# os.environ["SERP_API_KEY"] = userdata.get('SERP_API_KEY')
# os.environ["GEMINI_API_KEY"] = userdata.get('GEMINI_API_KEY')

In [3]:
# @title Credibility Scoring Functions:
# ============================================================
# ==========  C R E D I B I L I T Y   S C O R I N G  =========
# ============================================================
# This section is the *deliverable_1* rule-based scorer

import re, time, json, math, tldextract, requests, random, hashlib
from urllib.parse import urlparse
from datetime import datetime
from dateutil import parser as dateparser
from bs4 import BeautifulSoup

# --- Networking defaults ---
USER_AGENT = 'Mozilla/5.0 (CredibilityPOC/0.1)'   # Project UA; fetch_html() uses it in its last header set
DEFAULT_TIMEOUT = 12                               # Per-request timeout in seconds (passed to requests.get)

# --- Heuristic signals ---
# Lower-case phrases that score_url() looks for (as substrings) in the
# lower-cased article text; each distinct phrase found counts as one hit.
CLICKBAIT_TERMS = [
    "you won't believe", 'shocking', 'jaw-dropping', 'what happened next',
    'unbelievable', 'miracle', 'exposed', "secret they don't want you to know"
]
# Lower-case substrings used by extract_article_fields() to set the
# 'has_transparency_hints' flag. Short entries such as 'by ' match very easily.
TRANSPARENCY_HINTS = [
    'author','byline','by ','by:','written by','editor','editorial',
    'fact-check','fact check','sources','references','citations',
    'methodology','about us','about the author','corrections','disclosures'
]
# Final label of the public suffix (e.g. 'edu' in 'ac.edu') that score_url()
# rewards as an institutional domain.
INSTITUTIONAL_TLDS = {'edu','gov','ac','sch','mil'}

def fetch_html(url: str):
    """
    Fetch raw HTML for a URL with retries and basic anti-block heuristics.

    - Uses a different header set on each attempt (up to three per candidate URL).
    - Retries on 403/429/5xx responses and on request exceptions, sleeping with
      exponential backoff *between* attempts (no sleep after a candidate's last
      attempt, since nothing follows it for that URL).
    - For Reddit URLs, also tries the old.reddit.com equivalent.
    - Gives up on a candidate immediately when it answers with a non-HTML
      Content-Type.

    Returns (html_text, None) on success, or (None, 'Fetch error: ...') on failure.
    """
    import time
    import requests
    from urllib.parse import urlparse

    max_attempts = 3

    # Primary + backup user-agent/header sets, indexed by attempt number.
    header_candidates = [
        {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/126.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Connection": "keep-alive",
        },
        {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                          "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
        },
        {
            "User-Agent": USER_AGENT,  # project UA, as a last resort
            "Accept-Language": "en-US,en;q=0.9",
        },
    ]

    def reddit_alt(u: str) -> list[str]:
        """Return [u] plus the old.reddit.com variant when u is a Reddit URL."""
        try:
            host = urlparse(u).netloc.lower()
        except (ValueError, TypeError, AttributeError):
            return [u]

        # Match reddit.com and its subdomains only (not e.g. "notreddit.com").
        is_reddit = host == "reddit.com" or host.endswith(".reddit.com")
        if not is_reddit or host.startswith("old."):
            return [u]

        alt = u.replace("//www.reddit.com", "//old.reddit.com", 1)
        if alt == u:  # if it wasn't www, still try old.
            alt = u.replace("//reddit.com", "//old.reddit.com", 1)
        # Only add the alternative when it differs, so the same URL is never
        # fetched twice (e.g. for other subdomains such as np.reddit.com).
        return [u, alt] if alt != u else [u]

    urls_to_try = reddit_alt(url)
    last_err = None

    for candidate_url in urls_to_try:
        backoff = 1.0
        for attempt in range(max_attempts):
            headers = header_candidates[min(attempt, len(header_candidates) - 1)]
            try:
                resp = requests.get(
                    candidate_url,
                    headers=headers,
                    timeout=DEFAULT_TIMEOUT,
                    allow_redirects=True,
                )
                status = resp.status_code

                if status in (403, 429) or 500 <= status < 600:
                    # "Blocked"/rate-limited or transient server error: retry.
                    last_err = f"{status} {resp.reason}"
                else:
                    # Other 4xx statuses raise HTTPError, handled (and retried) below.
                    resp.raise_for_status()

                    # Prefer HTML content; some sites return other types.
                    ctype = (resp.headers.get("Content-Type") or "").lower()
                    if "text/html" not in ctype:
                        last_err = f"Non-HTML content-type: {ctype}"
                        # Retrying will not change the content type; move on.
                        break

                    return resp.text, None

            except requests.exceptions.RequestException as e:
                last_err = str(e)

            # Back off only when another attempt on this candidate follows.
            if attempt < max_attempts - 1:
                time.sleep(backoff)
                backoff *= 2.0

    return None, f"Fetch error: {last_err or 'unknown error'}"


def extract_article_fields(html: str, url: str):
    """
    Parse HTML and extract the fields used by the rule-based scorer.

    Args:
        html: Raw HTML of the page.
        url:  URL the HTML was fetched from; its host decides which absolute
              links count as external.

    Returns:
        dict with keys:
          'title'                  - <title> text, else og:title / meta title, else None
          'author'                 - first author/byline match, else None
          'published'              - ISO-8601 string of the first parseable date, else None
          'text'                   - paragraphs (> 40 chars) joined by blank lines, capped at 100000 chars
          'num_paragraphs'         - number of paragraphs kept
          'all_links'              - absolute http(s) hrefs
          'external_links'         - subset of all_links whose host differs from url's host
          'has_transparency_hints' - True if any TRANSPARENCY_HINTS entry occurs in the page text
    """
    soup = BeautifulSoup(html, 'lxml')
    text_chunks, title, author, published = [], None, None, None

    # --- Title from <title> or OG meta ---
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    mt = soup.find('meta', attrs={'property':'og:title'}) or soup.find('meta', attrs={'name':'title'})
    if not title and mt and mt.get('content'):
        title = mt['content'].strip()

    # --- Author / byline in common locations (first match wins) ---
    for selector in [
        {'name':'meta','attrs':{'name':'author'}},
        {'name':'meta','attrs':{'property':'article:author'}},
        {'name':'span','class_':re.compile('author|byline', re.I)},
        {'name':'div','class_':re.compile('author|byline', re.I)},
        {'name':'a','class_':re.compile('author', re.I)},
    ]:
        if selector['name'] == 'meta':
            node = soup.find('meta', attrs=selector['attrs'])
            if node and node.get('content'):
                author = node['content'].strip()
                break
        else:
            node = soup.find(selector['name'], class_=selector.get('class_'))
            if node and node.get_text(strip=True):
                candidate = node.get_text(' ', strip=True)
                if len(candidate) >= 3:
                    author = candidate
                    break

    # --- Publish date in common meta/time/span patterns (first parseable wins) ---
    for date_sel in [
        {'name':'meta','attrs':{'property':'article:published_time'}},
        {'name':'meta','attrs':{'name':'date'}},
        {'name':'time','attrs':{}},
        {'name':'span','class_':re.compile('date|time', re.I)},
    ]:
        if date_sel['name'] == 'meta':
            node = soup.find('meta', attrs=date_sel['attrs'])
            raw_date = node.get('content') if node else None
        else:
            # Pass a class filter only when one is configured, so that <time>
            # elements are found whether or not they carry a class attribute.
            find_kwargs = {'class_': date_sel['class_']} if 'class_' in date_sel else {}
            node = soup.find(date_sel['name'], **find_kwargs)
            # Prefer the machine-readable datetime attribute when present.
            raw_date = (node.get('datetime') or node.get_text(strip=True)) if node else None

        if not raw_date:
            continue
        try:
            published = dateparser.parse(raw_date, fuzzy=True)
            break
        except Exception:
            # Best effort: an unparseable candidate just means "try the next pattern".
            continue

    # --- Body text: prefer a likely article container, else all <p> ---
    main_container = None
    for cls in ['article','post','story','content','entry-content','article-body']:
        mc = soup.find(True, class_=re.compile(cls, re.I))
        if mc:
            main_container = mc
            break
    paragraphs = (main_container.find_all('p') if main_container else soup.find_all('p'))
    for p in paragraphs:
        t = p.get_text(' ', strip=True)
        if t and len(t) > 40:
            text_chunks.append(t)
    article_text = '\n\n'.join(text_chunks)[:100000]  # cap to avoid huge pages

    # --- Link counts: total & external ---
    all_links, external_links = [], []
    base_host = urlparse(url).netloc.lower()
    for a in soup.find_all('a', href=True):
        href = a['href']
        if not href.startswith(('http://', 'https://')):
            continue
        try:
            link_host = urlparse(href).netloc.lower()
        except ValueError:
            # Malformed href (e.g. a broken IPv6 literal): ignore this link
            # instead of failing the whole page.
            continue
        all_links.append(href)
        if link_host != base_host:
            external_links.append(href)

    # --- Transparency hint flag (substring match) ---
    # Search the page's own visible text only. The hint list itself must not be
    # part of the haystack, otherwise the flag is trivially always True. The
    # whole page is used (not just the paragraphs) because hints like
    # "about us" or "corrections" typically live in navigation/footers.
    page_text = soup.get_text(' ', strip=True).lower()
    has_transparency_hints = any(h in page_text for h in TRANSPARENCY_HINTS)

    return {
        'title': title,
        'author': author,
        'published': published.isoformat() if published else None,
        'text': article_text,
        'num_paragraphs': len(text_chunks),
        'all_links': all_links,
        'external_links': external_links,
        'has_transparency_hints': has_transparency_hints,
    }

def score_url(url: str, fields: dict):
    """
    Apply heuristic scoring rules and return (score, explanation).

    Args:
        url:    Page URL (used for the HTTPS and institutional-TLD signals).
        fields: Dict as produced by extract_article_fields(); the keys read
                here are 'author', 'published', 'all_links', 'external_links',
                'num_paragraphs' and 'text'.

    Returns:
        (score, explanation): score is an int clamped to 0-100, starting from
        a neutral 50; explanation is a '; '-joined list of the adjustments.
    """
    from datetime import timezone

    explanation_bits = []
    score = 50  # neutral baseline

    # HTTPS
    if url.lower().startswith('https://'):
        score += 12; explanation_bits.append('+12: uses HTTPS')
    else:
        score -= 10; explanation_bits.append('-10: not using HTTPS')

    # Institutional TLD (last label of the public suffix)
    ext = tldextract.extract(url)
    tld_last = (ext.suffix.split('.')[-1] if ext.suffix else '')
    if tld_last in INSTITUTIONAL_TLDS:
        score += 14; explanation_bits.append(f'+14: institutional TLD ({tld_last})')

    # Author/byline
    if fields.get('author'):
        score += 10; explanation_bits.append('+10: author/byline found')
    else:
        score -= 6; explanation_bits.append('-6: no clear author/byline')

    # Published recency
    published = fields.get('published')
    if published:
        try:
            dt = dateparser.parse(published)
            # Dates carrying a UTC offset parse as timezone-aware; subtracting
            # them from a naive "now" raises TypeError. Compare in aware UTC
            # and treat naive dates as UTC (what the naive comparison implied).
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            age_days = (datetime.now(timezone.utc) - dt).days
            if age_days <= 3650:
                score += 6; explanation_bits.append('+6: reasonably recent publication date')
            else:
                score -= 4; explanation_bits.append('-4: appears quite old')
        except (ValueError, OverflowError, TypeError, AttributeError):
            explanation_bits.append('0: could not parse publication date reliably')
    else:
        explanation_bits.append('0: no publication date found')

    # References
    total_links = len(fields.get('all_links', []))
    external_links_count = len(fields.get('external_links', []))
    if total_links >= 5 and external_links_count >= 3:
        score += 10; explanation_bits.append(f'+10: provides references (links: {total_links}, external: {external_links_count})')
    elif total_links >= 2:
        score += 4; explanation_bits.append(f'+4: some references (links: {total_links})')
    else:
        score -= 6; explanation_bits.append(f'-6: minimal/no references (links: {total_links})')

    # Length (by paragraph count)
    num_paras = fields.get('num_paragraphs', 0)
    if num_paras >= 8:
        score += 6; explanation_bits.append('+6: substantive article length')
    elif num_paras >= 3:
        score += 2; explanation_bits.append('+2: moderate article length')
    else:
        score -= 6; explanation_bits.append('-6: very short article text')

    # Clickbait language (number of distinct terms present)
    text_lower = (fields.get('text') or '').lower()
    clickbait_hits = sum(1 for term in CLICKBAIT_TERMS if term in text_lower)
    if clickbait_hits >= 2:
        score -= 10; explanation_bits.append('-10: strong clickbait indicators')
    elif clickbait_hits == 1:
        score -= 4; explanation_bits.append('-4: mild clickbait indicators')

    # Advertising/sponsor cues: 2 points per 5 matches, capped at 8
    ad_signals = len(re.findall(r"advertis(e|ement)|sponsor(ed|ship)", text_lower))
    ad_penalty = min(8, math.floor(ad_signals / 5) * 2)
    if ad_penalty:
        score -= ad_penalty; explanation_bits.append(f'-{ad_penalty}: advertising/sponsorship language')

    # Clamp score and join explanation
    score = max(0, min(100, int(round(score))))
    explanation = '; '.join(explanation_bits)
    return score, explanation

def evaluate_url(url: str):
    """
    Run the full rule-based pipeline for one URL: fetch, parse, score.

    Returns:
        A dict {'score', 'explanation', 'details'} on success, or None when the
        URL is not a non-blank string or the page cannot be fetched
        (STRICT EXCLUDE: no low score and no error object is produced).
    """
    is_usable = isinstance(url, str) and bool(url.strip())
    if not is_usable:
        return None  # exclude invalid

    page_html, fetch_error = fetch_html(url)
    if fetch_error:
        return None  # STRICT EXCLUDE

    parsed = extract_article_fields(page_html, url)
    rule_score, reasons = score_url(url, parsed)

    details = {'url': url}
    for key in ('title', 'author', 'published', 'num_paragraphs'):
        details[key] = parsed.get(key)
    details['total_links'] = len(parsed.get('all_links', []))
    details['external_links'] = len(parsed.get('external_links', []))

    return {'score': rule_score, 'explanation': reasons, 'details': details}

# ============================================================
# ======  H Y B R I D   ( R u l e s  +  L i n e a r  R e g ) =
# ============================================================
# This section:
# 1) builds features from evaluate_url() outputs
# 2) creates a labeled dataset of 20 real URLs (0–100 labels)
# 3) runs 5-fold cross-validation for LinearRegression
# 4) fits a final model on ALL available rows
# 5) exposes hybrid_score(url, alpha) for inference
#
# Notes:
# - If some URLs fail to fetch, they're STRICTLY EXCLUDED (no score, not shown).
# - CV uses MAE and R^2 to give you both error and fit quality.

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold, cross_val_score
from datetime import datetime, timezone
from dateutil import parser as dateparser

# ---------- Feature Engineering ----------
def _safe(value, default=0):
    return value if value is not None else default

def _https_flag(url: str) -> int:
    return 1 if isinstance(url, str) and url.lower().startswith('https://') else 0

def _institutional_tld_flag(url: str) -> int:
    """
    Return 1 when the last label of the URL's public suffix is one of
    edu/gov/ac/sch/mil, else 0. Any extraction error yields 0.
    """
    try:
        suffix = tldextract.extract(url).suffix
        last_label = suffix.split('.')[-1] if suffix else ''
    except Exception:
        return 0
    return int(last_label in {'edu','gov','ac','sch','mil'})

def _days_since(published_iso: str) -> int:
    if not published_iso:
        return 99999  # treat unknown/absent as very old
    try:
        dt = dateparser.parse(published_iso)
        return max(0, (datetime.utcnow() - dt).days)  # OK to mirror your original utc usage
    except Exception:
        return 99999

def features_from_eval(eval_obj: dict) -> dict:
    """
    Derive the model's feature dict from an evaluate_url() result.

    Only the 'details' sub-dict is read (url, author, num_paragraphs,
    total_links, external_links, published). Missing counts become 0 and a
    missing date becomes the 99999 sentinel from _days_since().
    """
    details = eval_obj.get('details', {})
    page_url = details.get('url') or ''

    feats = {}
    feats["https"] = _https_flag(page_url)
    feats["inst_tld"] = _institutional_tld_flag(page_url)
    feats["has_author"] = 1 if details.get('author') else 0
    for count_key in ("num_paragraphs", "total_links", "external_links"):
        feats[count_key] = _safe(details.get(count_key), 0)
    feats["days_since_pub"] = _days_since(details.get('published'))
    return feats

# Column order of the feature matrix; must stay in sync with the keys
# produced by features_from_eval().
FEATURE_ORDER = [
    "https", "inst_tld", "has_author",
    "num_paragraphs", "total_links", "external_links",
    "days_since_pub",
]

def vectorize_features(feat_dict: dict, feature_order=FEATURE_ORDER):
    """
    Turn a feature dict into a 1-D float array ordered by `feature_order`.
    Keys absent from `feat_dict` contribute 0.
    """
    values = []
    for key in feature_order:
        values.append(feat_dict.get(key, 0))
    return np.array(values, dtype=float)

# ---------- Labeled Dataset (20 REAL URLs, mixed quality) ----------
# Each entry is (url, label) where label is an illustrative 0–100 credibility
# target. Adjust as you refine ground truth. URLs that cannot be fetched when
# the dataset is built are dropped, so fewer than 20 rows may be used.
LABELED_URLS = [
    # Highly credible / institutional / quality editorial
    ("https://www.cdc.gov/flu/about/index.html", 92),
    ("https://www.nih.gov/news-events/news-releases", 88),
    ("https://www.mayoclinic.org/diseases-conditions/dehydration/symptoms-causes/syc-20354086", 85),
    ("https://www.who.int/news-room/fact-sheets/detail/diabetes", 90),
    ("https://www.britannica.com/science/photosynthesis", 86),
    ("https://www.health.harvard.edu/staying-healthy/what-is-intermittent-fasting", 78),
    ("https://www.hopkinsmedicine.org/health/conditions-and-diseases", 84),
    ("https://www.bbc.com/news/science_and_environment", 80),
    ("https://www.reuters.com/world/us/", 80),
    ("https://apnews.com/hub/technology", 78),
    ("https://en.wikipedia.org/wiki/Ice_cream", 75),

    # Mid credibility consumer health / informative
    # NOTE(review): the last two entries of this group are labeled 82, i.e. in
    # the same range as the "highly credible" group above — confirm grouping.
    ("https://www.healthline.com/nutrition/green-tea-and-weight-loss", 70),
    ("https://www.webmd.com/diet/obesity/features/green-tea-and-weight-loss", 68),
    ("https://www.nature.com/scitable/definition/photosynthesis-288/", 82),
    ("https://med.stanford.edu/news/all-news.html", 82),

    # Lower credibility / UGC / lighter editorial
    ("https://medium.com/", 45),
    ("https://www.reddit.com/r/icecreamery/comments/19elt19/looking_for_resources_to_learn_how_to_make_ice/", 20),
    ("https://www.quora.com/Is-green-tea-good-for-weight-loss", 25),
    ("https://www.livestrong.com/article/13715706-green-tea-benefits/", 60),
    ("https://www.buzzfeed.com/", 40),
]

# ---------- Build Dataset by Evaluating & Featurizing ----------
# Every labeled URL is fetched and scored here (network I/O at cell run time).
rows_X, rows_y = [], []

for url, label in LABELED_URLS:
    try:
        ev = evaluate_url(url)
        if ev is None:  # STRICT EXCLUDE
            continue
        rows_X.append(vectorize_features(features_from_eval(ev)))
        rows_y.append(float(label))
    except Exception:
        # Strict policy: a URL that fails for any reason is silently excluded,
        # so the dataset may end up smaller than LABELED_URLS.
        continue

X = np.vstack(rows_X) if rows_X else np.zeros((0, len(FEATURE_ORDER)))
y = np.array(rows_y) if rows_y else np.zeros((0,))

print(f"Prepared dataset: {X.shape[0]} rows × {X.shape[1]} features.")

# ---------- 5-Fold Cross-Validation ----------
# We use KFold regression CV with two metrics:
#  - MAE (lower is better): absolute error in points of the 0–100 score
#  - R^2 (higher is better): variance explained
# CV only runs with at least 5 rows, so the number of folds is always 5.
if len(X) >= 5:
    k = 5
    kf = KFold(n_splits=k, shuffle=True, random_state=42)
    # scikit-learn reports MAE as a negative score; flip the sign for display.
    mae_scores = cross_val_score(LinearRegression(), X, y, cv=kf, scoring="neg_mean_absolute_error")
    r2_scores  = cross_val_score(LinearRegression(), X, y, cv=kf, scoring="r2")
    mae_vals = -mae_scores  # turn to positive
    print(f"\n{k}-fold CV results on {len(X)} rows:")
    print(f"  MAE per fold: {np.round(mae_vals, 2)} | mean={mae_vals.mean():.2f}")
    print(f"  R^2 per fold:  {np.round(r2_scores, 3)} | mean={r2_scores.mean():.3f}")
else:
    print("\nNot enough rows for CV; need ≥5 examples.")

# ---------- Fit Final Linear Regression on ALL available rows ----------
# `linreg` stays unfitted when fewer than 2 rows exist; the prediction helper
# checks len(X) and returns a neutral score in that case.
linreg = LinearRegression()
if len(X) >= 2:
    linreg.fit(X, y)
    print("\nFinal model (trained on all available rows):")
    for kname, w in zip(FEATURE_ORDER, linreg.coef_):
        print(f"  {kname:>16s}: {w: .3f}")
    print(f"  {'Intercept':>16s}: {linreg.intercept_: .3f}")
else:
    print("\nNot enough rows to train final model (need ≥2). Using neutral ML predictions.")

# ---------- Inference helpers ----------
def predict_ml_score_from_eval(eval_obj: dict) -> float:
    """
    Return the linear model's credibility prediction for an evaluate_url()
    result, clipped to 0–100.

    When fewer than 2 training rows were available (model not fitted), the
    neutral value 50.0 is returned instead.
    """
    if len(X) < 2:
        return 50.0

    feature_row = vectorize_features(features_from_eval(eval_obj)).reshape(1, -1)
    raw_prediction = linreg.predict(feature_row)[0]
    clipped = np.clip(raw_prediction, 0.0, 100.0)
    return float(clipped)

def _stars(score_0_100: float) -> str:
    stars = int(round(score_0_100 / 20))
    stars = max(0, min(5, stars))
    return "★"*stars + "☆"*(5 - stars)

def hybrid_score(url: str, alpha: float = 0.6) -> dict | None:
    """
    Blend the rule-based score with the ML-predicted score:
        final = alpha * rule_score + (1 - alpha) * ml_score   (clipped to 0–100)

    Returns:
        dict with url, title, rule_score, ml_score, hybrid_score (each rounded
        to 1 decimal), stars, explanation and details; or None when
        evaluate_url() excludes the URL (STRICT EXCLUDE).
    """
    ev = evaluate_url(url)
    if ev is None:
        return None  # STRICT EXCLUDE

    details = ev.get("details", {})
    rule_score = float(ev.get("score", 0.0))
    ml_score = predict_ml_score_from_eval(ev)

    blended = alpha * rule_score + (1 - alpha) * ml_score
    final = float(np.clip(blended, 0.0, 100.0))

    result = {
        "url": url,
        "title": details.get("title"),
        "rule_score": round(rule_score, 1),
        "ml_score": round(ml_score, 1),
        "hybrid_score": round(final, 1),
        "stars": _stars(final),
        "explanation": ev.get("explanation", "No detailed explanation available."),
        "details": details,
    }
    return result


Prepared dataset: 14 rows × 7 features.

5-fold CV results on 14 rows:
  MAE per fold: [10.16 63.73 20.13 71.33 20.27] | mean=37.12
  R^2 per fold:  [ 4.180000e-01 -6.031900e+01 -2.000000e-02 -1.332505e+03 -1.582500e+01] | mean=-281.650

Final model (trained on all available rows):
             https:  0.000
          inst_tld:  22.205
        has_author:  3.819
    num_paragraphs:  0.133
       total_links:  0.013
    external_links: -0.039
    days_since_pub: -0.000
         Intercept:  79.834


In [4]:
# @title SERP API Functions:
# --- Improved SerpAPI search (clean results, de-dupe, filter, robust errors) ---
import os, re, requests
from urllib.parse import urlparse

# Read once when this cell runs; re-run the cell after changing the
# environment variable for the new value to take effect.
SERP_API_KEY = os.getenv("SERP_API_KEY")

# Common low-signal/social/video domains to exclude by default (tune as needed).
# search_google() compares hosts to this set by exact match, which is why both
# bare and "www." variants are listed.
_DEFAULT_EXCLUDE_DOMAINS = {
    "reddit.com", "www.reddit.com", "old.reddit.com",
    "x.com", "twitter.com", "www.twitter.com",
    "tiktok.com", "www.tiktok.com",
    "pinterest.com", "www.pinterest.com",
    "facebook.com", "www.facebook.com",
    "instagram.com", "www.instagram.com",
    "youtube.com", "www.youtube.com", "youtu.be"
}

# Skip obvious non-article filetypes: matches the listed extensions when they
# are followed by the end of the URL or a "?" query string (case-insensitive).
_SKIP_FILETYPES = re.compile(r"\.(pdf|pptx?|docx?|xlsx?|zip|rar)(?:$|\?)", re.I)

def _host(url: str) -> str:
    return urlparse(url).netloc.lower()

def search_google(
    query: str,
    num_results: int = 30,
    exclude_domains: set | None = None,
    allow_news_results: bool = True,
    timeout: int = 20,
) -> list[dict]:
    """
    Search Google via SerpAPI and return a clean list of results:
        [{"title": str, "link": str, "snippet": str}, ...]

    - Skips excluded hosts (exact host match against the default set plus
      `exclude_domains`) and links that look like non-HTML files.
    - De-duplicates by link.
    - Prefers one result per host: additional results from an already-used
      host are only added when there are not enough distinct hosts to reach
      `num_results`. Returned items keep their original ranking order.
    - Optionally includes the "news_results" section after the organic ones.

    Args:
        query: Search query string.
        num_results: Maximum number of results to return (<= 0 returns []).
        exclude_domains: Extra hosts to exclude, or None.
        allow_news_results: Whether to also use the news section.
        timeout: HTTP timeout in seconds for the SerpAPI request.

    Returns:
        List of result dicts; empty on any request/decoding failure.

    Raises:
        RuntimeError: if SERP_API_KEY is not set.
    """
    if not SERP_API_KEY:
        raise RuntimeError("Missing SERP_API_KEY / SERPAPI_API_KEY. Set it in the environment first.")

    exclude = set(_DEFAULT_EXCLUDE_DOMAINS)
    if exclude_domains:
        exclude |= set(exclude_domains)

    params = {
        "engine": "google",
        "q": query,
        "api_key": SERP_API_KEY,
        "num": 30,          # pull extra, then filter/trim to num_results
        "hl": "en",
        "gl": "us",
        "safe": "active",   # optional: reduce NSFW
    }

    try:
        resp = requests.get("https://serpapi.com/search", params=params, timeout=timeout)
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        # Deliberate best effort: report and let the caller handle "no results".
        print(f"[SerpAPI] Error: {e}")
        return []

    if num_results <= 0:
        return []

    # Organic results first, then (optionally) news results.
    sections = ["organic_results"]
    if allow_news_results:
        sections.append("news_results")

    # Filtered candidates, de-duplicated by link, in ranking order.
    unique: list[dict] = []
    seen_links = set()
    for section in sections:
        for r in (data.get(section) or []):
            link = r.get("link")
            if not link or _SKIP_FILETYPES.search(link):
                continue
            if _host(link) in exclude or link in seen_links:
                continue
            seen_links.add(link)
            unique.append({"title": r.get("title"), "link": link, "snippet": r.get("snippet") or ""})

    # Split ranks into "first result of its host" and "repeat of a host".
    first_of_host, repeats = [], []
    seen_hosts = set()
    for rank, item in enumerate(unique):
        host = _host(item["link"])
        if host in seen_hosts:
            repeats.append(rank)
        else:
            seen_hosts.add(host)
            first_of_host.append(rank)

    # Take distinct hosts first; fall back to repeats only to fill the quota.
    chosen = first_of_host[:num_results]
    if len(chosen) < num_results:
        chosen.extend(repeats[:num_results - len(chosen)])

    return [unique[rank] for rank in sorted(chosen)]

def credbot_summarize_text(text: str, query: str = "", max_chars: int = 500) -> str:
    """
    Super-light extractive summary.

      1) Normalize whitespace and split into sentences.
      2) Score each sentence: 3 points per query keyword (>= 3 chars) it
         contains, plus informative cues (length > 80, punctuation such as
         ':' ';' '(' ')', evidence words), plus a small bonus for the first
         three sentences.
      3) Take sentences in descending score order (ties: original order) until
         three are picked or the next one would exceed `max_chars`. Sentences
         are joined with two spaces, in the order they were picked.

    If even the best sentence does not fit, it is truncated to `max_chars`
    characters and suffixed with "…". Returns "" for empty input.
    """
    if not text:
        return ""

    # Normalize whitespace
    clean = re.sub(r"\s+", " ", text).strip()

    # Sentence split (simple, robust)
    sentences = [
        piece.strip()
        for piece in re.split(r"(?<=[.!?])\s+(?=[A-Z0-9\"'])", clean)
        if piece.strip()
    ]
    if not sentences:
        return clean[:max_chars]

    # Unique query keywords, original order preserved
    q = (query or "").lower().strip()
    q_terms = list(dict.fromkeys(t for t in re.split(r"[^a-z0-9]+", q) if len(t) >= 3))

    # Score sentences
    scored = []
    for i, s in enumerate(sentences):
        sl = s.lower()

        kw_score = 3 * sum(1 for t in q_terms if t in sl)

        cues = 0
        if len(s) > 80:
            cues += 1
        if ":" in s or ";" in s or "(" in s or ")" in s:
            cues += 1
        if re.search(r"\b(according to|study|report|data|evidence|research|analysis|conclude|results)\b", sl):
            cues += 2

        lead_bonus = max(0, 3 - i)  # first few sentences get a small boost

        scored.append((kw_score + cues + 0.2 * lead_bonus, i, s))

    # Pick best sentences until we hit the budget. The first candidate is
    # always either accepted or added in truncated form, so the result is
    # never empty at this point.
    picked = []
    total = 0
    for _, _, s in sorted(scored, key=lambda item: (-item[0], item[1])):
        if s in picked:
            continue
        separator = 2 if picked else 0  # width of the "  " joiner
        if total + len(s) + separator > max_chars:
            if not picked:
                # Nothing fits: include at least a truncated sentence.
                picked.append(s[:max_chars].rstrip() + "…")
            break
        picked.append(s)
        total += len(s) + separator

        # Aim for ~3 sentences when possible
        if len(picked) >= 3:
            break

    return "  ".join(picked)

In [5]:
# @title Gemini Chat Integration: (pinned to gemini-2.5-flash only)
import os, re, json, google.generativeai as genai

DEBUG = False  # set True for light diagnostics (prints from setup and _gemini_text)

# ---------- Gemini setup ----------
# Remove endpoint-override variables from the environment.
# NOTE(review): presumably so the SDK only uses the api_endpoint passed to
# genai.configure() below — confirm against the SDK's behavior.
for _k in ["GOOGLE_API_BASE_URL", "GOOGLE_AI_API_BASE", "GOOGLE_API_ENDPOINT"]:
    os.environ.pop(_k, None)

_GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not _GEMINI_API_KEY:
    # A missing key is only reported, not fatal: setup continues and
    # genai.configure() below is still called with the empty/None key.
    # raise RuntimeError("Missing GEMINI_API_KEY. Please set it in the environment first.")
    print("Missing GEMINI_API_KEY. Please set it in the environment first.")

genai.configure(api_key=_GEMINI_API_KEY, client_options={"api_endpoint": "https://generativelanguage.googleapis.com"})

# Pin to a single model; no discovery/filters.
MODEL_NAME = "gemini-2.5-flash"
try:
    _GEMINI = genai.GenerativeModel(model_name=MODEL_NAME)
    if DEBUG:
        # Optional probe request; a failure here is only reported and leaves
        # _GEMINI set.
        try:
            _ = _GEMINI.generate_content("ping")
            print(f"[Gemini] {MODEL_NAME} ready.")
        except Exception as e:
            print(f"[Gemini] Probe warning on {MODEL_NAME}: {e}")
except Exception as e:
    # Model construction failed: _gemini_text() checks for None and returns None.
    _GEMINI = None
    if DEBUG:
        print(f"[Gemini] Init failed for {MODEL_NAME}: {e}")

def _gemini_text(prompt: str) -> str | None:
    """
    Send `prompt` to the pinned Gemini model and return the stripped reply text.

    Returns None when the model is unavailable, the call fails, or the
    response contains no text. Never raises.
    """
    if _GEMINI is None:
        if DEBUG: print("[Gemini] Model unavailable.")
        return None
    try:
        resp = _GEMINI.generate_content(prompt)

        # The `.text` quick accessor can raise (ValueError) instead of returning
        # None when the response has no valid part; getattr's default only
        # covers AttributeError, so guard it to keep the fallback reachable.
        try:
            out = getattr(resp, "text", None)
        except ValueError:
            out = None

        # Fallback: read the text parts of the first candidate directly.
        if not out and getattr(resp, "candidates", None):
            cand = resp.candidates[0]
            content = getattr(cand, "content", None)
            parts = getattr(content, "parts", None) if content else None
            if parts:
                out = "".join(getattr(part, "text", "") or "" for part in parts)

        if DEBUG: print(f"[Gemini] Response present: {bool(out)}")
        return out.strip() if out else None
    except Exception as e:
        if DEBUG: print(f"[Gemini] Call failed: {e}")
        return None

def _clip_summary(s):
    if not s: return ""
    parts = re.split(r'(?<=[.!?])\s+', s.strip())
    return " ".join(parts[:3])[:350]

def _parse_json_block(text):
    if not text: return None
    m = re.search(r"```json\s*(\{.*?\})\s*```", text, flags=re.S)
    if m:
        try: return json.loads(m.group(1))
        except: pass
    m = re.search(r"(\{.*\})", text, flags=re.S)
    if m:
        try: return json.loads(m.group(1))
        except: pass
    return None

# ---------- Project-provided helpers expected earlier ----------
# - search_google(query, num_results=30, exclude_domains=None, ...) -> list[dict]
# - fetch_html(url) -> (html, err)
# - extract_article_fields(html, url) -> dict(...)
# - hybrid_score(url, alpha=0.6) -> dict(...) | None
# - credbot_summarize_text(text, query) -> str

def _score_and_summarize_url(link, query):
    """Score one URL, summarise its article text, and print the result.

    Returns ``True`` when a result was printed. Returns ``False`` when the
    hybrid score is ``None``, the page cannot be fetched, no article text is
    extracted, or any exception is raised (the exception is printed).
    """
    try:
        scored = hybrid_score(link, alpha=0.6)
        if scored is None:
            return False

        page, fetch_err = fetch_html(link)
        if fetch_err or not page:
            return False

        article = extract_article_fields(page, link)
        body = (article.get("text") or "").strip()
        if not body:
            return False

        blurb = credbot_summarize_text(body, query)

        headline = article.get("title") or "[No Title]"
        points = scored.get("hybrid_score", 0.0)
        # Use the scorer's own star string if given, else one star per 20 points.
        rating = scored.get("stars") or ("★" * int(round(points / 20)))
        rationale = (scored.get("explanation") or "").strip()

        print(f"\n📰 {headline}\n🔗 {link}\n📄 {blurb}\n⭐ {rating} ({points}/100)")
        if rationale:
            print(f"📝 {rationale}\n")
        return True
    except Exception as exc:
        print(f"[Score] Exception on {link}: {exc}")
        return False

def _run_search_flow(topic):
    """Search the web for ``topic`` and print a scored summary per result.

    Args:
        topic: Search subject; the query sent is ``"articles on " + topic``.

    Returns:
        Number of results that were successfully scored and printed
        (0 on search error or when nothing was usable).
    """
    print(f"🔍 Searching for sources on “{topic}”…")
    try:
        results = search_google(
            "articles on " + topic,
            num_results=5,
            # The helper is documented as taking a set; `{}` is an empty
            # dict, so pass a real empty set.
            exclude_domains=set(),
        )
    except Exception as e:
        print(f"⚠️ Search error: {e}")
        return 0

    if not results:
        print("⚠️ No relevant articles found.")
        return 0

    shown = 0
    for r in results:
        link = r.get("link")
        if link and _score_and_summarize_url(link, topic):
            shown += 1

    if shown == 0:
        print("⚠️ None accessible.")
    return shown

# Action labels the model may return in the "action" field.
A_CHAT = "chat_mode"
A_RUN = "run_search"

# Instruction header prepended to every prompt built by gemini_step().
# One list entry per line of the prompt; joined with newlines, with no
# trailing newline after the final line.
_SYSTEM_PROMPT = "\n".join([
    "You are CredBot, a concise and friendly assistant.",
    "Return ONLY a JSON object with keys: reply, summary, topic, action.",
    f'action must be "{A_CHAT}" or "{A_RUN}".',
    "- You (the model) decide greetings, topic extraction, and whether a web search is necessary.",
    f'- If a web search is needed, set action="{A_RUN}" and provide a concise search query in "topic".',
    "- Keep reply ≤ 2 sentences. Keep summary to 2–3 sentences (≤350 chars).",
    "- Do not include backticks or extra text around the JSON.",
])

def gemini_step(user_input, prev_summary, prev_topic):
    """Run one conversational turn through Gemini and normalise its answer.

    Args:
        user_input: The user's latest message.
        prev_summary: Running conversation summary (may be empty/None).
        prev_topic: Last known topic (may be empty/None).

    Returns:
        A ``(reply, summary, topic, action)`` tuple of strings. ``action`` is
        always ``A_CHAT`` or ``A_RUN``. When no JSON object can be obtained
        from the model, a fixed fallback reply is returned together with the
        previous summary/topic and ``A_CHAT``.
    """
    prompt = (
        f"{_SYSTEM_PROMPT}\n\n"
        f"Previous summary:\n{prev_summary or ''}\n\n"
        f"Previous topic:\n{prev_topic or ''}\n\n"
        f"User message:\n{user_input}\n\n"
        'Example response format:\n'
        '{"reply":"...", "summary":"...", "topic":"...", "action":"chat_mode"}'
    )

    raw = _gemini_text(prompt)
    data = _parse_json_block(raw) if raw else None
    if DEBUG:
        print(f"[Gemini] Raw present: {bool(raw)} | JSON parsed: {isinstance(data, dict)}")

    if not isinstance(data, dict):
        return ("Please Double Check Your API Keys",
                prev_summary or "", prev_topic or "", A_CHAT)

    def _field(key):
        # Model output is untrusted: a value may be a number, list, etc.
        # Coerce to text so a non-string cannot crash on .strip().
        value = data.get(key)
        if not value:
            return ""
        return (value if isinstance(value, str) else str(value)).strip()

    reply = _field("reply") or "Okay."
    summary = _clip_summary(_field("summary"))
    topic = _field("topic")
    action = _field("action") or A_CHAT
    if action not in {A_CHAT, A_RUN}:
        action = A_CHAT

    return reply, summary, topic, action

def run_chat():
    """Run the interactive console chat loop until the user leaves.

    Each turn is sent through ``gemini_step``. When the model asks for a
    search, ``_run_search_flow`` prints the results. The loop ends on
    ``exit``/``quit``/``bye``, on end-of-input, or on Ctrl-C / cell interrupt.
    """
    print("🤖 Hi, I’m CredBot. We can chat normally. (Type 'exit' to quit.)")
    if DEBUG: print(f"[Startup] Gemini ready: {bool(_GEMINI)}")

    summary, topic = "", ""

    while True:
        try:
            user = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or an interrupted cell should end the session
            # cleanly rather than surface a traceback.
            print("\n👋 Goodbye!")
            break
        if not user:
            continue
        if user.lower() in {"exit","quit","bye"}:
            print("👋 Goodbye!")
            break

        reply, summary_out, topic_out, action = gemini_step(user, summary, topic)

        if action == A_RUN:
            # Prefer the model's query, then the previous topic, then the raw message.
            run_topic = topic_out or topic or user
            shown = _run_search_flow(run_topic)
            if shown:
                summary = _clip_summary((summary_out or "") + f" Retrieved sources on {run_topic}.")
            else:
                summary = summary_out or summary
            topic = topic_out or topic
            print("🤖 Anything else to explore?")
            print(f"🧠 [summary] {summary}\n🎯 [topic] {topic}\n⚙️ [action] chat_mode")
            continue

        print(f"🤖 {reply}")
        summary = summary_out or summary
        topic = topic_out or topic

        if DEBUG:
            print(f"🧠 [summary] {summary}\n🎯 [topic] {topic}\n⚙️ [action] {A_CHAT}")


Missing GEMINI_API_KEY. Please set it in the environment first.


In [8]:
# @title ---- Gradio Setup + Chat (search results go into chat; links clickable; run LAST) ----
from __future__ import annotations
import os, io, re, contextlib, gradio as gr, google.generativeai as genai

# Cell-level configuration for the Gradio front end.
DEBUG = False
MODEL_NAME = "gemini-2.5-flash"

# Keep a model handle created by an earlier cell; otherwise start with none
# so the readiness checks below see "not initialised".
if "_GEMINI" not in globals():
    _GEMINI = None

# ---- Keys ----
def _set_env_keys(serp_key: str | None, gemini_key: str | None) -> None:
    if serp_key and serp_key.strip():
        val = serp_key.strip()
        os.environ["SERP_API_KEY"] = val
        os.environ["SERPAPI_API_KEY"] = val
        globals()["SERP_API_KEY"] = val  # refresh cached var from SERP cell
    if gemini_key and gemini_key.strip():
        os.environ["GEMINI_API_KEY"] = gemini_key.strip()

def _get_serp_key() -> str | None:
    return (os.getenv("SERP_API_KEY") or os.getenv("SERPAPI_API_KEY") or "").strip() or None

# ---- Gemini ----
def _init_gemini_from_env() -> tuple[bool, str]:
    """(Re)create the global Gemini model from ``GEMINI_API_KEY``.

    Removes three endpoint-override environment variables, configures the SDK
    against the default endpoint, builds the model, and sends a "ping" probe
    whose failure is tolerated (only reported when DEBUG is on).

    Returns:
        ``(ok, message)``: ``ok`` is False when the key is missing or
        configuration/model construction raises; ``_GEMINI`` is then None.
    """
    global _GEMINI

    # Drop endpoint overrides before configuring the SDK.
    for stale_var in ("GOOGLE_API_BASE_URL", "GOOGLE_AI_API_BASE", "GOOGLE_API_ENDPOINT"):
        os.environ.pop(stale_var, None)

    api_key = os.getenv("GEMINI_API_KEY", "").strip()
    if not api_key:
        _GEMINI = None
        return False, "❌ Missing GEMINI_API_KEY."

    try:
        genai.configure(
            api_key=api_key,
            client_options={"api_endpoint": "https://generativelanguage.googleapis.com"},
        )
        model = genai.GenerativeModel(model_name=MODEL_NAME)
        try:
            model.generate_content("ping")
        except Exception as probe_err:
            # A failed probe does not block initialisation.
            if DEBUG: print(f"[Gemini] probe warn: {probe_err}")
        _GEMINI = model
        return True, f"✅ Gemini ready ({MODEL_NAME}). Please Continue to Chat Tab or Return Back to Setup to Update API Keys"
    except Exception as init_err:
        _GEMINI = None
        return False, f"❌ Gemini init failed: {init_err}"

# ---- Readiness ----
def _training_ready() -> bool:
    """Report whether the global ``linreg`` has a non-empty ``coef_``.

    Any error (including ``linreg`` not being defined) counts as not ready.
    """
    try:
        if not hasattr(linreg, "coef_"):
            return False
        return len(getattr(linreg, "coef_", [])) > 0
    except Exception:
        return False

def _overall_ready() -> bool:
    """Return True only when training is ready and a Gemini model is set."""
    if not _training_ready():
        return False
    return _GEMINI is not None

def _status_text() -> str:
    """Build the one-line status banner shown at the top of the UI.

    Reports training readiness (with the row count of the global ``X`` when
    it exists), whether each API key is present, and whether the Gemini
    model handle is set.
    """
    def _mark(ok: bool, missing: str = "❌") -> str:
        return "✅" if ok else missing

    row_count = int(X.shape[0]) if "X" in globals() else 0
    serp_present = bool(_get_serp_key())
    gemini_key_present = bool(os.getenv("GEMINI_API_KEY", "").strip())

    segments = [
        f"Training: {_mark(_training_ready())} (rows={row_count})",
        f"GEMINI_API_KEY: {_mark(gemini_key_present)}",
        f"SERP_API_KEY: {_mark(serp_present, '⚠️ optional')}",
        f"Gemini: {_mark(_GEMINI is not None)}",
    ]
    return " • ".join(segments)

# ---- Capture printed search for embedding into chat ----
def _run_search_and_capture(topic: str) -> str:
    """Run the search flow and return everything it printed, stripped.

    If the search flow raises, a "Search error" line is appended to the
    captured text instead of propagating the exception.
    """
    sink = io.StringIO()
    try:
        with contextlib.redirect_stdout(sink):
            _run_search_flow(topic)
    except Exception as exc:
        # stdout is already restored here, so write straight into the buffer.
        print(f"⚠️ Search error: {exc}", file=sink)
    captured = sink.getvalue()
    return captured.strip()

# ---- Make URLs clickable in Markdown ----
def _linkify_urls(text: str) -> str:
    """Turn bare http(s) URLs in ``text`` into Markdown links.

    Trailing sentence punctuation (``. , ; : ! ?`` and quotes) is kept
    outside the link, so "on https://x.com/a: error" links only the URL.
    Existing ``[label](target)`` links are left untouched, which makes the
    function safe to apply more than once.
    """
    # First alternative consumes whole Markdown links so the URL inside them
    # is never re-wrapped; second alternative is the bare-URL matcher.
    pattern = r'\[[^\]\n]*\]\([^)\s]*\)|https?://[^\s<>\]\)]+'

    def _to_link(match):
        token = match.group(0)
        if token.startswith("["):
            return token  # already a Markdown link
        url = token.rstrip(".,;:!?'\"")
        if url.endswith("://"):
            return token  # nothing but punctuation after the scheme
        trailing = token[len(url):]
        return f"[{url}]({url}){trailing}"

    return re.sub(pattern, _to_link, text)

# ---- UI callbacks ----
def on_save_keys(serp_key: str, gemini_key: str):
    """Handle "Save Keys & Initialize": store keys, init Gemini, refresh UI.

    Returns six values, in the order the click handler's outputs expect:
    status banner, Gemini init message, two updates that blank the key
    fields, and two updates that enable/disable the chat input and button.
    """
    _set_env_keys(serp_key, gemini_key)
    _, gemini_message = _init_gemini_from_env()

    is_ready = _overall_ready()
    status_banner = _status_text()

    toggle = gr.update(interactive=is_ready)
    blank = gr.update(value="")
    return (status_banner, gemini_message, blank, blank, toggle, toggle)

def chat_handler(user_msg: str, chat_hist, summary: str, topic: str):
    """Process one chat submission from the Gradio UI.

    Args:
        user_msg: Text from the message box.
        chat_hist: Current chatbot history (list of ``[user, bot]`` pairs) or None.
        summary: Running conversation summary held in UI state.
        topic: Current topic held in UI state.

    Returns:
        ``(textbox_value, chat_history, summary, topic)``. When the app is not
        ready the message is returned unchanged and nothing is appended; a
        blank message just clears the box. Otherwise the box is cleared and
        the new exchange is appended to the history.
    """
    import html  # local import: only needed to escape the topic for HTML

    history = chat_hist or []
    if not _overall_ready():
        return user_msg, history, summary, topic
    if not (user_msg or "").strip():
        return "", history, summary, topic

    reply, summary_out, topic_out, action = gemini_step(user_msg, summary, topic)

    bot_msg = reply
    if action == "run_search":
        # Prefer the model's query, then the previous topic, then the raw message.
        run_topic = topic_out or topic or user_msg
        raw_log = _run_search_and_capture(run_topic)
        if raw_log:
            # Convert URLs to clickable links and show nicely
            log_md = _linkify_urls(raw_log)
            # The topic comes from user/model text and is placed inside raw
            # HTML, so escape it to keep stray "<", ">" or "&" from breaking
            # (or injecting into) the <summary> markup.
            safe_topic = html.escape(run_topic)
            bot_msg = (
                f"{reply}\n\n"
                f"<details><summary><b>[Click to Expand List] Search results for:</b> <i>{safe_topic}</i></summary>\n\n"
                f"{log_md}\n\n"
                f"</details>"
            )
        summary = _clip_summary((summary_out or summary or "") + f" Retrieved sources on {run_topic}.")
        topic = topic_out or topic
    else:
        summary = summary_out or summary
        topic = topic_out or topic

    history = history + [[user_msg, bot_msg]]
    return "", history, summary, topic

# ---- UI ----
# Build the two-tab Gradio app: a Setup tab for entering API keys and a Chat
# tab whose input controls are enabled only when _overall_ready() is true.
with gr.Blocks() as demo:
    gr.Markdown("## 🤖 CredBot — Setup & Chat (Gemini 2.5 Flash)")
    # Status banner; rendered once at build time and refreshed by on_save_keys.
    status_md = gr.Markdown(_status_text())

    with gr.Tabs():
        with gr.Tab("🔐 Setup"):
            gr.Markdown("Enter your API keys. **Values are hidden** and not printed.")
            with gr.Row():
                # Both key fields are password-type textboxes.
                serp_input = gr.Textbox(
                    label="SERP_API_KEY (for web search)",
                    type="password",
                    placeholder="Paste SERP_API_KEY…",
                    scale=1,
                )
                gem_input = gr.Textbox(
                    label="GEMINI_API_KEY (required)",
                    type="password",
                    placeholder="Paste GEMINI_API_KEY…",
                    scale=1,
                )
            with gr.Row():
                save_btn = gr.Button("Save Keys & Initialize", variant="primary")
            # Receives the message string returned by _init_gemini_from_env().
            setup_status_md = gr.Markdown("")

        with gr.Tab("💬 Chat"):
            chatbot = gr.Chatbot(height=480, label="Dialogue")  # search appears inside the chat now
            with gr.Row():
                # Input and button start enabled only if the app is already
                # ready when the UI is built; on_save_keys toggles them later.
                msg = gr.Textbox(
                    placeholder="Type your message…",
                    scale=4,
                    autofocus=True,
                    interactive=_overall_ready(),
                )
                send = gr.Button(
                    "Send",
                    variant="primary",
                    scale=1,
                    interactive=_overall_ready(),
                )

    # memory: conversation summary and topic carried between chat_handler calls
    state_summary = gr.State("")
    state_topic = gr.State("")

    # wiring
    # Saving keys updates the banner and setup message, clears both key
    # fields, and enables/disables the chat input and send button.
    save_btn.click(
        on_save_keys,
        inputs=[serp_input, gem_input],
        outputs=[status_md, setup_status_md, serp_input, gem_input, msg, send],
    )
    # Clicking "Send" and pressing Enter in the textbox run the same handler
    # with the same inputs and outputs.
    send.click(
        chat_handler,
        inputs=[msg, chatbot, state_summary, state_topic],
        outputs=[msg, chatbot, state_summary, state_topic],
    )
    msg.submit(
        chat_handler,
        inputs=[msg, chatbot, state_summary, state_topic],
        outputs=[msg, chatbot, state_summary, state_topic],
    )

# Start the app with default launch() arguments.
demo.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://3ad9ef159fc66410f4.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


