In [18]:
# Central settings for this notebook; the dictionary is handed to
# DigitalCorporaDownloader, which reads the individual keys on demand.
config = dict(
    target_country="DE",
    # Gzipped CSV metadata files published by Digital Corpora.
    digital_corpora_hosts_url="https://digitalcorpora.s3.amazonaws.com/corpora/files/CC-MAIN-2021-31-PDF-UNTRUNCATED/metadata/cc-hosts-20230303.csv.gz",
    digital_corpora_provenance_url="https://digitalcorpora.s3.amazonaws.com/corpora/files/CC-MAIN-2021-31-PDF-UNTRUNCATED/metadata/cc-provenance-20230303.csv.gz",
    # Size of the thread pool used for parallel downloads.
    max_workers=30,
    # PDFs with more pages than this are rejected.
    max_pdf_pages=4,
    # PDFs larger than this many bytes are rejected (5 MB).
    max_pdf_size=5 * 1024 * 1024,
    # Timeout in seconds passed to requests.get for every download.
    timeout_seconds=30,
)

In [2]:
%pip install pymupdf

Note: you may need to restart the kernel to use updated packages.


In [3]:
from urllib.request import urlretrieve
from pathlib import Path
import csv
import gzip
import tempfile
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import pymupdf
import os
import json
import shutil
from tempfile import TemporaryDirectory
import uuid
import requests
from enum import Enum

In [4]:
import signal

class NoKeyboardInterrupt:
    """Context manager that ignores SIGINT (Ctrl-C) while its body runs.

    The previous SIGINT handler is restored on exit, whether or not the body
    raised.

    Python only allows signal handlers to be changed from the main thread
    (``signal.signal`` raises ``ValueError`` elsewhere). When entered from any
    other thread the context manager therefore does nothing; this is safe
    because Python-level signal handlers are always executed in the main
    thread, so a worker thread never receives ``KeyboardInterrupt`` itself.
    """

    def __enter__(self):
        import threading  # local import: this cell only imports `signal`

        self.original_handler = None
        self._handler_replaced = False
        if threading.current_thread() is threading.main_thread():
            self.original_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
            self._handler_replaced = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self._handler_replaced:
            return
        self._handler_replaced = False
        handler = self.original_handler
        if handler is None:
            # signal.signal() returns None when the previous handler was not
            # installed from Python; None cannot be passed back in, so fall
            # back to Python's standard KeyboardInterrupt handler.
            handler = signal.default_int_handler
        signal.signal(signal.SIGINT, handler)

In [15]:
class RejectionReason(Enum):
    """Why a URL was recorded as rejected.

    The member value (identical to the member name) is what the downloader
    stores per URL in its rejected-URL dictionary.
    """

    # --- validation outcomes ---
    TOO_MANY_PAGES = "TOO_MANY_PAGES"  # page count exceeds config["max_pdf_pages"]
    TOO_LARGE = "TOO_LARGE"            # file size exceeds config["max_pdf_size"]
    CORRUPTED = "CORRUPTED"            # pymupdf could not process the downloaded file
    # --- download outcomes ---
    TIMEOUT = "TIMEOUT"                # the HTTP request raised a requests timeout
    OTHER = "OTHER"                    # presumably a catch-all; not assigned anywhere in the visible code
    ENCRYPTED = "ENCRYPTED"            # pymupdf reports the document as encrypted
    NOT_FOUND = "NOT_FOUND"            # the download failed with any non-timeout error

