In [None]:
# | default_exp europena

# Europeana newspaper parsers

> Parsers for Europeana newspapers

The goal of this code is to create a pipeline for parsing the [Europeana newspaper bulk downloads](https://pro.europeana.eu/page/iiif#download) and converting the original ALTO XML formats + metadata into a format that can be ingested easily into the 🤗 [datasets](https://huggingface.co/docs/datasets/index) library and cons

 for #BigLAM. This code is mostly collated from other places. We used [nbdev](https://nbdev.fast.ai/) to give our code some:

- basic tests
- some basic documentation 
- make it easily installable as a Python package. 

**Note:** some of these parsers are likely to be more generic, but we'll develop them for Europeana newspapers for now. Once they have been tested on other collections they may be moved to a core module. 

In [None]:
# |hide
from nbdev.showdoc import *

In [None]:
# |export
import io
import os
import xml
import xml.etree.ElementTree as ET
from concurrent.futures import ProcessPoolExecutor, as_completed
# from dataclaises import asdict, dataclass, field
from functools import lru_cache
from pathlib import Path
from statistics import mean, stdev
from attrs import asdict

from typing import Any, Dict, Iterable, List, Optional, Union
from attrs import define, field

import xmltodict
from toolz import partition_all
from tqdm.auto import tqdm

## ALTO Processing

ALTO is an XML format commonly used to store the output of Optical Character Recognition software. 

### Create test data

A subset of data is included in the GitHub repo for testing 

In [None]:
# # |slow
# !mkdir test_data
# !aria2c -x 4 -d test_data/ ftp://download.europeana.eu/newspapers/fulltext/alto/9200396.zip
# !unzip test_data/*.zip -d test_data/
# !rm test_data/*.zip
# !mkdir test_data/metadata
# !aria2c -x 4 -d test_data/metadata/ ftp://download.europeana.eu/newspapers/metadata/9200396.zip
# !unzip test_data/metadata/*.zip -d test_data/metadata/
# !rm test_data/metadata/*.zip

We'll create a list of ALTO XML files we can use for testing as we go

In [None]:
alto_xmls = [f for f in Path("test_data").rglob("*.xml") if "edm" not in f.name]
len(alto_xmls)

## Parse ALTO XMLs

The first step is to parse the XML file from disk into an `ElementTree` that we can use for other tasks.
The parsing code is borrowed from https://github.com/cneud/alto-tools/blob/master/alto_tools.py

In [None]:
# |export
from loguru import logger

In [None]:
# |export
def alto_parse(alto: Union[str, Path], **kwargs) -> Optional[tuple]:
    """Parse an ALTO XML file and detect which ALTO namespace it uses.

    Args:
        alto: Location of the ALTO XML file, as a ``str`` or ``Path``.
        **kwargs: Passed through unchanged to ``xml.etree.ElementTree.parse``.

    Returns:
        A ``(alto, tree, xmlns)`` tuple, where ``alto`` is the argument exactly
        as given, ``tree`` is the parsed ``ElementTree`` and ``xmlns`` is the
        namespace URI detected on the document root.
        ``None`` when the file is not well-formed XML or when the detected
        namespace is not one of the registered ALTO namespaces. Both cases are
        logged, so callers must check for ``None`` before unpacking.
    """

    def _label() -> str:
        # A plain ``str`` path has no ``.name`` attribute, so fall back to the
        # last path component for log messages.
        return alto.name if hasattr(alto, "name") else Path(str(alto)).name

    try:
        # Named ``tree`` so the imported ``xml`` package is not shadowed.
        tree = ET.parse(alto, **kwargs)
    except ET.ParseError as e:
        logger.error(f"Parser Error in file '{alto}': {e}")
        return None
    # Register ALTO namespaces
    # https://www.loc.gov/standards/alto/ | https://github.com/altoxml
    # alto-bnf (unofficial) BnF ALTO dialect - for further info see
    # http://bibnum.bnf.fr/alto_prod/documentation/alto_prod.html
    namespace = {
        "alto-1": "http://schema.ccs-gmbh.com/ALTO",
        "alto-2": "http://www.loc.gov/standards/alto/ns-v2#",
        "alto-3": "http://www.loc.gov/standards/alto/ns-v3#",
        "alto-4": "http://www.loc.gov/standards/alto/ns-v4#",
        "alto-5": "http://schema.ccs-gmbh.com/docworks/version20/alto-1-4.xsd",
        "alto-bnf": "http://bibnum.bnf.fr/ns/alto_prod",
    }
    # Extract namespace from document root: ElementTree spells a qualified tag
    # as "{namespace-uri}local-name".
    root = tree.getroot()
    root_ns = root.tag.split("}")[0].strip("{")
    if "http://" in root_ns:
        xmlns = root_ns
    else:
        try:
            # No namespace on the root tag: take the second space-separated
            # token of the attribute dict's repr, i.e. normally the (start of
            # the) first attribute's value.
            xmlns = str(root.attrib).split(" ")[1].strip("}").strip("'")
        except IndexError:
            logger.warning(f"File {_label()}: no namespace declaration found.")
            xmlns = "no_namespace_found"
    if xmlns in namespace.values():
        return alto, tree, xmlns
    logger.warning(f"File {_label()}: namespace {xmlns} is not registered.")
    return None

In [None]:
alto_xmls[0]

In [None]:
fname, xml, ns = alto_parse(alto_xmls[0])

In [None]:
assert all([fname, xml, ns])
# An empty file is not well-formed XML, so `alto_parse` logs the parser error
# and returns None instead of a (fname, tree, namespace) tuple.
Path("fake.xml").touch(exist_ok=True)
bad_xml = alto_parse(Path("fake.xml"))
# `isinstance(bad_xml, None)` is invalid because None is not a type;
# an identity check is the correct assertion.
assert bad_xml is None

In [None]:
# |export
def get_alto_text(xml, xmlns, join_lines=True):
    """Extract the text and OCR word-confidence statistics from an ALTO tree.

    Args:
        xml: Parsed ALTO document (anything exposing ``iterfind``, e.g. an
            ``ElementTree`` or ``Element``).
        xmlns: Namespace URI used by the document's elements.
        join_lines: Currently unused; kept so existing callers keep working.

    Returns:
        A ``(text, mean_ocr, std_ocr)`` tuple. ``text`` is every extracted word
        joined by single spaces. ``mean_ocr`` is the mean of all ``WC``
        attribute values, or ``None`` when no ``<String>`` carries one.
        ``std_ocr`` is their sample standard deviation, or ``None`` when fewer
        than two values are available.
    """
    words = []
    confidences = []
    string_tag = "{%s}String" % xmlns
    # Find all <TextLine> elements
    for text_line in xml.iterfind(".//{%s}TextLine" % xmlns):
        # Find all <String> elements
        for string in text_line.findall(string_tag):
            attributes = string.attrib
            # WC may be absent, so use .get instead of indexing.
            wc = attributes.get("WC")
            if wc is not None:
                confidences.append(float(wc))
            subs_type = attributes.get("SUBS_TYPE")
            subs_content = attributes.get("SUBS_CONTENT")
            if subs_type is None and subs_content is None:
                # Not part of a hyphenated word: use @CONTENT as is.
                word = attributes.get("CONTENT")
            elif subs_type is not None and "HypPart1" in subs_type:
                # First half of a hyphenated word: emit the substitution
                # content, falling back to @CONTENT if it is missing.
                word = (
                    subs_content
                    if subs_content is not None
                    else attributes.get("CONTENT")
                )
            else:
                # Any other substitution (e.g. HypPart2) contributes nothing,
                # so a hyphenated word is emitted only once. Previously the
                # preceding word was appended a second time here.
                word = None
            if word is not None:
                words.append(word)
    # The mean needs one value and the sample standard deviation needs two.
    # Previously the mean was discarded whenever there were two or fewer.
    mean_ocr = mean(confidences) if confidences else None
    std_ocr = stdev(confidences) if len(confidences) >= 2 else None
    return " ".join(words), mean_ocr, std_ocr

In [None]:
text, wc, std_ocr = get_alto_text(xml, ns)
assert all([text, wc, std_ocr])
assert isinstance(text, str)
assert isinstance(wc, float)
assert isinstance(std_ocr, float)

In [None]:
# |export
def alto_illustrations(xml, xmlns):
    """Collect the bounding box of every ``<Illustration>`` in an ALTO tree.

    Each box is a list of four floats read from the element's attributes in
    the order ``[HEIGHT, WIDTH, VPOS, HPOS]``. An empty list is returned when
    the document has no ``<Illustration>`` element in namespace ``xmlns``.
    """
    coordinate_attributes = ("HEIGHT", "WIDTH", "VPOS", "HPOS")
    illustration_path = ".//{%s}Illustration" % xmlns
    return [
        [float(element.attrib.get(name)) for name in coordinate_attributes]
        for element in xml.iterfind(illustration_path)
    ]

In [None]:
alto_illustrations(xml, ns)

In [None]:
def get_illustrations(xmls):
    """Lazily yield each file in ``xmls`` that mentions "Illustration".

    A file is yielded once, as soon as any of its lines contains the
    substring; the rest of that file is not read.
    """
    for candidate in xmls:
        with open(candidate, "r") as handle:
            mentions_illustration = any("Illustration" in row for row in handle)
        if mentions_illustration:
            yield candidate

In [None]:
from toolz import take

In [None]:
illustration_xmls = list(take(10, get_illustrations(alto_xmls)))

In [None]:
illustration_xmls

In [None]:
for file in illustration_xmls:
    fname, xml, ns = alto_parse(file)
    bounding_boxes = alto_illustrations(xml, ns)
    assert bounding_boxes
    for box in bounding_boxes:
        assert isinstance(box, list)
        assert len(box) == 4
        assert all(isinstance(x, float) for x in box)


## Newspaper page container 



In [None]:
#|export
@define(slots=True)
class NewspaperPageAlto:
    """Container for the data extracted from a single ALTO XML page.

    All fields except ``item_id`` are supplied by the caller; ``item_id`` is
    derived from ``fname`` right after initialisation.
    """

    # Path of the ALTO XML file the page was parsed from.
    fname: Union[str, Path]
    # Text of the page, words joined by spaces.
    text: Optional[str]
    # Mean and standard deviation of the OCR word confidences, if available.
    mean_ocr: Optional[float]
    std_ocr: Optional[float]
    # NOTE(review): annotated as a flat list, but `alto_illustrations` returns
    # one 4-float list per illustration (a list of lists) - confirm and adjust.
    bounding_boxes: List[Union[float, None]]
    # Not an __init__ argument; filled in by __attrs_post_init__.
    item_id: str = field(init=False)
    def _get_id(self):
        """Return the two path components preceding the file name, joined by "/"."""
        return "/".join(Path(self.fname).parts[-3:-1])

    def __attrs_post_init__(self):
        """Derive ``item_id`` from ``fname`` once the other fields are set."""
        self.item_id = self._get_id()


In [None]:
# |export
def parse_newspaper_page(xml_fname: Union[str, Path]):
    """Parse one ALTO XML file into a ``NewspaperPageAlto``.

    Args:
        xml_fname: Path of the ALTO XML file.

    Returns:
        A ``NewspaperPageAlto`` holding the page text, the OCR confidence
        statistics and the illustration bounding boxes.

    Raises:
        ValueError: If ``alto_parse`` could not parse the file or did not
            recognise its namespace (it returns ``None`` in both cases).
    """
    parsed = alto_parse(xml_fname)
    if parsed is None:
        # Fail with an explicit message instead of the opaque
        # "cannot unpack non-iterable NoneType object".
        raise ValueError(
            f"Could not parse ALTO XML file '{xml_fname}': "
            "malformed XML or unregistered namespace"
        )
    _, tree, ns = parsed
    text, mean_ocr, std_ocr = get_alto_text(tree, ns)
    bounding_boxes = alto_illustrations(tree, ns)
    return NewspaperPageAlto(xml_fname, text, mean_ocr, std_ocr, bounding_boxes)

In [None]:
page = parse_newspaper_page(alto_xmls[20])

In [None]:
assert page
assert isinstance(page, NewspaperPageAlto)
# `None` is not a type: `isinstance(x, (str, None))` raises TypeError as soon
# as `x` is not a `str`, so the optional fields are checked against `type(None)`.
assert isinstance(page.text, (str, type(None)))
assert isinstance(page.mean_ocr, (float, type(None)))
assert isinstance(page.std_ocr, (float, type(None)))
assert isinstance(page.bounding_boxes, list)
assert isinstance(page.item_id, str)

In [None]:
page

## Get metadata 

The next step is to create some functionality to get metadata for the items. There are two possible ways we can do this:
- via the metadata download dumps
- via the Europeana API


In [None]:
metadata_examples = list(take(500, Path("test_data").rglob("*edm.xml")))

In [None]:
# |export
@define(slots=True)
class NewspaperPageMetadata:
    """Metadata for one newspaper item, read from its EDM XML file.

    After initialisation the fields are normalised:

    - ``languages`` given as a single string is split on "," into a list;
    - ``title`` given as a string is cut at the first "-" and stripped of
      surrounding spaces;
    - ``metadata_xml_fname`` is converted to ``str``.
    """

    metadata_xml_fname: Union[str, Path]
    title: Optional[str]
    date: Optional[str]
    languages: Union[List[str], str, None]
    item_iiif_url: Optional[str]
    # The parsed metadata document the other fields were taken from.
    all_metadata_dict: Dict[Any, Any]

    def __attrs_post_init__(self):
        """Normalise ``languages``, ``title`` and ``metadata_xml_fname``."""
        if isinstance(self.languages, str):
            self.languages = self.languages.split(",")
        # `title` is Optional: only string titles are shortened, so a missing
        # title no longer raises AttributeError here.
        if isinstance(self.title, str):
            self.title = self.title.split("-")[0].strip(" ")
        self.metadata_xml_fname = str(self.metadata_xml_fname)

In [None]:
# |export
def get_metadata_from_xml(xml_file: Union[Path, str]):
    """Read an EDM metadata XML file into a ``NewspaperPageMetadata``.

    Args:
        xml_file: Path of the ``*.edm.xml`` metadata file.

    Returns:
        A ``NewspaperPageMetadata`` built from the title, issue date,
        language(s) and IIIF URL found in the file, plus the full parsed
        ``rdf:RDF`` mapping.

    Raises:
        KeyError: If one of the expected elements is missing from the file.
    """
    # Read raw bytes so the XML parser applies the encoding declared in the
    # document instead of the platform's default text encoding.
    with open(xml_file, "rb") as f:
        document = xmltodict.parse(f.read())
    metadata = document["rdf:RDF"]
    provided_cho = metadata["edm:ProvidedCHO"]
    title = provided_cho["dc:title"]
    date = provided_cho["dcterms:issued"]
    languages = provided_cho["dc:language"]
    iiif_url = metadata["ore:Aggregation"]["edm:isShownBy"]["@rdf:resource"]
    return NewspaperPageMetadata(xml_file, title, date, languages, iiif_url, metadata)

In [None]:
for metadata_xml in metadata_examples:
    metadata = get_metadata_from_xml(metadata_xml)
    assert metadata
    # `None` is not a type, so `(list, None)` raises TypeError whenever
    # `languages` is not a list; check against `type(None)` instead.
    assert isinstance(metadata.languages, (list, type(None)))

## Linking metadata and XML
We need to be able to link the data we got from the ALTO XML with our metadata and smush them together. We have our `page.item_id` attribute, which will hopefully be sufficient to grab the related metadata file. 

In [None]:
page.item_id

In [None]:
# |export
def get_metadata_for_page(
    page: NewspaperPageAlto, metadata_directory: Optional[Union[str, Path]] = None
):
    """Load the metadata file that belongs to a parsed ALTO page.

    The file name is built from the part of ``page.item_id`` that follows the
    last underscore.

    Args:
        page: Parsed ALTO page whose ``item_id`` identifies the item.
        metadata_directory: Directory containing the ``*.edm.xml`` files.
            ``None`` means the current working directory.

    Returns:
        The ``NewspaperPageMetadata`` read from the matching file.
    """
    short_id = page.item_id.split("_")[-1]
    # Previously a missing directory was interpolated into the path as the
    # literal text "None".
    directory = Path(metadata_directory) if metadata_directory is not None else Path(".")
    filename = f"http%3A%2F%2Fdata.theeuropeanlibrary.org%2FBibliographicResource%2F{short_id}.edm.xml"
    return get_metadata_from_xml(directory / filename)

In [None]:
metadata = get_metadata_for_page(page, metadata_directory="test_data/metadata")
assert metadata
assert isinstance(metadata, NewspaperPageMetadata)
assert page.item_id.split("_")[-1] in metadata.metadata_xml_fname

## Issue processor

In [None]:
# |export
@define(slots=True)
class NewspaperPage:
    """A newspaper page combining its ALTO data with its item metadata.

    The last three fields are not ``__init__`` arguments; they are derived
    after initialisation:

    - ``issue_uri``: the europeana.eu item URL built from ``item_id``;
    - ``multi_language``: ``True`` when ``languages`` is a list with more than
      one entry left after "==" entries have been removed;
    - ``id``: ``issue_uri`` followed by "/$" and the page file name without
      its ".xml" extension.
    """

    fname: Union[str, Path]
    text: Optional[str]
    mean_ocr: Optional[float]
    std_ocr: Optional[float]
    bounding_boxes: List[Union[float, None]]
    item_id: str
    metadata_xml_fname: Union[str, Path]
    title: Optional[str]
    date: Optional[str]
    languages: Union[List[str], None]
    item_iiif_url: Optional[str]
    # all_metadata_dict: Dict[Any, Any]
    multi_language: bool = field(init=False)
    issue_uri: str = field(init=False)
    id: str = field(init=False)

    def __attrs_post_init__(self):
        """Compute the derived fields and normalise ``languages``."""
        self.issue_uri = f"https://www.europeana.eu/item/{self.item_id}"
        self.metadata_xml_fname = str(self.metadata_xml_fname)
        # Drop "==" entries (presumably a placeholder for an unknown
        # language - TODO confirm).
        self.languages = (
            [lang for lang in self.languages if lang != "=="]
            if isinstance(self.languages, list)
            else self.languages
        )
        self.multi_language = (
            isinstance(self.languages, list) and len(self.languages) > 1
        )
        # `Path(...)` accepts both `str` and `Path` values of `fname`.
        # `str.strip('.xml')` removes any of the characters ".", "x", "m", "l"
        # from both ends (e.g. "max.xml" -> "a"), so remove the suffix explicitly.
        page_name = Path(self.fname).name
        if page_name.endswith(".xml"):
            page_name = page_name[: -len(".xml")]
        # NOTE(review): the literal "$" looks like a template-string leftover,
        # but it is kept so that generated ids do not change - confirm.
        self.id = f"{self.issue_uri}/${page_name}"

In [None]:
# |export
def process_newspaper_page(
    xml_file: Union[str, Path], metadata_directory: Optional[str] = None
) -> "NewspaperPage":
    """Parse an ALTO page, look up its metadata and merge both.

    Args:
        xml_file: Path of the ALTO XML file of the page.
        metadata_directory: Directory holding the ``*.edm.xml`` metadata files.

    Returns:
        A ``NewspaperPage`` (not a dict, as the annotation used to claim)
        built from the fields of the parsed page and of its metadata.
    """
    page = parse_newspaper_page(xml_file)
    metadata = get_metadata_for_page(page, metadata_directory=metadata_directory)
    metadata_fields = asdict(metadata)
    # The raw metadata mapping is not a `NewspaperPage` field.
    metadata_fields.pop("all_metadata_dict")
    return NewspaperPage(**asdict(page), **metadata_fields)

In [None]:
assert all(
    process_newspaper_page(xml, metadata_directory="test_data/metadata")
    for xml in alto_xmls[:32]
)

In [None]:
page = process_newspaper_page(alto_xmls[0], metadata_directory="test_data/metadata")
page

## Load into datasets

## Dump to parquet

In [None]:
# |export
from datasets import Dataset
from datasets import Value, Sequence, Features

In [None]:
# |export
# Schema of the rows produced for each newspaper page. The optional `id` and
# `length` arguments are left at their defaults (`None` and `-1`).
features = Features(
    {
        "fname": Value(dtype="string"),
        "text": Value(dtype="string"),
        "mean_ocr": Value(dtype="float64"),
        "std_ocr": Value(dtype="float64"),
        # One inner sequence of floats per illustration.
        "bounding_boxes": Sequence(feature=Sequence(feature=Value(dtype="float64"))),
        "item_id": Value(dtype="string"),
        "id": Value(dtype="string"),
        "issue_uri": Value(dtype="string"),
        "metadata_xml_fname": Value(dtype="string"),
        "title": Value(dtype="string"),
        "date": Value(dtype="string"),
        "languages": Sequence(feature=Value(dtype="string")),
        "item_iiif_url": Value(dtype="string"),
        "multi_language": Value(dtype="bool"),
    }
)


### Processing a batch

We create a function that will take a batch of XML files and load it into a dataset. We use `logger.catch` as a super lazy way of catching errors. `logger.catch` will catch exceptions and log them so we can more easily debug any errors we run into as we work on this code. 

In [None]:
temp_keys = {"fname",'item_id'}

In [None]:
# |export
@logger.catch()
def process_batch(xml_batch: Iterable[Union[str, Path]], metadata_directory: Optional[Union[str,Path]]=None)-> Dataset:
    """Parse a batch of ALTO XML files into a ``Dataset``.

    Args:
        xml_batch: Paths of the ALTO XML files to process.
        metadata_directory: Directory holding the ``*.edm.xml`` metadata files.

    Returns:
        A ``Dataset`` with one row per page. The helper columns ``item_id``,
        ``metadata_xml_fname`` and ``fname`` are removed and ``languages`` is
        renamed to ``language``.

    Note:
        The function is wrapped in ``logger.catch``: an exception raised here
        (including the ``ValueError`` for an empty batch) is logged rather
        than propagated, and the call then yields ``None`` - confirm against
        the loguru documentation for the installed version.
    """
    rows = [
        asdict(process_newspaper_page(xml, metadata_directory=metadata_directory))
        for xml in xml_batch
    ]
    if not rows:
        # Explicit message instead of an IndexError from `rows[0]` below.
        raise ValueError("xml_batch is empty: at least one XML file is required")
    for row in rows:
        # The schema declares `fname` as a string, but pages parsed from
        # `Path` objects carry a `Path` here.
        row["fname"] = str(row["fname"])
    # Turn the list of row dicts into a dict of columns.
    columns = {key: [row[key] for row in rows] for key in rows[0]}
    dataset = Dataset.from_dict(columns, features=features)
    dataset = dataset.remove_columns(["item_id", "metadata_xml_fname", "fname"])
    dataset = dataset.rename_columns({"languages": "language"})
    return dataset


In [None]:
ds = process_batch(alto_xmls[:32], metadata_directory="test_data/metadata")

In [None]:
ds = process_batch(alto_xmls[:32], metadata_directory="test_data/metadata")
assert len(ds) == 32
assert len(ds.column_names) == 11
assert ds.column_names == [
 'text',
 'mean_ocr',
 'std_ocr',
 'bounding_boxes',
 'title',
 'date',
 'language',
 'item_iiif_url',
 'multi_language',
 'issue_uri',
 'id']

In [None]:
ds.features

In [None]:
# |export
import multiprocessing 

def process(
    xml_files: Iterable[Union[str, Path]],
    batch_size: int = 32,
    metadata_directory: Optional[Union[str,Path]] = None,
    max_workers: Optional[int] = None
):
    """Process ALTO XML files in parallel batches.

    Args:
        xml_files: Paths of the ALTO XML files; any iterable, including a
            generator.
        batch_size: Number of files handed to each ``process_batch`` call.
        metadata_directory: Directory holding the ``*.edm.xml`` metadata files.
        max_workers: Number of worker processes; defaults to the CPU count.

    Returns:
        A list with one dataset per successfully processed batch. The order is
        not guaranteed to match the input order. Batches for which
        ``process_batch`` returned ``None`` are left out and reported in a
        log warning.
    """
    # Materialise the input so generators work: `len()` is needed for the
    # progress bar total.
    xml_files = list(xml_files)
    # Ceiling division, so a final partial batch is counted as well.
    n_batches = -(-len(xml_files) // batch_size)
    if not max_workers:
        max_workers = multiprocessing.cpu_count()
    futures = []
    with tqdm(total=n_batches) as pbar:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            for batch in partition_all(batch_size, xml_files):
                future = executor.submit(
                    process_batch, list(batch), metadata_directory=metadata_directory
                )
                future.add_done_callback(lambda _: pbar.update(1))
                futures.append(future)
    results = [future.result() for future in as_completed(futures)]
    failed = sum(result is None for result in results)
    if failed:
        logger.warning(
            f"{failed} of {len(results)} batches failed and were skipped; see the errors logged above."
        )
    return [result for result in results if result is not None]



In [None]:
datasets = process(alto_xmls, metadata_directory="test_data/metadata")

In [None]:
from datasets import concatenate_datasets

In [None]:
dataset = concatenate_datasets(datasets)
dataset[9]