Delete all variables in the current environment (if you have already run some cells) - clean state.

In [1]:
%reset

Import all necessary packages.

NOTE: Replace the download directory of the NLTK tokenizer files with your preferred directory (I chose the root directory of the Research Internship project)

In [2]:
import numpy as np
import pandas as pd
import os
import shutil
from datetime import datetime
import multiprocessing
from multiprocessing import Pool

from sentistrength import PySentiStr

import re
import contractions
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import nltk.data
nltk.download('punkt')
# Load the punkt tokenizer data from the local directory
nltk.data.load('tokenizers/punkt/PY3/english.pickle')

import json
from collections import defaultdict

from IPython.core.getipython import get_ipython

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/andreistoica12/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /home/andreistoica12/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Replace with the path to the root folder of the project.

In [3]:
rootdir_path = '/home/andreistoica12/research-internship'

Replace with the path to the folder where the dataset is stored.

In [4]:
data_path = rootdir_path + '/data/covaxxy-csv-complete'

In [5]:
files_path = rootdir_path + '/files'

In [6]:
opinion_changes_path = files_path + '/opinion-changes'

In [7]:
graphs_path = rootdir_path + '/graphs'

Create a subfolder within the graphs/ folder to store important graphs for the covaxxy dataset. If it already exists (from previous runs of the project), delete the folder and its contents and create an empty folder to store the current graphs, relevant to the current state of the project.

In [8]:
covaxxy_graphs_path = os.path.join(graphs_path, 'covaxxy')
# Always start from an empty folder: wipe whatever a previous run left behind, then recreate it.
if os.path.exists(covaxxy_graphs_path):
    shutil.rmtree(covaxxy_graphs_path)
os.makedirs(covaxxy_graphs_path)

Create subfolders specific to the different types of analyses performed in the project.

In [9]:
covaxxy_longitudinal_analysis_graphs_path = os.path.join(covaxxy_graphs_path, 'longitudinal-analysis')
# Same reset-then-create pattern as for the parent graphs folder.
if os.path.exists(covaxxy_longitudinal_analysis_graphs_path):
    shutil.rmtree(covaxxy_longitudinal_analysis_graphs_path)
os.makedirs(covaxxy_longitudinal_analysis_graphs_path)

Replace with the path to the folder where the SentiStrength library is stored.

NOTE: Due to their policy, the Java version of the library (the one I am using) is only free for academic use. Therefore, I could not make it publicly available. If you wish to use the free library (for academic purposes), I will gladly redirect you to the author at M.Thelwall@wlv.ac.uk . 

More information is available at: http://sentistrength.wlv.ac.uk/

In [10]:
path_to_sentistrength = rootdir_path + '/SentiStrength'

Replace with the path to the Java executable file of SentiStrength.

In [11]:
path_to_sentistrength_jar = path_to_sentistrength + '/SentiStrengthCom.jar'

Replace with the path to the language folder, which is used along with the .jar file to compute sentiment scores.

In [12]:
path_to_sentistrength_language_folder = path_to_sentistrength + '/LanguageFolder'

In [13]:
# stop_words = set(stopwords.words('english'))

Due to the general nature of the NLTK built-in stop words list, most words could actually have an impact in the computation of sentiment scores if removed from the texts (e.g. "all" or "not"), thus I decided against using this pre-defined list. Instead I created a custom list of stop words, which can be found at the following relative location:

In [14]:
path_to_stopwords = files_path + '/stopwords.txt'

In [15]:
def custom_stop_words(path_to_stopwords):
    """Read a .txt file containing (custom) stop words, one per line, and return them as a set.

    Leading and trailing whitespace is stripped from every line. Blank lines are
    skipped, so the empty string never ends up in the returned set.

    Args:
        path_to_stopwords (str): path to the .txt file containing stop words (e.g. /your/path/to/files/stop_words.txt)

    Returns:
        set: set of stop words
    """
    # Explicit encoding so the result does not depend on the platform's default locale.
    with open(path_to_stopwords, 'r', encoding='utf-8') as f:
        stripped_lines = (line.strip() for line in f)
        return {word for word in stripped_lines if word}

In [16]:
stop_words = custom_stop_words(path_to_stopwords)

In [17]:
def remove_emoji(text):
    """Remove emoji-like characters from a text string.

    A single character-class regular expression lists the Unicode blocks that hold
    emoticons, pictographs, transport/map symbols, flags, dingbats and similar symbols.

    The ranges are deliberately narrow. A single catch-all range from U+24C2 to
    U+1F251 would also cover CJK ideographs, Hangul, Kana and many other scripts,
    and would therefore delete ordinary non-Latin text. Every range below is a subset
    of that span, so nothing outside it is newly removed.

    Args:
        text (str): text string to remove emojis from

    Returns:
        str: text string with all matched emoji characters removed
    """
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"     # emoticons
        "\U0001F300-\U0001F5FF"     # symbols & pictographs
        "\U0001F680-\U0001F6FF"     # transport & map symbols
        "\U0001F1E0-\U0001F1FF"     # flags (regional indicator symbols)
        "\U0001F000-\U0001F251"     # mahjong/domino/playing cards, enclosed alphanumerics & ideographs
        "\u24C2"                    # circled latin capital letter M
        "\u25A0-\u27BF"             # geometric shapes, miscellaneous symbols, dingbats
        "\u2900-\u297F"             # supplemental arrows-B
        "\u2B00-\u2BFF"             # miscellaneous symbols and arrows
        "\u3030\u303D\u3297\u3299"  # wavy dash, part alternation mark, circled ideographs
        "\uFE0F"                    # variation selector-16 (emoji presentation)
        "]+",
        flags=re.UNICODE,
    )

    return emoji_pattern.sub(r'', text)

In [18]:
def remove_stopwords(text, stop_words):
    """Drop every token of the text whose lowercase form is a stop word.

    The text is tokenized with NLTK's word_tokenize and the surviving tokens are
    joined back together with single spaces.

    Args:
        text (str): text string
        stop_words (set): set of stop words

    Returns:
        str: text string without stop words
    """
    kept_tokens = []
    for token in word_tokenize(text):
        # The comparison is case-insensitive; the token itself is kept unchanged.
        if token.lower() in stop_words:
            continue
        kept_tokens.append(token)

    return ' '.join(kept_tokens)

