# 0) Install dependencies

In [None]:
%pip install -q selenium webdriver-manager langdetect pandas pyarrow tqdm

# 1) Project paths and utilities

In [None]:
# Common imports
import os, re, json, time, hashlib, unicodedata
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict, Any

import numpy as np
import pandas as pd

# Seed NumPy's global RNG so any random sampling in this notebook is repeatable
SEED = 42
np.random.seed(SEED)

# Project root: step up one level when the notebook is run from notebooks/,
# otherwise use the current working directory as-is.
_cwd = Path.cwd()
PROJ = _cwd.resolve().parents[0] if _cwd.name == 'notebooks' else _cwd

# Standard folder layout beneath the project root
DATA = PROJ / "data"
RAW, PROC = DATA / "raw", DATA / "processed"
MODELS = PROJ / "models"
REPORTS = PROJ / "reports"
FIGS = REPORTS / "figures"

# Create every output folder up front (no-op when it already exists)
for directory in (RAW, PROC, MODELS, REPORTS, FIGS):
    directory.mkdir(parents=True, exist_ok=True)

def timestamp():
    """Return the current UTC time as a compact ``YYYYMMDD_HHMMSS`` string.

    Returns:
        str: e.g. ``"20250323_171500"``.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in UTC
    # formats to exactly the same string. Imported locally because the header
    # cell only imports datetime and timedelta.
    from datetime import timezone

    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

def latest_file(folder: Path, pattern="*.parquet"):
    """Return the most recently modified path in *folder* matching *pattern*.

    Args:
        folder: Directory to search (non-recursive glob).
        pattern: Glob pattern applied inside *folder*.

    Returns:
        The matching path with the largest modification time, or ``None``
        when nothing matches.
    """
    matches = list(folder.glob(pattern))
    if not matches:
        return None
    # Oldest first, so the newest file ends up last
    matches.sort(key=lambda entry: entry.stat().st_mtime)
    return matches[-1]

# Identifier for this notebook run: the UTC timestamp string from timestamp()
RUN_ID = timestamp()
RUN_ID  # last expression in the cell, so the notebook displays the value

# 2) Scraper

In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException
from webdriver_manager.chrome import ChromeDriverManager

from langdetect import detect

def hash_username(username: str) -> str:
    """Return a stable 16-hex-character pseudonym for *username*.

    The value is the first 16 characters of the SHA-256 hex digest of the
    username encoded with the default (UTF-8) encoding.
    """
    digest = hashlib.sha256()
    digest.update(username.encode())
    return digest.hexdigest()[:16]

def detect_language(text: str) -> str:
    """Return the language code reported by ``langdetect.detect`` for *text*.

    Falls back to ``'en'`` when the text is empty/whitespace-only or when the
    detector raises.

    Args:
        text: Tweet text to classify.

    Returns:
        str: Language code from ``detect``, or ``'en'`` as the default.
    """
    # Nothing to classify: skip the detector call entirely
    if not text or not text.strip():
        return 'en'
    try:
        return detect(text)
    except Exception:
        # Best-effort default. Unlike a bare ``except:``, this lets
        # KeyboardInterrupt through so a long scrape can still be interrupted.
        return 'en'

def parse_date(date_str: str) -> datetime:
    """Parse a Nitter timestamp string into a naive ``datetime`` in UTC.

    Recognised formats:
      * title attribute: ``"Mar 23, 2025 · 5:15 PM UTC"``
      * relative: ``"45s ago"``, ``"10m ago"``, ``"3h ago"``, ``"2d ago"``
        (the ``ago`` suffix is optional)
      * absolute with year: ``"Dec 25, 2023"``
      * absolute without year: ``"Dec 25"`` (most recent such date not in the
        future)

    Args:
        date_str: Raw date text; may be empty or ``None``.

    Returns:
        datetime: The parsed value. Always a ``datetime``; the current UTC
        time is the fallback for empty or unrecognised input.
    """
    # Local import: the header cell imports only datetime and timedelta.
    from datetime import timezone

    # The title format is explicitly UTC, so "now" is taken in UTC as well to
    # keep relative and fallback values on the same clock.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    text = (date_str or "").strip()
    if not text:
        return now

    relative_units = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}

    try:
        # Title attribute format (e.g. "Mar 23, 2025 · 5:15 PM UTC")
        if '·' in text:
            parts = text.split('·')
            date_part = parts[0].strip()
            time_part = parts[1].replace('UTC', '').strip()
            return datetime.strptime(f"{date_part} {time_part}", '%b %d, %Y %I:%M %p')

        # Relative times
        relative = re.fullmatch(r'(\d+)\s*([smhd])(?:\s*ago)?', text)
        if relative:
            amount = int(relative.group(1))
            unit = relative_units[relative.group(2)]
            return now - timedelta(**{unit: amount})

        # Absolute date with a year: "Dec 25, 2023"
        if ',' in text:
            return datetime.strptime(text, '%b %d, %Y')
    except (ValueError, OverflowError):
        return now  # Fallback to current time

    # Absolute date without a year: "Dec 25". Parse with an explicit year
    # (strptime's 1900 default rejects "Feb 29") and walk back until the
    # date is valid and not in the future.
    for year in range(now.year, now.year - 5, -1):
        try:
            candidate = datetime.strptime(f"{text} {year}", '%b %d %Y')
        except ValueError:
            continue
        if candidate <= now:
            return candidate

    return now  # Unrecognised format

def extract_tweet_id(url: str):
    """Extract the numeric tweet ID from a Nitter status URL.

    Args:
        url: A URL containing ``/status/<digits>``; may be empty or ``None``.

    Returns:
        The ID as a string of digits, or ``None`` when *url* is empty, is not
        a string, or contains no ``/status/<digits>`` segment.
    """
    if not url:
        return None
    try:
        match = re.search(r'/status/(\d+)', url)
    except TypeError:
        # Non-string input: treat as "no ID" rather than raising
        return None
    return match.group(1) if match else None

def extract_urls(tweet_element) -> List[str]:
    """Collect external link URLs from a tweet element.

    Args:
        tweet_element: Selenium element searched with the
            ``.tweet-content a`` CSS selector.

    Returns:
        List[str]: ``href`` values of links whose declared ``href`` does not
        start with ``/``. Returns ``[]`` if any Selenium call fails.
    """
    urls = []
    try:
        link_elements = tweet_element.find_elements(By.CSS_SELECTOR, '.tweet-content a')
        for link in link_elements:
            # get_attribute('href') yields the browser-resolved absolute URL,
            # so a relative (internal Nitter) link never starts with '/'.
            # Check the attribute as written in the HTML instead.
            declared_href = link.get_dom_attribute('href')
            if not declared_href or declared_href.startswith('/'):
                continue  # Exclude internal Nitter links
            urls.append(link.get_attribute('href') or declared_href)
    except Exception:
        # Best-effort: a stale/detached element yields no URLs
        return []
    return urls

def is_retweet(container) -> bool:
    """Return True when *container* holds a ``.retweet-header`` element.

    Any Selenium error while searching is treated as "not a retweet".
    """
    try:
        return bool(container.find_elements(By.CSS_SELECTOR, '.retweet-header'))
    except Exception:
        # Narrower than a bare ``except:`` so KeyboardInterrupt still propagates
        return False

def is_quote_tweet(container) -> bool:
    """Return True when *container* holds a ``.quote`` element.

    Any Selenium error while searching is treated as "not a quote tweet".
    """
    try:
        return bool(container.find_elements(By.CSS_SELECTOR, '.quote'))
    except Exception:
        # Narrower than a bare ``except:`` so KeyboardInterrupt still propagates
        return False

def extract_engagement_stats(container) -> Dict[str, int]:
    """Extract retweet, like, and comment counts using CSS selectors.

    For each statistic the icon element is located, and the text of its parent
    element is parsed as the count.

    Args:
        container: Selenium element for one timeline item.

    Returns:
        Dict[str, int]: Keys ``retweet_count``, ``like_count`` and
        ``comment_count``. A statistic that cannot be located or parsed is
        reported as 0.
    """
    def parse_count(element) -> int:
        """Convert an element's text ("1.2k", "3m", "1,234") to an int; 0 on failure."""
        if not element:
            return 0
        try:
            text = (element.text or "").strip().lower()
            if not text:
                return 0
            # Abbreviated numbers (e.g. "1.2k"). round() rather than int():
            # 2.3 * 1000 is 2299.999... in floating point and would truncate
            # to 2299.
            for suffix, factor in (('k', 1_000), ('m', 1_000_000)):
                if suffix in text:
                    return round(float(text.replace(suffix, '')) * factor)
            # Plain number: drop separators and anything else non-numeric
            digits = ''.join(filter(str.isdigit, text))
            return int(digits) if digits else 0
        except Exception:
            return 0

    stats = {'retweet_count': 0, 'like_count': 0, 'comment_count': 0}
    icon_selectors = (
        ('comment_count', '.icon-comment'),
        ('retweet_count', '.icon-retweet'),
        ('like_count', '.icon-heart'),
    )
    for key, selector in icon_selectors:
        try:
            icon = container.find_element(By.CSS_SELECTOR, selector)
            # The count text is read from the icon's parent element
            stats[key] = parse_count(icon.find_element(By.XPATH, '..'))
        except Exception:
            # Missing or stale icon: keep the default of 0 for this statistic
            continue
    return stats