class DigitalCorporaDownloader:
    """Select PDF URLs from the Digital Corpora metadata and download them in parallel.

    The hosts and provenance CSV files are downloaded on demand and kept on
    disk. Every downloaded PDF is validated; accepted files are copied into a
    target folder as ``<index>.pdf``. The outcome per URL is tracked in two
    dictionaries, ``accepted_pdf_urls`` (URL -> file name) and
    ``rejected_pdf_urls`` (URL -> rejection reason), which are persisted as
    JSON so that later runs skip URLs that were already processed.
    """

    def __init__(self, config: dict):
        """Store the configuration and initialise the lazily filled caches.

        Args:
            config: Settings dictionary. The methods of this class read the
                keys ``digital_corpora_hosts_url``,
                ``digital_corpora_provenance_url``, ``max_workers``,
                ``max_pdf_pages``, ``max_pdf_size`` and ``timeout_seconds``.
        """
        self.config = config
        # Parsed metadata; None means "not loaded yet".
        self.digital_corpora_hosts = None
        self.digital_corpora_provenance = None
        # Per-URL bookkeeping; None means "not loaded yet". The locks guard
        # loading and mutation because workers run in a thread pool.
        self.rejected_pdf_urls = None
        self.rejected_pdf_urls_lock = Lock()
        self.accepted_pdf_urls = None
        self.accepted_pdf_urls_lock = Lock()

    # ------------------------------------------------------------------
    # Metadata download and parsing
    # ------------------------------------------------------------------

    def _download_and_extract_gz_file(self, url: str, local_path: Path):
        """Download a gzip file from ``url`` and write its decompressed content to ``local_path``.

        The data is decompressed in chunks instead of being read into memory at
        once, and it is written to ``<local_path>.part`` first. The final file
        only appears once extraction succeeded, so an interrupted download can
        never be mistaken for a complete cached file.
        """
        local_path = Path(local_path)
        partial_path = local_path.with_name(local_path.name + ".part")
        with TemporaryDirectory() as tmp_dir:
            archive_path = Path(tmp_dir) / "download.gz"
            urlretrieve(url, str(archive_path))
            try:
                with gzip.open(archive_path, 'rb') as compressed, open(partial_path, 'wb') as extracted:
                    shutil.copyfileobj(compressed, extracted)
                os.replace(partial_path, local_path)
            finally:
                # Only still present when extraction failed or was interrupted.
                if partial_path.exists():
                    partial_path.unlink()

    def _iter_csv_rows(self, csv_path: Path):
        """Yield every data row of ``csv_path`` as a ``{column name: value}`` dict.

        The first line is used as header. Blank lines are skipped; values of a
        row beyond the header length are ignored, and a row shorter than the
        header simply lacks the trailing keys.
        """
        with Path(csv_path).open(encoding='utf-8-sig', newline='') as f:
            csv_reader = csv.reader(f)
            header = next(csv_reader, None)
            if header is None:
                return
            for row in csv_reader:
                if row:
                    yield dict(zip(header, row))

    def _csv_to_dict(self, csv_path: Path):
        """Parse ``csv_path`` into a dict that maps each row's ``url_id`` to the row dict.

        If a ``url_id`` occurs more than once, the last row wins.

        Raises:
            KeyError: If a row has no ``url_id`` value.
        """
        return {row["url_id"]: row for row in self._iter_csv_rows(csv_path)}

    def _get_digital_corpora_hosts(self, local_path: Path = Path("hosts.csv")):
        """Return the parsed hosts file, downloading and loading it on first use."""
        # `is not None` (not truthiness) so that an empty result is cached too.
        if self.digital_corpora_hosts is not None:
            return self.digital_corpora_hosts

        if not local_path.exists():
            print(f"Downloading hosts file to {local_path}")
            self._download_and_extract_gz_file(self.config["digital_corpora_hosts_url"], local_path)

        print(f"Loading hosts from {local_path} ...")
        self.digital_corpora_hosts = self._csv_to_dict(local_path)
        return self.digital_corpora_hosts

    def _ensure_provenance_file(self, local_path: Path):
        """Download the provenance file to ``local_path`` unless it already exists."""
        if not local_path.exists():
            print(f"Downloading provenance file to {local_path}")
            self._download_and_extract_gz_file(self.config["digital_corpora_provenance_url"], local_path)

    # NOTE: loads the complete provenance file into memory (the original
    # author measured about 25 GB). `filter_pdf_urls_by_country` no longer
    # needs this; it streams the file via `_lookup_provenance_urls`.
    def _get_digital_corpora_provenance(self, local_path: Path = Path("provenance.csv")):
        """Return the fully parsed provenance file, downloading and loading it on first use."""
        if self.digital_corpora_provenance is not None:
            return self.digital_corpora_provenance

        self._ensure_provenance_file(local_path)

        print(f"Loading provenance from {local_path} ...")
        self.digital_corpora_provenance = self._csv_to_dict(local_path)
        return self.digital_corpora_provenance

    def _lookup_provenance_urls(self, url_ids, local_path: Path = Path("provenance.csv")):
        """Return ``{url_id: url}`` for the requested ``url_ids``.

        Uses the in-memory provenance dict if it was loaded before. Otherwise
        the CSV is streamed row by row and only the requested rows are kept, so
        the whole file never has to fit into memory. Ids that do not occur in
        the file are absent from the streamed result.
        """
        if self.digital_corpora_provenance is not None:
            return {url_id: self.digital_corpora_provenance[url_id]["url"] for url_id in url_ids}

        self._ensure_provenance_file(local_path)
        print(f"Loading provenance from {local_path} ...")
        wanted = set(url_ids)
        url_by_id = {}
        for row in self._iter_csv_rows(local_path):
            url_id = row.get("url_id")
            if url_id in wanted:
                url_by_id[url_id] = row["url"]
        return url_by_id

    def _filter_url_ids_by_country(self, hosts: dict, target_country: str):
        """Return the ``url_id`` keys of all hosts whose ``country`` equals ``target_country``."""
        return [url_id for url_id, host in hosts.items() if host["country"] == target_country]

    def filter_pdf_urls_by_country(self, target_country: str):
        """Return the PDF URLs whose host entry has ``country == target_country``.

        The result follows the order of the hosts file.

        Raises:
            KeyError: If a selected ``url_id`` has no entry in the provenance file.
        """
        hosts = self._get_digital_corpora_hosts()
        url_ids_for_target_country = self._filter_url_ids_by_country(hosts, target_country)
        url_by_id = self._lookup_provenance_urls(url_ids_for_target_country)
        return [url_by_id[url_id] for url_id in url_ids_for_target_country]

    # ------------------------------------------------------------------
    # Persistent accepted / rejected bookkeeping
    # ------------------------------------------------------------------

    def _load_url_map(self, file_path: Path):
        """Read a URL dictionary from ``file_path``; a missing file or a JSON ``null`` yields ``{}``."""
        file_path = Path(file_path)
        if not file_path.exists():
            return {}
        with open(file_path, "r") as f:
            return json.load(f) or {}

    def _write_json_atomically(self, payload: dict, file_path: Path):
        """Write ``payload`` as JSON via a temporary sibling file and an atomic rename.

        An interruption while writing therefore cannot corrupt an existing file.
        """
        file_path = Path(file_path)
        partial_path = file_path.with_name(file_path.name + ".tmp")
        with open(partial_path, "w") as f:
            json.dump(payload, f)
        os.replace(partial_path, file_path)

    def _save_rejected_pdf_urls(self, file_path: Path = Path("rejected_pdf_urls.json")):
        """Persist the rejected-URL dictionary to ``file_path``."""
        # Loading first guarantees that a dict (never None) is written.
        rejected = self._get_rejected_pdf_urls(file_path)
        with self.rejected_pdf_urls_lock:
            snapshot = dict(rejected)
        self._write_json_atomically(snapshot, file_path)

    def _get_rejected_pdf_urls(self, file_path: Path = Path("rejected_pdf_urls.json")):
        """Return the rejected-URL dictionary, loading it from ``file_path`` on first use.

        The dictionary is created exactly once. The original truthiness check
        replaced a still-empty dictionary with a new one on every call, which
        could drop entries added concurrently by other worker threads.
        """
        if self.rejected_pdf_urls is None:
            with self.rejected_pdf_urls_lock:
                if self.rejected_pdf_urls is None:
                    self.rejected_pdf_urls = self._load_url_map(file_path)
        return self.rejected_pdf_urls

    def _save_accepted_pdf_urls(self, file_path: Path = Path("accepted_pdf_urls.json")):
        """Persist the accepted-URL dictionary to ``file_path``."""
        accepted = self._get_accepted_pdf_urls(file_path)
        with self.accepted_pdf_urls_lock:
            snapshot = dict(accepted)
        self._write_json_atomically(snapshot, file_path)

    def _get_accepted_pdf_urls(self, file_path: Path = Path("accepted_pdf_urls.json")):
        """Return the accepted-URL dictionary, loading it from ``file_path`` on first use.

        Created exactly once, for the same reason as in `_get_rejected_pdf_urls`.
        """
        if self.accepted_pdf_urls is None:
            with self.accepted_pdf_urls_lock:
                if self.accepted_pdf_urls is None:
                    self.accepted_pdf_urls = self._load_url_map(file_path)
        return self.accepted_pdf_urls

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def _validate_pdf(self, url: str, pdf_path: Path, target_folder: Path):
        """Check the downloaded PDF and record it as accepted or rejected.

        Checks, in order: the file can be processed by pymupdf, it is not
        encrypted, it has at most ``config["max_pdf_pages"]`` pages and it is
        at most ``config["max_pdf_size"]`` bytes large.

        Returns:
            True if the PDF was accepted (and copied to ``target_folder``),
            False if it was rejected.
        """
        try:
            pdf = pymupdf.open(pdf_path)
        except Exception:
            self._reject_pdf(url, pdf_path, RejectionReason.CORRUPTED)
            return False

        reason = None
        try:
            if pdf.is_encrypted:
                reason = RejectionReason.ENCRYPTED
            elif pdf.page_count > self.config["max_pdf_pages"]:
                reason = RejectionReason.TOO_MANY_PAGES
        except Exception:
            # The file opened but its metadata cannot be read.
            reason = RejectionReason.CORRUPTED
        finally:
            # Release the file handle; the document was never closed before.
            pdf.close()

        if reason is None and os.path.getsize(pdf_path) > self.config["max_pdf_size"]:
            reason = RejectionReason.TOO_LARGE

        if reason is not None:
            self._reject_pdf(url, pdf_path, reason)
            return False

        self._accept_pdf(url, pdf_path, target_folder)
        return True

    def _reject_pdf(self, url: str, pdf_path: Path, reason: "RejectionReason"):
        """Record ``url`` as rejected with ``reason.value``.

        ``pdf_path`` is not touched: the file lives in a temporary folder that
        the caller removes.
        """
        rejected = self._get_rejected_pdf_urls()
        with self.rejected_pdf_urls_lock:
            rejected[url] = reason.value

    def _accept_pdf(self, url: str, pdf_path: Path, target_folder: Path):
        """Copy the PDF to ``target_folder / "<index>.pdf"`` and record it as accepted.

        The index is the number of URLs accepted so far. A URL that is already
        accepted is left untouched: re-accepting it would not grow the
        dictionary, so the next accepted PDF would reuse the same index and
        overwrite the file.
        """
        # Fetched before taking the lock: the getter may need the same
        # (non-reentrant) lock for its first-time load.
        accepted = self._get_accepted_pdf_urls()
        with self.accepted_pdf_urls_lock:
            if url in accepted:
                return
            file_name = f"{len(accepted)}.pdf"
            shutil.copy(pdf_path, target_folder / file_name)
            # TODO: an interruption between the copy and the next line leaves a
            # file without a dictionary entry; the next accepted PDF overwrites it.
            accepted[url] = file_name

    # ------------------------------------------------------------------
    # Download
    # ------------------------------------------------------------------

    def _download_pdf(self, url: str, output_path: Path):
        """Stream ``url`` into ``output_path``.

        Raises:
            requests.exceptions.RequestException: On connection problems,
                timeouts or an HTTP error status.
        """
        # The context manager closes the streamed response (and releases its
        # connection) even if writing fails half-way.
        with requests.get(url, timeout=self.config["timeout_seconds"], stream=True) as response:
            response.raise_for_status()

            with open(output_path, 'wb') as pdf_file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        pdf_file.write(chunk)

    def _download_pdf_worker(self, url: str, target_folder: Path):
        """Download and validate a single URL unless it was processed before.

        Returns:
            None if the URL was skipped or the download failed, True once the
            PDF was downloaded and validated (accepted or rejected).
        """
        target_folder.mkdir(parents=True, exist_ok=True)

        # Skip URLs that were already accepted or rejected, now or in an earlier run.
        if url in self._get_accepted_pdf_urls():
            return
        if url in self._get_rejected_pdf_urls():
            return

        # Download into a throw-away folder; only accepted PDFs are copied out.
        with TemporaryDirectory() as tmp_folder:
            pdf_path = Path(tmp_folder) / (str(uuid.uuid4()) + ".pdf")

            try:
                self._download_pdf(url, pdf_path)
            except requests.exceptions.Timeout:
                self._reject_pdf(url, pdf_path, RejectionReason.TIMEOUT)
                return
            except Exception:
                # Every non-timeout failure (HTTP error status, connection
                # error, ...) is recorded as NOT_FOUND.
                self._reject_pdf(url, pdf_path, RejectionReason.NOT_FOUND)
                return

            # Copies the PDF to the target folder if it meets the criteria.
            self._validate_pdf(url, pdf_path, target_folder)
        return True

    def download_pdfs(self, urls: list[str], target_folder: Path = Path("pdfs")):
        """Download all ``urls`` in parallel and persist the accepted/rejected bookkeeping.

        Duplicate URLs are processed once. The bookkeeping is written even if
        the run is interrupted or a worker raises, so that it stays in sync
        with the PDF files already copied to ``target_folder``.

        Args:
            urls: PDF URLs to process.
            target_folder: Folder that receives the accepted PDFs.
        """
        target_folder.mkdir(parents=True, exist_ok=True)

        # Load the persisted state once, before any worker thread starts.
        self._get_accepted_pdf_urls()
        self._get_rejected_pdf_urls()

        unique_urls = list(dict.fromkeys(urls))  # drops duplicates, keeps order

        try:
            with ThreadPoolExecutor(max_workers=self.config["max_workers"]) as executor:
                futures = [executor.submit(self._download_pdf_worker, url, target_folder) for url in unique_urls]
                try:
                    for future in tqdm(as_completed(futures), total=len(futures)):
                        future.result()
                except KeyboardInterrupt:
                    # Do not start queued downloads; workers already running
                    # finish before the executor shuts down.
                    for pending in futures:
                        pending.cancel()
                    raise
        finally:
            self._save_accepted_pdf_urls()
            self._save_rejected_pdf_urls()

In [19]:
# One downloader instance is shared by the cells below; it keeps the loaded
# metadata and the accepted/rejected bookkeeping between calls.
downloader = DigitalCorporaDownloader(config=config)

In [7]:
# Take the country from the configuration cell instead of repeating the
# literal, so that changing config["target_country"] actually has an effect.
urls = downloader.filter_pdf_urls_by_country(config["target_country"])

Loading hosts from hosts.csv ...
Loading provenance from provenance.csv ...


In [24]:
# Process only the first 10,000 URLs of the country selection; accepted PDFs
# are copied into the local "pdfs" folder.
downloader.download_pdfs(urls=urls[:10_000], target_folder=Path("pdfs"))

  0%|          | 0/1000 [00:00<?, ?it/s]

In [25]:
# Number of URLs recorded as rejected (dictionary maps URL -> rejection reason).
len(downloader.rejected_pdf_urls)

660

In [26]:
# Number of URLs recorded as accepted (dictionary maps URL -> stored file name).
len(downloader.accepted_pdf_urls)

338