# Program: download tweets using web scraping

## Requirements to run the program

This program requires:
<br>
- creating a directory named "scraped_data" in the directory where this code is saved
<br>
- installing chrome
<br>
- having a MongoDB Atlas account*
<br>
- having X accounts with the language and privacy settings set to what is specified below

<br>
*: The cloud database makes it possible to run several instances of the program at the same time without interference. The names of the already processed users are uploaded to the cloud, and the cloud is checked before a new user is selected for processing. If the program is not going to be run in parallel, it should be possible to modify this and save the list of processed users locally instead.

##### User accounts requirements

Change the **language** settings to English: three dots->settings and privacy->accessibility, display and languages
<br>
Have your settings of **privacy and safety**->content you see->search settings so that you can see everything

##### Code disclaimers

- The code can be run in headless mode and it will run faster (uncomment the headless setting in the cell that sets the chromedriver settings). However, web scraping is highly dependent on updates and small changes that can cause failures. If you can't see where it stopped or what went wrong, debugging can be difficult. That said, the code is designed to be safely interrupted or stopped at any time, ensuring that progress is securely saved.
<br>
- Using a date range very far in the past is highly inadvisable because the scrolling down will take very long and it gets to a point where X doesn't display any more tweets. 

## Parameters to modify

In [None]:
# Credentials of the X accounts used for scraping: modify these values.
# The two lists are parallel: passwords[i] is the password of user_name_list[i].
# Important: the first user should be the one we will first want to change to
user_name_list = ["user1", "user2"]
passwords = ["password1", "password2"]

In [None]:
from datetime import datetime
# Set the target date range (scrolling stops once tweets older than this range are reached).
# Both endpoints are included in the selected interval.

# Election day: Tuesday, November 5, 2024
recent_date_limit = datetime.strptime('2024-11-08', '%Y-%m-%d')
old_date_limit = datetime.strptime('2024-10-28', '%Y-%m-%d')

# Number of consecutive tweets allowed to be older than old_date_limit before we stop
# scrolling. This tolerance also covers a pinned (possibly old) tweet at the top of a
# profile, so no separate pinned-tweet logic is needed.
max_out_of_date = 5

# Maximum number of tweets we read per user for our date range
max_tweets = 500

In [None]:
# Location of the file listing the users we want to investigate.
# It is read with pandas.read_csv and must contain a "screen_name" column
# holding the user name of each user.
url_ucm = "example_url.zip"

In [None]:
# MongoDB personal server data: database name and connection string
# (both are passed to connection() to open the database).
dbname_parameter = "example_dbname"
url_mongo_parameter = "mongodb+srv://example.mongodb.net/"

In [None]:
# Debugging mode: when True, debug_print() prints its arguments; when False it stays silent
DEBUG = False

## Preliminaries: install and imports

In [None]:
# Third-party packages required by the notebook; entries that cannot be imported
# are handed to pip as written.
# NOTE(review): matplotlib is imported further down but is not listed here;
# presumably it arrives as a seaborn dependency - confirm.
modules = ["selenium","chromedriver_autoinstaller","pymongo","pandas","seaborn","bs4","numpy"]

import sys
import os.path
from subprocess import check_call
import importlib
import os

def instala(modules):
    """Install with pip every package in *modules* that cannot be imported.

    Each entry is a pip requirement such as ``"pandas"``, ``"pkg[extra]"`` or
    ``"pkg==1.2"``. The import name checked is the entry with any ``[...]``
    extras and ``==version`` suffix removed; the full entry is what gets
    passed to pip. Progress is reported on stdout and installation failures
    are reported but never raised, so one bad package does not stop the rest.

    Args:
        modules: iterable of requirement strings.

    Returns:
        None.
    """
    # Imported explicitly: a plain ``import importlib`` does not guarantee
    # that the ``importlib.util`` submodule has been loaded.
    import importlib.util
    from subprocess import CalledProcessError

    pip_command = [sys.executable, "-m", "pip", "install"]
    # "--user" is rejected by pip inside a virtual environment, so only keep
    # it when running on the base interpreter.
    if sys.prefix == getattr(sys, "base_prefix", sys.prefix):
        pip_command.append("--user")

    print("Installing modules")
    for m in modules:
        # For the import check we remove "[...]" and "==..."
        import_name = m.split("[", 1)[0].split("==", 1)[0]
        try:
            spec = importlib.util.find_spec(import_name)
        except (ImportError, ValueError):
            # Raised for malformed names or when a parent package is missing
            spec = None

        if spec is not None:
            print(m,"found")
            continue

        print(m," Not found, installing...",end="")
        try:
            check_call(pip_command + [m])
        except (CalledProcessError, OSError):
            print("¡Problem when installing ",m,"! ¿does the module exist?",sep="")
        else:
            # Let the running interpreter see the freshly installed package
            importlib.invalidate_caches()
            print("¡done!")

    print("¡Finished!")