In [19]:
def clean_text(text, stop_words):
    """Function to clean the raw text, e.g. from a tweet. Performs the following steps:
    1. Lowercase all the words in the text
    2. Replace all new line characters with a white space
    3. Remove tags
    4. Remove URLs
    5. Convert contractions to their full forms
    6. Remove punctuation
    7. Remove emojis (emoticons, symbols, flags, etc.)
    8. Remove stopwords

    Contractions are expanded before punctuation is removed, because the expansion
    relies on the apostrophes that the punctuation step strips.

    Args:
        text (str): text string to be cleaned before passing it to the sentiment analysis model
        stop_words (set): set of stop words to be removed from the text

    Returns:
        str: cleaned text string
    """
    # 1. Lowercase all words in the text
    text = text.lower()

    # 2. Replace new lines with a space so the last word of one line and the
    #    first word of the next are not glued together.
    text = text.replace("\n", " ")

    # 3. Remove words starting with '@' - tags (most common noise in replies)
    text = re.sub(r'@\w+', '', text)

    # 4. Remove hyperlinks (the dot after 'www' is escaped so it matches a literal dot only)
    text = re.sub(r'http\S+|www\.\S+', '', text)

    # 5. Expand contractions, such as you're => you are.
    #    contractions.fix returns the expanded string, so the result must be assigned.
    text = contractions.fix(text)

    # 6. Remove punctuation from the text using regular expressions
    text = re.sub(r'[^\w\s]', '', text)

    # 7. Remove emojis
    text = remove_emoji(text)

    # 8. Remove stopwords
    text = remove_stopwords(text, stop_words)

    return text

In [20]:
def filter_df_by_date(path):
    """Function to filter a dataframe by date, given the path to the csv file. 
    Returns a dictionary with the dates as keys and the values being the rows where the value of the 'created_at' column
    corresponds to the key.

    Args:
        path (str): path to the csv file (e.g. /your/path/to/tweet_ids--2021-03-01.csv)

    Returns:
        dict: dictionary with the dates as keys and the values being the rows where the value of the 'created_at' column corresponds
              to the key
    """    
    # Read the CSV file into a pandas dataframe
    df_from_file = pd.read_csv(path, index_col= False)
        
    # Convert the "created_at" column to a pandas datetime object
    df_from_file['created_at'] = pd.to_datetime(df_from_file['created_at'])

    # Get all unique timestamp values from the "created_at" column
    unique_dates = df_from_file['created_at'].dt.date.unique()

    # Create a dictionary where the keys are the unique timestamp values
    # and the values are dataframes that correspond to each unique timestamp value
    days = {}
    for date in unique_dates:
        # Extract the rows that have the current timestamp value
        mask = df_from_file['created_at'].dt.date == date
        filtered_df = df_from_file[mask]
        # Store the resulting subset of rows as a dataframe in the dictionary
        days[date] = filtered_df
    
    return days

In [21]:
def create_days(data_path):
    """Function to create the merged days dictionary, performed using parallel computation.

    Every regular file in data_path is read in a worker process by filter_df_by_date.
    The per-file results are then merged per date, sorted by 'created_at', and the
    'id' column is dropped.

    Args:
        data_path (str): path to the folder where the .csv files are stored

    Returns:
        dict: maps date strings formatted as '%d-%m-%Y' to the dataframe of all rows
              created on that date, based on all available files
    """
    # Only regular files are passed on; sub-directories (e.g. notebook checkpoint
    # folders) would make read_csv fail.
    file_paths = [
        os.path.join(data_path, name)
        for name in os.listdir(data_path)
        if os.path.isfile(os.path.join(data_path, name))
    ]

    # Set the number of processes to run in parallel
    num_processes = multiprocessing.cpu_count() * 2
    # Use a pool of workers to execute filter_df_by_date on each file in parallel
    with Pool(processes=num_processes) as pool:
        results = pool.map(filter_df_by_date, file_paths)

    # Collect the per-file frames of each date first and concatenate once per date.
    # Re-concatenating the accumulated frames for every file is quadratic and also
    # concatenates with empty DataFrames.
    frames_per_date = defaultdict(list)
    for result in results:
        for date_key, frame in result.items():
            frames_per_date[date_key].append(frame)

    days = {}
    for date_key, frames in frames_per_date.items():
        merged = pd.concat(frames)
        # String keys are easier to access further down the line than date objects.
        days[date_key.strftime('%d-%m-%Y')] = merged.sort_values('created_at').drop('id', axis=1)

    return days

In [22]:
days = create_days(data_path)

EXACT DAYS IN OUR DATASET:

Note: I double checked which days were actually used in the dataset.

In [23]:
sorted_dates_datetimes = sorted([ datetime.strptime(date_string, '%d-%m-%Y') for date_string in days.keys() ])

In [24]:
[ date_object.strftime('%d-%m-%Y') for date_object in sorted_dates_datetimes ]

['24-02-2021',
 '25-02-2021',
 '26-02-2021',
 '27-02-2021',
 '28-02-2021',
 '01-03-2021',
 '02-03-2021',
 '03-03-2021',
 '04-03-2021',
 '05-03-2021',
 '06-03-2021',
 '07-03-2021',
 '08-03-2021',
 '09-03-2021',
 '10-03-2021']

In [25]:
def create_merged_days(days):
    """Concatenate the per-day dataframes into one dataframe, in chronological order of the keys.

    The whole dataset is needed as a single frame because reactions to a tweet can
    arrive on later days than the tweet itself.

    Args:
        days (dict): dictionary where the keys are date strings ('%d-%m-%Y') and the values are
                     dataframes containing the rows of each day in the whole dataset

    Returns:
        pandas.core.frame.DataFrame: the per-day frames stacked in ascending key-date order with a
                                     fresh 0..n-1 index; 'created_at' is converted to datetime and
                                     'reference_id' values are converted to int where possible
    """
    def string_to_int(reference_id):
        # Values that are not numeric (e.g. the '#' placeholder) are returned untouched.
        try:
            return int(reference_id)
        except ValueError:
            return reference_id

    date_format = '%d-%m-%Y'

    # Sort the keys as real dates; sorting the strings would order by day-of-month first.
    parsed_keys = [datetime.strptime(key, date_format) for key in days]
    parsed_keys.sort()
    ordered_frames = [days[moment.strftime(date_format)] for moment in parsed_keys]

    merged_days = pd.concat(ordered_frames, ignore_index=True)
    merged_days['created_at'] = pd.to_datetime(merged_days['created_at'])
    merged_days['reference_id'] = merged_days['reference_id'].apply(string_to_int)

    return merged_days

