## Imports and definitions

In [1]:
### --- default imports ---

import json
import pickle
import pandas as pd
from os import listdir, makedirs
from os.path import isfile, join
from datetime import datetime

In [2]:
### --- constants definitions ---

# Root directory of all corpora, given relative to the current working directory.
DATA_BASE = "../../master_cloud/corpora"
# Sub-directory of DATA_BASE that store() writes the pickled DataFrames to.
ETL_BASE = "preprocessed"
ETL_PATH = join(DATA_BASE, ETL_BASE)

# Standard meta data fields: keys of the records produced by the pipelines.
DATASET = 'dataset'
SUBSET = 'subset'
ID = 'doc_id'
ID2 = 'doc_subid'
TITLE = 'title'
TAGS = 'tags'
TIME = 'date_time'
# Commented out, no constants are defined for these:
#AUTHOR
#SUBTITLE
#CATEGORY
# Ordered list of the meta data columns; the same fields feed each record's hash.
META = [DATASET, SUBSET, ID, ID2, TITLE, TAGS, TIME]
# Key of the document body.
TEXT = 'text'
# Key of the per-record hash that the pipelines use as DataFrame index.
HASH = 'hash'

In [3]:
### --- store meta data and content ---

def store(corpus, df):
    """
    Pickle a DataFrame into the ETL directory.

    :param corpus: base name of the target file (the '.pickle' suffix is appended)
    :param df: object exposing `to_pickle`, e.g. a pandas DataFrame
    :return: path of the file the DataFrame was stored in
    """
    # make sure the output directory exists before writing into it
    makedirs(ETL_PATH, exist_ok=True)
    target_file = join(ETL_PATH, corpus + '.pickle')
    print('saving to', target_file)
    df.to_pickle(target_file)
    return target_file

## Actions

In [None]:
# Display the current `df`; a cell that assigns `df` must have been run beforehand.
df

In [None]:
# Persist the current `df` under the name held in CORPUS; both are assigned in other
# cells of this notebook, which must have been run beforehand.
fname = store(CORPUS, df)

In [20]:
# Point at the directory below DATA_BASE that store() writes its pickles to.
LOCAL_PATH = ETL_BASE
FULL_PATH = join(DATA_BASE, LOCAL_PATH)

# Sorted names of all regular files in that directory whose name starts with 'dew'.
files = sorted(
    f for f in listdir(FULL_PATH)
    if isfile(join(FULL_PATH, f)) and f.startswith('dew')
)

def read(f):
    """Load the pickled object stored at path `f` with pandas and return it."""
    return pd.read_pickle(f)

# Load every selected pickle and strip surrounding whitespace from its id and title columns.
for name in files:
    fname = join(FULL_PATH, name)
    df = read(fname)
    df['doc_id'] = df['doc_id'].str.strip()
    df['title'] = df['title'].str.strip()
    print('saving to', fname)
    # NOTE(review): the write-back below is disabled, so despite the message above the
    # cleaned frame is not persisted; only the last file's frame remains in `df`.
    #df.to_pickle(fname)


saving to ../../master_cloud/corpora/preprocessed/dewac_00.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_01.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_02.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_03.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_04.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_05.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_06.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_07.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_08.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_09.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_10.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_11.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_12.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_13.pickle
saving to ../../master_cloud/corpora/preprocessed/dewac_14.pickle
saving to 

## Pipelines for certain corpora

In [None]:
### --- extract, transform, load (save) the following corpus:

# Written into every record's DATASET field by transform_subset and echoed by load_data.
# Both functions read these globals when they are called, not when they are defined.
CORPUS = "OnlineParticipation"
# Location of the raw files relative to DATA_BASE; load_data lists FULL_PATH.
LOCAL_PATH = "OnlineParticipationDatasets/downloads"
FULL_PATH = join(DATA_BASE, LOCAL_PATH)