# Install any required package that cannot be imported yet
instala(modules)

In [None]:
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait # Wait until the element is present
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import random
import time
import uuid

We connect to the cloud MongoDB server using a local client.

In [None]:
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
import sys

def connection(dbname="test",cs = "mongodb://localhost:27017"):
    """Open a MongoDB database and verify that the server is reachable.

    Args:
        dbname: name of the database to open.
        cs: MongoDB connection string.

    Returns:
        The database object ``client[dbname]`` on success, or None when the
        client could not be created or the server did not answer. The error
        is printed rather than raised.
    """
    try:
        # Creating the client is inside the try as well, so a malformed
        # connection string is reported the same way as an unreachable server.
        client = MongoClient(cs)
        s = client.server_info() # if there is an error this will raise an exception
        print("Connected to MongoDB, version",s["version"])
        return client[dbname]
    except Exception as exc:
        # "except Exception" (not a bare except) so Ctrl-C still interrupts
        print ("Connection error")
        print(f"{type(exc).__name__}: {exc}")
        return None

# Database handle used by the rest of the notebook (None if the connection failed)
db = connection(dbname=dbname_parameter,cs=url_mongo_parameter)

In [None]:
def user_passwd(driver,name,passwd):
    """Fill in the X login form: user name, "next", password, "log in".

    The four steps run in order. Each waits up to 10 seconds for its element
    to be present, and a failing step is reported on stdout without stopping
    the following ones.

    Args:
        driver: Selenium WebDriver showing the login flow.
        name: account user name typed into the first field.
        passwd: account password typed into the password field.
    """
    # CSS class lists identifying the two buttons of the login flow
    next_button_class = "css-175oi2r r-sdzlij r-1phboty r-rs99b7 r-lrvibr r-ywje51 r-184id4b r-13qz1uu r-2yi16 r-1qi8awa r-3pj75a r-1loqt21 r-o7ynqc r-6416eg r-1ny4l3l"
    submit_button_class = "css-175oi2r r-sdzlij r-1phboty r-rs99b7 r-lrvibr r-19yznuf r-64el8z r-1fkl15p r-1loqt21 r-o7ynqc r-6416eg r-1ny4l3l"

    # (locator, action on the located element, stage named in the error message)
    steps = (
        ((By.NAME, 'text'),
         lambda field: field.send_keys(name), "login"),
        ((By.XPATH, '//button[@class="'+next_button_class+'"]'),
         lambda button: button.click(), "login"),
        ((By.XPATH, '//input[@type="password"]'),
         lambda field: field.send_keys(passwd), "login"),
        ((By.XPATH, '//button[@class="'+submit_button_class+'"]'),
         lambda button: button.click(), "password"),
    )

    for locator, act_on, stage in steps:
        try:
            # Wait until the element is present, then type into it or click it
            target = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located(locator)
            )
            act_on(target)
        except Exception as e:
            print(f"There was an error in {stage}: {e}")
        
def migration(driver):
    """Click the data-migration bottom bar if it shows up within 10 seconds.

    When the banner is absent (or clicking it fails) a short notice is
    printed and execution continues.
    """
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, '//button[@data-testid="xMigrationBottomBar"]')
            )
        ).click()
    except Exception:
        print("Data migration banner not found, skipped")

def cookies(driver):
    """Dismiss the cookies banner by clicking the last button inside it.

    Waits up to 10 seconds for the bottom bar to become clickable; any
    failure is printed and otherwise ignored.
    """
    try:
        banner = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//div[@data-testid="BottomBar"]'))
        )
        # "Reject" is the last button of the banner
        banner.find_elements(By.TAG_NAME, "button")[-1].click()
    except Exception as e:
        print(f"There was an error in banner cookies: {e}")

