# In [None]:
#!/usr/bin/env python3
"""
Unified Telugu News Scheduler Pipeline (UPDATED FOR YOUR REQUIREMENTS)

- Asynchronously crawls Telugu news websites (crawl4ai)
- Extracts & cleans articles
- Filters to mostly-Telugu content (drops mojibake / non-Telugu)
- Chunks text with overlap
- Deduplicates at article level (url + md5(content))
- Deduplicates at chunk level (url + md5(chunk))
- Creates Cohere embeddings ONLY for new chunks
- Appends new embeddings to CSV (JSON-encoded)
- Rebuilds FAISS index + metadata CSV with ID column
- Scheduler runs at fixed interval

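FILES USED:
- all_telugu_news_articles.csv          (scraped articles)
- all_telugu_chunk_embeddings_clean.csv (chunks + JSON-encoded embeddings)
- all_telugu_faiss_metadata.csv         (per-vector metadata with id column)
- telugu_faiss_index.index              (FAISS index)
- embeddings_array.npy                  (embedding matrix saved alongside the index)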
"""

import os
import json
import re
import time
import hashlib
import asyncio
import logging
import ast
from datetime import datetime
from typing import List, Dict, Tuple

import nest_asyncio
import schedule
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup

from crawl4ai import AsyncWebCrawler
import csv
csv.field_size_limit(10_000_000)

try:
    import faiss
except Exception:
    faiss = None

try:
    import cohere
except Exception:
    cohere = None

nest_asyncio.apply()

# ---------- Logging ----------
# Root logging: INFO and above, formatted as "<timestamp> <LEVEL>: <message>".
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=logging.INFO)

# Named logger shared by every stage of the pipeline.
logger = logging.getLogger("telugu_pipeline")

# ---------- CONFIG ----------
# SECURITY NOTE(review): an API key is committed in source below. The
# COHERE_API_KEY environment variable now takes precedence; the literal is kept
# only as a fallback so existing runs keep working. Rotate this key and remove
# the fallback once the environment variable is configured.
COHERE_API_KEY = os.environ.get("COHERE_API_KEY") or "DSdAuREU39x4mYDJaSDZ3DEmGM1x8000F7BZuRf2"

# Output files written by the pipeline (relative to the working directory).
ARTICLES_CSV = "all_telugu_news_articles.csv"            # scraped articles
CHUNKS_CSV = "all_telugu_chunk_embeddings_clean.csv"     # chunks + JSON-encoded embeddings
META_CSV = "all_telugu_faiss_metadata.csv"               # metadata rows aligned with the index
FAISS_INDEX_FILE = "telugu_faiss_index.index"            # serialized FAISS index
EMBEDDINGS_NPY = "embeddings_array.npy"                  # embedding matrix saved next to the index

# Embedding request settings.
BATCH_SIZE = 32                     # texts per embed call ("safer for trial" per the original author)
EMBED_MODEL = "embed-multilingual-v3.0"
EMBED_INPUT_TYPE = "search_document"
EMBED_DIM = 1024                    # vectors of any other length are rejected downstream

# Seed pages fetched by collect_candidate_links(); every <a href> found on a
# seed page is considered as a candidate article link.
CATEGORY_START_URLS = [
    "https://www.eenadu.net/",
    "https://www.eenadu.net/andhra-pradesh",
    "https://www.eenadu.net/telangana",
    "https://www.eenadu.net/india",
    "https://www.eenadu.net/world",
    "https://www.eenadu.net/business",
    "https://www.eenadu.net/sports",
    "https://www.eenadu.net/pratibha",
    "https://www.eenadu.net/movies",
    "https://www.eenadu.net/youth",
    "https://www.eenadu.net/technology",
    "https://www.eenadu.net/health",
    "https://www.eenadu.net/latest-news",
    "https://www.eenadu.net/trending-news",
    "https://www.eenadu.net/web-stories",
    "https://www.eenadu.net/explained",
    "https://www.eenadu.net/sunday-magazine",
    "https://epaper.andhrajyothy.com/index.aspx",
    "https://epaper.sakshi.com/index.aspx",
    "https://www.andhrajyothy.com/",
    "https://www.sakshi.com/",
    "https://www.manabadi.co.in/",
]

# Substrings tested against each discovered href with a plain `in` check (not a
# regex); a link is kept when it contains at least one of them.
TARGET_PATTERNS = [
    "/telugu-news/", "/breaking-news/", "/business/", "/sports/", "/movies/", "/districts/",
    "/andhra-pradesh/", "/telangana/", "/india/", "/world/", "/youth/", "/technology/",
    "/health/", "/devotional/", "/real-estate/", "/web-stories/", "/article/", "/district/",
    "/news/", "/state/", "/job", "/jobs", "/career", "/careers", "/notification", "/advt",
    "/education", "/result", "/exam"
]

# ---------- Initialize Cohere ----------
# co_client stays None whenever the SDK is missing or the client cannot be
# constructed; embed_text_batch() checks for that before making any request.
co_client = None
if cohere is None:
    logger.error("cohere package not installed. Run: pip install cohere")
else:
    try:
        co_client = cohere.Client(COHERE_API_KEY)
    except Exception as e:
        logger.error(f"Failed to init Cohere client: {e}")

# ---------- Helpers ----------

def safe_read_csv(path: str, **kwargs) -> pd.DataFrame:
    """Read a CSV file without ever raising.

    The file is parsed as UTF-8 with pandas' python engine. If decoding fails
    it is retried once as latin1. Extra keyword arguments are forwarded to
    ``pd.read_csv``.

    Returns:
        The parsed frame, or an empty ``DataFrame`` when the file does not
        exist or cannot be read (the failure is logged).
    """
    if not os.path.exists(path):
        return pd.DataFrame()

    def _load(encoding: str) -> pd.DataFrame:
        return pd.read_csv(path, encoding=encoding, engine="python", **kwargs)

    try:
        return _load("utf-8")
    except UnicodeDecodeError:
        logger.warning(f"UTF-8 decode failed for {path}, retrying with latin1")
    except Exception as e:
        logger.error(f"Error reading {path}: {e}")
        return pd.DataFrame()

    # Only reached after a UTF-8 decode failure.
    try:
        return _load("latin1")
    except Exception as e:
        logger.error(f"Error reading {path} with latin1: {e}")
        return pd.DataFrame()

def write_csv_safe(df: pd.DataFrame, path: str, mode: str = "w"):
    """Write ``df`` to ``path`` as UTF-8 CSV, logging instead of raising on failure.

    The header row is emitted only when ``mode`` is ``"w"``; any other mode
    (e.g. ``"a"``) writes data rows only. The index is never written.
    """
    with_header = mode == "w"
    try:
        df.to_csv(path, mode=mode, header=with_header, index=False, encoding="utf-8")
        logger.info(f"Saved {len(df)} rows to {path}")
    except Exception as e:
        logger.error(f"Failed to write {path}: {e}")

def md5_hash_text(text: str) -> str:
    """Return the hexadecimal MD5 digest of ``text`` encoded as UTF-8."""
    digest = hashlib.md5()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()

def looks_like_telugu(text: str, min_ratio: float = 0.3) -> bool:
    """Tell whether enough of ``text`` is written in Telugu script.

    Counts code points in the Telugu Unicode block (U+0C00..U+0C7F) and
    compares their share of *all* characters (spaces and punctuation included)
    against ``min_ratio``. Non-string or empty input yields ``False``.
    """
    if not (isinstance(text, str) and text):
        return False
    telugu_count = len([ch for ch in text if 0x0C00 <= ord(ch) <= 0x0C7F])
    return telugu_count / len(text) >= min_ratio

def clean_text_for_chunking(text: str) -> str:
    """Strip boilerplate phrases and normalise whitespace before chunking.

    Args:
        text: Raw article text, typically with one paragraph per line.

    Returns:
        The text with, on each line, everything from the first boilerplate
        phrase ("privacy policy", "read more", "advertisement", "subscribe",
        "follow us"; case-insensitive) to the end of that line removed, and
        every whitespace run collapsed to a single space. Non-string input
        yields "".
    """
    if not isinstance(text, str):
        return ""
    # Remove boilerplate BEFORE collapsing whitespace: `.` does not match a
    # newline, so `.*` stops at the end of the offending line. Doing it after
    # the collapse (as before) turned the whole article into one line and the
    # match discarded everything that followed the first phrase.
    s = re.sub(r"(?i)(privacy policy|read more|advertisement|subscribe|follow us).*", "", text)
    return re.sub(r"\s+", " ", s).strip()

def safe_get_text(el):
    """Return the stripped text of a parsed element, or "" when unavailable.

    Args:
        el: Object exposing ``get_text(strip=...)`` (e.g. a BeautifulSoup tag),
            or a falsy value such as ``None``.

    Returns:
        ``el.get_text(strip=True)`` for a truthy element; "" for a falsy
        element or when text extraction raises an ordinary exception.
    """
    try:
        return el.get_text(strip=True) if el else ""
    except Exception:
        # Best-effort helper: a malformed node must not abort the scrape.
        # Only Exception is caught (not a bare except) so KeyboardInterrupt
        # and SystemExit still propagate and the scheduler can be stopped.
        return ""

def detect_source(url: str) -> str:
    """Map a URL to its publisher key by case-insensitive substring match.

    Markers are checked in order (eenadu, andhrajyothy, sakshi); the first one
    found wins. URLs matching none of them return "unknown".
    """
    lowered = url.lower()
    markers = (
        ("eenadu.net", "eenadu"),
        ("andhrajyothy", "andhrajyothy"),
        ("sakshi.com", "sakshi"),
    )
    for needle, source_name in markers:
        if needle in lowered:
            return source_name
    return "unknown"

def extract_date_from_soup(soup: BeautifulSoup, url: str) -> str:
    """Best-effort publication date for a page, formatted ``YYYY-MM-DD``.

    Search order:
      1. text of every ``<time>``, then every ``<span>``, then every ``<div>``;
         within each element a ``YYYY-MM-DD`` match is preferred, otherwise
         ``NN/NN/YYYY`` is read as day/month/year;
      2. a ``/YYYY/MM/DD`` segment in ``url``;
      3. today's date.
    """
    for tag_name in ("time", "span", "div"):
        for node in soup.find_all(tag_name):
            node_text = safe_get_text(node)
            if node_text:
                iso_match = re.search(r"(\d{4}-\d{2}-\d{2})", node_text)
                if iso_match:
                    return iso_match.group(1)
                slash_match = re.search(r"(\d{2})/(\d{2})/(\d{4})", node_text)
                if slash_match:
                    day, month, year = slash_match.groups()
                    return f"{year}-{month}-{day}"

    url_match = re.search(r"/(\d{4})/(\d{2})/(\d{2})", url)
    if url_match:
        return "-".join(url_match.groups())
    return datetime.today().strftime("%Y-%m-%d")

def clean_article_content(soup: BeautifulSoup, source: str, url: str):
    """Extract ``(title, content, date)`` from a parsed article page.

    Known sources use a per-site heading tag and an ordered list of body
    selectors. When no body is found (or the source is unknown) the content
    falls back to every ``<p>`` longer than 40 characters. A missing title
    becomes the first 120 characters of the content, or "Untitled".

    Returns:
        ``(title, content, date)``; ``("", "", date)`` when the content fails
        the Telugu-ratio check.
    """
    date = extract_date_from_soup(soup, url)

    # source -> (heading tag used for the title, body selectors in priority order)
    layouts = {
        "eenadu": ("h1", ("article", "div.article-content", "div.story-content")),
        "andhrajyothy": ("h3", ("div.article-section", "div.col-md-10")),
        "sakshi": ("h1", ("div.news-body", "div.storyContent")),
    }

    title, content = "", ""
    layout = layouts.get(source)
    if layout is not None:
        heading_tag, body_selectors = layout
        title = safe_get_text(soup.find(heading_tag)) or safe_get_text(soup.title)
        body = None
        for css in body_selectors:
            body = soup.select_one(css)
            if body:
                break
        if body:
            content = body.get_text(separator="\n", strip=True)

    # Fallback
    if not content:
        paragraph_texts = (safe_get_text(p) for p in soup.find_all("p"))
        content = "\n".join(t for t in paragraph_texts if len(t) > 40)

    if not title:
        title = content[:120] if content else "Untitled"

    # Telugu check: drop obviously non-Telugu / mojibake
    if looks_like_telugu(content, min_ratio=0.3):
        return title.strip(), content.strip(), date

    logger.warning(f"Content for {url} does not look like Telugu; skipping.")
    return "", "", date

# ---------- Chunker ----------

def recursive_text_splitter(text: str, max_chunk_size: int = 350, min_chunk_size: int = 100, overlap: int = 80) -> List[str]:
    """Split article text into overlapping, mostly-Telugu chunks.

    The text is first cleaned with ``clean_text_for_chunking``. Text that fits
    in one chunk is returned whole (or dropped if too short / not Telugu).
    Otherwise a window of at most ``max_chunk_size`` characters slides over the
    text; each window is cut at the last separator that leaves at least
    ``min_chunk_size`` characters, and the next window starts ``overlap``
    characters before the cut.

    Args:
        text: Raw article text.
        max_chunk_size: Upper bound on chunk length, in characters.
        min_chunk_size: Chunks shorter than this are discarded.
        overlap: Number of characters shared between consecutive chunks.

    Returns:
        Chunks in document order, de-duplicated by MD5 of their text. May be
        empty.
    """
    text = clean_text_for_chunking(text)
    if len(text) <= max_chunk_size:
        return [text] if len(text) >= min_chunk_size and looks_like_telugu(text) else []

    chunks = []
    start = 0
    total = len(text)
    while start < total:
        end = min(start + max_chunk_size, total)
        split_point = end
        # Separators are tried in priority order. NOTE: the cleaned text has had
        # its whitespace collapsed to single spaces, so "\n" will not normally
        # be present and the sentence separators do the work.
        for sep in ("\n", ". ", "! ", "? "):
            pos = text.rfind(sep, start, end)
            if pos != -1 and (pos - start) >= min_chunk_size:
                split_point = pos + len(sep)
                break
        chunk = text[start:split_point].strip()
        if len(chunk) >= min_chunk_size and looks_like_telugu(chunk):
            chunks.append(chunk)
        if split_point >= total:
            # The whole text has been consumed. Previously the loop rewound by
            # `overlap` and re-scanned the tail, which produced a redundant
            # chunk (a pure suffix of the last one) whenever
            # overlap >= min_chunk_size. Output for the defaults is unchanged.
            break
        next_start = split_point - overlap
        # Guarantee forward progress when the overlap would not advance.
        start = next_start if next_start > start else split_point

    # Final pass: normalise whitespace, re-check length, drop exact duplicates.
    cleaned = []
    seen_hashes = set()
    for c in chunks:
        ch = re.sub(r"\s+", " ", c).strip()
        if len(ch) < min_chunk_size:
            continue
        h = md5_hash_text(ch)
        if h in seen_hashes:
            continue
        seen_hashes.add(h)
        cleaned.append(ch)
    return cleaned

# ---------- Dedup helpers ----------

def load_existing_articles_index(path: str = ARTICLES_CSV) -> Dict[str, str]:
    """Build a ``{url: md5(content)}`` lookup from the stored articles CSV.

    URLs are stringified and stripped; content is stringified as-is. When a
    URL occurs on several rows the last row wins. A missing or unreadable file
    yields an empty dict.
    """
    stored = safe_read_csv(path)
    # Iterating an empty frame produces no rows, so the result is {} in that case.
    return {
        str(record.get("url", "")).strip(): md5_hash_text(str(record.get("content", "")))
        for _, record in stored.iterrows()
    }

def load_existing_chunk_keys(path: str = CHUNKS_CSV) -> set:
    """Return the set of ``(url, md5(chunk))`` keys already in the chunks CSV.

    Both the URL and the chunk text are stringified and stripped before the
    key is formed. A missing or unreadable file yields an empty set.
    """
    stored = safe_read_csv(path)
    # Iterating an empty frame produces no rows, so the result is set() in that case.
    return {
        (
            str(record.get("url", "")).strip(),
            md5_hash_text(str(record.get("chunk", "")).strip()),
        )
        for _, record in stored.iterrows()
    }

# ---------- Crawling ----------

async def collect_candidate_links(max_depth=5, max_pages_per_site=50) -> List[str]:
    """Collect candidate article URLs from the seed pages.

    Each URL in ``CATEGORY_START_URLS`` is fetched; every ``<a href>`` on the
    page is resolved against the seed, and links containing any of
    ``TARGET_PATTERNS`` are kept with their query string and fragment removed.

    Args:
        max_depth: Forwarded to ``AsyncWebCrawler`` as ``max_depth``.
        max_pages_per_site: Forwarded to ``AsyncWebCrawler`` as ``max_pages``.

    Returns:
        De-duplicated absolute http(s) URLs, in no particular order. Failures
        are logged, never raised; whatever was collected so far is returned.
    """
    # Function-scope import keeps this change self-contained.
    from urllib.parse import urljoin

    links = set()
    try:
        async with AsyncWebCrawler(max_depth=max_depth, max_pages=max_pages_per_site) as crawler:
            for seed in CATEGORY_START_URLS:
                # Fetch AND parse inside the per-seed guard: previously a seed
                # with missing HTML or a parse error fell through to the outer
                # handler and aborted all remaining seeds.
                try:
                    result = await crawler.arun(seed)
                    html = getattr(result, "html", None)
                    if not html:
                        logger.warning(f"No HTML returned for seed {seed}; skipping.")
                        continue
                    soup = BeautifulSoup(html, "html.parser", from_encoding="utf-8")
                except Exception as e:
                    logger.warning(f"Crawler failed for seed {seed}: {e}")
                    continue
                for a in soup.find_all("a", href=True):
                    raw_href = a["href"].strip()
                    if not raw_href or raw_href.startswith("#"):
                        continue  # empty or same-page anchor
                    # urljoin handles "//host/x", "/x" and plain relative paths.
                    href = urljoin(seed, raw_href)
                    if not href.startswith(("http://", "https://")):
                        continue  # mailto:, javascript:, tel: ... cannot be crawled
                    if any(pat in href for pat in TARGET_PATTERNS):
                        links.add(href.split("#")[0].split("?")[0])
    except Exception as e:
        logger.error(f"Error collecting links: {e}")
    return list(links)

async def scrape_articles_from_links(links: List[str]) -> List[dict]:
    """Fetch each link and return the articles that pass extraction.

    A page is kept only when ``clean_article_content`` returns content longer
    than 80 characters. Each result is a dict with ``url``, ``title``,
    ``content`` and ``date``. A failure on one link is logged at DEBUG level
    and does not stop the remaining links.
    """
    scraped = []
    async with AsyncWebCrawler() as crawler:
        for url in links:
            try:
                page = await crawler.arun(url)
                parsed = BeautifulSoup(page.html, "html.parser", from_encoding="utf-8")
                title, content, date = clean_article_content(parsed, detect_source(url), url)
                if not content or len(content) <= 80:
                    continue
                scraped.append({"url": url, "title": title, "content": content, "date": date})
            except Exception as e:
                logger.debug(f"Failed to fetch {url}: {e}")
    return scraped

# ---------- Embedding ----------

def embed_text_batch(texts: List[str]) -> List[List[float]]:
    """Embed ``texts`` with Cohere in batches of ``BATCH_SIZE``.

    Returns a list aligned one-to-one with ``texts``. When a batch fails (API
    error or a vector whose length is not ``EMBED_DIM``) every position of
    that batch holds ``None`` instead of a vector, so callers must check for
    ``None``. The function sleeps 0.25 s after each batch.

    Raises:
        RuntimeError: if the Cohere client was never initialised.
    """
    if co_client is None:
        raise RuntimeError("Cohere client not initialized.")

    def _embed_one(batch_texts):
        # Request one batch and validate every vector before accepting any of them.
        resp = co_client.embed(texts=batch_texts, model=EMBED_MODEL, input_type=EMBED_INPUT_TYPE)
        for vec in resp.embeddings:
            if len(vec) != EMBED_DIM:
                raise ValueError(f"Unexpected embedding dim {len(vec)}")
        return [list(vec) for vec in resp.embeddings]

    results = []
    for i in range(0, len(texts), BATCH_SIZE):
        batch = texts[i:i + BATCH_SIZE]
        try:
            results.extend(_embed_one(batch))
            logger.info(f"Embedded batch {i // BATCH_SIZE + 1} ({len(batch)} items)")
        except Exception as e:
            logger.error(f"Cohere embed error for batch starting at {i}: {e}")
            # Keep positions aligned with the input texts.
            results.extend([None] * len(batch))
        time.sleep(0.25)
    return results

# ---------- FAISS Rebuild ----------
def rebuild_faiss_from_csv(chunks_csv_path: str = CHUNKS_CSV, meta_csv_path: str = META_CSV, faiss_index_path: str = FAISS_INDEX_FILE):
    """Rebuild the FAISS index and its metadata CSV from the chunks CSV.

    Rows whose ``embedding`` cell cannot be parsed into a list of exactly
    ``EMBED_DIM`` numbers are dropped. Surviving rows get a sequential ``id``
    (0..n-1) matching their position in the index. Writes, in order: the
    metadata CSV, the ``IndexFlatL2`` index file, and the embedding matrix to
    ``EMBEDDINGS_NPY``. Returns ``None``; it returns early (with a warning)
    when faiss is unavailable or there is nothing valid to index.
    """
    if faiss is None:
        logger.warning("faiss not available; skipping FAISS rebuild.")
        return

    frame = safe_read_csv(chunks_csv_path)
    if frame.empty or "embedding" not in frame.columns:
        logger.warning("No chunk embeddings found; skipping FAISS rebuild.")
        return

    def parse_embedding_field(raw):
        # Embeddings are stored as JSON strings; a Python-literal form is
        # accepted as a fallback.
        try:
            values = json.loads(str(raw))
        except Exception:
            try:
                values = ast.literal_eval(str(raw))
            except Exception:
                return None
        if not isinstance(values, list) or len(values) != EMBED_DIM:
            return None
        return np.array(values, dtype="float32")

    frame["embedding_obj"] = frame["embedding"].apply(parse_embedding_field)
    usable = frame["embedding_obj"].notnull()
    frame = frame[usable].reset_index(drop=True)
    if frame.empty:
        logger.warning("No valid embeddings after parsing; skipping FAISS rebuild.")
        return

    # Row position doubles as the vector id inside the FAISS index.
    frame.insert(0, "id", range(len(frame)))

    wanted_cols = ("id", "url", "title", "content", "date", "chunk", "geographical", "sector")
    meta_cols = [col for col in wanted_cols if col in frame.columns]
    frame[meta_cols].to_csv(meta_csv_path, index=False, encoding="utf-8")

    embeddings = np.vstack(frame["embedding_obj"].values)
    dim = embeddings.shape[1]
    index = faiss.IndexFlatL2(dim)
    index.add(embeddings)
    faiss.write_index(index, faiss_index_path)
    np.save(EMBEDDINGS_NPY, embeddings)

    logger.info(f"✅ Rebuilt FAISS index with {len(frame)} vectors (dim {dim}) -> {faiss_index_path}")
    logger.info(f"✅ Metadata saved to {meta_csv_path} with id column")


# ---------- Chunk + Embed with Dedup ----------

def _append_frame_to_csv(frame: pd.DataFrame, path: str) -> None:
    """Append ``frame`` to ``path``; create the file with a header row if it is missing."""
    file_exists = os.path.exists(path)
    frame.to_csv(
        path,
        mode="a" if file_exists else "w",
        header=not file_exists,
        index=False,
        encoding="utf-8",
    )


def chunk_and_embed_and_save(new_articles: List[dict]):
    """Chunk, embed and persist the new or changed articles.

    Steps:
      1. Drop articles whose ``(url, md5(content))`` is already in
         ``ARTICLES_CSV``.
      2. Chunk the remaining articles and drop chunks whose
         ``(url, md5(chunk))`` is already in ``CHUNKS_CSV``.
      3. Embed the new chunks and append the successful ones to ``CHUNKS_CSV``
         (embedding stored as a JSON string).
      4. Append to ``ARTICLES_CSV`` only the articles with no failed chunk.

    Step 4 used to run before embedding. An article whose embedding failed was
    therefore recorded as seen and skipped by the article-level dedup on every
    later run, so its chunks were never embedded. Recording it last keeps it
    eligible for retry; chunks that did succeed are skipped by the chunk-level
    dedup on the retry.

    Args:
        new_articles: Dicts with ``url``, ``title``, ``content``, ``date``.

    Returns:
        None. An exception raised by ``embed_text_batch`` (e.g. client not
        initialised) propagates before anything is written.
    """
    if not new_articles:
        logger.info("No new articles to process.")
        return

    existing_article_map = load_existing_articles_index(ARTICLES_CSV)
    existing_chunk_keys = load_existing_chunk_keys(CHUNKS_CSV)

    # Article-level dedup; within this run the last article per URL wins.
    deduped_articles: Dict[str, dict] = {}
    for art in new_articles:
        url = str(art.get("url", "")).strip()
        content_hash = md5_hash_text(str(art.get("content", "")))
        if existing_article_map.get(url) == content_hash:
            continue
        deduped_articles[url] = {**art, "_content_hash": content_hash}

    if not deduped_articles:
        logger.info("No new or changed articles after deduplication.")
        return

    today = datetime.today().strftime("%Y-%m-%d")

    # Chunk-level dedup. Key order in these dicts fixes the CSV column order,
    # which matters because rows are appended without a header.
    chunk_rows = []
    for url, art in deduped_articles.items():
        title = art.get("title", "")
        content = art.get("content", "")
        date = art.get("date", today)
        for ch in recursive_text_splitter(content):
            if (url, md5_hash_text(ch)) in existing_chunk_keys:
                continue
            chunk_rows.append({
                "url": url,
                "title": title,
                "content": content,
                "date": date,
                "chunk": ch,
                "geographical": "unknown",
                "sector": "general",
                "embedding": None
            })

    final_rows = []
    failed_urls = set()  # articles with at least one chunk that could not be embedded
    if chunk_rows:
        logger.info(f"Preparing to embed {len(chunk_rows)} new chunks.")
        embeddings = embed_text_batch([r["chunk"] for r in chunk_rows])
        for idx, row in enumerate(chunk_rows):
            emb = embeddings[idx] if idx < len(embeddings) else None
            if not emb:
                failed_urls.add(row["url"])
                continue
            try:
                row["embedding"] = [float(x) for x in emb]
            except Exception:
                logger.exception("Embedding parse error; skipping.")
                failed_urls.add(row["url"])
                continue
            final_rows.append(row)
    else:
        logger.info("No new chunks to embed.")

    # Chunks are written first: if this write fails, the articles are not
    # recorded as seen and the whole batch is retried on a later run.
    if final_rows:
        df_chunks = pd.DataFrame(final_rows)
        df_chunks["embedding"] = df_chunks["embedding"].apply(json.dumps)
        _append_frame_to_csv(df_chunks, CHUNKS_CSV)
        logger.info(f"✅ Saved {len(df_chunks)} new chunk embeddings to {CHUNKS_CSV}")
    elif chunk_rows:
        logger.info("No embeddings succeeded; nothing to save.")

    if failed_urls:
        logger.warning(
            f"Deferring {len(failed_urls)} article(s) with failed embeddings; "
            "they stay eligible for retry on a later run."
        )

    rows_to_append = [
        {
            "url": url,
            "title": art.get("title", ""),
            "content": art.get("content", ""),
            "date": art.get("date", today),
        }
        for url, art in deduped_articles.items()
        if url not in failed_urls
    ]
    if rows_to_append:
        _append_frame_to_csv(pd.DataFrame(rows_to_append), ARTICLES_CSV)
        logger.info(f"✅ Saved {len(rows_to_append)} new/updated articles to {ARTICLES_CSV}")

# ---------- Full pipeline job ----------

def pipeline_job(max_depth=3, max_pages_per_site=50):
    """Run one full pipeline pass: collect links, scrape, embed, rebuild index.

    Every stage is guarded so that a failure is logged rather than raised. The
    run stops early only when link collection fails or finds nothing; a failed
    scrape continues with an empty article list, and the FAISS rebuild is
    attempted regardless of how the embedding stage went.
    """
    logger.info("\n--- Scheduler run: starting pipeline ---")

    # Stage 1: candidate links (the only stage that can end the run).
    try:
        links = asyncio.run(
            collect_candidate_links(max_depth=max_depth, max_pages_per_site=max_pages_per_site)
        )
        if not links:
            logger.info("No candidate links found.")
            return
        logger.info(f"Collected {len(links)} candidate links.")
    except Exception as e:
        logger.error(f"Failed to collect links: {e}")
        return

    # Stage 2: scrape the articles.
    try:
        articles = asyncio.run(scrape_articles_from_links(links))
        logger.info(f"Scraped {len(articles)} articles." if articles else "No articles scraped this run.")
    except Exception as e:
        logger.error(f"Failed to scrape articles: {e}")
        articles = []

    # Stage 3: chunk, embed and persist.
    try:
        chunk_and_embed_and_save(articles)
    except Exception as e:
        logger.error(f"Error during chunk/embed/save: {e}")

    # Stage 4: rebuild the index from everything stored so far.
    try:
        rebuild_faiss_from_csv()
    except Exception as e:
        logger.error(f"Error rebuilding FAISS: {e}")

    logger.info("✅ Pipeline run complete!\n")

# ---------- Scheduler ----------

def start_scheduler(interval_minutes: int = 5, max_depth=3, max_pages_per_site=10):
    """Run the pipeline immediately, then every ``interval_minutes`` minutes.

    Any previously registered ``schedule`` jobs are cleared first. The call
    blocks, polling for due jobs once per second, until interrupted with
    Ctrl-C or until an unexpected error escapes the loop (which is logged).
    """
    def run_pipeline():
        return pipeline_job(max_depth=max_depth, max_pages_per_site=max_pages_per_site)

    schedule.clear()
    schedule.every(interval_minutes).minutes.do(run_pipeline)
    logger.info(f"Scheduler started. Pipeline runs every {interval_minutes} minutes.")
    try:
        run_pipeline()  # run once immediately
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Scheduler stopped by user.")
    except Exception as e:
        logger.error(f"Scheduler loop error: {e}")

# Script entry point: start_scheduler() runs the pipeline once right away and
# then every 5 minutes, blocking until interrupted.
if __name__ == "__main__":
    start_scheduler(interval_minutes=5, max_depth=3, max_pages_per_site=50)