<a href="https://colab.research.google.com/github/bigliolimatteo/AMD/blob/main/market_basket_analysis_(Ukraine_Conflict).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Market Basket Analysis (Ukraine Conflict)


DISCLAIMER: Several passages in this notebook could be improved in both performance and readability. Our goal was to propose the most academic, Hadoop-like approach possible, in order to mimic the algorithms explained during the course. Most of these passages are highlighted, and a more compact or faster alternative is described in the comments.

In [26]:
# IMPORTANT: Put your Kaggle username and API key here

import os
os.environ["KAGGLE_USERNAME"] = ""
os.environ["KAGGLE_KEY"] = ""

## Download dataset from kaggle

In [27]:
%%capture
!kaggle datasets download -d bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows
!unzip ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip -d data

In [28]:
# Rename all *.gzip files to *.gz so that Spark can read the compressed files directly
sh = """
for file in data/*.gzip; do
    mv "$file" "data/$(basename "$file" .gzip).gz"
done
"""
with open('script.sh', 'w') as file:
  file.write(sh)

!bash script.sh
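
The same renaming step can be sketched in plain Python, without writing a shell script to disk. This is only an illustrative alternative: the helper name `rename_gzip_to_gz` is hypothetical, and the demonstration runs on a temporary directory rather than on the real `data` folder.

```python
import tempfile
from pathlib import Path

def rename_gzip_to_gz(directory):
    """Rename every *.gzip file in `directory` to *.gz and return the new paths."""
    renamed = []
    for path in sorted(Path(directory).glob("*.gzip")):
        # with_suffix only replaces the last suffix, so "x.csv.gzip" becomes "x.csv.gz"
        target = path.with_suffix(".gz")
        path.rename(target)
        renamed.append(target)
    return renamed

# Demonstration on a throwaway directory (the notebook would pass "data" instead)
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "example_part1.csv.gzip").touch()
    new_names = [p.name for p in rename_gzip_to_gz(tmp)]

print(new_names)
```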

## Ingestion and Preprocessing

### Import libraries and build spark context

In [29]:
%%capture
!pip install pyspark
!pip install findspark
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

In [30]:
import numpy as np
import pandas as pd
import csv
import os
from pyspark.sql import SparkSession, Row
import random
import sparknlp
import math

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

In [31]:
# Build spark context
spark = sparknlp.start()
sc = spark.sparkContext

### Ingest dataset

In [32]:
# Here we read one of the partitions of our dataset directly with spark (distributed read)
FILENAME = r"data/UkraineCombinedTweetsDeduped_FEB28_part1.csv.gz"
raw_df = spark.read.csv(FILENAME, header=True, escape="\"", quote="\"", multiLine=True)

# A possible next step could be to work w/ multiple languages
filtered_df = raw_df.where(raw_df.language == "en").select("text")

### Text preprocessing 

In [33]:
# In this step we define a sparkNLP pipeline which will preprocess our data
# by tokenizing it and removing unwanted tokens

documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

linkRemover = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("tokensWoutLinks") \
    .setCleanupPatterns([r"http\S+|www\S+|https\S+"]) \
    .setLowercase(True)

punctuationRemover = Normalizer() \
    .setInputCols(["tokensWoutLinks"]) \
    .setOutputCol("tokensWoutLinksAndPuct") \
    .setCleanupPatterns([r"(?U)[^\w -]|_|-(?!\w)|(?<!\w)-"])

stopWordsCleaner = StopWordsCleaner.pretrained() \
      .setInputCols("tokensWoutLinksAndPuct")\
      .setOutputCol("cleanedTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer() \
    .setInputCols(["cleanedTokens"]) \
    .setOutputCol("cleanedStemmedTokens")

pipeline = Pipeline().setStages([
    documentAssembler,
    tokenizer,
    linkRemover,
    punctuationRemover,
    stopWordsCleaner,
    stemmer
])

result = pipeline.fit(filtered_df).transform(filtered_df)

preprocessed_df = result.selectExpr("cleanedStemmedTokens.result")

stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
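
To make the effect of the cleanup patterns easier to see, here is a rough plain-Python approximation of the pipeline applied to a single made-up tweet. It is a sketch under several assumptions: it splits on whitespace instead of using the Spark NLP tokenizer, it uses a tiny hypothetical stopword set instead of the pretrained `stopwords_en`, and it skips stemming entirely, so its output will not match the Spark NLP pipeline exactly.

```python
import re

# Hypothetical, tiny stopword set used only for this illustration
STOPWORDS = {"the", "is", "in", "a", "of", "to", "and"}

# Same idea as the link-removal pattern in the pipeline
LINK_PATTERN = re.compile(r"http\S+|www\S+")
# Drop non-word characters and underscores, but keep hyphens inside a word
PUNCT_PATTERN = re.compile(r"[^\w-]|_|-(?!\w)|(?<!\w)-")

def preprocess(text):
    tokens = []
    for raw in text.split():
        token = LINK_PATTERN.sub("", raw.lower())
        token = PUNCT_PATTERN.sub("", token)
        # Skip tokens that became empty (e.g. links) and stopwords
        if token and token not in STOPWORDS:
            tokens.append(token)
    return tokens

example = preprocess("Putin is in Moscow: https://t.co/abc #Ukraine war-time!")
print(example)
```

The link disappears completely, the hashtag loses its `#`, and the hyphen in `war-time` survives because it sits between two word characters.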


## Market Basket Analysis Algorithm

### Introduction code

In [34]:
# We reduce the size of the dataframe for the naive approach
preprocessed_df = preprocessed_df.limit(2000)

# From now on we define the algorithm with Map and Reduce functions, which in pyspark
# are only available on rdds, so we convert our preprocessed dataframe into an rdd
# with the same (key, value) structure that we saw in the hadoop framework.
# We also map each tweet to list(set(tweet)) in order to remove duplicate words, which are not useful in our analysis
input_rdd = preprocessed_df.rdd.map(lambda x: (1, list(set(x[0]))))

# This is our input rdd
input_rdd.take(1)

[(1,
  ['construct',
   'nation',
   'visit',
   'centr',
   'vladimir',
   'putin',
   'moscow',
   'space',
   'site'])]

In [35]:
# We start by defining a threshold that decides whether an itemset is frequent.
# To do that we count the number of baskets (tweets) in our whole dataset and we call
# an itemset "frequent" if it appears in more than THRESHOLD_PERCENTAGE of the baskets

# Note that the countByKey function is basically equivalent to the map-reduce structure
# .map(lambda x: (1, 1)).reduceByKey(lambda x,y: x + y)
# but in our tests it ran about twice as fast

THRESHOLD_PERCENTAGE = 0.1
n_of_baskets = input_rdd.countByKey()[1]

THRESHOLD = math.ceil(n_of_baskets * THRESHOLD_PERCENTAGE)
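
As a worked example of the threshold arithmetic, the sketch below assumes that the `limit(2000)` above really yields 2000 baskets. The helper names `support_threshold` and `is_frequent` are hypothetical; the strict `>` mirrors the filters used in the following cells.

```python
import math

def support_threshold(n_of_baskets, threshold_percentage):
    # Same formula as in the cell above
    return math.ceil(n_of_baskets * threshold_percentage)

def is_frequent(count, threshold):
    # Strictly greater than the threshold, as in the filters used later on
    return count > threshold

threshold = support_threshold(2000, 0.1)
print(threshold)                    # 200
print(is_frequent(213, threshold))  # True
print(is_frequent(200, threshold))  # False
```

An itemset that appears in exactly 200 of 2000 baskets is therefore not reported as frequent.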

### Naive approach

First of all we code the algorithm without worrying too much about possible generalizations or a proper "software engineering" approach, both of which will be discussed later.

#### Frequent singletons

In [36]:
# First of all we compute the frequent singleton itemsets

# Extract all singletons
singleton_itemsets_rdd = input_rdd \
                          .flatMap(lambda x: x[1]) \
                          .map(lambda x: (x,1))

# Compute frequencies using a reduce operation
singleton_itemsets_w_frequencies_rdd = singleton_itemsets_rdd \
                                        .reduceByKey(lambda x,y: x + y)

# Filter singletons with a frequency higher than THRESHOLD
frequent_singleton_itemsets_rdd = singleton_itemsets_w_frequencies_rdd \
                                    .filter(lambda x: x[1] > THRESHOLD)

# These are examples of frequent singleton itemsets with the relative frequency
frequent_singleton_itemsets_rdd.take(10)

[('nation', 213),
 ('putin', 575),
 ('ukrain', 1413),
 ('russia', 1098),
 ('govern', 241),
 ('countri', 268),
 ('amp', 291),
 ('ukrainian', 387),
 ('kyiv', 215),
 ('russian', 477)]
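
For readers less familiar with the RDD API, the same counting logic can be sketched on a handful of made-up baskets with a `Counter`. The baskets and the threshold are hypothetical; the `flatMap`, `map` and `reduceByKey` steps collapse into a single counting pass.

```python
from collections import Counter

# Made-up baskets of already preprocessed tokens
baskets = [
    ["putin", "ukrain", "russia"],
    ["ukrain", "russia", "war"],
    ["ukrain", "kyiv"],
    ["russia", "ukrain", "putin"],
]
threshold = 1  # hypothetical value chosen for this toy dataset

# flatMap + map + reduceByKey: count in how many baskets each word appears
singleton_counts = Counter(word for basket in baskets for word in set(basket))

# filter: keep only words whose count is strictly above the threshold
frequent_singletons = {w: c for w, c in singleton_counts.items() if c > threshold}
print(frequent_singletons)
```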

#### Frequent pairs

In [37]:
# We can then compute frequent pairs by considering only pairs of frequent singletons.
# The monotonicity property (every subset of a frequent itemset is itself frequent)
# guarantees that this does not produce false negatives

# Extract all frequent singleton without frequency
frequent_singleton_itemsets_wout_freq_rdd = frequent_singleton_itemsets_rdd \
                                              .map(lambda x: (1, x[0]))

# Generate all candidate pairs (note that we need to remove from the join all possible duplicates)
# This is not the fastest approach as we could have leveraged specific functions (like .distinct())
# but it is the most academic and hadoop like approach
candidate_pairs_rdd = frequent_singleton_itemsets_wout_freq_rdd \
                      .join(frequent_singleton_itemsets_wout_freq_rdd) \
                      .filter(lambda x: len(set(x[1])) == len(x[1])) \
                      .map(lambda x: (tuple(sorted(x[1])), 1)) \
                      .reduceByKey(lambda x,y: x)

# We can then generate a list of candidate pairs which can be broadcasted to all executors 
# due to the fact that we expect this to be small (it could be even part of our final output)
candidate_pairs_list = candidate_pairs_rdd.map(lambda x: x[0]).collect()
broadcasted_candidate_pairs_list = sc.broadcast(candidate_pairs_list)

# Compute all possible pairs on our whole dataset
input_w_unique_key_rdd = input_rdd \
                        .map(lambda x: x[1]) \
                        .zipWithUniqueId() \
                        .flatMap(lambda x: [(x[1], word) for word in x[0]])

pair_itemsets_rdd = input_w_unique_key_rdd \
                  .join(input_w_unique_key_rdd)

# Filter only pairs that are in the candidate frequent pairs list and compute their freq
frequent_pairs_itemsets_rdd = pair_itemsets_rdd \
                                .filter(lambda x: x[1] in broadcasted_candidate_pairs_list.value) \
                                .map(lambda x: (x[1],1)) \
                                .reduceByKey(lambda x,y: x + y) \
                                .filter(lambda x: x[1] > THRESHOLD)

# These are examples of frequent pairs itemsets with the relative frequency
frequent_pairs_itemsets_rdd.take(5)



# BONUS: An alternative to zipWithUniqueId would have been to generate a unique index
# with a hash function like the one reported below. In our tests this approach did not
# produce a proper joined rdd, even with caching. A likely cause is the lazy evaluation
# of spark: the random component may be re-evaluated whenever the rdd is recomputed,
# so the two sides of the join can end up with different keys for the same basket.

#import hashlib
#
#def unique_key(basket):
#  unique_value = f"{random.random()}-{basket}"
#  return hashlib.sha256(unique_value.encode("utf-8")).hexdigest()

#input_w_unique_key_rdd = input_rdd \
#                        .map(lambda x: (unique_key(x[1]), x[1])) \
#                        .flatMap(lambda x: [(x[0], word) for word in x[1]])

[(('russia', 'ukrain'), 889),
 (('countri', 'ukrain'), 226),
 (('countri', 'russia'), 209),
 (('russia', 'russian'), 273),
 (('russian', 'ukrain'), 383)]
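
The self-join on the basket id is the least obvious step of this cell, so here is a small plain-Python sketch of what it does. The baskets, candidate pairs and threshold are made up. The join produces every ordered pair of words of a basket, including `(a, a)` and both `(a, b)` and `(b, a)`; since the candidates are stored as sorted tuples, exactly one ordering of each pair passes the filter, and each basket contributes at most one count per pair.

```python
from collections import Counter
from itertools import product

baskets = [
    ["putin", "ukrain", "russia"],
    ["ukrain", "russia", "war"],
    ["ukrain", "kyiv"],
    ["russia", "ukrain", "putin"],
]
# Candidate pairs built from the frequent singletons, stored as sorted tuples
candidate_pairs = {("putin", "russia"), ("putin", "ukrain"), ("russia", "ukrain")}
threshold = 2  # hypothetical value chosen for this toy dataset

pair_counts = Counter()
for basket in baskets:
    # Equivalent of joining the (basket_id, word) rdd with itself
    for pair in product(set(basket), repeat=2):
        if pair in candidate_pairs:
            pair_counts[pair] += 1

frequent_pairs = {p: c for p, c in pair_counts.items() if c > threshold}
print(frequent_pairs)
```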

#### Frequent triplets

In [38]:
# BE AWARE: because this is a naive approach, this piece of code takes ~30 sec to run
# (performance will improve in the generalized approach)

# Before generalizing the approach we compute frequent triplets by considering only
# those generated by joining two frequent pairs.
# The monotonicity property guarantees that this does not produce false negatives

# Extract all frequent pairs without frequency
frequent_pairs_itemsets_wout_freq_rdd = frequent_pairs_itemsets_rdd.map(lambda x: (1, x[0]))

# Generate all candidate triplets (note that we need to remove from the join all possible duplicates)
# This is not the fastest approach as we could have leveraged specific functions (like .distinct())
# but it is the most academic and hadoop like approach
# NOTE that we generate candidate triplets by joining frequent pairs: by leveraging
# the monotonicity property we keep the number of candidates small
candidate_triplets_rdd = frequent_pairs_itemsets_wout_freq_rdd \
                      .join(frequent_pairs_itemsets_wout_freq_rdd) \
                      .map(lambda x: (1, sum(x[1], ()))) \
                      .map(lambda x: (1, tuple(set(x[1]))) ) \
                      .filter(lambda x: len(x[1]) == 3) \
                      .map(lambda x: (tuple(sorted(x[1])), 1)) \
                      .reduceByKey(lambda x,y: x)

# We can then generate a list of candidate triplets which can be broadcasted to all executors 
# due to the fact that we expect this to be small (it could be even part of our final output)
candidate_triplets_list = candidate_triplets_rdd.map(lambda x: x[0]).collect()
broadcasted_candidate_triplets_list= sc.broadcast(candidate_triplets_list)

# Compute all triplets on our whole dataset
# Note that this is definitely an extremely expensive approach: we could have saved
# all the frequent pairs and joined only those or (better) avoided computing all triplets
# and directly checked, for each tweet, the presence of a candidate triplet
# (this approach will be used in the generalization phase, for now we stick with the naive approach)
triplets_itemsets_rdd = input_w_unique_key_rdd \
                        .map(lambda x: (x[0], (x[1], ))) \
                        .join(pair_itemsets_rdd)  \
                        .map(lambda x: (1, sum(x[1], ())))

# Filter only triplets that are in the candidate frequent triples list and compute their freq
frequent_triplets_itemsets_rdd = triplets_itemsets_rdd \
                                .filter(lambda x: x[1] in broadcasted_candidate_triplets_list.value) \
                                .map(lambda x: (x[1],1)) \
                                .reduceByKey(lambda x,y: x + y) \
                                .filter(lambda x: x[1] > THRESHOLD)

# These are examples of frequent triplet itemsets with the relative frequency
frequent_triplets_itemsets_rdd.take(5)

[(('russia', 'russian', 'ukrain'), 243)]
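
The candidate generation can be sketched in plain Python using only the five frequent pairs printed earlier (the full list of frequent pairs is longer, so this is an illustration rather than a reproduction). The second part applies the classical Apriori prune step, which keeps a candidate only if all of its pairs are frequent; the notebook does not apply this step, it is shown here only to illustrate how far the join alone narrows the candidates.

```python
from itertools import combinations

# The five frequent pairs shown in the output of the previous section
frequent_pairs = [
    ("russia", "ukrain"),
    ("countri", "ukrain"),
    ("countri", "russia"),
    ("russia", "russian"),
    ("russian", "ukrain"),
]

# Join step: merge two frequent pairs and keep the results with exactly 3 distinct words
candidate_triplets = set()
for left in frequent_pairs:
    for right in frequent_pairs:
        merged = set(left) | set(right)
        if len(merged) == 3:
            candidate_triplets.add(tuple(sorted(merged)))

# Optional prune step (not in the notebook): every 2-subset must be a frequent pair
frequent_pair_set = set(frequent_pairs)
pruned_triplets = {
    t for t in candidate_triplets
    if all(pair in frequent_pair_set for pair in combinations(t, 2))
}

print(sorted(candidate_triplets))
print(sorted(pruned_triplets))
```

On this input the join yields four candidates and the prune step keeps two of them. Both versions are safe, because dropped candidates contain an infrequent pair and so cannot be frequent.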

### Generalized Approach

In [41]:
# We define some functions that could help make the code more readable 

def contains_duplicates(t): return len(set(t)) != len(t)

def normalize_candidate_itemsets(candidate_itemsets_rdd):
  return candidate_itemsets_rdd.filter(lambda x: not contains_duplicates(x[1])) \
                                .map(lambda x: (tuple(sorted(x[1])), 1)) \
                                .reduceByKey(lambda x,y: x)
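
What the normalization does can be checked on a tiny example without Spark. The sketch below uses hypothetical helper names (`has_repeated_item`, `normalize_candidates`) and works on plain tuples instead of `(key, value)` records: it drops tuples that repeat a word, sorts each remaining tuple, and removes duplicates.

```python
def has_repeated_item(t):
    return len(set(t)) != len(t)

def normalize_candidates(joined_tuples):
    """Drop tuples with repeated items, sort each tuple, and deduplicate."""
    return sorted({tuple(sorted(t)) for t in joined_tuples if not has_repeated_item(t)})

# Output of a self-join over two frequent singletons
joined = [
    ("putin", "putin"),
    ("putin", "ukrain"),
    ("ukrain", "putin"),
    ("ukrain", "ukrain"),
]
normalized = normalize_candidates(joined)
print(normalized)
```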

In [42]:
# Here we define the actual function which will perform the generalized approach

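def a_priori_algorithm(input_rdd, THRESHOLD, approach = "single_machine-wise"):

  # Fail fast on an unknown approach instead of hitting a NameError inside the loop
  if approach not in ("single_machine-wise", "hadoop-wise"):
    raise ValueError(f"Unknown approach: {approach}")
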
  # Compute singleton itemsets with a frequency higher than THRESHOLD
  frequent_singleton_itemsets_rdd = input_rdd \
                                    .flatMap(lambda x: x[1]) \
                                    .map(lambda x: (x,1)) \
                                    .reduceByKey(lambda x,y: x + y) \
                                    .filter(lambda x: x[1] > THRESHOLD) \
                                    .map(lambda x: (1, x[0]))

  # Create a list which will contain all frequent itemsets
  # this list will be updated during each step of the while cycle
  frequent_itemsets_list = frequent_singleton_itemsets_rdd.map(lambda x: x[1]).collect()

  # Compute candidate pairs of words by joining frequent singletons (leveraging monotonicity)
  candidate_itemsets_rdd = normalize_candidate_itemsets( 
                            frequent_singleton_itemsets_rdd \
                            .join(frequent_singleton_itemsets_rdd))
                          
  # We start by looking for candidate pairs, as we have already computed the frequent singletons
  candidate_itemsets_length = 2


  # If the approach is hadoop-wise we need to compute this rdd.
  # Note that we can define it here without worrying about a drop in performance:
  # due to the lazy evaluation of spark it will be computed (and cached)
  # only when the first action is executed
  if approach == "hadoop-wise":
    input_w_unique_key_rdd = input_rdd \
                          .map(lambda x: x[1]) \
                          .zipWithUniqueId() \
                          .flatMap(lambda x: [(x[1], word) for word in x[0]]) \
                          .map(lambda x: (x[0], (x[1], )))

    itemsets_rdd = input_w_unique_key_rdd

    # We cache the RDD as we will use it several times during our computation
    input_w_unique_key_rdd.cache()


  # Loop as long as there are candidate itemsets
  while candidate_itemsets_rdd.count() != 0:

    # Collect and broadcast the list of candidate itemsets 
    # (we expect its size to be manageable, due to the fact that it could be part of our output)
    candidate_itemsets_list = candidate_itemsets_rdd.map(lambda x: x[0]).collect()
    broadcasted_candidate_itemsets_list = sc.broadcast(candidate_itemsets_list)

    # Generate all itemsets of length "candidate_itemsets_length" and filter them based on THRESHOLD
    if approach == "single_machine-wise":
      tmp_frequent_itemsets_rdd = input_rdd \
                                  .map(lambda x : [(candidate_itemset, 1) 
                                                    for candidate_itemset in broadcasted_candidate_itemsets_list.value 
                                                    if set(candidate_itemset).issubset(x[1])]) \
                                  .flatMap(lambda x : x) \
                                  .reduceByKey(lambda x,y: x + y) \
                                  .filter(lambda x: x[1] > THRESHOLD) \
                                  .map(lambda x: (1, x[0]))

    elif approach == "hadoop-wise":
      itemsets_rdd = input_w_unique_key_rdd \
                      .join(itemsets_rdd) \
                      .map(lambda x: (x[0], sum(x[1], ())))

      tmp_frequent_itemsets_rdd = itemsets_rdd \
                                  .filter(lambda x: x[1] in broadcasted_candidate_itemsets_list.value) \
                                  .map(lambda x: (x[1],1)) \
                                  .reduceByKey(lambda x,y: x + y) \
                                  .filter(lambda x: x[1] > THRESHOLD) \
                                  .map(lambda x: (1, x[0]))
                         
    # Add the current frequent itemsets to the list of all frequent itemsets
    frequent_itemsets_list += tmp_frequent_itemsets_rdd.map(lambda x: x[1]).collect()

    # Increment the length of the candidate itemsets we are going to work with
    candidate_itemsets_length += 1 

    # Compute the candidate itemsets for the next iteration
    candidate_itemsets_rdd = normalize_candidate_itemsets( 
                            tmp_frequent_itemsets_rdd \
                            .join(tmp_frequent_itemsets_rdd) \
                            .map(lambda x: (1, sum(x[1], ()))) \
                            .map(lambda x: (1, tuple(set(x[1]))) ) \
                            .filter(lambda x: len(x[1]) == candidate_itemsets_length))

  # Unpersist previously cached RDD
  if approach == "hadoop-wise":
    input_w_unique_key_rdd.unpersist()

  return frequent_itemsets_list

In [43]:
a_priori_algorithm(input_rdd, THRESHOLD, "hadoop-wise")

['nation',
 'putin',
 'ukrain',
 'russia',
 'govern',
 'countri',
 'amp',
 'ukrainian',
 'kyiv',
 'russian',
 'peopl',
 'russiaukrainewar',
 'ukrainerussiawar',
 'war',
 ('russia', 'ukrain'),
 ('countri', 'ukrain'),
 ('countri', 'russia'),
 ('russia', 'russian'),
 ('russian', 'ukrain'),
 ('amp', 'ukrain'),
 ('ukrain', 'ukrainian'),
 ('putin', 'ukrain'),
 ('putin', 'russia'),
 ('govern', 'ukrain'),
 ('russia', 'russian', 'ukrain')]

In [44]:
a_priori_algorithm(input_rdd, THRESHOLD, "single_machine-wise")

['nation',
 'putin',
 'ukrain',
 'russia',
 'govern',
 'countri',
 'amp',
 'ukrainian',
 'kyiv',
 'russian',
 'peopl',
 'russiaukrainewar',
 'ukrainerussiawar',
 'war',
 ('russia', 'ukrain'),
 ('countri', 'ukrain'),
 ('amp', 'ukrain'),
 ('countri', 'russia'),
 ('govern', 'ukrain'),
 ('putin', 'ukrain'),
 ('ukrain', 'ukrainian'),
 ('putin', 'russia'),
 ('russian', 'ukrain'),
 ('russia', 'russian'),
 ('russia', 'russian', 'ukrain')]
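
The two runs return the same itemsets in a different order, so comparing them as sets (for example `set(result_a) == set(result_b)`, with hypothetical variable names) is more robust than comparing the lists directly. For small inputs, a plain-Python reference implementation can also serve as a sanity check. The sketch below mirrors the loop of `a_priori_algorithm` on made-up baskets; unlike the notebook, it returns singletons as 1-tuples, and `apriori_reference` is a hypothetical name.

```python
from collections import Counter

def apriori_reference(baskets, threshold):
    baskets = [set(b) for b in baskets]
    counts = Counter(word for b in baskets for word in b)
    frequent = sorted((w,) for w, c in counts.items() if c > threshold)
    all_frequent = list(frequent)
    length = 2
    while frequent:
        # Join step: merge two frequent itemsets and keep unions of the target length
        candidates = set()
        for a in frequent:
            for b in frequent:
                merged = set(a) | set(b)
                if len(merged) == length:
                    candidates.add(tuple(sorted(merged)))
        # Count step: check each basket directly against the candidates
        counts = Counter(c for b in baskets for c in candidates if b.issuperset(c))
        frequent = sorted(c for c, n in counts.items() if n > threshold)
        all_frequent += frequent
        length += 1
    return all_frequent

toy_baskets = [
    ["russia", "russian", "ukrain"],
    ["russia", "russian", "ukrain", "war"],
    ["russia", "ukrain"],
    ["ukrain", "war"],
]
reference_result = apriori_reference(toy_baskets, threshold=1)
print(reference_result)
```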

### Generate Results

Here we build a small script on top of the algorithm in order to easily generate results for different days.

#### Useful functions

In [45]:
# We define some utility functions that could help make the code more readable 

def generate_preprocessing_pipeline():
  documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

  tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

  linkRemover = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("tokensWoutLinks") \
      .setCleanupPatterns([r"http\S+|www\S+|https\S+"]) \
      .setLowercase(True)

  punctuationRemover = Normalizer() \
      .setInputCols(["tokensWoutLinks"]) \
      .setOutputCol("tokensWoutLinksAndPuct") \
      .setCleanupPatterns([r"(?U)[^\w -]|_|-(?!\w)|(?<!\w)-"])

  stopWordsCleaner = StopWordsCleaner.pretrained() \
        .setInputCols("tokensWoutLinksAndPuct")\
        .setOutputCol("cleanedTokens")\
        .setCaseSensitive(False)

  stemmer = Stemmer() \
      .setInputCols(["cleanedTokens"]) \
      .setOutputCol("cleanedStemmedTokens")

  return Pipeline().setStages([
      documentAssembler,
      tokenizer,
      linkRemover,
      punctuationRemover,
      stopWordsCleaner,
      stemmer
  ])

def apply_preprocessing_pipeline(pipeline, dataframe):
  return pipeline.fit(dataframe).transform(dataframe)\
          .selectExpr("cleanedStemmedTokens.result")

def generate_numeric_threshold_from_percentage(threshold_percentage, rdd):
  n_of_baskets = rdd.countByKey()[1]
  return math.ceil(n_of_baskets * threshold_percentage)


#### Generalize to different days

In [46]:
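# Set limit = -1 to avoid limiting the number of tweets
# For FEB28 we use only part1

# The idea of this function is to apply the a_priori_algorithm just by specifying the date which we want to process
# (and a few other input params).
# Note that the default preprocessing_pipeline is built once, when the function is defined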
def generate_results(date, preprocessing_pipeline = generate_preprocessing_pipeline(), threshold_percentage = 0.2, limit = 20000):

  # Extracting correct filename 
  if date[5:7] == "02":
    month = "FEB"
  elif date[5:7] == "03":
    month = "MAR"
  else:
    raise Exception("Wrong month!")

  if date[5:7] == "02" and date[8:10] == "28":
    suffix = "_part1"
  else:
    suffix = ""

  FILENAME = f"data/UkraineCombinedTweetsDeduped_{month}{date[8:10]}{suffix}.csv.gz"

  # Reading and filtering the dataset
  raw_df = spark.read.csv(FILENAME, header=True, escape="\"", quote="\"", multiLine=True)
  filtered_df = raw_df.where(raw_df.language == "en").select("text")

  # Preprocessing the dataset using a predefined pipeline (this avoids having to re-download the stopwords set each time)
  preprocessed_df = apply_preprocessing_pipeline(preprocessing_pipeline, filtered_df)

  # Subsetting our dataset if needed
  if limit > 0:
    preprocessed_df = preprocessed_df.limit(limit)

  # Preparing the RDD for the a_priori_algorithm
  input_rdd = preprocessed_df.rdd.map(lambda x: (1, list(set(x[0]))))

  # Defining the threshold
  THRESHOLD = generate_numeric_threshold_from_percentage(threshold_percentage, input_rdd)

  # Returning the actual result
  return a_priori_algorithm(input_rdd, THRESHOLD)

stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
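
The mapping from a date string to a file name can be isolated and tested on its own. The sketch below is one possible way to write it with `datetime.date` instead of string slicing; the helper name `filename_for` and the `MONTH_CODES` table are hypothetical, and only February and March are covered, as in the function above.

```python
from datetime import date

# Month codes used in the dataset file names (only the months handled above)
MONTH_CODES = {2: "FEB", 3: "MAR"}

def filename_for(date_str):
    day = date.fromisoformat(date_str)  # raises ValueError on malformed dates
    if day.month not in MONTH_CODES:
        raise ValueError(f"No data file for month {day.month}")
    # Only part1 of the FEB28 file is used
    suffix = "_part1" if (day.month, day.day) == (2, 28) else ""
    return f"data/UkraineCombinedTweetsDeduped_{MONTH_CODES[day.month]}{day.day:02d}{suffix}.csv.gz"

print(filename_for("2022-02-28"))
print(filename_for("2022-03-04"))
```

Parsing the date first also rejects inputs such as `"2022-02-30"`, which the slicing version would silently turn into a non-existent file name.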


In [47]:
# Defining the pipeline once (download stopwords only once)
preprocessing_pipeline = generate_preprocessing_pipeline()

stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]


In [48]:
# Perform a computation on the dataset of 2022-02-28
generate_results("2022-02-28", preprocessing_pipeline)

['putin', 'ukrain', 'russia', 'russian', ('russia', 'ukrain')]

In [50]:
# Perform a computation on the dataset of 2022-03-04 (the day on which the Zaporizhzhia nuclear power plant was attacked)
generate_results("2022-03-04", preprocessing_pipeline)

['ukrain',
 'russian',
 'plant',
 'power',
 'russia',
 'nuclear',
 'zaporizhzhia',
 ('russian', 'ukrain'),
 ('plant', 'power'),
 ('plant', 'ukrain'),
 ('power', 'ukrain'),
 ('russia', 'ukrain'),
 ('nuclear', 'plant'),
 ('nuclear', 'power'),
 ('nuclear', 'ukrain'),
 ('plant', 'power', 'ukrain'),
 ('nuclear', 'plant', 'ukrain'),
 ('nuclear', 'power', 'ukrain'),
 ('nuclear', 'plant', 'power'),
 ('nuclear', 'plant', 'power', 'ukrain')]

#### Compute results for different days

In [51]:
# The small piece of code below computes the results for all the days in our dataset and
# writes them to a file so that they are easier to compare

from datetime import date, timedelta
import json

# Define daterange
def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)
start_date = date(2022, 2, 27)
end_date = date(2022, 3, 27)

# Prepare result dict
result_dict = dict()

# Define once the preprocessing pipeline
preprocessing_pipeline = generate_preprocessing_pipeline()

# Generate and store results for each day
for single_date in daterange(start_date, end_date):
  date_str = single_date.strftime("%Y-%m-%d")
  print("processing " + date_str)
  result_dict[date_str] = generate_results(date_str, preprocessing_pipeline, threshold_percentage=0.15, limit=1000)

# Write results to a file
with open('result.json', 'w') as fp:
    json.dump(result_dict, fp)

stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
processing 2022-02-27
processing 2022-02-28
processing 2022-03-01
processing 2022-03-02
processing 2022-03-03
processing 2022-03-04
processing 2022-03-05
processing 2022-03-06
processing 2022-03-07
processing 2022-03-08
processing 2022-03-09
processing 2022-03-10
processing 2022-03-11
processing 2022-03-12
processing 2022-03-13
processing 2022-03-14
processing 2022-03-15
processing 2022-03-16
processing 2022-03-17
processing 2022-03-18
processing 2022-03-19
processing 2022-03-20
processing 2022-03-21
processing 2022-03-22
processing 2022-03-23
processing 2022-03-24
processing 2022-03-25
processing 2022-03-26
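
One detail worth knowing when reading `result.json` back: JSON has no tuple type, so the itemset tuples are written as arrays and come back as lists. The sketch below shows the round trip on one day of results taken from the output of this notebook, and one possible way to restore the tuples; it works on an in-memory string rather than on the file.

```python
import json

# One day of results, as returned by generate_results
result = {"2022-03-03": ["ukrain", "russian", "russia", "kyiv", ("russia", "ukrain")]}

encoded = json.dumps(result)
decoded = json.loads(encoded)

# Tuples were serialized as JSON arrays, so they are lists after decoding
print(decoded["2022-03-03"][-1])

# Convert the lists back to tuples; singleton strings are left untouched
restored = {
    day: [tuple(item) if isinstance(item, list) else item for item in itemsets]
    for day, itemsets in decoded.items()
}
print(restored == result)
```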


In [52]:
# Visualize the different results
result_dict

{'2022-02-27': ['ukrain',
  'russian',
  'putin',
  'russia',
  'kyiv',
  'war',
  'anonym',
  ('russian', 'ukrain'),
  ('russia', 'ukrain'),
  ('putin', 'ukrain'),
  ('ukrain', 'war'),
  ('anonym', 'russian'),
  ('anonym', 'ukrain'),
  ('anonym', 'russian', 'ukrain')],
 '2022-02-28': ['putin',
  'ukrain',
  'russia',
  'countri',
  'amp',
  'ukrainian',
  'russian',
  'russiaukrainewar',
  ('russia', 'ukrain'),
  ('amp', 'ukrain'),
  ('putin', 'ukrain'),
  ('ukrain', 'ukrainian'),
  ('putin', 'russia'),
  ('russian', 'ukrain'),
  ('russia', 'russian'),
  ('russia', 'russian', 'ukrain')],
 '2022-03-01': ['ukrain',
  'russia',
  'ukrainerussiawar',
  'russian',
  'kyiv',
  ('russia', 'ukrain'),
  ('russian', 'ukrain')],
 '2022-03-02': ['ukrain',
  'russia',
  'war',
  'russian',
  'peopl',
  'ukrainerussiawar',
  ('russia', 'ukrain'),
  ('russian', 'ukrain')],
 '2022-03-03': ['ukrain', 'russian', 'russia', 'kyiv', ('russia', 'ukrain')],
 '2022-03-04': ['ukrain',
  'russian',
  'plant',