def resto_logins(driver):
    """Log in every account after the first one through "add an existing account".

    For each remaining entry of ``user_name_list`` the account menu is opened,
    the login-flow link is clicked and the credentials are entered with
    ``user_passwd``. Click failures are printed and do not stop the sequence.
    """
    def click_when_present(xpath, stage):
        # Wait up to 10 s for the element, click it, report any failure
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, xpath))
            ).click()
        except Exception as e:
            print(f"There was an error in {stage}: {e}")

    for account_index in range(1, len(user_name_list)):
        # Menu inside the account
        click_when_present('//button[@aria-label="Account menu"]', "account")
        time.sleep(1)
        # Submenu entry to add an existing account
        click_when_present('//a[@href="/i/flow/login"]', "login")

        user_passwd(driver, user_name_list[account_index], passwords[account_index])
        time.sleep(2)
        
def login(driver):
    """Log every configured account into X, starting from the landing page.

    The migration and cookies banners are dismissed first. Then the login
    link is followed, the first account of ``user_name_list`` is logged in
    and the remaining ones are added with ``resto_logins``. Any exception
    raised after the banners are handled is printed, not propagated.
    """
    migration(driver)   # remove migration banner
    cookies(driver)     # remove cookies banner
    try:
        login_link = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//a[@href="/login"]'))
        )
        login_link.click()

        # First login
        user_passwd(driver, user_name_list[0], passwords[0])
        migration(driver)  # a new migration banner shows up after logging in
        resto_logins(driver)
    except Exception as e:
        print(f"There was an error in login: {e}")


# Start of Program

In [None]:
def debug_print(*args):
    """Print *args* like ``print`` does, but only when the DEBUG flag is set."""
    if not DEBUG:
        return
    print(*args)

In [None]:
from bs4 import BeautifulSoup
from selenium import webdriver
import chromedriver_autoinstaller
from selenium.webdriver.chrome.options import Options

# Browser setup: build the Chrome options, make sure a chromedriver is
# available, start the browser and open the X landing page.

# We add options to minimize resource usage
# chrome_options = webdriver.ChromeOptions() # default
options = Options()
options.add_argument("--disable-extensions")
options.add_argument("--disable-gpu")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--incognito")

# Headless mode runs without a visible browser window; uncomment to enable it
# (see the disclaimers at the top of the notebook about debugging in this mode)
# options.add_argument("--headless")

# set path to chromedriver as per your configuration
chromedriver_autoinstaller.install()

# set up the webdriver
driver = webdriver.Chrome(options=options)
url = 'https://x.com/' # x lower case is the standard and how twitter saves it
driver.get(url)

You can either run the next cell to log in automatically, or log in manually in the browser window and skip the cell. The automatic login can raise errors if an unexpected prompt appears in the browser and interrupts the usual flow.

In [None]:
# Automatic login of all the accounts in user_name_list (skip this cell to log in by hand)
login(driver)

In [None]:
# Download user list
# pandas is used to read the file because it supports zip files; the "_id" column is
# read as a string and is not used for now
users_characteristics = pd.read_csv(url_ucm, dtype={'_id':'string'})
total_users = users_characteristics["screen_name"].to_list() # Total user list

In [None]:
# Assertions: sanity checks on the parameters before scraping starts
assert(len(user_name_list) == len(passwords)) # one password per account
assert(recent_date_limit > old_date_limit) # e.g.: 2024 > 2023
assert(len(total_users) == len(set(total_users))) # No duplicates
print(len(total_users)) # number of users to process

In [None]:
# Function to get the text of the last tweet with text
def get_last_tweet_text(tweet_elements):
    """Return the text of the last tweet in *tweet_elements* that has text.

    The elements are inspected from last to first. An element whose text
    cannot be read is skipped after a one-second pause.

    Returns:
        The text of the first readable tweet found from the end, or None if
        no element has text (or the list is empty).
    """
    text_xpath = './/div[@data-testid="tweetText"]'
    for candidate in reversed(tweet_elements):
        try:
            latest_text = candidate.find_element(By.XPATH, text_xpath).text
            debug_print(latest_text)
            return latest_text
        except Exception:
            # Pause for a second, then fall back to the previous tweet in the list
            time.sleep(1)
    return None  # No tweets with text found

In [None]:
# Returns true if element is present and false if it isn't
def _xpath_string_literal(value):
    """Return *value* as an XPath 1.0 string literal, whatever quotes it contains."""
    if "'" not in value:
        return f"'{value}'"
    if '"' not in value:
        return f'"{value}"'
    # XPath 1.0 has no escape character: glue the pieces together with concat()
    pieces = ", \"'\", ".join(f"'{piece}'" for piece in value.split("'"))
    return f"concat({pieces})"


