In [1]:
%load_ext line_profiler

%load_ext autoreload
%autoreload 2

In [7]:
import boto
import boto.s3.connection

import warnings

import numpy as np
import regex as re
import zlib, codecs
import time, os
from datetime import datetime
from functools import partial
import pickle
import numba

In [3]:
def build_prefix(lang, n, version='20120701'):
    """Compose the key prefix used to list n-gram files in the GoogleBooks bucket.

    The layout is hardcoded and nothing is validated: a prefix is returned
    even if no key in the bucket actually starts with it.

    Parameters
    ----------
    lang : str
        3-letter language code. Its existence is not checked.
    n : int or str
        Order of the n-gram.
    version : str
        Data version, e.g. '20120701'.

    Returns
    -------
    str
        Prefix of the form ``<lang>/googlebooks-<lang>-all-<n>gram-<version>-``.
    """
    # TODO: the prefix could be validated against the bucket listing.
    template = '{lang}/googlebooks-{lang}-all-{n}gram-{version}-'
    fields = dict(lang=lang, n=n, version=version)
    return template.format(**fields)

In [4]:
def get_s3_bucket():
    """Open an anonymous S3 connection (boto2 interface) and return the
    'google-ngrams' bucket object used for downloading the files."""
    calling_format = boto.s3.connection.OrdinaryCallingFormat()
    connection = boto.connect_s3(
        anon=True,
        host='datasets.iccluster.epfl.ch',
        calling_format=calling_format,
    )
    return connection.get_bucket('google-ngrams')

In [5]:
def key_basename(key, extension=False):
    """Customised basename for google keys, where the separator is '-'.

    Parameters
    ----------
    key : object with a `name` attribute
        The key whose name is inspected, e.g.
        'eng/googlebooks-eng-all-5gram-20120701-th.gz'.
    extension : bool
        When False (default) everything from the first '.' of the last
        '-'-separated component is dropped ('th'); when True the component
        is returned unchanged ('th.gz').

    Returns
    -------
    str
        The last '-'-separated component of `key.name`, with or without
        its extension.
    """
    filename = key.name.split('-')[-1]
    if extension:
        return filename
    return filename.split('.')[0]

In [37]:
# @numba.jit
def decompress_stream(stream):
    """ Given an iterable stream of gzipped data (such as a `key` in S3 storage), this function returns an iterator
    over the uncompressed and utf-8 decoded data.

    One string is yielded per input chunk (it may be empty), followed by one
    final string holding whatever the decompressor and decoder still buffer.

    A gzip file may consist of several concatenated members. A single
    `zlib.decompressobj` stops at the end of the first member and parks the
    remaining bytes in `unused_data`, so a fresh decompressor is started for
    every following member instead of silently dropping that data. NUL
    padding between/after members is skipped.

    Raises
    ------
    zlib.error
        If the data (or the bytes following a complete member) is not valid gzip.
    UnicodeDecodeError
        If the decompressed bytes are not valid UTF-8.
    """
    wbits = 16 + zlib.MAX_WBITS  # the +16 offset selects the gzip container format
    extracter = zlib.decompressobj(wbits)
    decoder = codecs.getincrementaldecoder('utf-8')()  # note the second () which instantiates the object

    for chunk in stream:
        pieces = []
        pending = chunk
        while pending:
            if extracter.eof:
                # The previous member is complete: skip zero padding and, if
                # real data follows, start decoding the next member.
                pending = pending.lstrip(b'\x00')
                if not pending:
                    break
                extracter = zlib.decompressobj(wbits)
            pieces.append(extracter.decompress(pending))
            # Non-empty only when the member ended inside `pending`.
            pending = extracter.unused_data
        # The incremental decoder keeps multi-byte sequences split across chunks.
        yield decoder.decode(b''.join(pieces))

    yield decoder.decode(extracter.flush(), final=True)

In [36]:
# @numba.jit
def get_entry(word, range_size):
    """Validate `word` and prepare its counting slot.

    Returns
    -------
    (str, numpy.ndarray) or ("", None)
        For a valid `word`: its cleaned, lower-cased form (trailing `_TAG`
        suffixes removed) and a zero-filled uint32 array of length
        `range_size`. For an invalid `word`: the empty string and None.
    """
    # Invalid when it contains anything besides upper/lower-case letters,
    # underscore, apostrophe and space, or a stand-alone tag token like `_ADJ_`.
    rejected = re.search(r"[^\p{Lu}\p{Ll}_' ]|\b_[A-Z]*_\b", word)
    if rejected:
        return "", None

    # Strip the POS tags, then normalise the case.
    untagged = re.sub(r'_[A-Z]*\b', '', word)
    # Fresh array of per-year counts for this entry.
    counts = np.zeros(range_size, dtype=np.uint32)
    return untagged.lower(), counts

