In [10]:
import calendar
import hashlib
import json
import os
import time
from datetime import datetime, timezone
from urllib.parse import quote_plus

import feedparser
import requests
from dotenv import load_dotenv
from google import genai
from google.genai import types
from newspaper import Article, Config
from sqlalchemy import text

# Fetch and transform articles

In [None]:
# Feeds to ingest, keyed by the short name that is stored as each article's 'feed_source'.
RSS_FEEDS = {
    'bbc_news': 'http://feeds.bbci.co.uk/news/rss.xml',
}

# Newspaper3k configuration
# Set up a user-agent to mimic a browser and avoid being blocked
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
np_config = Config()
np_config.browser_user_agent = user_agent
np_config.request_timeout = 10 # Set a timeout for requests

load_dotenv()
# Use .get() so that an unset variable reaches the explicit check below;
# indexing os.environ directly would raise a bare KeyError first and the
# descriptive ValueError would never be shown.
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')
PROMPTLAYER_API_KEY = os.environ.get('PROMPTLAYER_API_KEY')

if not GEMINI_API_KEY or not PROMPTLAYER_API_KEY:
    raise ValueError("GEMINI_API_KEY and PROMPTLAYER_API_KEY must be set in the environment variables.")

# Shared Gemini client used by generate_response() and get_embedding().
client = genai.Client(api_key = GEMINI_API_KEY)

In [3]:
def get_url_hash(url):
    """Return the SHA-256 hex digest of ``url`` (encoded as UTF-8).

    The same URL always yields the same 64-character hexadecimal string,
    which makes the digest usable as a stable identifier.
    """
    hasher = hashlib.sha256()
    hasher.update(url.encode("utf-8"))
    return hasher.hexdigest()

def get_full_article_text(url):
    """Fetch the full article text using Newspaper3k.

    Args:
        url: Address of the article page to download.

    Returns:
        The parsed article text, or None when the download or parse raised.
    """
    # Bind the Newspaper3k class under a local alias. This notebook later
    # defines a SQLAlchemy model that is also named `Article`, which rebinds
    # the module-level name; without the alias, calling this function after
    # that cell has run would construct the wrong class, fail, and cause every
    # article to be skipped.
    from newspaper import Article as NewsArticle

    try:
        article = NewsArticle(url, config = np_config)
        article.download()
        article.parse()
        return article.text
    except Exception as e:
        # Best effort: report the failure and let the caller skip this URL.
        print(f"Error fetching article from {url}: {e}")
        return None

def fetch_new_articles():
    """Download every entry of every feed in ``RSS_FEEDS`` and build article records.

    Each feed is parsed with feedparser and each entry's page is downloaded
    with get_full_article_text(). Entries without a link, or whose download
    failed, are skipped. An error while processing one feed is reported and
    the remaining feeds are still processed.

    Returns:
        list[dict]: One record per downloaded article with the keys
        'id' (SHA-256 of the URL), 'url', 'title', 'content',
        'published_dt' (ISO-8601, UTC), 'feed_source' and
        'ingestion_timestamp_utc' (ISO-8601, UTC). The list is empty when
        nothing could be fetched.
    """

    new_articles = []
    for feed_name, feed_url in RSS_FEEDS.items():
        print(f"Processing feed: {feed_name}")
        try:
            feed = feedparser.parse(feed_url)
            for entry in feed.entries:
                article_url = entry.get('link')
                if not article_url:
                    # Without a link there is nothing to download or to hash.
                    print(f"Skipping entry without a link in feed {feed_name}.")
                    continue

                full_text = get_full_article_text(article_url)
                if full_text is None:
                    print(f"Skipping article {article_url} due to download error.")
                    continue

                # The key may be present but None when the date could not be
                # parsed, so fall back with `or` rather than a .get() default.
                published_struct = entry.get("published_parsed") or time.gmtime()
                # feedparser reports parsed dates in UTC; calendar.timegm is the
                # UTC inverse, whereas time.mktime would apply the local offset.
                published_dt = datetime.fromtimestamp(
                    calendar.timegm(published_struct), tz = timezone.utc
                )

                article_data = {
                    'id': get_url_hash(article_url), # unique hash
                    'url': article_url,
                    'title': entry.get('title', 'N/A'),
                    'content': full_text,
                    'published_dt': published_dt.isoformat(),
                    'feed_source': feed_name,
                    'ingestion_timestamp_utc': datetime.now(timezone.utc).isoformat()
                    }
                new_articles.append(article_data)
        except Exception as e:
            print(f"Error processing feed {feed_name}: {e}")

    # Report and return only after *all* feeds have been processed.
    print(f"\nFetch complete. Found {len(new_articles)} new articles.")
    return new_articles