def is_element_present_case_insensitive(message):
    """Tell whether a ``<span>`` whose text contains *message* appears on the page.

    The comparison ignores ASCII letter case and the accents listed below.
    Both the page text and *message* go through the same folding, so an
    accented *message* can match too. The page is polled through the global
    ``driver`` for up to 15 seconds.

    Args:
        message: text to look for; it may contain single and/or double quotes.

    Returns:
        True if a matching span shows up in time, False otherwise (including
        when the lookup itself raises).
    """
    # Accented characters and their replacements. With XPath translate(),
    # characters not found in the first string are left unchanged.
    accented_chars = 'ÀÁÂÃÄÅàáâãäåÈÉÊËèéêëÌÍÎÏìíîïÒÓÔÕÖØòóôõöøÙÚÛÜùúûüÑñÇçÝŸÿýŠšŽž'
    unaccented_chars = 'AAAAAAaaaaaaEEEEeeeeIIIIiiiiOOOOOOooooooUUUUuuuuNnCcYYyySsZz'

    # Ensure both strings are of equal length
    assert len(accented_chars) == len(unaccented_chars), "Accent and unaccent strings must be the same length"

    # Fold the needle the same way the XPath folds the page text
    needle = message.lower().translate(str.maketrans(accented_chars, unaccented_chars))

    # Build the XPath expression; the needle is quoted safely so that a quote
    # inside the message cannot produce an invalid selector
    xpath_expression = f"""
    //span[contains(
        translate(
            translate(
                text(),
                '{accented_chars}',
                '{unaccented_chars}'
            ),
            'ABCDEFGHIJKLMNOPQRSTUVWXYZ',
            'abcdefghijklmnopqrstuvwxyz'
        ),
        {_xpath_string_literal(needle)}
    )]
    """

    try:
        # Wait for the element
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.XPATH, xpath_expression))
        )
        return True
    except Exception:
        return False

In [None]:
def do_user_checks(user): # returns true if checks failed
    """Check that the profile of *user* loaded and that its posts can be read.

    Uses the global ``driver``, which must already be on the user's page.

    Returns:
        True when the profile must be skipped: it did not load, the account
        does not exist or is suspended, it is protected or has no posts, or
        the sensitive-content warning could not be accepted. False when
        scraping can go on.
    """
    def page_has(xpath):
        # True if at least one element matches the XPath right now
        try:
            driver.find_element(By.XPATH, xpath)
        except NoSuchElementException:
            return False
        return True

    # Wait until the page of the current user is fully loaded
    if not is_element_present_case_insensitive("@"+user):
        print("The user profile didn't load or an exception occurred.")
        return True

    # The user must have an account that hasn't been suspended or erased
    if page_has("//*[text()='This account doesn’t exist' or text()='Account suspended']"):
        print("User is not available (doesn't exist, has no posts or was suspended)")
        return True

    # The user must have a public account with posts
    if page_has("//*[text()='These posts are protected' or text()='0 posts']"):
        print("User has a private account or no posts")
        return True

    # Sensitive or restricted profiles show a warning that has to be accepted
    if page_has("//*[text()='Caution: This profile may include potentially sensitive content' or text()='Caution: This account is temporarily restricted']"):
        print("User has sensitive content")
        try:
            # The button is located by its data-testid attribute
            confirm_button = driver.find_element(By.XPATH, "//button[@data-testid='empty_state_button_text']")
            # The span inside must carry the expected text, otherwise this raises
            confirm_button.find_element(By.XPATH, ".//span[text()='Yes, view profile']")
            confirm_button.click()
            time.sleep(1)
        except Exception:
            print("Could not click the button")
            return True

    return False