def transform_subset(source, subset_name: str):
    """
    Generator that maps the raw records of one subset onto the standard schema.

    Records without a 'tags'/'category' entry take over the tags stored earlier for
    the same 'suggestion_id' (a KeyError is raised if none were seen before).
    Outside of 'wuppertal2017', records with empty 'content' are skipped.

    :param source: iterable of dictionaries in original key/value format
    :param subset_name: string identifier of the subset the data belongs to
    :yield: one dictionary in standard key/value format per retained record
    """
    # NOTE(review): hash() of str values is salted per interpreter process unless
    # PYTHONHASHSEED is fixed, so HASH values may differ between kernel sessions.
    print('transform', subset_name)

    # wuppertal has a different data scheme
    is_wuppertal = subset_name == 'wuppertal2017'
    tags_field = 'tags' if is_wuppertal else 'category'
    wuppertal_text_fields = (
        'title',
        'content',
        'Voraussichtliche Rolle für die Stadt Wuppertal',
        'Mehrwert der Idee für Wuppertal',
        # 'Eigene Rolle bei der Projektidee',
        # 'Geschätzte Umsetzungsdauer und Startschuss',
        # 'Kostenschätzung der Ideeneinreicher',
    )
    tags_by_suggestion = {}

    for doc in source:
        target = {
            DATASET: CORPUS,
            SUBSET: subset_name,
            ID: doc['suggestion_id'],
            TITLE: doc['title'],
            TIME: doc['date_time'],
        }

        # remember the tags of a suggestion so that later records lacking them can reuse them
        if tags_field in doc:
            tags = doc[tags_field]
            target[TAGS] = tuple(tags) if is_wuppertal else tags
            tags_by_suggestion[target[ID]] = target[TAGS]
        else:
            target[TAGS] = tags_by_suggestion[target[ID]]

        if is_wuppertal:
            target[ID2] = 0
            # every field is followed by ' .\n', including the last one
            target[TEXT] = ''.join(doc[field] + ' .\n' for field in wuppertal_text_fields)
        else:
            target[ID2] = doc.get('comment_id', 0)
            content = doc['content']
            # ignore if no content
            if not content:
                continue
            title = doc['title']
            target[TEXT] = title + ' .\n' + content if title else content

        target[HASH] = hash(tuple(target[key] for key in META))
        yield target


def load_data(number_of_subsets: int=None, start: int=0):
    """
    Read the raw JSON files found in FULL_PATH and hand each one to transform_subset.

    Only files whose name has 'flat' directly before a five character ending are
    processed. `start` and `number_of_subsets` slice the complete directory listing
    before that filter is applied, so they count all files, not only the 'flat' ones.

    :param number_of_subsets: number of files to consider in one call (None for no limit)
    :param start: index of the first file to consider
    :yield: for every non-empty 'flat' file, the generator returned by transform_subset
    """

    print("process", CORPUS)

    # --- read files ---
    files = [f for f in listdir(FULL_PATH) if isfile(join(FULL_PATH, f))]

    # turn the requested count into a slice end; past the end of the list means "no limit"
    if number_of_subsets:
        number_of_subsets += start
        if number_of_subsets > len(files):
            number_of_subsets = None

    for name in files[start:number_of_subsets]:
        if name[-9:-5] != 'flat':
            continue

        # `join` is imported from os.path in the import cell; `os` itself is not imported,
        # so `os.path.join` would raise a NameError here
        fpath = join(FULL_PATH, name)
        try:
            with open(fpath, 'r') as fp:
                print('open:', fpath)
                data = json.load(fp)
        except IOError:
            print("Could not open", fpath)
            continue
        # nothing to transform in an empty document
        if not data:
            continue
        subset = name[6:-10]

        yield transform_subset(data, subset)


# Materialise one DataFrame per subset generator yielded by load_data().
dfs = list(map(pd.DataFrame, load_data()))
if dfs:
    # Stack all subsets, then index the rows by their hash and keep the standard columns.
    df = pd.concat(dfs)
    df = df.set_index(HASH)[META + [TEXT]]

In [None]:
### --- extract, transform, load (save) the following corpus:

# Written into every record's DATASET field by transform_subset and echoed by load_data.
# Both functions read these globals when they are called, not when they are defined.
CORPUS = "FAZ"
# Location of the raw files relative to DATA_BASE; load_data lists FULL_PATH.
LOCAL_PATH = "scrapy/faz"
FULL_PATH = join(DATA_BASE, LOCAL_PATH)


def transform_subset(source, subset_name: str):
    """
    Generator that maps the articles of one file onto the standard schema.

    The SUBSET field is taken from the fifth '/'-separated piece of the article url;
    `subset_name` is only used for the progress message.

    :param source: iterable of dictionaries in original key/value format
    :param subset_name: string identifier of the subset the data belongs to
    :yield: one dictionary in standard key/value format per article
    """
    print('transform', subset_name)
    timestamp_format = '%Y-%m-%dT%H:%M:%S%z'  # 2018-08-22T11:11:45+0200

    for doc in source:
        target = {
            DATASET: CORPUS,
            SUBSET: doc['url'].split('/')[4],
            ID: doc['url'],
            ID2: 0,
            TITLE: doc['title'],
            TAGS: tuple(doc['keywords']),
        }

        # an empty 'published' value is kept as a missing time stamp
        # (datetime.fromisoformat(doc['published']) may work in Python 3.7)
        published = doc['published']
        target[TIME] = datetime.strptime(published, timestamp_format) if published else None

        target[TEXT] = doc['title'] + ' .\n' + doc['description'] + ' .\n' + doc['text']

        target[HASH] = hash(tuple(target[key] for key in META))
        yield target


