# Preamble: imports, logic and configuration

In [4]:
%pip install lxml

Note: you may need to restart the kernel to use updated packages.


In [5]:
#@title Imports and utility functions
import ipywidgets as widgets
import json
import logging
import os
import requests
import tarfile
import tensorflow as tf
import requests
import shelve
import sqlite3

from lxml import etree
from IPython.display import display
from pathlib import Path


TEXT_RESOURCE_PROFILE_ID = 'clarin.eu:cr1:p_1633000337997'
CMD_NAMESPACES = {
    'cmd': 'http://www.clarin.eu/cmd/1',
    'cmdp': 'http://www.clarin.eu/cmd/1/profiles/clarin.eu:cr1:p_1633000337997'
}
FULLTEXT_RESOURCES_BASE_URL='https://www.europeana.eu/api/fulltext'

logger = logging.getLogger(__name__)
logging.basicConfig()
logger.setLevel(logging.INFO)

script_path = os.getcwd()

def flatten_list(t):
  return [item for sublist in t for item in sublist]


def retrieve_cmdis_archive(url, filename='cmdis.tgz'):
  cmdis_zip = requests.get(url, allow_redirects=True)
  with open(filename, 'wb') as tarball:
    tarball.write(cmdis_zip.content)
  return filename


def unpack_collection(tarball_filename, target_dir, collection):
  try:
    with tarfile.open(tarball_filename, 'r:gz') as tarball:
      if collection in tarball.getnames():
        members = [tar_info for tar_info in tarball.getmembers()
                  if tar_info.name.startswith(f'{collection}/')]
        if len(members) > 0:
          logger.info(f'Extracting {collection} from {tarball_filename} to {target_dir}/')
          tarball.extractall(members=members, path=target_dir)
          return True
      else:
        logger.warning(f'{collection} not found in tarball')
  except Exception as ex:
    logger.error(f'Something went wrong while trying to extract CMDIs from tarball: {ex}')
  return False


def index_filenames(index_def, cmdi_files_dir):
  index = {}
  for filename in os.listdir(cmdi_files_dir):
    if not (filename.endswith(".xml") or filename.endswith(".cmdi")):
      logger.info(f"Skipping file {filename} (not an XML file)")
    else:
      file_path = f'{cmdi_files_dir}/{filename}'
      logger.debug(f'Processing file {file_path}')
      # parse file
      xmldoc = etree.parse(file_path)
      # check if it's a text resource record
      md_profile_values = xmldoc.xpath('/cmd:CMD/cmd:Header/cmd:MdProfile/text()', namespaces=CMD_NAMESPACES)
      if not (TEXT_RESOURCE_PROFILE_ID in md_profile_values):
        logger.debug(f'Skipping file {filename} (not a text resource record)')
      else:
        # get resource refs
        resource_refs = [ref for ref
                        in xmldoc.xpath('/cmd:CMD/cmd:Resources/cmd:ResourceProxyList/cmd:ResourceProxy/cmd:ResourceRef/text()', namespaces=CMD_NAMESPACES)
                        if ref.startswith(FULLTEXT_RESOURCES_BASE_URL)]
        
        # put in index according to definition
        for index_key in index_def:
          values = xmldoc.xpath(index_def[index_key], namespaces=CMD_NAMESPACES)
          
          if values and len(values) > 0:
            # create and/or get index for current key
            if not index.get(index_key, None):
              # key has not been indexed
              index[index_key] = {}
            key_index = index[index_key]

            # add refs to key/value index
            for value in values:
              if not key_index.get(value, None):
                key_index[value] = []
              key_index[value] += resource_refs
  return index

def get_json_from_http(url, session=None):
    logger.debug(f"Making request: {url}")
    if session is None:
        response = requests.get(url).text
    else:
        response = session.get(url).text
    logger.debug(f"API response: {url}")
    try:
        return json.loads(response)
    except json.JSONDecodeError:
        logger.error(f"Error decoding response from {url}")
        return None

def get_from_db(cur, url):
  #cur = db_conn.cursor()
  cur.execute("SELECT ft FROM fulltext WHERE url=?", (url,))
  rows = cur.fetchall()
  if len(rows) > 0:
    return rows[0][0]