In [None]:
def scroll_down(last_tweet_before_scroll):
    """Scroll the page down and wait until a different last tweet is shown.

    Args:
        last_tweet_before_scroll: text of the last tweet seen before
            scrolling, or None to scroll without waiting.

    Returns:
        False when scrolling can continue, True when no new tweet appeared
        after two attempts.

    Raises:
        The exception of the second wait, when the page shows the
        "Something went wrong. Try reloading." message.
    """
    driver.execute_script("window.scrollBy(0, 3 * window.innerHeight);")

    # If no tweets were found before scrolling, just scroll without waiting
    if last_tweet_before_scroll is None:
        return False

    def last_tweet_changed(d):
        # True once the last tweet with text differs from the one seen before
        articles = d.find_elements(By.XPATH, '//article[@data-testid="tweet"]')
        return get_last_tweet_text(articles) != last_tweet_before_scroll

    debug_print("Tweet after scroll:")
    try:
        WebDriverWait(driver, 10).until(last_tweet_changed)
    except Exception:
        # One more attempt in case it is a coincidence
        # (e.g.: two different tweets beginning with the same text)
        try:
            driver.execute_script("window.scrollBy(0, 5 * window.innerHeight);")
            WebDriverWait(driver, 15).until(last_tweet_changed)
        except Exception as error:
            if not is_element_present_case_insensitive('Something went wrong. Try reloading.'):
                print("No new tweets loaded after scrolling, the new last tweet is the same to the old one")
                return True
            print("SOMETHING WENT WRONG MESSAGE")
            raise error
    return False

In [None]:
def _empty_tweet_frame():
    """Return an empty DataFrame with the User/Tweet/Date columns of a scrape result."""
    return pd.DataFrame({
        'User': pd.Series(dtype='str'),
        'Tweet': pd.Series(dtype='str'),
        'Date': pd.Series(dtype='datetime64[ns]')
    })


def _read_tweet(container):
    """Return ``(text, date)`` for one tweet article, or None if either part is unreadable."""
    try:
        # The leading "." keeps the search relative to this container
        text_box = container.find_element(By.XPATH, './/div[@data-testid="tweetText"]')

        # A tweet text may be split over several spans (e.g. because of @tags);
        # emojis are images, for which we keep the 'alt' attribute
        text_elements = text_box.find_elements(By.XPATH, './/span | .//img')
        tweet_text = "".join(
            elem.get_attribute('alt') if elem.tag_name == 'img' else elem.text
            for elem in text_elements
        )

        date_element = container.find_element(By.XPATH, './/time')
        tweet_date_str = date_element.get_attribute('datetime')
        # Only the day is kept, the time of day is dropped
        tweet_date = datetime.strptime(tweet_date_str.split('T')[0], '%Y-%m-%d')
    except Exception:
        # "except Exception" instead of a bare except, so that interrupting
        # the program (KeyboardInterrupt) is not swallowed here.
        # Tweets without text (e.g. only pictures) or without date are skipped.
        return None
    return tweet_text, tweet_date


def scrap(user):
    """Collect the tweets of *user* whose date lies in the configured range.

    Opens the profile with the global ``driver`` and scrolls down, reading
    the tweets on screen, until ``max_out_of_date`` consecutive tweets are
    older than ``old_date_limit``, ``max_tweets`` tweets were collected, or
    scrolling brings no new tweets.

    Args:
        user: screen name of the profile to scrape (without "@").

    Returns:
        A DataFrame with columns 'User', 'Tweet' and 'Date', one row per
        collected tweet; an empty DataFrame with those columns when the
        profile cannot be read or has no tweets in the range.

    Raises:
        Exception: the original wait/scroll exception when the page shows
            "Something went wrong. Try reloading.".
    """
    url = 'https://x.com/'
    debug_print("------------------------------------------------")
    debug_print("------------------------------------------------")
    debug_print("------------------------------------------------")
    print(url+user)
    driver.get(url+user)

    if do_user_checks(user):
        return _empty_tweet_frame()

    # Data collected for this user
    texts = []
    dates = []

    # Tweets already seen, to avoid duplicates when we scroll down.
    # Tweet IDs change when we scroll, so two tweets are considered the same
    # if they have the same text and date.
    seen_tweets = set() # set of pairs (text, date)

    counter_out_of_date = 0 # number of consecutive tweets that are out of date
    end = False

    # We only wait for the tweets to load the first time
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, '//article[@data-testid="tweet"]'))
        )
    except Exception: # this could be that we receive the "Something went wrong" message
        if is_element_present_case_insensitive('Something went wrong. Try reloading.'):
            print("SOMETHING WENT WRONG MESSAGE")
            raise
        print("Exception occured, tweets didn't load")
        return _empty_tweet_frame()

    while True:
        # A container only has 1 tweet
        tweet_containers = driver.find_elements(By.XPATH, '//article[@data-testid="tweet"]')
        debug_print(f"tweet containers length: {len(tweet_containers)}")
        debug_print(f"Last tweet before scroll:")
        # None if there are no tweets with text (or no tweets at all)
        last_tweet_before_scroll = get_last_tweet_text(tweet_containers)

        for container in tweet_containers:
            tweet = _read_tweet(container)
            if tweet is None or tweet in seen_tweets:
                continue
            tweet_text, tweet_date = tweet

            if len(texts) >= max_tweets:
                print("we have passed the tweet limit for this user")
                end = True
                break

            seen_tweets.add(tweet)

            if tweet_date < old_date_limit:
                # Older than the range: stop after too many in a row.
                # ">=" also stops when max_out_of_date is set to 0 or less.
                counter_out_of_date += 1
                if counter_out_of_date >= max_out_of_date:
                    end = True
                    break
            else:
                counter_out_of_date = 0
                # Tweets newer than the range are not saved: the range
                # has not been reached yet
                if tweet_date <= recent_date_limit:
                    texts.append(tweet_text)
                    dates.append(tweet_date)

        # The for loop ends when every container on screen has been checked,
        # when the target date was reached or when the tweet limit was hit
        if end:
            break
        # Exceptions from scroll_down propagate to the caller
        if scroll_down(last_tweet_before_scroll):
            break

    if not texts:
        return _empty_tweet_frame()

    return pd.DataFrame({
        'User': [user] * len(texts),
        'Tweet': texts,
        'Date': dates
    })