In [26]:
merged_days = create_merged_days(days)

In [27]:
merged_days

Unnamed: 0,created_at,tweet_id,credible,author_id,text,urls,name,username,verified,location,...,retweet_author_id,retweet_id,retweeted_screen_name,user_mentions_id,user_mentions_screen_name,in_reply_to_user_id,in_reply_to_tweet_id,in_reply_to_username,reference_type,reference_id
0,2021-02-24 18:00:10+00:00,1364636249852502018,1,107501328,RT @Maricopahealth: At one of our community po...,#,2-1-1 Arizona,211arizona,False,Arizona,...,29816986,1364632754042802176,Maricopahealth,29816986,Maricopahealth,#,#,#,retweeted,1364632754042802176
1,2021-02-24 18:00:18+00:00,1364636282664574978,1,26761523,Ready for DAY 2 of State of the Valley? Join u...,"jointventure.org,twitter.com,",Joint Venture SV,JointVentureSVN,False,"San Jose, CA",...,#,#,#,#,#,#,#,#,#,#
2,2021-02-24 18:00:30+00:00,1364636333596008449,1,1234926105234034689,RT @SteveStaeger: When #COVID19Colorado is ove...,#,Colorado Coronavirus Updates,COVIDinColorado,False,"Denver, Colorado",...,182037688,1364293582157307906,SteveStaeger,182037688,SteveStaeger,#,#,#,retweeted,1364293582157307906
3,2021-02-24 18:03:16+00:00,1364637028948709377,1,1329106574082641920,"#SD37: Starting next week, @OCHealth will star...","bit.ly,www.ocregister.com,",Senator Dave Min,SenDaveMin,True,"Irvine, CA",...,#,#,#,36069538,ochealth,#,#,#,#,#
4,2021-02-24 18:03:35+00:00,1364637110951583746,1,1363750425459970048,RT @jatinde45666597: Vaccination has been star...,#,Reena Sharma,write2reena,False,"Auckland, New Zealand",...,1295748297529884673,1364087633538859008,jatinde45666597,1295748297529884673,jatinde45666597,#,#,#,retweeted,1364087633538859008
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5123691,2021-03-10 23:59:52+00:00,1369800203939745796,1,434360613,RT @Philo: The boys of #SouthPark are at it ag...,#,ami_,ami_tvdfan,False,#,...,81766872,1369799981763162113,philoTV,23827692,ComedyCentral,#,#,#,retweeted,1369799981763162113
5123692,2021-03-10 23:59:52+00:00,1369800204094963712,1,3083078947,RT @ericswalwell: The #AmericanRescuePlan puts...,#,Thomas Albrecht 🇺🇸☮️,TomAlb88,False,#,...,377609596,1369727803768201218,ericswalwell,377609596,ericswalwell,#,#,#,retweeted,1369727803768201218
5123693,2021-03-10 23:59:53+00:00,1369800204761899011,1,29801287,"RT @TheDweck: Wow, vision boards work",#,Fauxnly Fans,thenickkontz,False,"ÜT: 43.508306,-96.779489",...,98247788,1369742802590990336,TheDweck,98247788,TheDweck,#,#,#,retweeted,1369742802590990336
5123694,2021-03-10 23:59:53+00:00,1369800205445521409,1,1095155084,just saw some lady on the news say she’s not g...,#,cristal✨,cristal_guz,False,Raleigh |22|,...,#,#,#,#,#,#,#,#,#,#


In [28]:
merged_days.columns

Index(['created_at', 'tweet_id', 'credible', 'author_id', 'text', 'urls',
       'name', 'username', 'verified', 'location', 'followers_count',
       'following_count', 'tweet_count', 'like_count', 'quote_count',
       'reply_count', 'retweet_count', 'retweet_author_id', 'retweet_id',
       'retweeted_screen_name', 'user_mentions_id',
       'user_mentions_screen_name', 'in_reply_to_user_id',
       'in_reply_to_tweet_id', 'in_reply_to_username', 'reference_type',
       'reference_id'],
      dtype='object')

REACTIONS

There are 3 types of reactions:
- replies ('replied_to')
- quotes ('quoted')
- retweets ('retweeted')

The types of reactions you wish to take into account further down the line should be specified in the list below. The list should contain one or more of the strings mentioned in parentheses above.

In [29]:
# reaction_types = ['quoted']
# reaction_types = ['quoted', 'retweeted']
# reaction_types = ['replied_to']
reaction_types = ['replied_to', 'quoted']
# reaction_types = ['replied_to', 'quoted', 'retweeted']
# reaction_types = ['replied_to', 'retweeted']

In [30]:
def create_path_to_opinion_changes(reaction_types):
    """Build the path of the opinion changes JSON file for a given combination of reaction types.

    The file name is the reaction types joined by '_' followed by '_OC.json'; it is
    placed in the folder given by the notebook-level opinion_changes_path variable.

    Args:
        reaction_types (list): list of reaction types

    Returns:
        str: path to the opinion changes file
    """
    joined_types = "_".join(reaction_types)
    return opinion_changes_path + "/" + joined_types + "_OC.json"

In [31]:
def group_reactions(merged_days, reaction_types):
    """Group repeated reactions by the pair ('author_id', 'reference_id').

    Only rows whose 'reference_type' is in reaction_types are considered, and only
    pairs that occur more than once are kept, i.e. cases where the same author
    reacted several times to the same referenced tweet.

    Args:
        merged_days (pandas.core.frame.DataFrame): dataframe with all the data
        reaction_types (list): list of reaction types we want to consider

    Returns:
        dict: mapping from (author_id, reference_id) tuples to the index labels of the
              corresponding rows in merged_days (the `.groups` of the pandas groupby)
    """
    pair_columns = ['author_id', 'reference_id']

    is_wanted_type = merged_days['reference_type'].isin(reaction_types)
    reactions = merged_days[is_wanted_type]

    # keep=False flags every member of a duplicated pair, not just the later ones.
    is_repeated = reactions.duplicated(subset=pair_columns, keep=False)

    return reactions[is_repeated].groupby(pair_columns).groups

