Get URLs from Sitemap

In [None]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np

# Get URLs from sitemap: download one sitemap file, pull out every listed
# address, save the list to Excel and preview the first rows.

sitemap_url = 'https://www.err.ee/sitemap/sitemap0.xml'

# Bound the wait and fail loudly on an HTTP error instead of parsing an error page.
response = requests.get(sitemap_url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'xml')

url_tags = soup.find_all("url")

# Each <url> entry is read through its <loc> child; entries without one are skipped
# rather than crashing on `None.text`.
urls = [url.loc.text for url in url_tags if url.loc is not None]

df = pd.DataFrame(urls, columns=["URL"])
df.to_excel("ERR_URLs.xlsx", index=False)
# Call head() so the notebook shows the first rows, not the bound-method repr.
df.head()

Get URLs from Search API

In [None]:
import requests
import pandas as pd
from datetime import datetime, timedelta
from tqdm import tqdm

def fetch_err_urls_paginated(start_date, end_date, limit=50, category=109):
    """Collect item URLs from the ERR search endpoint for one date range.

    Pages through ``/api/search/getContents/`` using ``limit``/``offset`` until
    the endpoint returns an empty page, reports that everything has been seen
    (``totalFound``), or answers with a non-200 status.

    Args:
        start_date: Value sent as ``publicStart``. Callers in this file pass
            ``dd.mm.YYYY`` strings.
        end_date: Value sent as ``publicEnd``, same format as ``start_date``.
        limit: Page size requested from the endpoint.
        category: Value sent as the ``category`` filter.

    Returns:
        A list with the ``url`` field of every returned item that has one.
        On a non-200 response the URLs gathered so far are returned.

    Raises:
        requests.RequestException: On connection errors or when a request
            exceeds the 30 second timeout.
    """
    import json  # local import so the function does not depend on cell-level imports

    # Base URL
    url = "https://www.err.ee/api/search/getContents/"
    all_urls = []
    offset = 0
    items_seen = 0  # counts every returned item, including those without a URL

    while True:
        # Serialize the options with json.dumps so values are always escaped
        # correctly; compact separators keep the payload identical to the
        # previously hand-written string.
        options = {
            "total": 0,
            "limit": limit,
            "offset": offset,
            "phrase": "",
            "publicStart": start_date,
            "publicEnd": end_date,
            "timeFromSchedule": False,
            "types": [],
            "category": category,
        }
        params = {
            "options": json.dumps(options, separators=(",", ":"), ensure_ascii=False)
        }

        # A timeout prevents one stalled connection from blocking the whole crawl.
        response = requests.get(url, params=params, timeout=30)

        if response.status_code != 200:
            print("Request failed with status code:", response.status_code)
            print("Response text:", response.text)  # Raw body for debugging
            break

        response_data = response.json()
        contents = response_data.get('contents') or []

        # An empty page means there is nothing left to fetch.
        if not contents:
            break

        urls = [item['url'] for item in contents if 'url' in item]
        all_urls.extend(urls)
        print(f"Fetched {len(urls)} URLs with offset {offset} ({start_date} to {end_date})")

        # Compare against items seen (not URLs kept) so items lacking a 'url'
        # neither end the loop early nor force an extra request.
        items_seen += len(contents)
        total_found = response_data.get('totalFound', 0)
        if items_seen >= total_found:
            break

        offset += limit  # Move on to the next page

    return all_urls

def fetch_urls_until_limit(limit_per_request=50, total_limit=100000,
                           end_date="09.02.2025", chunk_days=30):
    """Walk backwards in time collecting URLs until ``total_limit`` is reached.

    Starting at ``end_date``, queries consecutive date windows via
    ``fetch_err_urls_paginated`` and stops when enough URLs have been gathered
    or a window yields nothing.

    Args:
        limit_per_request: Page size forwarded to ``fetch_err_urls_paginated``.
        total_limit: Maximum number of URLs to return.
        end_date: Most recent day to query, as a ``dd.mm.YYYY`` string.
        chunk_days: Number of days subtracted from a window's end to get its start.

    Returns:
        A list of at most ``total_limit`` URLs, newest window first.
    """
    all_urls = []
    window_end = datetime.strptime(end_date, "%d.%m.%Y")

    with tqdm(total=total_limit, desc="Fetching URLs") as pbar:
        while len(all_urls) < total_limit:
            window_start = window_end - timedelta(days=chunk_days)
            start_date_str = window_start.strftime("%d.%m.%Y")
            end_date_str = window_end.strftime("%d.%m.%Y")

            # Fetch all URLs for the window using pagination
            urls = fetch_err_urls_paginated(start_date_str, end_date_str, limit=limit_per_request)

            # Never advance the bar past its total, even if the last window overshoots.
            remaining = total_limit - len(all_urls)
            all_urls.extend(urls)

            print(f"Fetched {len(urls)} URLs for the period {start_date_str} to {end_date_str}")

            pbar.update(min(len(urls), remaining))
            # The next window ends the day before this one started, so windows do not overlap.
            window_end = window_start - timedelta(days=1)

            # An empty window (or a failed request) ends the crawl.
            if not urls:
                break

    # Truncate before reporting so the message matches what is returned.
    all_urls = all_urls[:total_limit]
    print(f"Collected {len(all_urls)} URLs.")
    return all_urls

