# Project Gutenberg Crawler

Make sure you read the site's TOS and README.md on how to use the crawler.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LAION-AI/Open-Assistant/blob/notebooks/gutenberg/project_gutenberg_crawler.ipynb)

In [None]:
# uncomment and run below lines to set up if running in colab
# !git clone https://github.com/LAION-AI/Open-Assistant.git
# %cd Open-Assistant/notebooks/gutenberg
# !pip install -r requirements.txt

In [None]:
# global settings

# NOTE: a few hundred books list several languages (such as 'en; es'); those
# are not selected, because the catalog is filtered by an exact match on LANG.
LANG = "en"  # language code of the books to crawl ("en" crawls English language books)
FOLDER = "text"  # save metadata and body of text to this folder
CHUNKS = 5  # divide the dataset into this many compressed parquet files if you have less memory
STATUS = "crawled.csv"  # save the list of downloaded files and their status into this csv

In [None]:
# import required packages
import os
import io
import re
import requests
import time
import warnings

try: 
    from BeautifulSoup import BeautifulSoup
except ImportError:
    from bs4 import BeautifulSoup
from tqdm import tqdm

import numpy as np    
import pandas as pd

from typing import Tuple, Optional, Any

# Code for crawler

In [None]:
class GutenbergCrawler:
    """Rate-limited downloader and parser for Project Gutenberg HTML books.

    The crawler fetches the HTML edition of a book from a randomly chosen
    mirror, separates the Gutenberg metadata from the body text and can
    write both to ``<book>_meta.txt`` / ``<book>_text.txt`` files.

    Args:
        folder: Directory the text files are written to. It is created if it
            does not exist. ``None`` writes into the current directory.
    """

    HEADER = {
        "User-Agent": "Mozilla/5.0 (compatible; GutenbergCrawler/0.1)",
    }
    TIMER = 600  # minimum wait between two requests, in milliseconds
    TIMEOUT = 60  # seconds to wait for a server response before giving up
    MIRRORS = [
        "http://www.mirrorservice.org/sites/ftp.ibiblio.org/pub/docs/books/gutenberg/",
        "https://www.gutenberg.org/dirs/",
        "http://mirrors.xmission.com/gutenberg/",
        #"https://gutenberg.pglaf.org/",  # most likely to get rate limited
    ]  # see https://www.gutenberg.org/MIRRORS.ALL for available mirrors

    # "*** ... please ... copyright ... ***" style notice of copyrighted books
    _COPYRIGHT_NOTICE = re.compile(
        r'(?i)\*{2,}[^\n]+?(?:please.+?copyright|copyrighted.+?project)[^\n]+?\*{2,}\r?\n'
    )
    # "*** ... PROJECT GUTENBERG ... ***" start/end marker lines
    _GUTENBERG_MARKER = re.compile(
        r'(?i)\*{2,}[^\n]+?project gutenberg[^\n]+?\*{2,}\s*[\r\n]+'
    )

    def __init__(self, folder: Optional[str] = None) -> None:
        self.folder = folder
        if self.folder is not None:
            os.makedirs(self.folder, exist_ok=True)
        self.calls = 0  # number of requests issued so far
        self.last_call = 0  # time.time() of the last finished request

    def _wait_seconds(self, now: float) -> float:
        """Return how many seconds to sleep so that TIMER ms separate two calls.

        ``now`` and ``self.last_call`` are ``time.time()`` values (seconds),
        while TIMER is in milliseconds, so the elapsed time is converted to
        milliseconds before it is compared with TIMER.
        """
        elapsed_ms = (now - self.last_call) * 1000.0
        return max(0.0, self.TIMER - elapsed_ms) / 1000.0

    def _get(self, url: str) -> Optional[str]:
        """Download ``url`` and return its decoded body.

        Returns:
            The page content as text, or ``None`` when the server answers with
            an HTTP error status (4xx / 5xx), so that error pages are never
            mistaken for book content.

        Raises:
            requests.exceptions.RequestException: on connection problems or
                when the server does not respond within TIMEOUT seconds.
        """
        self.calls += 1
        delay = self._wait_seconds(time.time())
        if delay > 0:
            time.sleep(delay)
        data = requests.get(url, headers=self.HEADER, timeout=self.TIMEOUT)
        self.last_call = time.time()
        if data.status_code >= 400:
            return None
        try:
            return data.content.decode("utf-8")
        except UnicodeDecodeError:
            # ISO-8859-1 (latin-1) maps every byte to a character, so this
            # fallback cannot fail and needs no further error handling.
            return data.content.decode("ISO-8859-1")

    def catalog(self) -> pd.DataFrame:
        """Return the Project Gutenberg catalog restricted to ``Type == "Text"``.

        The gzipped catalog is tried first; if reading it fails for any
        reason, the uncompressed CSV is downloaded through ``_get`` instead.

        Raises:
            ValueError: if the uncompressed catalog cannot be downloaded either.
        """
        try:
            table = pd.read_csv("https://www.gutenberg.org/cache/epub/feeds/pg_catalog.csv.gz", sep=",")
        except Exception:
            raw = self._get("https://www.gutenberg.org/cache/epub/feeds/pg_catalog.csv")
            if raw is None:
                raise ValueError("Catalog CSV file does not exist!")
            table = pd.read_csv(io.StringIO(raw), sep=",")
        return table.loc[table["Type"] == "Text"].reset_index(drop=True)

    def search(self, url: str) -> dict:
        """Use catalog() instead! Returns dict with book_id: 'book title' pairs for gutenberg.org pages"""
        assert "/www.gutenberg.org" in url, "The URL must be a page at https://www.gutenberg.org/"
        html = self._get(url)
        if html is None:
            return {}
        dom = BeautifulSoup(html, "html.parser")
        results = {}
        for a in dom.find_all("a"):
            # only links of the exact form <a href="/ebooks/123">title</a> count
            for ebook, title in re.findall(r'<a href=\"/ebooks/(\d+)\">(.+?)</a>', str(a)):
                results[int(ebook)] = title.replace("\r<br/>", "\r\n")
        return results

    @staticmethod
    def _book_dir(book: int) -> str:
        """Return the mirror sub-directory of ``book``, e.g. 12345 -> "1/2/3/4/12345/".

        Every digit except the last becomes one directory level; single-digit
        books live under "0/".
        """
        if book < 10:
            return f"0/{book}/"
        return "/".join(str(book)[:-1]) + f"/{book}/"

    def download(self, book: int) -> Optional[str]:
        """Fetch the HTML edition of ``book`` from a randomly picked mirror.

        Returns:
            The HTML as text, or ``None`` if the mirror does not serve the file.
        """
        book = int(book)
        assert book > 0
        mirror = np.random.choice(self.MIRRORS)
        url = f"{mirror}{self._book_dir(book)}{book}-h/{book}-h.htm"
        return self._get(url)

    def parse(self, book: int, html: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
        """Split the HTML of a book into ``(metadata, body text)``.

        Returns:
            ``(None, None)`` when there is no HTML, the page has no title or a
            title containing "404", the book carries a copyright notice, or
            the text contains more than two Project Gutenberg marker lines.
            Otherwise the metadata and the body as plain text.
        """
        book = int(book)
        assert book > 0
        if html is None:
            return None, None
        dom = BeautifulSoup(html, "html.parser")
        if dom is None or dom.title is None or dom.title.string is None or "404" in dom.title.string:
            return None, None

        # <title> and <pre> elements are collected as metadata and removed
        # from the document so that they do not show up in the body text
        meta_parts = []
        for node in dom.select("title, pre"):
            meta_parts.append(str(node.get_text()).strip())
            node.extract()
        meta = "".join(meta_parts)
        if self._COPYRIGHT_NOTICE.search(meta):
            warnings.warn(f"Book {book} is copyrighted.")
            return None, None
        for img in dom.select("img"):
            # add image alt attributes as text
            try:
                img.insert(0, img["alt"])
            except KeyError:
                pass
        text = str(dom.get_text()).strip()
        if self._COPYRIGHT_NOTICE.search(text):
            warnings.warn(f"Book {book} is copyrighted.")
            return None, None

        # text before the first marker line is metadata, the part after it is
        # the body; anything behind a second marker is dropped (e.g. 49843)
        parts = self._GUTENBERG_MARKER.split(text)
        if len(parts) > 3:
            warnings.warn(f"Book {book} is malformed.")
            return None, None
        if len(parts) > 1:
            return meta + parts[0], parts[1]
        return meta, text

    @staticmethod
    def pretty(text: Optional[str]) -> str:
        """Clean up a book body: drop boilerplate notes and normalize line endings.

        Returns:
            The cleaned text with ``\\r\\n`` line endings and at most two
            consecutive empty lines, or ``""`` for empty / ``None`` input.
        """
        if not text:
            return ""
        # attempt to remove transcriber's notes
        text = re.sub(r'(?i)(?:\[|\b)transcriber[\'’]?s? notes?\s*(?:[^\xa0\n].*?\]?(?:\r?\n){1,2})+', '', text)
        # attempt to remove e-text info
        text = re.sub(r'(?i)e-text prepared(?:[^\xa0]\(?.+\)?\r?\n{1,3})+(?:\xa0*\s*note\:\s*(?:.+\s*\r{0,2}\n{1,2}){1,5}\xa0\s+)?', '', text)
        # standardize line endings
        text = "\r\n".join(text.splitlines())
        text = re.sub(r'(\r\n){3,}', '\r\n\r\n\r\n', text).strip()
        return text

    def _write(self, file: str, content: str) -> None:
        """Write ``content`` as UTF-8 to ``file`` inside the crawler's folder."""
        path = os.path.join(self.folder, file) if self.folder is not None else file
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)

    def save(self, book: int) -> bool:
        """Download, parse and store ``book``.

        Writes ``<book>_meta.txt`` and ``<book>_text.txt`` for the non-empty
        parts and returns ``True`` only if a body text was stored.
        """
        html = self.download(book)
        meta, text = self.parse(book, html)
        if meta:
            self._write(f"{book}_meta.txt", meta)
        if text:
            self._write(f"{book}_text.txt", text)
        return bool(text)


