In [1]:
import os
import random
import time
import json
import traceback
import re
import undetected_chromedriver as uc
import argparse
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Constants
# BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# DATA_DIR = os.path.join(BASE_DIR, 'data')
# os.makedirs(DATA_DIR, exist_ok=True)

def random_sleep(mean=1.0, std=0.3):
    """Pause for a normally distributed, never-negative number of seconds.

    Args:
        mean: Centre of the normal distribution the pause is drawn from (seconds).
        std: Standard deviation of that distribution (seconds).
    """
    # Negative draws are clamped to zero because time.sleep() rejects negative values.
    time.sleep(max(0, random.normalvariate(mean, std)))

def print_progress(current, total, prefix='', bar_length=40):
    """Render a single-line text progress bar on stdout.

    The bar is redrawn in place using a carriage return, and a newline is emitted
    once ``current`` reaches (or passes) ``total``.

    Args:
        current: Number of items processed so far.
        total: Number of items expected; a falsy value is drawn as a full bar.
        prefix: Text printed in front of the bar.
        bar_length: Width of the bar in characters.
    """
    fraction = current / total if total else 1
    # Clamp so the bar keeps its width when current > total (or is negative);
    # the raw counters are still printed unchanged after the bar.
    fraction = min(1, max(0, fraction))
    filled = int(fraction * bar_length)
    bar = '=' * filled + ' ' * (bar_length - filled)
    print(f'\r{prefix}[{bar}] {current}/{total}', end='', flush=True)
    if current >= total:
        print()  # New line when complete

def setup_driver():
    """Build and return an undetected_chromedriver Chrome instance.

    The browser is started with a fixed desktop user-agent string and with the
    ``AutomationControlled`` Blink feature disabled.

    Returns:
        The ``uc.Chrome`` driver created from these options.
    """
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36"
    )
    chrome_args = (
        f"user-agent={user_agent}",
        '--disable-blink-features=AutomationControlled',
        # '--headless',  # enable to run headless (may be less reliable with Kaggle)
    )

    options = uc.ChromeOptions()
    for chrome_arg in chrome_args:
        options.add_argument(chrome_arg)
    return uc.Chrome(options=options)

def save_json(data, filepath):
    """Write ``data`` to ``filepath`` as indented UTF-8 JSON and print a confirmation.

    Args:
        data: JSON-serialisable object to store.
        filepath: Destination path; the file is opened in write mode.
    """
    dump_options = {"indent": 4, "ensure_ascii": False}  # keep non-ASCII text as-is
    with open(filepath, mode='w', encoding='utf-8') as out_file:
        json.dump(data, out_file, **dump_options)
    print(f"✅ Saved data to {filepath}")


# Scrape comment metadata

In [14]:
# Open the notebook's comments page in a new browser session.
url = "https://www.kaggle.com/code/jhoward/linear-model-and-neural-net-from-scratch/comments"
driver = setup_driver()
driver.get(url)

# Defaults that survive whenever a piece of metadata cannot be located.
info = {"title": "", "fork_count": 0, "comments_count": 0}

# Short randomized pause before querying the DOM.
random_sleep(1, 0.3)

# Title: text of the first <h1> on the page.
try:
    info["title"] = driver.find_element(By.XPATH, "//h1").text.strip()
except NoSuchElementException:
    print("Could not find notebook title.")

# Fork count: first run of digits in the <span> whose aria-label mentions 'copies'.
try:
    fork_text = driver.find_element(By.XPATH, "//span[contains(@aria-label, 'copies')]").text
    fork_match = re.search(r"(\d+)", fork_text)
    if fork_match:
        info["fork_count"] = int(fork_match.group(1))
    else:
        info["fork_count"] = 0
except NoSuchElementException:
    print("Could not find fork count.")

# Comment count: the number preceding "Comments" in the matching <h2> heading.
try:
    comment_text = driver.find_element(By.XPATH, "//h2[contains(text(), 'Comments')]").text
    comment_match = re.search(r"(\d+)\s*Comments", comment_text)
    if comment_match:
        info["comments_count"] = int(comment_match.group(1))
    else:
        info["comments_count"] = 0
except NoSuchElementException:
    print("Could not find comments count.")

In [15]:
info

{'title': 'Linear model and neural net from scratch',
 'fork_count': 4691,
 'comments_count': 54}

# Scrape comment content

