In [33]:
import pandas as pd
from datetime import datetime, timedelta
import re
import feedparser
import hashlib
from datetime import datetime
from typing import List
import requests
from bs4 import BeautifulSoup


## Scraping Data (Part 1)

Ideally you want to start by collapsing all the parts (1 of 4)

After Scouting the basic format of rss feed,\
I first scraped all the rss links

In [None]:
import feedparser
import hashlib
from datetime import datetime
import pandas as pd


RSS_FEEDS = [
    "http://rss.cnn.com/rss/cnn_topstories.rss",
    "http://qz.com/feed",
    "http://feeds.foxnews.com/foxnews/politics",
    "http://feeds.reuters.com/reuters/businessNews",
    "http://feeds.feedburner.com/NewshourWorld",
    "https://feeds.bbci.co.uk/news/world/asia/india/rss.xml"
]

def parse_rss_feeds() -> pd.DataFrame:
    articles_data = []

    for feed_url in RSS_FEEDS:
        feed = feedparser.parse(feed_url)
        for entry in feed.entries:
            title = entry.title if hasattr(entry, 'title') else 'NA'
            content = entry.summary if hasattr(entry, 'summary') else (entry.title_detail.value if hasattr(entry, 'title_detail') else 'NA')
            published = entry.published if hasattr(entry, 'published') else 'NA',
            url = entry.link if hasattr(entry, 'link') else 'NA'
            source = feed.feed.title if hasattr(feed.feed, 'title') else 'NA'

            article_data = {
                "title": title,
                "content": content,
                "published": published,
                "url":url,
                "source": source
            }
            articles_data.append(article_data)

    df = pd.DataFrame(articles_data)
    return df

articles_df = parse_rss_feeds()
print(articles_df)


### After The links next was to scrape the body and headers

In [None]:

def scrape_headers_body(url):
    try:
        response = requests.get(url)
        
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            
            headers = [header.text.strip() for header in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])]
            body_text = [text.text.strip() for text in soup.find_all('p')]
            
            return headers, body_text
        else:
            print(f"Error: Unable to retrieve content from the URL: {url}")
            return [], []
    except Exception as e:
        print(f"Error occurred while scraping URL: {url}. Error: {str(e)}")
        return [], []

articles_df[['headers', 'body_text']] = articles_df['url'].apply(lambda x: pd.Series(scrape_headers_body(x)))

print(articles_df)


Will save the Failed links here to revisit later

In [None]:
failed_df = pd.DataFrame(columns=articles_df.columns)

articles_df['headers'] = articles_df['headers'].apply(lambda x: None if len(x) < 3 else x)
articles_df['body_text'] = articles_df['body_text'].apply(lambda x: None if len(x) < 3 else x)
articles_df['published'] = articles_df['published'].apply(lambda x: None if x == "('NA',)" else x)

failed_df = failed_df.append(articles_df[(articles_df['headers'].isna()) | (articles_df['body_text'].isna())])

articles_df = articles_df.dropna(subset=['headers', 'body_text'])

print("Modified Articles DataFrame:")
print(articles_df)
print("\nFailed URLs DataFrame:")
print(failed_df)


## Database (PART 2)

The database should be explained in the readme file

 
### Preprocess
Before ingesting it to the database we must preprocess it \
Since, We have articles, I took the liberty of loading it from file.

In [31]:
articles_df = pd.read_csv("Hey2c.csv")
articles_df.set_index('Unnamed: 0', inplace=True)
articles_df.index.name = 'index'
articles_df = articles_df.drop(index=1)

In [34]:
## On a broad level:
# Fix body_text that was scraped as a list and fixed time which is the most important part when we put it in  a database

import pandas as pd

