In [42]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import re
import json
import time
from json_parser import JSONGraph, JSONEndpoint, JSONVertex
import numpy as np

In [2]:
def get_ground_news_links(date: str) -> list[str]:
    """Returns links to featured GroundNews articles from the front page of
    GroundNews on the inputted date"""
    try:
        date = ''.join(date.split('-'))
        if len(date) != 8:
            raise ValueError('Date must be in YYYY-MM-DD format')
    except ValueError:
        raise ValueError('Date must be in YYYY-MM-DD format')
    
    url = f"https://web.archive.org/web/{date}/https://ground.news/"
    response = requests.get(url)
    if response.status_code != 200:
        raise ConnectionRefusedError('Connection Error')
    content = BeautifulSoup(response.text, "html.parser")
    anchors = content.find_all("a", href=True)
    anchor_links = [x.attrs['href'] for x in anchors] 
    article_links = [x for x in anchor_links if 'article/' in x]
    article_links = [x[x.find('https:'):] for x in article_links]
    return article_links

# Ground News Scraping

In [3]:

old_dates = None
try:
    scraped_df = pd.read_csv('data/complete_article_data.csv')
    old_dates = set(scraped_df['date'].values.tolist())
except:
    pass

#CHANGE HERE FOR NEW URLS (don't use same dates)
dates = ['2025-02-11', '2025-01-11', '2024-12-11', '2024-11-11', '2024-10-11',
        '2024-09-11', '2024-08-11', '2024-07-11', '2024-06-11', '2024-05-11',
        '2024-04-11', '2024-03-11', '2024-02-11', '2024-01-11']

links = []


Iterates through the dates list, grabbing available ground news article links from the ground news homepage on the input date 

In [13]:
for ind, date in enumerate(dates):
    if old_dates:
        if date in old_dates:
            print(f'Articles from {date} already scraped')
            continue
    links.append(get_ground_news_links(dates[ind]))

Articles from 2024-11-01 already scraped


In [None]:
#Loading df to check for already-scraped links
current_gnews = pd.read_csv('data/ground_news_links.csv')
seen_links = set(current_gnews['url'].values.tolist())

link_col = []
date_col = []
#Adding unseen links to the ground_news_links csv
for ind, link_group in enumerate(links):
    date = dates[ind]
    for link in link_group:
        if link not in seen_links:
            link_col.append(link)
            date_col.append(date)

gnews_link_df = pd.DataFrame()
gnews_link_df['url'] = link_col
gnews_link_df['date'] = date_col
try:
    old_gnews_links = pd.read_csv('data/ground_news_links.csv')
    gnews_link_df = pd.concat([old_gnews_links, gnews_link_df], axis=0)
except:
    pass
gnews_link_df.to_csv('data/ground_news_links.csv', index=False)

print(gnews_link_df.shape)
gnews_link_df.tail()

(1521, 3)


Unnamed: 0.1,Unnamed: 0,url,date
729,,https://ground.news/article/houthi-rebels-say-...,2024-02-11
730,,https://ground.news/article/tech-innovations-t...,2024-02-11
731,,https://ground.news/article/us-navy-helicopter...,2024-02-11
732,,https://ground.news/article/scientists-make-hu...,2024-02-11
733,,https://ground.news/article/archeologists-map-...,2024-02-11


In [None]:
gnews_link_df = pd.read_csv('data/ground_news_links.csv')
urls = []
biases = []
outlets = []
titles = []
dates = []
old_queue = pd.read_csv('data/articles_to_scrape.csv')
seen_dates = set(old_queue['date'].values.tolist())

{'2024-08-01',
 '2024-09-01',
 '2024-10-01',
 '2024-11-01',
 '2024-12-01',
 '2025-01-01',
 '2025-02-01'}

Iterates through ground news article links, grabbing all available urls to outside articles as well as recording the outlet and bias that url is associated with

In [117]:
def isolate_first_10(inp_str: str, start_str: str) -> dict:
    """Parses the groundnews javascript output into JSON"""
    start_ind = inp_str.find(start_str) - 2
    open_ind = inp_str.find(start_str) + len(start_str)
    bracket_num = 0
    started = False
    for ind, char in enumerate(inp_str[open_ind:]):
        if char == '[':
            bracket_num += 1
            started = True
        if char == ']':
            bracket_num -= 1
        if bracket_num == 0 and started:
            end_ind = ind + open_ind
            break
    exerpt = inp_str[start_ind:end_ind+1]
    pattern = r'\\"|"\\'  # Match \\ before or after "
    replacement = '"'

    result = re.sub(pattern, replacement, exerpt)
    pattern2 = r'\\"'
    result = re.sub(pattern, '"', result)
    result = f"""
    {{
        {result}
    }}
    """
    return json.loads(result)

num_processed = 0

