# General Scraper

Given a domain, crawl it breadth-first and generate a `domain_data.json` file
containing the text content of every page reachable through crawling.
Useful when `sitemap.xml` is not available.

## Shared Variables

* domain (i.e. uscis.gov)
* processed_set # set of all crawled sites
* crawl_queue # list of sites to crawl
* domain_data # list of processed SiteData
* crawl_site # internal function to crawl the next site in the queue

## Helper Functions

```python
def extract_text_content(html: str) -> str:
    '''Extracts main text content from HTML, excluding header and footer sections.'''
    pass
    
def extract_links(html: str) -> List[str]:
    '''Extracts all links from HTML without filtering.'''
    pass

def normalize_link(link: str, base_url: str) -> str:
    '''Normalizes a single link to a standard format.'''
    pass

def normalize_links(links: List[str], base_url: str) -> List[str]:
    '''Normalizes a list of links to a standard format and removes duplicates.'''
    prepared_links = [normalize_link(link, base_url) for link in links]
    return list(set(prepared_links))

def filter_domain_links(links: List[str], domain: str) -> List[str]:
    '''Filters links to include only those under the specified domain.'''
    pass

def filter_new_links(
    links: List[str], 
    in_progress_set: Set[str], 
    processed_set: Set[str]
) -> List[str]:
    '''Filters links to exclude those that are in progress or already processed.'''
    pass
```

## Additional Concerns

* ~Enable JS rendering~ (js render, just like premium, requires 10 credits instead of 1)
* Add 100ms delay to thread scheduler
* Output a list of failed URLs so they can be re-scraped if needed
* Parse PDF links using LangChain
* Given domain url, scrape one level down non-domain urls on the site.

In [None]:
# Env imports.
from os import getenv
from dotenv import load_dotenv

# Retrieve the API keys from environment variables
# (load_dotenv is expected to merge a local .env file into the environment first).
load_dotenv()

# NOTE(review): getenv() returns None for an unset variable, so API_KEY may be
# None, and the int(...) conversions below raise TypeError when
# SCRAPERAPI_NUM_RETRIES or SCRAPERAPI_MAX_WORKERS is missing.
API_KEY: str = getenv('SCRAPERAPI_API_KEY')
NUM_RETRIES: int = int(getenv('SCRAPERAPI_NUM_RETRIES'))  # sent as the 'retry' request param
MAX_WORKERS: int = int(getenv('SCRAPERAPI_MAX_WORKERS'))  # thread pool size in crawl_domain

In [None]:
from collections import deque
from pprint import pprint
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urlsplit, urlunsplit

In [None]:
from scraperapi_sdk import ScraperAPIClient
from scraperapi_sdk.exceptions import ScraperAPIException

# Single ScraperAPI client instance reused by the cells below.
client: ScraperAPIClient = ScraperAPIClient(API_KEY)

In [None]:
from bs4 import BeautifulSoup

class EmptyTextContentException(Exception):
    '''Raised when a fetched page contains no extractable text.'''

def extract_text_content(html: str) -> str:
    '''Returns the text of an HTML document, minus its header and footer.

    The first <header> and the first <footer> element (if present) are removed,
    then the remaining text nodes are stripped and joined with single spaces.

    Raises:
        EmptyTextContentException: if no text is left after extraction.
    '''

    document: BeautifulSoup = BeautifulSoup(html, 'html.parser')
    for section_name in ('header', 'footer'):
        section = document.find(section_name)
        if section is not None:
            section.decompose()

    extracted: str = document.get_text(separator=' ', strip=True)
    if extracted:
        return extracted

    raise EmptyTextContentException

In [None]:
# Smoke test: fetch a single page through ScraperAPI and extract its text.
domain: str = 'osu.dev'
url: str = f'https://{domain}'
# NOTE(review): assumes client.get returns the page body as text — confirm against the SDK.
html: str = client.get(url, params={'retry': NUM_RETRIES})
text_content: str = extract_text_content(html)
print(len(text_content))  # number of characters extracted

In [None]:
print(text_content)

In [None]:
def extract_links(html: str) -> List[str]:
    '''Returns the raw href value of every <a> tag that has one, unfiltered.'''

    parsed: BeautifulSoup = BeautifulSoup(html, 'html.parser')
    return [anchor['href'] for anchor in parsed.find_all('a', href=True)]

In [None]:
# Preview the raw hrefs found on the sample page fetched above.
links: List[str] = extract_links(html)
pprint(links)

In [None]:
from urllib.parse import urljoin