def _normalize_host(nitter_host: str) -> str:
    """Return *nitter_host* as a base URL with a scheme and no trailing slash.

    Args:
        nitter_host: Host or URL such as ``"nitter.net"`` or
            ``"https://nitter.net/"``. Empty, whitespace-only or ``None``
            falls back to ``"https://nitter.net"``.

    Returns:
        str: e.g. ``"https://nitter.net"``. ``https://`` is prepended when the
        input has no ``http://`` or ``https://`` scheme.
    """
    nh = (nitter_host or "").strip().rstrip('/')
    if not nh:
        # Apply the default after stripping, so "  " or "/" cannot collapse
        # into a bare "https://"
        nh = "https://nitter.net"
    # Match the full scheme: a plain "http" prefix test would also accept
    # hosts such as "httpbin.org"
    if not nh.lower().startswith(("http://", "https://")):
        nh = "https://" + nh
    return nh

def _build_chrome_options(headless: bool, user_agent: str = None):
    """Build the ChromeOptions (headless mode, window flags, user-agent) for a scrape."""
    options = webdriver.ChromeOptions()
    if headless:
        # modern headless
        options.add_argument("--headless=new")
    for flag in (
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--window-size=1920,1080",
        "--start-maximized",
        "--lang=en-US,en",
    ):
        options.add_argument(flag)
    if user_agent:
        options.add_argument(f"--user-agent={user_agent}")
    return options