In [2]:
# Start a fresh browser session for the comment-content experiments. A driver left
# over from an earlier cell (or an earlier run of this cell) is shut down first so
# Chrome processes do not pile up.
previous_driver = globals().get("driver")
if previous_driver is not None:
    try:
        previous_driver.quit()
    except Exception as exc:  # best effort: the old session may already be gone
        print(f"Could not close previous driver: {exc}")

driver = setup_driver()
url = "https://www.kaggle.com/code/jhoward/linear-model-and-neural-net-from-scratch/comments"
# comment_data = scraper.scrape_comments(url)
driver.get(url)
info = {
    "title": "",
    "fork_count": 0,
    "comments_count":0
}

# Short randomized pause before the following cells query the DOM.
random_sleep(1, 0.3)

In [3]:
# State for the scroll-and-collect loop cell further down.
collected = []  # comment dicts gathered so far
last_count = 0  # number of comment elements already processed
scroll_attempts = 0  # consecutive scrolls that surfaced no new comments
max_attempts = 3  # Maximum number of attempts if no new comments are found

In [4]:
# Wait for comments to load
WebDriverWait(driver, 5).until(
    EC.presence_of_all_elements_located((By.XPATH, "//div[contains(@data-testid, 'discussions-comment')]"))
)

# Random wait to ensure the page has loaded properly
random_sleep(1, 0.5)

# Get all comments
comment_elements = driver.find_elements(By.XPATH, "//div[contains(@data-testid, 'discussions-comment')]")


In [None]:
def _extract_comment_data(self, comment_element):
    """Build a flat record describing one rendered comment.

    Args:
        self: Not read by the body (presumably carried over from the same-named
            method of ``KaggleCommentsScraper``); callers may pass anything.
        comment_element: Selenium element for a single comment container.

    Returns:
        dict with keys ``content``, ``user``, ``user_url``, ``date`` and ``votes``.
        Pieces that cannot be located keep their defaults (empty string / 0), except
        the user name, which falls back to "Anonymous or Deleted User".
    """
    def locate(xpath):
        # First descendant of the comment matching the XPath (raises if none).
        return comment_element.find_element(By.XPATH, xpath)

    record = {"content": "", "user": "", "user_url": "", "date": "", "votes": 0}

    # Author: the profile link carries "<name>'s profile" in its aria-label.
    try:
        profile_link = locate(".//a[contains(@aria-label, \"'s profile\")]")
        label = profile_link.get_attribute("aria-label")
        record["user"] = label.replace("'s profile", "").strip()
        record["user_url"] = profile_link.get_attribute("href")
    except NoSuchElementException:
        record["user"] = "Anonymous or Deleted User"

    # Date: title attribute of the <span> inside the "Posted ..." paragraph.
    try:
        record["date"] = locate(".//p[contains(text(), 'Posted')]/span[@title]").get_attribute("title")
    except NoSuchElementException:
        pass

    # Votes: number embedded in the vote button's aria-label.
    try:
        votes_label = locate(".//button[contains(@aria-label, 'votes')]").get_attribute("aria-label")
        votes_found = re.search(r"(\d+)\s*votes?", votes_label)
        if votes_found:
            record["votes"] = int(votes_found.group(1))
        else:
            record["votes"] = 0
    except NoSuchElementException:
        pass

    # Content: text of every <p> / code block that precedes the reply button.
    try:
        reply_marker = locate(".//span[text()='reply']")
        descendants = comment_element.find_elements(By.XPATH, ".//*")

        pieces = []
        for node in descendants:
            if node == reply_marker:
                break  # everything from the reply button onwards is not comment body
            is_paragraph = node.tag_name.lower() == "p"
            css_classes = node.get_attribute("class") or ""
            if not (is_paragraph or "uc-code-block" in css_classes):
                continue
            stripped = node.text.strip()
            if stripped:
                pieces.append(stripped)

        record["content"] = "\n".join(pieces)
    except NoSuchElementException:
        pass

    return record