def normalize_link(link: str, base_url: str) -> Optional[str]:
    '''Normalizes a single link to an absolute, comparable form.

    The link is resolved against base_url, its fragment is dropped, the host is
    lowercased and trailing slashes are removed from the path. The case of the
    path and query is preserved because both are case-sensitive.

    Args:
        link: Raw href value; may be relative ('a/b'), root-relative ('/a'),
            protocol-relative ('//host/a') or absolute.
        base_url: URL the link is resolved against.

    Returns:
        The normalized URL, or None when the link is empty, is only a fragment
        ('#section'), or cannot be parsed.
    '''

    cleaned: str = link.strip()
    if not cleaned or cleaned.startswith('#'):
        return None

    try:
        # Resolve first, trim afterwards: stripping slashes before the join
        # would turn '//host/a' and '/a' into paths relative to base_url.
        parts = urlsplit(urljoin(base_url, cleaned))
    except ValueError:
        # Malformed hrefs (e.g. an unbalanced '[' in the host) are skipped
        # rather than aborting the processing of the whole page.
        return None

    # urlsplit already lowercases the scheme; only the host still needs it.
    normalized: str = urlunsplit(parts._replace(
        netloc=parts.netloc.lower(),
        path=parts.path.rstrip('/'),
        fragment='',
    ))
    return normalized or None

def normalize_links(links: List[str], base_url: str) -> List[str]:
    '''Normalizes every link against base_url and returns the unique results.

    Links for which normalize_link returns a falsy value are dropped. The
    result is built from a set, so its order is unspecified.
    '''

    candidates = (normalize_link(raw_link, base_url) for raw_link in links)
    unique_links: Set[str] = {candidate for candidate in candidates if candidate}
    return list(unique_links)

In [None]:
# Normalize the sample page's links, using the page's own URL as the base.
normalized_links: List[str] = normalize_links(links, base_url=url)
pprint(normalized_links)

In [None]:
from urllib.parse import urlparse

def filter_domain_links(links: List[str], domain: str) -> List[str]:
    '''Filters links to include only those whose host is exactly the given domain.

    The host comparison is case-insensitive. Subdomains do not match
    ('www.osu.dev' is rejected for domain 'osu.dev'). Links without a host
    (relative paths, mailto:, ...) and links that cannot be parsed are dropped.

    Args:
        links: Candidate URLs.
        domain: Host name to keep, e.g. 'osu.dev'.

    Returns:
        The matching links, in their original order.
    '''

    target_host: str = domain.lower()
    domain_links: List[str] = []
    for link in links:
        try:
            # .hostname is lowercased by urlparse and excludes any port.
            link_hostname: Optional[str] = urlparse(link).hostname
        except ValueError:
            # Malformed URL (e.g. unbalanced '[' in the host): skip this link only.
            continue
        if link_hostname and link_hostname == target_host:
            domain_links.append(link)

    return domain_links

In [None]:
# Keep only the sample links whose hostname equals `domain`.
domain_links: List[str] = filter_domain_links(links=normalized_links, domain=domain)
pprint(domain_links)

In [None]:
def filter_new_links(
    links: List[str],
    in_progress_set: Set[str],
    processed_set: Set[str]
) -> List[str]:
    '''Returns the links that are neither being crawled nor already crawled.

    Input order is preserved and duplicates within links are kept.
    '''

    unseen: List[str] = []
    for candidate in links:
        if candidate in in_progress_set or candidate in processed_set:
            continue
        unseen.append(candidate)
    return unseen

In [None]:
# Sample bookkeeping sets used to exercise filter_new_links.
in_progress_set: Set[str] = {'https://osu.dev/projects'}
processed_set: Set[str] = {'https://osu.dev/support'}
new_links: List[str] = filter_new_links(
    links=domain_links,
    in_progress_set=in_progress_set,
    processed_set=processed_set,
)
pprint(new_links)

In [None]:
def extract_new_links(html: str, domain: str, base_url: str, in_progress_set: Set[str], processed_set: Set[str]):
    '''Runs the whole link pipeline on one page and returns its unseen links.

    Steps: extract the raw hrefs, normalize them against base_url, keep the
    links on domain, then drop those found in in_progress_set or processed_set.
    '''
    on_domain: List[str] = filter_domain_links(
        normalize_links(extract_links(html), base_url),
        domain,
    )
    return filter_new_links(on_domain, in_progress_set, processed_set)

In [None]:
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from datetime import datetime
from time import sleep
import logging

logging.basicConfig(level=logging.INFO)