def load_data(number_of_subsets: int=None, start: int=0):
    """
    Read the files found in FULL_PATH, parsing every line of a file as one JSON document.

    :param number_of_subsets: number of files to process in one call (None for no limit)
    :param start: index of the first file to process
    :yield: for every non-empty file, the generator returned by transform_subset
    """

    print("process", CORPUS)

    # --- read files ---
    files = [entry for entry in listdir(FULL_PATH) if isfile(join(FULL_PATH, entry))]
    print(files)

    # turn the requested count into a slice end; past the end of the list means "no limit"
    stop = number_of_subsets
    if stop:
        stop += start
        if stop > len(files):
            stop = None

    for name in files[start:stop]:
        fpath = join(FULL_PATH, name)
        try:
            with open(fpath, 'r') as fp:
                print('open:', fpath)
                data = list(map(json.loads, fp.readlines()))
        except IOError:
            print("Could not open", fpath)
            continue
        # a file without any line yields nothing
        if not data:
            continue

        # the subset name is the file name without its first four and last three characters
        yield transform_subset(data, name[4:-3])


# Materialise one DataFrame per file generator yielded by load_data().
dfs = list(map(pd.DataFrame, load_data()))
if dfs:
    # Stack all files, then index the rows by their hash and keep the standard columns.
    df = pd.concat(dfs)
    df = df.set_index(HASH)[META + [TEXT]]

In [None]:
### --- extract, transform, load (save) the following corpus:
import gzip
from bs4 import BeautifulSoup

# Written into every record's DATASET field by transform_subset and echoed by load_data.
# Both functions read these globals when they are called, not when they are defined.
CORPUS = "Europarl"
# Location of the raw files relative to DATA_BASE; load_data lists FULL_PATH.
LOCAL_PATH = "Europarl/Europarl/xml/de"
FULL_PATH = join(DATA_BASE, LOCAL_PATH)


def transform_subset(source, subset_name: str):
    """
    Generator that turns every CHAPTER element of one XML document into a record in
    the standard schema.

    :param source: XML markup of one document, passed to BeautifulSoup
    :param subset_name: identifier of the document; characters 3 to 10 are parsed as
                        its date in the form 'yy-mm-dd'
    :yield: one dictionary in standard key/value format per chapter
    """
    print('transform', subset_name)

    def words_of(node):
        # glue the strings of all <w> elements below `node` together with blanks
        return ' '.join([w.string for w in node.find_all('w')])

    for chapter in BeautifulSoup(source, 'xml').find_all('CHAPTER'):
        target = {
            DATASET: CORPUS,
            SUBSET: subset_name,
            ID: subset_name,
            ID2: chapter.attrs['ID'],
            # the first <s> element of the chapter serves as its title
            TITLE: words_of(chapter.find('s')),
            TAGS: None,
            TIME: datetime.strptime(subset_name[3:11], '%y-%m-%d'),  # ep-07-01-18-009-07
        }

        # speaker names and paragraphs in document order, one per line;
        # find_all only returns SPEAKER and P elements, so non-speakers are paragraphs
        target[TEXT] = '\n'.join(
            node.attrs['NAME'] if node.name == 'SPEAKER' else words_of(node)
            for node in chapter.find_all(["SPEAKER", "P"])
        )

        target[HASH] = hash(tuple(target[key] for key in META))
        yield target


def load_data(number_of_subsets: int=None, start: int=0):
    """
    Read the gzip-compressed files found in FULL_PATH, one subset per file.

    :param number_of_subsets: number of files to process in one call (None for no limit)
    :param start: index of the first file to process
    :yield: for every readable file, the generator returned by transform_subset
    """

    print("process", CORPUS)

    # --- read files ---
    files = [entry for entry in listdir(FULL_PATH) if isfile(join(FULL_PATH, entry))]

    # turn the requested count into a slice end; past the end of the list means "no limit"
    stop = number_of_subsets
    if stop:
        stop += start
        if stop > len(files):
            stop = None

    for name in files[start:stop]:
        fpath = join(FULL_PATH, name)
        try:
            with gzip.open(fpath, 'rb') as fp:
                data = fp.read()
        except IOError:
            print("Could not open", fpath)
            continue

        # the subset name is the file name without its last seven characters
        yield transform_subset(data, name[:-7])