<div data-testid="discussions-comment" class="sc-hifXeo fEcaKb"><div id="2442279" class="sc-jSOf hNXJXP"><div class="sc-hdBiUU jQlFUD"><div class="sc-fNaani hYZtOz"><a href="/mendhak" class="sc-ehIYnC jNnqAS sc-fdUNyB freEyA" aria-label="mendhak's profile"><div data-testid="avatar-image" title="mendhak" class="sc-iDAWOb kjKwxb" style="background-image: url(&quot;https://storage.googleapis.com/kaggle-avatars/thumbnails/default-thumb.png&quot;);"></div><svg width="48" height="48" viewBox="0 0 48 48"><circle r="22.5" cx="24" cy="24" fill="none" stroke-width="3" style="stroke: rgb(241, 243, 244);"></circle><path d="M 45.39877161664096 17.047117626563683 A 22.5 22.5 0 0 0 24 1.5" fill="none" stroke-width="3" style="stroke: rgb(31, 166, 65);"></path></svg></a><div class="sc-chAcSA iLlRjp"><div class="sc-cdUTjK hVvTQB"><div class="sc-dIOQHv ceKzfG"><div class="sc-Mugbu kinlRl"><a href="/mendhak" target="_blank" class="sc-iJuKTj grUMoV"><h3 class="sc-eOzmre sc-gknnfs iyFMKB bpLtdP">mendhak</h3

In [10]:
# Use the first comment container as the sample element for the extraction cell below.
comment_element = comment_elements[0]

In [13]:
# Extract the sample comment through the module-level helper rather than repeating
# its body inline, so the extraction logic lives in one place only.
# The helper never reads its first (`self`) argument, hence the None placeholder.
comment_data = _extract_comment_data(None, comment_element)


In [14]:
comment_data

