## Needed Modules

In [1]:
import pandas as pd
from bs4 import BeautifulSoup
import json
import time
import numpy as np
from tqdm import tqdm
from typing import *
import re
import os
import glob
from IPython.display import clear_output

# Scrapping and crawling modules
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from user_agent import generate_user_agent
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from datetime import datetime, timedelta
from urllib.parse import quote

In [2]:
#Credentials
with open("Credentials/twitter.json", "r") as f:
    credentials = json.load(f)

username = credentials["username"]
password = credentials["password"]
mail = credentials["email"]

In [3]:
def getTime(str):
    '''
    This function is used to convert string of datetime in isoformat to datetime object
    Params:
        str:
            - String of datetime in isoformat
    return:
        date_time_obj:
            - Datetime object
    '''
    date_time_obj = datetime.fromisoformat(str)
    date_time_obj =date_time_obj + timedelta(hours=7)
    return date_time_obj

def minOneDay(str):
    '''
    This function is used to subtract one day from the given date
    Params:
        str:
            - String of datetime in isoformat
    return:
        str:
            - String of datetime in isoformat after subtracting one day
    '''
    str = datetime.strptime(str, "%Y-%m-%d")
    str = str - timedelta(days=1)
    str = str.strftime("%Y-%m-%d")
    return str

def wait(timeout: int = 10):
    '''
    just a glorified simple function to wait for a certain amount of time
    '''
    for i in tqdm(range(timeout), desc="Waiting"):
        time.sleep(1)
    clear_output()  

In [4]:
# TODO: Make it into OOP 

