In [82]:
from notebook_utils import resolve_paths_from_parent_directory
# NOTE(review): presumably makes project packages (e.g. `utils`, imported in
# later cells) importable from the parent directory — confirm in notebook_utils.
resolve_paths_from_parent_directory()
# Auto-reload edited project modules so code changes are picked up without
# restarting the kernel.
%reload_ext autoreload
%autoreload 2

In [83]:
import json
import pandas as pd

# Registry of the available datasets. Each entry names the input file to read,
# the file the labeled result is written to, and the labeled spike window
# (timestamps without a zone; they are localized to UTC below).
DATASETS = dict(
    CentralParkNYC=dict(
        input_location="../data/tweet_objects/CentralParkNYC/2021-01-27-2021-02-06.json",
        output_location="../data/labeled_datasets/CentralParkNYC-2021-01-27-2021-02-06.json",
        spike_start="Feb 1, 2021, 17:00",
        spike_end="Feb 3, 2021, 04:00",
    ),
    united=dict(
        input_location="../data/tweet_objects/united/2020-12-05-2020-12-15.json",
        output_location="../data/labeled_datasets/united-2020-12-05-2020-12-15.json",
        spike_start="Dec 12, 2020, 05:00",
        spike_end="Dec 13, 2020, 06:00",
    ),
    nationalGridUS=dict(
        input_location="../data/tweet_objects/nationalGridUS/2020-10-01-2020-10-12.json",
        output_location="../data/labeled_datasets/nationalGridUS-2020-10-01-2020-10-12.json",
        spike_start="Oct 7, 2020, 17:00",
        spike_end="Oct 9, 2020, 04:00",
    ),
)

# Dataset processed by the rest of the notebook.
selected_dataset = DATASETS["united"]

# Boundaries of the labeled spike as tz-aware UTC timestamps, so they can be
# compared against the tz-aware tweet creation times.
labeled_spike_start, labeled_spike_end = (
    pd.to_datetime(selected_dataset[boundary]).tz_localize("UTC")
    for boundary in ("spike_start", "spike_end")
)

In [84]:
from utils.dataset import create_tweet_df

# Load the raw tweet objects of the selected dataset. JSON interchange files
# are UTF-8, so the encoding is stated explicitly instead of relying on the
# platform default.
with open(selected_dataset["input_location"], "r", encoding="utf-8") as f:
    raw_dataset = json.load(f)

df_tweets = create_tweet_df(raw_dataset["tweets"])

# Lower-case entities so that differently-cased hashtags/mentions count as one.
df_tweets["hashtags"] = df_tweets.hashtags.apply(lambda xs: [x.lower() for x in xs])
df_tweets["mentions"] = df_tweets.mentions.apply(lambda xs: [x.lower() for x in xs])

# Set retweet count 0 for retweets: keep the count only where `retweeted` is
# missing. `isna()` treats both None and NaN as missing, matching the
# `pd.notna` checks used when the time series is aggregated.
df_tweets["retweet_count"] = df_tweets["retweet_count"].where(
    df_tweets["retweeted"].isna(), 0
)

# Label tweets: 1 when created inside the labeled spike window (both
# boundaries inclusive), 0 otherwise.
df_tweets["is_anomaly"] = (
    df_tweets["created_at"]
    .between(labeled_spike_start, labeled_spike_end)
    .astype("int64")
)

df_tweets.head()

Unnamed: 0,id,text,created_at,hashtags,mentions,in_reply_to_user_id,user_id,retweet_count,quote_count,reply_count,like_count,replied_to,retweeted,quoted,is_anomaly
34642,1336444683875016705,"RT @DavidCornDC: Hey @United, a family member ...",2020-12-08 22:56:57+00:00,[],"[davidcorndc, united]",,779922519285301248,0,0,0,0,,1336361619107049472,,0
34641,1336444690640465920,"RT @DavidCornDC: Hey @United, a family member ...",2020-12-08 22:56:58+00:00,[],"[davidcorndc, united]",,161003217,0,0,0,0,,1336361619107049472,,0
34640,1336444721556668416,"RT @DavidCornDC: Worse, @United, your flight a...",2020-12-08 22:57:06+00:00,[],"[davidcorndc, united]",,161003217,0,0,0,0,,1336361619857809408,,0
34639,1336444729723064320,"RT @DavidCornDC: Worse, @United, your flight a...",2020-12-08 22:57:08+00:00,[],"[davidcorndc, united]",,794448786,0,0,0,0,,1336361619857809408,,0
34638,1336444732554113024,"RT @DavidCornDC: Hey @United, a family member ...",2020-12-08 22:57:08+00:00,[],"[davidcorndc, united]",,1165791085626396672,0,0,0,0,,1336361619107049472,,0