for ind in range(gnews_link_df.shape[0]):
    row = gnews_link_df.iloc[ind]
    url = row['url']
    date = row['date']
    if date not in seen_dates:
        try:
            response = requests.get(url)
            #Continuing if connection not established
            if response.status_code != 200:
                print(response.status_code)
                raise ConnectionRefusedError('Connection Error')
            #Getting html content
            content = BeautifulSoup(response.text, "html.parser")
            #print('script fine')
            script_tags = content.find_all('script')
            
            js_raw = ''

            #Isolating the script element containing the relevant data
            for script in script_tags:
                txt = script.text
                if 'firstTenSources' in txt:
                    js_raw = txt
            tag_f = 'firstTenSources'

            #Transforming text into a json object/dictionary
            js = isolate_first_10(js_raw, tag_f)
        except:
            continue
        #Appending data to lists
        url_data = js['firstTenSources']
        date = gnews_link_df['date'].values[ind] 
        num_processed += 1
        for url_dict in url_data:
            biases.append(url_dict['sourceInfo']['bias'])
            outlets.append(url_dict['sourceInfo']['name'])
            urls.append(url_dict['url'])
            titles.append(url_dict['title'])
            dates.append(date)
        time.sleep(.5)


404
404


In [120]:
scrape_queue_df = pd.DataFrame()
scrape_queue_df['url'] = urls
scrape_queue_df['title'] = titles
scrape_queue_df['outlet'] = outlets
scrape_queue_df['bias'] = biases
scrape_queue_df['date'] = dates

scrape_queue_df.head()
scrape_queue_combined = pd.concat([old_queue, scrape_queue_df], axis=0)
scrape_queue_combined.drop_duplicates(subset=['url'])
scrape_queue_combined.to_csv('data/articles_to_scrape.csv', index=False)
scrape_queue_combined.shape

(12679, 5)

# Scraping Article Text

In [None]:
def scrape_url(url):
    try:
        response = requests.get(url, timeout=2.5)
        if response.status_code != 200:
            return 'Error'
    except:
        return 'Error'
    content = BeautifulSoup(response.text, "html.parser")
    all_p = content.find_all('p')
    clean_text = [x.text for x in all_p]
    full_text = ' '.join(clean_text)
    return full_text

scrape_queue_df = pd.read_csv('data/articles_to_scrape.csv')
try:
    pre_scraped_df = pd.read_csv('data/scraped_articles.csv')
    scraped_urls = pre_scraped_df['url'].values.tolist()
    scraped_text = pre_scraped_df['content'].values.tolist()
except:
    scraped_urls = []
    scraped_text = []

scraped_text_df = pd.DataFrame()

3664

In [123]:
#Iterates through unseen URLs, scraping their text and occasionally updating the CSV
for ind, url in enumerate(scrape_queue_df['url'].values):
    if ind % 50 == 0:
        print(ind)
    if url not in scraped_urls:
        scraped_text.append(scrape_url(url))
        scraped_urls.append(url)
    else:
        continue
    if ind % 50 == 0 or ind == scrape_queue_df.shape[0]-1:
        temp_df = pd.DataFrame()
        temp_df['url'] = scraped_urls
        temp_df['content'] = scraped_text
        temp_df.to_csv('data/scraped_articles.csv', index=False)
        scraped_text_df = temp_df

0 120.796875
50 120.796875
100 120.796875
150 120.796875
200 120.796875
250 120.796875
300 120.796875
350 120.796875
400 120.796875
450 120.796875
500 120.796875
550 120.796875
600 120.796875
650 120.796875
700 120.796875
750 120.796875
800 120.796875
850 120.796875
900 120.796875
950 120.796875
1000 120.796875
1050 120.796875
1100 120.796875
1150 120.796875
1200 120.796875
1250 120.796875
1300 120.796875
1350 120.796875
1400 120.796875
1450 120.8125
1500 120.8125
1550 120.8125
1600 120.8125
1650 120.8125
1700 120.8125
1750 120.8125
1800 120.8125
1850 120.8125
1900 120.8125
1950 120.8125
2000 120.8125
2050 120.8125
2100 120.8125
2150 120.8125
2200 120.8125
2250 120.8125
2300 120.8125
2350 120.8125
2400 120.8125
2450 120.8125
2500 120.828125
2550 120.828125
2600 120.828125
2650 120.828125
2700 120.828125
2750 120.828125
2800 120.828125
2850 120.828125
2900 120.828125
2950 120.828125
3000 120.828125
3050 120.828125
3100 120.828125
3150 120.828125
3200 120.828125
3250 120.828125
3300 120.

Filtering out URLS that could not be scraped from

In [143]:
old_article_data = pd.read_csv('data/complete_article_data.csv')
complete_articles = []
complete_urls = []
for ind, article in enumerate(scraped_text_df['content'].values):
    try:
        if len(article.split()) > 50:
            complete_articles.append(article)
            complete_urls.append(scraped_text_df['url'].values[ind])
    except:
        continue

complete_df = pd.DataFrame()
complete_df['url'] = complete_urls
complete_df['content'] = complete_articles
complete_df = scrape_queue_df.merge(complete_df, how='inner', on='url')
merged_article_data = pd.concat([old_article_data, complete_df], axis=0)
merged_article_data.drop_duplicates(subset=['url'], inplace=True)
merged_article_data.to_csv('data/complete_article_data.csv', index=False)


End

