In [None]:
import time
import datetime
import dateutil.parser as dtp
import os
import sys
import json
import bz2
import io
import heapq
import functools

import collections as coll
import itertools as itr
import regex as re

import numpy as np
import pandas as pd
import math

# edit the path to the local minerva git repo
# path_elements =[
#     os.sep,
#     'Users',
#     'cjc73',
#     'gits',
#     'minerva',
#     'detector',
#     'swm',
# ]
# GIT_DB_PATH = os.path.abspath(os.path.join(*path_elements))
# sys.path.append(GIT_DB_PATH)

import swm_py3 as swm
import tweet2token as tt
import streamtools as st

In [None]:
# print(pwd)

In [None]:
###################################################################
# helpers
###################################################################

def print_all(df):
    """Print ``df`` with pandas' row and column display limits lifted.

    The limits are only removed for the duration of the call; the previous
    display options are restored when the context manager exits.
    """
    unlimited = ('display.max_rows', None, 'display.max_columns', None)
    with pd.option_context(*unlimited):
        print(df)


def data_pusher(stream, consumer):
    """Feed every item of ``stream`` into the coroutine ``consumer``.

    Args:
        stream: iterable of items to push.
        consumer: an already-primed generator-style coroutine; each item is
            delivered with ``consumer.send(item)``.

    After the stream is exhausted a completion message is printed and the
    consumer is closed.
    """
    push = consumer.send
    for record in stream:
        push(record)
    print("All done!")
    consumer.close()


def file_streamer(in_file_path: str, skip=0):
    """Yield the lines of a UTF-8 text file whose records end in ``\\r\\n``.

    Args:
        in_file_path: path of the file to read.
        skip: number of leading lines to discard before yielding.

    Only ``\\r\\n`` terminates a line (``newline='\\r\\n'``), so a bare ``\\n``
    inside a record is kept as part of that line. Terminators are not stripped.
    """
    with open(in_file_path, encoding='utf-8', newline='\r\n') as handle:
        # Discard the first `skip` lines; running off the end is harmless.
        for _ in range(skip):
            next(handle, None)
        for record in handle:
            yield record

def linejson_file_streamer(in_file_path: str, skip=0):
    """Yield the lines of a UTF-8 text file whose records end in ``\\n``.

    Args:
        in_file_path: path of the file to read.
        skip: number of leading lines to discard before yielding.

    Lines are split on ``\\n`` only and are yielded untranslated, so a
    preceding ``\\r`` (if any) stays attached to the line.
    """
    with open(in_file_path, encoding='utf-8', newline='\n') as handle:
        # Discard the first `skip` lines; running off the end is harmless.
        for _ in range(skip):
            next(handle, None)
        for record in handle:
            yield record
            
def bz2_linejson_file_streamer(in_file_path: str, skip=0):
    """Yield the ``\\n``-terminated lines of a bz2-compressed UTF-8 text file.

    Args:
        in_file_path: path of the ``.bz2`` file to read.
        skip: number of leading lines to discard before yielding.

    The archive is opened in text mode and decompressed as it is read.
    Line terminators are not stripped.
    """
    with bz2.open(in_file_path, mode='rt', encoding='utf-8', newline='\n') as handle:
        # Discard the first `skip` lines; running off the end is harmless.
        for _ in range(skip):
            next(handle, None)
        for record in handle:
            yield record
            
def print_stream():
    """Coroutine sink that prints every non-``None`` value sent to it.

    Prime it with ``send(None)`` (or ``next``) before sending data. It runs
    until it is closed.
    """
    while True:
        received = yield
        if received is None:
            continue
        print(received)

def get_tid(tweet):
    """Extract tid from a tweet.

    Args:
        tweet: mapping for a single tweet; it must contain the key
            ``'id_str'``.

    Returns:
        The value stored under ``tweet['id_str']``.

    Raises:
        KeyError: if the tweet has no ``'id_str'`` entry.
    """
    # A stray copy of verbose_yield had been pasted into this function's
    # docstring line, which made the whole cell a syntax error. The real
    # verbose_yield is defined right below and is unaffected.
    return tweet['id_str']


def verbose_yield(func):
    """Wrap ``func`` in a one-shot generator that logs its call and result.

    The returned ``in_fn`` is a generator function: nothing runs until the
    generator is iterated. On the first step it prints the wrapped function's
    name and positional arguments, calls ``func``, prints the result and
    yields it once.
    """
    def in_fn(*args, **kwargs):
        print(f"{func.__name__} got {list(args)}")
        outcome = func(*args, **kwargs)
        print(outcome)
        yield outcome

    return in_fn


