In [2]:
from joblib import Parallel, delayed
import multiprocessing as mp
import numpy as np
import sys
import json_lines
import pandas as pd
import ijson
import matplotlib.pyplot as plt
import collections
from itertools import count
from numpy import linspace, loadtxt, ones, convolve
from random import randint
from matplotlib import style
style.use('fivethirtyeight')
%matplotlib inline

In [3]:
%%bash

# head -n 1 2017-07-05.jsonl
# wc -l 2017-07-05.jsonl

First, we obtain the top-level (root) keys of the records in the dataset by inspecting the first record.

In [4]:
# Collect the top-level keys of the first record only; every record is assumed
# to share the same schema -- TODO confirm against the data provider.
root_keys = []
with open('2017-07-05.jsonl', 'rb') as f:
    first_record = next(iter(json_lines.reader(f)), None)
    if first_record is not None:
        root_keys.extend(first_record.keys())
print(root_keys)

['_id', 'tag', 'btag', 'events', 'version', 'entities', 'story_id', 'blog_rank', 'companies', 'article_id', 'ca_webrank', 'crawled_at', 'lexalytics', 'source_url', 'article_url', 'author_name', 'ca_newsrank', 'ca_printcir', 'provider_id', 'article_type', 'author_email', 'event_groups', 'harvested_at', 'published_at', 'story_volume', 'accern_alerts', 'alexa_traffic', 'article_image', 'article_title', 'ca_rechpermil', 'event_summary', 'first_mention', 'article_source', 'article_content', 'article_summary', 'source_category', 'story_sentiment', 'accern_sentiment', 'story_saturation', 'article_sentiment', 'event_author_rank', 'event_source_rank', 'event_impact_score', 'overall_author_rank', 'overall_source_rank', 'story_alexa_traffic']


We will read the relevant data into memory. Due to the large size of the dataset (~20 GB), we leave out the data (keys) which won't be used immediately for anomaly detection purposes; for now only the `accern_sentiment` value of each record is kept, in a plain Python list.

In [13]:
%%time

# Stream the JSON-lines file once and keep only the `accern_sentiment` field of
# each record; all other keys are skipped to limit memory use.
with open('2017-07-05.jsonl', 'rb') as f:
    accern_sentiments = [record['accern_sentiment']
                         for record in json_lines.reader(f)]

CPU times: user 3min 16s, sys: 7.44 s, total: 3min 24s
Wall time: 4min 25s


In [16]:
def moving_average(data, window_size):
    """Smooth a one-dimensional sequence with an equal-weight moving average.

    An averaging kernel of ``int(window_size)`` taps, each weighing
    ``1 / window_size``, is convolved with ``data`` using NumPy's ``'same'``
    mode. In that mode the kernel only partially overlaps the data near both
    ends (the missing samples count as zero), so the first and last values of
    the result are pulled toward zero.

    Args:
    -----
            data (array-like): one-dimensional sequence of numbers to smooth
            window_size (int): number of samples in the averaging window

    Returns:
    --------
            ndarray holding the convolution of ``data`` with the averaging
            kernel; it has as many elements as ``data`` whenever ``data`` is
            at least ``window_size`` long

    References:
    ------------
    [1] Wikipedia, "Convolution", http://en.wikipedia.org/wiki/Convolution.
    [2] API Reference: https://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html

    """
    n_taps = int(window_size)
    kernel = np.ones(n_taps)
    # Scale in place so that the taps sum to one (for an integer window size).
    kernel /= float(window_size)
    return np.convolve(data, kernel, mode='same')

def explain_anomalies_rolling_std(y, window_size, sigma=1.0):
    """Flag the points of ``y`` that fall outside a rolling-standard-deviation band.

    The band is centred on the moving average of ``y``. Its half-width at each
    position is ``sigma`` times the rolling standard deviation of the residual
    (``y`` minus its moving average), rounded to 3 decimals. Positions before
    the first complete window have no rolling standard deviation of their own
    and reuse the value of the first complete window.

    Args:
    -----
        y (list, numpy.ndarray or pandas.Series of numbers): observed values
        window_size (int): rolling window size, used for both the moving
            average and the rolling standard deviation
        sigma (float): number of rolling standard deviations a point must lie
            away from the moving average to be reported

    Returns:
    --------
        a dict with two entries:
            'stationary standard deviation': standard deviation of the whole
                residual, rounded to 3 decimals
            'anomalies_dict': collections.OrderedDict mapping the position of
                each point identified as an anomaly to its value

    Raises:
    -------
        ValueError: if ``window_size`` is smaller than 1 or larger than the
            number of points in ``y`` (no complete window exists)
    """
    values = np.asarray(y, dtype=float)
    if window_size < 1 or window_size > values.size:
        raise ValueError(
            'window_size must be between 1 and len(y) ({}); got {}'.format(
                values.size, window_size))

    avg = moving_average(values, window_size)
    residual = values - avg

    # Calculate the variation in the distribution of the residual.
    # `pd.rolling_std` no longer exists in current pandas; the `.rolling()`
    # accessor is its replacement (same trailing window, sample std).
    rolling_std = pd.Series(residual).rolling(window_size).std()
    # The first `window_size - 1` positions are NaN (incomplete window): reuse
    # the first complete-window value so every position gets a finite band.
    first_complete = rolling_std.iloc[window_size - 1]
    rolling_std = rolling_std.fillna(first_complete).round(3).tolist()

    std = np.std(residual)

    # `enumerate` supplies the positions; the values stored are the caller's
    # original `y` items, not the float copies used for the arithmetic.
    anomalies = collections.OrderedDict(
        (index, y_i)
        for index, (y_i, avg_i, rs_i) in enumerate(
            zip(y, avg.tolist(), rolling_std))
        if y_i > avg_i + (sigma * rs_i) or y_i < avg_i - (sigma * rs_i))

    return {'stationary standard deviation': round(std, 3),
            'anomalies_dict': anomalies}