def save_to_db(cur, url, text):
  #cur = db_conn.cursor()
  cur.execute("INSERT INTO fulltext(url, ft) VALUES(?,?)", (url, text))
  db_conn.commit()

def get_fulltext(urls_set, db_conn, progress_cb=None):
  text = []
  downloaded = 0
  cur = db_conn.cursor()
  for url in urls_set:
    from_cache = get_from_db(cur, url)
    if from_cache:
      # use from cache
      text_value = from_cache
    else:
      # retrieve
      logger.debug(f'Retrieving text from {url}')
      json_response = get_json_from_http(url)
      if json_response:
        text_value = json_response.get('value', None)
        if text_value:
          save_to_db(cur, url, text_value)
          downloaded += 1
          if downloaded % 100 == 0:
            cur.close()
            db_conn.commit()
            cur = db_conn.cursor()

    if text_value:
      text += [text_value]
      if progress_cb:
        progress_cb(1)
    else:
      logger.warning(f'No response and/or text value at {url}')
  cur.close()
  return text

In [6]:
#@title Constants and settings
COLLECTIONS = {'Finland': '9200301', 'Latvia': '9200303', 'Luxembourg': '9200396'}
COLLECTION_NAME = "Latvia" #@param ["Finland", "Latvia", "Luxembourg"]
COLLECTION = COLLECTIONS[COLLECTION_NAME]

INDEX_DEF = {
    'language': '/cmd:CMD/cmd:Components/cmdp:TextResource/cmdp:Language/cmdp:code/text()',
    'years': '/cmd:CMD/cmd:Components/cmdp:TextResource/cmdp:TemporalCoverage/cmdp:Start/cmdp:year/text()'
}

CMDI_TARBALL_URL = 'https://alpha-vlo.clarin.eu/data/test/europeana-aggregations.tar.gz'

CMDIS_DIR = 'cmdis'
OUTPUT_DIR = 'output'

OUTPUT_ON_GOOGLE_DRIVE = False
GOOGLE_DRIVE_BASE_DIR = '/gdrive/My Drive/colab/europeana_fulltext'

CACHE_ON_GOOGLE_DRIVE = False
DRIVE_CACHE_DIR = GOOGLE_DRIVE_BASE_DIR

# Retrieval and indexing of the CMDI metadata for the newspaper collections

First we need to download an archive (`.tar.gz` file) from the CLARIN server

In [7]:
logger.info('Retrieving CMDIs')
tarball_filename = retrieve_cmdis_archive(CMDI_TARBALL_URL)

INFO:__main__:Retrieving CMDIs


Next we unpack the collection that we are interested in from the downloaded archive, so that we can access the individual metadata records

In [8]:
logger.info(f'Reading tarball contents (looking for {COLLECTION})')

if not unpack_collection(tarball_filename, CMDIS_DIR, COLLECTION):
  raise RuntimeError(f'Failed to extract member {COLLECTION} from tarball!')

COLLECTION_FILES_DIR=f'{CMDIS_DIR}/{COLLECTION}'
logger.info(f'CMDI files available in {COLLECTION_FILES_DIR}/')

INFO:__main__:Reading tarball contents (looking for 9200303)
INFO:__main__:Extracting 9200303 from cmdis.tgz to cmdis/
INFO:__main__:CMDI files available in cmdis/9200303/


The metadata records are now available to be processed.

In the next cell, the links to the full text resources are **indexed** based on the index definition in the configuration section.

In [9]:
logger.info('Indexing resource links from records')
index = index_filenames(INDEX_DEF, COLLECTION_FILES_DIR)

INFO:__main__:Indexing resource links from records
INFO:__main__:Skipping file index.json (not an XML file)


In [10]:
logger.info('Index summary:')
for index_key in index:
  key_index = index[index_key]
  logger.info(f'{index_key}: {sorted(list(key_index))}')