def scrape_x_posts(
    username: str,
    num_scrolls: int = 5,
    tweet_type: str = 'original',
    nitter_host: str = "https://nitter.net",
    headless: bool = True,
    user_agent: str = None,
    initial_wait_s: float = 5.0
) -> List[Dict[str, Any]]:
    """Scrapes posts from Nitter with enhanced data collection.

    Opens ``{nitter_host}/{username}`` in Chrome, then up to ``num_scrolls``
    times: reads every ``.timeline-item`` on the page, then clicks the
    "Load more" link (or scrolls to the bottom when the link is not found).
    The browser is always closed before returning.

    Args:
        username (str): The Twitter username to scrape
        num_scrolls (int): Maximum number of read-then-paginate rounds
        tweet_type (str): 'original' | 'original_and_quotes' | 'all'.
            Any other value is not filtered, i.e. behaves like 'all'.
        nitter_host (str): Base Nitter host, e.g., "https://nitter.net"
        headless (bool): Run Chrome in headless mode
        user_agent (str): Optional custom user-agent
        initial_wait_s (float): Initial wait to let page settle

    Returns:
        List[Dict[str, Any]]: One dict per unique tweet ID with keys
        ``tweet_id``, ``username``, ``tweet_url``, ``text``, ``created_at``
        (``'%Y-%m-%d %H:%M:%S'`` string), ``lang``, ``user_id_hashed``,
        ``retweet_count``, ``like_count``, ``comment_count``, ``is_reply``,
        ``is_retweet``, ``is_quote`` and ``urls``; sorted newest first.

    Raises:
        Exception: Errors from starting Chrome or loading the profile page
            propagate. Per-tweet and per-round errors are skipped.
    """
    # Local import: the scraper cell's import list does not include it.
    from selenium.common.exceptions import TimeoutException

    # After this many rounds in a row with no timeline on the page, give up
    # instead of spending num_scrolls * 12 s waiting on a page that is not
    # going to render (blocked instance, missing user, ...).
    max_consecutive_timeouts = 3

    nitter_host = _normalize_host(nitter_host)

    os.environ["WDM_LOG"] = "0"  # silence webdriver-manager
    options = _build_chrome_options(headless, user_agent)
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    try:
        wait = WebDriverWait(driver, 12)

        profile_url = f"{nitter_host}/{username}"
        driver.get(profile_url)
        time.sleep(initial_wait_s)  # Initial load

        posts = []
        seen_ids = set()  # Track seen tweet IDs
        user_id_hashed = hash_username(username)
        consecutive_timeouts = 0

        # A for-loop advances exactly once per round, whatever fails inside
        for _ in range(num_scrolls):
            try:
                tweet_containers = wait.until(
                    EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.timeline-item'))
                )
            except TimeoutException as e:
                print(f"Error while scrolling: {str(e)}")
                consecutive_timeouts += 1
                if consecutive_timeouts >= max_consecutive_timeouts:
                    print(
                        f"No timeline items after {consecutive_timeouts} "
                        "consecutive waits; stopping early."
                    )
                    break
                continue
            except Exception as e:
                print(f"Error while scrolling: {str(e)}")
                continue
            consecutive_timeouts = 0

            for container in tweet_containers:
                try:
                    # Filter by tweet type
                    is_rt = is_retweet(container)
                    is_quote = is_quote_tweet(container)
                    if tweet_type == 'original' and (is_rt or is_quote):
                        continue
                    elif tweet_type == 'original_and_quotes' and is_rt:
                        continue
                    # 'all' includes everything

                    # Identify tweet URL and ID
                    date_element = container.find_element(By.CSS_SELECTOR, '.tweet-date a')
                    tweet_url = date_element.get_attribute('href')
                    tweet_id = extract_tweet_id(tweet_url)
                    if not tweet_id or tweet_id in seen_ids:
                        continue
                    seen_ids.add(tweet_id)

                    # Content
                    tweet_element = container.find_element(By.CSS_SELECTOR, '.tweet-content')
                    tweet_text = tweet_element.text.strip()

                    # Prefer the title attribute for precise datetime
                    date_text = date_element.get_attribute('title') or date_element.text.strip()
                    parsed_date = parse_date(date_text)

                    # Engagement and metadata
                    stats = extract_engagement_stats(container)
                    is_reply = bool(container.find_elements(By.CSS_SELECTOR, '.replying-to'))
                    urls = extract_urls(tweet_element)

                    posts.append({
                        'tweet_id': tweet_id,
                        'username': username,
                        'tweet_url': tweet_url,
                        'text': tweet_text,
                        'created_at': parsed_date.strftime('%Y-%m-%d %H:%M:%S'),
                        'lang': detect_language(tweet_text),
                        'user_id_hashed': user_id_hashed,
                        'retweet_count': stats['retweet_count'],
                        'like_count': stats['like_count'],
                        'comment_count': stats['comment_count'],
                        'is_reply': is_reply,
                        'is_retweet': is_rt,
                        'is_quote': is_quote,
                        'urls': urls
                    })
                except Exception:
                    # Best-effort per item: stale elements, missing
                    # sub-elements and similar failures skip this tweet only
                    continue

            # Pagination
            try:
                load_more = driver.find_element(By.XPATH, "//a[contains(text(), 'Load more')]")
                driver.execute_script("arguments[0].scrollIntoView();", load_more)
                time.sleep(0.8)
                load_more.click()
                time.sleep(2.5)
            except Exception:
                # No usable "Load more" link: fall back to scrolling to the bottom
                try:
                    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                    time.sleep(2.5)
                except Exception as e:
                    print(f"Error while scrolling: {str(e)}")

        # Sort posts by date descending
        posts.sort(key=lambda x: datetime.strptime(x['created_at'], '%Y-%m-%d %H:%M:%S'), reverse=True)
        return posts

    finally:
        driver.quit()