class twitterScrapper:
    def __init__(self, username, password, email):
        '''
        This function is used to initialize the class
        Params:
            username:
                - Username of the twitter account
            password:
                - Password of the twitter account
            email:
                - Email of the twitter account
        '''
        self.username = username
        self.password = password
        self.email = email
        self.theDict = { "User" : [], "Date" : [], "Text" : [],
                         "Reply": [], "Repost": [], "Like": [], "View": []}
        self.login()
    
    def login(self):
        '''
        This function is used to initialize the driver and login to the twitter account
        '''
        loginURL = 'https://x.com/i/flow/login?redirect_after_login=%2Fsearch%3Fq%3Disrael%26src%3Dtyped_query%26f%3Dlive%26mx%3D2'
        
        # Initialize the driver
        usergAgent = generate_user_agent(device_type="desktop", os="win", navigator="chrome", platform="win")
        options = Options()
        options.add_argument(f'user-agent={usergAgent}')
        self.driver = webdriver.Chrome()
        self.driver.get(loginURL)

        # Login handling
        WebDriverWait(self.driver, 30).until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete = 'username']")))
        self.driver.find_element(By.XPATH, "//input[@autocomplete = 'username']").send_keys(self.username)
        self.driver.find_element(By.XPATH, "//div/button[2]").click()
        time.sleep(5)               # This will wait for the next login pop-up to appear

        # if email is neeeded. This means you logged a lot to the account and X raises a suspicious login attempt.
        try:
            if self.driver.find_element(By.XPATH, "//div[1]/div/h1/span/span").text == "Enter your phone number or email address":
                print("Suspicious login attempt detected, attempting to enter email on login prochedures.")
                self.driver.find_element(By.XPATH, "//input").send_keys(self.email)
                self.driver.find_element(By.XPATH, "//div[2]/div/div/div/button").click()
        except:
            pass
        
        # Put password
        WebDriverWait(self.driver, 30).until(EC.presence_of_element_located((By.XPATH, "//input[@name='password']")))
        self.driver.find_element(By.XPATH, "//input[@name='password']").send_keys(self.password)
        self.driver.find_element(By.XPATH, "//button[@data-testid='LoginForm_Login_Button']").click()
        time.sleep(5)
        print("zoom out 25%!")      # Please zoom out 25% manually in order to get more tweets loaded
        print("Login sucess!")
        wait(10)
    
    def searchAndscrap(self, keyword, startDate, endDate, continueifTimeout = True):
        '''
        This function is used to load the post and scrap the data
        Params:
            keyword:
                - Keyword to search need to be encoded beforehand
            startDate:
                - Start date of the search
            endDate:
                - End date of the search
        '''
        notTimeout = True
        notDateReached = True 
        
        while True:
            if not notDateReached:
                print("startDate has been reached!")
                pd.DataFrame(self.theDict).to_csv(f'Completed.csv', index=False)
                break
            
            untilDate = endDate
            searchLink = f"https://x.com/search?q={keyword}%20until%3A{untilDate}%20since%3A{startDate}&src=typed_query&f=live"
            self.driver.get(searchLink)
            
            # This will check if scrapping is detected, if it is. It'll wait for 20 mins
            if continueifTimeout:
                try:
                    #   This is essential to check if the scrapping is detected
                    WebDriverWait(self.driver, 5).until(EC.presence_of_element_located((By.XPATH, '//div[@aria-label="Home timeline"]/div/div/div/span[@style="text-overflow: unset;"]')))
                    print("Saving!")
                    pd.DataFrame(self.theDict).to_csv(f'Savepoints/{untilDate}.csv', index=False)
                    print("Scrapping detected, waiting for 15 mins")
                    for i in tqdm(range(900), desc="Waiting"):
                        time.sleep(1)
                    clear_output()
                    continue
                except:
                    pass
            if not continueifTimeout:
                try:
                    WebDriverWait(self.driver, 5).until(EC.presence_of_element_located((By.XPATH, '//div[@aria-label="Home timeline"]/div/div/div/span[@style="text-overflow: unset;"]')))
                    notTimeout = False
                except:
                    pass
                if not notTimeout: break
            
            # This will wait for the page to load, if nothing exists. Minus one day and repeat
            try:
                WebDriverWait(self.driver, 20).until(EC.presence_of_element_located((By.XPATH, "//div[@data-testid='cellInnerDiv']")))   # Initialize
            except:
                endDate = minOneDay(endDate)
                continue
            
            last_height = self.driver.execute_script("return document.body.scrollHeight")

            while True:

                elements = self.driver.find_elements(By.XPATH, '//div[@aria-label="Timeline: Search timeline"]/div/div')
                
                for element in elements[:-1]:
                    
                    # Check if text exists, if not then continue.  
                    try:
                        text = ''.join([i.text for i in element.find_elements(By.XPATH, './/div[@data-testid="tweetText"]/span')])
                    except:
                        continue
                    if text in self.theDict["Text"]:
                        continue
                    
                    
                    # TODO: Get date and check if theDate is lower than startDate. Break entire loop
                    theDate = getTime(element.find_element(By.XPATH, './/time').get_attribute("datetime"))
                    
                    # append text
                    self.theDict["Text"].append(text)
                    
                    # append user
                    self.theDict["User"].append(element.find_element(By.XPATH, './/a/div/span').text)
                    
                    # append Date
                    self.theDict["Date"].append(theDate.strftime("%Y-%m-%d-%H:%M:%S"))
                    
                    # post attrs
                    for group in element.find_elements(By.XPATH, './/div[@role="group"]'):
                        self.theDict["Reply"].append(int(re.search(r'\d+', group.find_element(By.XPATH, './/div[1]/button').get_attribute("aria-label"))[0]))
                        self.theDict["Repost"].append(int(re.search(r'\d+', group.find_element(By.XPATH, './/div[2]/button').get_attribute("aria-label"))[0]))
                        self.theDict["Like"].append(int(re.search(r'\d+',group.find_element(By.XPATH, './/div[3]/button').get_attribute("aria-label"))[0]))
                        self.theDict["View"].append(int(re.search(r'\d+', group.find_element(By.XPATH, './/div[4]/a').get_attribute("aria-label"))[0]) if re.search(r'\d+', group.find_element(By.XPATH, './/div[4]/a').get_attribute("aria-label")) else 0)
                
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(5)
                new_height = self.driver.execute_script("return document.body.scrollHeight")
                
                # This check if startdate has been reached, thus breaking the entire function.
                if getTime(self.theDict["Date"][-1]) < getTime(startDate):
                    notDateReached = False
                    break
                
                # will break if the scrollbar is at the bottom
                if new_height == last_height:
                    endDate = '-'.join(self.theDict["Date"][-1].split("-")[:3])
                    # This will check if the untilDate is the same as endDate, if it is. It'll minus one day.
                    if untilDate == endDate:
                        endDate = minOneDay(endDate)
                    break
                
                last_height = new_height

