1. Feed Parser and Data Extraction:
○ Create a script that reads the provided list of RSS feeds.
○ Parse each feed and extract relevant information from each news article,
including title, content, publication date, and source URL.
○ Ensure handling of duplicate articles from the same feed.

In [1]:
import feedparser
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

# Declarative base class that the ORM model below inherits from.
Base = declarative_base()

class NewsArticle(Base):
    """ORM model for one news article, mapped to the ``news_articles`` table."""
    __tablename__ = 'news_articles'
    id = Column(Integer, primary_key=True)  # integer primary key
    title = Column(String, nullable=False)  # headline; required
    content = Column(Text)  # article text; optional
    pub_date = Column(DateTime)  # publication timestamp; optional
    source_url = Column(String, unique=True, nullable=False)  # required and unique: one row per URL

def parse_rss_feed(feed_url, processed_titles):
    """Fetch one feed and return the articles whose titles were not seen yet.

    Args:
        feed_url: URL of the feed, handed to ``feedparser.parse``.
        processed_titles: Set of titles that were already emitted. It is
            updated in place with every title returned by this call.

    Returns:
        A list of dicts with the keys ``title``, ``content``, ``pub_date``
        and ``source_url``. ``pub_date`` is the value feedparser exposes as
        ``published_parsed`` (or ``updated_parsed``), i.e. a
        ``time.struct_time``, or ``None`` when the entry has no usable date.
        Entries without a title or a link are skipped.
    """
    import logging

    feed = feedparser.parse(feed_url)

    # feedparser flags malformed or unreachable feeds via "bozo" rather than
    # raising, so surface the problem and keep whatever entries were parsed.
    if feed.get('bozo'):
        logging.warning(
            "Problem reading feed %s: %s", feed_url, feed.get('bozo_exception')
        )

    articles = []

    for entry in feed.entries:
        # .get() instead of attribute access: a missing <title>/<link> must
        # not abort the whole feed with AttributeError.
        title = (entry.get('title') or '').strip()
        source_url = entry.get('link')
        if not title or not source_url:
            logging.warning("Skipping entry without title or link in %s", feed_url)
            continue
        if title in processed_titles:
            continue

        # "or" chains also fall through when a key exists but is empty/None.
        content = entry.get('summary') or entry.get('description') or ''
        pub_date = entry.get('published_parsed') or entry.get('updated_parsed')

        articles.append({
            'title': title,
            'content': content,
            'pub_date': pub_date,
            'source_url': source_url,
        })
        processed_titles.add(title)

    return articles

# Feeds to poll, processed in the order listed.
rss_feeds = [
    'http://rss.cnn.com/rss/cnn_topstories.rss',
    'http://qz.com/feed',
    'http://feeds.foxnews.com/foxnews/politics',
    'http://feeds.reuters.com/reuters/businessNews',
    'http://feeds.feedburner.com/NewshourWorld',
    'https://feeds.bbci.co.uk/news/world/asia/india/rss.xml',
]

# One set shared by every call, so a title is reported only once overall.
processed_titles = set()

for feed_url in rss_feeds:
    articles = parse_rss_feed(feed_url, processed_titles)
    print(f"Articles from {feed_url}:\n")
    for article in articles:
        # A single print with sep="\n" emits the same lines as separate calls.
        print(
            f"Title: {article['title']}",
            f"Content: {article['content']}",
            f"Publication Date: {article['pub_date']}",
            f"Source URL: {article['source_url']}",
            "\n" + "-"*50 + "\n",
            sep="\n",
        )


  Base = declarative_base()


Articles from http://rss.cnn.com/rss/cnn_topstories.rss:

Title: Some on-air claims about Dominion Voting Systems were false, Fox News acknowledges in statement after deal is announced
Content: 
Publication Date: time.struct_time(tm_year=2023, tm_mon=4, tm_mday=19, tm_hour=12, tm_min=44, tm_sec=51, tm_wday=2, tm_yday=109, tm_isdst=0)
Source URL: https://www.cnn.com/business/live-news/fox-news-dominion-trial-04-18-23/index.html

--------------------------------------------------

