In [None]:
# =====================================================
# Pegos Twitter Scraper (Enhanced for Accurate Counts)
# =====================================================
import os
import random
import time
import traceback
from datetime import datetime
from urllib.parse import quote

import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

def _parse_count(text):
    """Convert a counter label such as "42", "1,234", "1,5 B" or "12M" to an int.

    Recognised suffixes: "K" and "B" (thousand), "M" and "Mn" (million).
    Anything that cannot be parsed yields 0.

    NOTE(review): "B" is kept as *thousand* (Turkish "Bin"), exactly as the
    previous implementation treated it; an English-locale page that uses "B"
    for billion would be misread - confirm which UI language the scraper sees.
    """
    text = text.strip()
    for suffix, factor in (("Mn", 1_000_000), ("M", 1_000_000), ("B", 1_000), ("K", 1_000)):
        if text.endswith(suffix):
            # Strip exactly the suffix that matched (the old code always cut
            # two characters for "M", turning "12M" into 1M). Abbreviated
            # values carry at most one decimal separator, so accept both
            # "1.5" and the decimal-comma form "1,5".
            number = text[: -len(suffix)].strip().replace(",", ".")
            try:
                # round() instead of int() so float noise cannot truncate
                # a value like 1099.9999999 down to 1099.
                return round(float(number) * factor)
            except (ValueError, OverflowError):
                return 0
    # No suffix: the label is a plain integer, so any "," or "." can only be
    # a thousands separator.
    digits = text.replace(",", "").replace(".", "")
    return int(digits) if digits.isdecimal() else 0


def get_tweet_counts(art):
    """Extract the interaction counters from one tweet element.

    Args:
        art: Parsed tweet element exposing a BeautifulSoup-style
            ``find(attrs=...)`` whose result offers ``get_text(strip=True)``.

    Returns:
        dict with integer values for the keys ``"comment"``, ``"retweet"``,
        ``"like"`` and ``"see_count"``. A counter whose element is missing,
        empty, or unparseable is reported as 0.
    """
    counts = {"comment": 0, "retweet": 0, "like": 0, "see_count": 0}

    # (data-testid value in the DOM, key used in the result dict)
    for label, key in [
        ("reply", "comment"),
        ("retweet", "retweet"),
        ("like", "like"),
        ("view", "see_count"),
    ]:
        tag = art.find(attrs={"data-testid": label})
        if not tag:
            continue
        counts[key] = _parse_count(tag.get_text(strip=True))

    return counts

In [None]:
# --------------------------
# Environment variables (GitHub Secrets)
# --------------------------
# Both session cookies are read from the environment; the CSV destination is fixed.
AUTH_TOKEN = os.environ.get("AUTH_TOKEN")
CT0 = os.environ.get("CT0")
OUT_PATH = "/tmp/pegos_output.csv"

# Stop right away when either value is missing or empty.
if not (AUTH_TOKEN and CT0):
    raise RuntimeError("AUTH_TOKEN veya CT0 tanƒ±mlƒ± deƒüil (GitHub Secrets kƒ±smƒ±na ekle).")

In [None]:
# --------------------------
# Start Chrome (headless)
# --------------------------
opts = Options()

# Command-line switches, applied in the same order as before.
for chrome_flag in (
    "--headless=new",
    "--no-sandbox",
    "--disable-dev-shm-usage",
    "--disable-gpu",
    "--window-size=1920,1080",
    "--disable-blink-features=AutomationControlled",
):
    opts.add_argument(chrome_flag)

# Experimental options that switch off the automation switch/extension.
for option_name, option_value in (
    ("excludeSwitches", ["enable-automation"]),
    ("useAutomationExtension", False),
):
    opts.add_experimental_option(option_name, option_value)

# ChromeDriverManager().install() supplies the driver path handed to Service.
chrome_service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=chrome_service, options=opts)

In [None]:
# --------------------------
# Login with cookies
# --------------------------
# Open the site first, then add the cookies for the ".x.com" domain.
driver.get("https://x.com")
time.sleep(3)

for cookie_name, cookie_value in (("auth_token", AUTH_TOKEN), ("ct0", CT0)):
    driver.add_cookie({"name": cookie_name, "value": cookie_value, "domain": ".x.com"})

# Reload the page now that the cookies are set.
driver.refresh()
time.sleep(4)
print("‚úÖ Cookies set, current URL:", driver.current_url)

In [None]:
# --------------------------
# Tweet scraping (improved)
# --------------------------
KEYWORDS = ['blockchain', 'cryptocurrency', 'bitcoin', 'ethereum']
tweetArr = []

# (tweet text, timestamp) pairs already stored. Every scroll round re-parses
# the whole page, so without this check each tweet still in the DOM would be
# appended again on every round. Only the first sighting is kept, which is
# the same row a later drop_duplicates(subset=['tweet', 'time']) would keep.
seen_tweets = set()

try:
    for kw in KEYWORDS:
        print(f"üîé Searching for: {kw}")
        # Percent-encode the keyword so characters such as spaces, '#' or '&'
        # cannot break the query string.
        driver.get(f"https://x.com/search?q={quote(kw, safe='')}&src=typed_query&f=live")
        time.sleep(5)

        for scroll_round in range(35):
            driver.execute_script("window.scrollBy(0, 1200);")
            time.sleep(random.uniform(1.5, 2.5))  # wait for the JS to load
            driver.execute_script("window.scrollBy(0, -200);")  # lazy-load fix

            soup = BeautifulSoup(driver.page_source, "html.parser")

            for art in soup.find_all("article"):
                try:
                    tag = art.find(attrs={"data-testid": "tweetText"})
                    if not tag:
                        continue

                    text = tag.get_text(" ", strip=True)
                    time_tag = art.find("time")
                    time_str = time_tag["datetime"] if time_tag else None

                    tweet_key = (text, time_str)
                    if tweet_key in seen_tweets:
                        continue

                    counts = get_tweet_counts(art)

                    tweetArr.append({
                        "keyword": kw,
                        "tweet": text,
                        "time": time_str,
                        **counts
                    })
                    # Mark as seen only after the row was stored, so a tweet
                    # whose parsing failed can be retried on a later round.
                    seen_tweets.add(tweet_key)

                except Exception as e:
                    # Best effort: one malformed article must not stop the run.
                    print("‚ö†Ô∏è Parse error:", e)

        print(f"‚úÖ Finished {kw}: total tweets so far {len(tweetArr)}")
finally:
    # Always shut the browser down, even when navigation or scrolling raised,
    # so no Chrome process is left behind.
    driver.quit()

In [None]:
# --------------------------
# Save to CSV
# --------------------------
df = pd.DataFrame(tweetArr)
if not df.empty:
    # Keep one row per (tweet, time) pair, then write the file without the index.
    df = df.drop_duplicates(subset=['tweet', 'time'])
    df.to_csv(OUT_PATH, index=False)
    print(f"üíæ Saved to {OUT_PATH}, total {len(df)} rows.")
else:
    # Nothing was collected, so no file is written.
    print("‚ö†Ô∏è No tweets collected.")