In [8]:
import asyncio
import os
import time

import pandas as pd
import websockets
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By


In [9]:
# Number of top categories requested from get_top_twitch_categories().
k_categories = 20
# Intended minimum live-viewer count for a channel to be scraped.
# NOTE(review): presumably mirrors the viewer cut-off applied in
# scrape_channels_from_categories() — confirm the two stay in sync.
minimum_viewers = 1000
# Stop the whole run once this many channels have had their chat saved.
k_target_channels = 50
# Upper bound on channels taken from a single category page.
k_maximum_channels_per_category = 10
# Number of chat messages to collect per channel before writing the CSV.
n_messages = 100
# Channels whose chat was saved; twitch_chat_scraper() appends to this list.
# Re-running this cell resets it to empty.
scraped_channel = []

In [10]:
def setup_driver(chromedriver_path='C:/Users/user/Desktop/WebScrapingProject/chromedriver.exe'):
    """Create a headless Chrome WebDriver.

    Args:
        chromedriver_path: Path to the chromedriver executable. Defaults to the
            path this notebook was originally written against. Pass ``None``
            to build the ``Service`` without an explicit path so Selenium
            resolves the driver itself (e.g. from PATH).

    Returns:
        A ``webdriver.Chrome`` instance. The caller owns it and must call
        ``quit()`` when done.
    """
    options = webdriver.ChromeOptions()
    for flag in ('--headless', '--no-sandbox', '--disable-dev-shm-usage'):
        options.add_argument(flag)

    # Only pin the executable when a path is given; otherwise defer to Selenium.
    service = Service(chromedriver_path) if chromedriver_path else Service()
    return webdriver.Chrome(service=service, options=options)

In [11]:
def get_top_twitch_categories(k=10):
    """Scrape the Twitch directory and return up to ``k`` categories.

    Opens the directory page sorted by viewer count, collects the category
    links that contain an ``<h2>`` title and returns them in page order.

    Args:
        k: Maximum number of categories to return.

    Returns:
        A list of ``(name, link)`` tuples, at most ``k`` long.
    """
    driver = setup_driver()
    try:
        driver.get('https://www.twitch.tv/directory?sort=VIEWER_COUNT')
        time.sleep(3)  # Fixed wait for the page to render

        category_links = driver.find_elements(
            By.XPATH, '//a[contains(@class, "ScCoreLink-sc-16kq0mq-0") and h2]')

        top_k_categories = []
        for cat in category_links:
            # Stop on the number of *accepted* categories so that skipped
            # entries do not shrink the result below k.
            if len(top_k_categories) >= k:
                break
            try:
                name = cat.find_element(By.XPATH, './/h2').text
                link = cat.get_attribute('href')
            except Exception:
                # Best effort: a card that cannot be read is skipped.
                continue
            if "viewers" in name.lower():
                continue  # The heading holds a viewer count, not a category name
            print(f"Category: {name}, Link: {link}")
            top_k_categories.append((name, link))
        return top_k_categories
    finally:
        # Always release the browser, even if the page load or lookup fails.
        driver.quit()


In [12]:
def _parse_viewer_count(viewer_text):
    """Convert a label such as "27K viewers" or "1,234 viewers" to an int, or None if unparseable."""
    cleaned = (viewer_text.lower()
               .replace(',', '')
               .replace('viewers', '')
               .replace('viewer', '')
               .strip())
    multiplier = 1
    if cleaned.endswith('k'):
        multiplier, cleaned = 1_000, cleaned[:-1]
    elif cleaned.endswith('m'):
        multiplier, cleaned = 1_000_000, cleaned[:-1]
    try:
        # round() avoids float truncation (e.g. 2.3 * 1000 == 2299.999...).
        return int(round(float(cleaned) * multiplier))
    except (ValueError, OverflowError):
        return None