# Start crawling

In [None]:
gc = GutenbergCrawler(FOLDER)  # crawler that saves its files into FOLDER (the folder is created if missing)

In [None]:
# Load the catalog (catalog() returns only the "Text" type entries) and keep
# the rows whose Language equals LANG exactly.
df = gc.catalog()
df = df[df["Language"] == LANG]
assert len(df), "No matching items in catalog!"
# frac=1 draws every row once, i.e. the catalog is put into random order
df = df.sample(frac=1)
df.head()

In [None]:
# Resume from an earlier run when a status file exists, otherwise start with
# an empty book/success table.
crawled = (
    pd.read_csv(STATUS)
    if os.path.exists(STATUS)
    else pd.DataFrame({"book": [], "success": []})
)
print(f"{len(crawled)} out of {len(df)} items")

In [None]:
# NOTE: this will take really long depending on the number of ebooks selected
# Books already listed in the status table are skipped and the table is written
# to STATUS after every book, so the cell can be interrupted and re-run.
done = set(crawled["book"].tolist())  # O(1) "already crawled?" lookups
for _, row in df.iterrows():
    book = row["Text#"]
    if book in done:
        continue
    t = time.time()
    print(f"#{book} {row['Title']} ({row['Language']})", end=" ")
    success = bool(gc.save(book))
    print("✔️" if success else "❌", end=" ")
    # DataFrame.append was removed in pandas 2.0; pd.concat is its replacement.
    # An empty table is replaced rather than concatenated so that its
    # placeholder dtypes do not leak into the result.
    entry = pd.DataFrame([{"book": book, "success": success}])
    crawled = entry if crawled.empty else pd.concat([crawled, entry], ignore_index=True)
    done.add(book)
    print(f"- {(time.time() - t):.3f}s")
    crawled.to_csv(STATUS, index=False)
    if len(crawled) % 25 == 0:
        print(f"▶▶▶ {len(crawled)} done ({int(crawled['success'].sum()) } successful) out of {len(df)} ◀◀◀")