# JSON parser class if import did not work for some reason

In [None]:
class JSONGraph:

    def __init__(self, js: dict):
        self.js = js
        self.roots = list(js.keys())
        queue = [self.add_vert(item, None) for item in self.roots]

        endpoints = {}
        endpoint_parents = {}
        while len(queue) > 0:
            node = queue.pop(0)
            node_path = node.get_path()
            node_val = self.get_item(js, node_path)
            #Handling endpoints
            if type(node_val) == dict:
                children = node_val.keys()
                for child in children:
                    queue.append(self.add_vert(child, node))                
            elif type(node_val) == list:
                for ind, item in enumerate(node_val):
                    if type(item) == dict:
                        children = item.keys()
                        for child in children:
                            queue.append(self.add_vert([ind, child], node))
                    else: 
                        endpoint = self.add_endpoint([ind, item], node)
                        endpoints[endpoint] = node_val         
            else:
                endpoint = self.add_endpoint(node_val, node)
                endpoints[endpoint] = node_val
        self.endpoints = endpoints
        return

    def add_vert(self, name, parent: "JSONVertex | None") -> "JSONVertex":
        return JSONVertex(name, parent)

    def add_endpoint(self, value: any, parent: "JSONVertex") -> "JSONEndpoint":
        return JSONEndpoint(value, parent)

    def get_id(self, name: str, ids: list[int]) -> str:
        id = np.random.choice(0,1000)
        while id in ids:
            id = np.random.choice(0,1000)
        return f'{name}_*{id}'
    
    def get_item(self, item_dict, path):
        #path = [x.split('_*')[0] for x in path]
        current = item_dict
        while len(path) > 0:
            ind = path.pop(0)
            if type(ind) == list:
                current = current[ind[0]][ind[1]]
            else:
                current = current[ind]
        return current
    
    def get_endpoints(self) -> dict["JSONEndpoint", any]:
        return self.endpoints
    
    def find_all(self, query: any) -> dict[str, any]:
        """Returns the location and value of all data containing the query"""
        matches = {}
        for endpoint, val in self.endpoints.items():
            if type(val) in [list, str]:
                if query in val:
                    path = endpoint.get_path()
                    matches[endpoint] = val
            elif type(query) in [float, int]:
                if query == val:
                    path = endpoint.get_path()
                    matches[f'Location: root{path}'] = val
        return matches
    
    def find_tags(self, query: str, return_endpoints=False) -> dict[str, any]:
        matches = {}
        for endpoint in self.endpoints:
            path = endpoint.get_path()
            if query in path:
                node = endpoint.parent
                while True:
                    if type(node.name) == list:
                        tag = node.name[1]
                    else:
                        tag = node.name
                    if tag == query:
                        break
                    node = node.parent
                
                matches[node] = self.get_item(self.js, node.get_path())
        return matches
    
    def find_neighbors_tag(self, query):
        endpoints = self.find_tags(query)
        neighbors = {}
        for endpoint in endpoints:
            if isinstance(endpoint, JSONVertex):
                neighbors[endpoint.parent] = self.get_item(self.js, endpoint.parent.get_path())
            else:
                neighbors[endpoint.parent.parent] = self.get_item(self.js, endpoint.parent.parent.get_path())
        return neighbors
    
    def find_neighbors_content(self, query):
        endpoints = self.find_all(query)
        neighbors = {}
        for endpoint in endpoints:
            neighbors[endpoint.parent.parent] = self.get_item(self.js, endpoint.parent.parent.get_path())
        return
    
    def find_neighbors(self, vertex: "JSONVertex | JSONEndpoint") -> dict:
        if isinstance(vertex, JSONVertex):
            return self.get_item(self.js, vertex.parent.get_path())
        elif isinstance(vertex, JSONEndpoint):
            return self.get_item(self.js, vertex.parent.parent.get_path())
        else:
            raise TypeError('Vertex Must be a JSONVertex or JSONEndpoint instance')


class JSONVertex:
    children: list[str]
    parent: "JSONVertex | None"
    name: str | list[int, str]
    def __init__(self, name, parent: "JSONVertex | None"):
        self.parent = parent
        self.name = name
        pass

    def get_path(self):
        path = [self.name]
        node = self
        while node.parent is not None:
            parent_name = node.parent.name
            if type(parent_name) == list:
                path = parent_name + path 
            else:
                path = [parent_name] + path
            node = node.parent
        return path
    
    def __str__(self):
        return self.name
    
    def __repr__(self):
        path = self.get_path()
        return f"JSONEndpoint(root{path})"


class JSONEndpoint:
    parent: JSONVertex
    value: any
    def __init__(self, value, parent):
        self.parent = parent
        self.value = value
        pass

    def get_path(self):
        path = []
        node = self
        while node.parent is not None:
            parent_name = node.parent.name
            if type(parent_name) == list:
                path = parent_name + path 
            else:
                path = [parent_name] + path
            node = node.parent
        return path

    def __str__(self):
        return f'{self.parent.name}: {self.value}'
    
    def __repr__(self):
        path = self.get_path()
        return f"JSONEndpoint(root{path})"