In [None]:
def save_data_cloud(list_users_tweetdf, users, processed_users, total_users):
    """Save a batch of scraped users locally and register it in MongoDB.

    The tweets are written to a CSV file under ``scraped_data/`` and inserted
    in the ``scraped_data`` collection. The user names are inserted in the
    ``processed<year>`` collection, using the name as ``_id``. Afterwards
    *users* and *list_users_tweetdf* are emptied in place and the names are
    appended to *processed_users*.

    Args:
        list_users_tweetdf: list of DataFrames, one per user of the batch.
        users: names of the users of the batch, in the same order.
        processed_users: list of already processed names (extended in place).
        total_users: full list of users, only used for the progress message.
    """
    import os  # local import: keeps the function independent of other cells

    if not users or not list_users_tweetdf:
        return  # nothing to save; pd.concat would raise on an empty list

    now = datetime.now()
    formatted_time = now.strftime("%d-%m-%Y %H:%M")
    print(formatted_time + " saving...", end=" ")
    df_users_tweetdf = pd.concat(list_users_tweetdf) # concatenates the dfs in the list

    unique_id = uuid.getnode() # hardware (MAC) address of this machine as an integer

    # The file is named after the year, the first user of the batch and the machine
    # identifier, so that files written by different machines are unlikely to collide.
    # The folder is created if it is missing so that a scraped batch is not lost.
    os.makedirs("scraped_data", exist_ok=True)
    df_users_tweetdf.to_csv(f"scraped_data/{recent_date_limit.year}_{users[0]}_{unique_id}.csv", index=False, encoding="utf-8")
    docs = [{"_id":name} for name in users] # one document per user, keyed by its name
    processed_users.extend(users)

    # List of dictionaries, one per row, with the column names as keys
    data_records = df_users_tweetdf.to_dict(orient='records')

    def report_unexpected(error, collection_name):
        # Duplicate keys (code 11000) are expected when several instances run in
        # parallel; any other write error is reported instead of being hidden
        unexpected = [
            write_error for write_error in error.details.get("writeErrors", [])
            if write_error.get("code") != 11000
        ]
        if unexpected:
            print(f"{len(unexpected)} unexpected write error(s) in {collection_name}: {unexpected[0].get('errmsg')}")

    try:
        # ordered=False: the server keeps inserting after a duplicate _id, and
        # a BulkWriteError is raised at the end if there was any duplicate
        db[f"processed{recent_date_limit.year}"].insert_many(docs, ordered=False)
    except BulkWriteError as error:
        # Only new documents are inserted; duplicates are left as they are
        report_unexpected(error, f"processed{recent_date_limit.year}")

    if data_records: # if it is not empty
        try:
            db["scraped_data"].insert_many(data_records, ordered=False)
        except BulkWriteError as error:
            report_unexpected(error, "scraped_data")

    # A user may end up in the local csv of several computers because of
    # concurrency (duplicates are dealt with later)
    list_users_tweetdf.clear() # empties the caller's list, while = [] would only rebind the name
    users.clear()
    print(f"{len(processed_users)}/{len(total_users)} ", end=" - ")