def generate_response(system_instruction, user_content, max_output_tokens):
    """Ask the Gemini model for a single deterministic text completion.

    Args:
        system_instruction: Instruction passed as the system prompt.
        user_content: Text sent as the request contents.
        max_output_tokens: Upper bound on the length of the generated reply.

    Returns:
        The reply text with surrounding whitespace stripped, or None when the
        model returned no text or the request failed.
    """

    try:
        response = client.models.generate_content(
            model = "gemini-2.0-flash-lite",
            contents = user_content,
            config = types.GenerateContentConfig(
                system_instruction = system_instruction,
                temperature = 0,
                candidate_count = 1,
                max_output_tokens = max_output_tokens
            )
        )
        reply = response.text
    except Exception as e:
        # Same best-effort policy as get_embedding(): a single failed request
        # must not abort a loop over many articles.
        print(f"Error generating response: {e}")
        return None

    if reply:
        return reply.strip()
    return None

def get_embedding(text):
    """Embed ``text`` with the Gemini "text-embedding-004" model.

    Returns the ``values`` of the first embedding in the response, or None
    (after printing the error) if anything in the request or the response
    handling raised.
    """
    try:
        result = client.models.embed_content(
            contents = text,
            model = "text-embedding-004",
        )
        first_embedding = result.embeddings[0]
        return first_embedding.values
    except Exception as err:
        print(f"Error generating embedding: {err}")
    return None

In [4]:
# Run the fetch step; `new_articles` holds the article dicts that the following cells enrich and load.
new_articles = fetch_new_articles()

Processing feed: bbc_news

Fetch complete. Found 34 new articles.


In [None]:
# System prompts for the two generation tasks (plain strings: nothing is interpolated).
summary_prompt = "Summarize the given news article in 2 to 3 concise bullet points, capturing the main topics discussed."
categorization_prompt = "Analyze the given article and classify it into the single most relevant category from this list: Technology, Finance, World News, Science, Health, Politics, Sports, Entertainment. Respond with only the category name."

# Enrich each article dict in place with 'summary', 'category' and 'embedding'.
for article in new_articles:
    content = article.get("content", "")
    if not content:
        print(f"Skipping article {article['url']} due to missing content.")
        # Keep every record on the same schema: skipped articles carry the
        # enrichment keys too (as None) so a later bulk load sees uniform dicts.
        article.setdefault("summary", None)
        article.setdefault("category", None)
        article.setdefault("embedding", None)
        continue

    # summarize articles
    article["summary"] = generate_response(summary_prompt, content, 512)

    # categorize articles (only a category name is expected, hence the small token budget)
    article["category"] = generate_response(categorization_prompt, content, 10)

    # embedding generation
    article["embedding"] = get_embedding(content)

In [None]:
# Pretty-print the first article record as a quick sanity check of its fields.
print(json.dumps(new_articles[0], indent = 4))

