In [2]:
import asyncio
from playwright.async_api import async_playwright
import time
import json

async def scrape_x_usernames(search_term):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)  # Set to True for production
        context = await browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
        )
        
        page = await context.new_page()
        
        try:
            # Navigate to the search page
            url = f"https://x.com/search?q={search_term}&src=typed_query&f=user"
            await page.goto(url)
            
            # Wait for the page to load
            await page.wait_for_selector("section[role='region']", timeout=10000)
            
            # Scroll down to load more results (adjust as needed)
            for _ in range(3):
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await asyncio.sleep(2)  # Use asyncio.sleep instead of time.sleep
            
            # Extract usernames using the pattern visible in your screenshot
            username_elements = await page.query_selector_all("div[dir='ltr'] span:has-text('@')")
            
            usernames = []
            for elem in username_elements:
                username = await elem.inner_text()
                username = username.strip().replace('@', '')
                usernames.append(username)
                
            print(f"Found {len(usernames)} usernames:")
            for username in usernames:
                print(username)
                
            # Save to a file if needed
            with open(f"x_usernames_{search_term}.json", "w") as f:
                json.dump(usernames, f, indent=2)
                
            return usernames
            
        except Exception as e:
            print(f"Error: {e}")
            return []
            
        finally:
            await browser.close()