# Materialise one DataFrame per document generator yielded by load_data().
dfs = list(map(pd.DataFrame, load_data(number_of_subsets=None)))
if dfs:
    # Stack all documents, then index the rows by their hash and keep the standard columns.
    df = pd.concat(dfs)
    df = df.set_index(HASH)[META + [TEXT]]

In [None]:
### --- extract, transform, load (save) the following corpus:
import gzip
from bs4 import BeautifulSoup
import re

# Written into every record's DATASET field by transform_subset and echoed by load_data.
# Both functions read these globals when they are called, not when they are defined.
CORPUS = "PoliticalSpeeches"
# Location of the raw files relative to DATA_BASE; load_data lists FULL_PATH.
LOCAL_PATH = "German-political-speeches-2018-release"
FULL_PATH = join(DATA_BASE, LOCAL_PATH)


def transform_subset(source, subset_name: str):
    """
    Generator that turns every <text> element of one XML file into a record in the
    standard schema.

    The 'datum' attribute is parsed as 'DD.MM.YYYY'. If it contains a German month
    name (e.g. '12. Januar 2018'), blanks are removed and the name is replaced by its
    number first. An empty 'datum' results in a missing time stamp.

    The text consists of the title, the subtitle (if the element has one) and the
    content of the 'rohtext' child, in that order.

    :param source: XML markup of one file, passed to BeautifulSoup
    :param subset_name: string identifier of the subset the data belongs to
    :yield: one dictionary in standard key/value format per speech
    """
    print('transform', subset_name)

    soup = BeautifulSoup(source, 'xml')

    months = dict(Januar='01', Februar='02', März='03', April='04', Mai='05', Juni='06',
                  Juli='07', August='08', September='09', Oktober='10', November='11', Dezember='12')
    pattern = re.compile(r'(' + '|'.join(months.keys()) + r')')

    for speech in soup.find_all('text'):
        target = dict()
        target[DATASET] = CORPUS
        target[SUBSET] = subset_name
        target[ID] = speech.attrs['url']
        target[ID2] = 0
        target[TITLE] = speech.attrs['titel']
        target[TAGS] = speech.attrs['person']
        if speech.attrs['datum']:
            match = pattern.search(speech.attrs['datum'])
            if match:
                # '12. Januar 2018' -> '12.Januar2018' -> '12.01.2018'
                datum = speech.attrs['datum'].replace(" ", "")
                time = pattern.sub(lambda key: months[key.group()] + '.', datum)
                target[TIME] = datetime.strptime(time, '%d.%m.%Y')
            else:
                target[TIME] = datetime.strptime(speech.attrs['datum'], '%d.%m.%Y')
        else:
            target[TIME] = None

        # The optional subtitle is computed on its own: a conditional expression binds
        # weaker than '+', so chaining it directly into the concatenation would yield
        # either title + subtitle without the speech or the speech without the title.
        subtitle = speech.attrs['untertitel'] + ' .\n' if 'untertitel' in speech.attrs else ''
        target[TEXT] = speech.attrs['titel'] + ' .\n' \
                     + subtitle \
                     + speech.find('rohtext').string

        target[HASH] = hash(tuple([target[key] for key in META]))
        yield target


def load_data(number_of_subsets: int=None, start: int=0):
    """
    Read the files in FULL_PATH whose name ends in 'xml', one subset per file.

    `start` and `number_of_subsets` slice the complete directory listing before the
    name filter is applied, so they count all files, not only the XML ones.

    :param number_of_subsets: number of files to consider in one call (None for no limit)
    :param start: index of the first file to consider
    :yield: for every readable XML file, the generator returned by transform_subset
    """

    print("process", CORPUS)

    # --- read files ---
    files = [entry for entry in listdir(FULL_PATH) if isfile(join(FULL_PATH, entry))]

    # turn the requested count into a slice end; past the end of the list means "no limit"
    stop = number_of_subsets
    if stop:
        stop += start
        if stop > len(files):
            stop = None

    for name in files[start:stop]:
        if not name.endswith('xml'):
            continue

        fpath = join(FULL_PATH, name)
        try:
            with open(fpath, 'r') as fp:
                data = fp.read()
        except IOError:
            print("Could not open", fpath)
            continue

        # the subset name is the file name without its last four characters
        yield transform_subset(data, name[:-4])