In [32]:
groups_of_reactions = group_reactions(merged_days, reaction_types)

In [33]:
len(groups_of_reactions)

12221

We need to create an instance of the PySentiStr class to which we set the path to the Java executable and the path to the language folder. After this, we are all set to use the SentiStrength library.

In [34]:
# Module-level SentiStrength wrapper; compute_sentiments() reads this `senti` object.
senti = PySentiStr()
# Point the wrapper at the SentiStrength .jar file and at its language data folder.
senti.setSentiStrengthPath(path_to_sentistrength_jar)
senti.setSentiStrengthLanguageFolderPath(path_to_sentistrength_language_folder)

In [35]:
def compute_sentiments(rows_indexes, dataset, stop_words):
    """Compute one SentiStrength score (score='scale') per selected row of the dataset.

    The 'text' of each row is cleaned with clean_text before scoring. Rows whose
    'reference_type' is 'retweeted' are not cleaned: the fixed phrase
    'extremely fabulous' is scored in their place.
    NOTE(review): presumably this makes every retweet count as strong agreement - confirm.

    Args:
        rows_indexes (pandas.core.indexes.numeric.Int64Index): index labels of the rows to score
        dataset (pandas.core.frame.DataFrame): dataframe containing the dataset
        stop_words (set): set of stop words

    Returns:
        list: list of sentiment scores for each row identified by rows_indexes
    """
    texts = []
    for index in rows_indexes:
        if dataset.loc[index, 'reference_type'] == 'retweeted':
            texts.append('extremely fabulous')
        else:
            texts.append(clean_text(dataset.loc[index, 'text'], stop_words))

    # All texts of the group are scored in a single call to the module-level `senti` object.
    return senti.getSentiment(texts, score='scale')

In [36]:
def opinion_change(rows_indexes, dataset, stop_words):
    """Tell whether a group of reactions (replies/quotes/retweets) contains an opinion change.

    An opinion change means that the group holds at least one strictly positive and
    at least one strictly negative sentiment score.

    Args:
        rows_indexes (pandas.core.indexes.numeric.Int64Index): index labels in the original dataframe (dataset)
                                                               of the reactions to inspect
                                                               (e.g. Int64Index([1848965, 1850146, 1850687], dtype='int64'))
        dataset (pandas.core.frame.DataFrame): dataframe containing the reactions
        stop_words (set): set of stop words

    Returns:
        bool: whether both a positive and a negative score occur among the analyzed rows
    """
    scores = np.array(compute_sentiments(rows_indexes, dataset, stop_words))

    has_positive = (scores > 0).any()
    if not has_positive:
        return has_positive

    return (scores < 0).any()

CREATION OF TEST DATA - NECESSARY WHEN RUNNING TESTS

In [37]:
def create_test_groups(full_dataset):
    """Build a small set of reply groups for testing.

    Keeps only 'replied_to' rows whose ('author_id', 'reference_id') pair occurs more
    than once, casts 'reference_id' to int, takes the first 2000 such rows and groups
    them by that pair.

    Args:
        full_dataset (pandas.core.frame.DataFrame): dataframe with all the data

    Returns:
        dict: mapping from (author_id, reference_id) tuples to the index labels of the
              corresponding rows (the `.groups` of the pandas groupby)
    """
    pair_columns = ['author_id', 'reference_id']

    replies = full_dataset[full_dataset['reference_type'] == 'replied_to']
    is_repeated = replies.duplicated(subset=pair_columns, keep=False)

    test_multiple_replies = (
        replies[is_repeated]
        .assign(reference_id=lambda frame: frame['reference_id'].astype(int))
        .head(2000)
    )

    return test_multiple_replies.groupby(pair_columns).groups

In [38]:
# test_groups = create_test_groups(merged_days)

In [39]:
# test_data = merged_days.head(500).copy()

IMPORTANT NOTE:

It took more than 80 minutes when I ran the creation of the opinion_changes dictionary, in parallel, on all types of reactions - replied_to, quoted, retweeted. I did not manage to run the sequential algorithm on all reactions due to hardware limitations (the kernel crashed or execution stalled).

It took almost 11 minutes when I ran the creation of the opinion_changes dictionary, in parallel, on the replies in the dataset.

I saved the resulting dictionaries into JSON files, which can be found in the files/ directory of the project. These can be imported into dictionaries with ease (code can be found in the next parts of the notebook).

HOWEVER, if you still wish to run the creation of the opinion_changes dictionary using the algorithms, uncomment the commented cells between the lines.

--------------------------------------------------------------------------------------------------------------------------------------------

SEQUENTIAL COMPUTATION - OPINION CHANGES

In [40]:
# Global state read and updated by print_progress():
# number of groups processed so far ...
group_counter = 0
# ... and the fraction of processed groups at which the next progress line is printed.
progress = 0.001

In [41]:
def print_progress(groups_of_reactions):
    """Function that prints the progress of the computation of the opinion_changes dictionary,
    as it takes a lot of time for large datasets.

    Must be called once per processed group. It relies on the global variables
    `group_counter` and `progress`, which it updates on every call and never resets.
    For a second run to report progress correctly, both have to be re-initialised first.

    Args:
        groups_of_reactions (dict): dictionary of reactions grouped by some columns 
                                    (we expect the columns to be 'author_id' and 'reference_id')
    """    
    global group_counter
    global progress

    # When running inside IPython, re-read both values from the interactive user namespace.
    ipython = get_ipython()
    if ipython is not None:
        group_counter = ipython.user_ns['group_counter']
        progress = ipython.user_ns['progress']

    group_counter += 1
    # Print a line each time the processed fraction reaches the current threshold,
    # then raise the threshold by 0.001.
    if ((group_counter / len(groups_of_reactions)) >= progress):
        print(f"Progress: {group_counter} / {len(groups_of_reactions)} groups of reactions processed.")
        progress += 0.001
    if group_counter == len(groups_of_reactions):
        print("All groups have been processed.")