Title: Dominion still has pending lawsuits against election deniers such as Rudy Giuliani and Sidney Powell
Content: 
Publication Date: None
Source URL: https://www.cnn.com/business/live-news/fox-news-dominion-trial-04-18-23/h_8d51e3ae2714edaa0dace837305d03b8

--------------------------------------------------

Title: Here are the 20 specific Fox broadcasts and tweets Dominion says were defamatory
Content: • Fox-Dominion trial delay 'is not unusual,' judge says
• Fox News' defamation battle isn't stopping Trump

2. Database Storage:
○ Design a database schema to store the extracted news article data.
○ Implement logic to store new articles in the database without duplicates.

In [2]:
from sqlalchemy import create_engine, Column, String, DateTime, Integer, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Declarative base class that the ORM model below inherits from.
Base = declarative_base()

# NOTE(review): the other NewsArticle definitions in this notebook call the
# date column pub_date and mark title/source_url as required (source_url also
# unique); this one uses publication_date and declares no constraints.
# Confirm which schema is intended.
class NewsArticle(Base):
    """ORM model for one stored article, mapped to the ``news_articles`` table."""
    __tablename__ = 'news_articles'
    id = Column(Integer, primary_key=True)  # integer primary key
    title = Column(String)  # headline
    content = Column(Text)  # article text
    publication_date = Column(DateTime)  # publication timestamp
    source_url = Column(String)  # link to the article; no uniqueness constraint here
    category = Column(String)  # category label

import os

# Database URL. Set the NEWS_DB_URL environment variable to override it so
# that real credentials do not have to be written into the notebook; the
# fallback is the original local development URL.
engine = create_engine(
    os.environ.get('NEWS_DB_URL', 'postgresql://postgres:123456@localhost/postgres')
)
# Create the tables declared on Base.
Base.metadata.create_all(engine)

# Sample records used to exercise save_articles().
articles = [
    {
        'title': 'Sample Title 1',
        'content': 'Sample Content 1',
        'pub_date': '2022-01-01T12:00:00',
        'source_url': 'http://example.com/article1',
    },
    {
        'title': 'Sample Title 2',
        'content': 'Sample Content 2',
        'pub_date': '2022-01-02T14:30:00',
        'source_url': 'http://example.com/article2',
    },
]

def _to_datetime(value):
    """Convert an article date to ``datetime`` where possible.

    ``None`` and ``datetime`` values pass through, ``time.struct_time`` is
    converted from its first six fields, and ISO-8601 strings are parsed.
    Anything else (including non-ISO strings) is returned unchanged.
    """
    import time
    from datetime import datetime

    if value is None or isinstance(value, datetime):
        return value
    if isinstance(value, time.struct_time):
        return datetime(*value[:6])
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return value
    return value


def save_articles(articles):
    """Insert articles into ``news_articles``, skipping ones already stored.

    An article counts as a duplicate when a row with the same ``source_url``
    exists or when the same URL appeared earlier in ``articles``.

    Args:
        articles: Iterable of dicts with ``title``, ``content``,
            ``source_url`` and a date under ``pub_date`` (or
            ``publication_date``). The dicts are not modified.

    Raises:
        Exception: Any database error is re-raised after the transaction
            has been rolled back.
    """
    Session = sessionmaker(bind=engine)
    session = Session()
    seen_urls = set()
    try:
        for article in articles:
            source_url = article['source_url']
            if source_url in seen_urls:
                continue
            seen_urls.add(source_url)

            if session.query(NewsArticle).filter_by(source_url=source_url).first():
                continue

            # Work on a copy: renaming the key on the caller's dict would make
            # a second call with the same list fail with KeyError.
            fields = dict(article)
            if 'pub_date' in fields:
                fields['publication_date'] = fields.pop('pub_date')
            fields['publication_date'] = _to_datetime(fields.get('publication_date'))
            session.add(NewsArticle(**fields))
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


save_articles(articles)


  Base = declarative_base()


3. Task Queue and News Processing:
○ Set up a Celery queue to manage asynchronous processing of new articles.