In [29]:
# @numba.jit
def process_key(key, start_year=1800, end_year=2012):
    """ Given the key, return a dictionary of the wordcount aggregates.

    The decompressed content of `key` is read line by line. Each line is
    split on tabs into the raw form, the year, the count and one further
    field that is ignored. Consecutive lines sharing the same raw form make
    up one block; the form is validated and cleaned by `get_entry` once per
    block.

    Parameters
    ----------
    key : iterable of gzipped byte chunks (passed to `decompress_stream`)
    start_year, end_year : int
        Only years in the half-open range [start_year, end_year) are counted.

    Returns
    -------
    dict
        Maps each clean form to a uint32 array of length
        `end_year - start_year`; index `i` holds the count for year
        `start_year + i`. Blocks whose clean forms coincide (e.g. after
        lower-casing) are summed.
    """
    # setup the counting structure
    range_size = end_year - start_year  # half-open interval
    word_counts = {}

    # since verifying and cleaning an entry is quite expensive (2 RegEx)
    # we do it only once, when we detect the start of a new block
    # `word_raw` is used for checking if current line starts the block
    # of a new entry; `word_entry` holds the array for `word_clean`
    word_raw = u"_unk_curr_word_"
    word_clean = u""
    word_entry = None

    for line in _iter_lines(decompress_stream(key)):
        if not line:
            # blank line: nothing to split, and unpacking below would fail
            continue
        word_line, rest_line = line.split('\t', 1)

        if word_line != word_raw:
            # a new block starts: commit the finished one, then set up the next
            _merge_entry(word_counts, word_clean, word_entry)
            word_raw = word_line
            word_clean, word_entry = get_entry(word_raw, range_size)

        if not word_clean:
            continue
        # here we can rely on word_entry (it is valid)

        # check if fits our range
        year, count, _ = rest_line.split('\t')
        year = int(year)
        if not start_year <= year < end_year:
            continue

        word_entry[year - start_year] += int(count)

    # The last block has no successor to trigger its commit, so do it here;
    # otherwise the final entry of every key would be lost.
    _merge_entry(word_counts, word_clean, word_entry)

    return word_counts


def _iter_lines(chunks):
    """Yield the '\\n'-separated lines of a chunked text stream, including a final unterminated one."""
    pending = u""
    for chunk in chunks:
        lines = (pending + chunk).split('\n')
        # the last piece may be an incomplete line: keep it for the next chunk
        pending = lines.pop()
        for line in lines:
            yield line
    if pending:
        yield pending


def _merge_entry(word_counts, word_clean, word_entry):
    """Add `word_entry` to `word_counts[word_clean]`; does nothing for an invalid (empty) form."""
    if not word_clean:
        return
    previous = word_counts.get(word_clean)
    # adding because several raw forms can share one clean form (`.lower()`)
    word_counts[word_clean] = word_entry if previous is None else previous + word_entry


In [9]:
def save_key(key_freqs, root_path, basename):
    """ Serializes the dict of freqs at root_path/basename.pkl

    The directory `root_path` is created if it does not exist yet. The data
    is first written to `<basename>.pkl.tmp` and then renamed, so that an
    interrupted dump never leaves a truncated `.pkl` behind (callers that
    test for the file's existence would otherwise mistake it for a
    finished result).

    Parameters
    ----------
    key_freqs : picklable object (the dict of frequencies)
    root_path : str, destination directory ('' means the current directory)
    basename  : str, file name without the '.pkl' extension
    """
    if root_path:
        os.makedirs(root_path, exist_ok=True)

    path = os.path.join(root_path, basename + ".pkl")
    tmp_path = path + ".tmp"
    with open(tmp_path, 'wb') as f:
        pickle.dump(key_freqs, f, pickle.HIGHEST_PROTOCOL)
    # Rename within the same directory, replacing any existing file.
    os.replace(tmp_path, path)