def is_html_response(response) -> bool:
    '''Reports whether a response looks like an HTML document.

    A Content-Type header starting with 'text/html' settles it. Otherwise the
    first 300 characters of the body are searched, case-insensitively, for an
    HTML doctype or an opening <html tag.
    '''
    headers = response.headers
    declared_type: str = headers['Content-Type'] if 'Content-Type' in headers else ''
    if declared_type.startswith('text/html'):
        return True

    # Header missing or not HTML: sniff the beginning of the body instead.
    head: str = response.text[:300].lower()
    return any(marker in head for marker in ('<!doctype html>', '<html'))

def crawl_domain(client: ScraperAPIClient, domain: str) -> Tuple[List[Dict[str, str]], List[str]]:
    '''Breadth-first crawls the pages reachable under a domain.

    Starting from https://<domain>, pages are fetched through ScraperAPI by a
    pool of MAX_WORKERS threads. For every HTML response the text content is
    stored and the page's same-domain links are queued. Non-HTML responses are
    marked as processed but produce no entry.

    Args:
        client: ScraperAPI client used for every request.
        domain: Host name to crawl, e.g. 'osu.dev'.

    Returns:
        A (domain_data, failed_sites) tuple. domain_data holds one dict per
        scraped page with the keys 'url', 'text_content' and
        'scrape_timestamp'. failed_sites lists the URLs whose request or
        processing raised an exception.
    '''

    base_url: str = f'https://{domain}'
    # deque pops from the front in O(1); list.pop(0) shifts the whole list.
    ready_queue: deque = deque([base_url])
    # Every URL ever queued. Without it, a page linked from many places is
    # queued again by each of them until it has been picked up by a worker.
    queued_set: Set[str] = {base_url}
    in_progress_set: Set[str] = set()
    processed_set: Set[str] = set()

    domain_data: List[Dict[str, str]] = []
    failed_sites: List[str] = []

    lock: Lock = Lock()

    def crawl_site() -> None:
        '''
        1. Consume next url in queue to crawl, use Lock for memory safety,
        2. In try-catch, make request using ScraperAPIClient, obtain raw html,
        3. Extract text content and links from raw html, process links,
        4. Always move the url from the in-progress to the processed set.
        '''
        with lock:
            if not ready_queue:
                return
            url: str = ready_queue.popleft()
            in_progress_set.add(url)

        try:
            response = client.make_request(url=url, params={'retry': NUM_RETRIES})

            if is_html_response(response):
                html = response.content
                text_content: str = extract_text_content(html)
                with lock:
                    # NOTE: links are resolved against base_url (the site
                    # root), not against the URL of the page they came from.
                    new_links: List[str] = [
                        link
                        for link in extract_new_links(html, domain, base_url, in_progress_set, processed_set)
                        if link not in queued_set
                    ]
                    domain_data.append({
                        'url': url,
                        'text_content': text_content,
                        'scrape_timestamp': datetime.now().isoformat()
                    })
                    queued_set.update(new_links)
                    ready_queue.extend(new_links)
                    logging.info(f'Success! Adding {len(new_links)} new links to queue...')

        except ScraperAPIException as e:
            with lock:
                failed_sites.append(url)
            logging.error(e)

        except Exception:
            # Worker boundary: parsing errors (e.g. a page without text) must
            # not escape, or the failure would go unrecorded.
            with lock:
                failed_sites.append(url)
            logging.exception(f'Unexpected error while crawling {url}')

        finally:
            # Runs on every path. If the url stayed in in_progress_set, the
            # scheduler loop below would never see the crawl as finished.
            with lock:
                processed_set.add(url)
                in_progress_set.discard(url)

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        while True:
            with lock:
                if not ready_queue and not in_progress_set:
                    break
                has_pending: bool = bool(ready_queue)
            # Nothing to hand out while workers are still fetching: just wait.
            if has_pending:
                executor.submit(crawl_site)
            # Pace the scheduler: at most one submission every 100 ms.
            sleep(0.1)

    logging.info(f'Successfully crawled {len(domain_data)} sites from domain: {domain}')

    return domain_data, failed_sites

In [None]:
# Full crawl of the target site.
# Note: filter_domain_links compares hostnames for exact equality, so only
# links on 'www.igs.com' itself are followed.
domain: str = 'www.igs.com'
domain_data, failed_sites = crawl_domain(client, domain)

In [None]:
from pydantic import BaseModel, HttpUrl, constr
from datetime import datetime
from typing import List, Optional

class LocData(BaseModel):
    '''One sitemap entry: a URL and, optionally, its last-modified time.'''
    loc: HttpUrl
    lastmod: Optional[datetime] = None  # None when the entry carries no date

class SitemapData(BaseModel):
    '''Container for a list of sitemap entries.'''
    locs: List[LocData]