# Gather up to 100,000 URLs and store them in a single-column CSV file.
urls = fetch_urls_until_limit(total_limit=100_000)
df = pd.DataFrame(data=urls, columns=["URL"])

# index=False keeps the row index out of the file.
df.to_csv("err_100_000.csv", index=False)

Get Aggregated Category Counts from Search API

In [None]:
import requests
import pandas as pd
from datetime import datetime, timedelta
from tqdm import tqdm
from collections import Counter

def fetch_err_categories_paginated(start_date, end_date, limit=50, category=109):
    """Collect the primary-category name of every search hit in one date range.

    Pages through ``/api/search/getContents/`` using ``limit``/``offset`` until
    the endpoint returns an empty page, reports that everything has been seen
    (``totalFound``), or answers with a non-200 status.

    Args:
        start_date: Value sent as ``publicStart``. Callers in this file pass
            ``dd.mm.YYYY`` strings.
        end_date: Value sent as ``publicEnd``, same format as ``start_date``.
        limit: Page size requested from the endpoint.
        category: Value sent as the ``category`` filter.

    Returns:
        A list with one entry per returned item: the item's
        ``primaryCategory.name``, or ``''`` when it is missing.

    Raises:
        requests.RequestException: On connection errors or when a request
            exceeds the 30 second timeout.
    """
    import json  # local import so the function does not depend on cell-level imports

    url = "https://www.err.ee/api/search/getContents/"
    all_categories = []
    offset = 0

    while True:
        # Serialize with json.dumps so values are escaped correctly; compact
        # separators keep the payload identical to the hand-written string.
        options = {
            "total": 0,
            "limit": limit,
            "offset": offset,
            "phrase": "",
            "publicStart": start_date,
            "publicEnd": end_date,
            "timeFromSchedule": False,
            "types": [],
            "category": category,
        }
        params = {
            "options": json.dumps(options, separators=(",", ":"), ensure_ascii=False)
        }
        # A timeout prevents one stalled connection from blocking the whole crawl.
        response = requests.get(url, params=params, timeout=30)

        if response.status_code != 200:
            print("Request failed with status code:", response.status_code)
            break

        response_data = response.json()
        contents = response_data.get('contents') or []

        if not contents:
            break

        for item in contents:
            # `or {}` also covers a key that is present but null, which the
            # plain .get default would not.
            primary = item.get('primaryCategory') or {}
            all_categories.append(primary.get('name', ''))

        total_found = response_data.get('totalFound', 0)
        if len(all_categories) >= total_found:
            break

        offset += limit

    return all_categories

def fetch_categories_until_limit(limit_per_request=50, total_limit=100_000,
                                 end_date="09.02.2025", chunk_days=30):
    """Walk backwards in time collecting category names until ``total_limit``.

    Starting at ``end_date``, queries consecutive date windows via
    ``fetch_err_categories_paginated`` and stops when enough entries have been
    gathered or a window yields nothing.

    Args:
        limit_per_request: Page size forwarded to ``fetch_err_categories_paginated``.
        total_limit: Maximum number of category entries to return.
        end_date: Most recent day to query, as a ``dd.mm.YYYY`` string.
        chunk_days: Number of days subtracted from a window's end to get its start.

    Returns:
        A list of at most ``total_limit`` category names (one per search hit).
    """
    all_categories = []
    window_end = datetime.strptime(end_date, "%d.%m.%Y")

    with tqdm(total=total_limit, desc="Fetching categories") as pbar:
        while len(all_categories) < total_limit:
            window_start = window_end - timedelta(days=chunk_days)
            start_date_str = window_start.strftime("%d.%m.%Y")
            end_date_str = window_end.strftime("%d.%m.%Y")

            batch = fetch_err_categories_paginated(start_date_str, end_date_str, limit=limit_per_request)

            # Never advance the bar past its total, even if the last window overshoots.
            remaining = total_limit - len(all_categories)
            all_categories.extend(batch)
            pbar.update(min(len(batch), remaining))

            # The next window ends the day before this one started, so windows do not overlap.
            window_end = window_start - timedelta(days=1)

            # An empty window (or a failed request) ends the crawl.
            if not batch:
                break

    return all_categories[:total_limit]

