<a href="https://colab.research.google.com/github/ZNAXNOR/AI-Blog-Posts/blob/main/AI_Blog_Posts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# =============================
# 1. Install Dependencies
# =============================
!pip install feedparser python-dotenv openai requests sentence-transformers scikit-learn rapidfuzz



In [2]:
# =============================
# 2. Imports & Environment Setup
# =============================
import base64
import json
import os
import re
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple

import feedparser
import numpy as np
import requests
from dotenv import load_dotenv
from google.colab import userdata
from openai import OpenAI
from sentence_transformers import SentenceTransformer, util

In [3]:
# -----------------------------
# Environment
# -----------------------------
# Load variables from a local .env file (if one exists) into the process environment.
load_dotenv()

# Secrets are read from the Colab secret store (google.colab.userdata); the WordPress
# site URL and user name are set inline.
# NOTE(review): the Hugging Face secret is looked up as 'HF_Token', while the warning in
# this cell's output refers to a secret named `HF_TOKEN` — confirm the intended name.
os.environ["HF_TOKEN"] = userdata.get('HF_Token')
os.environ["WP_URL"] = "https://odlabagency.wpcomstaging.com"
os.environ["WP_USER"] = "odomkardalvi"
os.environ["WP_PASS"] = userdata.get('WP_OdLabsAgency_App')

# Module-level aliases read by the helper functions in later cells.
HF_TOKEN = os.environ["HF_TOKEN"]
WP_URL = os.environ["WP_URL"]
WP_USER = os.environ["WP_USER"]
WP_PASS = os.environ["WP_PASS"]

# OpenAI-compatible client pointed at the Hugging Face router, authenticated with HF_TOKEN.
client = OpenAI(base_url="https://router.huggingface.co/v1", api_key=HF_TOKEN)
# Sentence-embedding model shared by clustering, coherence scoring and duplicate detection.
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [4]:
# -----------------------------
# Tunables (single source of truth)
# -----------------------------
# Duplicate control
DUPLICATE_SIMILARITY = 0.90        # block if >= this cosine similarity
MEMORY_WINDOW_DAYS = 45            # keep last N days of posts in local memory
MAX_RECENT_LOCAL = 500             # upper bound on local memory size

# Coherence scoring vs sources
SEMANTIC_COHERENCE_THRESHOLD = 0.75  # >= approved_auto, else needs_review
TOP_K_SOURCE_MATCH = 3               # average the K most similar sources when scoring

# WordPress fetch size (when comparing to server-side drafts as a safeguard)
# NOTE(review): these two constants are not referenced by any cell visible in this
# notebook — confirm whether the server-side comparison is still planned.
WP_FETCH_PER_PAGE = 40
WP_FETCH_PAGES = 2                  # up to ~80 recent drafts

# Retry behavior for HTTP calls
HTTP_RETRIES = 2                    # extra attempts after the first (HTTP_RETRIES + 1 in total)
HTTP_BACKOFF = 2.0                  # seconds; the wait is HTTP_BACKOFF * attempt number

# Paths
LOCAL_MEMORY_PATH = "published_posts_embeddings.json"  # JSON file read/written by the memory helpers

In [5]:
# =============================
# 3. RSS Feed Lists
# =============================
# International technology news feeds.
GLOBAL_RSS_FEEDS = [
    "https://feeds.bbci.co.uk/news/technology/rss.xml",
    "https://www.theverge.com/rss/index.xml",
    "https://feeds.arstechnica.com/arstechnica/technology-lab",
    "https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml",
    "https://www.cnet.com/rss/news/",
    "https://www.technologyreview.com/feed/",
    "https://www.wired.com/feed/category/gear/latest/rss",
    "https://techcrunch.com/feed/",
    "https://venturebeat.com/category/ai/feed/",
    "https://www.zdnet.com/news/rss.xml"
]