# Materialise one DataFrame per file generator yielded by load_data().
dfs = list(map(pd.DataFrame, load_data(number_of_subsets=None)))
if dfs:
    # Stack all files, then index the rows by their hash and keep the standard columns.
    df = pd.concat(dfs)
    df = df.set_index(HASH)[META + [TEXT]]

In [13]:
### --- extract, transform, load (save) the following corpus:

# Written into every record's DATASET field by transform_subset and echoed by load_data.
# Both functions read these globals when they are called, not when they are defined.
CORPUS = "dewac"
# Location of the raw file relative to DATA_BASE; load_data opens 'dewac_preproc' in FULL_PATH.
LOCAL_PATH = "WaCKy/dewac"
FULL_PATH = join(DATA_BASE, LOCAL_PATH)
# Length of the "CURRENT URL " prefix that the url lines are checked against.
mn = 12


def transform_subset(source, subset_name: str):
    """
    Generator yielding exactly one record in the standard schema for one document.

    The record's id is the url line without its "CURRENT URL " prefix and its title is
    the part of that id after the last '/'.

    :param source: dictionary with the keys 'url' and 'text'
    :param subset_name: string identifier of the subset the data belongs to
    :yield: a single dictionary in standard key/value format
    :raises AssertionError: if the url line does not start with "CURRENT URL "
    """
    # print('transform', source['url'][mn:])

    url_line = source['url']
    assert url_line[:mn] == "CURRENT URL "
    url = url_line[mn:]

    target = {
        DATASET: CORPUS,
        SUBSET: subset_name,
        ID: url,
        ID2: None,
        TITLE: url.split('/')[-1],
        TAGS: None,
        TIME: None,
        TEXT: source['text'],
    }
    target[HASH] = hash(tuple(target[key] for key in META))
    yield target


def load_data(number_of_documents: int=None, start: int=0):
    """
    Stream documents from the file 'dewac_preproc' in FULL_PATH, which is read as
    pairs of lines: a url line followed by a text line.

    Every call reads the file from the beginning and skips the first `start` records.
    Records are counted identically in every call, so consecutive calls with
    start = k * number_of_documents neither repeat nor drop a record. Malformed
    records (url line without the "CURRENT URL " prefix) are counted but not yielded,
    so a call may yield fewer than `number_of_documents` items.

    :param number_of_documents: number of records to process in one call (None for no limit)
    :param start: number of leading records to skip
    :yield: for every well-formed record, the generator returned by transform_subset
    """

    print("process", CORPUS)

    # --- read files ---
    name = 'dewac_preproc'
    fpath = join(FULL_PATH, name)
    print(fpath)
    try:
        with open(fpath, 'r', encoding='latin-1') as fp:
            i = 0
            # not the most efficient with respect to CPU-cycles and I/O operations
            # but quite feasible in regards of memory consumption
            while True:
                if number_of_documents and i >= start+number_of_documents:
                    break
                data = dict()
                data['url'] = fp.readline().strip()
                # an empty url line (end of file or blank line) ends the stream
                if not data['url']:
                    break
                data['text'] = fp.readline().strip()
                i += 1
                # Re-synchronise on malformed records before the skip test, so that the
                # lines consumed while skipping are the same ones an earlier call
                # consumed while yielding.
                if data['url'][:mn] != "CURRENT URL ":
                    fp.readline()
                    continue
                # `i` already counts the current record: records 1..start belong to
                # earlier chunks ('<' would emit record number `start` a second time)
                if i <= start:
                    # could be more efficient if we don't close the file pointer
                    continue
                # print(i, ':')
                yield transform_subset(data, '')
    except IOError:
        print("Could not open", fpath)


# Process the corpus in chunks of 100000 documents (at most 20 chunks) and store every
# chunk in its own pickle named '<CORPUS>_<two digit chunk number>'.
for i in range(0, 20):
    dfs = [pd.DataFrame(item) for item in load_data(number_of_documents=100000, start=i*100000)]
    if dfs:
        df = pd.concat(dfs)
        df = df.set_index(HASH)[META+[TEXT]]
        # '%' binds tighter than ',': both values have to be passed as one tuple,
        # otherwise the format string receives a single argument and raises a TypeError
        store("%s_%02d" % (CORPUS, i), df)
    else:
        # an empty chunk means there is nothing left to process
        break

print('done.')

process dewac
../../master_cloud/corpora/WaCKy/dewac/dewac_preproc
done.