# Fetch the category name of up to 100,000 search hits and tally them.
categories = fetch_categories_until_limit(total_limit=100_000)
counter = Counter(categories)

# Report every category, most frequent first.
ranked = counter.most_common()
for label, hits in ranked:
    print(f"{label}: {hits}")

# Persist the tallies as a two-column CSV (rows keep the counter's own order).
pairs = list(counter.items())
df = pd.DataFrame(pairs, columns=["Category", "Count"])
df.to_csv("err_100_000_categories.csv", index=False)


Fetch Articles

In [None]:
import requests
import pandas as pd
from bs4 import BeautifulSoup
import csv
from tqdm import tqdm
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_article_content(url):
    """Download one article page and extract its fields.

    Args:
        url: Address of the page to download.

    Returns:
        A dict with the keys ``title``, ``content``, ``url``, ``author``,
        ``column``, ``pubdate``, ``hrefs`` and ``tags``. Fields whose element is
        absent from the page get a placeholder string (or an empty list).
        Returns ``None`` when the download or parsing fails for any reason,
        including an HTTP error status or a timeout; the failure is printed.
    """
    try:
        # The timeout keeps a stalled connection from tying up a pool worker
        # forever. raise_for_status stops error pages (404, 500, ...) from
        # being recorded as if they were articles.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Title
        heading = soup.find('h1')
        title = heading.text.strip() if heading is not None else 'No title found'

        # Content: all paragraphs inside the body container
        body = soup.find('div', class_='body')
        content_paragraphs = body.find_all('p') if body is not None else []
        content = ' '.join(p.text.strip() for p in content_paragraphs)

        # References (only hrefs inside the body paragraphs)
        references = [a['href'] for p in content_paragraphs for a in p.find_all('a', href=True)]

        # Author: name span inside the editor paragraph
        editor = soup.find('p', class_='editor editor-design')
        name_span = editor.find('span', class_='name') if editor is not None else None
        author = name_span.text.strip() if name_span is not None else 'Unknown'

        # Column
        category = soup.find('div', class_='category')
        column = category.text.strip() if category is not None else 'No category'

        # Date published: value of the first <time> element's datetime attribute
        time_tag = soup.find('time')
        pubdate = time_tag.get('datetime') if time_tag is not None else 'No date'

        # Tags: link texts inside the keywords container
        tag_section = soup.find('div', class_="keywords keywords-design")
        tags = [tag.text.strip() for tag in tag_section.find_all('a')] if tag_section is not None else []

        return {
            'title': title,
            'content': content,
            'url': url,
            'author': author,
            'column': column,
            'pubdate': pubdate,
            'hrefs': references,
            'tags': tags
        }
    except Exception as e:
        # Best-effort scraping: one bad page must not abort the whole batch.
        print(f"Failed to process URL {url}: {e}")
        return None

def update_news_data(csv_path):
    """Scrape every URL listed in a CSV and overwrite the CSV with the results.

    The URL column may be named ``url`` (as in files this function has written
    before) or ``URL`` (as in the URL list saved elsewhere in this file).
    Pages are fetched concurrently with ``fetch_article_content``; pages that
    fail are left out of the output.

    Args:
        csv_path: Path of the CSV to read the URLs from and to write the
            scraped data back to.

    Returns:
        None. The file is left untouched when it lists no URLs or when no page
        could be scraped, so the input URL list is never replaced by an empty file.

    Raises:
        KeyError: If the CSV has neither a ``url`` nor a ``URL`` column.
    """
    df = pd.read_csv(csv_path)

    # Accept both spellings of the URL column.
    url_column = next((name for name in ("url", "URL") if name in df.columns), None)
    if url_column is None:
        raise KeyError(f"No 'url' or 'URL' column found in {csv_path}")

    # Blank cells are read as NaN and cannot be fetched.
    urls = df[url_column].dropna().tolist()
    if not urls:
        print(f"No URLs found in {csv_path}; nothing to fetch.")
        return

    results = []

    # Using ThreadPoolExecutor to fetch URLs concurrently
    with ThreadPoolExecutor(max_workers=6) as executor:
        futures = [executor.submit(fetch_article_content, url) for url in urls]
        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing URLs"):
            result = future.result()
            if result:
                results.append(result)

    # Overwriting the input with an empty frame would destroy the URL list.
    if not results:
        print(f"No articles could be fetched; {csv_path} left unchanged.")
        return

    # Convert list of dicts to DataFrame
    news_data = pd.DataFrame(results)

    # Quote all non-numeric fields so commas and newlines in article text are safe
    news_data.to_csv(csv_path, index=False, quoting=csv.QUOTE_NONNUMERIC)
    print(f"Updated data saved to {csv_path}")

# Scrape the pages listed in this file; the same path is passed for reading and writing.
csv_path = "err_articles/err_100_000.csv"
update_news_data(csv_path=csv_path)