○ Configure the parser script to send extracted articles to the queue upon arrival.
○ Create a Celery worker that consumes articles from the queue and performs
further processing:
■ Category classification: Utilize NLTK or spaCy to classify each article into
the provided categories.
■ Update the database with the assigned category for each article.

In [3]:
from celery import Celery
from celery.signals import worker_process_init
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

import os

# Broker and database URLs can be overridden with the NEWS_BROKER_URL and
# NEWS_DB_URL environment variables so credentials need not be hard-coded;
# the fallbacks are the original local development values.
app = Celery(
    'news_processing',
    broker=os.environ.get('NEWS_BROKER_URL', 'pyamqp://guest:guest@localhost//'),
)

# English stop words; not referenced by the other code in this cell.
# NOTE: stopwords.words() needs the NLTK "stopwords" corpus to be available.
stop_words = set(stopwords.words('english'))

engine = create_engine(
    os.environ.get('NEWS_DB_URL', 'postgresql://postgres:123456@localhost/postgres')
)
# Session factory used by the task below.
Session = sessionmaker(bind=engine)

# Declarative base class that the ORM model below inherits from.
Base = declarative_base()

# NOTE(review): cell In [2] declares this table with a publication_date
# column instead of pub_date — confirm the live schema matches this mapping.
class NewsArticle(Base):
    """ORM model for one news article, mapped to the ``news_articles`` table."""
    __tablename__ = 'news_articles'
    id = Column(Integer, primary_key=True)  # integer primary key
    title = Column(String, nullable=False)  # headline; required
    content = Column(Text)  # article text; optional
    pub_date = Column(DateTime)  # publication timestamp; optional
    source_url = Column(String, unique=True, nullable=False)  # required and unique: one row per URL
    category = Column(String)  # label produced by classify_category()

def classify_category(content):
    """Return the category label for an article's text.

    Placeholder implementation: ``content`` is ignored and every article is
    labelled ``'Uncategorized'``.
    """
    # TODO: replace with real NLTK/spaCy based classification of `content`.
    fallback_category = 'Uncategorized'
    return fallback_category

def _coerce_pub_date(value):
    """Turn a queued article's ``pub_date`` into a ``datetime`` or ``None``.

    Accepts ``None``, a ``datetime``, a Unix timestamp (converted to naive
    UTC), an ISO-8601 string, or a ``time.struct_time``-like sequence whose
    first six items are year..second (a JSON round trip turns a
    ``struct_time`` into a plain list). Anything else yields ``None``.
    """
    from datetime import datetime, timezone

    if value is None or isinstance(value, datetime):
        return value
    if isinstance(value, (int, float)):
        # Same naive-UTC result as the deprecated datetime.utcfromtimestamp().
        return datetime.fromtimestamp(value, tz=timezone.utc).replace(tzinfo=None)
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return None
    try:
        return datetime(*value[:6])
    except (TypeError, ValueError):
        return None


@app.task
def process_article(article):
    """Celery task: classify one parsed article and store it if it is new.

    An article whose ``source_url`` is already in the table is left alone.
    Failures are logged with their traceback and not re-raised.

    Args:
        article: Dict with ``title``, ``content``, ``source_url`` and an
            optional ``pub_date``.
    """
    # Imported here because this cell does not import logging itself.
    import logging

    session = Session()

    try:
        existing_article = session.query(NewsArticle).filter_by(source_url=article['source_url']).first()
        if existing_article:
            return  # the finally clause still closes the session

        new_article = NewsArticle(
            title=article['title'],
            content=article['content'],
            # pub_date may be None or a struct_time, not only a timestamp.
            pub_date=_coerce_pub_date(article.get('pub_date')),
            source_url=article['source_url'],
            category=classify_category(article['content']),
        )
        session.add(new_article)
        session.commit()

    except Exception as e:
        logging.exception("Error processing article: %s", e)

    finally:
        session.close()