{
    "id": "9fc0601e3a63f79a6c44316ea6a3aa7bac5c88d6919a95ca4cb7161fa03845c1",
    "url": "https://www.bbc.com/news/articles/c5ypgg28nvpo?at_medium=RSS&at_campaign=rss",
    "title": "Police get new guidance on releasing suspects' ethnicity",
    "content": "Police get new guidance on releasing suspects' ethnicity\n\nIt comes after a series of high-profile cases including that of two men, reported to be Afghan asylum seekers, charged over the alleged rape of a 12-year-old in Warwickshire.\n\nDecisions on releasing such information will remain with police forces, with wider legal and ethical considerations also considered, the NPCC added.\n\nIt is hoped the change will reduce the risk to public safety where there are high levels of misinformation about an incident or in cases of significant public interest.\n\nPolice will be encouraged to disclose the ethnicity and nationality of suspects charged in high-profile and sensitive investigations under new guidance, the National Police Chief

# Set up database and load

In [None]:
from sqlalchemy import create_engine, Column, String, Text, Float, DateTime, inspect
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.postgresql import VECTOR

In [None]:
# PostgreSQL connection settings, read from the process environment.
DB_USER = os.environ["user"]
DB_PASSWORD = os.environ["password"]
DB_HOST = "localhost"
# Fall back to PostgreSQL's default port when no port variable is set. The
# literal "5432" key is still consulted so an environment that defined a
# variable under that name keeps working.
DB_PORT = os.environ.get("DB_PORT") or os.environ.get("5432") or "5432"
DB_NAME = os.environ["db"]
# Percent-encode the credentials so characters such as '@', ':' or '/' in a
# password cannot be misread as URL delimiters.
DB_CONNECTION_STRING = (
    f"postgresql+psycopg2://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Declarative base class that the ORM models below inherit from.
Base = declarative_base()

In [None]:
class Article(Base):
    """ORM model for the ``articles`` table that stores processed feed articles.

    NOTE(review): this class reuses the name ``Article`` that the notebook also
    imports from ``newspaper``. After this cell runs, the module-level name
    refers to this model, so code that expects the Newspaper3k class must not
    rely on the global name.
    """
    __tablename__ = "articles"

    # Primary key; the loader passes the article dict's 'id' (a hash of the URL).
    id = Column(String, primary_key = True)
    # Source URL of the article; must be present and unique across rows.
    url = Column(String, unique = True, nullable = False)
    title = Column(String)
    # Publication time, stored as a timezone-aware timestamp.
    published_dt = Column(DateTime(timezone = True))
    # Name of the feed the article came from.
    feed_source = Column(String)
    # Model-generated summary and category label.
    summary = Column(String)
    category = Column(String)
    # Embedding vector of fixed length 768 — presumably the output size of the
    # embedding model; TODO confirm.
    # NOTE(review): confirm that `VECTOR` is importable from
    # `sqlalchemy.dialects.postgresql` in the installed SQLAlchemy; pgvector
    # columns are normally declared with `pgvector.sqlalchemy.Vector`.
    embedding = Column(VECTOR(768))

def setup_database(engine):
    """Ensure the pgvector extension and the ORM tables exist.

    Args:
        engine: SQLAlchemy engine connected to the target PostgreSQL database.

    Returns:
        True when the ``articles`` table exists after setup, False when it
        does not or when any step raised (the error is printed).
    """

    try:
        # engine.begin() commits on exit, so the extension is actually
        # installed and visible to the connection create_all() uses next.
        # With a plain connect() block the statement would be rolled back
        # when the connection is closed.
        with engine.begin() as connection:
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))

        # Run after the block above so the table DDL does not execute on a
        # second connection while the extension transaction is still open.
        Base.metadata.create_all(engine, checkfirst = True)

        return inspect(engine).has_table("articles")
    except Exception as e:
        print(f"Error setting up database: {e}")
        return False

def load_articles_to_db(articles_to_load):
    """Load articles into the database, updating existing ones.

    Rows are upserted on the ``id`` primary key: a new id is inserted, an
    existing id has all of its other columns overwritten.

    Args:
        articles_to_load: List of article dicts. Only keys that match a column
            of the ``articles`` table are written; any other keys are ignored
            and missing keys are stored as NULL.

    Returns:
        None. Failures are printed and the transaction is rolled back.
    """

    if not articles_to_load:
        return

    engine = create_engine(DB_CONNECTION_STRING)

    try:
        if not setup_database(engine):
            print("Database setup failed. Exiting.")
            return

        # Project each dict onto the table's columns: extra keys would make the
        # INSERT fail with "Unconsumed column names", and a multi-row VALUES
        # clause needs every row to supply the same set of keys.
        column_names = [column.name for column in Article.__table__.columns]
        rows_by_id = {}
        for article in articles_to_load:
            row = {name: article.get(name) for name in column_names}
            # PostgreSQL rejects an ON CONFLICT DO UPDATE statement that
            # touches the same row twice, so keep one row per id (last wins).
            rows_by_id[row["id"]] = row
        rows = list(rows_by_id.values())

        Session = sessionmaker(bind = engine)
        session = Session()

        try:
            print(f"Preparing to upsert {len(rows)} articles into the database.")
            stmt = pg_insert(Article).values(rows)

            # On conflict, overwrite every column except the key with the
            # incoming ("excluded") value.
            update_columns = {
                name: stmt.excluded[name]
                for name in column_names
                if name != 'id'
            }

            upsert_stmt = stmt.on_conflict_do_update(
                index_elements = ['id'],
                set_ = update_columns
            )

            session.execute(upsert_stmt)
            session.commit()
        except Exception as e:
            print(f"Error during upsert operation: {e}")
            session.rollback()
        finally:
            session.close()
    finally:
        # Release the connection pool created for this call.
        engine.dispose()

In [None]:
# Upsert the fetched articles into the PostgreSQL `articles` table.
load_articles_to_db(new_articles)