# Phase 2. Augment data

## Contents
- [Configuration](#Configuration)
  - [Imports](#Imports)
  - [Variables](#Variables)
  - [Support functions](#Support-functions)
- [Users' Botscore](#Users'-Botscore)
  - [Execute Botscore update](#Execute-Botscore-update)
- [Friendship analysis](#Friendship-analysis)
  - [Execute friendship analysis](#Execute-friendship-analysis)
- [Tag verified users with a political party](#Tag-verified-users-with-a-political-party)
  - [Execute tagging functions](#Execute-tagging-functions)
- [Augment political party tags](#Augment-political-party-tags)
  - [Execute users' tagging](#Execute-users'-tagging)
- [Bag of words creation](#Bag-of-words-creation)
  - [Hashtags definitions](#Hashtags-definitions)
  - [Verified political party regex definitions](#Verified-political-party-regex-definitions)
  - [Matching of interactions within BOWs](#Matching-of-interactions-within-BOWs)
- [Sentiment analysis extraction](#Sentiment-analysis-extraction)
  - [Execution of the sentiment analysis and database update](#Execution-of-the-sentiment-analysis-and-database-update)
- [Anonymization](#Anonymization)
  - [Calculate UUIDs for the users](#Calculate-UUIDs-for-the-users)
  
  


## Configuration

### Imports

In [None]:
# Utilities
from IPython.display import display
from fastprogress import master_bar, progress_bar
from datetime import datetime
from unidecode import unidecode
import os
import ntpath
import numpy as np
import statistics 
import re
import math
import random
import datetime
import uuid
import numbers
from collections.abc import MutableMapping
import pandas as pd
from multiprocessing import Pool

# Botometer API
import botometer

# MongoDB functionality
from pymongo.errors import BulkWriteError
from pymongo import MongoClient, InsertOne, UpdateOne, DeleteOne, UpdateMany, DeleteMany
from pymongo.bulk import BulkOperationBuilder
from bson import ObjectId

# Tweet API for friendships
import tweepy

# Specific imports for sentiment analysis
import emoji
import classifier as cl

# Sentiment algorithm: one classifier instance from the local `classifier` module,
# created once when this cell runs (presumably reused by the sentiment-analysis
# section of the notebook — confirm).
clf = cl.SentimentClassifier()

### Variables

In [None]:
# Directories where CSV data is stored.
# Placeholder: replace with the absolute path of the project root.
ROOT_DIR = "ABOSLUTE_PATH_TO_ROOT_FOLDER"
# os.path.join works whether or not ROOT_DIR ends with a path separator
# (plain concatenation produced ".../rootdata/" when it did not); the trailing ""
# keeps the final separator DATA_DIR had before.
DATA_DIR = os.path.join(ROOT_DIR, "data", "")
# Change path to root
os.chdir(ROOT_DIR)

# Botometer and Twitter keys for parallel processing.
# The dict keys must be consecutive integers starting at 0: workers pick an
# instance with random.choice(keys), which indexes the dict like a sequence.
keys = {
     #0: botometer.Botometer(wait_on_ratelimit=True, rapidapi_key='RAPID_API_KEY', **{'consumer_key':'TWITTER_DEV_CONSUMER_KEY', 'consumer_secret':'TWITTER_DEV_CONSUMER_SECRET'}),
     #1: botometer.Botometer(wait_on_ratelimit=True, rapidapi_key='RAPID_API_KEY', **{'consumer_key':'TWITTER_DEV_CONSUMER_KEY', 'consumer_secret':'TWITTER_DEV_CONSUMER_SECRET'}),
}

# MongoDB parameters (placeholders: replace the host; 27017 is MongoDB's default port).
# The port used to be an undefined name `PORT`, which made this cell raise NameError.
MONGO_HOST = 'IP_ADDRESS'
MONGO_PORT = 27017
mongoclient = MongoClient(MONGO_HOST, MONGO_PORT)
db = mongoclient.BB10NPUBLIC
# MongoDB creates the tweets' and users' collections automatically on first write.

### Support functions

In [None]:
def make_objid(text):
    """Converts an identifier into a BSON ObjectId (12 bytes, 24 hexadecimal characters).

    The string form of ``text`` is left-padded with zeros up to 24 characters, so a
    decimal Twitter id (the digits 0-9 are valid hexadecimal) always maps to the same
    ObjectId, and an existing ObjectId maps to itself.

    Keyword arguments:
    text -- identifier to convert; any value, it is passed through ``str`` first

    Returns:
    The ObjectId, or None when ``text`` is blank or cannot be converted (for example
    when it is longer than 24 characters or has non-hexadecimal characters). Conversion
    errors are printed.
    """
    text = str(text)
    if not text.strip():
        return None
    try:
        # rjust only pads: strings longer than 24 characters are left unchanged
        # and rejected by ObjectId below.
        return ObjectId(text.rjust(24, "0"))
    except Exception as ex:
        print(text, ex)
        return None

def flatten(d, parent_key='', sep='_'):
    """Flattens a nested mapping (e.g. a MongoDB document) into a single-level dict.

    Nested keys are joined with ``sep``: ``{'a': {'b': 1}}`` becomes ``{'a_b': 1}``.
    Keys that have to be joined are converted with ``str``, so non-string keys
    (e.g. integers) no longer raise TypeError.

    Keyword arguments:
    d -- mapping whose values may themselves be mappings
    parent_key -- prefix for every key produced at this level ('' at the top level)
    sep -- separator placed between a parent key and its child key

    Returns:
    A new dict without nested mappings; if two paths flatten to the same key, the
    later value wins.
    """
    items = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, MutableMapping):
            items.update(flatten(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items

def chunks(l, n):
    """Yield consecutive slices of ``l`` with ``n`` items each (the last one may be shorter)."""
    offsets = range(0, len(l), n)
    yield from (l[offset:offset + n] for offset in offsets)

## Users' Botscore

In [None]:
def get_users_without_botscore(user_collection):
    """
    Returns the ObjectIDs of users that have no ``scores`` field and are not
    flagged to be ignored (``ignore`` is missing or explicitly False).

    Keyword arguments:
    user_collection -- MongoDB Users' Collection
    """
    not_ignored = {'$or': [
        {'ignore': {'$exists': False}},
        {'$and': [{'ignore': {'$exists': True}}, {'ignore': False}]},
    ]}
    query = {'$and': [{'scores': {'$exists': False}}, not_ignored]}

    cursor = user_collection.find(query, {'_id': 1})
    user_ids = [document['_id'] for document in cursor]
    print("Number of users without botscore:", len(user_ids))
    return user_ids

def get_new_users(user_collection):
    """
    Returns the ObjectIDs of users whose ``scores`` field is the placeholder -1,
    i.e. users whose botscore has never been requested.

    Keyword arguments:
    user_collection -- MongoDB Users' Collection
    """
    placeholder_filter = {'scores': -1}
    only_ids = {'_id': 1}
    user_ids = [document['_id'] for document in user_collection.find(placeholder_filter, only_ids)]
    print("Number of users to consult the botscore:", len(user_ids))
    return user_ids

def get_botscore_by_userid(user_id):
    """
    Collects the botscore of a user from Botometer and turns it into a MongoDB update.

    A Botometer instance (and so an API key) is picked at random from ``keys``.
    Known API errors produce an update that removes ``scores`` and flags the user:

    * 'Not authorized' (locked/private account) and 'Over capacity' set
      ``ignore: False`` and record the consumer key used in ``ignore_key_used``;
    * 'has no tweets in timeline' and 'Sorry, that page does not exist' set
      ``ignore: True``.

    Keyword arguments:
    user_id -- Twitter users' identificator (the ``_id`` of the user document)

    Returns:
    A pymongo ``UpdateOne`` operation (with upsert), or None for any other error,
    which is printed instead.
    """
    # (substring of the error message, ignore flag, ignore_reason, record the key used?)
    # Checked in this order; the first matching entry wins.
    known_errors = (
        ('Not authorized', False, 'not authorized', True),
        ('Over capacity', False, 'over capacity', True),
        ('has no tweets in timeline', True, 'has no tweets in timeline', False),
        ('Sorry, that page does not exist', True, 'does not exists anymore', False),
    )

    # Bound before the try so the error handler can always report it: previously a
    # failure before the key was read (e.g. an empty `keys`) raised UnboundLocalError.
    consumer_key = None
    try:
        botometer_instance = random.choice(keys)
        consumer_key = botometer_instance.consumer_key
        result = botometer_instance.check_account(user_id)
        return UpdateOne({'_id': user_id},
                         {'$set': {'scores': result}},
                         upsert=True)
    except Exception as e:
        error_message = str(e)
        for fragment, ignore, reason, record_key in known_errors:
            if fragment in error_message:
                update = {'$unset': {'scores': ""},
                          '$set': {'ignore': ignore, 'ignore_reason': reason}}
                if record_key:
                    update['$push'] = {'ignore_key_used': consumer_key}
                return UpdateOne({'_id': make_objid(user_id)}, update, upsert=True)
        print("Exception. User:", user_id, "API:", consumer_key, "Message:", e)
        return None

    
def botscores_to_mongodb(users, user_collection, processes=18):
    """
    Queries the botscore of a list of users in parallel and saves the resulting
    updates in MongoDB, in batches of just over 1000 operations.
    The work is spread over a process pool so several API keys can be used at once.
    Note: This method should be improved by implementing non-blocking calls

    Keyword arguments:
    users -- list of Twitter users' identificators
    user_collection -- MongoDB Users' Collection receiving the updates
    processes -- number of processes to employ (should be less than or equal to the number of available keys)
    """

    def write_batch(operations):
        # Sends one batch of update operations and prints the MongoDB counters.
        results = user_collection.bulk_write(operations)
        print("M:", str(results.matched_count).rjust(8, " "),
              " I:", str(results.inserted_count).rjust(8, " "),
              " U:", str(results.upserted_count).rjust(8, " "))

    # The context manager terminates the workers even if a write below fails;
    # the `processes` argument is no longer shadowed by the list of pending jobs.
    with Pool(processes=processes) as pool:
        pending = [pool.apply_async(get_botscore_by_userid, (uid,))
                   for uid in progress_bar(users)]
        pool.close()

        print('Getting user botscores...')
        operations = []
        for job in progress_bar(pending):
            response = job.get()
            if response is not None:
                operations.append(response)
            if len(operations) > 1000:
                write_batch(operations)
                operations = []

        if operations:
            write_batch(operations)
        pool.join()

## Execute Botscore update

In [None]:
%%time
# Users whose botscore has never been requested (scores == -1 placeholder).
users = get_new_users(user_collection=db.users)

In [None]:
# Retry users whose botscore was already requested in an earlier run but is still missing.
# NOTE: this replaces the `users` list produced by the previous cell.
users = get_users_without_botscore(user_collection=db.users)

In [None]:
# Quick look at one collected user id (raises IndexError when fewer than two users were found).
users[1]

In [None]:
# Query Botometer for every id in `users` using 12 worker processes and store the results.
botscores_to_mongodb(users, db.users, processes=12)

### Remove users that are not available

In [None]:
# Users that still have no botscore after the update above (and are not flagged with
# ignore=True) are deleted together with every tweet written by, replying to, or
# retweeting/quoting them.
# The list is recomputed on purpose: the `users` list built before running
# botscores_to_mongodb also contains users that have just received a botscore, and
# deleting them would throw away valid data.
unavailable_users = get_users_without_botscore(db.users)


def _delete_users_and_tweets(users_operations, tweets_operations):
    """Runs one batch of user and tweet deletions and prints the deleted counts."""
    print(len(users_operations), " users to be deleted", end="; ")
    results = db.users.bulk_write(users_operations, ordered=False)
    print("U:", str(results.deleted_count).rjust(8, " "), end="; ")
    results = db.tweets.bulk_write(tweets_operations, ordered=False)
    print("T:", str(results.deleted_count).rjust(8, " "))


users_operations = []
tweets_operations = []
for u in unavailable_users:
    users_operations.append(DeleteOne({'_id': u}))
    tweets_operations.append(DeleteMany({
        '$or': [
            {'user_id': u},
            {'in_reply_to_user_id': u},
            {'retweet_or_quote_user_id': u}
        ]}))
    if len(users_operations) > 3000:
        _delete_users_and_tweets(users_operations, tweets_operations)
        users_operations = []
        tweets_operations = []

if users_operations:
    _delete_users_and_tweets(users_operations, tweets_operations)

## Friendship analysis

In [None]:
def get_user_ids(user_collection):
    """
    Returns the ObjectIDs of every user stored in the users' collection.

    Keyword arguments:
    user_collection -- MongoDB Users' Collection
    """
    cursor = user_collection.find({}, {'_id': 1})
    all_ids = [document['_id'] for document in cursor]
    print("Number of total users:", len(all_ids))
    return all_ids

def get_users_without_friends(user_collection):
    """
    Returns the ObjectIDs of users whose friendships have not been collected yet
    (no ``friends`` field) and that are not flagged with ``ignore``.

    Keyword arguments:
    user_collection -- MongoDB Users' Collection
    """
    match_stage = {'$match': {'ignore': {'$exists': False},
                              'friends': {'$exists': False}}}
    project_stage = {'$project': {'_id': 1}}

    print("Query", end=" ")
    cursor = user_collection.aggregate([match_stage, project_stage], allowDiskUse=True)
    print("OK; List", end=" ")
    user_ids = [document['_id'] for document in cursor]
    print("OK; Total records:", len(user_ids))
    return user_ids

def get_bots_without_friends(user_collection):
    """
    Returns the ObjectIDs of likely bots whose friendships have not been collected yet.

    A user counts as a bot when its universal Botometer score
    (``scores.scores.universal``) is at least the hard-coded 95th percentile below.
    Users flagged with ``ignore`` or already holding a ``friends`` field are skipped.

    Keyword arguments:
    user_collection -- MongoDB Users' Collection
    """
    # 95th percentile
    p95 = 0.6908019160064479

    bot_filter = {
        'ignore': {'$exists': False},
        'friends': {'$exists': False},
        'scores.scores.universal': {'$gte': p95},
    }
    pipeline = [{'$match': bot_filter}, {'$project': {'_id': 1}}]

    print("Query", end=" ")
    cursor = user_collection.aggregate(pipeline, allowDiskUse=True)
    print("OK; List", end=" ")
    bot_ids = [bot['_id'] for bot in cursor]
    print("OK; Total records:", len(bot_ids))
    return bot_ids

def get_friendships_by_userid(user_id, total_users, user_collection):
    """
    Consults the friends (followings) and followers of a user and saves in MongoDB
    those that are within the total recollected sample of users.

    The Twitter app credentials come from a random Botometer instance in ``keys``.
    On ``tweepy.TweepError`` the user is flagged with ``ignore: True`` and the error
    text; any other exception is printed and nothing is written.

    Keyword arguments:
    user_id -- Twitter users' identificator
    total_users -- list or set of the ObjectIDs of every user within our database
    user_collection -- MongoDB Users' Collection

    Returns:
    True when the user document was written, False after an unexpected error.
    """
    botometer_instance = random.choice(keys)
    consumer_key = botometer_instance.consumer_key
    consumer_secret = botometer_instance.consumer_secret

    filter_uid = {'_id': user_id}
    message = "Checking:" + str(user_id) + " "
    # Set membership is O(1); testing each crawled id against a list cost
    # O(len(total_users)) per id, which dominated the run time.
    known_users = total_users if isinstance(total_users, (set, frozenset)) else set(total_users)

    try:
        auth = tweepy.AppAuthHandler(consumer_key, consumer_secret)
        api = tweepy.API(auth, wait_on_rate_limit=True)
        political_friendship_ids = {
            'friends': [],
            'followers': [],
        }

        for name, method in (('friends', api.friends_ids), ('followers', api.followers_ids)):
            # Pages of up to 5000 ids each.
            for friendships in tweepy.Cursor(method, user_id=user_id, count=5000).pages():
                object_ids = (make_objid(f) for f in friendships)
                political_friendship_ids[name].extend(
                    oid for oid in object_ids if oid in known_users)

        message += "\tFriends:" + str(len(political_friendship_ids['friends']))
        message += "\tFollowers:" + str(len(political_friendship_ids['followers']))
        filter_content = {
            '$push': {
                'friends': {
                    '$each': political_friendship_ids['friends']
                },
                'followers': {
                    '$each': political_friendship_ids['followers']
                }
            }
        }

    except tweepy.TweepError as err:
        print(message + "tweepy.TweepError=", err)
        filter_content = {
            '$set': {
                'ignore': True, 'ignore_reason': str(err)
            },
            '$push': {
                'ignore_key_used': consumer_key}
        }
    except Exception as e:
        print(message + "Exception. User:", user_id, "API:", consumer_key, "Message:", e)
        return False

    res = user_collection.update_one(filter_uid, filter_content, upsert=True)
    print(message + "\tMa:", res.matched_count, "\tMo:", res.modified_count, "\tUp:", res.upserted_id, ";\tDONE!")
    return True

# Per-process state of the friendships_to_mongodb workers, filled by _init_friendship_worker.
_friendship_worker_state = {}


def _init_friendship_worker(total_users, mongo_host, mongo_port, db_name):
    """Opens one MongoClient per worker process and caches the set of known user ids."""
    _friendship_worker_state['users'] = MongoClient(mongo_host, mongo_port)[db_name].users
    _friendship_worker_state['total_users'] = set(total_users)


def _friendships_worker(user_id):
    """Runs get_friendships_by_userid with this worker's own collection and user set."""
    return get_friendships_by_userid(user_id,
                                     _friendship_worker_state['total_users'],
                                     _friendship_worker_state['users'])


def friendships_to_mongodb(users_to_analyze, total_users, processes=18,
                           mongo_host='127.0.0.1', mongo_port=27017, db_name='botbusters'):
    """
    Extracts the followers and followings of a list of users and saves in the
    database those who are within the total recollected sample of users.
    Note: This method should be improved by implementing non-blocking calls

    Each worker process opens a single MongoClient of its own (in the pool
    initializer) instead of receiving a freshly created client per task from the
    parent, and receives ``total_users`` once instead of once per task.

    Keyword arguments:
    users_to_analyze -- List of Twitter users' identificators to analyze the friendships
    total_users -- List of the total of Twitter users' identificators within our database
    processes -- number of processes to employ (should be less than or equal to the number of available keys)
    mongo_host, mongo_port, db_name -- MongoDB server and database holding the ``users``
        collection (defaults are the values previously hard-coded here)
    """
    with Pool(processes=processes,
              initializer=_init_friendship_worker,
              initargs=(total_users, mongo_host, mongo_port, db_name)) as pool:
        print('Preparing processes...')
        jobs = [pool.apply_async(_friendships_worker, (uid,))
                for uid in progress_bar(users_to_analyze)]
        pool.close()

        print('Getting friendships...')
        for job in progress_bar(jobs):
            job.get()
        pool.join()

### Execute friendship analysis

In [None]:
# Ids of every user in the database: only followers/followings belonging to this sample are stored.
total_users = get_user_ids(user_collection=db.users)

In [None]:
# Likely bots are processed first; all remaining users are handled in the next cell.
bots_without_friends = get_bots_without_friends(user_collection=db.users)
friendships_to_mongodb(bots_without_friends, total_users, processes=10)

In [None]:
# With the bots done, collect the friendships of every other user still missing them.
users_without_friends = get_users_without_friends(user_collection=db.users)
friendships_to_mongodb(users_without_friends, total_users, processes=10)

## Tag verified users with a political party

In [None]:
def get_name_description_verified_users(user_metadata_collection):
    """
    Returns, for each verified user, every "screen_name description" string stored
    in the metadata collection.

    The metadata collection keeps several snapshots of each user, and all of them are
    pushed into ``user_info`` (not only the first one). ``$concat`` yields null when a
    field is null or missing, so ``user_info`` may contain None entries.

    Keyword arguments:
    user_metadata_collection -- MongoDB Users' Metadata Collection (containing screen name and description of users)

    Returns:
    List of ``{'_id': user_id, 'user_info': [...]}`` documents.
    """
    only_verified = {'$match': {'user_verified': True}}
    info_per_user = {
        '$group': {
            '_id': '$user_id',
            'user_info': {
                '$push': {'$concat': ['$user_screen_name', ' ', '$user_description']}
            },
        }
    }

    cursor = user_metadata_collection.aggregate([only_verified, info_per_user], allowDiskUse=True)
    verified_users = list(cursor)
    print(len(verified_users), "verified users found!")
    return verified_users

def get_political_parties(verified_users):
    """
    Returns the users belonging to the five main political parties.

    Each user's ``user_info`` strings (screen name + description) are tested against
    one regex per party. Parties are tried in the order UP, PSOE, PP, VOX, Ciudadanos
    and the first party with any matching string wins, so a user is assigned to at
    most one party.

    Keyword arguments:
    verified_users -- List of user ids + user info (description+name); None entries in ``user_info`` are skipped

    Returns:
    Dict mapping each party name to the set of matched user ids.
    """

    parties = {
        'UP': set([]),
        'PSOE': set([]),
        'Ciudadanos': set([]),
        'PP': set([]),
        'VOX': set([])
    }

    # regexs used against user info (screen name + description) to determine the political party
    regexs  = {
        'UP': '.*(Unidas Podemos|Podemos| Podem|Izquierda Unida|iunida|iu ).*',
        'PSOE': '.*(PSOE|Partido Socialista|psc).*',
        'Ciudadanos': '.*(Cs_| Cs |Ciudadanos|ciutadans).*',
        'PP': '.*( PP|populares|Partido popular).*',
        'VOX': '.*( VOX|@vox_es).*'
    }
    # re.DOTALL lets the surrounding '.*' cross line breaks: without it re.match
    # missed any party keyword written after a newline in a multi-line description.
    compiled = {party: re.compile(pattern, re.IGNORECASE | re.DOTALL)
                for party, pattern in regexs.items()}
    party_order = ['UP', 'PSOE', 'PP', 'VOX', 'Ciudadanos']

    print('Getting political party of verified users...')

    ## Fills PARTIES dictionary with users
    for verified_user in progress_bar(verified_users):
        texts = [info for info in verified_user['user_info'] if info is not None]
        party = next((p for p in party_order
                      if any(compiled[p].match(info) for info in texts)), None)
        if party is not None:
            parties[party].add(verified_user['_id'])
    return parties

def political_parties_to_mongodb(parties, user_collection):
    """
    Inserts in DB the political party of those users which have been identified.

    Only existing user documents are updated (no upsert). When no user has been
    identified nothing is sent, because ``bulk_write`` rejects an empty list.

    Keyword arguments:
    parties -- Dictionary with the five political parties ('UP', 'PSOE', 'Ciudadanos', 'PP', 'VOX') as keys, and an iterable of users' ObjectID as values.
    user_collection -- MongoDB Users' Collection
    """
    operations = [
        UpdateOne({'_id': user_id},
                  {'$set': {'political_party': political_party}},
                  upsert=False)
        for political_party in ['UP', 'PSOE', 'Ciudadanos', 'PP', 'VOX']
        for user_id in parties[political_party]
    ]

    if not operations:
        print("No users related to political parties to save.")
        return

    results = user_collection.bulk_write(operations)
    # Reported after the write so the message is only printed on success.
    print(len(operations), "users related to political parties saved!")
    print("M:", str(results.matched_count).rjust(8, " "),
          " I:", str(results.inserted_count).rjust(8, " "),
          " U:", str(results.upserted_count).rjust(8, " "))

### Execute tagging functions

Each tagged user is then manually verified.

In [None]:
# Verified accounts (Twitter's VERIFIED attribute is true) with all their "screen name + description" strings.
verified_users = get_name_description_verified_users(user_metadata_collection=db.users_metadata)

In [None]:
# Split the verified user ids by political party (the first matching party wins).
parties = get_political_parties(verified_users=verified_users)

In [None]:
# Store the identified political party of each user in the users' collection.
political_parties_to_mongodb(parties, user_collection=db.users)

## Augment political party tags

Retrieve the non-verified users with the most tweets and tag them with a political party. These users will need to be manually validated.

In [None]:
def get_users_for_political_party(user_metadata_collection):
    """
    Returns non-verified users that look human, most active first.

    Metadata records are grouped by ``user_id`` and sorted by how many records each
    user has (descending). A user is kept only when its document in the ``users``
    collection has a universal Botometer score (``scores.scores.universal``) at most
    equal to the hard-coded 75th percentile below.

    Keyword argument:
    user_metadata_collection -- MongoDB Users' Metadata Collection (containing screen name and description of users)

    Returns:
    List of ``{'_id', 'user_screen_names', 'user_descriptions'}`` documents.
    """
    # 75th percentile of botscores
    botscore_p75 = 0.23633691139538376

    group_by_user = {
        '$group': {
            '_id': '$user_id',
            'user_screen_names': {'$push': '$user_screen_name'},
            'user_descriptions': {'$push': '$user_description'},
            'count': {'$sum': 1},
        }
    }
    join_users = {
        '$lookup': {'from': 'users', 'localField': '_id', 'foreignField': '_id', 'as': 'user'}
    }
    pipeline = [
        {'$match': {'user_verified': False}},
        group_by_user,
        {'$sort': {'count': -1}},
        join_users,
        {'$unwind': {'path': '$user'}},
        {'$match': {'user.scores.scores.universal': {'$lte': botscore_p75}}},
        {'$project': {'_id': 1, 'user_screen_names': 1, 'user_descriptions': 1}},
    ]

    return list(user_metadata_collection.aggregate(pipeline, allowDiskUse=True))


def get_augmented_political_parties(users):
    """
    Returns the users belonging to the five main political parties, inferred from
    their screen names only.

    Descriptions are ignored because they are too ambiguous (for example with
    "Ciudadanos"). For each screen name, parties are tried in the order VOX, PSOE,
    UP, PP, Ciudadanos; the first match assigns the user and stops the search.

    Keyword arguments:
    users -- List of ``{'_id', 'user_screen_names', ...}`` documents, e.g. as returned
             by get_users_for_political_party (most active users first)

    Returns:
    Dict mapping each party name to a list of user ids in the order of ``users``.
    """
    # Lists (not sets): the order of `users` is kept so the first entries of each party
    # are the most active users, and sets have no append(), which made this crash.
    parties = {
        'UP': [],
        'PSOE': [],
        'Ciudadanos': [],
        'PP': [],
        'VOX': []
    }

    # regexs used against the screen name to determine the political party
    regexs  = {
        'UP': '.*(Unidas Podemos|Podemos| Podem|Izquierda Unida|iunida|iu ).*',
        'PSOE': '.*(PSOE|Partido Socialista|psc).*',
        'Ciudadanos': '.*(Cs_| Cs |Ciudadanos|ciutadans).*',
        'PP': '.*( PP|populares|Partido popular).*',
        'VOX': '.*( VOX|@vox_es).*'
    }
    compiled = {party: re.compile(pattern, re.IGNORECASE) for party, pattern in regexs.items()}
    party_order = ['VOX', 'PSOE', 'UP', 'PP', 'Ciudadanos']

    print('Getting political party for augmented users...')

    ## Fills PARTIES dictionary with users
    for user in progress_bar(users):
        screen_names = user['user_screen_names']
        if screen_names is None:
            continue
        for screen_name in screen_names:
            if screen_name is None:
                continue
            party = next((p for p in party_order if compiled[p].match(screen_name)), None)
            if party is not None:
                parties[party].append(user['_id'])
                break
    return parties

### Execute users' tagging

In [None]:
# Non-verified, human-looking users, ordered from most to least active.
users = get_users_for_political_party(user_metadata_collection=db.users_metadata)

In [None]:
# Candidate political party of each user, inferred from the screen name.
parties = get_augmented_political_parties(users=users)

In [None]:
# Only keep the most active users of each political party: enough to reach at least
# 200 users per party, plus 25% extra because every candidate is validated manually.
TARGET_USERS_PER_PARTY = 200

# How many users each party already has in the database.
# A DataFrame is built explicitly (a plain list has no .groupby), and parties with
# no tagged user yet now count as 0 instead of being left out.
tagged_users = pd.DataFrame(
    list(db.users.find({'political_party': {'$exists': True}}, {'_id': 1, 'political_party': 1})),
    columns=['_id', 'political_party'],
)
tagged_counts = tagged_users['political_party'].value_counts()

users_to_check = {
    'Ciudadanos': 0,
    'PP': 0,
    'PSOE': 0,
    'UP': 0,
    'VOX': 0
}
for party in users_to_check:
    missing = max(0, TARGET_USERS_PER_PARTY - int(tagged_counts.get(party, 0)))
    users_to_check[party] = missing + missing // 4

# we select the necessary users from all the analyzed users
parties_to_mongodb = {
    'VOX': [],
    'PSOE': [],
    'UP': [],
    'PP': [],
    'Ciudadanos': []
}

for p in parties_to_mongodb:
    # Slicing (instead of indexing) tolerates parties with fewer candidates than needed.
    parties_to_mongodb[p] = list(parties[p])[:users_to_check[p]]

In [None]:
# update database with manually labeled users
# (the users' collection argument was missing, which raised TypeError)
political_parties_to_mongodb(parties_to_mongodb, db.users)

## Bag-of-words creation

### Hashtags definitions

In [None]:
# Keywords (hashtags, names, phrases) per political party and per campaign theme.
# Party keys are upper case because the next cell appends '@screen_name' handles
# under political_party.upper().
# NOTE(review): presumably passed as `bag_of_words` to bows_to_mongodb /
# make_dictionary_keywords, which search each entry case-insensitively in the text.
BAG_OF_WORDS = {
    "VOX": [
        "#VOX",  
        "#EspañaSiempre",
        "Abascal",
        "Santiago Abascal",
        "Santi Abascal",
    ], 
    "PP": [
        "#PartidoPopular", 
        "Partido Popular",
        "#PP", 
        "#PorTodoLoQueNosUne",
        "Pablo Casado",
    ], 
    "CIUDADANOS": [
        "#Ciudadanos", 
        # NOTE(review): the trailing space presumably avoids matching longer hashtags starting with "#Cs" — confirm
        "#Cs ", 
        "#EspañaEnMarcha",
        "Albert Rivera",
        # "Rivera" on its own also matches every "Albert Rivera" occurrence.
        "Rivera",
    ], 
    "PSOE": [
        "#AhoraSí", 
        "#AhoraEspaña", 
        "#PSOE", 
        "#PSOEcompraVotos",
        "Pedro Sánchez",
    ], 
    "UP": [
        "#UnidasPodemos", 
        "Unidas Podemos",
        "#ElPoderDeLaGente", 
        "#MamadasPodemos", 
        "#SePuede", 
        "#UnGobiernoContigo",
        "Pablo Iglesias",
    ], 
    # Themes not tied to a single party: the 10N election, Franco's exhumation,
    # the Catalonia riots, the TV debates and Abascal's El Hormiguero interview.
    "Elecciones": [
        "#10N", 
        "#10NElecciones", 
        "#10Noviembre", 
        "#Elecciones10N", 
        "#eleccionesgenerales10N", 
        "#EleccionesNoviembre2019", 
    ], 
    "Exhumacion": [
        "#exhumacionFranco", 
        "#francisfrancoesp", 
        "#FrancoCalientaQueSales", 
        "#unboxingfranco", 
    ], 
    "Cataluña": [
        "#116YA", 
        "#disturbiosBarcelona", 
        "#EstadoDeExcepcion", 
        "#MarlaskaDimisionYa", 
        "#SpainIsAFascistState", 
        "#ThisIsTheRealSpain", 
        "#tsunamidemocractic", 
        "#tsunamiinfiltrado", 
    ], 
    "Debates": [
        "#Debate10N", 
        "#DebateA5", 
        "#Debatea7RTVE", 
        "#DebateElectoral", 
        "#DebatePresidencial", 
        "#ElDebate4N", 
        "#ElDebateEnRTVE", 
        "#UltimaOportunidadL6", 
    ], 
    "AbascalEH": [
        "#SantiagoAbascalEH", 
        "#elhormigueroabascal",
        "#BoicotElHormiguero",
    ]
}

### Verified political party regex definitions

In [None]:
# Adds the '@handle' of every user tagged with a political party to that party's
# bag of words, so mentions of party members count as party keywords.
# One document per screen name, keeping the party of the first user found with it.
verified_parties_users_pipeline = [
    {
        '$match': {
            'political_party': {
                '$exists': True
            }
        }
    }, {
        '$lookup': {
            'from': 'users_metadata',
            'localField': '_id',
            'foreignField': 'user_id',
            'as': 'user'
        }
    }, {
        '$unwind': {
            'path': '$user'
        }
    }, {
        '$group': {
            '_id': '$user.user_screen_name',
            'political_party': {
                '$first': '$political_party'
            }
        }
    }
]


verified_users = list(db.users.aggregate(verified_parties_users_pipeline))

for vu in progress_bar(verified_users):
    screen_name = vu['_id']
    # $group produces a null _id for metadata records without a screen name.
    if screen_name is None:
        continue
    handle = '@' + screen_name
    party_keywords = BAG_OF_WORDS[vu['political_party'].upper()]
    # Skip handles already present so re-running this cell does not duplicate them.
    if handle not in party_keywords:
        party_keywords.append(handle)

### Matching of interactions within BOWs

In [None]:
def get_original_interactions(tweet_collection):
    """
    Returns the Object ID, text and retweet count of every original tweet, reply
    and quote (i.e. every tweet type that carries its own text).

    Keyword arguments:
    tweet_collection -- MongoDB Tweets' Collection
    """
    interaction_types = ['original', 'reply', 'quote']
    # A 'retweet_count': {'$gte': 100} condition can be added to the filter to keep popular tweets only.
    cursor = tweet_collection.find(
        filter={'tweet_type': {'$in': interaction_types}},
        projection={'_id': 1, 'text': 1, 'retweet_count': 1},
    )
    interactions = [tweet for tweet in cursor]

    print("Original interactions:", len(interactions))
    return interactions

def make_dictionary_keywords(bag_of_words, tweet_id, text, progress=None):
    """
    Checks which keywords of a bag of words appear in a tweet's text.

    A keyword matches when, case-insensitively, either
    1. it appears in the text as written, or
    2. (keywords of 3+ characters only) after removing a leading '#' or '@' and
       transliterating keyword and text to ASCII with unidecode, it appears in the
       text (e.g. "#EspañaSiempre" matches "espanasiempre").
    Keywords are matched as literal text: regex metacharacters are escaped.

    Keyword arguments:
    bag_of_words -- dictionary of parties/themes as keys, list of associated keywords as values
    tweet_id -- ObjectID of the tweet (only used in the warning for non-text content)
    text -- Text of the tweet; a non-string value (e.g. None) is treated as empty text
    progress -- optional parent progress bar (presumably a fastprogress master_bar)

    Returns:
    ``(keywords, keywords_summary)``: ``keywords[party][keyword]`` is True/False for
    every keyword, and ``keywords_summary[party]`` is True when any keyword of that
    party matched.
    """
    if not isinstance(text, str):
        if text is not None:
            print("Non-text content for tweet", tweet_id, text)
        text = ""
    # Transliterated once per tweet instead of once per keyword.
    plain_text = unidecode(text)

    keywords = {}           # for specific terms
    keywords_summary = {}   # for themes
    items = bag_of_words.items()
    pb = progress_bar(items, parent=progress) if progress is not None else items

    for party, phashtags in pb:
        party_matches = {}
        for hashtag in phashtags:
            if re.search(re.escape(hashtag), text, re.IGNORECASE):
                party_matches[hashtag] = True
            elif len(hashtag) < 3:
                # Too short for the loose match; recorded explicitly instead of being left out.
                party_matches[hashtag] = False
            else:
                bare = unidecode(re.sub(r'^[@#]', '', hashtag))
                party_matches[hashtag] = re.search(re.escape(bare), plain_text, re.IGNORECASE) is not None
        keywords[party] = party_matches
        keywords_summary[party] = any(party_matches.values())

    return (keywords, keywords_summary)

def bows_to_mongodb(bag_of_words, original_interactions, tweet_collection):
    """
    Store, for every original, reply or quote, which bag-of-words keywords its text matches.

    Each tweet gets a 'keywords' field (party/theme -> keyword -> bool) and a
    'keywords_summary' field (party/theme -> bool), both computed by
    make_dictionary_keywords. When the tweet's retweet_count is positive, the same two
    fields are also set on every retweet whose retweet_or_quote_id is that tweet.
    Updates are sent with bulk_write whenever more than 25000 are pending, and the
    matched/modified counts of every write are printed.

    Keyword arguments:
    bag_of_words -- dictionary of parties/themes as keys, list of associated keywords as values
    original_interactions -- List of tweets' ObjectID, text and retweet count (originals, replies and quotes)
    tweet_collection -- MongoDB Tweets' Collection
    """
    max_pending = 25000

    def flush(batch):
        outcome = tweet_collection.bulk_write(batch)
        print("Ma:", str(outcome.matched_count).rjust(8, " "),
              " Mo:", str(outcome.modified_count).rjust(8, " "))

    pending = []
    for tweet in progress_bar(original_interactions):
        tweet_id = tweet['_id']
        matched, summary = make_dictionary_keywords(bag_of_words, tweet_id, tweet['text'])
        fields = {'keywords': matched, 'keywords_summary': summary}

        pending.append(UpdateOne({'_id': tweet_id}, {'$set': fields}, upsert=False))
        if tweet['retweet_count'] > 0:
            # Propagate the tweet's matches to its retweets
            pending.append(UpdateMany({'retweet_or_quote_id': tweet_id, 'tweet_type': 'retweet'},
                                      {'$set': fields}, upsert=False))

        if len(pending) > max_pending:
            flush(pending)
            pending = []

    if pending:
        flush(pending)

In [None]:
# Fetch every original, reply and quote (ObjectID, text and retweet count); their texts
# are matched against the bags of words in the next cell
original_interactions = get_original_interactions(db.tweets)

In [None]:
# Store each tweet's keyword matches against the bags of words (also set on its retweets)
bows_to_mongodb(BAG_OF_WORDS, original_interactions, db.tweets)

## Sentiment analysis extraction

In [None]:
def get_tweets_without_sentiment(tweet_collection):
    """
    Return the originals, replies and quotes that have no 'sentiment_score' field yet.

    Only the ObjectID, tweet type, text and retweet count of each tweet are fetched,
    and the number of tweets found is printed.

    Keyword arguments:
    tweet_collection -- MongoDB Tweets' Collection
    """
    missing_sentiment = {'sentiment_score': {'$exists': False},
                         'tweet_type': {'$in': ['original', 'reply', 'quote']}}
    wanted_fields = {'_id': 1, 'tweet_type': 1, 'text': 1, 'retweet_count': 1}

    cursor = tweet_collection.find(filter=missing_sentiment, projection=wanted_fields)
    tweets_without_sentiment = [tweet for tweet in cursor]

    print("Tweets without sentiment extracted:", len(tweets_without_sentiment))
    return tweets_without_sentiment

def get_original_interactions(tweet_collection):
    """
    Return every original, reply and quote tweet of the collection.

    Only the ObjectID, text and retweet count of each tweet are fetched, and the
    number of tweets found is printed.

    Keyword arguments:
    tweet_collection -- MongoDB Tweets' Collection
    """
    interaction_types = ['original', 'reply', 'quote']
    cursor = tweet_collection.find(
        filter={'tweet_type': {'$in': interaction_types}},
        projection={'_id': 1, 'text': 1, 'retweet_count': 1},
    )
    original_interactions = [*cursor]

    print("Original interactions:", len(original_interactions))
    return original_interactions

def clean_str(tweet: str):
    """
    Preprocess tweet text before sentiment analysis.

    Steps, in order: lower-case; remove URLs and line breaks; replace emoji codes (as
    produced by emoji.demojize) by a placeholder; transliterate accents (unidecode); run
    the cl.sentimentPipeline normalisers (numbers, details, repetitions, "jaja", spaces,
    points, expressions); expand informal "k" spellings; replace conjugated verbs listed
    in cl.sentimentPipeline.dictConjug by their infinitive; tokenize, strip punctuation
    and collapse whitespace; reinsert the emoji codes.

    Keyword arguments:
    tweet -- text to be cleaned

    Returns the cleaned text as a single space-separated string.
    """

    def processK(text):
        # Expand informal "k" spellings one occurrence at a time, where <c> is any
        # character other than "o":
        #   "<c>ki"/"<c>ke"         -> "<c>qui"/"<c>que"
        #   "<c>ka"/"<c>ko"/"<c>ku" -> "<c>ca"/"<c>co"/"<c>cu"
        #   any remaining "k"       -> "que"
        # The vowel classes list only vowels ("|" inside [...] would match a literal "|").
        dirtyKQ = re.compile(r'[^o]k[ie]')
        dirtyKC = re.compile(r'[^o]k[aou]')
        dirtyK = re.compile(r'[^o]?k')

        found = dirtyKQ.search(text)
        while found is not None:
            grp = found.group()
            text = text.replace(grp, grp[0] + 'qu' + grp[-1], 1)
            found = dirtyKQ.search(text)

        found = dirtyKC.search(text)
        while found is not None:
            grp = found.group()
            text = text.replace(grp, grp[0] + 'c' + grp[-1], 1)
            found = dirtyKC.search(text)

        found = dirtyK.search(text)
        while found is not None:
            grp = found.group()
            if len(grp) > 1:
                text = text.replace(grp, grp[0] + "que", 1)
            else:
                text = text.replace(grp, "que", 1)
            found = dirtyK.search(text)
        return text

    def replaceAccents(text):
        return unidecode(text)

    def remove_user_handlers(text):
        return re.sub(r'@[\w]+', 'USER', text)

    def replaceVerbs(text):
        if len(text) == 0:
            # NOTE(review): empty text yields None, as before. This presumably makes the
            # tokenizer below raise, which sentiment_analysis_to_mongodb then reports as
            # an empty tweet. Confirm before changing.
            return None

        # Put trailing punctuation aside so it does not stick to the last word
        punctuation = "[.,:;!?]()"
        addBack = False
        endPunctu = ''
        if text[-1] in punctuation:
            endPunctu = text[-1]
            addBack = True
            text = text[:-1]

        for infinitif in cl.sentimentPipeline.dictConjug.keys():
            conjugations = set(cl.sentimentPipeline.dictConjug[infinitif])
            if not any(e in text.replace(' ', '_') for e in conjugations):
                continue

            # Work on '_'-separated text so that multi-word conjugations can be matched
            text = text.replace(' ', '_')

            # Conjugated forms surrounded by separators inside the (current) text
            matches = [e for e in conjugations if '_' + e + '_' in text]
            for e in matches:
                text = text.replace('_' + e + '_', '_' + infinitif + '_')

            # Conjugated forms of one or two words at the start of the text
            if text.split('_')[0] in conjugations:
                text = '_'.join([infinitif] + text.split('_')[1:])
            if '_'.join(text.split('_')[:2]) in conjugations:
                text = '_'.join([infinitif] + text.split('_')[2:])

            # Conjugated forms of one or two words at the end of the text
            if text.split('_')[-1] in conjugations:
                text = '_'.join(text.split('_')[:-1] + [infinitif])
            if '_'.join(text.split('_')[-2:]) in conjugations:
                text = '_'.join(text.split('_')[:-2] + [infinitif])

            text = text.replace('_', ' ')

        if addBack:
            text = text + endPunctu
        return text

    x = tweet.lower()

    # Remove URLs and line breaks
    x = re.sub(r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)', '', x)
    x = str(x).replace('\r', '').replace('\n', '')

    # Replace emoji codes (":name:") by a placeholder while the text is normalised;
    # they are reinserted, in their original order, at the end.
    placeholder_emoji = 'BASTIONEMOJIBASTION'
    regex_emoji = re.compile(r':\w+:')
    x = emoji.demojize(x)
    emojies = regex_emoji.findall(x)
    x = regex_emoji.sub(placeholder_emoji, x)

    x = replaceAccents(x)
    # Replacing user handles by 'USER' is currently disabled
    #x = remove_user_handlers(x)
    x = cl.sentimentPipeline.processNumbers(x)
    x = cl.sentimentPipeline.processDetails(x)
    x = cl.sentimentPipeline.processRep(x)
    x = cl.sentimentPipeline.processJaja(x)
    x = cl.sentimentPipeline.processSpaces(x)
    x = cl.sentimentPipeline.processPoint(x)
    x = cl.sentimentPipeline.processExps(x)
    x = processK(x)
    x = replaceVerbs(x)
    tokens = cl.sentimentPipeline.word_tokenize(x)

    # Drop punctuation, collapse whitespace and turn the pipeline's separator into '_'
    stripped = re.sub(r'[¿?;,¡!`~"#@\(\)\'.]', '', ' '.join(tokens))
    result = ' '.join(str(stripped).split()).replace(cl.sentimentPipeline.uglySeparator, '_')

    # Reinsert emojis
    for match in emojies:
        result = result.replace(placeholder_emoji, match, 1)
    return result

def sentiment_analysis_to_mongodb(original_tweets, tweet_collection):
    """
    Score the sentiment of each tweet and store it in the DB, propagating it to retweets.

    For every tweet, the cleaned text (clean_str) and the score returned by clf.predict
    are $set on the tweet, and the same sentiment_score is $set on every retweet whose
    retweet_or_quote_id is that tweet. When cleaning raises, the tweet's sentiment_score
    and clean_text (and its retweets' sentiment_score) are $unset instead, and the tweet
    is reported as empty or as a failure. Writes are sent with bulk_write once more than
    10000 operations are pending (checked after successfully scored tweets).

    Keyword arguments:
    original_tweets -- List of tweets' ObjectID and text (originals, replies and quotes)
    tweet_collection -- MongoDB Tweets' Collection
    """
    url_pattern = r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'

    def report(results):
        print("Ma:", str(results.matched_count).rjust(8, " "),
              " Mo:", str(results.modified_count).rjust(8, " "))

    pending = []
    for tweet in progress_bar(original_tweets):
        tweet_id = tweet['_id']
        text = tweet['text']
        retweets_filter = {'retweet_or_quote_id': tweet_id, 'tweet_type': 'retweet'}

        try:
            clean = clean_str(text)
        except Exception as ex:
            print(tweet, ex, end=";")
            # A tweet made only of URLs/whitespace is expected to fail; anything else is not
            if re.sub(url_pattern, '', text).strip():
                print("Catastrophic failure with tweet", tweet_id, text, ex)
            else:
                print("Tweet", tweet_id, " is empty. Ignoring")

            pending.append(UpdateOne({'_id': tweet_id},
                                     {'$unset': {'sentiment_score': "", 'clean_text': ""}},
                                     upsert=False))
            pending.append(UpdateMany(retweets_filter,
                                      {'$unset': {'sentiment_score': ""}},
                                      upsert=False))
        else:
            sentiment_score = clf.predict(clean)
            pending.append(UpdateOne({'_id': tweet_id},
                                     {'$set': {'clean_text': clean, 'sentiment_score': sentiment_score}},
                                     upsert=False))
            pending.append(UpdateMany(retweets_filter,
                                      {'$set': {'sentiment_score': sentiment_score}},
                                      upsert=False))

            if len(pending) > 10000:
                report(tweet_collection.bulk_write(pending))
                pending = []

    if pending:
        report(tweet_collection.bulk_write(pending))

### Execution of the sentiment analysis and database update

In [None]:
# Load the originals, replies and quotes (ObjectID, text, retweet count) to be scored
original_interactions = get_original_interactions(db.tweets)

In [None]:
%%time
# Clean and score each tweet's text, storing the result on the tweet and its retweets
sentiment_analysis_to_mongodb(original_interactions,db.tweets)

## Anonymization


### Calculate UUIDs for the users

In [None]:
%%time
# Collect the _id of every user and draw one random UUID (uuid4) per user;
# uuids[i] becomes the pseudonym of users[i] in the next cell.
pipeline = [
        {
            '$project': {
                '_id':1 
            }
        }
    ]
    
print("Query", end=" ")
users = db.users.aggregate(pipeline, allowDiskUse=True)
print("OK; List", end=" ")
users = list(users)
# Keep only the ObjectIDs
users = [u['_id'] for u in users]
print("OK; Total records:", len(users))

uuids = [uuid.uuid4() for _ in users]

print("Total users: ", len(users))

In [None]:
%%time

users_operations = []
tweets_operations = []
for u,uid in progress_bar(list(zip(users,uuids))):
    users_operations.append(UpdateOne({'_id': u}, {'$set': {'uuid': uid}}))
    tweets_operations.append(UpdateMany({'user_id': u}, {'$set': {'user_id': uid}}))
    tweets_operations.append(UpdateMany({'retweet_or_quote_user_id': u}, {'$set': {'retweet_or_quote_user_id': uid}}))
    tweets_operations.append(UpdateMany({'in_reply_to_user_id': u}, {'$set': {'in_reply_to_user_id': uid}}))
    

    if len(tweets_operations) > 10000:
        try:
            results = db.users.bulk_write(users_operations)
            print("USERS: Ma:", str(results.matched_count).rjust(8, " "), " Mo:", str(results.modified_count).rjust(8, " "), end="; ")
        except BulkWriteError as bwe:
            print(bwe.details)
            break
        try:
            results = db.tweets.bulk_write(tweets_operations)
            print("TWEETS: Ma:", str(results.matched_count).rjust(8, " "), " Mo:", str(results.modified_count).rjust(8, " "))
        except BulkWriteError as bwe:
            print(bwe.details)
            break
        users_operations = []
        tweets_operations = []

if len(tweets_operations) > 0: 
    results = db.users.bulk_write(users_operations)
    print("USERS: Ma:", str(results.matched_count).rjust(8, " "), " Mo:", str(results.modified_count).rjust(8, " "), end="; ")
    results = db.tweets.bulk_write(tweets_operations)
    print("TWEETS: Ma:", str(results.matched_count).rjust(8, " "), " Mo:", str(results.modified_count).rjust(8, " "))
    users_operations = []
    tweets_operations = []

### Calculate UUIDs for the tweets

In [None]:
%%time
# Collect the _id of every tweet and draw one random UUID (uuid4) per tweet;
# tuids[i] becomes the pseudonym of tweets[i] in the next cell.
pipeline = [
        {
            '$project': {
                '_id':1 
            }
        }
    ]
    
print("Query", end=" ")
tweets = db.tweets.aggregate(pipeline, allowDiskUse=True)
print("OK; List", end=" ")
tweets = list(tweets)
# Keep only the ObjectIDs
tweets = [t['_id'] for t in tweets]
print("OK; Total records:", len(tweets))

tuids = [uuid.uuid4() for _ in tweets]

In [None]:
%%time
tweets_operations = []
for u,uid in progress_bar(list(zip(tweets,tuids))):
    tweets_operations.append(UpdateOne({'_id': u}, {'$set': {'uuid': uid}}))
    tweets_operations.append(UpdateMany({'in_reply_to_status_id': u}, {'$set': {'in_reply_to_status_id': uid}}))
    tweets_operations.append(UpdateMany({'retweet_or_quote_id': u}, {'$set': {'retweet_or_quote_id': uid}}))

    if len(tweets_operations) > 10000:
        try:
            results = db.tweets.bulk_write(tweets_operations)
            print("TWEETS: Ma:", str(results.matched_count).rjust(8, " "), " Mo:", str(results.modified_count).rjust(8, " "))
        except BulkWriteError as bwe:
            print(bwe.details)
            break
        tweets_operations = []

if len(tweets_operations) > 0: 
    results = db.users.bulk_write(users_operations)
    print("USERS: Ma:", str(results.matched_count).rjust(8, " "), " Mo:", str(results.modified_count).rjust(8, " "), end="; ")
    results = db.tweets.bulk_write(tweets_operations)
    print("TWEETS: Ma:", str(results.matched_count).rjust(8, " "), " Mo:", str(results.modified_count).rjust(8, " "))
    users_operations = []
    tweets_operations = []

#### Swap users' IDs

In [None]:
%%time
# Reload every user document (now carrying its 'uuid' field) and build the
# ObjectID -> UUID map used to translate followers/friends in the next cell.
print("Query", end=" ")
users = db.users.find({})
print("OK; List", end=" ")
users = list(users)
print("OK; Total records:", len(users))

users_uuids_map = {}
# Raises KeyError if a user has no 'uuid' (e.g. the UUID cells above did not complete)
for u in progress_bar(users):
    users_uuids_map[u['_id']] = u['uuid']

In [None]:
# For every user, queue an InsertOne of an anonymised copy (UUID as _id, no 'uuid' field,
# scores['user'] removed, followers/friends translated to UUIDs and restricted to users
# present in users_uuids_map) and a DeleteOne of the original document. Users that
# cannot be processed are reported, collected in `exceptions` and skipped.
insert_operations = []
delete_operations = []
exceptions = []

for u in progress_bar(users):
    try:
        # Shallow copy: nested values (e.g. the 'scores' dict) are shared with `u`
        user = u.copy()

        try:
            user['_id'] = u['uuid']
            del(user['uuid'])
        except Exception as ex:
            print("User",u['_id'],"- Exception: ", type(ex).__name__, ex, "Added to exception list")
            exceptions.append(u)
            continue

        # Remove scores['user'] (presumably identifying data stored with the bot
        # scores — TODO confirm); users without 'scores' or scores['user'] are fine.
        try:
            del(user['scores']['user'])
        except KeyError:
            pass
        except Exception as ex:
            print("User",u['_id'],"- Exception: ", type(ex).__name__, ex, "Added to exception list")
            exceptions.append(u)
            continue

        # Translate followers/friends to UUIDs, dropping ids that are not in the map
        try:
            if 'followers' in u:
                user['followers'] = [users_uuids_map[f] for f in u['followers'] if f in users_uuids_map]
            if 'friends' in u:
                user['friends'] = [users_uuids_map[f] for f in u['friends'] if f in users_uuids_map]
        except Exception as ex:
            print("User",u['_id'],"- Exception: ", type(ex).__name__, ex, "Added to exception list")
            exceptions.append(u)
            # Skip only this user, consistent with the other handlers
            continue

        insert_operations.append(InsertOne(user))
        delete_operations.append(DeleteOne({'_id': u['_id']}))

    except Exception as ex:
        print("User",u['_id'],"- Exception: ", type(ex).__name__, ex, "Added to exception list")
        exceptions.append(u)
        continue

In [None]:
%%time
# Insert the anonymised user copies built above. The originals are only deleted in the
# next cell, so both versions coexist in db.users until then.
# NOTE(review): a unique index on any copied field would make these inserts fail — confirm.
results = db.users.bulk_write(insert_operations)
print("USERS: Ma:", str(results.matched_count).rjust(8, " "), 
      " In:", str(results.inserted_count).rjust(8, " "),
      " Mo:", str(results.modified_count).rjust(8, " "),  
      " De:", str(results.deleted_count).rjust(8, " "), 
      end="; ")

In [None]:
%%time
# Delete the original (ObjectID-keyed) user documents whose anonymised copies were queued
# together with these deletions.
results = db.users.bulk_write(delete_operations)
print("USERS: Ma:", str(results.matched_count).rjust(8, " "), 
      " In:", str(results.inserted_count).rjust(8, " "),
      " Mo:", str(results.modified_count).rjust(8, " "),  
      " De:", str(results.deleted_count).rjust(8, " "), 
      end="; ")

#### Check for potentially missed ObjectIDs

In [None]:
%%time
print("Query", end=" ")
tweets = db.tweets.find({})
print("OK; List", end=" ")
tweets = list(tweets)
print("OK; Total records:", len(tweets))

tweets_uuids_map = {}
for t in progress_bar(tweets):
    tweets_uuids_map[t['_id']] = t['uuid']

In [None]:
# Find tweet references (in_reply_to_status_id / retweet_or_quote_id) that are still not
# UUIDs (presumably references to tweets that are not in the collection) and queue an
# UpdateMany replacing each one by a UUID: the referenced tweet's own UUID when known,
# otherwise a new one, remembered in tweets_uuids_map so every reference to the same id
# gets the same UUID.
update_operations = []
changes = {}
changes['in_reply_to_status_id'] = 0
changes['retweet_or_quote_id'] = 0

# Iterate the tweets loaded in the previous cell (`oids` is not defined in this notebook)
pb = progress_bar(tweets)
for t in pb:
    for var in ['in_reply_to_status_id', 'retweet_or_quote_id']:
        ref = t.get(var)  # the field may be absent from the document
        if ref is not None and not isinstance(ref, uuid.UUID):
            try:
                tuid = tweets_uuids_map.get(ref, uuid.uuid4())
                update_operations.append(UpdateMany({var: ref}, {'$set': {var: tuid}}))
                tweets_uuids_map[ref] = tuid
                changes[var] += 1
            except Exception as ex:
                print("Tweet",t['_id']," (", var, ")\t Exception: ", type(ex).__name__, ex)
                break
    pb.comment = "Updates: " + str(len(update_operations))
    
print("OK; Total records:", len(update_operations), 
      " of which: ", changes['in_reply_to_status_id'], "in_reply_to_status_id and", changes['retweet_or_quote_id'], "retweet_or_quote_id")

In [None]:
%%time
results = db.tweets.bulk_write(update_operations)
print("TWEETS: Ma:", str(results.matched_count).rjust(8, " "), 
      " In:", str(results.inserted_count).rjust(8, " "),
      " Mo:", str(results.modified_count).rjust(8, " "),  
      " De:", str(results.deleted_count).rjust(8, " "), 
      end="; ")

#### Strip information from the tweets and swap IDs

In [None]:
# Reload the tweets keeping only the fields listed below (plus _id, which MongoDB returns
# unless excluded). The anonymised copies are built from these documents, so every other
# field is dropped when the tweets are re-inserted.
print("Query", end=" ")
tweets = db.tweets.find({},{
            'keywords_summary': True, 
            'sentiment_score': True, 
            'created_at': True, 
            'favorite_count': True, 
            'in_reply_to_status_id': True, 
            'in_reply_to_user_id': True, 
            'retweet_count': True, 
            'retweet_or_quote_id': True, 
            'retweet_or_quote_user_id': True, 
            'tweet_type': True, 
            'user_id': True,
            'uuid': True,
        })
print("OK; List", end=" ")
tweets = list(tweets)
print("OK; Total records:", len(tweets))

In [None]:
# For every tweet, queue an InsertOne of an anonymised copy (UUID as _id, no 'uuid'
# field, created_at moved to 12:00:00 of the same day) and a DeleteOne of the original.
# Tweets that cannot be processed are collected in `exceptions` (count shown on the bar).
insert_operations = []
delete_operations = []
exceptions = []
pb = progress_bar(tweets)
for t in pb:
    
    # A tweet without a stored 'uuid' takes it from tweets_uuids_map (built and extended
    # in the previous cells). If it is not there either, the tweet is printed and the
    # whole loop stops so the problem can be inspected.
    if 'uuid' not in t:
        if t['_id'] in tweets_uuids_map:
            t['uuid'] = tweets_uuids_map[t['_id']]
        else:
            exceptions.append(t)
            pb.comment = "Exceptions: " + str(len(exceptions))
            print(t)
            break 
    
    try:
        tweet = t.copy()
        
        try:
            tweet['_id'] = t['uuid']
            del(tweet['uuid'])
        except Exception as ex:
            #print("Tweet",t['_id'],"- Exception: ", type(ex).__name__, ex, "Added to exception list")
            exceptions.append(t)
            pb.comment = "Exceptions: " + str(len(exceptions))
            continue
            
        
        try:
            # Replace timestamp information to avoid precise tweet identification
            # (only the date is kept; the time becomes 12:00:00.000000)
            tweet['created_at'] = t['created_at'].replace(hour=12, minute=0, second=0, microsecond=0)
        except Exception as ex:
            #print("Tweet",t['_id'],"- Exception: ", type(ex).__name__, ex, "Added to exception list")
            exceptions.append(t)
            pb.comment = "Exceptions: " + str(len(exceptions))
            continue
        

        insert_operations.append(InsertOne(tweet))
        delete_operations.append(DeleteOne({'_id': t['_id']}))
        
    except Exception as ex:
        print("Tweet",t['_id'],"- Exception: ", type(ex).__name__, ex, "Added to exception list")
        exceptions.append(t)
        continue

In [None]:
%%time
# Insert the anonymised tweet copies built above; the originals are deleted in the next cell.
results = db.tweets.bulk_write(insert_operations)
print("TWEETS: Ma:", str(results.matched_count).rjust(8, " "), 
      " In:", str(results.inserted_count).rjust(8, " "),
      " Mo:", str(results.modified_count).rjust(8, " "),  
      " De:", str(results.deleted_count).rjust(8, " "), 
      end="; ")

In [None]:
%%time
# Delete the original (ObjectID-keyed) tweet documents whose anonymised copies were queued
# together with these deletions.
results = db.tweets.bulk_write(delete_operations)
print("TWEETS: Ma:", str(results.matched_count).rjust(8, " "), 
      " In:", str(results.inserted_count).rjust(8, " "),
      " Mo:", str(results.modified_count).rjust(8, " "),  
      " De:", str(results.deleted_count).rjust(8, " "), 
      end="; ")