# Data Step 2: Counting up Global Counts for Word List

In the EF processing script, token counts were collected in batches, folded to language x token counts in batches, and saved to HDF5 stores in `/store`. This script folds those batches into a single list, so that each language-token combination has only one count. The previous script was the `map`; this script is the `reduce`.
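As a toy illustration of that reduce step, here is a minimal sketch using only the standard library. The batch dictionaries and the `fold_counts` helper are hypothetical stand-ins for the HDF5 stores, not part of the actual pipeline.

```python
from collections import Counter

# Hypothetical per-batch results: (language, token) -> count
batch_counts = [
    {("eng", "the"): 120, ("eng", "cat"): 3, ("ger", "der"): 80},
    {("eng", "the"): 95, ("ger", "der"): 60, ("ger", "katze"): 2},
]

def fold_counts(batches):
    """Reduce per-batch counts to one total per (language, token) pair."""
    total = Counter()
    for batch in batches:
        # Counter.update adds to existing counts instead of replacing them
        total.update(batch)
    return total

global_counts = fold_counts(batch_counts)
print(global_counts[("eng", "the")])
```

The rest of this notebook does the same thing, only out of core and one language at a time.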

In [None]:
import glob
import re
import pandas as pd
import numpy as np
from ipyparallel import Client
import logging
import os
from tqdm import tqdm_notebook
from bokeh.io import output_notebook
import dask.dataframe as dd
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, CacheProfiler, visualize
output_notebook()
rawstores = glob.glob("/notebooks/data/batch2/stores/*h5")

## Preparation

Attach engines and initialize logging. *We'll be processing large in-memory chunks, so don't start too many processes.* I'm using a machine with 128GB RAM, and 10 processes hit around 2/3 of the RAM (80GB) with chunksize=1m in Step 1.

In [None]:
def init_log(name=False):
    import logging, os
    if not name:
        name = os.getpid()
    handler = logging.FileHandler("/notebooks/data2/logs/bw-%s.log" % name, 'a')
    formatter = logging.Formatter('%(asctime)s:%(levelname)s:%(message)s', "%m/%d-%H:%M:%S")
    handler.setFormatter(formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    logging.info("Log initialized")

init_log('final')

In [None]:
rc = Client()
dview = rc[:]
v = rc.load_balanced_view()

# Need this later to force garbage collection
@dview.remote(block=True)
def force_gc():
    import gc
    before = gc.get_count()
    gc.collect()
    return before[0], gc.get_count()[0]

dview.push(dict(init_log=init_log))
%px init_log()

## Step 1: Triage and merge small chunks by lang (Parallelized)

Iterate through all the stores, grouping by language and then summing counts by token. The results are still partial: each engine appends to its own store under `merged1/{language}`, so that the work can be parallelized.
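The grouping itself can be tried on a small in-memory chunk. This is only a sketch: the toy `chunk` below is an assumption about the shape of the `/tf/corpus` table (a `language`, `token` MultiIndex with one `count` column), and the results go into a plain dict (`merged_by_lang`, a hypothetical name) rather than an HDF5 store.

```python
import pandas as pd

# Hypothetical chunk: repeated tokens stand in for counts that came from
# different rows of the same input store.
index = pd.MultiIndex.from_tuples(
    [("eng", "the"), ("eng", "cat"), ("eng", "the"), ("ger", "der"), ("ger", "der")],
    names=["language", "token"],
)
chunk = pd.DataFrame({"count": [5, 1, 7, 4, 6]}, index=index)

merged_by_lang = {}
for lang, df in chunk.groupby(level="language"):
    # Collapse repeated tokens to one row per token for this language
    merged_by_lang[lang] = df.groupby(level="token").sum()

print(merged_by_lang["eng"])
```

Each per-language frame is indexed by token only, which is the shape that gets appended under `merged1/{language}`.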

In [None]:
def triage(inputstore):
    try:
        import numpy as np
        import pandas as pd
        import logging
        import os
        import gc
    except ImportError:
        return "import error for " + inputstore

    chunksize = 1000000
    storefolder = 'merged1' # this is in the h5 hierarchy
    outputstorename = "/notebooks/data2/batch2-redo/merge-%s.h5" % os.getpid()
    max_str_bytes = 50
    
    errors = 0
    with pd.HDFStore(outputstorename, complevel=9, mode="a", complib='blosc') as outstore:
        with pd.HDFStore(inputstore, complevel=9, mode="r", complib='blosc') as store:
            row_size = store.get_storer('/tf/corpus').nrows
            storeiter = store.select('/tf/corpus', start=0, chunksize=chunksize)

            i = 0
            for chunk in storeiter:
                i += 1
                try:
                    # Scalar level name, so each group key is a plain language string
                    lang_groups = chunk.groupby(level='language')
                    for lang,df in lang_groups:
                        if df.empty:
                            continue
                        merged = df.groupby(level=['token']).sum()
                        
                        fname = "%s/%s" % (storefolder, lang)
                        outstore.append(fname, merged, data_columns=['count'], min_itemsize = {'index': max_str_bytes})
                    logging.info("Completed %d/%d" % (i, np.ceil(row_size/chunksize)))
                except Exception:
                    errors += 1
                    logging.exception("Error processing batch %d (rows %d-%d) of input store" % (i, (i-1)*chunksize, i*chunksize))
                gc.collect()
    gc.collect()
    if errors == 0:
        return "success"
    else:
        return "%d errors on process %s, check logs" % (errors, os.getpid())
dview.push(dict(triage=triage))

<AsyncResult: _push>

In [None]:
logging.info("Processing Started")
parallel_job = v.map(triage, rawstores, ordered=False)
i = 0

for result in tqdm_notebook(parallel_job, smoothing=0):
    i += 1
    if result == "success":
        logging.info("Done processing batch %d" % i)
    else:
        logging.error(result)
        
print(force_gc())

CompositeError: one or more exceptions from call to method: triage
[Engine Exception]EngineError: Engine b'c426022b-5b16-44cd-a480-ca5d11c0d96e' died while running task 'd3269715-7885-49c3-806b-99d7af445991'

## Step 2: Sum token counts by store (Parallelized)

Using Dask for out-of-core processing, run a big split-apply-combine on each language table, *summing* counts for each token and saving to a single store. Here we're simply doing `groupby('token').sum()` and saving to `savestore`, with some safety checks.

Dask counts these up in low-memory partitions, then sums the intermediate results. In the last step, where Dask puts everything together, memory use can get high, so there is still a `batch_limit` (so that we never sum more than 600m rows at a time). This limit is mainly hit by German and English.

Because several stores are processed and each one appends its results, the tables in `savestore` will not have fully unique tokens, but they will be nearly there. They need one final `sum()` in Step 3.
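The batching idea can be sketched on its own as a small generator. `batch_ranges` is a hypothetical helper, not what the loop in this step uses (that loop tracks an iteration counter instead), and the row total here is made up.

```python
def batch_ranges(total_rows, batch_limit):
    """Yield (start, stop) row ranges covering total_rows, at most batch_limit rows each."""
    start = 0
    while start < total_rows:
        stop = min(start + batch_limit, total_rows)
        yield start, stop
        start = stop

# Hypothetical table of 1.45 billion rows with the 600m-row limit
ranges = list(batch_ranges(total_rows=1_450_000_000, batch_limit=6 * 10**8))
for start, stop in ranges:
    print(start, stop)
```

Each pair could then be passed as the `start` and `stop` arguments of `dd.read_hdf`. Because every batch is summed separately and appended, a token that occurs in two batches ends up with two rows, which is why the final `sum()` is still needed.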

In [None]:
stores = glob.glob("/notebooks/data3/fold-232/*.h5")

In [None]:
max_str_bytes = 50
chunksize = 100000
batch_limit = 6*10**8
savestore = "/notebooks/data2/final/fromnodes-323.h5"

for storefile in stores:
    logging.info("Next store: %s" % storefile)
    try:
        # Get Unique languages
        with pd.HDFStore(storefile, mode="r") as store:
            langs = set([key.split("/", maxsplit=-1)[-1] for key in store.keys() if 'merged1' in key])
    except:
        logging.exception("Can't read languages from %s" % storefile)
        continue

    for lang in langs:
        batch = False
        logging.info("Starting lang %s from %s" % (lang, storefile))
        
        if not re.fullmatch('[a-z]{3}', lang):
            logging.error("lang '%s' is not three lowercase letters. Skipping for now. (%s)" % (lang, storefile))
            continue
        
        try:
            ddf = dd.read_hdf(storefile, '/merged1/'+lang, chunksize=chunksize, mode='r')
        except:
            logging.exception("Can't load Dask DF for %s in %s" % (lang, storefile))
            continue
        
        # Assuming partitions are equally sized, which they should be if read from a single file
        if ddf.npartitions > np.ceil(batch_limit/chunksize):
            batch = True
            niters = np.floor((ddf.npartitions*chunksize)/batch_limit)
            i = 0
        
        while True:
            if batch:
                start = i * batch_limit
                logging.info("Starting batch %d for %s" % (i, lang))
                if i == niters:
                    # Last batch, no stop value
                    ddf = dd.read_hdf(storefile, '/merged1/'+lang, chunksize=chunksize, start=start)
                    batch = False
                else:
                    ddf = dd.read_hdf(storefile, '/merged1/'+lang, chunksize=chunksize,
                                      start=start, stop=(start+batch_limit))
                    i += 1
            try:
                logging.info("Starting full merge for %s with %d partitions" % (lang, ddf.npartitions))
                with ProgressBar(), Profiler() as prof:
                    full_merge = ddf.reset_index().groupby('token').sum().compute()
                if lang == 'eng':
                    # For curiosity: see the profiling for English
                    prof.visualize()
                logging.info("Success! Saving merged.")
                # The /fromnodes table is the sum from all the different stores, but will need to be summed one more time
                with pd.HDFStore(savestore, complevel=9, mode="a", complib='blosc') as store:
                    store.append(lang,
                                 full_merge,
                                 data_columns=['count'],
                                 min_itemsize = {'index': max_str_bytes})
            except:
                logging.exception("Can't compute or save lang for %s in %s" % (lang, storefile))
            
            if batch == False:
                break

[########################################] | 100% Completed |  1min  3.7s
[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  2min 34.8s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed | 52.5s


## Step 3: Final combine

All our information is in `savestore` now. For example, English language tokens are in the `/eng` table of `savestore`. 

Now, we just need to sum all the rows, so that each token has only one total count. While the very sparse words have been kept around until now, this code would be much too complex if we included them, so we filter to `count >= 10`. Note that the filter is applied to the partial counts in `savestore`, before the final sum. This reduces the final size to $1/100$ for the languages where I tested. Presumably it would be less drastic for German and English, where the sheer number of texts might increase the likelihood that a junky OCR error occurs a few times.
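In the code for this step, the `count >= 10` filter runs before the final sum, so it is worth seeing what that ordering does. The following is a minimal sketch on made-up data; the `partial` frame and its values are hypothetical, and a boolean mask stands in for the `query` call.

```python
import pandas as pd

# Hypothetical partial counts for one language, as they might look after
# several stores have appended their results: tokens can repeat across rows.
partial = pd.DataFrame(
    {"token": ["the", "the", "zyx", "zyx", "cat"], "count": [500, 300, 6, 7, 12]}
)

threshold = 10

# Filter first, then sum (the order used in this step)
filter_then_sum = partial[partial["count"] >= threshold].groupby("token")["count"].sum()

# Sum first, then filter (exact, but every sparse row is carried through the sum)
summed = partial.groupby("token")["count"].sum()
sum_then_filter = summed[summed >= threshold]

print(filter_then_sum)
print(sum_then_filter)
```

A token whose partial counts are each below the threshold is dropped by the first ordering even if its total would pass, so the final list is a slight undercount for borderline tokens. That is the trade-off accepted here in exchange for a much smaller sum.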

In [None]:
stores = glob.glob('/notebooks/data2/final/fromnodes*h5') + glob.glob('/notebooks/data3/final/fromnodes*.h5')

In [None]:
# Collect a list of which stores have information for each possible language
storelist = dict()
for storepath in stores:
    with pd.HDFStore(storepath) as store:
        for key in store.keys():
            if key in storelist:
                storelist[key].append(storepath)
            else:
                storelist[key] = [storepath]

In [None]:
query = "count >= 10"
listfilter = '/[n-z]'
processlist = [item for item in storelist.items() if re.match(listfilter, item[0])]

logging.info("Processing %s, filtered to %s" % (", ".join([p[0] for p in processlist]), query))

for lang, langstores in processlist:
    try:
        # Get dask dataframe for given language from multiple sources
        dask_dfs = [dd.read_hdf(path, lang, chunksize=100000) for path in langstores]
        ddf = dd.concat(dask_dfs)
        logging.info("Processing %s with %d partitions" % (lang, ddf.npartitions))

        with ProgressBar():
            ddf.query(query).reset_index().groupby('token').sum()\
               .to_hdf('/notebooks/data2/final/final.h5', lang, complevel=9, complib='blosc')
    except:
        logging.exception("Error with %s" % lang)
logging.info("Done")

[########################################] | 100% Completed |  0.3s


## Step 4: Sort by occurrences, descending

`/notebooks/data2/final/final.h5` is done processing, but it would be more useful stored in sorted order. Here, we read each DataFrame, sort it by count, and save it to `/notebooks/data2/final/final-sorted.h5`.

In [None]:
with pd.HDFStore('/notebooks/data2/final/final.h5') as store:
    keys = store.keys()
with ProgressBar(), Profiler() as prof:
    for key in keys:
        logging.info("Sorting %s" % key)
        df = dd.read_hdf('/notebooks/data2/final/final.h5', key)
        (df.compute()
           .sort_values('count', ascending=False)
           .to_hdf('/notebooks/data2/final/final-sorted.h5', key, complevel=9, complib='blosc'))
    logging.info("Done sorting")

[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.5s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100



[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.2s
[########################################] | 100% Completed | 12.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100

# Done! Clean-up and stats

Run a sanity check on `final-sorted.h5`, then the intermediate files can be deleted. Here's what is in the store:

In [None]:
with pd.HDFStore('/notebooks/data2/final/final-sorted.h5') as store:
    keys = store.keys()
    sizes = [store.get_storer(key).shape[0] for key in keys]
pd.Series(sizes, index=keys).sort_values(ascending=False)

/eng    79005095
/ger    30446373
/fre    17440715
/lat    13691932
/rus    10851839
/jpn     8333906
/ita     7069154
/spa     7027856
/chi     5210120
/und     4886094
/dut     4504989
/pol     3685618
/cze     3659240
/mul     3170099
/swe     3132429
/heb     3113011
/kor     2902061
/scr     2873738
/hun     2802224
/dan     2778130
/ara     2625908
/por     2236777
/gre     2230249
/grc     2041854
/unk     1707922
/scc     1566006
/nor     1565011
/tur     1537497
/rum     1193284
/ukr     1179118
          ...   
/chy         272
/ale         257
/tsw         251
/ada         245
/aus         238
/syc         237
/aft         235
/bua         229
/frr         224
/ert         205
/gal         204
/mas         196
/kaa         186
/erg         185
/gua         167
/anp         156
/lez         147
/tso         140
/wak         138
/ast         124
/sog         122
/srr          99
/bai          98
/hat          89
/ipk          70
/ijo          57
/yap          47
/ven          

In [None]:
pd.Series(sizes, index=keys).sort_values(ascending=False).shape

(443,)
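The sanity check mentioned above is not spelled out in this notebook. One possible version, sketched under the assumption that each table is indexed by token with a single `count` column, might look like this. `check_sorted_counts` and the two toy frames are hypothetical.

```python
import pandas as pd

def check_sorted_counts(df, min_count=10):
    """Return a list of problems found in one language table (empty if none)."""
    problems = []
    if not df.index.is_unique:
        problems.append("duplicate tokens")
    if (df["count"] < min_count).any():
        problems.append("counts below threshold")
    if not df["count"].is_monotonic_decreasing:
        problems.append("not sorted by count, descending")
    return problems

# Hypothetical stand-ins for tables read from final-sorted.h5
good = pd.DataFrame({"count": [800, 13, 12]},
                    index=pd.Index(["the", "zyx", "cat"], name="token"))
bad = pd.DataFrame({"count": [12, 800, 3]},
                   index=pd.Index(["cat", "the", "cat"], name="token"))

print(check_sorted_counts(good))
print(check_sorted_counts(bad))
```

Running a check like this over every key before deleting the intermediate files would catch a table that was appended twice or never sorted.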