In [1]:
# %load_ext autoreload
# %autoreload 2

In [2]:
# Enable the %%cython cell magic; the worker helpers (pool_init / vectorize)
# are defined in a %%cython cell further down.
%load_ext cython

In [3]:
# Send Bokeh output inline to the notebook.
# NOTE(review): presumably required so the dask.diagnostics `visualize(...)`
# call later in the notebook renders in-place — confirm.
from bokeh.io import output_notebook
output_notebook()

In [4]:
import collections
import glob
import json
import pickle

from multiprocessing.pool import Pool, ThreadPool

import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, CacheProfiler, visualize

In [5]:
import random_indexing as ri

In [6]:
# Bag of parsed tweets: the JSON-lines dump is read in blocks of 200 * 1024
# bytes and every line is decoded with json.loads.
# Alternative input file (disabled): 'rehydrated_tweets_dump.jsonl'
load_tweets = db.read_text('dump10K.jsonl', blocksize=200 * 1024).map(json.loads)

In [7]:
%%time
_ = (
    load_tweets
    .pluck('text', '')
    .flatten()
    .frequencies()
)

with ProgressBar():
    _ = _.compute()

vocabulary = pd.DataFrame.from_records(
    _,
    columns=['letter', 'count'],
    index='letter',
)
vocabulary.loc[''] = 0

vocabulary = vocabulary['count'].sort_values(ascending=False)

