## X Scraper  

### Overview  
The **X Scraper** is a web scraping tool designed to automatically log into the X platform (formerly known as Twitter) and extract posts based on specific hashtags or keywords. Utilizing the Selenium WebDriver, this scraper navigates the X website, interacts with the user interface, and collects relevant data from posts for analysis.  

### How It Works  

#### Workflow Overview  

1. **Driver Initialization**:  
   - The scraper initializes a Selenium WebDriver instance configured with Chrome.   
   - It can run in headless mode (without a graphical user interface) for efficiency and can be customized for different screen sizes.  

2. **Login Process**:  
   - The scraper navigates to the X login page and enters the provided username and password.  
   - It waits for the necessary elements to load and clicks through the login process, ensuring a successful login.  

3. **Tag Scraping**:  
   - Once logged in, the scraper iterates through a list of specified hashtags or keywords.  
   - For each tag, it clears the search box, enters the tag, and navigates to the "Latest" tab to view **recent** posts related to the tag.  

4. **Data Extraction**:  
   - The scraper scrolls through the loaded posts, extracting relevant data from each post, including:  
     - **Username**: The name of the user who made the post.  
     - **Time**: The timestamp of the post in a formatted string.  
     - **Content**: The text content of the post.  

5. **Handling Duplicates**:  
   - To avoid processing the same post multiple times, the scraper keeps track of seen posts using their URLs.  

6. **Data Storage**:  
   - Extracted data is stored in a structured format (list of dictionaries) for further analysis.  
   - After scraping, the data is saved to a dated YAML file for easy access and analysis.  

7. **Optional Features**:  
   - The scraper includes commented-out sections for saving data to Excel (not yet implemented) and uploading files to AWS S3 for cloud storage.  
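
The duplicate handling in step 5 can be sketched as follows. This is a minimal illustration rather than the notebook's exact code: the helper names `is_post_link` and `add_new_links` are hypothetical, and the excluded URL parts mirror the ones filtered in the scraping function further down.

```python
# Hypothetical helpers illustrating URL-based de-duplication.
EXCLUDED_PARTS = ("/photo", "/analytics", "/media_tags")


def is_post_link(href):
    """Return True for post permalinks, False for sub-links such as photos."""
    if not href or "/status/" not in href:
        return False
    return not any(part in href for part in EXCLUDED_PARTS)


def add_new_links(seen, hrefs):
    """Add unseen post links to `seen` (a set) and return how many were new."""
    before = len(seen)
    seen.update(h for h in hrefs if is_post_link(h))
    return len(seen) - before


seen_links = set()
batch = [
    "https://x.com/example_user/status/1",
    "https://x.com/example_user/status/1",          # duplicate, counted once
    "https://x.com/example_user/status/1/photo/1",  # sub-link, skipped
    "https://x.com/other_user/status/2",
]
print(add_new_links(seen_links, batch))  # prints 2
```

Because a set ignores repeated values, re-reading the same links after each scroll adds only the posts that have not been seen yet.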

#### Example of Extracted Data  

The scraper collects the following types of data for each post:  

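| Field          | Description                                              |  
|----------------|----------------------------------------------------------|  
| Post Number    | A running number assigned to each scraped post           |  
| label          | A fixed source label (`x_data`)                          |  
| Tag            | The hashtag or keyword that was searched                 |  
| Username       | The @handle of the user who made the post                |  
| Time Posted    | The time portion of the post's timestamp                 |  
| Date Posted    | The date portion of the post's timestamp (YYYY-MM-DD)    |  
| Content        | The text content of the post                             |  
| Comments       | A list of text fragments collected from the post page    |  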

### Use Cases  

The data extracted by the X Scraper can be utilized for various purposes, including:  

- **Social Media Analysis**: Analyze trends and sentiments around specific topics or events.  
- **Market Research**: Gather insights into public opinion regarding products or brands.  
- **Content Monitoring**: Track discussions and feedback related to specific hashtags or campaigns.  

### Important Notes  

- **Temporary Accounts**: Use a temporary account for scraping. Automated access may violate X's terms of service and can get an account restricted, so be cautious when using genuine accounts.  
- **Dynamic Content**: The scraper relies on specific XPath and CSS selectors to extract data. Changes in the X platform's layout may require updates to these selectors.  
- **AWS S3 Configuration**: The scraper has provisions for uploading extracted data to AWS S3, which can be uncommented and configured as needed.  
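
One way to soften the selector-maintenance issue noted above is to keep every selector in a single mapping, so a layout change means editing one place. The sketch below is only a suggestion: the `SELECTORS` name and the `locator` helper are hypothetical, while the selector strings are the ones used in the code cells of this notebook. The strategy strings `"xpath"` and `"css selector"` are the values held by Selenium's `By.XPATH` and `By.CSS_SELECTOR` constants.

```python
# Hypothetical central registry of (strategy, selector) pairs.
SELECTORS = {
    "search_box": ("xpath", "//input[@data-testid='SearchBox_Search_Input' and @placeholder='Search']"),
    "tab_list": ("css selector", "div[role='tablist']"),
    "post_link": ("xpath", "//a[contains(@href, '/status/')]"),
    "post_text": ("xpath", "//div[contains(@data-testid, 'tweetText')]"),
}


def locator(name):
    """Look up a locator tuple by name, with a clear error for unknown names."""
    try:
        return SELECTORS[name]
    except KeyError:
        raise KeyError(f"No selector registered under {name!r}") from None


print(locator("tab_list"))  # ('css selector', "div[role='tablist']")
```

A locator tuple can be passed straight to an expected condition, for example `wait.until(EC.presence_of_element_located(locator("tab_list")))`.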

### Conclusion  

The X Scraper automates login and data extraction on the X platform, so that posts for a set of hashtags or keywords can be collected quickly and passed on for trend and sentiment analysis.  


**Important Note**: A temporary account has been created for this scraping task. Please be cautious when using genuine accounts.

In [151]:
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.options import Options
from datetime import datetime, timedelta
import pandas as pd
import time
import csv
import yaml

In [2]:
# AWS S3 Configuration (uncomment and fill in your credentials)
# AWS_ACCESS_KEY = 'YOUR_AWS_ACCESS_KEY'
# AWS_SECRET_KEY = 'YOUR_AWS_SECRET_KEY'
# BUCKET_NAME = 'YOUR_BUCKET_NAME'
# s3_client = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)

In [135]:
# Initialize WebDriver with or without headless mode
from selenium.webdriver.chrome.service import Service

def init_driver(chromedriver_path):
    chrome_options = Options()
    # chrome_options.add_argument("--headless")  # Uncomment to run without a visible browser window
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--log-level=3")
    chrome_options.add_argument("--disable-web-security")  # Disable web security
    chrome_options.add_argument("--window-size=1280,1024")
    chrome_options.add_argument("--ignore-certificate-errors")  # Ignore SSL errors
    chrome_options.add_argument("--allow-insecure-localhost")  # Allow insecure localhost connections
    # Recent Selenium 4 releases no longer accept executable_path on webdriver.Chrome;
    # the driver path is passed through a Service object instead.
    driver = webdriver.Chrome(service=Service(executable_path=chromedriver_path), options=chrome_options)
    wait = WebDriverWait(driver, 10)  # Default explicit wait of up to 10 seconds
    print("Driver initiated successfully")
    return driver, wait
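
The overview mentions headless mode and configurable screen sizes, which the function above only supports by editing the code. A small sketch of how these could become parameters is shown below. The helper name `build_chrome_args` is hypothetical, and `--headless=new` assumes a recent Chrome build; older builds use plain `--headless`.

```python
def build_chrome_args(headless=False, window_size=(1280, 1024)):
    """Return the list of Chrome command-line arguments for the given settings."""
    width, height = window_size
    args = [
        "--disable-gpu",
        "--no-sandbox",
        "--log-level=3",
        f"--window-size={width},{height}",
    ]
    if headless:
        # Assumption: a Chrome version that supports the newer headless mode.
        args.append("--headless=new")
    return args


# Each argument would then be passed to Options.add_argument, e.g.:
#   for arg in build_chrome_args(headless=True):
#       chrome_options.add_argument(arg)
print(build_chrome_args(headless=True, window_size=(1920, 1080)))
```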


In [None]:
chromedriver_path = r"D:\coding\freelancing\stockMarket\chromedriver.exe"
driver, wait = init_driver(chromedriver_path)

In [137]:
def login_to_x(username, password):

    try:
        # Navigate to X login page
        driver.get("https://x.com/login/")
        time.sleep(5)
        username_input = wait.until(
            EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
        username_input.click()
        username_input.send_keys(username)

        # Click the "Next" button
        next_button = wait.until(
            EC.element_to_be_clickable((By.XPATH, "//button[.//span[text()='Next']]")))
        next_button.click()

        # Wait for the password input box and enter the password
        password_input = wait.until(
            EC.visibility_of_element_located((By.XPATH, "//input[@name='password']")))

        password_input.click()
        password_input.send_keys(password)

        # Wait for the login button to be clickable and click it
        final_login_button = wait.until(
            EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='LoginForm_Login_Button']")))
        final_login_button.click()
        # No explicit verification is performed; the pause only gives the home page time to load
        time.sleep(5)
        print("Login successful!")

    except Exception as e:
        print(f"An error occurred during login: {e}")

In [None]:
username = "Mary01909527219"
password = "maria33221"

# Log in to X
login_to_x(username, password)
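
Since credentials written into a notebook are easy to leak, one option is to read them from environment variables instead. The sketch below assumes two hypothetical variable names, `X_USERNAME` and `X_PASSWORD`, and a hypothetical helper `load_credentials`; neither is part of the original notebook.

```python
import os


def load_credentials(env=None):
    """Return (username, password) from the environment, or raise if either is missing."""
    env = os.environ if env is None else env
    missing = [key for key in ("X_USERNAME", "X_PASSWORD") if not env.get(key)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env["X_USERNAME"], env["X_PASSWORD"]


# Demonstration with an explicit mapping instead of the real environment.
demo_env = {"X_USERNAME": "demo_user", "X_PASSWORD": "demo_pass"}
demo_username, demo_password = load_credentials(demo_env)
print(demo_username)  # prints demo_user
```

In the notebook, the returned pair could be passed directly to `login_to_x`.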

In [195]:
def clear_search_box(driver, wait):
    """
    Function to clear and refocus on the search box safely.
    """
    try:
        # Re-locate the search box dynamically
        search_box = wait.until(
            EC.visibility_of_element_located((By.XPATH, "//input[@data-testid='SearchBox_Search_Input' and @placeholder='Search']"))
        )
        
        # Clear the value using JavaScript
        driver.execute_script("arguments[0].value = '';", search_box)
        
        # Ensure the search box is focused for further interaction
        search_box.click()
        
        return search_box  # Return the cleared search box
    except Exception as e:
        print(f"Error while clearing search box: {e}")
        return None

def search_tag(driver, wait, tag):
    """
    Search for the tag in the search box and click on the "Latest" tab.
    """
    try:
        # Clear and focus on the search box
        search_box = clear_search_box(driver, wait)
        if not search_box:
            print(f"Failed to locate search box for tag: {tag}")
            return  # Skip this tag if search box is unavailable

        # Type the new tag
        search_box.send_keys(tag)
        search_box.send_keys(Keys.RETURN)  # Start search

        # Wait for the "Latest" tab to be visible and click it
        tab_list = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[role='tablist']")))
        latest_tab = tab_list.find_elements(By.TAG_NAME, 'a')[1]  # Select "Latest" tab
        latest_tab.click()

    except Exception as e:
        print(f"Error while searching for tag '{tag}': {e}")
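
`search_tag` selects the "Latest" tab by position (the second link in the tab list), which breaks if the tab order changes. A possible alternative is to open the search results by URL. The sketch below rests on an assumption that is not confirmed anywhere in this notebook: that X's search page accepts a `q` parameter for the query and `f=live` for the "Latest" view. The helper name `build_latest_search_url` is hypothetical.

```python
from urllib.parse import urlencode


def build_latest_search_url(tag, base="https://x.com/search"):
    """Build a search URL for `tag`, URL-encoding characters such as '#'."""
    # Assumption: "f=live" corresponds to the "Latest" tab.
    query = urlencode({"q": tag, "f": "live"})
    return f"{base}?{query}"


print(build_latest_search_url("#tesla"))  # https://x.com/search?q=%23tesla&f=live
```

If the assumption holds, `driver.get(build_latest_search_url(tag))` would replace both the search-box typing and the tab click.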

In [199]:
def scrape_x(driver, wait, tags, scroll_count, max_posts_per_tag):
    """
    Scrape X (Twitter) posts and comments for a list of tags/tickers.

    Args:
        driver: Selenium WebDriver instance.
        wait: WebDriverWait instance.
        tags: List of tags or tickers to scrape.
        scroll_count: Number of scrolls to load more posts.
        max_posts_per_tag: Maximum number of posts to scrape per tag.

    Returns:
        List of dictionaries containing post data.
    """
    all_data = []
    post_number = 1  # Initialize post number globally

    for tag in tags:
        print(f"Scraping posts for: {tag}")
        try:
            # Open the X home page; the search itself is run from the search box
            driver.get("https://x.com")
            time.sleep(5)  # Allow page to load

            # Perform the search and click on the "Latest" tab
            search_tag(driver, wait, tag)

            total_scraped_posts = 0  # Track total posts scraped for this tag
            post_links = set()  # Track links to avoid duplicates

            # Collect post links, scrolling up to scroll_count times
            for scroll in range(scroll_count):
                if len(post_links) >= max_posts_per_tag:
                    break  # Stop scrolling once enough links have been captured

                print(f"Scrolling {scroll + 1}/{scroll_count} for: {tag}")

                # Wait for the post links to load
                wait.until(EC.presence_of_all_elements_located((By.XPATH, "//a[contains(@href, '/status/')]")))

                # Capture post links (Re-fetch after scrolling)
                post_links_elements = driver.find_elements(By.XPATH, "//a[contains(@href, '/status/')]")
                for link in post_links_elements:
                    href = link.get_attribute("href")
                    # Keep post permalinks; skip photo, analytics, and media-tag sub-links
                    if href and "/status/" in href and not any(ends in href for ends in (
                        "/photo", "/analytics", "/media_tags")):
                        post_links.add(href)  # Add links to the set

                print(f"Captured {len(post_links)} post links.")

                # Scroll for more posts (if needed)
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(3)  # Allow time for the page to load more posts

            # Scrape each post sequentially using captured links
            for link in list(post_links)[:max_posts_per_tag]:
                try:
                    print(f"Scraping post from link: {link}")
                    # Navigate to the post
                    driver.get(link)
                    time.sleep(2)  # Allow time for the post to load

                    # Wait for the specific post content element
                    post_element = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-testid='primaryColumn'] .css-175oi2r h2 span")))

                    # Extract username
                    username = "N/A"
                    try:
                        username_element = wait.until(EC.presence_of_element_located(
                            (By.XPATH, "//div[@data-testid='User-Name']//span[starts-with(text(), '@')]")))
                        username = username_element.text
                    except Exception:
                        username = "N/A"

                    # Extract time and date posted
                    date_posted, time_posted = "N/A", "N/A"
                    try:
                        time_element = wait.until(EC.presence_of_element_located(
                            (By.XPATH, "//a[contains(@href, '/status/') and @role='link']")))
                        datetime_value = time_element.find_element(By.TAG_NAME, "time").get_attribute("datetime")
                        date_posted, time_posted = datetime_value.split('T')
                        time_posted = time_posted[:-1]  # Remove 'Z'
                    except Exception:
                        date_posted, time_posted = "N/A", "N/A"

                    # Extract message content
                    message_content = "N/A"
                    try:
                        post_content_element = driver.find_element(By.XPATH, "//div[contains(@data-testid, 'tweetText')]")
                        message_content = post_content_element.text
                    except Exception:
                        message_content = "N/A"

                    # Extract comments
                    comments = []
                    try:
                        comments = driver.execute_script("""
                            let commentsArray = [];
                            const commentElements = document.querySelectorAll("div[data-testid='tweetText'] span");
                            commentElements.forEach(el => {
                                const text = el.innerText.trim();
                                if (text && text.length > 1 && !text.startsWith("#")) {
                                    commentsArray.push(text);
                                }
                            });
                            return commentsArray;
                        """)
                    except Exception:
                        comments = []

                    # Remove message_content from comments if present  
                    # if isinstance(comments, str):  
                    #     cleaned_comments = comments.replace(message_content, '').strip()  
                    # else:  
                    #     cleaned_comments = comments

                    # Append data
                    all_data.append({
                        'Post Number': post_number,
                        'label': "x_data",
                        'Tag': tag,
                        'Username': username,
                        'Time Posted': time_posted,
                        'Date Posted': date_posted,
                        'Content': message_content,
                        'Comments': comments,
                    })

                    post_number += 1  # Increment post number
                    total_scraped_posts += 1  # Increment total scraped posts

                except Exception as e:
                    print(f"Skipping post from link {link} due to an error: {e}")
                    continue  # Skip to the next iteration

        except Exception as e:
            print(f"Error processing tag {tag}: {e}")

    return all_data
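
The scraping function splits the `datetime` attribute on `T` and drops the trailing `Z`. A slightly stricter variant parses the value first, so malformed timestamps are caught rather than split blindly. The sketch below assumes the attribute looks like `2025-01-26T15:42:10.000Z` (an assumption about X's markup), and the helper name `split_timestamp` is hypothetical. Unlike the original code, it drops the fractional seconds.

```python
from datetime import datetime, timezone


def split_timestamp(value):
    """Return (date, time) strings, or ("N/A", "N/A") if the value cannot be parsed."""
    try:
        parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    except (TypeError, ValueError):
        return "N/A", "N/A"
    parsed = parsed.replace(tzinfo=timezone.utc)  # The trailing 'Z' denotes UTC
    return parsed.strftime("%Y-%m-%d"), parsed.strftime("%H:%M:%S")


print(split_timestamp("2025-01-26T15:42:10.000Z"))  # ('2025-01-26', '15:42:10')
print(split_timestamp("not a timestamp"))           # ('N/A', 'N/A')
```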


In [198]:
# def save_and_upload_data(data, bucket_name):
def save_and_upload_data(data):
    """
    Save data to a dated YAML file.

    Excel export and S3 upload are left as commented-out stubs below.

    Args:
        data: The list of post dictionaries to save.
        bucket_name: Only used by the commented-out signature; the S3 bucket to upload to.
    """
    try:
        # Save to YAML with UTF-8 encoding
        yaml_filename = f"x_data_{datetime.now().strftime('%Y-%m-%d')}.yaml"
        with open(yaml_filename, 'w', encoding='utf-8') as yaml_file:
            yaml.dump(data, yaml_file, allow_unicode=True)
        print(f"Data saved to YAML: {yaml_filename}")

        # Save to Excel (if needed, implement this part)
        # excel_filename = f"x_data_{datetime.now().strftime('%Y-%m-%d')}.xlsx"
        # Implement Excel saving logic here using pandas or openpyxl

        # Upload to S3
        # s3 = boto3.client('s3')
        # # s3.upload_file(excel_filename, bucket_name, excel_filename)
        # s3.upload_file(yaml_filename, bucket_name, yaml_filename)
        # print(f"Files uploaded to S3 bucket: {bucket_name}")

    except Exception as e:
        print(f"Error saving or uploading data: {e}")
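
The Excel export above is left unimplemented. As a stand-in that needs only the standard library, the records could be written as CSV, which spreadsheet tools open directly. This swaps CSV in for Excel and is not the author's method; the helper name `write_posts_csv` and the sample record are hypothetical placeholders.

```python
import csv
import io


def write_posts_csv(records, file_obj):
    """Write post dictionaries as CSV rows and return the number of rows written."""
    if not records:
        return 0
    writer = csv.DictWriter(file_obj, fieldnames=list(records[0].keys()))
    writer.writeheader()
    for record in records:
        row = dict(record)
        # Lists such as Comments are joined so each post stays on one row.
        if isinstance(row.get("Comments"), list):
            row["Comments"] = " | ".join(row["Comments"])
        writer.writerow(row)
    return len(records)


# Placeholder record, not real scraped data.
sample = [{
    "Post Number": 1,
    "Tag": "#example",
    "Username": "@example_user",
    "Content": "placeholder text",
    "Comments": ["first reply", "second reply"],
}]
buffer = io.StringIO()
rows_written = write_posts_csv(sample, buffer)
print(buffer.getvalue())
```

When writing to disk, open the file with `newline=""` and `encoding="utf-8"` so the `csv` module controls line endings.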

In [201]:
if __name__ == "__main__":
    # chromedriver_path = r"D:\coding\freelancing\stockMarket\chromedriver.exe"
    # driver, wait = init_driver(chromedriver_path)
    # username = "Mary01909527219"
    # password = "maria33221"


    tags = ["#tesla"]
    try:
        # Log in to X
        # login_to_x(username, password)
        scraped_data = scrape_x(driver, wait, tags, scroll_count=1, max_posts_per_tag=6)
        # save_and_upload_data(scraped_data, bucket_name)
        save_and_upload_data(scraped_data)
    except Exception as e:
        print(f"error in main: {e}")
    # finally:
        # driver.quit()

Scraping posts for: #tesla
Scrolling 1/1 for: #tesla
Captured 6 post links.
Scraping post from link: https://x.com/taki98001/status/1883476742599569755
Scraping post from link: https://x.com/hujailgor/status/1883476548193542634
Scraping post from link: https://x.com/ElleCoco2/status/1883476839047512067
Scraping post from link: https://x.com/NIOSwitzerland/status/1883476956970467448
Scraping post from link: https://x.com/VQuaschning/status/1883476498088403395
Scraping post from link: https://x.com/mariuskarma/status/1883477247761453416
Data saved to YAML: x_data_2025-01-26.yaml