def _apply_english_filter(driver):
    """Best-effort: open the Language filter on the current page and tick "English"."""
    try:
        # Open the language filter menu first
        filter_button = driver.find_element(
            By.XPATH,
            '//button[.//div[@data-a-target="tw-core-button-label-text" and contains(text(), "Language")]]'
        )
        filter_button.click()
        time.sleep(1)
        english_label = driver.find_element(
            By.XPATH,
            '//label[.//div[text()="English"]]'
        )
        # The checkbox input is a preceding sibling of the label
        checkbox = english_label.find_element(
            By.XPATH,
            './preceding-sibling::input[@type="checkbox"]'
        )
        # Only click if not already checked
        if not checkbox.is_selected():
            english_label.click()
            time.sleep(1)  # Wait for the page to reload with the filter applied
        else:
            print("English language filter already checked, skipping click.")
        print("Set language filter to English.")
    except Exception:
        print("English language filter not found or could not be set.")


def scrape_channels_from_categories(name, link, max_channels_per_category=20, min_viewers=None):
    """Collect the top live channels of one Twitch category.

    Loads the category page sorted by viewer count, tries to enable the
    English language filter, scrolls until enough channel cards are present
    (or the page stops growing) and reads the channel link and viewer count
    from each card.

    Args:
        name: Category name, stored in each result and used in log output.
        link: Category URL; ``"?sort=VIEWER_COUNT"`` is appended to it.
        max_channels_per_category: Maximum number of cards to inspect.
        min_viewers: Minimum viewer count for a channel to be kept. ``None``
            (default) uses the notebook-level ``minimum_viewers`` setting.

    Returns:
        A list of dicts with keys ``category``, ``href``, ``channelId`` and
        ``viewers``, in page order.
    """
    if min_viewers is None:
        min_viewers = minimum_viewers

    # One selector for both the scroll-loop count and the extraction, so the
    # loop stops on the same elements that are read afterwards.
    card_xpath = '//a[@data-a-target="preview-card-image-link"]'

    all_channels = []
    driver = setup_driver()
    try:
        print(f"Scraping category: {name}")
        driver.get(link + "?sort=VIEWER_COUNT")
        time.sleep(3)  # Wait for page to load

        _apply_english_filter(driver)

        # Scroll to load more channels if needed
        last_height = driver.execute_script("return document.body.scrollHeight")
        while True:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)
            new_height = driver.execute_script("return document.body.scrollHeight")
            enough_cards = len(driver.find_elements(By.XPATH, card_xpath)) >= max_channels_per_category
            if new_height == last_height or enough_cards:
                break
            last_height = new_height

        cards = driver.find_elements(By.XPATH, card_xpath)
        for elem in cards[:max_channels_per_category]:
            href = elem.get_attribute('href')
            if not href:
                continue
            # Last path segment, ignoring any query string or trailing slash.
            channelId = href.split('?')[0].rstrip('/').split('/')[-1]

            try:
                viewer_text = elem.find_element(
                    By.XPATH, './/div[contains(@class, "tw-media-card-stat")]').text
            except Exception:
                continue  # No viewer stat on this card; skip it
            viewers = _parse_viewer_count(viewer_text)
            if viewers is None:
                # Unreadable count: skip this card instead of treating it as 0,
                # which would end the scan of the whole category.
                continue

            if viewers < min_viewers:
                # Cards are requested sorted by viewer count, so the remaining
                # ones are expected to be below the threshold as well.
                break
            print(f"Category: {name}, Channel ID: {channelId}, URL: {href}, Viewers: {viewers}")
            all_channels.append({'category': name, 'href': href, 'channelId': channelId, 'viewers': viewers})
        return all_channels
    finally:
        # Always release the browser, even when scraping fails midway.
        driver.quit()