def preprocess_text(text):

    if isinstance(text, str):
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        text = text.lower()
        text = re.sub(r'http\S+', '', text)
        words_to_remove = ['Read more', 'Click here', 'This video can not be played', 'CNN values your feedback', 'Markets', 'Hot Stocks', 'Fear & Greed Index','CNN values your feedback','Markets', 'Hot Stocks', 'Fear & Greed Index', 'Latest Market News', 'Hot Stocks']
        text = re.sub(r'|'.join(map(re.escape, words_to_remove)), '', text,flags=re.IGNORECASE)
        return text.strip()
    
    elif isinstance(text, list):
        processed_text = [preprocess_text(sub_text) for sub_text in text if sub_text.strip()]
        return processed_text
    else:
        return text


def preprocess_date(date_string):

    if 'GMT' not in date_string and 'UTC' not in date_string:
        time_zone_offset_str = date_string[-5:]
        date_obj = datetime.strptime(date_string[:-6], '%a, %d %b %Y %H:%M:%S')

        time_zone_offset = timedelta(
            hours=int(time_zone_offset_str[1:3]),
            minutes=int(time_zone_offset_str[3:])
        )

        if time_zone_offset_str[0] == '+':
            date_obj = date_obj - time_zone_offset
        elif time_zone_offset_str[0] == '-':
            date_obj = date_obj + time_zone_offset

        date_utc_str = date_obj.strftime('%Y-%m-%d %H:%M:%S+00:00')

        return date_utc_str
    else:
        if 'GMT' in date_string:
            date_string = date_string.replace('GMT', '+0000')
        elif 'IST' in date_string:
            date_string = date_string.replace('IST', '+0530') 

        return preprocess_date(date_string)


def preprocess_dataframe(df):
    
    df = df.applymap(lambda x: None if str(x) in ('NA',"[]") else x)

    try:
        df['published'] = df['published'].apply(lambda x: None if x is None or len(str(x)) < 8 else x)
        df.dropna(inplace=True)
        df['published'] = df['published'].str[2:-3].apply(preprocess_date)
    except Exception as e:
        print(f"Error occurred in fixing date format: {e}")

    
    df.dropna(inplace=True)
    df["body_text"] = df["body_text"].apply(lambda x: preprocess_text(x))

    return df
    

df = articles_df.copy()

# Preprocess dataframe
df = preprocess_dataframe(df)



In [35]:
df.head(5)

Unnamed: 0_level_0,title,content,published,url,source,headers,body_text
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,Some on-air claims about Dominion Voting Syste...,Some on-air claims about Dominion Voting Syste...,2023-04-19 12:44:51+00:00,https://www.cnn.com/business/live-news/fox-new...,CNN.com - RSS Channel - HP Hero,"['CNN values your feedback', 'Settlement reach...",our live coverage has ended follow the latest ...
2,Here are the 20 specific Fox broadcasts and tw...,"• Fox-Dominion trial delay 'is not unusual,' j...",2023-04-17 16:01:11+00:00,https://www.cnn.com/2023/04/17/media/dominion-...,CNN.com - RSS Channel - HP Hero,"['CNN values your feedback', 'Here are the 20 ...",fear greed index for all the interest in bi...
3,Judge in Fox News-Dominion defamation trial: '...,The judge just announced in court that a settl...,2023-04-19 08:28:17+00:00,https://www.cnn.com/2023/04/18/media/fox-domin...,CNN.com - RSS Channel - HP Hero,"['CNN values your feedback', 'Fox News settles...",fear greed index fox news reached a lastsec...
4,'Difficult to say with a straight face': Tappe...,A settlement has been reached in Dominion Voti...,2023-04-18 21:17:44+00:00,https://www.cnn.com/videos/politics/2023/04/18...,CNN.com - RSS Channel - HP Hero,"['CNN values your feedback', '‘Difficult to sa...",cable news network a warner bros discovery com...
5,Millions in the US could face massive conseque...,"• DeSantis goes to Washington, a place he once...",2023-04-18 20:34:45+00:00,https://www.cnn.com/2023/04/18/politics/mccart...,CNN.com - RSS Channel - HP Hero,"['CNN values your feedback', 'The US economy c...",millions of americans could face massive conse...


- Inserted the cleaned dataframe to postgres

