In [1]:
import os, tarfile, re
from typing import List, Optional
import folia.main as folia
from folia import fql
from utils import NLNEWS_LOC, subdivide_dir
import spacy
import numpy as np
from tqdm import tqdm
import multiprocessing as mp
import time
# Shared random generator, created without a seed; create_docs draws its group sizes and article picks from it.
rng = np.random.default_rng()
# NOTE(review): `nlp` is not referenced in any of the visible cells — confirm whether it is still needed.
nlp = spacy.load("nl_core_news_sm")
# NOTE(review): `base` is not referenced in the visible cells; the other cells use paths under '../Datasets/NLNews/'.
base = '../NLNews/WR-P-P-G_newspapers/'

### Preprocessing the tarfiles
* Download the [SoNaR corpus](https://taalmaterialen.ivdnt.org/download/tstc-sonar-corpus/) (60+ GB).
* Extract the newspaper files to a separate archive.
    * Delete the full SoNaR corpus afterwards (it is rather large).
* Extract the text from the new archive to a new folder.
* Finally, create the multi-segment documents.

Reference for copying members from one tar archive into another: https://stackoverflow.com/questions/17616340/add-files-from-one-tar-into-another-tar-in-python

#### SoNaR corpus --> Newspapers corpus

In [13]:
# Collect the tar members that belong to the newspaper part of the corpus.
# Inside the tarfile, newspaper folia-files are stored as
# './SoNaRCorpus_NC_1.2/SONAR500/FoLiA/WR-P-P-G_newspapers/###/WR-P-P-G-##########.folia.xml'
# Takes ~30 mins.
newspaper_prefix = './SoNaRCorpus_NC_1.2/SONAR500/FoLiA/WR-P-P-G_newspapers/'
# The archive is left open on purpose: the next cell reads the selected members from it and closes it.
archive = tarfile.open("../Datasets/NLNews/20150602_SoNaRCorpus_NC_1.2.1.tgz")
select = [entry for entry in archive if entry.name.startswith(newspaper_prefix)]

In [20]:
# Copy the selected members into a new, newspaper-only archive.
# Takes 1+ hours.
# NOTE: possible to parallelize? (https://stackoverflow.com/questions/13446445/python-multiprocessing-safely-writing-to-a-file & https://stackoverflow.com/questions/43313666/python-parallel-processing-to-unzip-files)
sonar_prefix = './SoNaRCorpus_NC_1.2/SONAR500/FoLiA/WR-P-P-G_newspapers/'
# Mode "x:gz" creates the archive exclusively, so this cell fails if the archive already exists.
with tarfile.open("../Datasets/NLNews/WR-P-P-G_newspapers.tgz", "x:gz") as new_archive:
    for member in select:
        # Drop the long corpus prefix so members are stored as '###/<file>'.
        member.name = member.name.replace(sonar_prefix, '')
        payload = archive.extractfile(member)
        new_archive.addfile(member, payload)
    first_names = new_archive.getnames()[:10]
    print(first_names)
archive.close()

['000', '000/sonar-foliaviewer.xsl', '000/WR-P-P-G-0000000001.folia.xml', '000/WR-P-P-G-0000000002.folia.xml', '000/WR-P-P-G-0000000003.folia.xml', '000/WR-P-P-G-0000000004.folia.xml', '000/WR-P-P-G-0000000005.folia.xml', '000/WR-P-P-G-0000000006.folia.xml', '000/WR-P-P-G-0000000007.folia.xml', '000/WR-P-P-G-0000000008.folia.xml']


#### Newspaper corpus --> txt files

In [11]:
def create_txt(filepath: str) -> str:
    """
    Returns the text from a folia.xml document, filtering out paragraphs containing 10 or fewer words (this includes things such as image captions) and footnotes (e.g. '( ANP )').
    A leading "LOCATION -" dateline in the first paragraph is stripped when one is detected.
    https://foliapy.readthedocs.io/en/latest/folia.html#

    :param filepath: Path to the .folia.xml file to read.
    :return: '' when no paragraph has more than 10 words; otherwise a '===<document id>===' header line
             followed by the remaining paragraphs joined with single spaces and a trailing newline.
    """
    doc = folia.Document(file=filepath)
    # Keep only the text of paragraphs with more than 10 words.
    paragraphs = [p.text() for p in doc.select(folia.Paragraph) if p.count(folia.Word) > 10]
    if len(paragraphs) == 0:
        return ''
    # Sometimes articles start with: "LOCATION -"; the following removes it.
    # Look for '-' in first sentence of the first paragraph.
    query = fql.Query(f'SELECT w WHERE text = "-" IN ID {doc.paragraphs(0)[0].id}')
    found = query(doc)
    if found:
        # Work on a copy of the document's first paragraph.
        # NOTE(review): `found[0]` comes from `doc`, not from the copy `pp` — confirm that
        # `pp[0].remove(word)` matches the copied word as intended.
        pp = doc.paragraphs(0).copy()
        word = found[0]
        # Walk backwards, only if the word directly before the '-' is a capitalized location.
        while True:
            prev = word.previous(folia.Word)
            if prev is None:
                # Reached the start: drop the current word and stop.
                pp[0].remove(word)
                break
            # Continue only while the preceding word is tagged 'SPEC(deeleigen)' and is fully upper-case.
            if prev.annotation(folia.PosAnnotation, set="http://ilk.uvt.nl/folia/sets/frog-mbpos-cgn").cls == 'SPEC(deeleigen)' and prev.text().isupper():
                pp[0].remove(word)
                word = prev
                # NOTE(review): this assignment is overwritten at the top of the next iteration.
                prev = word.previous()
            else:
                break
        try:
            # NOTE(review): `paragraphs[0]` is the first paragraph that passed the >10-word filter,
            # which is presumably (but not necessarily) `doc.paragraphs(0)` — confirm.
            paragraphs[0] = pp.text()
        except folia.NoSuchText:
            # The stripped paragraph has no text left: drop the first kept paragraph.
            paragraphs = paragraphs[1:]
    res = ' '.join(paragraphs)
    # Replace a trailing '( WORD )' (optionally followed by one more character) with a newline.
    res = re.sub(r'\( \w+ \).?$', '\n', res, flags=re.DOTALL)
    return f'==={doc.id}===\n{res}\n'

class Extractor(mp.Process):
    """
    Worker process that extracts FoLiA files from the newspaper archive.

    Reads tar members from `in_q` until it receives `None`. Every member whose name ends in
    '.folia.xml' and that has no text file in `txt_loc` yet is extracted into `temp_loc`
    (flattened, i.e. without its folder), after which the extracted file name is put on `out_q`.
    """
    def __init__(self, tar_loc: str, temp_loc: str, txt_loc: str, in_q: mp.Queue, out_q: mp.Queue, name: str):
        """
        :param tar_loc: Location of the gzipped tar archive to extract from.
        :param temp_loc: Folder the folia-files are extracted into.
        :param txt_loc: Folder with the finished text files; used to skip files that are already done.
        :param in_q: Queue delivering the tar members to handle, terminated by `None`.
        :param out_q: Queue receiving the file name of every extracted folia-file.
        :param name: Name of the process.
        """
        super().__init__(name=name)
        self.tar_loc = tar_loc
        self.temp_loc = temp_loc
        self.txt_loc = txt_loc
        self.in_q = in_q
        self.out_q = out_q

    def run(self):
        """Extraction loop; runs until the `None` sentinel is read from `in_q`."""
        print(f'Starting {self.name}')
        with tarfile.open(self.tar_loc, 'r:gz') as tar:
            # iter(get, None) keeps calling in_q.get() until the None sentinel arrives.
            for tarinfo in iter(self.in_q.get, None):
                if not tarinfo.name.endswith('.folia.xml'):
                    continue
                # Flatten the member name to its file name. basename (instead of split('/')[1])
                # also handles members without a folder and members nested more than one level deep.
                tarinfo.name = os.path.basename(tarinfo.name)
                new_loc = os.path.join(self.txt_loc, tarinfo.name.split('.')[0])
                if os.path.exists(new_loc): # Skip already extracted files.
                    continue
                tar.extract(tarinfo, path=self.temp_loc)
                self.out_q.put(tarinfo.name)
        return

def writer(q: mp.Queue, temp_loc: str, txt_loc: str):
    """
    Worker loop that converts extracted folia-files into text files.

    Reads file names from `q` until it receives `None`. For every name, the folia-file in
    `temp_loc` is converted with `create_txt`; non-empty results are written to `txt_loc` under
    the file name up to its first '.'. The temporary folia-file is always removed afterwards,
    also when the conversion or the write raises (the exception still propagates).
    :param q: Queue delivering the names of the extracted folia-files, terminated by `None`.
    :param temp_loc: Folder containing the extracted folia-files.
    :param txt_loc: Folder the text files are written to.
    """
    print(f'Starting {mp.current_process().name}')
    # iter(get, None) keeps calling q.get() until the None sentinel arrives.
    for name in iter(q.get, None):
        temp_file = os.path.join(temp_loc, name)
        try:
            text = create_txt(temp_file)
            if text:
                new_loc = os.path.join(txt_loc, name.split('.')[0])
                with open(new_loc, 'w') as f:
                    f.write(text)
        finally:
            # Never leave the temporary file behind, even if this document could not be processed.
            os.remove(temp_file)
    return


In [15]:
# Extract the text from each of the folia-files and put it into a separate folder.
# Takes a while.
temp_loc = '../Datasets/NLNews/temp'
txt_loc = '../Datasets/NLNews/WR-P-P-G_newspapers_txt'
tar_loc = '../Datasets/NLNews/WR-P-P-G_newspapers.tgz'
os.makedirs(temp_loc, exist_ok=True)
os.makedirs(txt_loc, exist_ok=True)

# Use a quarter of the cores for extraction, but always at least one:
# with zero workers nothing would be extracted and the bounded queue would become unbounded.
num_procs = max(1, mp.cpu_count() // 4)
extract_q = mp.Queue()
file_q = mp.Queue(maxsize=num_procs*4)

print('===Starting extraction===')
with tarfile.open(tar_loc, 'r:gz') as tar:
    # Check if the foliaviewer.xsl file exists inside temp_loc; it is required for reading the folia-files.
    if not os.path.exists(os.path.join(temp_loc, 'sonar-foliaviewer.xsl')):
        foliaviewer = tar.getmember('000/sonar-foliaviewer.xsl')
        foliaviewer.name = 'sonar-foliaviewer.xsl'
        tar.extract(foliaviewer, temp_loc)
    # Reuse the member list of an earlier run of this cell, if there is one.
    try:
        members
    except NameError:
        print('    Extracting members...')
        members = tar.getmembers()
size = len(members)
print(f'Found {size} files.')
for member in members: extract_q.put(member)
# One stop-sentinel per extractor. They go straight onto the queue instead of into `members`,
# so the cached list (and the count printed above) stays correct when this cell is re-run.
for i in range(num_procs): extract_q.put(None)
time.sleep(1) # Fixes brokenpipe error

extractors = [Extractor(tar_loc, temp_loc, txt_loc, extract_q, file_q, f'extractor_{i}') for i in range(num_procs)]
writers = [mp.Process(name=f'writer_{i}', target=writer, args=(file_q, temp_loc, txt_loc)) for i in range(num_procs*2)]
for e in extractors: e.start()
for w in writers: w.start()

# Once all extractors are done, nothing new reaches file_q: tell every writer to stop.
for e in extractors: e.join()
for w in writers: file_q.put(None)
for w in writers: w.join()
subdivide_dir(txt_loc)

===Starting extraction===
Found 708979 files.
Starting extractor_0
Starting extractor_1
Starting extractor_2
Starting extractor_3
Starting extractor_4
Starting extractor_5

Starting extractor_6Starting extractor_7
Starting writer_0
Starting writer_1
Starting writer_2
Starting writer_3
Starting writer_4
Starting writer_5
Starting writer_6
Starting writer_7
Starting writer_8
Starting writer_9
Starting writer_10
Starting writer_11
Starting writer_12
Starting writer_13
Starting writer_14
Starting writer_15


In [14]:
# Manual cleanup: force-stop every worker process, extractors first, then writers.
for proc in [*extractors, *writers]:
    proc.kill()

In [None]:
# Non-parallel code:
# Takes absolutely forever...
temp_loc = '../Datasets/NLNews/temp'
txt_loc = '../Datasets/NLNews/WR-P-P-G_newspapers_txt'
tar_loc = '../Datasets/NLNews/WR-P-P-G_newspapers.tgz'
os.makedirs(temp_loc, exist_ok=True)
os.makedirs(txt_loc, exist_ok=True)
# Open the archive via tar_loc, so this cell reads the same archive as the parallel version.
with tarfile.open(tar_loc, 'r:gz') as new_archive:
    # Check if the foliaviewer.xsl file exists inside temp_loc; it is required for reading the folia-files.
    if not os.path.exists(os.path.join(temp_loc, 'sonar-foliaviewer.xsl')):
        foliaviewer = new_archive.getmember('000/sonar-foliaviewer.xsl')
        foliaviewer.name = 'sonar-foliaviewer.xsl'
        new_archive.extract(foliaviewer, temp_loc)

    for tarinfo in new_archive:
        if not tarinfo.name.endswith('.folia.xml'):
            continue
        # Flatten the member name to its file name; this does not depend on a folder member
        # having been seen before the first file.
        tarinfo.name = os.path.basename(tarinfo.name)
        new_loc = os.path.join(txt_loc, tarinfo.name.split('.')[0])
        if os.path.exists(new_loc): # Skip already extracted files.
            continue
        new_archive.extract(tarinfo, path=temp_loc)
        temp_file = os.path.join(temp_loc, tarinfo.name)
        text = create_txt(temp_file)
        if text:
            with open(new_loc, 'w') as f:
                f.write(text)
        os.remove(temp_file)

In [69]:
# Show the last entry that os.listdir reports for the temp folder.
# To inspect its content instead:
# with open(f"{temp_loc}/{os.listdir(temp_loc)[-1]}", 'r') as f:
#     print(f.read())
temp_entries = os.listdir(temp_loc)
temp_entries[-1]

'WR-P-P-G-0000434148.folia.xml'

#### Create multi-segment documents

In [2]:
# Collect the location of every extracted text file, including those in sub-folders.
txt_loc = '../Datasets/NLNews/WR-P-P-G_newspapers_txt'
txt_locs = []
for folder, _subfolders, filenames in os.walk(txt_loc):
    txt_locs.extend(os.path.join(folder, filename) for filename in filenames)

In [3]:
MIN_SECTIONS = 2
MAX_SECTIONS = 5

def create_docs(locs: List[str], save=True) -> Optional[List[str]]:
    """
    Creates multi-segment documents by concatenating randomly grouped articles.

    Every article is used exactly once. A document gets between MIN_SECTIONS and MAX_SECTIONS
    articles; the group sizes are chosen such that a single article is never left over at the end
    (unless only one article was given). An empty `locs` yields no documents.
    :param locs: List/np.ndarray containing locations to the .txt files to be processed.
    :param save: Whether to save to a folder (NLNEWS_LOC) or to return the results as a list of Strings.
    :return: None when saving to disk; the list of documents when `save` is falsy.
    """
    total = len(locs)
    # Width used to zero-pad the file names: the number of digits of the article count.
    pad = len(str(total))

    docs = []
    if save:
        os.makedirs(NLNEWS_LOC, exist_ok=True)

    # Shuffle all article indices once and hand out consecutive chunks. This draws the groups
    # just like repeatedly sampling from the remaining articles does, but avoids copying the
    # whole array for every document.
    order = rng.permutation(total)
    i = 0
    start = 0
    with tqdm(total=total, desc='Articles processed') as pbar:
        while start < total:
            remaining = total - start
            if remaining > MAX_SECTIONS + 1:
                n = int(rng.integers(MIN_SECTIONS, MAX_SECTIONS + 1))
            elif remaining > MAX_SECTIONS:
                # Exactly MAX_SECTIONS + 1 left: take at most MAX_SECTIONS - 1, so that at least
                # MIN_SECTIONS articles remain for the final document.
                n = int(rng.integers(MIN_SECTIONS, MAX_SECTIONS))
            else:
                n = remaining

            parts = []
            for idx in order[start:start + n]:
                with open(locs[idx], 'r') as f:
                    parts.append(f.read())
            doc = ''.join(parts)
            start += n

            if save:
                with open(f'{NLNEWS_LOC}/{i:0{pad}}', 'w') as tfile:
                    tfile.write(doc)
            else:
                docs.append(doc)
            i += 1
            pbar.update(n)

    print(f'Created {i} Documents')
    if not save:
        return docs

# NOTE: could be parallelized.
# def subdivide_list(l: List, n: int) -> List:
#     """
#     Subdivides a list into n sublists.
#     :param l: List to subdivide.
#     :param n: Number of sublists.
#     :return: List of sublists.
#     """
#     sub_size, leftover = divmod(len(l), n)
#     rng.shuffle(l)
#     res = [l[sub_size * i:sub_size * (i + 1)] for i in range(n)]
#     if leftover:
#         res[-1].extend(l[-leftover:])
#     return res

In [4]:
# Build the multi-segment documents from all extracted articles and write them to NLNEWS_LOC.
# Takes 2+ hours to finish.
create_docs(txt_locs, save=True)
subdivide_dir(NLNEWS_LOC)

Articles processed: 100%|██████████| 696609/696609 [2:26:02<00:00, 79.50it/s]  


Created 199153 Documents
