# Feeds

> Series of utility tools to parse and manage Small Web feeds (RSS and Atom feeds)

In [None]:
#| default_exp feeds

## Imports

In [None]:
#| export

import concurrent.futures
import datetime
import os
import re
import shutil
import sqlite3
from collections import namedtuple
from typing import Optional

import feedparser
import requests
from langdetect import detect
from rich import print
from rich.progress import Progress

## Feeds DB

We will want to save different kinds of information related to the feeds we process. We will save that information locally in a lightweight SQLite database. Here are the different kinds of things we will want to save:

 - feed's ID (primary key)
 - its language
 - number of entries
 - last time we downloaded it
 - type of feed
 - feed's URL
 - feed's title
 - feed's description
 - feed's author

### Connect to the Database

In [None]:
#| export

def connect_feeds_db() -> sqlite3.Connection:
    """Connect to the feeds database.

    The database is the `feeds.db` file located in the folder named by the
    `DB_PATH` environment variable. The folder (and any missing parent) is
    created when it does not exist yet.

    Returns:
        An open `sqlite3.Connection`. Closing it is up to the caller.

    Raises:
        RuntimeError: if the `DB_PATH` environment variable is not set.
    """
    db_folder = os.environ.get('DB_PATH')
    if not db_folder:
        raise RuntimeError("The DB_PATH environment variable is not set")

    # `exist_ok=True` replaces the separate "exists?" test, which could fail
    # when two processes try to create the folder at the same time
    os.makedirs(db_folder, exist_ok=True)

    return sqlite3.connect(os.path.join(db_folder, 'feeds.db'))

### Create DB

In [None]:
#| export

def create_feeds_db(conn: sqlite3.Connection):
    """Make sure the `feeds` table exists in the database behind `conn`.

    The statement is a no-op when the table is already there. The change is
    committed before returning.
    """
    schema = '''CREATE TABLE IF NOT EXISTS feeds
                 (id str PRIMARY KEY, 
                  url text,                    
                  title text, 
                  description text, 
                  lang str,
                  feed_type str,
                  license str)'''

    cursor = conn.cursor()
    cursor.execute(schema)
    cursor.close()

    conn.commit()

def create_articles_db(conn: sqlite3.Connection):
    """Create the articles database.

    Creates the `articles` table when it does not exist yet and commits the
    change. Each article is linked to its feed through the `feed_id` column.

    Note: `CREATE TABLE IF NOT EXISTS` leaves an existing table untouched, so
    a database created with an earlier definition keeps its old schema.
    """
    c = conn.cursor()
    # The foreign key has to target `feeds.id`: the `feeds` table has no
    # `feed_id` column, and a reference to a missing parent column makes
    # SQLite fail with "foreign key mismatch" as soon as foreign keys are
    # enforced (`PRAGMA foreign_keys = ON`).
    c.execute('''CREATE TABLE IF NOT EXISTS articles
                 (id str PRIMARY KEY,
                  feed_id str,
                  title text,
                  content text,
                  creation_date datetime,
                  lang str,
                  license str,
                  FOREIGN KEY (feed_id) REFERENCES feeds(id))''')
    c.close()
    conn.commit()

## Sync Feeds

The local feeds need to be synchronized with the Small Web index. Most of them will be new, but it is possible that some of the previous feeds get removed from the feeds index. In that case, we have to remove the feed from the local system and the SQL database. The process is as follows:

 1. check if some of the feeds got removed from the index
    1. if so, remove the feed from the local system
    2. remove the feed from the SQL database
 2. if the feed is not already on the file system, create a unique folder name for each new feed
 3. create a `DDMMYYYY` folder under the ID of the feed in the `FEEDS_PATH` folder
 4. download the feed's file in that folder

The local folder and file structure should be:

 - `FEEDS_PATH`
   - `feed_unique_folder`
     - `DDMMYYYY`
       - `feed.xml`
     - `DDMMYYYY`
       - `feed.xml`

### Get Feeds