In [17]:
# Flag sentiment scores lying more than 3 rolling standard deviations away from
# the 10000-point moving average.
anomalies = explain_anomalies_rolling_std(
    y=accern_sentiments,
    window_size=10000,
    sigma=3.0)



In [18]:
# Show the position -> value mapping of the points flagged as anomalies.
anomalies['anomalies_dict']

OrderedDict([(1569, 0.925),
             (76238, 1.0),
             (131735, -0.81),
             (142555, 1.0),
             (157660, 0.925),
             (157759, 0.925),
             (157966, 0.925),
             (178146, 0.925),
             (178702, 0.925),
             (178721, 1.0),
             (179267, 0.925),
             (180565, 0.925),
             (181433, 0.925),
             (195681, 0.925),
             (197061, 0.925),
             (207678, 1.0),
             (223030, 0.925),
             (223782, 0.925),
             (223882, 0.925),
             (224131, 0.925),
             (224241, 0.925),
             (236554, 0.925),
             (240239, 0.925),
             (242096, 0.925),
             (245385, 0.925),
             (249964, -0.81),
             (251755, 0.925),
             (253472, 0.925),
             (267283, 0.852),
             (270344, 0.925),
             (271430, 0.925),
             (272452, 0.925),
             (272527, 0.925),
             (272750,

In [None]:
# %%time

# # keys_to_remove = ['_id', 'btag', 'version', 'article_id', 'lexalytics', 'source_url', 'article_url', 'ca_newsrank', 'provider_id', 'author_email', 'harvested_at', 'published_at', 'accern_alerts', 'article_image', 'article_title', 'ca_rechpermil', 'event_summary', 'article_source', 'article_content', 'article_summary']
# # item_list = []

# break_count = [10]

# def hollow_func(break_count):
#     count = 0
#     with open('2017-07-05.jsonl', 'rb') as f:
#         for item in json_lines.reader(f):
#             count += 1
#             if count > break_count:
#                 break
#     return count

# counts = Parallel(n_jobs=6)(delayed(hollow_func) for i in break_count)

# print(counts)

# Extract columns
# id_list = []
# tags = []
# events = []
# story_ids = []
# blog_ranks = []
# companies = []
# ca_webranks = []
# crawled_at = []
# ca_newsranks = []
# article_types = []
# event_groups = []
# story_volumes = []
# alexa_traffic = []
# first_mentions = []
# source_categories = []
# story_sentiments = []
# accern_sentiments = []
# story_saturations = []
# article_sentiments = []
# event_author_ranks = []
# event_source_ranks = []
# event_impact_scores = []
# overall_author_ranks = []
# overall_source_ranks = []
# story_alexa_traffics = []

# count = 0
# with open('2017-07-05.jsonl', 'rb') as f:
#     for item in json_lines.reader(f):
#         id_list.append(item['_id'])
#         tags.append(item['tag'])
#         events.append(item['events'])
#         story_ids.append(item['story_id'])
#         blog_ranks.append(item['blog_rank'])
#         companies.append(item['companies'])
#         ca_webranks.append(item['ca_webrank'])
#         crawled_at.append(item['crawled_at'])
#         ca_newsranks.append(item['ca_newsrank'])
#         article_types.append(item['article_type'])
#         event_groups.append(item['event_groups'])
#         story_volumes.append(item['story_volume'])
#         alexa_traffic.append(item['alexa_traffic'])
#         first_mentions.append(item['first_mention'])
#         source_categories.append(item['source_category'])
#         story_sentiments.append(item['story_sentiment'])
#         accern_sentiments.append(item['accern_sentiment'])
#         story_saturations.append(item['story_saturation'])
#         article_sentiments.append(item['article_sentiment'])
#         event_author_ranks.append(item['event_author_rank'])
#         event_source_ranks.append(item['event_source_rank'])
#         event_impact_scores.append(item['event_impact_score'])
#         overall_author_ranks.append(item['overall_author_rank'])
#         overall_source_ranks.append(item['overall_source_rank'])
#         story_alexa_traffics.append(item['story_alexa_traffic'])
        
# #         count += 1
# #         if count > 5:
# #             break

# plt.plot(accern_sentiments[1:100])
# plt.show()

# (sys.getsizeof(id_list) + sys.getsizeof(tags) + sys.getsizeof(events) + sys.getsizeof(story_ids) +
# sys.getsizeof(blog_ranks) +
# sys.getsizeof(companies) + 
# sys.getsizeof(ca_webranks) +
# sys.getsizeof(crawled_at) +
# sys.getsizeof(ca_newsranks) +
# sys.getsizeof(article_types) +
# sys.getsizeof(event_groups) +
# sys.getsizeof(story_volumes) +
# sys.getsizeof(alexa_traffic) +
# sys.getsizeof(first_mentions) +
# sys.getsizeof(source_categories) +
# sys.getsizeof(story_sentiments) +
# sys.getsizeof(accern_sentiments) +
# sys.getsizeof(story_saturations) +
# sys.getsizeof(article_sentiments) +
# sys.getsizeof(event_author_ranks) +
# sys.getsizeof(event_source_ranks) +
# sys.getsizeof(event_impact_scores) +
# sys.getsizeof(overall_author_ranks) +
# sys.getsizeof(overall_source_ranks) +
# sys.getsizeof(story_alexa_traffics))