# 3) Parameters

In [None]:
from pathlib import Path

# Preferred Nitter host
NITTER_HOST = "https://nitter.net"  # "https://nitter.lacontrevoi.fr"

# What to collect: 'original' | 'original_and_quotes' | 'all'
TWEET_TYPE = "original"

# Scrolling/pagination (per user)
NUM_SCROLLS = 100

# Browser settings
HEADLESS = True
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
INITIAL_WAIT_S = 5.0

# Users list: read from sources/usernames.txt if present, else fallback
usernames_file = PROJ / "sources" / "usernames.txt"
if usernames_file.exists():
    _lines = usernames_file.read_text(encoding="utf-8").splitlines()
    # Strip whitespace and a leading "@", then drop entries that end up empty
    # (blank lines, or a line holding only "@") so no empty username is scraped.
    _names = (ln.strip().lstrip("@").strip() for ln in _lines)
    # dict.fromkeys removes repeated usernames while keeping file order, so the
    # same profile is not scraped twice.
    USERNAMES = list(dict.fromkeys(name for name in _names if name))
else:
    USERNAMES = ["ilyasut"]

USERNAMES

# 4) Run scraping

In [None]:
from tqdm import tqdm

# Local to this cell: the header cell imports only datetime and timedelta
from datetime import timezone