In [85]:
## Create timeseries from tweets
from utils.dataset import count_array_column

# Occurrence tables for the entity columns, one row per distinct value.
# NOTE(review): the displayed rows suggest descending order by count — confirm
# in utils.dataset.count_array_column, since later cells pick rows 0-2 as "top".
df_top_hashtags = count_array_column(df_tweets.hashtags)
df_top_mentions = count_array_column(df_tweets.mentions)

df_top_hashtags.head(5)

Unnamed: 0,value,count,pct
7,boycottunitedairlines,313,0.009035
10,unitedairlines,277,0.007996
33,boycottunited,166,0.004792
63,covid19,87,0.002511
18,travel,78,0.002252


In [86]:
def count_col_occurrence(df_col, value):
    """Return how many entries of ``df_col`` contain ``value``.

    Args:
        df_col: pandas Series whose elements are containers (e.g. lists).
        value: Item to look for inside each container.

    Returns:
        Number of elements for which ``value in element`` is true.
    """
    def contains_value(values):
        return value in values

    hits = df_col.map(contains_value)
    return hits.sum()

time_bucket_size = "60Min"

# Grouping key: each tweet's creation time rounded *up* to the bucket boundary.
bucket_end = df_tweets["created_at"].dt.ceil(time_bucket_size)

# One row per bucket. The top-N aggregations are kept as plain lambdas on
# purpose: several aggregations target the same source column.
df_timeseries = df_tweets.groupby(bucket_end).agg(
    # Number of tweets in the bucket.
    total_count=pd.NamedAgg(column="id", aggfunc="count"),
    # True as soon as one tweet of the bucket is labeled anomalous.
    is_anomaly=pd.NamedAgg(column="is_anomaly", aggfunc=lambda labels: labels.any()),
    # Tweets that reference another tweet (non-null reference column).
    retweet_count=pd.NamedAgg(column="retweeted", aggfunc=lambda refs: pd.notna(refs).sum()),
    quote_count=pd.NamedAgg(column="quoted", aggfunc=lambda refs: pd.notna(refs).sum()),
    replied_to_count=pd.NamedAgg(column="replied_to", aggfunc=lambda refs: pd.notna(refs).sum()),
    # Tweets containing the value in row 0 / 1 / 2 of the hashtag table.
    top1_hashtag_count=pd.NamedAgg(
        column="hashtags",
        aggfunc=lambda tags: count_col_occurrence(tags, df_top_hashtags["value"].iloc[0]),
    ),
    top2_hashtag_count=pd.NamedAgg(
        column="hashtags",
        aggfunc=lambda tags: count_col_occurrence(tags, df_top_hashtags["value"].iloc[1]),
    ),
    top3_hashtag_count=pd.NamedAgg(
        column="hashtags",
        aggfunc=lambda tags: count_col_occurrence(tags, df_top_hashtags["value"].iloc[2]),
    ),
    # Tweets containing the value in row 0 / 1 / 2 of the mention table.
    top1_mention_count=pd.NamedAgg(
        column="mentions",
        aggfunc=lambda names: count_col_occurrence(names, df_top_mentions["value"].iloc[0]),
    ),
    top2_mention_count=pd.NamedAgg(
        column="mentions",
        aggfunc=lambda names: count_col_occurrence(names, df_top_mentions["value"].iloc[1]),
    ),
    top3_mention_count=pd.NamedAgg(
        column="mentions",
        aggfunc=lambda names: count_col_occurrence(names, df_top_mentions["value"].iloc[2]),
    ),
)
df_timeseries.head()

Unnamed: 0_level_0,total_count,is_anomaly,retweet_count,quote_count,replied_to_count,top1_hashtag_count,top2_hashtag_count,top3_hashtag_count,top1_mention_count,top2_mention_count,top3_mention_count
created_at,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2020-12-08 23:00:00+00:00,51,False,45,1,5,0,0,0,51,0,49
2020-12-09 00:00:00+00:00,744,False,580,23,135,1,1,0,736,0,676
2020-12-09 01:00:00+00:00,484,False,390,11,74,0,1,0,472,0,437
2020-12-09 02:00:00+00:00,561,False,465,12,83,0,0,2,556,0,525
2020-12-09 03:00:00+00:00,484,False,373,12,94,0,1,0,476,0,440