INFO:__main__:Index summary:
INFO:__main__:language: ['deu', 'est', 'lav', 'pol', 'rus']
INFO:__main__:years: ['1868', '1869', '1870', '1871', '1872', '1873', '1874', '1875', '1876', '1877', '1878', '1879', '1880', '1881', '1882', '1883', '1884', '1885', '1886', '1887', '1888', '1889', '1890', '1891', '1892', '1893', '1894', '1895', '1896', '1897', '1898', '1899', '1900', '1901', '1902', '1903', '1904', '1905', '1906', '1907', '1908', '1909', '1910', '1911', '1912', '1913', '1914', '1915', '1916', '1917', '1918', '1919', '1920', '1921', '1922', '1923', '1924', '1925', '1926', '1927', '1928', '1929', '1930', '1931', '1932', '1933', '1934', '1935', '1936', '1937', '1938', '1939', '1940', '1941', '1942', '1943', '1944', '1945', '1946']


# Prepare for processing

In [12]:
db_name = 'fulltext.db'

def create_db(db_dir, db_name):
  %cd "$db_dir"
  with sqlite3.connect(db_name) as db_conn:
    logger.info(f'Connect: {db_conn}')
    db_conn.execute("CREATE TABLE IF NOT EXISTS fulltext(url text, ft text)")
    db_conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS index_fulltext_url ON fulltext(url)")

# prepare persistent cache on drive
if CACHE_ON_GOOGLE_DRIVE:
  from google.colab import drive
  drive.mount('/gdrive')
  os.makedirs(DRIVE_CACHE_DIR, exist_ok=True)
  db_dir=DRIVE_CACHE_DIR
else:
  db_dir=f'{script_path}/fulltext_cache'
  os.makedirs(db_dir, exist_ok=True)

create_db(db_dir, db_name)
db_path=f'{db_dir}/{db_name}'

logger.info(f'Database file: {db_path}')

INFO:__main__:Connect: <sqlite3.Connection object at 0x7f84541d8c60>
INFO:__main__:Database file: /home/jovyan/work/fulltext_cache/fulltext.db


/home/jovyan/work/fulltext_cache


First, we segment the data based on properties from the metadata. In this case, we use language and year of publication.

In [13]:
# Create segments, then retrieve text

urls_lav = set(index['language']['lav'])
urls_deu = set(index['language']['deu'])
urls_20s = set(flatten_list([ index['years'][year] for year in list(index['years']) if (int(year) >= 1920) and (int(year) < 1930) ]))
urls_30s = set(flatten_list([ index['years'][year] for year in list(index['years']) if (int(year) >= 1930) and (int(year) < 1940) ]))

# make intersections
segments_urls = {
  'deu_20s': urls_deu.intersection(urls_20s), #3k urls
  'deu_30s': urls_deu.intersection(urls_30s), #283 urls
  'lav_20s': urls_lav.intersection(urls_20s), #97k urls
  'lav_30s': urls_lav.intersection(urls_30s)
}

So far, we only have resource URLs. In the next cell, the actual full text content is downloaded (and/or retrieved from cache) for the URLs in the segments that we have defined. 

Note that this can take a while!

In [14]:
# define widgets to visualize download progress
download_total = sum([len(segments_urls[seg_name]) for seg_name in segments_urls])
progress_bar = widgets.IntProgress(
  min = 0, max = download_total,
  description = 'Retrieving:', bar_style = 'info',
)
progress_label = widgets.Label(value='0.0%')

download_count = 0
def progress_cb(increment):
  global download_count
  download_count += increment
  progress_bar.value = download_count
  progress_label.value = f'{download_count}/{download_total} ({download_count/download_total:2.1%})'

In [24]:
display(widgets.HBox([progress_bar, progress_label]))

# retrieve text for segments
segments_text = {}
download_count = 0

%cd "$db_dir"
with sqlite3.connect(db_path) as db_conn:
  for seg_name in segments_urls:
    urls = segments_urls[seg_name]
    logger.info(f"Retrieving text content for segment '{seg_name}' ({len(urls)} urls)")
    segments_text[seg_name] = get_fulltext(urls, db_conn, progress_cb)
    db_conn.commit()

progress_bar.bar_style = 'success'