In [5]:
session = twitterScrapper(username, password, mail)

In [6]:
session.searchAndscrap('Pemakzulanjokowi', "2010-10-01", "2024-09-29")

NoSuchWindowException: Message: no such window: target window already closed
from unknown error: web view not found
  (Session info: chrome=132.0.6834.197)
Stacktrace:
	GetHandleVerifier [0x00007FF664DD02F5+28725]
	(No symbol) [0x00007FF664D32AE0]
	(No symbol) [0x00007FF664BC510A]
	(No symbol) [0x00007FF664B9EEA5]
	(No symbol) [0x00007FF664C46F87]
	(No symbol) [0x00007FF664C5FA52]
	(No symbol) [0x00007FF664C3FD53]
	(No symbol) [0x00007FF664C0A0E3]
	(No symbol) [0x00007FF664C0B471]
	GetHandleVerifier [0x00007FF6650FF30D+3366989]
	GetHandleVerifier [0x00007FF6651112F0+3440688]
	GetHandleVerifier [0x00007FF6651078FD+3401277]
	GetHandleVerifier [0x00007FF664E9AAAB+858091]
	(No symbol) [0x00007FF664D3E74F]
	(No symbol) [0x00007FF664D3A304]
	(No symbol) [0x00007FF664D3A49D]
	(No symbol) [0x00007FF664D28B69]
	BaseThreadInitThunk [0x00007FFDDD47E8D7+23]
	RtlUserThreadStart [0x00007FFDDD7FFBCC+44]


In [10]:
len(session.theDict["Date"])

699

In [None]:
merged_DF = []
for i in os.listdir("Savepoints/Collected but not merged/"):
    merged_DF.append(pd.read_csv(f"Savepoints/Collected but not merged/{i}").values)

merged_DF = pd.DataFrame(np.concatenate(merged_DF, axis=0), columns=["User", "Date", "Text", "Reply", "Repost", "Like", "View"])
merged_DF["Date"] = pd.to_datetime(merged_DF["Date"])
merged_DF = merged_DF.sort_values(by="Date", ascending=False).reset_index(drop=True)
merged_DF.to_csv("DATA.csv", index=False)

In [None]:
# Check for duplicates, get their indexs, dates they were posted, usernames, and len
merged_DF = pd.read_csv("DATA.csv")

theTempDict = []
for i in tqdm(merged_DF["Text"].unique()):
    if len(merged_DF[merged_DF["Text"] == i]) > 1:
        theTempDict.append([i, merged_DF[merged_DF["Text"] == i].index, merged_DF[merged_DF["Text"] == i]["Date"].values, merged_DF[merged_DF["Text"] == i]["User"].values, len(merged_DF[merged_DF["Text"] == i].values)])
pd.DataFrame(theTempDict, columns=["Text", "Index", "Date", "User", "Len"]).to_csv("DuplicateChecking.csv", index=False)

100%|██████████| 18871/18871 [00:25<00:00, 742.48it/s]


In [None]:
# Check which user blows the wind the most
theTempDict = []
for i in tqdm(merged_DF["User"].unique()):
    theTempDict.append([i, len(merged_DF[merged_DF["User"] == i])])
pd.DataFrame(sorted(theTempDict, key = lambda x: x[1], reverse=True), columns=["User", "Len"]).to_csv("UserChecking.csv", index=False)

100%|██████████| 11683/11683 [00:15<00:00, 730.72it/s]


In [None]:
print(merged_DF[merged_DF["User"] == "@HarcumTj"][["User", "Text", "Date"]].values)