{'content': "Posted 2 years ago\n· Posted on Version 13 of 13\nDespite copying cell for cell, on the independent colums line, I get this error below. Any ideas what could be going wrong? I'm running this in a local Jupyter Notebook.\nindep_cols = ['Age', 'SibSp', 'Parch', 'LogFare'] + added_cols\n\nt_indep = tensor(df[indep_cols].values, dtype=torch.float)\nt_indep\nProduces:\n---------------------------------------------------------------------------\nTypeError                                 Traceback (most recent call last)\nCell In[18], line 3\n      1 indep_cols = ['Age', 'SibSp', 'Parch', 'LogFare'] + added_cols\n----> 3 t_indep = tensor(df[indep_cols].values, dtype=torch.float)\n      4 t_indep\n\nTypeError: can't convert np.ndarray of type numpy.object_. The only supported types are: float64, float32, float16, complex64, complex128, int64, int32, int16, int8, uint8, and bool.",
 'user': 'mendhak',
 'user_url': 'https://www.kaggle.com/mendhak',
 'date': 'Sat Sep 16 2023 14:09:00

In [None]:

# Target number of comments. No earlier cell in this section defines it, so it is read
# from the "<N> Comments" heading; 0 means "unknown" and disables the early exit below.
try:
    heading_text = driver.find_element(By.XPATH, "//h2[contains(text(), 'Comments')]").text
    heading_match = re.search(r"(\d+)\s*Comments", heading_text)
    expected_total = int(heading_match.group(1)) if heading_match else 0
except NoSuchElementException:
    expected_total = 0

comment_xpath = "//div[contains(@data-testid, 'discussions-comment')]"

while True:
    # Wait for comments to load; stop cleanly if none appear within the timeout.
    try:
        WebDriverWait(driver, 5).until(
            EC.presence_of_all_elements_located((By.XPATH, comment_xpath))
        )
    except TimeoutException:
        print("\nTimed out waiting for comments to load.")
        break

    # Random wait to ensure the page has loaded properly
    random_sleep(1, 0.5)

    # Get all comments
    comment_elements = driver.find_elements(By.XPATH, comment_xpath)

    # Process only the elements that appeared since the previous pass
    for i in range(last_count, len(comment_elements)):
        try:
            # The module-level helper ignores its first (`self`) argument.
            comment_data = _extract_comment_data(None, comment_elements[i])
            collected.append(comment_data)
            print_progress(len(collected), expected_total, prefix='Comments collected: ')
        except Exception as e:
            print(f"\nError extracting comment: {str(e)}")

    # Check if we've reached our expected total (skipped when the total is unknown)
    if expected_total and len(collected) >= expected_total:
        print(f"\nReached expected comment count. Total collected: {len(collected)}")
        break

    # Check if we found new comments during this iteration
    if len(comment_elements) > last_count:
        last_count = len(comment_elements)
        scroll_attempts = 0  # Reset scroll attempts counter

        # Scroll to the last comment to load more
        if comment_elements:
            driver.execute_script(
                "arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});",
                comment_elements[-1]
            )

        # Also scroll a bit more to trigger loading
        driver.execute_script("window.scrollBy(0, 500);")
        random_sleep(2, 0.5)
    else:
        # If no new comments were found in this iteration
        scroll_attempts += 1
        if scroll_attempts >= max_attempts:
            print(f"\nNo more comments found after {max_attempts} scroll attempts.")
            print(f"Expected: {expected_total}, Collected: {len(collected)}")
            break

        print(f"\nNo new comments found. Scrolling more... (attempt {scroll_attempts}/{max_attempts})")
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        random_sleep(3, 0.5)


In [None]:
class KaggleCommentsScraper:
    """Scrape notebook metadata and comments from a Kaggle notebook comments page.

    The scraper drives a Selenium driver supplied by the caller; it neither creates
    nor quits that driver.
    """

    # XPath matching every rendered comment container on the page.
    COMMENT_XPATH = "//div[contains(@data-testid, 'discussions-comment')]"

    def __init__(self, driver):
        """Store the Selenium driver used for all page interaction."""
        self.driver = driver

    def scrape_comments(self, url):
        """Scrape all comments from a Kaggle notebook comments page.

        Args:
            url: Address of the comments page to load.

        Returns:
            dict with keys ``notebook_url``, ``notebook_info`` (see
            ``_extract_notebook_info``) and ``comments`` (list of per-comment dicts;
            empty when the page reports zero comments).
        """
        print(f"Scraping comments from: {url}")
        self.driver.get(url)

        # Wait for page load
        WebDriverWait(self.driver, 15).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        random_sleep(1, 0.3)

        # Get notebook metadata (title, forks, comment count)
        notebook_info = self._extract_notebook_info()

        if notebook_info["comments_count"] == 0:
            print("No comments found on this page.")
            comments_collected = []
        else:
            # Scroll to load all comments
            comments_collected = self._collect_all_comments(notebook_info["comments_count"])

        # Both outcomes share one shape, so "notebook_url" is always present.
        return {
            "notebook_url": url,
            "notebook_info": notebook_info,
            "comments": comments_collected,
        }

    def _extract_notebook_info(self):
        """Extract metadata about the notebook from the currently loaded page.

        The page is expected to have been opened already (``scrape_comments`` does
        this); no navigation happens here.

        Returns:
            dict with keys ``title`` (str), ``fork_count`` (int) and
            ``comments_count`` (int). Values that cannot be found keep their
            defaults (empty string / 0) and a message is printed.
        """
        info = {
            "title": "",
            "fork_count": 0,
            "comments_count": 0
        }

        # Short randomized pause before querying the DOM.
        random_sleep(1, 0.3)

        try:
            # Try to get title
            title_elem = self.driver.find_element(By.XPATH, "//h1")
            info["title"] = title_elem.text.strip()
        except NoSuchElementException:
            print("Could not find notebook title.")

        try:
            # Try to get fork count
            fork_elem = self.driver.find_element(By.XPATH, "//span[contains(@aria-label, 'copies')]")
            fork_match = re.search(r"(\d+)", fork_elem.text)
            info["fork_count"] = int(fork_match.group(1)) if fork_match else 0
        except NoSuchElementException:
            print("Could not find fork count.")

        try:
            # Try to get comments count
            comment_elem = self.driver.find_element(By.XPATH, "//h2[contains(text(), 'Comments')]")
            comment_match = re.search(r"(\d+)\s*Comments", comment_elem.text)
            info["comments_count"] = int(comment_match.group(1)) if comment_match else 0
        except NoSuchElementException:
            print("Could not find comments count.")

        return info

    def _collect_all_comments(self, expected_total):
        """Scroll the page and extract every comment that gets rendered.

        Args:
            expected_total: Number of comments reported by the page; collection
                stops as soon as this many have been gathered.

        Returns:
            list of comment dicts (see ``_extract_comment_data``). May hold fewer
            than ``expected_total`` entries when scrolling stops surfacing new
            comments or the initial wait times out.
        """
        collected = []
        last_count = 0
        scroll_attempts = 0
        max_attempts = 3  # Maximum number of attempts if no new comments are found
        locator = (By.XPATH, self.COMMENT_XPATH)

        while True:
            # Wait for comments to load; return what we have if none ever appear.
            try:
                WebDriverWait(self.driver, 5).until(
                    EC.presence_of_all_elements_located(locator)
                )
            except TimeoutException:
                print("\nTimed out waiting for comments to load.")
                break

            # Random wait to ensure the page has loaded properly
            random_sleep(1, 0.5)

            # Get all comments
            comment_elements = self.driver.find_elements(*locator)

            # Process only the elements that appeared since the previous pass
            for i in range(last_count, len(comment_elements)):
                try:
                    comment_data = self._extract_comment_data(comment_elements[i])
                    collected.append(comment_data)
                    print_progress(len(collected), expected_total, prefix='Comments collected: ')
                except Exception as e:
                    print(f"\nError extracting comment: {str(e)}")

            # Check if we've reached our expected total
            if len(collected) >= expected_total:
                print(f"\nReached expected comment count. Total collected: {len(collected)}")
                break

            # Check if we found new comments during this iteration
            if len(comment_elements) > last_count:
                last_count = len(comment_elements)
                scroll_attempts = 0  # Reset scroll attempts counter

                # Scroll to the last comment to load more
                if comment_elements:
                    self.driver.execute_script(
                        "arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});",
                        comment_elements[-1]
                    )

                # Also scroll a bit more to trigger loading
                self.driver.execute_script("window.scrollBy(0, 500);")
                random_sleep(2, 0.5)
            else:
                # If no new comments were found in this iteration
                scroll_attempts += 1
                if scroll_attempts >= max_attempts:
                    print(f"\nNo more comments found after {max_attempts} scroll attempts.")
                    print(f"Expected: {expected_total}, Collected: {len(collected)}")
                    break

                print(f"\nNo new comments found. Scrolling more... (attempt {scroll_attempts}/{max_attempts})")
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                random_sleep(3, 0.5)

        return collected

    def _extract_comment_data(self, comment_element):
        """Extract data from a single comment element.

        The selectors follow the comment markup captured in this notebook: the
        profile link is identified by its "<name>'s profile" aria-label, the date by
        the titled <span> inside the "Posted ..." paragraph, and the body by the
        paragraphs / code blocks that precede the reply button.

        Args:
            comment_element: Selenium element for one comment container.

        Returns:
            dict with keys ``content``, ``user``, ``user_url``, ``date`` and
            ``votes``. Pieces that cannot be located keep their defaults, except the
            user name, which falls back to "Anonymous or Deleted User".
        """
        comment_data = {
            "content": "",
            "user": "",
            "user_url": "",
            "date": "",
            "votes": 0
        }

        try:
            # Get user info using aria-label containing 'profile'
            user_elem = comment_element.find_element(By.XPATH, ".//a[contains(@aria-label, \"'s profile\")]")
            comment_data["user"] = user_elem.get_attribute("aria-label").replace("'s profile", "").strip()
            comment_data["user_url"] = user_elem.get_attribute("href")
        except NoSuchElementException:
            comment_data["user"] = "Anonymous or Deleted User"

        try:
            # Get date from the paragraph that contains "Posted" and a <span> with title
            date_elem = comment_element.find_element(By.XPATH, ".//p[contains(text(), 'Posted')]/span[@title]")
            comment_data["date"] = date_elem.get_attribute("title")
        except NoSuchElementException:
            pass

        try:
            # Get votes from button with aria-label containing 'votes'
            vote_elem = comment_element.find_element(By.XPATH, ".//button[contains(@aria-label, 'votes')]")
            vote_text = vote_elem.get_attribute("aria-label")
            vote_match = re.search(r"(\d+)\s*votes?", vote_text)
            comment_data["votes"] = int(vote_match.group(1)) if vote_match else 0
        except NoSuchElementException:
            pass

        try:
            # Collect the text of every <p> / code block that precedes the reply button
            reply_elem = comment_element.find_element(By.XPATH, ".//span[text()='reply']")
            all_children = comment_element.find_elements(By.XPATH, ".//*")

            content_parts = []
            for elem in all_children:
                if elem == reply_elem:
                    break  # Stop once we reach the reply button
                tag_name = elem.tag_name.lower()
                class_attr = elem.get_attribute("class") or ""

                if tag_name == "p" or "uc-code-block" in class_attr:
                    text = elem.text.strip()
                    if text:
                        content_parts.append(text)

            # NOTE(review): the sample output in this notebook shows the "Posted ..."
            # header paragraphs ending up in the content as well — confirm whether
            # they should be filtered out.
            comment_data["content"] = "\n".join(content_parts)
        except NoSuchElementException:
            pass

        return comment_data