In [87]:

from merlion.utils import TimeSeries
from merlion.models.anomaly.forecast_based.prophet import ProphetDetector, ProphetDetectorConfig
from merlion.plot import plot_anoms_plotly
from merlion.post_process.threshold import AggregateAlarms

def convert_to_merlion(df, column):
    """Wrap one column of a tz-aware, time-indexed frame as a Merlion TimeSeries.

    The input frame is left untouched; the selected column is re-indexed with
    timezone-naive timestamps before being handed to ``TimeSeries.from_pd``.

    Args:
        df: DataFrame with a tz-aware DatetimeIndex.
        column: Name of the column to convert.

    Returns:
        The result of ``TimeSeries.from_pd`` for the selected column.
    """
    # tz_convert(None) returns a new object whose index has the zone removed.
    naive_series = df[column].tz_convert(None)
    return TimeSeries.from_pd(naive_series)

def detect_anomalies(
    df_timeseries, 
    column,
    model=None,
    plot=True
):
    """Fit an anomaly detector on one feature and label every tweet with the result.

    The detector is trained and evaluated on the same series
    (``df_timeseries[column]``). As a side effect the module-level ``df_tweets``
    gains a column ``"merlion_anomaly_<column>"`` holding, for each tweet, the
    anomaly label of the time bucket the tweet falls into, or 0 when there is
    no label for that bucket.

    Args:
        df_timeseries: DataFrame indexed by tz-aware bucket timestamps.
        column: Name of the feature column to analyse.
        model: Untrained detector exposing ``train``, ``get_anomaly_score``,
            ``get_anomaly_label`` and ``plot_anomaly_plotly``. When ``None``, a
            fresh ``ProphetDetector`` with all seasonalities disabled is built
            for this call.
        plot: When true, show the plotly figure with forecast, uncertainty and
            detected anomalies.

    Returns:
        DataFrame with the anomaly scores of the model on the training series.
    """
    if model is None:
        # Built per call: a detector created in the signature would be a single
        # instance shared, and re-trained, by every call using the default.
        model = ProphetDetector(ProphetDetectorConfig(
            threshold=AggregateAlarms(alm_threshold=1.5),
            yearly_seasonality=False,
            weekly_seasonality=False,
            daily_seasonality=False,
            add_seasonality=False,
            uncertainty_samples=1000
        ))

    train_data = convert_to_merlion(df_timeseries, column)
    model.train(train_data=train_data, anomaly_labels=None)
    df_scores = model.get_anomaly_score(train_data).to_pd()
    labels_train = model.get_anomaly_label(train_data)

    if plot:
        fig = model.plot_anomaly_plotly(
            time_series=train_data,
            plot_forecast=True,
            plot_forecast_uncertainty=True
        )
        plot_anoms_plotly(fig, anomaly_labels=labels_train)
        fig.show()

    df_labels = labels_train.to_pd()

    def set_anom_score(created_at):
        # The time series is bucketed with ceil(time_bucket_size); the lookup
        # must use the same rounding, otherwise tweets whose rounded time falls
        # between two bucket timestamps never match a label.
        lookup = created_at.tz_convert(None).ceil(time_bucket_size)
        if lookup in df_labels.index:
            return df_labels.loc[lookup].anom_score
        return 0

    ## Label df_tweets with anomaly
    df_tweets["merlion_anomaly_" + column] = df_tweets.created_at.apply(
        set_anom_score
    )
    return df_scores

# Baseline run: default detector on the overall tweet volume per bucket.
anomaly_scores = detect_anomalies(df_timeseries, column="total_count")

Initial log joint probability = -14.1564
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
      99       266.201   0.000141769       97.1144      0.3363           1      119   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     105       266.227   0.000120227       91.3872   1.284e-06       0.001      163  LS failed, Hessian reset 
     152       266.251   5.13516e-09       96.1561      0.2147      0.2147      220   
Optimization terminated normally: 
  Convergence detected: absolute parameter change was below tolerance


In [88]:
from merlion.transform.moving_average import MovingAverage
from merlion.post_process.threshold import AggregateAlarms

# Alarm threshold applied to the anomaly scores.
alarm_threshold = AggregateAlarms(alm_threshold=1)
# Smooth the input with a 6-step moving average before it reaches the model.
smoothing = MovingAverage(n_steps=6)