[['@HarcumTj'
  'Pembangunan infrastruktur Jokowi membuat Indonesia lebih maju! Terima kasih! #TerimaKasihPakJokowi #KerjaNyataJokowi #JokowiMerakyat #JokowiBerdedikasi #KaryaNyata'
  '2024-10-19 14:02:59']
 ['@HarcumTj'
  'Jokowi bawa perubahan dari kota hingga pelosok! Terima kasih atas kerja kerasnya! #TerimaKasihPakJokowi #KerjaNyataJokowi #JokowiMerakyat #JokowiBerdedikasi #KaryaNyata'
  '2024-10-19 14:02:50']
 ['@HarcumTj'
  'Terima kasih Pak Jokowi, pembangunan infrastruktur telah membawa Indonesia lebih maju! #TerimaKasihPakJokowi #KerjaNyataJokowi #JokowiMerakyat #JokowiBerdedikasi #KaryaNyata'
  '2024-10-19 14:02:11']
 ['@HarcumTj'
  'Papua merasakan perubahan nyata, terima kasih atas dedikasi Jokowi! #TerimaKasihPakJokowi #JokowiSejahterakanPapua #PapuaMajuBersamaJokowi #JokowiBangunPapua #PapuaHebatBersamaJokowi #10TahunJokowiMemimpin #KerjaNyataJokowi'
  '2024-10-18 15:24:22']
 ['@HarcumTj'
  'Terima kasih atas kerja kerasnya, Pak Jokowi. Kami bangga dan siap berkontribusi

: 

# ============================================================================================================================================

## Manual, deprecrated in favour of the syntax above

### Login

In [18]:
loginURL = 'https://x.com/i/flow/login?redirect_after_login=%2Fsearch%3Fq%3Disrael%26src%3Dtyped_query%26f%3Dlive%26mx%3D2'

In [None]:
theDict ={
    "User" : [],
    "Date" : [],
    "Text" : [],
    "Reply": [],
    "Repost": [],
    "Like": [],
    "View": []
}

In [13]:
user_agent = generate_user_agent(device_type="desktop", os="win", navigator="chrome", platform="win")

options = Options()
options.add_argument(f'user-agent={user_agent}')

driver = webdriver.Chrome()
driver.get(loginURL)


# Login handling
WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete = 'username']")))
driver.find_element(By.XPATH, "//input[@autocomplete = 'username']").send_keys(username)
driver.find_element(By.XPATH, "//div/button[2]").click()
time.sleep(5)               # This will wait for the next login pop-up to appear

# if email is neeeded
try:
    if driver.find_element(By.XPATH, "//div[1]/div/h1/span/span").text == "Enter your phone number or email address":
        print("Suspicious login attempt detected, attempting to enter email on login prochedures.")
        driver.find_element(By.XPATH, "//input").send_keys(mail)
        driver.find_element(By.XPATH, "//div[2]/div/div/div/button").click()
except:
    pass
        
WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.XPATH, "//input[@name='password']")))
driver.find_element(By.XPATH, "//input[@name='password']").send_keys(password)
driver.find_element(By.XPATH, "//button[@data-testid='LoginForm_Login_Button']").click()
time.sleep(5)
print("zoom out 25%!")
time.sleep(10)

  return generate_navigator(os=os, navigator=navigator,


zoom out 25%!


### Scrapping here

In [None]:
temp = '2020-07-16'
iter=0
notTimeout = True
while True:
    untilDate = temp
    searchURL = f'https://x.com/search?q=%22%23terimakasihjokowi%22%20until%3A{untilDate}&src=typed_query&f=live'
    driver.get(searchURL)
    
    # This will check if scrapping is detected, and break the loop.
    try:
        WebDriverWait(driver, 5).until(EC.presence_of_element_located((By.XPATH, '//div[@aria-label="Home timeline"]/div/div/div/span[@style="text-overflow: unset;"]')))
        notTimeout = False
    except:
        pass
    if not notTimeout: break
    
    # This will wait for the page to load, if nothing exists. Minus one day
    try:
        WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.XPATH, "//div[@data-testid='cellInnerDiv']")))   # Initialize
    except:
        temp = minOneDay(temp)
        continue
    
    last_height = driver.execute_script("return document.body.scrollHeight")
    while True:

        elements = driver.find_elements(By.XPATH, '//div[@aria-label="Timeline: Search timeline"]/div/div')
        for element in elements[:-1]:
            # The text  
            try:
                text = ''.join([i.text for i in element.find_elements(By.XPATH, './/div[@data-testid="tweetText"]/span')])
            except:
                continue
            if text in theDict["Text"]:
                continue
            theDict["Text"].append(text)
            
            theDict["User"].append(element.find_element(By.XPATH, './/a/div/span').text)
            
            theDate = getTime(element.find_element(By.XPATH, './/time').get_attribute("datetime"))
            theDict["Date"].append(theDate.strftime("%Y-%m-%d-%H:%M:%S"))
            
            for group in element.find_elements(By.XPATH, './/div[@role="group"]'):
                theDict["Reply"].append(int(re.search(r'\d+', group.find_element(By.XPATH, './/div[1]/button').get_attribute("aria-label"))[0]))
                theDict["Repost"].append(int(re.search(r'\d+', group.find_element(By.XPATH, './/div[2]/button').get_attribute("aria-label"))[0]))
                theDict["Like"].append(int(re.search(r'\d+',group.find_element(By.XPATH, './/div[3]/button').get_attribute("aria-label"))[0]))
                theDict["View"].append(int(re.search(r'\d+', group.find_element(By.XPATH, './/div[4]/a').get_attribute("aria-label"))[0]) if re.search(r'\d+', group.find_element(By.XPATH, './/div[4]/a').get_attribute("aria-label")) else 0)
        
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(5)
        new_height = driver.execute_script("return document.body.scrollHeight")
        
        if new_height == last_height:
            temp = '-'.join( theDict["Date"][-1].split("-")[:3])
            break
        
        last_height = new_height
        if untilDate == temp:
            temp = minOneDay(temp)
        
    iter+=1