In [None]:
try:
    import winsound
except ImportError:  # winsound only exists on Windows
    winsound = None


def play_beep():
    """Signal the end of the run with three beeps.

    On Windows three one-second tones are played with ``winsound``. On other
    systems, where ``winsound`` is not available, the terminal bell character
    is written three times instead.
    """
    for _ in range(3):
        if winsound is not None:
            winsound.Beep(1001, 1000)  # Frequency (Hz), Duration (ms)
        else:
            print("\a", end="", flush=True)

In [None]:
# Main loop: pick random users that are still unprocessed, scrape them and save
# the results in batches. When scraping raises, switch to the next X account.
processed_users = [ doc["_id"]  for doc in db[f"processed{recent_date_limit.year}"].find()] # Already treated users in Atlas

print(f"{len(processed_users)}/{len(total_users)}")
print(f"Are there duplicates in processed_users: {len(processed_users) != len(set(processed_users))}")

max_users_saving = 10 # max number of users we can read before we save them to a local file and update their names to the db
list_users_tweetdf = [] # list of dataframes, one df per user with all its tweets and dates inside. It is more memory efficient because we avoid the intermediate copies of concatenating dfs as we go.
users = [] # list of users we are going to be processing locally now

user_name_counter = 0 # index in user_name_list of the account we will switch to next

while len(total_users) > (len(processed_users) + len(users)):
    
    # Users that are neither saved in the cloud nor waiting in the current local batch
    difference = list(set(total_users) - (set(processed_users) | set(users))) # the union is with |
    if not difference:
        print("No more users to process. Exiting loop.")
        break  # Exit loop if there's nothing left to process
    
    user = random.choice(difference)
    time.sleep(random.randint(5, 30)) # random pause between users

    try:
        df = scrap(user)
        list_users_tweetdf.append(df)
        users.append(user)
    except Exception:
        # Any exception raised by scrap() is handled by switching to another account.
        # The failed user is not added to the batch, so it can be picked again later.
        print("Process was interrupted")
        print(f"Changing user to @{user_name_list[user_name_counter]}")
        time.sleep(random.randint(1,30)+10) # to trick the website
        
        # NOTE(review): this wait is not inside the try below, so if it times out the
        # exception leaves the loop and the final save at the end of the cell is skipped.
        account_menu_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-testid='SideNav_AccountSwitcher_Button']"))
        )
        account_menu_button.click()
        time.sleep(1)  # Small delay to ensure the menu loads

        try:
            # Try to find and click the "Switch to account" button
            account_switch_button = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, f"button[aria-label='Switch to @{user_name_list[user_name_counter]}']"))
            )
            account_switch_button.click()
        
            time.sleep(3)
        
            # Increment counter in round-robin fashion
            user_name_counter += 1
            if user_name_counter >= len(user_name_list):
                user_name_counter = 0  # Reset to the initial value

            # Check that we haven't passed the tweet limit for the day
            try:
                driver.find_element(By.XPATH, "//*[text()='Unlock more posts by subscribing']")
                print("We have reached the limit for seeing posts today")
                break
            except NoSuchElementException:
                pass

        
        except Exception:
            if is_element_present_case_insensitive('Something went wrong. Try reloading.'):
                print("Something went wrong and it blocked all users")
                break # comment here if you want the program to go to sleep and try again later
                # The two lines below only run when the break above is commented out
                time.sleep(60 * 10)
                user_name_counter += 1 # in case it wants to change to its own user
            else:
                print("We don't know what happened")
                break        
    
    # time to update the server with the new processed users' names and locally save the tweets
    if len(users) >= max_users_saving: 
        save_data_cloud(list_users_tweetdf, users, processed_users, total_users)


# if we have finished processing everyone and there are users left to save because they don't amount to max_users_saving
if len(users) > 0:
    save_data_cloud(list_users_tweetdf, users, processed_users, total_users)

# Audible signal that the run has ended
play_beep()

In [None]:
# End the browser session once scraping is over. quit() closes every window and
# also stops the chromedriver process, whereas close() only closes the current
# window. Skip this cell if you prefer to rerun the previous cell with all
# accounts still logged in.
driver.quit()