# Data retrieval script
Running this notebook will populate a directory in the local environment with text files with content extracted from [Europeana Newspapers dumps](https://pro.europeana.eu/page/iiif#download).

**If you are not sure what to do, but need data to get started with the tutorial and/or exercises**, simply follow these steps:
- Open the `Run` menu on the top of page
- Select the option `Run all cells`
- Scroll to the bottom of the notebook
- Wait for the message `Finished retrieval of all configured sets` to appear in red at the very end of the notebook

## Configuration
Here we define the sets that we want to download text for. 

**Keep in mind that some of these sets are very big and downloading them may take up a lot of TIME and DISK SPACE! In most cases, you should only select the sets that you actually plan to process.**

In [None]:
# Identifiers of the available Europeana Newspapers sets, one constant per set
set_Austria = '9200300'
set_Finland = '9200301'
set_Latvia = '9200303'
set_Hamburg = '9200338'
set_Serbia = '9200339'
set_Berlin = '9200355'
set_Estonia = '9200356'
set_Poland = '9200357'
set_Netherlands = '9200359'
set_Luxembourg = '9200396'

# The sets to retrieve, as a list of the constants above
# (comma separated between the square brackets)
sets = [
    set_Poland,
]

## Expert configuration

Here we configure some technical parameters, including the output directory. **Changing this might break scripts that use the data contained in this directory!**

In [None]:
## Data directory (target)
from os.path import expanduser
target_dir = expanduser("~") + '/data'

## Set to 'True' to enable debugging output
DEBUG = False

## Location of the dumps
ZIP_BASE_URL = 'http://download.europeana.eu/newspapers/fulltext/edm_issue'

## Alternative retrieval option over FTP (generally faster but doesn't work in all environments)
#ZIP_BASE_URL = 'ftp://download.europeana.eu/newspapers/fulltext/edm_issue'

## Local directory containing already downloaded '<set id>.zip' dumps.
## Only consulted when ZIP_BASE_URL is empty; must be defined (even as None)
## because the retrieval logic reads it when no URL is configured.
ZIP_BASE_PATH = None

## File name for the {identifier -> file} map that is generated
MAP_FILE_NAME = 'id_file_map.json'

## Expert settings for processing, don't change unless you know what you are doing
ENV_BLOCK_SIZE = 65536
ENV_QUEUE_SIZE_LIMIT = 1024

## Sets for which an archive of already extracted text is available.
## These are retrieved from the given URL instead of from the original dump.
PREPARED_DATA = {
    '9200357': 'https://europeana-oai.clarin.eu/metadata/fulltext-aggregation/9200357_text.zip',
}

In [None]:
%pip install \
  stream_unzip==0.0.69 \
  lxml==4.8.0

In [None]:
import logging
from lxml import etree

# Module-level logger used by all retrieval functions
logger = logging.getLogger(__name__)

# Prefix -> namespace URI bindings for the XPath queries on the dump records
EDM_NAMESPACES = {
    'edm': 'http://www.europeana.eu/schemas/edm/',
    'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
}

# Shared parser instance: entities are not resolved, processing instructions
# are dropped and lxml's 'huge_tree' option is enabled for large documents
xml_parser = etree.XMLParser(
    huge_tree=True,
    remove_pis=True,
    resolve_entities=False,
)

# Working copies of the expert settings (chunk size for reading/streaming
# and maximum number of chunks buffered by the FTP download queue)
queue_size_limit = ENV_QUEUE_SIZE_LIMIT
block_size = ENV_BLOCK_SIZE

## Data retrieval logic

The main function defines the global logic for the data retrieval

In [None]:
import os
import re
import requests
import time
import threading
import json

from stream_unzip import stream_unzip
from io import BytesIO
from ftplib import FTP
from urllib.parse import urlparse
from queue import Queue

def retrieve(collection_id, output_dir):
    """Retrieve the full text of one collection into ``output_dir``.

    Configures logging (DEBUG level when ``DEBUG`` is set, INFO otherwise) and
    then fetches the collection: from the prepared text archive if one is
    registered in ``PREPARED_DATA``, otherwise by streaming the original dump
    and extracting the text on the fly. The elapsed time is logged at the end.

    Args:
        collection_id: Identifier of the set to retrieve (e.g. ``'9200357'``).
        output_dir: Directory underneath which the text files are stored.
    """
    logging.basicConfig()
    logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)

    start_time = time.perf_counter()

    if collection_id in PREPARED_DATA:
        retrieve_prepared(PREPARED_DATA[collection_id], output_dir)
    else:
        retrieve_and_extract(collection_id, output_dir)

    # Split whole seconds with divmod. Formatting `elapsed / 60` with `.0f`
    # rounds the minutes, so e.g. 100 seconds was reported as "2m40s".
    minutes, seconds = divmod(round(time.perf_counter() - start_time), 60)
    logger.info(f'Completed processing of {collection_id} in {minutes}m{seconds:02d}s')