In [42]:
def create_opinion_changes(groups_of_reactions, dataset, progress_printing, stop_words):
    """Function to create the data structure associated with the groups (pairs of user id - source tweet id)
    where an opinion change occurred, i.e. when, between their interactions (e.g. one's replies to the other's original post),
    there have been both positive and negative opinions.

    The sentiments of every group are computed exactly once and reused both for the
    opinion-change check and as the stored value.

    Args:
        groups_of_reactions (dict): dictionary of reactions grouped by some columns
                                    (we expect the columns to be 'author_id' and 'reference_id')
        dataset (pandas.core.frame.DataFrame): dataframe the row indexes in groups_of_reactions refer to
        progress_printing (bool): boolean value indicating whether the user wishes to print the progress of the groups processed or not
                                  (this can be useful to track when processing large datasets - they usually take a lot of time)
        stop_words (set): set of stop words removed from the texts before scoring

    Returns:
        dict: dictionary where the keys represent the groups where opinion changes occurred (as tuples) and the values are
              lists of the sentiments associated to the interactions within each group
    """
    if not progress_printing:
        print("Gradual progress will not be printed.")
        print("If you wish to see it, change the value of the progress_printing input parameter to True.")

    opinion_changes = {}
    for group, rows_indexes in groups_of_reactions.items():
        if progress_printing:
            print_progress(groups_of_reactions)

        sentiments = compute_sentiments(rows_indexes, dataset, stop_words)
        scores = np.array(sentiments)
        # Opinion change: at least one positive and at least one negative score in the group.
        if np.any(scores > 0) and np.any(scores < 0):
            opinion_changes[group] = sentiments

    return opinion_changes

In [43]:
progress_printing = True

In [44]:
# opinion_changes = create_opinion_changes(groups_of_reactions, merged_days, progress_printing, stop_words)

PARALLEL COMPUTATION - OPINION CHANGES

In [45]:
def process_sentiments_for_group(rows_indexes):
    """Function to compute the sentiments of the tweets provided by the row indexes within the merged_days dataframe.
    Returns a list of sentiments corresponding to each of the tweets or an empty list if there was no opinion change 
    within that group.

    The dataframe and the stop words are not parameters: they are taken from the
    globals `merged_days` and `stop_words` (re-read from the IPython user namespace
    when one is available). The sentiments are computed once and reused for both the
    opinion-change check and the return value.

    Args:
        rows_indexes (pandas.core.indexes.numeric.Int64Index): list of indexes in the original dataframe (dataset)
                                                               where we aim to detect an opinion change
                                                               (e.g. Int64Index([1848965, 1850146, 1850687], dtype='int64'))
    Returns:
        list: list of sentiments for the rows or empty list if there was no opinion change within that group.
    """
    global merged_days
    global stop_words

    ipython = get_ipython()
    if ipython is not None:
        merged_days = ipython.user_ns['merged_days']
        stop_words = ipython.user_ns['stop_words']

    sentiments = compute_sentiments(rows_indexes, merged_days, stop_words)
    scores = np.array(sentiments)
    # Opinion change: at least one positive and at least one negative score in the group.
    if np.any(scores > 0) and np.any(scores < 0):
        return sentiments

    return []

In [46]:
def process_dict_chunk(input_dict):
    """Build the opinion_changes dictionary for one chunk of the grouped reactions.

    Progress is reported through print statements while the chunk is processed.

    Args:
        input_dict (dict): chunk of data, mapping each group to the row indexes of its reactions

    Returns:
        dict: opinion_changes dictionary for the chunk; groups without an opinion change are left out
    """
    total_entries = len(input_dict)
    next_report_at = 0.0001
    processed_dict = {}

    for counter, (group, rows_indexes) in enumerate(input_dict.items(), start=1):
        sentiments = process_sentiments_for_group(rows_indexes)
        # An empty list signals "no opinion change" and is not stored.
        if sentiments:
            processed_dict[group] = sentiments

        if counter / total_entries >= next_report_at:
            print(f"{counter} / {total_entries} entries processed...\n")
            next_report_at += 0.0001
        if counter == total_entries:
            print(f"Thread has finished processing all {total_entries} entries.")

    return processed_dict