# Prophet detector settings with every seasonality component switched off.
prophet_config = ProphetDetectorConfig(
    transform=smoothing,
    threshold=alarm_threshold,
    uncertainty_samples=1000,
    add_seasonality=False,
    daily_seasonality=False,
    weekly_seasonality=False,
    yearly_seasonality=False,
)

# Re-run the detection on the overall tweet volume with the smoothed detector.
smoothed_detector = ProphetDetector(prophet_config)
anomaly_scores = detect_anomalies(
    df_timeseries,
    "total_count",
    model=smoothed_detector,
)

Initial log joint probability = -7.05865
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
      73       187.399   8.36198e-05       79.6571   1.185e-06       0.001      159  LS failed, Hessian reset 
      99       187.409   0.000158366       83.9444       4.419           1      197   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     101       187.411   3.04007e-05       79.6057   3.592e-07       0.001      237  LS failed, Hessian reset 
     128       187.413   4.63415e-09       72.6984      0.1991      0.1991      279   
Optimization terminated normally: 
  Convergence detected: absolute parameter change was below tolerance


In [89]:
# Features to analyse: overall volume plus the three top-hashtag counts.
features_to_scan = (
    "total_count",
    "top1_hashtag_count",
    "top2_hashtag_count",
    "top3_hashtag_count",
)

for feat in features_to_scan:
    # A new detector per feature, all built from the same configuration.
    detector = ProphetDetector(prophet_config)
    detect_anomalies(df_timeseries, feat, model=detector)

Initial log joint probability = -7.05865
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
      73       187.399   8.36198e-05       79.6571   1.185e-06       0.001      159  LS failed, Hessian reset 
      99       187.409   0.000158366       83.9444       4.419           1      197   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     101       187.411   3.04007e-05       79.6057   3.592e-07       0.001      237  LS failed, Hessian reset 
     128       187.413   4.63415e-09       72.6984      0.1991      0.1991      279   
Optimization terminated normally: 
  Convergence detected: absolute parameter change was below tolerance


Initial log joint probability = -8.83585
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
      99       169.586   0.000118594       64.9917           1           1      129   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     162       170.235    0.00040976       92.3991   5.676e-06       0.001      250  LS failed, Hessian reset 
     199       170.402   1.06447e-06        84.413      0.6463      0.6463      298   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     241       170.402   4.85562e-09       61.1014      0.2677           1      358   
Optimization terminated normally: 
  Convergence detected: absolute parameter change was below tolerance


Initial log joint probability = -6.37247
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
      69       203.422    0.00300759        84.365    3.38e-05       0.001      130  LS failed, Hessian reset 
      99       203.604   2.64385e-05       75.8737           1           1      170   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     110       203.613   6.93806e-05       77.5385   8.162e-07       0.001      219  LS failed, Hessian reset 
     152       203.616   3.21001e-06       71.1483   4.126e-08       0.001      311  LS failed, Hessian reset 
     185       203.616   2.77471e-09       83.1181      0.0779      0.0779      360   
Optimization terminated normally: 
  Convergence detected: absolute parameter change was below tolerance


Initial log joint probability = -6.01908
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
      99       199.954   1.53719e-07       83.0131      0.3811      0.3811      125   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     154       200.146   0.000579687       81.2062   6.744e-06       0.001      232  LS failed, Hessian reset 
     199        200.26   7.66185e-05        82.435      0.5388      0.5388      291   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     239       200.265    2.8749e-09       81.0128     0.06565           1      348   
Optimization terminated normally: 
  Convergence detected: absolute parameter change was below tolerance


In [90]:
# Export the labeled tweets as a JSON array with one object per tweet.
output_path = selected_dataset["output_location"]
df_tweets.to_json(output_path, orient="records")

# Summarise the exported columns (names, non-null counts, dtypes).
df_tweets.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 34643 entries, 34642 to 0
Data columns (total 19 columns):
 #   Column                              Non-Null Count  Dtype              
---  ------                              --------------  -----              
 0   id                                  34643 non-null  object             
 1   text                                34643 non-null  object             
 2   created_at                          34643 non-null  datetime64[ns, UTC]
 3   hashtags                            34643 non-null  object             
 4   mentions                            34643 non-null  object             
 5   in_reply_to_user_id                 15238 non-null  object             
 6   user_id                             34643 non-null  object             
 7   retweet_count                       34643 non-null  int64              
 8   quote_count                         34643 non-null  int64              
 9   reply_count                         346