def retrieve_and_extract(collection_id, output_dir):
    """Stream the dump of a collection and store the text of every record.

    Each file in the dump is read completely, parsed as XML and its text
    content is written to ``<output_dir>/<name in zip without extension>.txt``.
    While doing so an {identifier -> file name} map is collected, which is
    written as JSON to ``<output_dir>/<collection_id>/<MAP_FILE_NAME>`` at the
    end.

    Args:
        collection_id: Identifier of the set whose dump is processed.
        output_dir: Directory underneath which the text files are stored.
    """
    logger.info(f'Retrieving and extracting fulltext from dump for collection {collection_id} to {output_dir}')
    # This {identifier -> file name} map will be filled while extracting text from the .zip
    id_file_map = {}

    # 'Streaming' loop that takes the contents of the .zip and extracts text on the fly
    written_count = 0
    chunks_generator = create_dump_chunk_generator(collection_id)
    for file_name_b, file_size, unzipped_chunks in stream_unzip(chunks_generator):
        file_name = file_name_b.decode()

        if file_name.endswith('/'):
            # Directory entry: there is no XML to parse. The (empty) content
            # is still consumed so that the stream can advance to the next entry.
            for _ in unzipped_chunks:
                pass
            continue

        # Determine target file on disk from the file name in .zip
        output_file = f'{os.path.splitext(file_name)[0]}.txt'
        full_output_path = f'{output_dir}/{output_file}'

        # Process the 'chunks' from the stream into text that we can store in a file
        logger.debug(f'Reading file from zip: {file_name}')
        xml = read_file_from_zip(file_name, unzipped_chunks)
        logger.debug('Extracting text')

        # Full content read, from which we can now extract the text content
        text = process_xml(BytesIO(xml), id_file_map, os.path.basename(output_file))
        if text is None:
            # No text in this record. An empty file is written so that the
            # entry already added to the id map still points to a file.
            logger.warning(f'No text content found in {file_name}, writing empty file')
            text = ''

        # Text extracted, save to disk
        logger.debug('Writing text to file')
        write_to_file(text, full_output_path)

        # File written, do some log so that we can monitor the progress
        written_count += 1
        if written_count == 1:
            logger.info('Started writing files')
        if written_count % 100 == 0:
            logger.info(f'Retrieval in progress... {written_count} files written...')

    logger.info(f'Retrieval completed: {written_count} files written')

    # Write {identifier -> file name} map to file
    map_file = f'{os.path.realpath(output_dir)}/{collection_id}/{MAP_FILE_NAME}'
    logger.info(f'Writing id -> file name map to {map_file}')
    # The directory is normally created while writing the text files, but
    # does not exist yet if the dump did not yield any file for this path
    os.makedirs(os.path.dirname(map_file), exist_ok=True)
    with open(map_file, 'w') as f:
        json.dump(id_file_map, f)

        
def retrieve_prepared(url, output_dir):
    """Download an archive of prepared text files and unpack it on the fly.

    The archive at ``url`` is streamed and every entry is stored under
    ``output_dir`` using the path it has inside the archive. Missing
    directories (including ``output_dir`` itself) are created as needed.
    Entries that would end up outside of ``output_dir`` are skipped.

    Args:
        url: HTTP(S) location of the .zip archive with the prepared text files.
        output_dir: Directory into which the archive is extracted.
    """
    logger.info(f'Retrieving fulltext from prepared data dump from {url} and extracting to {output_dir}')
    # Resolved target directory with trailing separator, used to verify that
    # archive entries (e.g. names containing '..') do not escape from it
    root_prefix = os.path.join(os.path.realpath(output_dir), '')
    chunks_generator = zipped_chunks_http_url(url)
    written_count = 0
    for file_name_b, file_size, unzipped_chunks in stream_unzip(chunks_generator):
        file_name = file_name_b.decode()
        full_output_path = f'{output_dir}/{file_name}'

        inside_target = os.path.realpath(full_output_path).startswith(root_prefix)
        if file_name.endswith('/') or not inside_target:
            if not inside_target:
                logger.warning(f'Skipping entry outside of target directory: {file_name}')
            elif not os.path.exists(full_output_path):
                logger.info(f'Making directory {full_output_path}')
                # makedirs rather than mkdir: output_dir itself may not exist yet
                os.makedirs(full_output_path, exist_ok=True)
            # Nothing is written for this entry, but its content is consumed
            # so that the stream can advance to the next entry
            for _ in unzipped_chunks:
                pass
            continue

        logger.debug(f'Extracting file {full_output_path}')
        # Do not rely on the archive listing a directory before its files
        os.makedirs(os.path.dirname(full_output_path), exist_ok=True)
        with open(full_output_path, 'wb') as outfile:
            for chunk in unzipped_chunks:
                outfile.write(chunk)
        written_count += 1
        if written_count == 1:
            logger.info('Started writing files')
        if written_count % 100 == 0:
            logger.info(f'Retrieval in progress... {written_count} files written...')
    logger.info(f'Retrieval completed: {written_count} files written')