In [None]:
class SigWordGramTracker(st.TokenTracker):
    """Tracker that feeds word combinations built from "significant" words.

    For each non-empty document received by ``co_process`` the tracker

    * bumps the per-language document count and per-word document frequency,
    * keeps the words whose document frequency ratio is below ``sig_cutoff``
      (all words when ``sig_cutoff`` is ``None``), and
    * sends every ``n``-element combination of the sorted significant words,
      for each ``n`` in ``grams``, to ``self.swm_co_gen`` together with the
      document's timestamp.

    Nothing is sent for a language until ``PRE_HISTORY_SIZE`` non-empty
    documents have been seen for it; earlier documents are buffered and
    replayed once that threshold is reached.

    Args:
        grams: iterable of combination sizes to emit.
        sig_cutoff: a word is significant when
            ``docs_containing_word / docs_seen < sig_cutoff``; ``None``
            disables the filter.
        **kwargs: forwarded unchanged to ``st.TokenTracker.__init__``.
    """

    # Non-empty documents to observe per language before anything is reported.
    PRE_HISTORY_SIZE = 1000

    def __init__(self, grams=(2, 3), sig_cutoff=None, **kwargs):
        super().__init__(**kwargs)
        self.grams = grams
        self.sig_cutoff = sig_cutoff

        # setup SWM coprocess
        # (swm_cp is not defined in this class; presumably inherited from
        # st.TokenTracker -- TODO confirm)
        self.swm_co_gen = self.swm_cp()
        self.swm_co_gen.send(None)

        # TF-IDF style bookkeeping: lang -> {word: number of docs containing it}.
        # Counts accumulate forever; nothing is expired.
        self.word_doc_freq = {}

        # lang -> [(words, dtime), ...] buffered while the language is warming up
        self.pre_history = {}
        # lang -> number of non-empty documents seen
        self.doc_count = {}

    def ingress(self):
        """Return a freshly created, already-primed ``co_process`` coroutine."""
        co_gen = self.co_process()
        co_gen.send(None)
        return co_gen

    def update_word_sig(self, words, lang):
        """Count one more document for every distinct word in ``words``."""
        lang_freq = self.word_doc_freq.setdefault(lang, {})
        for word in set(words):
            lang_freq[word] = lang_freq.get(word, 0) + 1
        # could expire old

    def get_sig_words(self, words, lang):
        """Return the distinct words of ``words`` that pass ``is_sig_word``."""
        return [word for word in set(words) if self.is_sig_word(word, lang)]

    def is_sig_word(self, word, lang):
        """Tell whether ``word`` is infrequent enough in ``lang`` to be kept.

        Returns ``True`` when filtering is disabled or when no statistics
        exist yet for ``lang``.
        """
        if self.sig_cutoff is None:
            return True
        if lang not in self.word_doc_freq:
            return True
        docs_seen = self.doc_count.get(lang, 0)
        if not docs_seen:
            # No documents counted yet: nothing to compare against (and this
            # also avoids a division by zero).
            return True
        return self.word_doc_freq[lang].get(word, 0) / docs_seen < self.sig_cutoff

    def send_ngrams(self, sig_words, dtime):
        """Send every ``n``-combination of the sorted words to the SWM coprocess."""
        ordered = sorted(sig_words)  # sort once; reused for every gram size
        for n in self.grams:
            for ngram in itr.combinations(ordered, n):
                self.swm_co_gen.send((ngram, dtime))

    def co_process(self):
        """Coroutine consuming ``((lang, words), dtime)`` items.

        Documents with no words are ignored and not counted.
        """
        while True:
            (lang, words), dtime = (yield)
            if not words:
                continue

            seen = self.doc_count.get(lang, 0) + 1
            self.doc_count[lang] = seen
            self.update_word_sig(words, lang)

            if seen < self.PRE_HISTORY_SIZE:
                # Still learning word frequencies for this language: hold on.
                self.pre_history.setdefault(lang, []).append((words, dtime))
                continue

            if seen == self.PRE_HISTORY_SIZE:
                # Warm-up finished: replay the buffered documents first.
                for held_words, held_dtime in self.pre_history.pop(lang, []):
                    self.send_ngrams(self.get_sig_words(held_words, lang), held_dtime)

            # The current document is always reported once warm-up is over.
            # Previously the document that completed the warm-up was neither
            # buffered nor sent, so it was dropped.
            self.send_ngrams(self.get_sig_words(words, lang), dtime)
                
                