HBox(children=(IntProgress(value=201731, bar_style='success', description='Retrieving:', max=201731), Label(va…

INFO:__main__:Retrieving text content for segment 'deu_20s' (3079 urls)
INFO:__main__:Retrieving text content for segment 'deu_30s' (283 urls)
INFO:__main__:Retrieving text content for segment 'lav_20s' (97301 urls)


/home/jovyan/work/fulltext_cache


INFO:__main__:Retrieving text content for segment 'lav_30s' (101068 urls)


We now save the retrieved text content to disk for later use

In [25]:
# save retrieved text to file
if OUTPUT_ON_GOOGLE_DRIVE:
  output_dir = f'{GOOGLE_DRIVE_BASE_DIR}/{OUTPUT_DIR}'
else:
  output_dir = f'{script_path}/{OUTPUT_DIR}'

os.makedirs(output_dir, exist_ok=True)
for seg_name in segments_text:
  filename=f'{output_dir}/{seg_name}.txt'
  with open(filename, 'w') as out_file:
    logger.info(f"Writing all text for segment '{seg_name}' to '{filename}'")
    written = sum([out_file.write(text) for text in segments_text[seg_name]])
    logger.info(f'{written} characters written')

INFO:__main__:Writing all text for segment 'deu_20s' to '/home/jovyan/work/fulltext_cache/output/deu_20s.txt'
INFO:__main__:41932571 characters written
INFO:__main__:Writing all text for segment 'deu_30s' to '/home/jovyan/work/fulltext_cache/output/deu_30s.txt'
INFO:__main__:1648545 characters written
INFO:__main__:Writing all text for segment 'lav_20s' to '/home/jovyan/work/fulltext_cache/output/lav_20s.txt'
INFO:__main__:1338445598 characters written
INFO:__main__:Writing all text for segment 'lav_30s' to '/home/jovyan/work/fulltext_cache/output/lav_30s.txt'
INFO:__main__:1325724677 characters written


# Processing

We run the [Tensorflow tokenizer](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/text/Tokenizer) on each of the data segments

In [26]:
# Run tokenizer
from tensorflow.keras.preprocessing.text import Tokenizer

segments_tokenized = {}
for seg_name in segments_text:
  logger.info(f"Running tokenizer on text for '{seg_name}'")
  tokenizer = Tokenizer(num_words = 1000)
  tokenizer.fit_on_texts(segments_text[seg_name])
  segments_tokenized[seg_name] = tokenizer.word_index


INFO:__main__:Running tokenizer on text for 'deu_20s'
INFO:__main__:Running tokenizer on text for 'deu_30s'
INFO:__main__:Running tokenizer on text for 'lav_20s'
INFO:__main__:Running tokenizer on text for 'lav_30s'


Saving the token collections to disk for later use

In [27]:
# save tokenizer output to file
if OUTPUT_ON_GOOGLE_DRIVE:
  output_dir = f'{GOOGLE_DRIVE_BASE_DIR}/{OUTPUT_DIR}'
else:
  output_dir = f'{script_path}/{OUTPUT_DIR}'

os.makedirs(output_dir, exist_ok=True)
for seg_name in segments_tokenized:
  filename=f'{output_dir}/{seg_name}_tokenized.json'
  with open(filename, 'w') as out_file:
    logger.info(f"Writing all text for segment '{seg_name}' to '{filename}'")
    logger.info(f"Preview: {list(segments_tokenized[seg_name])[:10]}...")
    json.dump(segments_tokenized[seg_name], out_file)

INFO:__main__:Writing all text for segment 'deu_20s' to './output/deu_20s_tokenized.json'
INFO:__main__:Preview: ['der', 'die', 'und', 'in', 'zu', 'von', 'den', 'mit', 'des', 'das']...
INFO:__main__:Writing all text for segment 'deu_30s' to './output/deu_30s_tokenized.json'
INFO:__main__:Preview: ['der', 'die', 'und', 'in', 'den', 'des', 'zu', 'das', 'von', '—']...
INFO:__main__:Writing all text for segment 'lav_20s' to './output/lav_20s_tokenized.json'
INFO:__main__:Preview: ['un', 'no', 'ar', 'par', 'ka', '—', 'i', 'us', 's', 'a']...
INFO:__main__:Writing all text for segment 'lav_30s' to './output/lav_30s_tokenized.json'
INFO:__main__:Preview: ['un', 'ar', 'no', '—', 'ka', 'par', 'us', 'ari', 'i', 'ir']...