[########################################] | 100% Completed |  0.3s
CPU times: user 156 ms, sys: 46.5 ms, total: 203 ms
Wall time: 406 ms


In [8]:
# Number of distinct symbols in the vocabulary (this includes the '' entry
# that is inserted with a zero count when the vocabulary is built).
len(vocabulary)

576

In [9]:
# Value handed to ri.Features as its `D` argument.
# NOTE(review): presumably the number of components per vector — after
# np.packbits that would be D / 8 = 1250 bytes per tweet; confirm.
D = 10_000

In [10]:
%%time
# Build the ri.Features object from the vocabulary's symbols (its index) and D.
# It is later handed to every worker process through pool_init.
features = ri.Features(vocabulary.index, D=D)

CPU times: user 213 ms, sys: 7.57 ms, total: 221 ms
Wall time: 223 ms


In [11]:
%%cython

def pool_init(features):
    """Publish ``features`` as ``__ctx.features`` in the current process.

    A synthetic module named ``__ctx`` is created and registered in
    ``sys.modules`` so that code running later in the same process can
    retrieve the object with ``from __ctx import features``. Calling the
    function again replaces the previously registered module.

    Parameters
    ----------
    features
        Object to expose; it is stored as-is, without copying.
    """
    import sys
    import types

    # types.ModuleType replaces imp.new_module: the `imp` module is deprecated
    # and no longer exists in Python 3.12+.
    ctx_module = types.ModuleType('__ctx')
    ctx_module.features = features
    sys.modules['__ctx'] = ctx_module


def vectorize(tweets):
    """Turn every tweet of a partition into a bit-packed vector.

    For each tweet, the arrays produced by ``ri.vectorize`` for its ``'text'``
    (with ``window_length=3`` and the ``features`` object published through
    the ``__ctx`` module) are stacked and multiplied element-wise along
    axis 0. The boolean mask "component equals 1" is then packed into bytes
    with ``np.packbits``.

    Parameters
    ----------
    tweets
        Iterable of mappings, each providing a ``'text'`` entry.

    Returns
    -------
    list
        One packed ``uint8`` array per tweet, in input order.
    """
    # Imports are resolved at call time, inside whichever process runs the task.
    import numpy as np
    import random_indexing as ri

    from __ctx import features

    packed_vectors = []
    for tweet in tweets:
        window_vectors = ri.vectorize(tweet['text'], window_length=3, features=features)
        combined = np.stack(window_vectors).prod(axis=0)
        packed_vectors.append(np.packbits(combined == 1))
    return packed_vectors

In [61]:
%%time

with Pool(initializer=pool_init, initargs=[features]) as pool:
    with dask.set_options(
        pool=pool,
        get=dask.multiprocessing.get,
    ), ProgressBar():
        with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:

            pool_init(features)    

            vectors = (
                load_tweets
                .map_partitions(vectorize)
            )
            
            tweet_ids = load_tweets.pluck('tweet_id').to_dataframe(meta=[('tweet_id', 'uint64')])['tweet_id']
            
            from_scratch = (
                vectors.to_dataframe(meta=[(i, 'uint8')for i in range(1250)])
                .set_index(tweet_ids, sorted=True)
                .repartition(npartitions=100)
            )
            
            from_scratch.to_hdf('tweet_vectors.hdf', key='data/v*', mode='w')

[########################################] | 100% Completed |  1min 10.1s
[########################################] | 100% Completed |  1min 10.7s
CPU times: user 5.45 s, sys: 1.36 s, total: 6.81 s
Wall time: 2min 21s


In [62]:
# Plot the task, resource and cache profiles recorded by the most recent
# profiled run; the trailing ';' suppresses the cell's text output.
visualize(prof, rprof, cprof);

In [60]:
%%time
from_hdf = (
    dd.read_hdf('tweet_vectors.hdf', key='data/v*', sorted_index=True)
)

CPU times: user 35.5 s, sys: 908 ms, total: 36.4 s
Wall time: 37 s


In [75]:
# Profile a label-based lookup of two tweet ids in the HDF-backed frame.
# The computed rows are discarded; only the timing/profiles are of interest.
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
    with ProgressBar():
        from_hdf.loc[[853035540559671296, 853046831227514884]].compute()

[########################################] | 100% Completed |  0.1s


In [131]:
# Small hand-written lookup table: tweet id (index) -> screen name.
# The first id appears twice on purpose, so the table has a duplicated index entry.
screen_names = pd.DataFrame(
    data={
        'screen_name': ['@one'] * 2 + ['@ONE'] + ['@OTHER'] * 3,
        # disabled extra row: '@OTHER_MISSING'
    },
    index=[
        853035540559671296, 853035540559671296,                       # '@one'
        853036662594707456,                                           # '@ONE'
        853038492380794880, 853040176620326914, 853046831227514884,   # '@OTHER'
        # disabled extra row: 1
    ],
)

In [132]:
# Display the tweet-id -> screen-name lookup table.
screen_names

Unnamed: 0,screen_name
853035540559671296,@one
853035540559671296,@one
853036662594707456,@ONE
853038492380794880,@OTHER
853040176620326914,@OTHER
853046831227514884,@OTHER


In [315]:
# Sum the unpacked tweet vectors per screen name.
#
# NOTE(review): `unpack` is not defined in any cell visible here, so this cell
# raises NameError after "Restart & Run All". Judging by the displayed output
# (sums of +/-1 values) it presumably reverses the np.packbits step of
# `vectorize` and maps bits to +1/-1 — restore or re-add its definition.
(
    from_hdf
    # Right join: keep every row of `screen_names`, including the duplicated id.
    .join(screen_names, how='right')
    .groupby('screen_name')
    .apply(
        # `[:, :-1]` drops the last column of each group before unpacking.
        # NOTE(review): presumably that column is the joined 'screen_name' — confirm.
        lambda g: unpack(g.values[:, :-1].astype('uint8')).sum(axis=0),
        # Tells dask the result is a series named 'x'.
        meta=('x', int),
    )
).compute()

screen_name
@one      [2, -2, 2, -2, -2, 2, -2, -2, -2, 2, 2, 2, -2,...
@OTHER    [-1, -1, 3, -1, -3, 3, -1, -1, 1, -1, -1, -1, ...
@ONE      [-1, -1, 1, -1, -1, 1, 1, 1, 1, 1, -1, 1, 1, 1...
Name: x, dtype: object