In [1]:
import time
import hashlib
import pandas as pd

import csv
import requests
from os.path import exists
from bs4 import BeautifulSoup

In [2]:
# For Compression
import json
import gzip

In [3]:
# For Data fetching
from pymongo import MongoClient

Some implementation notes on GCP to consider:
* We are opting for batch prediction to reduce the number of cold starts on GKE (i.e., scaling up from 0 to X).
* Because we batch predictions, not every scheduled RSS feed scrape will immediately go through the prediction pipeline. The plan is: once the number of articles in the RSS_List exceeds a set quota, we flush it and send it to the ML pipeline. After that we discard the RSS_List object and instantiate a new one.
* This pipeline needs to be robust, because batching carries a risk: if one article fails, we cannot just throw the entire batch out!

The general idea is that:
<br>
PyMongo -> fetches RSS_Configurations and Links
<br>
RSS_Configurations and Links -> Iterate over to parse
<br>
RSS_Configurations and Links -> If Quota is met, flush and push to the ML API and Database as well
<br>

Extras:
<br>
Can we run EDA or gather data insights on batched data? For example, can we compare distributions with a Kolmogorov-Smirnov test?
<br>
Catch data drift?

# PyMongo -> RSS_List 

Below is an ideal list as it would look after fetching the information from PyMongo; think of it as the input.

In [4]:
# Example (url, configuration) pairs shaped like the documents we expect PyMongo to return.
# Both example lists keep their own names instead of one silently overwriting the other,
# so switching the demo input is a one-line change at the bottom of this cell.

# These are examples of 3 links to use with appropriate configurations
NYT_AND_GBH_RSS_OBJECTS = [
    (
        'https://rss.nytimes.com/services/xml/rss/nyt/World.xml',
        {
            "scrap_tags": ["title", "description", "dc:creator", "pubDate"]
        }
    ),
    (
        'https://rss.nytimes.com/services/xml/rss/nyt/YourMoney.xml',
        {
            "scrap_tags": ["title", "description", "dc:creator", "pubDate"]
        }
    ),
    (
        'https://www.wgbh.org/tags/bunp.rss',
        {
            "scrap_tags": ["title", "link", "description", "content:encoded", "category", "pubDate"]
        }
    )
]

# Just GBH
GBH_RSS_OBJECTS = [
    (
        'https://www.wgbh.org/tags/bunp.rss',
        {
            "scrap_tags": ["title", "link", "description", "content:encoded", "category", "pubDate"]
        }
    )
]

# Input used by the cells below; swap in NYT_AND_GBH_RSS_OBJECTS to scrape all three feeds.
# A copy, so appending to rss_objects never mutates the named example list.
rss_objects = list(GBH_RSS_OBJECTS)

# RSS_Request_Object

A request object that has all the necessary functions to parse, fetch, and maybe load. 
<br>
This will be moved to a separate file.

In [None]:
class rss_request_obj:
    """
    An RSS Request Object: fetches one RSS feed and collects configured tags from each <item>.

    rss_url: str - The url to scrape. Must be a valid RSS URL.
    scrap_configuration: dict - Settings for the RSS Request Object. Must contain
        "scrap_tags": a list of unique tag names to extract from every <item>.
        The first tag doubles as the item's identity for duplicate detection.
    """

    # Seconds to wait for the feed server. Without a timeout, requests.get can block
    # the whole batch indefinitely on one unresponsive host.
    REQUEST_TIMEOUT = 30

    REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'}

    def __init__(self, rss_url: str, scrap_configuration: dict):
        self.rss_url = rss_url
        self.scrap_configuration = scrap_configuration
        self.data_bucket = []   # one dict (tag -> text) per collected article
        self.article_count = 0  # number of entries appended to data_bucket

        # validate_settings raises on invalid settings; the check guards a False return.
        if not self.validate_settings():
            raise Exception("RSS Object Failed to Init: Errors found in settings configuration")

    def getDataBucket(self):
        """
        Gets all the data parsed from the request object so far (list of dicts).
        """
        return self.data_bucket

    def getArticleCount(self):
        """
        Returns the count of the articles of the RSS request object.
        """
        return self.article_count

    def validate_settings(self):
        """
        Ensures that scrap configuration settings are appropriate.

        I will add more later...

        Raises:
        Exception if a required setting is missing or scrap_tags contains duplicates.

        Returns:
        True when the configuration dict abides by the data schema.
        """
        data_schema = ["scrap_tags"]  # Hard coded for now; in the future this will be dynamic.

        for schema_part in data_schema:
            if schema_part not in self.scrap_configuration:
                raise Exception(f"RSS Object Failed to Init: Missing setting '{schema_part}' in dict")
        tags = self.scrap_configuration["scrap_tags"]
        if len(tags) != len(set(tags)):
            raise Exception("RSS Object Failed to Init: Duplicate tag found!")

        return True

    def rss_request(self):
        """
        Fetches the feed at rss_url and builds an XML tree from it.

        Exception:
        Network errors, timeouts, and HTTP error statuses are printed, not raised;
        the method returns None in that case.

        Returns:
        BeautifulSoup object that holds the xml tree, or None on failure.
        """
        try:
            response = requests.get(self.rss_url, headers=self.REQUEST_HEADERS, timeout=self.REQUEST_TIMEOUT)
            # Fail here on 4xx/5xx instead of parsing an error page as if it were a feed.
            response.raise_for_status()
            return BeautifulSoup(response.content, features='lxml-xml')
        except Exception as e:
            print(f"> Job Failed: Failed Fetching RSS Feed! RSS Request Object at {self.rss_url} threw an error!")
            print(f"{e}\n")
            return None

    def rss_parse(self):
        """
        Fetches the feed and appends one entry per new <item> to data_bucket.

        Each entry maps every tag in scrap_tags that the item contains to that tag's text.
        The first tag in scrap_tags is the item's identity:
        - Items whose first-tag text is already in data_bucket are skipped as duplicates.
        - Items missing the first tag are reported and skipped, because they cannot be de-duplicated.
        A problem with one item never discards the other items of the feed.

        Exception:
        Errors are printed, not raised. On failure the method returns early, and
        data_bucket keeps whatever was collected before the failure.

        Returns:
        None. Use getDataBucket() / getArticleCount() for the results.
        """
        try:
            if self.rss_url is None:
                raise Exception("RSS Parsed Failed: rss_url is of NoneType!")
            tags = self.scrap_configuration["scrap_tags"]
            if len(tags) == 0:
                raise Exception("RSS Parsed Failed: scrap_tags is empty!")

            xml_tree = self.rss_request()
            if xml_tree is None:
                # rss_request already reported why the fetch failed.
                return
            items = xml_tree.select('channel > item')
            if len(items) == 0:
                raise Exception("RSS Parsed Failed: <item> tag could not be found!")

            key_tag = tags[0]
            # Identities already collected (including from earlier parses). A set gives O(1)
            # duplicate checks instead of rescanning data_bucket for every item.
            seen_keys = {entry[key_tag] for entry in self.data_bucket if key_tag in entry}
            for item in items:
                key_obj = item.find(key_tag)
                if key_obj is None:
                    print(f"{key_tag} was not found in RSS item; skipping item.")
                    continue
                key_value = key_obj.text
                if key_value in seen_keys:
                    continue

                entry = {key_tag: key_value}
                for tag in tags[1:]:
                    data_obj = item.find(tag)
                    if data_obj is None:
                        print(f"{tag} was not found in RSS feed.")
                        continue
                    entry[tag] = data_obj.text
                self.data_bucket.append(entry)
                self.article_count += 1
                seen_keys.add(key_value)
        except Exception as e:
            print(f"> Job Failed: Failed Parsing RSS Feed! RSS Request Object at {self.rss_url} threw an error!")
            print(f"{e}\n")
            return

    def checkDuplicates(self, key, value):
        """
        Checks for duplicate items. Takes a key (tag) and that tag's value, and compares
        them against every entry collected so far. Entries that lack the key never match.

        Returns:
        Boolean representing if the item already exists
        """
        return any(key in d and d[key] == value for d in self.data_bucket)



# RSS_List

This is a wrapper for RSS_Request Objects.
<br>
This will be moved to a separate file.
<br>
To Do:
Add try/except blocks so that a single failure does not block the flow of execution.

In [None]:
class RSS_List:
    """
    Container that aggregates rss_request_obj instances and their article counts.

    Objects are either added already parsed (addRequestObjectAndParse) or queued
    by index for a later parseAllObjects() call (addRequestObject).
    """

    def __init__(self):
        # Every request object added to the list, parsed or not.
        self.RSS_Request_Bucket = []
        # Running sum of getArticleCount() over objects parsed through this list.
        self.total_article_counts = 0
        # Indexes into RSS_Request_Bucket still waiting for parseAllObjects().
        self.unparsed = []
        # Not read anywhere in this class yet; flush_list_to_data is still a stub.
        self.flushed = False

    def __repr__(self):
        pieces = [
            "=================================================================\n",
            f"Total Article Counts: {self.total_article_counts}\n",
            f"# Request Objects in RSS List: {len(self.RSS_Request_Bucket)}\n",
            f"Unparsed Indexes of RSS Request Objects: {self.unparsed}\n",
            "=================================================================\n",
        ]
        # Only request objects that actually collected articles are dumped.
        for request in self.RSS_Request_Bucket:
            collected = request.getDataBucket()
            if len(collected) == 0:
                continue
            pieces.append(f"\n {collected}\n")
            pieces.append("\n=================================================================\n")
        return "".join(pieces)

    def getRSSBucketList(self):
        """
        Returns the list of every RSS_Request_Object held by this container.
        """
        return self.RSS_Request_Bucket

    def getTotalArticleCount(self):
        """
        Returns the article total accumulated from the parsed RSS_Request_Objects.
        """
        return self.total_article_counts

    def addRequestObject(self, RSS_req_obj):
        """
        Stores the request object without parsing it; its index is queued in self.unparsed.
        """
        queued_index = len(self.RSS_Request_Bucket)
        self.RSS_Request_Bucket.append(RSS_req_obj)
        self.unparsed.append(queued_index)

    def addRequestObjectAndParse(self, RSS_req_obj):
        """
        Parses the request object first, then stores it and adds its article count to the total.
        """
        RSS_req_obj.rss_parse()
        self.RSS_Request_Bucket.append(RSS_req_obj)
        self.total_article_counts += RSS_req_obj.getArticleCount()

    def parseAllObjects(self):
        """
        Drains self.unparsed in FIFO order. Each queued object is parsed and its
        article count is added to the total.
        """
        while self.unparsed:
            pending = self.RSS_Request_Bucket[self.unparsed.pop(0)]
            pending.rss_parse()
            self.total_article_counts += pending.getArticleCount()

    def flush_list_to_data(self):
        """
        Planned: convert every RSS_Request_Object into the list of dictionaries held in
        its 'data_bucket' attribute, ready to send to the ML API endpoint for batch
        parsing and prediction.

        Ideas under consideration:
        - Lossless compression, since the corpus can be very big (Data -> JSON -> gzip).
        - The conversion is meant to be irreversible: it overwrites the RSS_Request_Objects
          with their data buckets. There is no protection yet against a failed overwrite
          leaving a mixed-datatype array.

        TODO: not implemented yet. Currently a no-op that returns None and modifies nothing.
        """
        return None
        

# Main Driver Script

To be used in a `main()` function.

In [None]:
def pyMongoFetchRSS(query, mongo_uri=None, db_name="se_naacp_gbh", collection_name="rss_links"):
    """
    Fetches RSS link documents matching `query` from MongoDB.

    query: dict - MongoDB filter document ({} returns every document).
    mongo_uri: str | None - Connection string. Defaults to the MONGO_URI environment variable.
        SECURITY: never hard-code credentials in the notebook; any credentials that were
        ever committed should be rotated.
    db_name: str - Database holding the RSS link collection.
    collection_name: str - Collection holding the RSS link documents.

    Returns:
    list of the matching documents.

    Raises:
    RuntimeError if no connection string is configured.
    """
    import os

    uri = mongo_uri or os.environ.get("MONGO_URI")
    if not uri:
        raise RuntimeError(
            "MongoDB connection string missing: set the MONGO_URI environment variable or pass mongo_uri."
        )

    # The context manager closes the client (and its connection pool) once the cursor
    # has been drained into a list.
    with MongoClient(uri) as client:
        collection = client[db_name][collection_name]
        return list(collection.find(query))

In [None]:
def exec_RSS_scrap(query=None):
    """
    Fetches RSS link documents from MongoDB and parses every feed into an RSS_List.

    query: dict | None - MongoDB filter for the RSS link documents,
        e.g. {'name': 'GreatBlueHill'}. Defaults to {} (all documents).

    Each link document is expected to carry 'link' and 'scrap_tags' fields. A document
    that cannot be turned into a request object is reported and skipped, so one bad
    entry does not throw out the rest of the batch.

    Returns:
    The populated RSS_List, or None if the list creation or the MongoDB fetch failed.
    """
    if query is None:
        query = {}

    try:
        rss_list = RSS_List()
        # MongoDB fetch on RSS links and objects
        rss_links = pyMongoFetchRSS(query)
    except Exception as e:
        print("> Job Failed: exec_RSS_scrap ran into errors!")
        print(f"{e}\n")
        return None

    print(f"> Fetched {len(rss_links)} RSS link document(s).")

    for link in rss_links:
        # Per-link isolation: a malformed document or invalid configuration only skips this link.
        try:
            rss_obj = rss_request_obj(link['link'], {'scrap_tags': link['scrap_tags']})
            rss_list.addRequestObjectAndParse(rss_obj)
        except Exception as e:
            print(f"> Skipping RSS link document {link.get('link', '<no link>')}: {e}\n")

    return rss_list

In [None]:
# Runs the end-to-end job: fetch RSS link documents from MongoDB, then fetch and parse each feed.
# Needs network access to MongoDB and to every configured feed URL.
exec_RSS_scrap()

# Testing the Implementations

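A little note: our worst-case runtime is $O(L \cdot I \cdot T)$ for $L$ links, $I$ items per feed, and $T$ scraped tags per item, i.e. $O(n^3)$ when all three grow with $n$. This comes from 3 nested for-loops: we iterate over every link, then over every item of each link, then over every associated tag of each item. Is there a better way? Maybe look for some memory trade-offs.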

In [None]:
RSS_FREE_LIST = RSS_List()

# Build one request object per (url, configuration) pair and parse it immediately.
for feed_url, feed_config in rss_objects:
    RSS_FREE_LIST.addRequestObjectAndParse(rss_request_obj(feed_url, feed_config))


In [None]:
# Show the aggregated article count and every non-empty data bucket (RSS_List.__repr__).
print(RSS_FREE_LIST)

In [None]:
RSS_FREE_LIST = RSS_List()

# Register every (url, configuration) pair without fetching; parsing is deferred to parseAllObjects().
for feed_url, feed_config in rss_objects:
    RSS_FREE_LIST.addRequestObject(rss_request_obj(feed_url, feed_config))

In [None]:
# Objects were only queued, not parsed: expect a total of 0 articles and every index listed as unparsed.
print(RSS_FREE_LIST)

In [None]:
# We then parse all the unparsed objects: this drains RSS_FREE_LIST.unparsed and adds each
# parsed object's article count to the list total.
RSS_FREE_LIST.parseAllObjects()

In [None]:
#print(RSS_FREE_LIST)