### Df

In [10]:
pd.DataFrame(theDict)

Unnamed: 0,User,Date,Text,Reply,Repost,Like,View
0,@IkanKepri,2020-07-16-03:51:55,Seharusnya kita Ngaca dan malu hidup mengeluh ...,0,3,3,0
1,@IkanKepri,2020-07-16-03:39:01,Terimakasih pak atas pilihan tidak me-Lockdow...,0,3,4,0


### Merging with recent df

In [61]:
# Get list of all CSV files in the current directory
csv_files = glob.glob("*.csv")

# Find the latest CSV file by modification time
latest_csv = max(csv_files, key=os.path.getmtime)

df = pd.read_csv(latest_csv)

a = pd.concat((df, pd.DataFrame(theDict)), axis=0).reset_index().drop(columns="index")
a = a.drop_duplicates(subset="Text").sort_values(by="Date", ascending=False)
a.to_csv("twitter20200716.csv", index=False)

# Fixing date with incorrect len number of observation

In [21]:
df = pd.read_csv("Data.csv")
newObservations = pd.DataFrame(session.theDict)

df = pd.concat((df, newObservations), axis=0).reset_index().drop(columns="index")

In [26]:
newObservations["Date"] = pd.to_datetime(newObservations["Date"])

In [29]:
for i in newObservations["Date"].dt.date.unique():
    print(i, len(newObservations[newObservations["Date"].dt.date == i]))

2024-09-29 5
2024-09-28 77
2024-09-27 43
2024-09-26 60
2024-09-25 75
2024-09-24 56
2024-09-23 82
2024-09-22 32


In [24]:
df["Date"] = pd.to_datetime(df["Date"])
df["Date"].dt.date

ValueError: time data "2024-10-02-05:39:13" doesn't match format "%Y-%m-%d %H:%M:%S", at position 6956. You might want to try:
    - passing `format` if your strings have a consistent format;
    - passing `format='ISO8601'` if your strings are all ISO8601 but not necessarily in exactly the same format;
    - passing `format='mixed'`, and the format will be inferred for each element individually. You might want to use `dayfirst` alongside this.

In [22]:
df.sort_values(by="Date", ascending=False).to_csv("Data.csv", index=False)

In [30]:
len(df)

20098