In [None]:
def create_dump_chunk_generator(collection_id):
    """Select the source of the zipped dump for a collection.

    When ``ZIP_BASE_URL`` is configured, its scheme decides between retrieval
    over FTP and HTTP(S). Without a URL, the dump is read from the local
    directory ``ZIP_BASE_PATH`` if that is configured.

    Args:
        collection_id: Identifier of the set whose dump is requested.

    Returns:
        A generator yielding the raw (still zipped) content in chunks.

    Raises:
        SystemExit: If the URL scheme is not supported or no source is
            configured at all.
    """
    if ZIP_BASE_URL:
        parsed_url = urlparse(ZIP_BASE_URL)
        if parsed_url.scheme == 'ftp':
            return zipped_chunks_ftp(parsed_url, collection_id)
        if parsed_url.scheme in ('http', 'https'):
            return zipped_chunks_http(parsed_url, collection_id)
        logger.error(f"Don't know how to handle URL: {ZIP_BASE_URL}")
        # Raised explicitly: the exit() helper is meant for interactive shells
        # and is not guaranteed to stop execution at this point
        raise SystemExit(1)

    # ZIP_BASE_PATH is optional configuration; look it up defensively so that
    # a missing definition results in the error below instead of a NameError
    if globals().get('ZIP_BASE_PATH'):
        return zipped_chunks_local(collection_id)

    logger.error("No data to process - configure FTP or local path for dump")
    raise SystemExit(1)


def read_file_from_zip(file_name, chunks):
    """Assemble the complete content of one zip entry.

    Args:
        file_name: Name of the entry inside the zip (not used for reading).
        chunks: Iterable of bytes-like pieces making up the entry's content.

    Returns:
        The concatenation of all chunks as a single ``bytes`` object.
    """
    return b''.join(chunks)


def process_xml(source, id_file_map, output_file):
    """Extract the text content from one XML record.

    If a map is passed in, the record identifier (the ``xml:base`` attribute
    of the ``rdf:RDF`` root) is normalised and registered in it with
    ``output_file`` as its value.

    Args:
        source: File-like object (or other source accepted by lxml) with the XML.
        id_file_map: Dict to add the {identifier -> file name} entry to, or
            ``None`` to skip this step.
        output_file: File name to register for the record's identifier.

    Returns:
        The text of the first ``rdf:value`` element of the record's
        ``edm:FullTextResource``, or ``None`` if there is no such element.
    """
    document = etree.parse(source, xml_parser)

    # Register the identifier of this record in the map (if requested)
    if id_file_map is not None:
        roots = document.xpath('/rdf:RDF', namespaces=EDM_NAMESPACES)
        record_id = roots[0].get('{http://www.w3.org/XML/1998/namespace}base') if roots else None
        if record_id:
            id_file_map[normalize_identifier(record_id)] = output_file

    # Look up the node holding the text content
    values = document.xpath('/rdf:RDF/edm:FullTextResource/rdf:value', namespaces=EDM_NAMESPACES)
    if not values:
        return None
    return values[0].text


def write_to_file(text, output_file):
    """Write text to a file, creating the parent directories if needed.

    Args:
        text: The text content to store.
        output_file: Path of the file to (over)write.
    """
    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        # A bare file name has no directory part; makedirs('') would fail
        os.makedirs(parent_dir, exist_ok=True)
    # Explicit UTF-8: the platform default encoding may not be able to
    # represent all characters that occur in the extracted text
    with open(output_file, 'w', encoding='utf-8') as f:
        logger.debug(f'Writing text to {os.path.realpath(output_file)}')
        f.write(text)


def zipped_chunks_http(base_url, collection_id):
    """Stream the zipped dump of a collection over HTTP(S).

    Args:
        base_url: Parsed base URL (result of ``urlparse``) of the dump location.
        collection_id: Identifier of the set; the file ``<collection_id>.zip``
            below the base URL is retrieved.

    Yields:
        The content of the .zip file in chunks of bytes.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    if base_url.scheme not in ('http', 'https'):
        logger.warning(f'Configured base URL is "{base_url.scheme}", expecting "http(s)"')

    file = f'{collection_id}.zip'
    # Build the URL once so that the log shows the location that is actually
    # requested (formatting the parsed URL object itself logs its repr)
    url = f'{base_url.geturl()}/{file}'
    logger.info(f'Streaming {url} over HTTP(S)')

    yield from zipped_chunks_http_url(url)


def zipped_chunks_http_url(url):
    """Stream the content found at a URL over HTTP(S).

    Args:
        url: Location of the resource to retrieve.

    Yields:
        The non-empty chunks of the response body, read in pieces of
        ``block_size`` bytes.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        # filter(None, ...) drops empty chunks
        yield from filter(None, response.iter_content(chunk_size=block_size))