The first step is to get the list of feeds for the Small Web. That list is available from the [Kagi Small Web index](https://github.com/kagisearch/smallweb/blob/main/smallweb.txt). Then for each feed in that list, we will download it and save it locally in the `FEEDS_PATH` folder.

In [None]:
#| export

def get_small_web_feeds() -> list:
    """Get smallweb feeds from KagiSearch's github repository.

    Returns:
        The feed URLs, one per non-blank line of `smallweb.txt`, or an empty
        list when the server does not answer with HTTP 200.

    Raises:
        requests.RequestException: when the request itself fails (network
            error, timeout, ...).
    """
    response = requests.get(
        'https://raw.githubusercontent.com/kagisearch/smallweb/main/smallweb.txt',
        # without a timeout, `requests` waits forever on a stalled connection
        timeout=30,
    )

    # Check if the request was successful
    if response.status_code != 200:
        return []

    # one URL per line; blank lines are not feeds and are skipped
    return [line for line in response.text.splitlines() if line.strip()]

#### Tests

In [None]:
# Smoke test: the index download must return at least one feed URL.
# This performs a real HTTP request, so it needs network access.
assert len(get_small_web_feeds()) > 0

### Feed ID

We build the unique ID of a feed from its URL. We use the following steps:

 1. For every character, if it is not an alphanumeric character, we replace it with a `-`

This method is used to make sure we can use the ID to create files and directories on the local file system, and as a primary key in the DB, while keeping the ID readable. It could produce duplicate IDs if a non-alphanumeric character is the only differentiator between two URLs, in which case both characters will be replaced by a `-` and the IDs will clash. But this is unlikely in the short term and is good enough for now.

In [None]:
#| export

def get_feed_id_from_url(url: str) -> str:
    """Build the ID of a feed from its URL.

    Alphanumeric characters are kept as they are; every other character is
    replaced with a dash, so the ID is usable as a folder name.
    """
    id_chars = []
    for char in url:
        if char.isalnum():
            id_chars.append(char)
        else:
            id_chars.append('-')
    return ''.join(id_chars)

#### Tests

In [None]:
# `:`, `/` and `.` are not alphanumeric, so each one becomes a dash
assert get_feed_id_from_url('https://example.com/feed.xml') == 'https---example-com-feed-xml'

### Process Removed Feeds From Index

It is possible that previously downloaded feeds get removed from the Small Web index. In this case, we get the latest version of the Small Web index, detect which ones were removed, and remove them from the file system and the SQL database.

In [None]:
#| export

def gen_ids_index(index: list) -> list:
    """Map every feed URL of the index to its feed ID, keeping the order."""
    feed_ids = []
    for feed_url in index:
        feed_ids.append(get_feed_id_from_url(feed_url))
    return feed_ids

#### Tests

In [None]:
# A one-URL index must give a one-ID list, built with `get_feed_id_from_url`
index = ['https://example.com/feed.xml']
index2 = gen_ids_index(index)
assert index2 == ['https---example-com-feed-xml']

In [None]:
#| export

def process_removed_feed_from_index(index: list):
    """Process all the feeds that got removed from the SmallWeb index.

    Every entry of the `FEEDS_PATH` folder whose name is not the ID of a feed
    listed in `index` is deleted from the file system, and the matching rows
    are deleted from the `articles` and `feeds` tables.

    Args:
        index: the feed URLs currently listed in the Small Web index.

    Raises:
        RuntimeError: if the `FEEDS_PATH` environment variable is not set.
    """
    feeds_path = os.environ.get('FEEDS_PATH')
    if not feeds_path:
        # `os.listdir(None)` lists the current working directory: without
        # this guard, its content would be handled as removed feeds
        raise RuntimeError("The FEEDS_PATH environment variable is not set")

    # a set makes the membership test below O(1) per folder
    ids_index = set(gen_ids_index(index))

    conn = connect_feeds_db()
    try:
        c = conn.cursor()

        # get all the current feeds from FEEDS_PATH
        for folder in os.listdir(feeds_path):
            if folder in ids_index:
                continue

            # `os.listdir` returns bare names, so the entry has to be
            # resolved against FEEDS_PATH before it is deleted
            target = os.path.join(feeds_path, folder)
            try:
                if os.path.isdir(target) and not os.path.islink(target):
                    shutil.rmtree(target)
                else:
                    os.remove(target)
            except OSError as error:
                # best effort: report and keep cleaning the other feeds
                print(f"Failed to remove {target}: {error}")

            # remove from the database; the name is bound as a parameter
            # instead of being pasted into the SQL text
            c.execute("DELETE FROM articles WHERE feed_id = ?", (folder,))
            c.execute("DELETE FROM feeds WHERE id = ?", (folder,))
            conn.commit()

        c.close()
    finally:
        # release the connection even when a deletion fails
        conn.close()

### Download a Feed

In [None]:
#| export

def download_feed(url: str, timeout: float = 30):
    """Download a feed from a given url into today's cache folder.

    The feed is saved as `FEEDS_PATH/<feed id>/<DDMMYYYY>/feed.xml`. Nothing
    is downloaded when that file already exists. A failed download (network
    error, timeout or non-200 answer) is reported and leaves no `feed.xml`.

    Args:
        url: URL of the feed to download.
        timeout: number of seconds to wait for the server before giving up.

    Raises:
        RuntimeError: if the `FEEDS_PATH` environment variable is not set.
    """
    feeds_path = os.environ.get('FEEDS_PATH')
    if not feeds_path:
        raise RuntimeError("The FEEDS_PATH environment variable is not set")

    # FEEDS_PATH/<feed id>/<DDMMYYYY>; `makedirs` creates both levels and
    # `exist_ok=True` makes it safe when the folders are already there
    date_folder_path = os.path.join(feeds_path,
                                    get_feed_id_from_url(url),
                                    datetime.datetime.now().strftime('%d%m%Y'))
    os.makedirs(date_folder_path, exist_ok=True)

    # only download if feed.xml is not existing
    feed_file_path = os.path.join(date_folder_path, 'feed.xml')
    if os.path.exists(feed_file_path):
        return

    try:
        # a timeout keeps a stalled server from blocking the caller forever
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        print(f"Failed to download feed from {url}")
        return

    # Check if the request was successful
    if response.status_code != 200:
        print(f"Failed to download feed from {url}")
        return

    # Save the exact bytes sent by the server. Writing `response.text` would
    # re-encode the document with the platform's default encoding, which may
    # no longer match the encoding declared inside the XML.
    with open(feed_file_path, 'wb') as f:
        f.write(response.content)

### Sync all the feeds from the index

In [None]:
#| export

def sync_feeds():
    """Sync all feeds from smallweb.

    Downloads the Small Web index, removes the local feeds that are no longer
    part of it, then downloads every feed of the index with a pool of five
    worker threads while showing a progress bar.

    NOTE: `sync_feeds` is defined a second time later in this file; the later
    definition is the one in effect once the whole file is loaded.
    """

    feeds = get_small_web_feeds()

    if not feeds:
        # An empty index is what a failed index download looks like. Going on
        # would flag every local feed as removed and delete the whole cache.
        print("[red] Could not get the Small Web index, nothing was synced.")
        return

    print("[cyan] Clean removed feed from the Small Web index...")
    process_removed_feed_from_index(feeds)

    with Progress() as progress:
        task = progress.add_task("[cyan]Downloading feeds locally...", total=len(feeds))

        def progress_indicator(future):
            "Local progress indicator callback for the concurrent.futures module."
            if not progress.finished:
                progress.update(task, advance=1)

        # leaving the `with` block waits for all the submitted downloads
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            for url in feeds:
                # advance the progress bar as soon as this download is done
                executor.submit(download_feed, url).add_done_callback(progress_indicator)

## Language Detection

We use the library [langdetect](https://pypi.org/project/langdetect/) to detect the language of a feed. We use the `detect` method of the library. We tried other avenues like Hugging Face models, but the language detection quality and the processing performance did not justify the additional complexity for now (results were worse and much slower). You can check the file `01_language_detection.ipynb` for more details.

In [None]:
#| export

def detect_language(text: str) -> str:
    """Detect the language of a given text.

    HTML tags and HTML entities are removed and whitespace is collapsed
    before the detection.

    Args:
        text: the text to analyse; it may contain HTML markup.

    Returns:
        The language returned by `langdetect.detect`, or an empty string when
        the cleaned text has fewer than 64 characters or when the detection
        fails.
    """

    # remove all HTML tags from text
    text = re.sub(r'<[^<]+?>', '', text)

    # remove all HTML entities from text. Only well-formed entities are
    # matched (`&amp;`, `&#39;`, `&#x27;`), so that a lone `&` does not
    # swallow all the text up to the next `;`
    text = re.sub(r'&(?:#[0-9]+|#[xX][0-9a-fA-F]+|[A-Za-z][A-Za-z0-9]*);', '', text)

    # remove all extra spaces
    text = ' '.join(text.split())

    # return if the text is too short
    if len(text) < 64:
        return ''

    # limit the text to 4096 characters to speed up the
    # language detection processing
    text = text[:4096]

    try:
        return detect(text)
    except Exception:
        # if langdetect fails, simply return an empty string to indicate
        # that we can't detect the language. `Exception` (and not a bare
        # `except`) lets KeyboardInterrupt and SystemExit go through.
        return ''

### Tests

In [None]:
# For each language: the short sentence is below the 64-character threshold
# and must give '', while its 128-fold repetition is long enough to be detected.
assert detect_language('This is a test') == ''
assert detect_language('This is a test' * 128) == 'en'

assert detect_language('Ceci est un test') == ''
assert detect_language('Ceci est un test' * 128) == 'fr'

assert detect_language('これはテストです') == ''
assert detect_language('これはテストです' * 128) == 'ja'

assert detect_language('이것은 테스트입니다') == ''
assert detect_language('이것은 테스트입니다' * 128) == 'ko'

# HTML tags are stripped before the length check, so they must not help a
# short text reach the threshold
assert detect_language('<br /><br /><br /><br /><br /><br /><br /><br /><br />This is a test') == ''
assert detect_language('<br /><br /><br /><br /><br /><br /><br /><br /><br />This is a test' * 128) == 'en'

## Parse a Local Feed

For any given feed URL, let's parse the local feed we downloaded for it and return an internal representation of it, whether it is an RSS or an Atom feed. The internal representation of a feed and of a Small Web article is a `namedtuple`.

In [None]:
#| export

# Records are inserted positionally into the database, so the field order of
# `Feed` follows the column order of the `feeds` table. `Article` is inserted
# into the `articles` table the same way: its `url` field lands in the `id`
# column and its `feed` field in the `feed_id` column.
Feed = namedtuple('Feed', 'id url title description lang feed_type license')
Article = namedtuple('Article', 'url feed title content creation_date lang license')

def parse_feed(feed_path: str, url: str):
    """Parse a feed from a given path and url.

    Args:
        feed_path: path of the local `feed.xml` file to parse.
        url: URL the feed was downloaded from; the feed ID is derived from it.

    Returns:
        A `(feed, articles)` tuple: a `Feed` record and the list of `Article`
        records, one per entry of the feed.
    """

    feed_id = get_feed_id_from_url(url)
    parsed = feedparser.parse(feed_path)

    feed_title = parsed.feed.get('title', '')
    feed_description = parsed.feed.get('description', '')

    feed = Feed(feed_id,
                url,
                feed_title,
                feed_description,
                detect_language(feed_title + feed_description),
                parsed.get('version', ''),
                # feedparser exposes the license on the `feed` element, not
                # on the top-level result
                parsed.feed.get('license', ''))

    articles = []
    for entry in parsed.entries:
        article_title = entry.get('title', '')

        if 'description' in entry:
            article_content = entry.description
        elif entry.get('content'):
            # `entry.content` is a list of content items and not a string:
            # keep the text of the first one
            article_content = entry.content[0].get('value', '')
        else:
            article_content = ''

        # NOTE(review): the date is stored as the raw `published` string of
        # the feed when there is one, and as a `datetime` object otherwise,
        # so the stored values do not share one format - confirm this is
        # what the consumers of `creation_date` expect
        creation_date = entry.published if 'published_parsed' in entry else datetime.datetime.now()

        articles.append(Article(entry.get('link', ''),
                                feed_id,
                                article_title,
                                article_content,
                                creation_date,
                                detect_language(article_title + article_content),
                                entry.get('license', '')))
    return feed, articles

## Sync Feeds DB from Local Cache

We download every feed locally and save it in a folder time-stamped with the day it was downloaded. We proceed that way so that we don't have to re-download all the feeds every time we change an internal process that requires us to parse the feeds again. We can just parse the local cache of the feeds we downloaded.

The synchronization occurs by simply creating one transaction per feed using `INSERT OR IGNORE`, which appears to be the fastest way to only add the new feeds and ignore the ones that are already in the DB. This is also by far the simplest logic to implement and to reason about.

If the database is empty, then it will be fully populated with the cache of the `DDMMYYYY` date provided as input.

In [None]:
#| export

def sync_feeds():
    """Sync all feeds from smallweb.

    Downloads the Small Web index, removes the local feeds that are no longer
    part of it, then downloads every feed of the index with a pool of five
    worker threads while showing a progress bar.

    NOTE: `sync_feeds` is already defined earlier in this file; this later
    definition is the one in effect once the whole file is loaded.
    """

    feeds = get_small_web_feeds()

    if not feeds:
        # An empty index is what a failed index download looks like. Going on
        # would flag every local feed as removed and delete the whole cache.
        print("[red] Could not get the Small Web index, nothing was synced.")
        return

    print("[cyan] Clean removed feed from the Small Web index...")
    process_removed_feed_from_index(feeds)

    with Progress() as progress:
        task = progress.add_task("[cyan]Downloading feeds locally...", total=len(feeds))

        def progress_indicator(future):
            "Local progress indicator callback for the concurrent.futures module."
            if not progress.finished:
                progress.update(task, advance=1)

        # leaving the `with` block waits for all the submitted downloads
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            for url in feeds:
                # advance the progress bar as soon as this download is done
                executor.submit(download_feed, url).add_done_callback(progress_indicator)

def sync_feeds_db_from_cache(ddmmyyyy: Optional[str] = None):
    """Sync the feeds database from the cache.

    Every feed of the Small Web index that has a `feed.xml` file in the cache
    of the given day is parsed, then the feed and its articles are added to
    the database with `INSERT OR IGNORE`, one commit per feed.

    Args:
        ddmmyyyy: day of the cache to use, in the format DDMMYYYY. When it is
            omitted, the cache of today is used. The date is computed at call
            time, so a long-running process does not stay on the day it was
            started.

    Raises:
        RuntimeError: if the `FEEDS_PATH` environment variable is not set.
    """
    if ddmmyyyy is None:
        ddmmyyyy = datetime.datetime.now().strftime('%d%m%Y')

    feeds_path = os.environ.get('FEEDS_PATH')
    if not feeds_path:
        raise RuntimeError("The FEEDS_PATH environment variable is not set")
    feeds_path = feeds_path.rstrip('/')

    conn = connect_feeds_db()
    try:
        c = conn.cursor()

        urls = get_small_web_feeds()

        with Progress() as progress:

            task = progress.add_task("[cyan]Synching feeds DB from local cache...", total=len(urls))

            for url in urls:
                feed_path = f"{feeds_path}/{get_feed_id_from_url(url)}/{ddmmyyyy}/feed.xml"

                # The file is missing when the feed was not reachable the
                # last time it got scraped, or when it was not downloaded on
                # that day. This single test also covers a missing feed
                # folder.
                if os.path.exists(feed_path):
                    # parse the feed
                    feed, articles = parse_feed(feed_path, url)

                    # insert the feed into the database
                    c.execute("INSERT OR IGNORE INTO feeds VALUES (?, ?, ?, ?, ?, ?, ?)", feed)

                    # insert the articles into the database
                    c.executemany("INSERT OR IGNORE INTO articles VALUES (?, ?, ?, ?, ?, ?, ?)", articles)

                    conn.commit()

                progress.update(task, advance=1)

        c.close()
    finally:
        # release the connection even when a feed fails to parse or insert
        conn.close()

### Update the language of the feeds

The next step is to update the primary language of a feed. This is done by checking which language has the highest number of articles.

What the following SQLite query does is group by language and count the number of articles for each language. Then we order by the count in descending order and we limit the result to 1. This way, we get the language with the highest number of articles.

We have to take that result and to update the `feeds` table with the new language.

Note: it doesn't seem possible to do that in SQLite directly. If I am missing some feature of the query language, please propose a better solution and submit a PR.

In [None]:
#| export

def get_articles_lang_per_feeds():
    """Get the most frequent article language of each feed.

    Returns:
        A list of `(language, feed_id)` tuples, one per feed, ready to be
        given to `update_feeds_with_languages`. The language is `None` for a
        feed that has no article (the feeds are LEFT JOINed to the articles).
    """
    conn = connect_feeds_db()
    try:
        c = conn.cursor()
        # The inner query counts the articles per (feed, language). The outer
        # query keeps one row per feed. `MAX(fa.lang_count)` is what makes
        # the result reliable: SQLite takes the other selected columns from
        # the row holding the maximum, whereas without an aggregate the row
        # used for `fa.language` is arbitrary, whatever the ORDER BY of the
        # inner query.
        c.execute('''SELECT
                        fa.language,
                        fa.id,
                        MAX(fa.lang_count)
                     FROM (
                        SELECT
                            feeds.id,
                            feeds.url,
                            articles.lang AS language,
                            COUNT(*) AS lang_count
                        FROM feeds
                        LEFT JOIN articles ON articles.feed_id = feeds.id
                        GROUP BY feeds.id, feeds.url, articles.lang
                     ) AS fa
                     GROUP BY fa.id''')

        # the count is only needed to pick the row: keep (language, feed id)
        return [(language, feed_id) for language, feed_id, _ in c.fetchall()]
    finally:
        # release the connection even when the query fails
        conn.close()

### Update the feeds table with the new languages

The next step is to take those results and to update the `feeds` table with the new language.

In [None]:
#| export

def update_feeds_with_languages(rows):
    """Update the feeds database with the language of the feed.

    Args:
        rows: an iterable of `(language, feed_id)` pairs, in that order, as
            returned by `get_articles_lang_per_feeds`.
    """
    conn = connect_feeds_db()
    try:
        c = conn.cursor()
        c.executemany("UPDATE feeds SET lang = ? WHERE id = ?", rows)
        conn.commit()
    finally:
        # release the connection even when the update fails
        conn.close()

## Clean Small Web Index

This utility function is used to remove all the feeds that have been tagged as non-English. For the moment, only the feeds that have been tagged with a non-English language are removed; the ones for which the current heuristic couldn't determine the main language are left in the index. Further work will be required for them.

In [None]:
#| export

def get_non_english_feeds():
    """Return the list of non-english feeds URL.

    Returns:
        A list of one-element tuples `(url,)`, one per feed whose language is
        neither `'en'` nor the empty string, ordered by language in
        descending order.
    """
    conn = connect_feeds_db()
    try:
        c = conn.cursor()
        c.execute('''SELECT url 
                 FROM feeds 
                 WHERE lang <> 'en' and lang <> '' 
                 ORDER BY lang DESC''')
        return c.fetchall()
    finally:
        # release the connection even when the query fails
        conn.close()

Next step is to remove the feeds URLs from the Small Web index.

In [None]:
#| export

def get_cleaned_small_web_index():
    """Return the cleaned small web index.

    The Small Web index is downloaded, the URLs of the feeds tagged with a
    non-english language in the database are taken out, and the remaining
    URLs are sorted and written, one per line, to `smallweb.txt` in the
    current working directory.

    Returns:
        The sorted list of the remaining feed URLs.
    """

    # a set gives a constant-time lookup, and a feed of the database that is
    # no longer in the index is simply ignored (`list.remove` would raise a
    # ValueError for it)
    non_english_urls = {feed[0] for feed in get_non_english_feeds()}

    # remove non-english feeds from the index and order it by URL
    index = sorted(url for url in get_small_web_feeds()
                   if url not in non_english_urls)

    # write the index in a new text file
    with open('smallweb.txt', 'w', encoding='utf-8') as f:
        f.writelines(f"{url}\n" for url in index)

    return index