print("Done.")

# Add the crawled text files into parquet datasets

In [None]:
# Reload the crawl status, keep the successfully saved books only and name
# the id column like the catalog does ("Text#").
crawled = pd.read_csv(STATUS)
crawled = crawled[crawled["success"] == True].rename(columns={"book": "Text#"})

# Fresh catalog, restricted to the crawled language
gc = GutenbergCrawler(FOLDER)
df = gc.catalog()
df = df[df["Language"] == LANG]

print(f"{len(crawled)} out of {len(df)} ({len(crawled) / len(df) * 100.:.2f}%)")

In [None]:
# Keep one catalog row per book id and only the books that were crawled
df = df.drop_duplicates(subset=["Text#"])
df = df.merge(crawled, on=["Text#"], how="inner")
# crawled holds successful books only, so no failed row may survive the merge
assert not (df["success"] == False).any()
del crawled
# Drop the columns that are no longer needed and order the books by id
df = df.drop(columns=["Type", "Language", "success"])
df = df.sort_values(by="Text#", ascending=True)
len(df)

In [None]:
def read(file: str) -> Optional[str]:
    """Return the UTF-8 decoded content of ``file``, or ``None`` if it does not exist."""
    if not os.path.exists(file):
        return None
    with open(file, "r", encoding="utf-8") as handle:
        return handle.read()