@worker_process_init.connect
def configure_workers(**kwargs):
    """Prepare database access when a Celery worker process starts.

    Connected to Celery's ``worker_process_init`` signal. Discards any pooled
    connections the process inherited and rebinds the module-level
    ``Session`` factory.
    """
    global Session
    # Pooled connections must not be shared across a fork. close=False drops
    # them from this process's pool without closing the parent's sockets.
    engine.dispose(close=False)
    Session = sessionmaker(bind=engine)


  Base = declarative_base()


4. Logging and Error Handling:
○ Implement proper logging throughout the application to track events and potential
errors.
○ Handle parsing errors and network connectivity issues gracefully.

In [4]:
import logging

import os

# Broker and database URLs can be overridden with the NEWS_BROKER_URL and
# NEWS_DB_URL environment variables so credentials need not be hard-coded;
# the fallbacks are the original local development values.
app = Celery(
    'news_processing',
    broker=os.environ.get('NEWS_BROKER_URL', 'pyamqp://guest:guest@localhost//'),
)

# English stop words; not referenced by the other code in this cell.
# NOTE: stopwords.words() needs the NLTK "stopwords" corpus to be available.
stop_words = set(stopwords.words('english'))

engine = create_engine(
    os.environ.get('NEWS_DB_URL', 'postgresql://postgres:123456@localhost/postgres')
)
# Session factory used by the task below.
Session = sessionmaker(bind=engine)

# Declarative base class that the ORM model below inherits from.
Base = declarative_base()

# NOTE(review): cell In [2] declares this table with a publication_date
# column instead of pub_date — confirm the live schema matches this mapping.
class NewsArticle(Base):
    """ORM model for one news article, mapped to the ``news_articles`` table."""
    __tablename__ = 'news_articles'
    id = Column(Integer, primary_key=True)  # integer primary key
    title = Column(String, nullable=False)  # headline; required
    content = Column(Text)  # article text; optional
    pub_date = Column(DateTime)  # publication timestamp; optional
    source_url = Column(String, unique=True, nullable=False)  # required and unique: one row per URL
    category = Column(String)  # label produced by classify_category()

def classify_category(content):
    """Return the category label for an article's text.

    Placeholder implementation: ``content`` is ignored and every article is
    labelled ``'Uncategorized'``.
    """
    # TODO: replace with real NLTK/spaCy based classification of `content`.
    fallback_category = 'Uncategorized'
    return fallback_category

def _coerce_pub_date(value):
    """Turn a queued article's ``pub_date`` into a ``datetime`` or ``None``.

    Accepts ``None``, a ``datetime``, a Unix timestamp (converted to naive
    UTC), an ISO-8601 string, or a ``time.struct_time``-like sequence whose
    first six items are year..second (a JSON round trip turns a
    ``struct_time`` into a plain list). Anything else yields ``None``.
    """
    from datetime import datetime, timezone

    if value is None or isinstance(value, datetime):
        return value
    if isinstance(value, (int, float)):
        # Same naive-UTC result as the deprecated datetime.utcfromtimestamp().
        return datetime.fromtimestamp(value, tz=timezone.utc).replace(tzinfo=None)
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return None
    try:
        return datetime(*value[:6])
    except (TypeError, ValueError):
        return None


@app.task
def process_article(article):
    """Celery task: classify one parsed article and store it if it is new.

    An article whose ``source_url`` is already in the table is left alone.
    Failures are logged with their traceback and not re-raised, and the
    session is always closed.

    Args:
        article: Dict with ``title``, ``content``, ``source_url`` and an
            optional ``pub_date``.
    """
    session = Session()

    try:
        existing_article = session.query(NewsArticle).filter_by(source_url=article['source_url']).first()
        if existing_article:
            return  # the finally clause still closes the session

        new_article = NewsArticle(
            title=article['title'],
            content=article['content'],
            # pub_date may be None or a struct_time, not only a timestamp.
            pub_date=_coerce_pub_date(article.get('pub_date')),
            source_url=article['source_url'],
            category=classify_category(article['content']),
        )
        session.add(new_article)
        session.commit()

    except Exception as e:
        # Actually emit the log record (with traceback); a bare
        # `logging.error` attribute reference logs nothing.
        logging.exception("Error processing article: %s", e)

    finally:
        session.close()


  Base = declarative_base()