class SiteData(BaseModel):
    '''Scraped content of a single page.

    url, text_content and scrape_timestamp match the keys of the records built
    by crawl_domain; src and year_published are optional extras.
    '''
    url: HttpUrl
    # Explicit default, like the other optional fields: without it pydantic v2
    # treats the field as required, and crawl_domain records carry no 'src'.
    src: Optional[HttpUrl] = None
    text_content: constr(min_length=1)  # empty text is rejected
    scrape_timestamp: datetime
    year_published: Optional[int] = None

class DomainData(BaseModel):
    '''Container for the scraped pages of one domain.'''
    sites: List[SiteData]

In [None]:
from datetime import datetime
from lxml import etree

# Prefix map for element lookups: 's' is bound to the sitemap 0.9 XML namespace.
SITEMAP_NAMESPACE = {'s': 'http://www.sitemaps.org/schemas/sitemap/0.9'}

class EmptySitemapDataException(Exception):
    '''Raised when a domain's sitemap.xml yields no valid URLs.'''

def get_subfield(url_tag: etree._Element, field_name: str) -> Optional[str]:
    '''Returns the text of the sitemap-namespace child named field_name.

    None is returned when url_tag has no such direct child.
    '''
    child = url_tag.find(f's:{field_name}', SITEMAP_NAMESPACE)
    if child is None:
        return None
    return child.text

def get_sitemap_data(client: ScraperAPIClient, sitemap_url: str) -> List[Dict[str, Optional[str]]]:
    '''Downloads a sitemap and returns its <url> entries.

    Args:
        client: ScraperAPI client used for the request.
        sitemap_url: URL of the sitemap XML document.

    Returns:
        One dict per usable <url> entry with the keys 'loc' (the URL, with
        surrounding whitespace removed) and 'lastmod' (the trimmed <lastmod>
        text, or None when it is absent or blank).

    Raises:
        EmptySitemapDataException: if no entry has a non-empty <loc>.
    '''
    # Use 'make_request' instead of 'get' for byte str compatibility.
    # i.e. w/ encoding declaration <?xml version="1.0" encoding="UTF-8"?>
    response = client.make_request(url=sitemap_url, params={'retry': NUM_RETRIES})
    root = etree.fromstring(response.content)
    sitemap_data: List[Dict[str, Optional[str]]] = []

    for url in root.findall('.//s:url', SITEMAP_NAMESPACE):
        loc = url.find('s:loc', SITEMAP_NAMESPACE)
        if loc is None:
            pprint(f'Warning: No <loc> tag found in URL data: {url}. Skipping...')
            continue

        # An empty <loc/> has text None, and pretty-printed sitemaps pad the
        # URL with whitespace; neither should end up in the output.
        loc_text: str = (loc.text or '').strip()
        if not loc_text:
            pprint(f'Warning: Empty <loc> tag found in URL data: {url}. Skipping...')
            continue

        lastmod: Optional[str] = get_subfield(url_tag=url, field_name='lastmod')
        sitemap_data.append({
            'loc': loc_text,
            # Blank <lastmod> is reported as None, like a missing one.
            'lastmod': (lastmod or '').strip() or None,
        })

    if not sitemap_data:
        raise EmptySitemapDataException

    return sitemap_data

In [None]:
def prepare_output_filename(domain: str) -> str:
    '''Builds the JSON output file name for a domain.

    Dots are replaced by underscores, e.g. 'www.igs.com' becomes
    'www_igs_com_domain_data.json'.
    '''
    # Computed outside the f-string: reusing the enclosing quote character
    # inside a replacement field is a SyntaxError before Python 3.12.
    safe_domain: str = domain.replace('.', '_')
    return f'{safe_domain}_domain_data.json'

print(prepare_output_filename(domain))

In [None]:
import json
# Persist the crawl results next to the notebook as pretty-printed JSON.
output_filename = prepare_output_filename(domain)
with open(output_filename, 'w', encoding='utf-8') as f:
    # json.dump keeps its default ensure_ascii=True, so non-ASCII characters
    # are written as \uXXXX escapes.
    json.dump(domain_data, f, indent=2)

In [None]:
import pandas as pd
import plotly.express as px
from plotly.offline import init_notebook_mode
init_notebook_mode(connected=True)

In [None]:
# One row per scraped page, plus the character count of its text.
df = pd.DataFrame(domain_data).assign(
    text_length=lambda frame: frame['text_content'].apply(len)
)

In [None]:
# Histogram of per-page text length (df['text_length'], in characters).
fig = px.histogram(
    df, 
    x='text_length', 
    title='Distribution of Text Content Length',
    labels={'text_length': 'Text Length (chars)'},
    nbins=5000  # Adjust number of bins as needed
)

# Display the figure using plotly's "iframe" renderer.
fig.show(renderer="iframe")