In [None]:
# Input file to process; alternative inputs are kept commented out.
#infile_string = "/Users/cjc73/Downloads/tweets_gnip_latest_2018_7_linejson.bz2"
#infile_string = '/Users/cjc73/Expire/tweet_stream/baton_rouge_twitter.json'
infile_string = '/Users/cjc73/Downloads/bad_linejson.bz2'
in_file_path = os.path.realpath(infile_string)

# Output path: the input path with its last extension replaced by .csv
out_file_path = os.path.splitext(in_file_path)[0] + '.csv'

In [None]:
%%time
##########################
# Build an extract-track-report SWM pipeline and process data


# ----------------------------
# Create report consumers
# One independent accumulator per token type; each one's ingress() is handed
# to the matching tracker as its consumer further down in this cell.
# (A `printing=True` argument was once tried on tag_rpt.)
(
    tag_rpt,
    taggram_rpt,
    url_rpt,
    ngram_rpt,
    mention_rpt,
) = (st.SimpleAccumulator() for _ in range(5))

# Extra reporters for retweets
rt_tag_rpt = st.SimpleAccumulator()
#rt_url_rpt = st.SimpleAccumulator()
#rt_ngram_rpt = st.SimpleAccumulator()
#rt_mention_rpt = st.SimpleAccumulator()

# ----------------------------
# Create Trackers

# Window lengths in milliseconds: 20 minutes, 1 hour, 4 hours.
windows_ms = tuple(
    minutes * 60 * 1000
    for minutes in (20, 60, 4 * 60)
)
# One trigger list per window (each window gets its own list object).
report_triggers = [[2] for _ in windows_ms]

# Keyword arguments shared by every tracker built in this cell.
common_kwargs = dict(
    windows=windows_ms,
    report_triggers=report_triggers,
    high_val_thresh_fn=lambda x, t_max: math.log2(x/t_max),
    require_run=False,
    track_all_tags=True,
    no_window_overlap=True,
    report_decreases=False,
)

# Per-token-type schedule: a list of [window spec, trigger list] pairs.
twt = {
    'rt_tag': [
        [{'minutes': 20}, [2]],
        [{'hours': 1}, [2]],
        [{'hours': 4}, [2]],
    ],
    'tag': [
        [{'minutes': 20}, [2]],
        [{'hours': 1}, [2]],
        [{'hours': 4}, [2]],
    ],
    'words': [
        [{'minutes': 20}, [2]],
        [{'hours': 1}, [5]],
        [{'hours': 4}, [10]],
    ],
}

# ----------------------------
# Set up monitoring coprocesses
# Every tracker gets the ingress() of its own report accumulator as consumer,
# a window/trigger schedule from `twt`, and the shared `common_kwargs`.

# tags
tag_mon = st.TokenTracker(
    consumer=tag_rpt.ingress(),
    tag_window_triggers=twt['tag'],
    **common_kwargs
)
# retweeted tags use their own schedule entry ('rt_tag')
rt_tag_mon = st.TokenTracker(
    consumer=rt_tag_rpt.ingress(),
    tag_window_triggers=twt['rt_tag'],
    **common_kwargs
)

# mentions (reuses the 'tag' schedule)
at_men_mon = st.TokenTracker(
    consumer=mention_rpt.ingress(),
    tag_window_triggers=twt['tag'],
    **common_kwargs
)

# urls (reuses the 'tag' schedule)
url_mon = st.TokenTracker(
    consumer=url_rpt.ingress(),
    tag_window_triggers=twt['tag'],
    **common_kwargs
)

# words
# Pairs only (grams=(2,)); a word counts as significant when it appears in
# fewer than 2.5% of the documents seen for its language.
# NOTE: the matching 'words' entry in te_pipelines is commented out, so this
# tracker is built but receives no data in this cell.
word_gram_mon = SigWordGramTracker(
    consumer=ngram_rpt.ingress(),
    tag_window_triggers=twt['words'],
    grams=(2,),
    sig_cutoff=.025,
    **common_kwargs
)

# tag grams (reuses the 'tag' schedule)
tag_gram_mon = st.TokenTracker(
    consumer=taggram_rpt.ingress(),
    tag_window_triggers=twt['tag'],
    **common_kwargs
)