# Technology sections of India-based publishers.
INDIA_RSS_FEEDS = [
    "https://www.thehindu.com/sci-tech/technology/feeder/default.rss",
    "https://economictimes.indiatimes.com/tech/rssfeeds/13357270.cms",
    "https://indianexpress.com/section/technology/feed/",
    "https://www.livemint.com/rss/technology",
    "https://timesofindia.indiatimes.com/rssfeeds/5880659.cms",
]

# Google News keyword searches, each with its own language/region parameters.
GOOGLE_NEWS_RSS = [
    "https://news.google.com/rss/search?q=technology&hl=en-IN&gl=IN&ceid=IN:en",
    "https://news.google.com/rss/search?q=artificial+intelligence&hl=en-US&gl=US&ceid=US:en",
    "https://news.google.com/rss/search?q=machine+learning&hl=en-GB&gl=GB&ceid=GB:en",
]

# Full list of feeds handed to the pipeline; order here is the order in which feeds are fetched.
RSS_FEEDS = GLOBAL_RSS_FEEDS + INDIA_RSS_FEEDS + GOOGLE_NEWS_RSS

In [6]:
# =============================
# 4. HTTP & Text Helpers
# =============================
def _http(headers: Dict[str, str], method: str, url: str, **kwargs) -> requests.Response:
    """Send an HTTP request, retrying on network errors and 5xx responses.

    Args:
        headers: Request headers sent on every attempt.
        method: HTTP verb, e.g. "GET" or "POST".
        url: Target URL.
        **kwargs: Forwarded to ``requests.request`` (``json=``, ``params=``, ...).
            A caller-supplied ``timeout`` overrides the 30 second default.

    Returns:
        The response of the first attempt that is not a 5xx, or the last 5xx
        response once all attempts are used up.

    Raises:
        requests.RequestException: if the final attempt fails at the transport level.
    """
    # Default timeout, but let callers override it instead of hitting a
    # "multiple values for keyword argument 'timeout'" TypeError.
    kwargs.setdefault("timeout", 30)
    # Always make at least one attempt, even if HTTP_RETRIES is misconfigured (< 0).
    total_attempts = max(1, HTTP_RETRIES + 1)

    for attempt in range(1, total_attempts + 1):
        is_last = attempt == total_attempts
        try:
            print(f"[DEBUG] HTTP {method} {url} (attempt {attempt})")
            resp = requests.request(method, url, headers=headers, **kwargs)
        except requests.RequestException as e:
            print(f"[ERROR] HTTP request failed: {e}")
            if is_last:
                raise
            time.sleep(HTTP_BACKOFF * attempt)  # linear backoff
            continue

        print(f"[DEBUG] HTTP status: {resp.status_code}")
        if resp.status_code >= 500 and not is_last:
            # Server-side error: wait and try again.
            time.sleep(HTTP_BACKOFF * attempt)
            continue
        return resp

    # The loop always returns or raises on its final iteration.
    raise RuntimeError("_http exhausted its attempts without a response")