In [47]:
def process_dict_in_parallel(input_dict, num_processes=None):
    """Function to process the input dictionary of reactions in parallel and merge the atomic results together into a single dictionary,
    which will be the final opinion_changes dictionary.

    The input is split into at most num_processes chunks of (almost) equal size.
    An empty input returns an empty dictionary without starting any worker, and an
    input with fewer entries than processes is split into one-entry chunks.

    Args:
        input_dict (dict): dictionary of reactions grouped by some columns
                           (we expect the columns to be 'author_id' and 'reference_id')
        num_processes (int): number of parallel worker processes. Defaults to None,
                             in which case multiprocessing.cpu_count() is used.

    Returns:
        dict: the final opinion_changes dictionary, which contains all the pairs of 'author_id' and 'reference_id'
              (and the sentiments of their respective rows in the original dataframe) in the whole dataset,
              where an opinion change occurred
    """
    # Nothing to do for empty input; this also avoids a zero chunk size below.
    if not input_dict:
        return {}

    # Default to using all available CPU cores
    if num_processes is None:
        num_processes = multiprocessing.cpu_count()

    # Materialise the items once instead of once per chunk.
    items = list(input_dict.items())

    # Ceiling division, never below 1. Floor division yields 0 when there are fewer
    # entries than processes, and range() rejects a step of 0.
    chunk_size = max(1, -(-len(items) // num_processes))
    input_chunks = [dict(items[start:start + chunk_size]) for start in range(0, len(items), chunk_size)]

    # Process the input chunks in parallel using a pool of worker processes
    with multiprocessing.Pool(processes=num_processes) as pool:
        processed_dicts = pool.map(process_dict_chunk, input_chunks)

    # Merge the processed dictionaries from each input chunk
    processed_dict = {}
    for chunk_result in processed_dicts:
        processed_dict.update(chunk_result)

    return processed_dict

In [48]:
# opinion_changes_parallel = process_dict_in_parallel(groups_of_reactions)

SAVE DICTIONARY TO JSON FILE

In [49]:
def save_opinion_changes_to_JSON(opinion_changes, reaction_types):
    """Write the opinion changes dictionary to its JSON file.

    The target path is derived from reaction_types via create_path_to_opinion_changes.
    JSON object keys must be strings, so every key is stored as its str() form.

    Args:
        opinion_changes (dict): dictionary with opinion changes
        reaction_types (list): list of reaction types
    """
    path = create_path_to_opinion_changes(reaction_types)

    serialisable = {}
    for key, value in opinion_changes.items():
        serialisable[str(key)] = value

    with open(path, 'w') as file:
        json.dump(serialisable, file, indent=4)

In [50]:
reaction_types

['replied_to', 'quoted']

In [51]:
# save_opinion_changes_to_JSON(opinion_changes_parallel, reaction_types)

--------------------------------------------------------------------------------------------------------------------------------------------

LOAD DICTIONARY FROM JSON FILE

In [52]:
def load_opinion_changes(path_to_opinion_changes):
    """Function that generates a dictionary based on a JSON file which contains the opinion changes within the reactions of the dataset.

    The JSON keys are the string forms of the original tuple keys
    (e.g. "(118788479, 1367613364923424769)") and are converted back to tuples.

    Args:
        path_to_opinion_changes (str): path to the JSON file associated with the opinion changes within the reactions
                                               (e.g. /your/path/to/research-internship/files/replied_to_opinion_changes.json)

    Returns:
        dict: the original dictionary containing opinion changes from reactions

    Raises:
        ValueError, SyntaxError: if a key of the JSON file is not a plain Python literal
    """
    import ast

    with open(path_to_opinion_changes) as f:
        # Load the JSON data into a Python dictionary
        opinion_changes_from_file = json.load(f)

    # SECURITY: keys are parsed with ast.literal_eval, not eval. literal_eval only
    # accepts Python literals, so a tampered file cannot execute arbitrary code.
    # NOTE(review): keys that are not plain literals (e.g. "np.int64(1)") are
    # rejected here - confirm that existing JSON files only hold plain integers.
    return {
        ast.literal_eval(key): value
        for key, value in opinion_changes_from_file.items()
    }

In [53]:
opinion_changes = load_opinion_changes(create_path_to_opinion_changes(reaction_types))

INSIGHTS

In [54]:
print(f"Percentage of opinion changes out of the interactions where one user reacted multiple times to a source tweet:")
print(f"{round(len(opinion_changes) / len(groups_of_reactions) * 100, 1)}%.")

Percentage of opinion changes out of the interactions where one user reacted multiple times to a source tweet:
13.1%.


In [55]:
def biggest_opinion_change(opinion_changes):
    """Find the group (pair of user id - source tweet id) with the most drastic opinion change.

    The size of a change is the spread between the highest and the lowest sentiment
    recorded for a group. When several groups share the largest spread, the first one
    in iteration order is returned.

    Args:
        opinion_changes (dict): dictionary with opinion changes; maps each group to the
                                list of sentiments of that user's reactions

    Returns:
        tuple: pair of user id - source tweet id, where the biggest opinion change occurred
               (an empty tuple if the dictionary is empty or no group has a non-zero spread)
        str: type of change that occurred: 'positive' if the lowest sentiment appears before
             the highest one in the list (first occurrences are compared), 'negative' otherwise
    """
    biggest_change = 0
    target_group = tuple()
    target_sentiments = None
    for group, sentiments in opinion_changes.items():
        change = max(sentiments) - min(sentiments)
        # Strict comparison: ties keep the group that was found first.
        if change > biggest_change:
            biggest_change = change
            target_group = group
            target_sentiments = sentiments

    # No group qualified (empty input or only zero spreads): return the defaults
    # instead of failing on a lookup with the empty-tuple placeholder key.
    if target_sentiments is None:
        return target_group, 'negative'

    min_sentiment_index = target_sentiments.index(min(target_sentiments))
    max_sentiment_index = target_sentiments.index(max(target_sentiments))
    change_type = 'positive' if min_sentiment_index < max_sentiment_index else 'negative'

    return target_group, change_type

In [56]:
target_group, change_type = biggest_opinion_change(opinion_changes)

In [57]:
target_group

(118788479, 1367613364923424769)

In [58]:
change_type

'negative'

In [59]:
def reactions_with_biggest_opinion_change(reactions, target_group):
    """Collect the texts of the reactions that belong to one target group.

    A row is selected when its 'author_id' equals the first element of the target group
    and its 'reference_id' equals the second element.

    Args:
        reactions (pandas Dataframe): the dataframe with the reactions
        target_group (tuple): pair of user id - source tweet id, whose posts had the biggest opinion change

    Returns:
        list: the 'text' values of the selected rows, in dataframe order
    """
    author_id, source_tweet_id = target_group[0], target_group[1]
    belongs_to_group = (reactions['author_id'] == author_id) & (reactions['reference_id'] == source_tweet_id)

    return reactions.loc[belongs_to_group, 'text'].tolist()

In [60]:
reactions_biggest_change = reactions_with_biggest_opinion_change(merged_days, target_group)

In [61]:
reactions_biggest_change

['In January, a troop of gorillas at the zoo’s Safari Park tested positive for the virus. The zoo vaccinated four orangutans and five bonobos with the experimental vaccine, which is not designed for use in humans. https://t.co/1ME1BqpKPi',
 'Infection of apes is a major concern for zoos and conservationists. They easily fall prey to human respiratory infections, and common cold viruses have caused deadly outbreaks in chimpanzees in Africa. https://t.co/1ME1BqpKPi',
 'Scientists are worrying not just about the danger the virus poses to great apes and other animals, but also about the potential for the virus to gain a foothold in a wild animal population that could become a permanent reservoir and emerge at a later date to reinfect humans. https://t.co/1ME1BqpKPi',
 'Infections in farmed mink have produced the biggest scare so far. When Danish mink farms were devastated by the virus, which can kill mink just as it kills people, a mutated form of the virus emerged from the mink and reinfe

In [62]:
def biggest_opinion_change_type(opinion_changes, group):
    """Classify the opinion change of one group (pair of user id - source tweet id).

    The positions of the first lowest and the first highest sentiment in the group's
    list are compared: if the lowest comes first the change is 'positive'.

    Args:
        opinion_changes (dict): dictionary with opinion changes
        group (tuple): pair of user id - source tweet id that interacted through reactions
                       and the respondent changed his/her viewpoint w.r.t. a source tweet

    Returns:
        str: 'positive' if the lowest sentiment precedes the highest one, otherwise 'negative'
    """
    sentiments = opinion_changes[group]
    lowest_at = sentiments.index(min(sentiments))
    highest_at = sentiments.index(max(sentiments))

    if lowest_at < highest_at:
        return 'positive'
    return 'negative'

In [63]:
# Map each group to the type of its opinion change ('positive' or 'negative').
# Despite its name, `mask` holds these string labels rather than booleans.
mask = {group: biggest_opinion_change_type(opinion_changes, group) for group in opinion_changes}

In [64]:
def value_count_in_dict(dict, value_to_count):
    """Count how many entries of a dictionary hold a given value.

    Args:
        dict (dict): dictionary whose values are tallied (values must be hashable)
        value_to_count (any): value to be counted

    Returns:
        int: number of occurrences of value_to_count among the dictionary's values
    """
    # Tally every value once, then look up the requested one.
    # (`dict` is shadowed by the parameter, hence the literal.)
    frequencies = {}
    for item in dict.values():
        frequencies[item] = frequencies.get(item, 0) + 1

    return frequencies.get(value_to_count, 0)

In [65]:
print(f"Percentage of positive opinion changes out of:")
print(f"- the interactions where one user reacted multiple times to a source tweet and an opinion change was detected => {round(value_count_in_dict(mask, 'positive') / len(mask) * 100, 1)}%")

Percentage of positive opinion changes out of:
- the interactions where one user reacted multiple times to a source tweet and an opinion change was detected => 48.1%


In [66]:
print(f"Percentage of negative opinion changes out of:")
print(f"- the interactions where one user reacted multiple times to a source tweet and an opinion change was detected => {round(value_count_in_dict(mask, 'negative') / len(mask) * 100, 1)}%")

Percentage of negative opinion changes out of:
- the interactions where one user reacted multiple times to a source tweet and an opinion change was detected => 51.9%


In [67]:
def compute_opinion_changes_deltas(path_to_opinion_changes):
    """Compute, for every group in the saved opinion changes, the spread of its sentiments.

    Args:
        path_to_opinion_changes (str): path to the JSON file with the opinion changes

    Returns:
        dict: maps each group to (highest sentiment - lowest sentiment) of that group
    """
    deltas = {}
    for group, sentiments in load_opinion_changes(path_to_opinion_changes).items():
        deltas[group] = max(sentiments) - min(sentiments)

    return deltas

In [68]:
opinion_changes_deltas = compute_opinion_changes_deltas(create_path_to_opinion_changes(reaction_types))

In [69]:
opinion_changes_deltas

{(1819091, 1366831567251865602): 2,
 (2030711, 1368252669421305857): 2,
 (2325381, 1367200561347629057): 5,
 (5895742, 1368371467805728770): 2,
 (6512482, 1369680695321370627): 4,
 (6524022, 1368695745633804302): 3,
 (6611482, 1366788190930038789): 2,
 (7721972, 1366735037849358341): 2,
 (9229302, 1367167700745732102): 2,
 (9532792, 1368301172944216065): 4,
 (10074602, 1366513798618763270): 2,
 (10074602, 1367983340461166592): 4,
 (14188527, 1366466134984314882): 2,
 (14307494, 1366418763101396997): 3,
 (14340043, 1366864501899993097): 2,
 (14691900, 1369372318770884612): 3,
 (14911096, 1368113977872556032): 3,
 (14982131, 1369348837857460228): 2,
 (15009171, 1367159066531880961): 4,
 (15009171, 1367658838522925060): 3,
 (15088010, 1367471479424753674): 2,
 (15200896, 1367528484348649473): 2,
 (15573205, 1367629183208284161): 4,
 (15599745, 1367547104772558848): 2,
 (15711069, 1366960025957236740): 3,
 (15840102, 1366819174782271489): 2,
 (15963711, 1366812131774054401): 2,
 (15984676,

In [71]:
# TODO: Create function to label intervals for the deltas in opinion change

In [72]:
# TODO: Create mask for opinion_changes_deltas, where I apply the function defined below

In [92]:
# TODO: Compute percentages + ADD VISUALIZATIONS FOR ALL INSIGHTS + IMPROVE PRINT MESSAGES

NEW COLUMN ADDITION

CREATION OF REPLIES_AND_QUOTES DATAFRAME, FOR WHICH WE WANT TO SEE IF THEY SUPPORT OR NOT THE SOURCE TWEET.

In [74]:
def create_replies_and_quotes(full_dataset):
    """Extract the rows whose 'reference_type' is either 'replied_to' or 'quoted'.

    Args:
        full_dataset (pandas Dataframe): dataframe with a 'reference_type' column

    Returns:
        pandas Dataframe: an independent copy of the matching rows
    """
    reference_type = full_dataset['reference_type']
    is_reply_or_quote = reference_type.eq('replied_to') | reference_type.eq('quoted')

    return full_dataset.loc[is_reply_or_quote].copy()

In [75]:
replies_and_quotes = create_replies_and_quotes(merged_days)

HELPER FUNCTIONS TO ADD A NEW COLUMN TO THE test_replies_and_quotes DATAFRAME IN PARALLEL.

In [76]:
# Independent 1000-row sample of the replies and quotes, used to try out the new column on a small frame.
test_replies_and_quotes = replies_and_quotes.head(1000).copy()

In [77]:
# Number of texts processed so far; incremented by print_progress().
counter = 0
# Next completed fraction at which print_progress() prints a status line (raised in steps of 0.001).
progress = 0.001

In [78]:
def print_progress():
    """Advance the shared progress counter by one and print a status line when due.

    The state lives in the notebook-level names ``counter``, ``progress`` and
    ``test_replies_and_quotes``. When an IPython shell is available, their current
    values are first re-read from its user namespace.

    A status line is printed whenever the processed fraction reaches the ``progress``
    threshold, which is then raised by 0.001. A final message is printed once the
    counter equals the number of rows in ``test_replies_and_quotes``.

    NOTE(review): an empty ``test_replies_and_quotes`` makes the fraction computation
    divide by zero.
    """
    global counter
    global progress
    global test_replies_and_quotes
    ipython = get_ipython()
    if ipython is not None:
        # Refresh the globals from the interactive namespace before using them.
        # NOTE(review): nothing is written back to user_ns explicitly; this presumably
        # relies on the `global` assignments below being visible there - confirm.
        counter = ipython.user_ns['counter']
        progress = ipython.user_ns['progress']
        test_replies_and_quotes = ipython.user_ns['test_replies_and_quotes']

    counter += 1

    # Report only when the next threshold is reached, then move the threshold up.
    if ((counter / len(test_replies_and_quotes)) >= progress):
        print(f"{counter} / {len(test_replies_and_quotes)} replies or quotes processed.\n")
        progress += 0.001
    if counter == len(test_replies_and_quotes):
        print("New column inserted in the replies_and_quotes dataframe.\n")

In [79]:
def supports_source_tweet(text):
    """Tell whether a reaction text has a positive sentiment score.

    Args:
        text (any): text of the reaction; anything that is not a str is not scored

    Returns:
        str or bool: '#' when text is not a string; otherwise the result of comparing the
                     first 'scale' score returned by senti.getSentiment for the text with 0
                     (True when it is strictly positive)
    """
    if isinstance(text, str):
        scale_score = senti.getSentiment(text, score='scale')[0]
        # Optional progress hook (disabled): print_progress()
        return scale_score > 0

    # Placeholder for missing / non-textual entries.
    return '#'

In [80]:
# Wrapper that applies supports_source_tweet to a chunk of data
def apply_function_to_chunk(chunk):
    """Apply supports_source_tweet to every element of one chunk.

    This is the function handed to the multiprocessing pool's map call, once per chunk.

    Args:
        chunk: one slice of the 'text' column (an object offering ``.apply``)

    Returns:
        the chunk with supports_source_tweet applied to each element
    """
    labelled_chunk = chunk.apply(supports_source_tweet)
    return labelled_chunk

In [81]:
def add_support_source_tweet_column_parallel(replies_and_quotes):
    """Add a 'supports_source_tweet' column right after 'text', computed in parallel.

    The 'text' column is split into one chunk per CPU, each chunk is processed by
    apply_function_to_chunk in a worker process, and the concatenated results are
    inserted into the dataframe that was passed in.

    Args:
        replies_and_quotes (pandas Dataframe): dataframe with a 'text' column; it is modified in place

    Returns:
        pandas Dataframe: the same dataframe object, now containing the new column
    """
    # One chunk (and one worker process) per available CPU.
    worker_count = multiprocessing.cpu_count()
    text_chunks = np.array_split(replies_and_quotes['text'], worker_count)

    with multiprocessing.Pool(processes=worker_count) as pool:
        labelled_chunks = pool.map(apply_function_to_chunk, text_chunks)

    # Place the new column immediately to the right of 'text'.
    insert_at = replies_and_quotes.columns.get_loc('text') + 1
    support_column = pd.concat(labelled_chunks)
    replies_and_quotes.insert(insert_at, 'supports_source_tweet', support_column)

    return replies_and_quotes

PARALLEL EXECUTION. Benchmark tests (my machine): 1000 records => 1m11.1s

In [82]:
# test_replies_and_quotes_parallel = add_support_source_tweet_column_parallel(test_replies_and_quotes)

SEQUENTIAL EXECUTION. Benchmark tests (my machine): 1000 records => 2m30.0s

In [83]:
# Rebuild the 1000-row sample; presumably so the sequential run starts from a frame
# that does not yet contain the 'supports_source_tweet' column - confirm.
test_replies_and_quotes = replies_and_quotes.head(1000).copy()

In [84]:
# test_replies_and_quotes.insert(test_replies_and_quotes.columns.get_loc('text') + 1, 'supports_source_tweet', test_replies_and_quotes['text'].apply(supports_source_tweet))

CHECK IF RESULTS ARE THE SAME FOR BOTH METHODS

In [85]:
# test_replies_and_quotes_parallel.equals(test_replies_and_quotes)

CODE TO MODIFY THE ORIGINAL DATAFRAME (WITH ALL REPLIES AND QUOTES), WITH AN ADDED COLUMN NAMED 'supports_source_tweet', USING PARALLEL COMPUTATION, AS WELL AS SAVE IT TO A .CSV FILE (UNCOMMENT CELLS TO RUN)

NOTE: There are almost 1 million replies and quotes in the original dataframe, so the following statement is extremely time-consuming.

In [86]:
# replies_and_quotes = add_support_source_tweet_column_parallel(replies_and_quotes)

In [87]:
# path_to_replies_and_quotes = files_path + '/replies_and_quotes_modified.csv'

In [88]:
# # save the DataFrame to a CSV file
# replies_and_quotes.to_csv(path_to_replies_and_quotes, index=False)

In order to calculate the distribution of the tweets per hour, I will parse the "created_at" column, extract the hour property and create a separate column in each dataframe. I will place it next to the "created_at" column so that it can be easily verified. The data originates from the Twitter API, so it comes in the standard ISO 8601 format, which can be easily parsed using the parser module from the dateutil package.

Note: the cell below runs for approximately 2m30' on my machine (~25-30 seconds for each file).

In [89]:
# for key, day in days.items():
#     if 'hour' not in day.columns:
#         day.insert(1, 'hour', day['created_at'].apply(lambda date: parser.parse(date).hour))
#         print(f"New 'hour' column inserted in the {key} dataframe")

In [90]:
# for key, day in days.items():
#     if 'hour' not in day.columns:
#         hours = []
#         for time in day.loc[:,"created_at"]:
#             hour = parser.parse(time).hour
#             hours.append(hour)
#         day.insert(1, "hour", hours, True)
#         print(key + " - added 'hour' column")


The final distribution is made up of the sum of all individual days' distributions. I save a figure in the graphs/ folder for each day, as well as an overall distribution.

In [91]:
# final_distribution = pd.Series(0, index=days['1-3-2021'].loc[:,'hour'].sort_values(ascending=True).unique())
# for key, day in days.items():
#     hour_column_ascending = day.loc[:,"hour"].sort_values(ascending=True)
#     distribution = hour_column_ascending.value_counts()[hour_column_ascending.unique()]
#     final_distribution = final_distribution.add(distribution)
#     axes = distribution.plot(kind='bar')
#     figure_path = f"{covaxxy_longitudinal_analysis_graphs}/{key}_distribution.png"
#     axes.figure.savefig(figure_path)
#     plt.close()
# axes = final_distribution.plot(kind='bar')
# figure_path = f"{covaxxy_longitudinal_analysis_graphs}/overall_distribution.png"
# axes.figure.savefig(figure_path)
# plt.close()