def zipped_chunks_ftp(parsed_url, collection_id):
    """Stream the zipped dump of a collection over FTP.

    The download runs in a background thread that puts the received blocks
    on a bounded queue; this generator takes them off the queue and yields
    them. A ``None`` item on the queue marks the end of the download.

    Args:
        parsed_url: Parsed base URL (result of ``urlparse``) of the dump
            location; host and path are taken from it.
        collection_id: Identifier of the set; the file ``<collection_id>.zip``
            in the directory of the base URL is retrieved.

    Yields:
        The content of the .zip file in chunks of bytes.

    Raises:
        Exception: Any error raised by the transfer in the background thread
            is re-raised here once the queue has been drained.
    """
    if parsed_url.scheme != 'ftp':
        logger.warning(f'Configured base URL is "{parsed_url.scheme}", expecting "ftp"')

    file = f'{collection_id}.zip'
    logger.info(f'Streaming {ZIP_BASE_URL}/{file} over FTP')

    ftp = FTP(parsed_url.hostname)
    try:
        ftp.login()
        ftp.cwd(parsed_url.path)
    except Exception:
        # Don't leave the connection open if the session can't be set up
        ftp.close()
        raise

    queue = Queue(queue_size_limit)
    errors = []

    def ftp_thread_target():
        try:
            ftp.retrbinary(f'RETR {file}', callback=queue.put, blocksize=block_size)
        except Exception as e:
            # Hand the failure over to the consuming side
            errors.append(e)
        finally:
            # Always signal the end; without the sentinel the consumer
            # would wait on the queue forever after a failed transfer
            queue.put(None)

    logger.info(f'Starting retrieval from {ftp.host}')
    # Daemon thread: a download that is blocked on a full queue because the
    # consumer went away must not keep the interpreter from exiting
    ftp_thread = threading.Thread(target=ftp_thread_target, daemon=True)
    ftp_thread.start()

    count = 0
    while True:
        chunk = queue.get()
        if not chunk:
            break
        if logger.isEnabledFor(logging.DEBUG):
            count += 1
            if (count % 100) == 0:
                logger.debug(f'Chunk count: {count}. Queue size: {queue.qsize()}.')
        yield chunk

    # The sentinel is the last thing the worker puts on the queue, so it is
    # done (or about to be); wait for it and release the connection
    ftp_thread.join()
    ftp.close()
    if errors:
        raise errors[0]


def zipped_chunks_local(collection_id):
    """Stream the zipped dump of a collection from the local file system.

    Reads ``<ZIP_BASE_PATH>/<collection_id>.zip``.
    NOTE(review): ZIP_BASE_PATH has to be defined at module level; it is not
    among the settings visible in the configuration cell - confirm.

    Args:
        collection_id: Identifier of the set whose dump is read.

    Yields:
        The content of the .zip file in chunks of at most ``block_size`` bytes.
    """
    path = f'{ZIP_BASE_PATH}/{collection_id}.zip'
    logger.info(f'Opening {path}')
    with open(path, mode='rb', buffering=2*block_size) as zip_file:
        # Keep reading until read() returns the empty bytes object (end of file)
        yield from iter(lambda: zip_file.read(block_size), b'')


def normalize_identifier(identifier):
    """Reduce a record identifier URI to its trailing number.

    Examples of identifiers that are handled:
      http://data.theeuropeanlibrary.org/BibliographicResource/3000118435146
      http://data.europeana.eu/annotation/9200396/BibliographicResource_3000118435009

    Args:
        identifier: The identifier as found in the record.

    Returns:
        The digits at the end of the identifier, or the identifier unchanged
        (after logging a warning) if it does not match the expected pattern.
    """
    match = re.search(r"http.*[^\d](\d+)$", identifier)
    if match is None:
        logger.warning(f"Identifier {identifier} does not match pattern, skipping normalisation!")
        return identifier

    normalised = match.group(1)
    logger.debug(f"Normalised identifier: {identifier} -> {normalised}")
    return normalised

Now we are ready to carry out the actual retrieval for all configured sets!

# Retrieval

In [None]:
# Retrieve the configured sets one after another into the target directory
for set_id in sets:
    logger.info(f'Starting retrieval of set: {set_id}')
    retrieve(collection_id=set_id, output_dir=target_dir)
    logger.info(f'Completed retrieval of set: {set_id}')

# Final message that signals the end of the notebook run
logger.info(f'Finished retrieval of all configured sets: {sets}')