def strip_html(text: str) -> str:
    """Replace every ``<...>`` tag in ``text`` with a space and trim the ends.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    if not text:
        return ""
    tag_pattern = re.compile(r"<[^>]*>")
    without_tags = tag_pattern.sub(" ", text)
    return without_tags.strip()

def join_tags(tags: List[str]) -> str:
    """Return the tags trimmed, sorted and joined by single spaces.

    Non-string entries are ignored. Entries that are empty *after* trimming are
    dropped as well, so whitespace-only tags no longer leave a stray leading
    space in the result.

    Args:
        tags: Tag names; ``None`` is treated as an empty list.

    Returns:
        A single space-separated string, or "" when no usable tag remains.
    """
    trimmed = (t.strip() for t in (tags or []) if isinstance(t, str))
    return " ".join(sorted(t for t in trimmed if t))

def build_unified_text(title: str, excerpt: str, tags: List[str], body_html: str) -> str:
    """Combine a post's fields into one newline-separated text block.

    Line order is: title, joined tags, excerpt, tag-stripped body. The combined
    string is trimmed at both ends.
    """
    sections = [
        f"{title}",
        join_tags(tags),
        f"{excerpt}",
        strip_html(body_html),
    ]
    return "\n".join(sections).strip()

In [7]:
# =============================
# 5. Local Memory (embeddings + metadata)
# =============================
# We persist embeddings to avoid recomputation and to guard across runs.

def load_memory() -> List[Dict[str, Any]]:
    """Load the local post memory, dropping entries older than the window.

    Reads ``LOCAL_MEMORY_PATH``. A missing file, an unparseable file, or a
    payload that is not a JSON list is treated as empty memory (with a warning
    for the latter two) so a damaged file does not abort the whole pipeline.

    Returns:
        At most ``MAX_RECENT_LOCAL`` entries whose ``utc_iso`` timestamp falls
        within the last ``MEMORY_WINDOW_DAYS`` days, in file order.
    """
    try:
        with open(LOCAL_MEMORY_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        data = []
    except json.JSONDecodeError as e:
        print(f"[WARN] Could not parse {LOCAL_MEMORY_PATH} ({e}); starting with empty memory")
        data = []

    if not isinstance(data, list):
        print(f"[WARN] Unexpected content in {LOCAL_MEMORY_PATH}; starting with empty memory")
        data = []

    # Timestamps are ISO-8601 strings in naive UTC, so lexicographic comparison
    # orders them chronologically.
    cutoff = (datetime.utcnow() - timedelta(days=MEMORY_WINDOW_DAYS)).isoformat()
    pruned = [
        d for d in data
        if isinstance(d, dict) and str(d.get("utc_iso") or "") >= cutoff
    ]
    return pruned[:MAX_RECENT_LOCAL]


def save_memory(entries: List[Dict[str, Any]]):
    """Overwrite the memory file with the first ``MAX_RECENT_LOCAL`` entries as JSON."""
    with open(LOCAL_MEMORY_PATH, "w") as memory_file:
        capped_entries = entries[:MAX_RECENT_LOCAL]
        json.dump(capped_entries, memory_file)


def add_to_memory(post: Dict[str, Any]):
    """Put ``post`` at the front of the stored memory and write it back to disk."""
    current_entries = load_memory()
    save_memory([post, *current_entries])

In [8]:
# =============================
# 6. Fetch & Cluster Articles
# =============================
from sklearn.metrics.pairwise import cosine_similarity

def fetch_articles(feed_urls: List[str]) -> List[Dict[str, Any]]:
    """Parse each feed URL and return all entries as one flat list.

    Every entry is tagged in place with a ``source_feed`` key holding the URL
    it came from. Feeds that yield no entries are reported and skipped.
    """
    collected = []
    for url in feed_urls:
        print(f"[DEBUG] Fetching feed: {url}")
        parsed = feedparser.parse(url)
        if not parsed.entries:
            print(f"[WARN] No entries found in {url}")
            continue

        print(f"[DEBUG] {len(parsed.entries)} entries fetched from {url}")
        for item in parsed.entries:
            item['source_feed'] = url
            collected.append(item)

    print(f"[DEBUG] Total articles fetched: {len(collected)}")
    return collected


def cluster_articles(articles: List[Dict[str, Any]], threshold: float = 0.75) -> List[List[Dict[str, Any]]]:
    """Greedily group articles whose title+summary embeddings are similar.

    Articles are visited in input order. Each article not yet assigned seeds a
    new cluster and pulls in every later, still-unassigned article whose cosine
    similarity to the seed is at least ``threshold``.

    Args:
        articles: Mapping-like entries exposing ``title`` and ``summary`` via ``.get``.
        threshold: Minimum cosine similarity to the seed for joining its cluster.

    Returns:
        A list of clusters (lists of the original article objects); every
        input article appears in exactly one cluster.
    """
    if not articles:
        return []

    # Missing or None fields are treated as empty rather than raising on concatenation.
    texts = [f"{a.get('title') or ''} {a.get('summary') or ''}" for a in articles]
    embs = np.asarray(
        embed_model.encode(texts, convert_to_numpy=True, normalize_embeddings=True)
    )
    # The embeddings are L2-normalised, so one matrix product yields every pairwise
    # cosine similarity at once (instead of one library call per pair).
    sim_matrix = embs @ embs.T

    count = len(articles)
    unassigned = np.ones(count, dtype=bool)
    clusters = []
    for seed in range(count):
        if not unassigned[seed]:
            continue
        unassigned[seed] = False
        # Every index below `seed` is already assigned, so only later articles can match.
        members = np.flatnonzero(unassigned & (sim_matrix[seed] >= threshold))
        unassigned[members] = False
        clusters.append([articles[seed]] + [articles[int(j)] for j in members])
    return clusters

In [9]:
# =============================
# 7. AI Post Generator (JSON contract)
# =============================
def generate_post_with_ai(topic_articles: List[Dict[str, Any]], retries: int = 2) -> Dict[str, Any]:
    """Ask the chat model to merge a cluster of articles into one post.

    Args:
        topic_articles: Entries exposing ``title``, ``summary`` and ``link`` via ``.get``.
        retries: Number of additional attempts after the first one fails.

    Returns:
        The parsed JSON object from the model. Its ``body`` value has literal
        backslash-n sequences removed.

    Raises:
        ValueError: if no attempt produced a usable JSON object; the last
            underlying error is attached as ``__cause__``.
    """
    titles = [a.get('title', '') for a in topic_articles]
    summaries = [a.get('summary', '') for a in topic_articles]
    sources = [(a.get('link', ''), a.get('title', '')) for a in topic_articles]

    schema_example = {
        "title": "Example Title",
        "excerpt": "Example excerpt...",
        "tags": ["tag1", "tag2"],
        "body": "<h3>Heading</h3><p>Paragraph...</p>",
        "image_prompt": "A futuristic AI lab scene"
    }

    base_prompt = f"""
    You are an expert journalist. Merge the following sources into one unified post.
    Return ONLY valid JSON matching this schema:
    {json.dumps(schema_example, ensure_ascii=False)}

    Requirements:
    - 900–1200 words
    - Conversational but well-researched
    - <h3> for headings, <h4> for subheadings, <p> for paragraphs
    - Add a <h3>Sources</h3> section with <ol> clickable links.

    Articles:
    TITLES: {titles}
    SUMMARIES: {summaries}
    SOURCES: {sources}
    """
    retry_prefix = "Your last output was invalid JSON, return only valid JSON.\n"

    prompt = base_prompt
    last_error = None
    for attempt in range(retries + 1):
        try:
            completion = client.chat.completions.create(
                model="openai/gpt-oss-20b",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=4000,
            )
            choice = completion.choices[0]
            content = getattr(choice, "text", None) or getattr(choice.message, "content", None)
            if not content or not content.strip():
                raise ValueError("Empty AI completion")
            # Keep only the outermost {...} span in case the model wrapped the JSON in prose.
            m = re.search(r"\{[\s\S]*\}", content)
            if m:
                content = m.group(0)
            data = json.loads(content)
            if not isinstance(data, dict):
                raise ValueError(f"Expected a JSON object, got {type(data).__name__}")
            # Removes literal backslash-n sequences (two characters), not real newlines.
            data["body"] = re.sub(r"\\n", "", data.get("body", ""))
            return data
        except Exception as e:
            last_error = e
            print(f"[WARN] AI generation attempt {attempt + 1}/{retries + 1} failed: {e}")
            if attempt < retries:
                time.sleep(2)
                # Prefix the reminder once onto the ORIGINAL prompt so it does not pile up
                # across retries, and only when the failure was about the output itself.
                if isinstance(e, (ValueError, TypeError)):
                    prompt = retry_prefix + base_prompt
                else:
                    prompt = base_prompt
    raise ValueError("AI failed to return valid JSON") from last_error

In [10]:
# =============================
# 8. Coherence Score vs Sources
# =============================
def semantic_coherence_score(generated_html: str, source_articles: List[Dict[str, Any]], top_k: int = TOP_K_SOURCE_MATCH) -> float:
    """Score how closely the generated post matches its source articles.

    The tag-stripped post is embedded together with each source's
    "title summary" text; the result is the mean of the ``top_k`` highest
    cosine similarities, rounded to four decimals. Returns 0.0 when there are
    no sources.
    """
    plain_text = strip_html(generated_html)
    source_texts = [
        "{} {}".format(article.get('title', ''), article.get('summary', ''))
        for article in source_articles
    ]
    if len(source_texts) == 0:
        return 0.0

    encode_options = dict(convert_to_tensor=True, normalize_embeddings=True)
    post_embedding = embed_model.encode([plain_text], **encode_options)
    source_embeddings = embed_model.encode(source_texts, **encode_options)

    similarities = util.cos_sim(post_embedding, source_embeddings).cpu().numpy().flatten()
    # Highest similarity first.
    ranked = np.sort(similarities)[::-1]
    k = min(top_k, len(ranked))
    best = ranked[:k]
    return round(float(sum(best) / k), 4)

In [11]:
# =============================
# 9. Duplicate Detection (Unified, Embedding-Only)
# =============================
# - Build a single unified text (title + tags + excerpt + body) for both new and past posts
# - Compare cosine similarity; skip if >= DUPLICATE_SIMILARITY

from numpy import ndarray

def encode_paragraphs(text: str) -> Tuple[np.ndarray, List[str]]:
    """Split ``text`` on newlines and embed each non-empty paragraph.

    Args:
        text: Input text; ``None`` is treated as an empty string.

    Returns:
        ``(embeddings, paragraphs)``: the encoder output for the paragraphs
        (one row per paragraph) and the trimmed paragraph strings themselves.
        When the text has no non-empty line, the whole text is encoded as a
        single paragraph so there is always at least one embedding.
    """
    text = text or ""
    paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
    if not paragraphs:
        paragraphs = [text]
    embeddings = embed_model.encode(paragraphs, convert_to_numpy=True, normalize_embeddings=True)
    return embeddings, paragraphs


def is_duplicate_unified_paragraph(new_post: Dict[str, Any]) -> bool:
    """Check whether any paragraph of ``new_post`` nearly repeats a remembered post.

    The new post's unified text (title, tags, excerpt, body) is split into
    paragraphs and compared against the paragraphs of each remembered post's
    ``body_text``. The post is a duplicate as soon as one paragraph pair
    reaches ``DUPLICATE_SIMILARITY``.

    Args:
        new_post: Dict with optional ``title``, ``excerpt``, ``tags`` and ``body`` keys.

    Returns:
        True when a duplicate paragraph is found, False otherwise (including
        when local memory is empty).
    """
    memory = load_memory()
    if not memory:
        return False

    new_text = build_unified_text(
        new_post.get("title", ""),
        new_post.get("excerpt", ""),
        new_post.get("tags", []),
        new_post.get("body", "")
    )
    new_embs, _ = encode_paragraphs(new_text)

    max_sim_overall = -1.0
    for old in memory:
        old_embs, _ = encode_paragraphs(old.get("body_text") or "")
        # One batched call gives the full (new paragraphs x old paragraphs) matrix,
        # replacing a separate cos_sim call for every paragraph pair.
        sim_matrix = util.cos_sim(new_embs, old_embs).cpu().numpy()
        best_per_paragraph = sim_matrix.max(axis=1)

        hits = np.flatnonzero(best_per_paragraph >= DUPLICATE_SIMILARITY)
        if hits.size:
            i = int(hits[0])  # first new paragraph that crosses the threshold
            para_max = float(best_per_paragraph[i])
            print(f"⛔ Duplicate detected in paragraph {i} against '{old.get('title','<unknown>')}' with sim={para_max:.3f}")
            return True
        max_sim_overall = max(max_sim_overall, float(best_per_paragraph.max()))

    print(f"ℹ️ Max paragraph similarity to memory: {max_sim_overall:.3f}")
    return False

In [12]:
# =============================
# 10. WordPress Helpers (Basic Auth, meta fields)
# =============================

def wp_headers() -> Dict[str, str]:
    """Build the JSON request headers carrying HTTP Basic credentials.

    The credential is the base64 encoding of ``WP_USER:WP_PASS``.
    """
    credentials = f"{WP_USER}:{WP_PASS}".encode()
    encoded = base64.b64encode(credentials).decode("utf-8")
    headers = {}
    headers["Authorization"] = f"Basic {encoded}"
    headers["Content-Type"] = "application/json"
    return headers


def ensure_tags_exist(tag_names: List[str]) -> List[int]:
    """Resolve tag names to WordPress tag IDs, creating tags that are missing.

    Args:
        tag_names: Tag names; ``None`` is treated as empty. Blank and
            non-string entries are skipped.

    Returns:
        IDs of the tags that were found or created, in input order. Tags
        that could be neither found nor created are left out (with a warning).
    """
    import html  # stdlib; used to decode entity-escaped tag names in API responses

    headers = wp_headers()
    endpoint = f"{WP_URL}/wp-json/wp/v2/tags"
    ids = []
    for raw_tag in (tag_names or []):
        if not isinstance(raw_tag, str) or not raw_tag.strip():
            continue
        tag = raw_tag.strip()

        # Pass the term via `params` so spaces, '&', '#', etc. are URL-encoded.
        r = _http(headers, "GET", endpoint, params={"search": tag, "per_page": 100})
        existing_id = None
        if r.status_code == 200:
            # `search` is a fuzzy match, so the first hit may be a different tag;
            # accept only an exact (case-insensitive) name match.
            wanted = tag.casefold()
            for term in (r.json() or []):
                if html.unescape(str(term.get("name", ""))).casefold() == wanted:
                    existing_id = term.get("id")
                    break
        if existing_id is not None:
            ids.append(existing_id)
            print(f"[DEBUG] Tags Exist")
            continue

        cr = _http(headers, "POST", endpoint, json={"name": tag})
        if cr.status_code in (200, 201):
            ids.append(cr.json()["id"])
            continue

        # WordPress answers 400 / "term_exists" (with the existing term_id) when the
        # tag is already present but the search above did not surface it.
        try:
            error_body = cr.json()
        except ValueError:
            error_body = {}
        existing_term_id = None
        if isinstance(error_body, dict) and error_body.get("code") == "term_exists":
            existing_term_id = (error_body.get("data") or {}).get("term_id")
        if existing_term_id is not None:
            ids.append(existing_term_id)
        else:
            print(f"[WARN] Could not resolve tag '{tag}': {cr.status_code}")
    return ids


def post_to_wordpress(ai_post: Dict[str, Any], semantic_score: float) -> str:
    """Create a WordPress draft for ``ai_post`` and record it in local memory.

    Args:
        ai_post: Generated post with optional ``title``, ``excerpt``, ``tags``
            and ``body`` keys.
        semantic_score: Coherence score of the post against its sources; decides
            whether the draft is flagged ``approved_auto`` or ``needs_review``.

    Returns:
        The draft's ``link`` as reported by WordPress, or "" when creation failed.
    """
    status_flag = "approved_auto" if semantic_score >= SEMANTIC_COHERENCE_THRESHOLD else "needs_review"
    headers = wp_headers()
    tag_ids = ensure_tags_exist(ai_post.get("tags", []))
    payload = {
        "title": ai_post.get("title", ""),
        "content": ai_post.get("body", ""),
        "excerpt": ai_post.get("excerpt", ""),
        "status": "draft",
        "tags": tag_ids,
        # NOTE(review): presumably these meta keys must be registered for the REST API
        # on the site for WordPress to store them — confirm on the server side.
        "meta": {
            "semantic_score": semantic_score,
            "status_flag": status_flag,
        },
    }
    r = _http(headers, "POST", f"{WP_URL}/wp-json/wp/v2/posts", json=payload)
    if r.status_code in (200, 201):
        print(f"✅ Draft created: {ai_post.get('title','')} (score={semantic_score}, flag={status_flag})")
        link = r.json().get("link", "")
        # persist to local memory with embedding
        unified_text = build_unified_text(
            ai_post.get("title",""), ai_post.get("excerpt",""), ai_post.get("tags",[]), ai_post.get("body","")
        )
        # Encode with the shared model directly: the previously called `encode_unified`
        # helper is not defined in this notebook and raised NameError right after the
        # draft was created, so the post was never written to memory.
        unified_embedding = embed_model.encode(
            unified_text, convert_to_numpy=True, normalize_embeddings=True
        )
        entry = {
            # Naive-UTC ISO string; load_memory() compares it lexicographically to its cutoff.
            "utc_iso": datetime.utcnow().isoformat(),
            "title": ai_post.get("title",""),
            "excerpt": ai_post.get("excerpt",""),
            "tags": ai_post.get("tags", []),
            "body_text": strip_html(ai_post.get("body","")),
            "embedding": unified_embedding.tolist(),
            "wp_link": link,
        }
        add_to_memory(entry)
        return link
    print(f"❌ WP create failed: {r.status_code} {r.text}")
    return ""

In [13]:
# =============================
# 11. Main Flow
# =============================
def run_pipeline(max_clusters: int = 5):
    """Fetch feeds, cluster articles, and draft one post per multi-source cluster.

    A cluster qualifies only when it holds at least two articles coming from
    at least two different feeds. The per-run cap is applied AFTER this
    filter, so single-source clusters at the front of the list no longer use
    up the budget.

    Args:
        max_clusters: Maximum number of qualifying clusters to process per run.
    """
    articles = fetch_articles(RSS_FEEDS)
    clusters = cluster_articles(articles)
    if not clusters:
        print("No clusters found.")
        return

    eligible = [
        cluster for cluster in clusters
        if len(cluster) >= 2 and len({a.get('source_feed') for a in cluster}) >= 2
    ]
    if not eligible:
        print("No multi-source clusters found.")
        return

    # Limit number of clusters processed per run to save memory
    selected = eligible[:max_clusters]
    for cluster_idx, cluster in enumerate(selected, start=1):
        print(f"[DEBUG] Processing cluster {cluster_idx}/{len(selected)} with {len(cluster)} articles")
        try:
            ai_post = generate_post_with_ai(cluster)
        except ValueError as e:
            # One failed generation should not abort the remaining clusters.
            print(f"[ERROR] Skipping cluster {cluster_idx}: {e}")
            continue

        if is_duplicate_unified_paragraph(ai_post):
            print(f"⛔ Cluster skipped due to duplication: {ai_post.get('title','')}")
            continue

        score = semantic_coherence_score(ai_post.get("body",""), cluster, TOP_K_SOURCE_MATCH)
        link = post_to_wordpress(ai_post, score)
        if link:
            print(f"Post URL: {link}")
            # Do not stop, process next clusters as well

In [14]:
# =============================
# 12. Run
# =============================
run_pipeline()

[DEBUG] Fetching feed: https://feeds.bbci.co.uk/news/technology/rss.xml
[DEBUG] 64 entries fetched from https://feeds.bbci.co.uk/news/technology/rss.xml
[DEBUG] Fetching feed: https://www.theverge.com/rss/index.xml
[DEBUG] 10 entries fetched from https://www.theverge.com/rss/index.xml
[DEBUG] Fetching feed: https://feeds.arstechnica.com/arstechnica/technology-lab
[DEBUG] 20 entries fetched from https://feeds.arstechnica.com/arstechnica/technology-lab
[DEBUG] Fetching feed: https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml
[DEBUG] 28 entries fetched from https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml
[DEBUG] Fetching feed: https://www.cnet.com/rss/news/
[DEBUG] 25 entries fetched from https://www.cnet.com/rss/news/
[DEBUG] Fetching feed: https://www.technologyreview.com/feed/
[DEBUG] 10 entries fetched from https://www.technologyreview.com/feed/
[DEBUG] Fetching feed: https://www.wired.com/feed/category/gear/latest/rss
[DEBUG] 20 entries fetched from https://www.w