def strip(value: Any) -> str:
    """Return ``value`` as a whitespace-stripped string; falsy or NA values give ``""``."""
    if not value:
        return ""
    if not pd.notna(value):
        return ""
    return str(value).strip()


# Write the crawled books into CHUNKS parquet files. Every chunk holds
# len(df) // CHUNKS consecutive catalog rows; the last chunk also takes the
# remainder.
chunk_size = len(df) // CHUNKS
for chunk in range(CHUNKS):
    start = chunk * chunk_size
    end = (chunk + 1) * chunk_size if chunk < CHUNKS - 1 else len(df)

    # NOTE: only the columns filled in below receive values, so this assumes
    # df consists of exactly those metadata columns at this point.
    updated = {col: [] for col in list(df.columns) + ["Body"]}
    # Walk over the rows of the chunk directly instead of searching the whole
    # table again for every single book id.
    for _, df_row in tqdm(df.iloc[start:end].iterrows(), total=end - start):
        book = df_row["Text#"]
        text = read(os.path.join(FOLDER, f"{book}_text.txt"))
        text = gc.pretty(text)
        if not text:
            # no (or an empty) text file for this book: leave it out
            continue

        updated["Text#"].append(book)
        # dates that do not parse become NaT instead of raising
        updated["Issued"].append(pd.to_datetime(df_row["Issued"], format="%Y-%m-%d", errors="coerce"))
        updated["Title"].append(strip(df_row["Title"]))
        updated["Authors"].append(strip(df_row["Authors"]))
        updated["Subjects"].append(strip(df_row["Subjects"]))
        updated["LoCC"].append(strip(df_row["LoCC"]))
        updated["Bookshelves"].append(strip(df_row["Bookshelves"]))
        updated["Body"].append(text)

    updated = pd.DataFrame(updated)
    if CHUNKS == 1:
        target = f"gutenberg_{LANG}_all.pq"
    else:
        target = f"gutenberg_{LANG}_{chunk + 1}_of_{CHUNKS}.pq"
    updated.to_parquet(target, index=False, engine="pyarrow", compression="gzip")
    del updated  # release the chunk before the next one is built

print("Done.")