In [56]:
from sqlalchemy import create_engine
import pandas as pd

db_string = 'postgresql://kaustubh:concept@localhost:4111/thetimeisnow'
engine = create_engine(db_string)

table_name = 'articles'

df.to_sql(table_name, engine, if_exists='append', index=False)

print("Row inserted successfully!")


Row inserted successfully!


## Categorization (Part 3)

- Explored a few methods, but since i was on a deadline, i settled on using a huggingface pipeline after trying and testing
- Originally i was going to use spacy text classification models, but this is faster
- Also incorporated a small threshold to decide whether it really cant be classfied anywhere else but "Others"

In [None]:
from transformers import pipeline
import pandas as pd

def categorize_text(df):
    classifier = pipeline("zero-shot-classification", model="MoritzLaurer/mDeBERTa-v3-base-mnli-xnli")

    categories = ["terrorism", "natural disaster","positive","protest","riot"]


    threshold = 0.1  # You can adjust this threshold as needed

    for index, row in df.iterrows():
        text = row['body_text']
        result = classifier(text, candidate_labels=categories)
        category_scores = {label: score for label, score in zip(result['labels'], result['scores'])}
        print("="*12)
        scores_sum = sum(result['scores'])
        print(scores_sum)
        scores_len = len(result['scores'])
        print(scores_len)
        scores_avg = scores_sum / scores_len
        print(scores_avg)
        is_close = all(abs(score - scores_avg) < threshold for score in result['scores'])
        print(abs(score - scores_avg),'<',threshold)
        if is_close:
            top_category = "other"
        else:
            top_category = result['labels'][0]
        df.at[index, 'category'] = top_category

    return df

df_with_categories = categorize_text(df)
print(df_with_categories)


0.9999999478459358
5
0.19999998956918716
0.20630082339048386 < 0.1
1.000000026077032
5
0.20000000521540642
0.2063008077442646 < 0.1
0.999999949708581
5
0.1999999899417162
0.20630082301795483 < 0.1
0.9999999552965164
5
0.19999999105930327
0.20630082190036775 < 0.1
1.0000000186264515
5
0.2000000037252903
0.2063008092343807 < 0.1
1.0000000223517418
5
0.20000000447034835
0.20630080848932267 < 0.1
0.9999999552965164
5
0.19999999105930327
0.20630082190036775 < 0.1
0.9999999776482582
5
0.19999999552965164
0.20630081743001938 < 0.1
1.0
5
0.2
0.206300812959671 < 0.1
1.0000000298023224
5
0.20000000596046447
0.20630080699920655 < 0.1
0.9999999776482582
5
0.19999999552965164
0.20630081743001938 < 0.1
0.9999999552965164
5
0.19999999105930327
0.20630082190036775 < 0.1
0.9999999813735485
5
0.1999999962747097
0.2063008166849613 < 0.1
0.9999999925494194
5
0.19999999850988387
0.20630081444978715 < 0.1
1.0000000223517418
5
0.20000000447034835
0.20630080848932267 < 0.1
0.9999999850988388
5
0.1999999970197

In [43]:
df.to_csv("holamigos.csv")

## Celery (Part 4)

Retrieve articles with article Id and have category column

#### Testing
Before i tried going ahead with anything i wanted to ensure that postgres and celery can work together well, This is what i created to see if i can asynchronously, post and retrieve data into the dataframe

In [None]:
from celery import Celery
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime, ARRAY
from sqlalchemy.sql import func

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

app = Celery('tasks', broker='redis://localhost:6379/0')
db_string = 'postgresql://kaustubh:concept@localhost:4111/thetimeisnow'
engine = create_engine(db_string)


logging.basicConfig(filename='celery_errors.log', level=logging.ERROR)

metadata = MetaData()

articles = Table(
    'articles',
    metadata,
    Column('article_id', Integer, primary_key=True),
    Column('title', String),
    Column('title_details', String),
    Column('published_date', DateTime(timezone=True), default=func.now()),
    Column('url', String),
    Column('source_id', Integer),
    Column('genre_id', ARRAY(String)),
    Column('body', String),
    Column('header', String)
)