In [10]:
def linear_pass(lang, n, save_root='/mnt/cluster-nas/ciprian/n-grams/', log_path="linear_pass.log"):
    """ Processes, one after another, all the keys of language `lang` and order `n`.

    Every key listed under `build_prefix(lang, n)` whose basename is purely
    alphabetic and is neither 'other' nor 'punctuation' is aggregated with
    `process_key` and stored with `save_key` in `<save_root>/<lang>/<n>/`.
    Keys whose result file already exists are skipped, so an interrupted
    pass can be resumed. One statistics line per processed key is appended
    to the log file.

    Parameters
    ----------
    lang : 3-letter string for language
    n    : the n-gram
    save_root : directory under which `<lang>/<n>/` is created
    log_path  : file the per-key statistics are appended to
    """
    bucket = get_s3_bucket()
    prefix = build_prefix(lang, n)

    save_path = os.path.join(save_root, lang, str(n))
    # Make sure the destination exists before spending time on the first key.
    os.makedirs(save_path, exist_ok=True)

    total_time, num_keys, num_kbytes, num_words = 0, 0, 0, 0

    # `with` closes the log even when processing a key raises
    with open(log_path, "a") as log_file:
        for key in bucket.list(prefix, '-'):
            # skip uninteresting files; including "a_"
            name = key_basename(key)
            if not name.isalpha() or name in ('other', 'punctuation'):
                continue

            if os.path.exists(os.path.join(save_path, name + ".pkl")):
                print("Exists", name)
                continue

            print(name)
            key_start = time.time()
            key_counts = process_key(key)
            key_time = time.time() - key_start

            save_key(key_counts, save_path, name)

            total_time += key_time
            num_keys += 1
            num_kbytes += key.size >> 10
            num_words += len(key_counts)

            # guard against a zero elapsed time (coarse clocks, empty keys)
            rate = num_kbytes // total_time if total_time > 0 else 0.0

            print("{info} Key {kn} took {t:.0f}s ({avg:.0f} s/key, {b} kbytes/s) "
                  "Elapsed: {total:.0f}s. {w} words in {k} keys.".format(
                      info=datetime.now(), kn=name, t=key_time,
                      avg=total_time // num_keys, b=rate, total=total_time,
                      w=num_words, k=num_keys),
                  file=log_file, flush=True)

In [12]:
# Bucket handle reused by the exploratory cells below.
bucket = get_s3_bucket()

In [13]:
# Key prefix for lang='spa', n=5 and the default data version.
prefix = build_prefix('spa', 5)

In [14]:
# Materialise the key listing so that single keys can be picked by index.
keys= []
for key in bucket.list(prefix, '-'):
    keys.append(key)

In [22]:
# Pick the key at index 100 (4 * 26 - 4) for a trial run.
# NOTE(review): the reason for this particular index is not evident here, and
# the listing displayed further down shows 'eng' keys, so `keys` may stem from
# a different prefix than the one built above -- confirm.
kde = keys[4 * 26 - 4]

In [38]:
# Trial aggregation for the years 1800..2011; the recorded run failed with
# the zlib error shown below.
wc = process_key(kde, 1800, 2012)

error: Error -3 while decompressing data: incorrect header check

In [35]:
# Peek at the first 15 listed keys.
keys[:15]

[<Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-punctuation.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-th.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-of.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-an.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-to.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-in.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-be.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-co.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-wh.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-ha.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-is.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-ad.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-wa.gz>,
 <Key: google-ngrams,eng/googlebooks-eng-all-5gram-20120701-fo.gz>,
 <Key: google-ngrams,eng/googlebooks-en

In [43]:
# Back-of-the-envelope estimate accumulated over every key except the first:
# the key size in GiB (size / 2**30), scaled by 6100 / 3.7 and divided by 3600.
# NOTE(review): 6100 / 3.7 looks like a measured seconds-per-GiB rate and 3600
# a seconds-to-hours conversion; the final division by 9 presumably spreads
# the total over 9 parallel workers -- confirm.
t = 0
for key in keys[1:]:
#     print(key_basename(key), key.size / 2**30 * 6100 / 3.7 / 3600)
    t += key.size / 2**30 * 6100 / 3.7 / 3600
print(t / 9)

10.089391899725692


In [31]:
def key_sort(key):
    """Sort criterion for bucket keys: the value of `key.size`."""
    size = key.size
    return size

In [35]:
# German 3-gram keys, largest first. Uses `bucket`: the name `b` is never
# defined in this notebook and fails on a fresh kernel.
sorted(bucket.list(build_prefix('ger', 3), '-'), key=lambda k: k.size, reverse=True)

[<Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-punctuation.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-de.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-_NOUN_.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-_DET_.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-di.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-_ADJ_.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-un.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-be.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-ei.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-ge.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-_ADP_.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-_PRON_.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-au.gz>,
 <Key: google-ngrams,ger/googlebooks-ger-all-3gram-20120701-da.gz>,
 <Key: google-ngrams,g

In [15]:
k = list(b.list())[250]

In [17]:
k.size

1441031