# ----------------------------
# Create and prime extractor
# Each pipeline pairs a tt getter with the tracker coroutine(s) that receive
# what it extracts. 'for_rt' is only given for tags -- presumably the sink
# for tokens coming from retweets; confirm against tt.token_extractor.
te_pipelines = [
    # tags
    {'getter':   tt.get_tags,
     'for_each': tag_mon.ingress(),
     'for_rt':   rt_tag_mon.ingress()
    },
    # words (disabled: word_gram_mon is not fed while this stays commented out)
    #{'getter':   tt.get_lang_words,
    # 'for_each': word_gram_mon.ingress()
    #},
    # tag grams
    {'getter':   tt.get_tag_grams,
     'for_each': tag_gram_mon.ingress(),
    },
    # urls
    {'getter':   tt.get_urls,
     'for_each': url_mon.ingress(),
    },
    # mentions
    {'getter':   tt.get_mentions,
     'for_each': at_men_mon.ingress(),
    },    
]
te = tt.token_extractor(te_pipelines)
# send(None) primes the coroutine so it is ready to receive items
te.send(None)

# -----------------------------
# Preprocessor to compute/fix fields
# Its output goes to the token extractor `te`.
pre = tt.pre_process(consumer=te)
pre.send(None)  # prime the coroutine

# ----------------------------
# Clean stream of duplicates and out-of-order entries
#  before processing
# Configured with a one-minute delay; cleaned items go to the preprocessor.
stream_cleaner = st.StreamCleaner(
    delay=datetime.timedelta(seconds=1 * 60),
    #consumer=te
    consumer=pre
)
# ---------------------------
# Parse or skip lines from file:
# `ps` is a debugging sink; it is only used by the commented-out parser below.
ps = print_stream()
ps.send(None)
#jparser = tt.json_parser(consumer=ps)
# Resulting chain: jparser -> stream_cleaner -> pre -> te -> trackers
jparser = tt.json_parser(consumer=stream_cleaner.ingress())
jparser.send(None)  # prime the coroutine

# ----------------------------
# Push data through pipeline
# Pick the reader that matches the input format; the active input is a
# bz2-compressed line-JSON file.
#line_iter = file_streamer(in_file_path, skip=0)
#line_iter = linejson_file_streamer(in_file_path, skip=0)
# Fixed: this line used the misspelled name `in_flile_path`, which raised
# NameError before any data was read.
line_iter = bz2_linejson_file_streamer(in_file_path, skip=0)

# Feeds every line to the JSON parser, then closes the parser coroutine.
data_pusher(line_iter, jparser)
# flush queue
stream_cleaner.clearq()

In [None]:
# ----------------------------
# Report results

# One (label, tracker, report accumulator) triple per monitored token type.
labeled_mon_rpt_list = [
    ('tag', tag_mon, tag_rpt),
    ('rt_tag', rt_tag_mon, rt_tag_rpt),
    ('tag_gram', tag_gram_mon, taggram_rpt),
    ('url', url_mon, url_rpt),
    ('mention', at_men_mon, mention_rpt),
    ('word_gram', word_gram_mon, ngram_rpt),
]

for label, mon, rpt in labeled_mon_rpt_list:
    # Totals the tracker accumulated per token.
    tag_totals = mon.swm.total_tag_count
    print("Total {}s seen: {}".format(label, len(tag_totals)))
    print("Total {} uses: {}".format(label, sum(tag_totals.values())))

    # Most frequently seen tokens.
    token_counter = coll.Counter(tag_totals)
    print("**10 Most Common {}s:**".format(label))
    for tag, count in token_counter.most_common(10):
        print("\t{} : {}".format(tag, count))

    # Tokens that appear most often in the generated reports.
    rpt.create_df()
    report_counter = coll.Counter(rpt.df['tag'].values)
    print("**10 Most Reported {}s:**".format(label))
    for tag, count in report_counter.most_common(10):
        print("\t{} : {}".format(tag, count))

    print('\n')

# Build a per-label report path of the form <name>_<label>_reports<ext>.
# NOTE(review): this runs after the reporting loop, so `label` is whatever the
# loop left behind (the last entry, 'word_gram'), and tag_outpath is not used
# in the lines visible here -- confirm whether this was meant to sit inside
# the loop and write each report out.
if out_file_path:
    name, ext = os.path.splitext(out_file_path)
    tag_outpath = '{}_{}_reports{}'.format(name, label, ext)