all_posts = []  # every post dict collected across all users
errors = []     # one {"username", "error"} record per user whose scrape raised

# Run start time in UTC. datetime.utcnow() is deprecated since Python 3.12;
# an aware UTC "now" formats to the same string.
start_ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

for u in tqdm(USERNAMES, desc="Scraping users"):
    try:
        posts = scrape_x_posts(
            username=u,
            num_scrolls=NUM_SCROLLS,
            tweet_type=TWEET_TYPE,
            nitter_host=NITTER_HOST,
            headless=HEADLESS,
            user_agent=USER_AGENT,
            initial_wait_s=INITIAL_WAIT_S
        )
        # Tag source host and normalize type
        for p in posts:
            p["nitter_host"] = NITTER_HOST
            p["type"] = "retweet" if p.get("is_retweet") else ("quote" if p.get("is_quote") else "original")
        all_posts.extend(posts)
        print(f"Collected {len(posts):,} posts from @{u}")
    except Exception as e:
        # One failing user must not abort the whole run: record it and move on
        errors.append({"username": u, "error": str(e)})
        print(f"Error with @{u}: {e}")

len(all_posts), len(errors)

# 5) Convert to DataFrame and deduplicate

In [None]:
# Nothing to convert: fail loudly with debugging hints
if not all_posts:
    raise RuntimeError("No posts collected. Try reducing NUM_SCROLLS, switching NITTER_HOST, or disabling headless to debug.")