# Full version with more details
async def scrape_x_accounts(search_term):
    """
    Scrape X accounts based on a search term using Playwright's Async API
    
    Args:
        search_term (str): The search term to find accounts
        
    Returns:
        list: List of account information
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)  # Set to True for production
        context = await browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
        )
        
        page = await context.new_page()
        
        try:
            # Navigate to the search page
            url = f"https://x.com/search?q={search_term}&src=typed_query&f=user"
            await page.goto(url)
            
            # Wait for the page to load
            await page.wait_for_selector("section[role='region']", timeout=10000)
            
            # Scroll down to load more results (adjust as needed)
            for _ in range(3):
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await asyncio.sleep(2)
            
            # Extract account information
            accounts = []
            
            # Using appropriate selectors based on the HTML structure from your screenshot
            account_elements = await page.query_selector_all("div[data-testid='cellInnerDiv']")
            
            for element in account_elements:
                try:
                    # Get username
                    username_element = await element.query_selector("div[dir='ltr'] span:has-text('@')")
                    if not username_element:
                        continue
                        
                    username_text = await username_element.inner_text()
                    username = username_text.strip().replace('@', '')
                    
                    # Get display name
                    name_element = await element.query_selector("div[data-testid='User-Name'] span span")
                    display_name = await name_element.inner_text() if name_element else "Unknown"
                    
                    # Get bio if available
                    bio_element = await element.query_selector("div[data-testid='UserCell-byline']")
                    bio = await bio_element.inner_text() if bio_element else ""
                    
                    accounts.append({
                        "username": username,
                        "display_name": display_name.strip(),
                        "bio": bio.strip(),
                        "profile_url": f"https://x.com/{username}"
                    })
                    
                except Exception as e:
                    print(f"Error extracting account info: {e}")
                    continue
            
            # Save to JSON file
            with open(f"x_accounts_{search_term}.json", "w", encoding="utf-8") as f:
                json.dump(accounts, f, indent=2)
                
            print(f"Found {len(accounts)} accounts for search term '{search_term}'")
            return accounts
            
        except Exception as e:
            print(f"Error: {e}")
            return []
            
        finally:
            await browser.close()

# Example usage - this is how you run the async functions
async def main():
    search_term = "Roofies"  # Based on your screenshot
    # Choose which function to use
    # accounts = await scrape_x_accounts(search_term)  # Full account details
    usernames = await scrape_x_usernames(search_term)  # Just usernames

if __name__ == "__main__":
    asyncio.run(main())

RuntimeError: asyncio.run() cannot be called from a running event loop

In [3]:
import asyncio
from playwright.async_api import async_playwright
import json

async def scrape_x_usernames(search_term):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)  # Set to True for production
        context = await browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
        )
        
        page = await context.new_page()
        
        try:
            # Navigate to the search page
            url = f"https://x.com/search?q={search_term}&src=typed_query&f=user"
            await page.goto(url)
            
            # Wait for the page to load
            await page.wait_for_selector("section[role='region']", timeout=10000)
            
            # Scroll down to load more results (adjust as needed)
            for _ in range(3):
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await asyncio.sleep(2)  # Use asyncio.sleep instead of time.sleep
            
            # Extract usernames using the pattern visible in your screenshot
            username_elements = await page.query_selector_all("div[dir='ltr'] span:has-text('@')")
            
            usernames = []
            for elem in username_elements:
                username = await elem.inner_text()
                username = username.strip().replace('@', '')
                usernames.append(username)
                
            print(f"Found {len(usernames)} usernames:")
            for username in usernames:
                print(username)
                
            # Save to a file if needed
            with open(f"x_usernames_{search_term}.json", "w") as f:
                json.dump(usernames, f, indent=2)
                
            return usernames
            
        except Exception as e:
            print(f"Error: {e}")
            return []
            
        finally:
            await browser.close()

# Here's the modified part that works with existing event loops:
def run_async_scraper(search_term):
    """Run the async scraper in any environment"""
    loop = asyncio.get_event_loop()
    if loop.is_running():
        # We're in a Jupyter notebook, IPython, or similar environment
        # Use asyncio.ensure_future or create_task
        return asyncio.create_task(scrape_x_usernames(search_term))
    else:
        # We're in a regular Python script 
        return loop.run_until_complete(scrape_x_usernames(search_term))

# Example usage that works in both regular Python scripts and interactive environments
if __name__ == "__main__":
    # This will work in a regular Python script
    search_term = "Roofies"  # Based on your screenshot
    usernames = run_async_scraper(search_term)
else:
    # For interactive environments, you can call this directly
    # search_term = "Roofies"
    # task = run_async_scraper(search_term)
    # If you're in a Jupyter notebook or similar, you would do:
    # usernames = await task  # You need to await the task
    pass


Error: Page.wait_for_selector: Timeout 10000ms exceeded.
Call log:
  - waiting for locator("section[role='region']") to be visible



In [4]:
from playwright.sync_api import sync_playwright
import json
import time

def scrape_x_usernames_sync(search_term):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)  # See the browser in action
        context = browser.new_context(
            viewport={'width': 1280, 'height': 800},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
        )
        
        page = context.new_page()
        
        try:
            # Navigate to the search page
            url = f"https://x.com/search?q={search_term}&src=typed_query&f=user"
            page.goto(url)
            
            # Give the page some time to load initially
            time.sleep(5)
            
            print("Waiting for content to load...")
            
            # Try different selectors that appear in the search results page
            selectors_to_try = [
                "div[data-testid='cellInnerDiv']",  # Common cell container
                "a[role='link'][tabindex='0']",     # Links in the results
                "span:has-text('@')",               # Username spans
                "div[data-testid='User-Name']",     # User name container
                "div[dir='ltr']",                   # Text direction containers (common on X)
                "main[role='main']"                 # Main content area
            ]
            
            found_selector = None
            for selector in selectors_to_try:
                if page.locator(selector).count() > 0:
                    print(f"Found elements with selector: {selector}")
                    found_selector = selector
                    break
            
            if not found_selector:
                print("Could not find any recognizable elements on the page.")
                # Try to save the page content for debugging
                page_content = page.content()
                with open(f"debug_page_{search_term}.html", "w", encoding="utf-8") as f:
                    f.write(page_content)
                print(f"Saved page content to debug_page_{search_term}.html for inspection")
                return []
            
            # Scroll down to load more results (adjust as needed)
            print("Scrolling to load more results...")
            for i in range(3):
                page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                time.sleep(2)
                print(f"Scroll {i+1}/3 completed")
            
            # Look for username elements more carefully
            print("Extracting usernames...")
            
            # Try multiple selector patterns for usernames based on your screenshot
            username_patterns = [
                "div[dir='ltr'] span:has-text('@')",
                "a[role='link'] span:has-text('@')",
                "span:has-text('@')",
                "div[data-testid='User-Name'] div[dir='ltr']"
            ]
            
            usernames = []
            for pattern in username_patterns:
                elements = page.query_selector_all(pattern)
                if elements:
                    print(f"Found {len(elements)} elements with pattern: {pattern}")
                    for elem in elements:
                        text = elem.inner_text()
                        if '@' in text:
                            username = text.strip().replace('@', '')
                            if username and username not in usernames:
                                usernames.append(username)
            
            # If we still don't have usernames, try a more general approach
            if not usernames:
                print("Trying alternative approach to find usernames...")
                # Get all text elements on the page
                all_text_elements = page.query_selector_all("span")
                for elem in all_text_elements:
                    text = elem.inner_text()
                    if '@' in text and not text.startswith('@'):
                        potential_username = text.strip()
                        # Look for typical username pattern (@ followed by letters/numbers)
                        if '@' in potential_username:
                            username = potential_username.split('@')[1].split()[0]
                            if username and username not in usernames:
                                usernames.append(username)
            
            print(f"Found {len(usernames)} usernames:")
            for username in usernames:
                print(username)
                
            # Take a screenshot for debugging purposes
            page.screenshot(path=f"search_results_{search_term}.png")
            print(f"Saved screenshot to search_results_{search_term}.png")
            
            # Save to a file
            with open(f"x_usernames_{search_term}.json", "w", encoding="utf-8") as f:
                json.dump(usernames, f, indent=2)
                
            return usernames
            
        except Exception as e:
            print(f"Error: {e}")
            # Try to capture a screenshot of the error state
            try:
                page.screenshot(path=f"error_state_{search_term}.png")
                print(f"Saved error state screenshot to error_state_{search_term}.png")
            except:
                pass
            return []
            
        finally:
            browser.close()

# Example usage - simple synchronous call
if __name__ == "__main__":
    search_term = "Roofies"  # Based on your screenshot
    usernames = scrape_x_usernames_sync(search_term)

Error: It looks like you are using Playwright Sync API inside the asyncio loop.
Please use the Async API instead.