@app.task
def insert_into_articles(data):

    try:
        pass
        with engine.begin() as connection:
            connection.execute(articles.insert().values(data))
            print("executed")
            
    except Exception as e:
        logging.error(f'Error in insertion: {e}')
        raise
        


data = {
    'title': 'Sample Title 6',
    'title_details': 'Sample Title Details',
    'published_date': '2024-01-29 12:00:00' ,
    'url': 'http://example.com',
    'source_id': 101,
    'genre_id': [1,2,3],
    'body': 'Sample Body',
    'header': 'Sample Header'
}
#

insert_into_articles.delay(data)


with open('celery_errors.log', 'r') as log_file:
    print(log_file.read())


- Extracted Fresh data to be able to apply the preprocessing through celery.
- Also appended category column

In [77]:
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy import text as txt

db_url = 'postgresql://kaustubh:concept@localhost:4111/thetimeisnow'
engine = create_engine(db_url)


query = """
    SELECT article_id, body_text
    FROM articles;
"""

with engine.connect() as connection:
    result = connection.execute(txt(query))
    df = pd.DataFrame(result.fetchall(), columns=result.keys())

df['category'] = ''

df = df.head(16)

print(df)
gdf=df.copy()

    article_id                                          body_text category
0          229  our live coverage has ended follow the latest ...         
1          230  fear  greed index   for all the interest in bi...         
2          231  fear  greed index   fox news reached a lastsec...         
3          232  cable news network a warner bros discovery com...         
4          233  millions of americans could face massive conse...         
5          234  the yearold white man accused of shooting a bl...         
6          235  cable news network a warner bros discovery com...         
7          236  its sourdough bread and handstands for jake gy...         
8          237  a tiny intruder infiltrated white house ground...         
9          238  jamie foxx remains hospitalized in georgia nea...         
10         239  a yearold in ohio has died after he took a bun...         
11         240  cable news network a warner bros discovery com...         
12         241  fear  gre

### Main

Here i am creating multiple asynchronous tasks of categorization so they work parallely

#### Explanation

- Break dataframe into chunks
- Run pipeline for each chunk (Divided between celery workers)
- Append all parts
- Success

In [None]:
from celery import Celery, group
import pandas as pd
import os
import logging

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

logging.basicConfig(filename='celery_errors.log', level=logging.ERROR)


def chunk_dataframe(df, chunk_size=2):
    chunks = [df.iloc[i:i + chunk_size].to_json() for i in range(0, len(df), chunk_size)]
    print("Dataframe has been chunked",chunks)
    return chunks



@app.task
def categorize_text_chunk(chunk_json):
    chunk = pd.read_json(chunk_json, orient='records')
    print("inside text chunk right before biggie",chunk)
    return chunk.to_json()
    #return categorize_text(chunk)


def categorize_text_celery(df):
    try:

        chunk_jsons = chunk_dataframe(df)

        print("jsons")

        tasks = [categorize_text_chunk.s(chunk_json) for chunk_json in chunk_jsons]

        print("tasks was complete",tasks)
        try:
            results = group(tasks).apply_async().get(timeout=10)
        except:
            pass

        print("\nresults was complete",results)
        reconstructed_chunks = [pd.read_json(result, orient='records') for result in results]

        # Concatenate reconstructed chunks into a single DataFrame
        categorized_df = pd.concat(reconstructed_chunks, ignore_index=True)

            
    except Exception as e:
        logging.error(f'Error in insertion: {e}')
        raise



df=gdf.copy().head(4)
categorized_df = categorize_text_celery(df)

print(categorized_df)


### In Progress
- However this bit here is incomplete and wont run, there is an issue with how celery handles dataframes, since its messaging service is based on json, we cant save a more complex data format in a less complex one.\

- Theres also an issue in how celery gathers results of asynced tasks.