In [13]:
async def twitch_chat_scraper(channel, n_messages):
    """Collect ``n_messages`` chat messages from a Twitch channel and save them as CSV.

    Connects anonymously to Twitch's IRC-over-WebSocket endpoint, joins the
    channel and records the text of each PRIVMSG line. Once ``n_messages``
    have been collected they are written to
    ``../data/{channel}_chat_messages.csv`` (single ``message`` column) and
    ``channel`` is appended to the notebook-level ``scraped_channel`` list.

    If no frame arrives for 60 seconds, or any other error occurs, the
    function prints a message and returns without saving anything.

    Args:
        channel: Channel login name (without the leading ``#``).
        n_messages: Number of chat messages to collect.

    Returns:
        None.
    """
    uri = "wss://irc-ws.chat.twitch.tv/"
    messages = []
    async with websockets.connect(uri) as websocket:
        await websocket.send("PASS SCHMOOPIIE\r\n")
        await websocket.send("NICK justinfan26589\r\n")
        # Twitch expects the channel name in lowercase when joining.
        await websocket.send(f"JOIN #{channel.lower()}\r\n")
        print(f"Connected and joined #{channel}. Listening for messages...")

        while len(messages) < n_messages:
            try:
                # Wait for a frame with timeout
                try:
                    frame = await asyncio.wait_for(websocket.recv(), timeout=60)
                except asyncio.TimeoutError:
                    print("No message received for 1 minute. Exiting.")
                    return

                # One WebSocket frame may carry several IRC lines; handle each
                # on its own so following lines do not leak into a message.
                for line in frame.splitlines():
                    if line.startswith("PING"):
                        # Answer keepalives, otherwise the server eventually
                        # closes the connection on slow channels.
                        await websocket.send(line.replace("PING", "PONG", 1) + "\r\n")
                        continue
                    if "PRIVMSG" not in line:
                        continue
                    # Line shape: ":nick!... PRIVMSG #channel :text"
                    msg_text = line.split("PRIVMSG", 1)[1].split(":", 1)[-1].strip()
                    messages.append(msg_text)
                    print(f"[{len(messages)}/{n_messages}] {msg_text}")
                    if len(messages) >= n_messages:
                        break
            except Exception as e:
                print(f"Error: {e}")
                return

        # Reached only when the requested number of messages was collected.
        os.makedirs("../data", exist_ok=True)
        df = pd.DataFrame({'message': messages})
        df.to_csv(f"../data/{channel}_chat_messages.csv", index=False)
        scraped_channel.append(channel)


In [14]:
top_categories = get_top_twitch_categories(k_categories)

for name, link in top_categories:
    channels = scrape_channels_from_categories(name, link, k_maximum_channels_per_category)
    if len(scraped_channel) >= k_target_channels:
            break
    for channel_info in channels:
        if len(scraped_channel) >= k_target_channels:
            break
        channel_id = channel_info['channelId']
        print(f"Scraping chat for channel: {channel_id}")
        await twitch_chat_scraper(channel_id, n_messages)
        

Category: Just Chatting, Link: https://www.twitch.tv/directory/category/just-chatting
Category: League of Legends, Link: https://www.twitch.tv/directory/category/league-of-legends
Category: Call of Duty: Black Ops 7, Link: https://www.twitch.tv/directory/category/call-of-duty-black-ops-7
Category: Owl Lights, Link: https://www.twitch.tv/directory/category/owl-lights
Category: Minecraft, Link: https://www.twitch.tv/directory/category/minecraft
Category: Grand Theft Auto V, Link: https://www.twitch.tv/directory/category/grand-theft-auto-v
Category: IRL, Link: https://www.twitch.tv/directory/category/irl
Category: Fortnite, Link: https://www.twitch.tv/directory/category/fortnite
Category: VALORANT, Link: https://www.twitch.tv/directory/category/valorant
Category: EA Sports FC 26, Link: https://www.twitch.tv/directory/category/ea-sports-fc-26
Category: Dead by Daylight, Link: https://www.twitch.tv/directory/category/dead-by-daylight
Category: Counter-Strike, Link: https://www.twitch.tv/dir

CancelledError: 