df = pd.DataFrame(all_posts)
has_created_at = "created_at" in df.columns

# created_at arrives as a string; unparseable values become NaT
if has_created_at:
    df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")

# Dedupe on tweet_id when that column exists; otherwise on
# username + text + created_at
before = len(df)
dedupe_key = ["tweet_id"] if "tweet_id" in df.columns else ["username", "text", "created_at"]
df = df.drop_duplicates(subset=dedupe_key)
after = len(df)
removed = before - after

# Newest posts first
if has_created_at:
    df = df.sort_values("created_at", ascending=False)

print(f"Rows before dedupe: {before:,}, after: {after:,} (removed {removed:,})")
display(df.head(3))

# 6) Save raw snapshot and manifest

In [None]:
# Local to this cell: the header cell imports only datetime and timedelta
from datetime import timezone

ts = RUN_ID
raw_path = RAW / f"x_posts_{ts}.parquet"
csv_path = RAW / f"x_posts_{ts}.csv"  # optional CSV for quick look
manifest_path = RAW / f"manifest_{ts}.json"

# Save Parquet (pyarrow handles list-typed 'urls')
df.to_parquet(raw_path, index=False)

# Also CSV (serialize lists to JSON strings)
df_csv = df.copy()
if "urls" in df_csv.columns:
    df_csv["urls"] = df_csv["urls"].apply(lambda v: json.dumps(v) if isinstance(v, (list, tuple)) else v)
df_csv.to_csv(csv_path, index=False, encoding="utf-8")


def _format_bound(value):
    """Format a created_at bound for the manifest; None when missing or NaT."""
    # NaT does not support strftime, so an all-unparseable created_at column
    # would otherwise crash the manifest write after the data files are saved.
    if value is None or pd.isna(value):
        return None
    return value.strftime("%Y-%m-%d %H:%M:%S")


has_created_at = "created_at" in df

# Manifest: describes what was saved, with which parameters, and any errors
manifest = {
    "run_id": ts,
    "paths": {
        "parquet": str(raw_path.relative_to(PROJ)),
        "csv": str(csv_path.relative_to(PROJ)),
    },
    "rows": int(len(df)),
    "columns": df.columns.tolist(),
    "users": USERNAMES,
    "users_count": len(USERNAMES),
    "params": {
        "nitter_host": NITTER_HOST,
        "tweet_type": TWEET_TYPE,
        "num_scrolls": NUM_SCROLLS,
        "headless": HEADLESS,
        "user_agent": USER_AGENT,
    },
    # datetime.utcnow() is deprecated since Python 3.12; same string output
    "collected_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
    "min_created_at": (_format_bound(df["created_at"].min()) if has_created_at else None),
    "max_created_at": (_format_bound(df["created_at"].max()) if has_created_at else None),
    "deduplicated": {
        "removed": removed,
        "key": "tweet_id" if "tweet_id" in df.columns else "username,text,created_at"
    },
    "errors": errors,
}

with open(manifest_path, "w", encoding="utf-8") as f:
    json.dump(manifest, f, indent=2)

print(f"Saved raw snapshot to:\n- {raw_path}\n- {csv_path}\nManifest: {manifest_path}")

# 7) Quick checks

In [None]:
# Distribution of detected languages
print("By language (top 10):")
display(df["lang"].value_counts().head(10))

# Distribution of post types
print("\nBy type:")
display(df["type"].value_counts())

# Summarise engagement only when every column being described is present
# (the guard covers all three columns that are indexed below).
engagement_cols = ["like_count", "retweet_count", "comment_count"]
if set(engagement_cols).issubset(df.columns):
    print("\nBasic engagement stats:")
    display(df[engagement_cols].describe())