In [14]:
from igdb_puller import pull_games_and_dependents, pull_tables_as_globals
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
from pathlib import Path
import re

In [1]:
import os, csv, re, time
from pathlib import Path
from urllib.parse import urlparse, urlunparse
from bs4 import BeautifulSoup, NavigableString

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FxOptions
from selenium.webdriver.firefox.service import Service as FxService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.firefox import GeckoDriverManager
from urllib.parse import urlparse, urlunparse

In [12]:
# cricinfo_batch_firefox.py
# ESPNcricinfo ball-by-ball scraper (Firefox)
# - Resolves proper commentary URL via legacy redirect (+ strips '/full-scorecard')
# - Uses YOUR absolute XPaths to select innings (1st/2nd) from the commentary page
# - Loads every over via stepwise scrolling (tuned)
# - Extracts ONLY commentary paragraphs (the "red-box" text)
# - Exposes run_batch_from_df(...) for notebook use + CLI for scripting

import os, csv, re, time
from pathlib import Path
from urllib.parse import urlparse, urlunparse
from bs4 import BeautifulSoup, NavigableString

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FxOptions
from selenium.webdriver.firefox.service import Service as FxService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.firefox import GeckoDriverManager

# -------------------------
# GLOBAL CONFIG
# -------------------------
SCROLL_STEP_PX     = 100
SCROLL_SETTLE_SEC  = 1.5
GROWTH_PATIENCE    = 3
MAX_SCROLL_ROUNDS  = 1000

DEBUG = False  # flip True to print extra logs

def dbg(*args):
    if DEBUG:
        print("[DBG]", *args)

BALL_RE = re.compile(r"^\s*(\d+)\.(\d+)\s*$")

# -------------------------
# Consent / misc helpers
# -------------------------
def dismiss_consent_if_present(driver):
    """Best-effort click-through for consent banners; safe to no-op."""
    try:
        buttons = driver.find_elements(
            By.XPATH,
            "//button[contains(., 'Accept') or contains(., 'I Agree') or contains(., 'Continue') or contains(., 'Got it')]"
        )
        for b in buttons:
            if b.is_displayed() and b.is_enabled():
                driver.execute_script("arguments[0].click();", b)
                time.sleep(0.5)
                break
    except Exception:
        pass

# -------------------------
# URL resolution / normalization
# -------------------------
def _strip_full_scorecard(url: str) -> str:
    """Remove '/full-scorecard' segment from any URL path."""
    p = urlparse(url)
    path = p.path
    path = path.replace("/full-scorecard/ball-by-ball-commentary", "/ball-by-ball-commentary")
    path = path.replace("/full-scorecard/live-cricket-score", "/live-cricket-score")
    path = path.replace("/full-scorecard/", "/")
    if path.endswith("/full-scorecard"):
        path = path[: -len("/full-scorecard")]
    return urlunparse((p.scheme, p.netloc, path, p.params, p.query, p.fragment))

def resolve_commentary_url_via_redirect(match_id: str, driver) -> str:
    """
    Use ESPN legacy engine URL to get the modern, fully-slugged match URL,
    then normalize to /ball-by-ball-commentary and strip '/full-scorecard'.
    """
    engine_url = f"https://www.espncricinfo.com/ci/engine/match/{match_id}.html"
    driver.get(engine_url)
    dismiss_consent_if_present(driver)
    WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
    time.sleep(1.0)

    final_url = _strip_full_scorecard(driver.current_url)
    dbg("Resolved engine ->", final_url)

    if "/ball-by-ball-commentary" in final_url:
        return final_url

    if "/live-cricket-score" in final_url:
        return _strip_full_scorecard(final_url.replace("/live-cricket-score", "/ball-by-ball-commentary"))

    parsed = urlparse(final_url)
    if parsed.path.rstrip("/").endswith(str(match_id)):
        return _strip_full_scorecard(final_url.rstrip("/") + "/ball-by-ball-commentary")

    return _strip_full_scorecard(final_url.rstrip("/") + "/ball-by-ball-commentary")

# -------------------------
# Commentary parser (ONLY the "red-box" text)
# -------------------------
def extract_rows_from_html(html: str):
    soup = BeautifulSoup(html, "html.parser")
    rows, seen = [], set()

    def is_badge_text(s: str) -> bool:
        s = s.strip()
        return (
            s in {"•"} or
            re.fullmatch(r"\d+[a-z]*", s, flags=re.I) is not None or   # "1", "1lb", "2nb"
            s.lower() in {"lb", "nb", "wd"}
        )

    def find_ball_container(node):
        if isinstance(node, NavigableString):
            node = node.parent
        cur = node
        for _ in range(8):
            if cur is None:
                break
            try:
                if cur.find("p"):
                    return cur
            except Exception:
                pass
            cur = cur.parent
        return node

    # Iterate nodes that look like ball labels "18.3"
    for txt_node in soup.find_all(string=BALL_RE):
        label = txt_node.strip()
        m = BALL_RE.match(label)
        if not m:
            continue

        container = find_ball_container(txt_node)
        if container is None:
            continue

        try:
            ps = [p.get_text(" ", strip=True) for p in container.find_all("p", recursive=True)]
        except AttributeError:
            continue

        ps = [p for p in ps if p and not is_badge_text(p)]

        if not ps:
            full = container.get_text(" ", strip=True) or ""
            full = re.sub(r"^\s*"+re.escape(label)+r"\s*", "", full)
            if full and not is_badge_text(full):
                ps = [full]

        if not ps:
            continue

        text = " ".join(ps).strip()
        if not text:
            continue

        over_i, ball_i = int(m.group(1)), int(m.group(2))
        key = (label, text)
        if key in seen:
            continue
        seen.add(key)

        rows.append({
            "over": over_i,
            "ball_in_over": ball_i,
            "over_ball": label,
            "text": text,
        })

    rows.sort(key=lambda r: (r["over"], r["ball_in_over"]))
    return rows

# -------------------------
# Scroller (stepwise; tuned)
# -------------------------
def load_all_by_scrolling(driver,
                          step_px=SCROLL_STEP_PX,
                          settle_sec=SCROLL_SETTLE_SEC,
                          growth_patience=GROWTH_PATIENCE,
                          max_rounds=MAX_SCROLL_ROUNDS):

    def doc_heights():
        return driver.execute_script("return [window.pageYOffset, document.body.scrollHeight, window.innerHeight];")

    def scroll_by(px):
        driver.execute_script("window.scrollBy(0, arguments[0]);", px)

    def scroll_to_bottom_stepwise():
        prev_y = -1
        for _ in range(400):
            y, h, vh = doc_heights()
            if y >= h - vh - 2:
                break
            scroll_by(step_px)
            time.sleep(0.15)
            y2, _, _ = doc_heights()
            if y2 == prev_y:
                scroll_by(step_px)
                time.sleep(0.15)
                y2, _, _ = doc_heights()
            prev_y = y2

    def click_load_more_if_present():
        candidates = driver.find_elements(
            By.XPATH,
            "//button[contains(., 'Load more')] | //a[contains(., 'Load more')] | "
            "//button[@aria-label='Load more' or @data-testid='load-more'] | "
            "//a[@aria-label='Load more' or @data-testid='load-more']"
        )
        btn = next((el for el in candidates if el.is_displayed() and el.is_enabled()), None)
        if btn:
            try:
                driver.execute_script("arguments[0].click();", btn)
                return True
            except Exception:
                try:
                    btn.click()
                    return True
                except Exception:
                    return False
        return False

    def unique_ball_count():
        elems = driver.find_elements(By.XPATH, "//*[normalize-space(text())]")
        s = set()
        for e in elems:
            try:
                t = e.text.strip()
                if BALL_RE.match(t):
                    s.add(t)
            except Exception:
                pass
        return len(s)

    last_count = -1
    no_growth = 0

    for _ in range(max_rounds):
        scroll_to_bottom_stepwise()
        time.sleep(settle_sec)

        clicked = click_load_more_if_present()
        if clicked:
            time.sleep(settle_sec + 0.5)

        # bottom jiggle to re-trigger observers
        scroll_by(-200)
        time.sleep(0.15)
        scroll_by(400)
        time.sleep(settle_sec)

        cur = unique_ball_count()
        dbg("Scroll round -> unique balls:", cur)
        if cur <= last_count:
            no_growth += 1
        else:
            no_growth = 0
            last_count = cur

        if no_growth >= growth_patience:
            break

# -------------------------
# YOUR absolute XPaths for innings dropdown + selectors
# -------------------------
# Button that opens the innings dropdown:
XPATH_INNINGS_BUTTON = "/html/body/div[1]/section/section/div[5]/div/div/div[3]/div[1]/div[2]/div[1]/div[1]/div/div[2]/div/div"

# Menu item for innings i (1 = first innings, 2 = second innings)
def XPATH_INNINGS_ITEM(i:int) -> str:
    # As per your working snippet (menu rendered under body/div[3])
    return f"/html/body/div[3]/div/div/div/div/div/ul/li[{i}]/div/span"

def open_innings_menu(driver, timeout=10):
    WebDriverWait(driver, timeout).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
    btn = WebDriverWait(driver, timeout).until(
        EC.element_to_be_clickable((By.XPATH, XPATH_INNINGS_BUTTON))
    )
    driver.execute_script("arguments[0].click();", btn)

def select_innings_by_index(driver, idx:int, timeout=10, settle=1.0) -> str:
    """Click the innings dropdown and select the idx-th item. Returns the label text (e.g., 'SRH')."""
    open_innings_menu(driver, timeout=timeout)
    item = WebDriverWait(driver, timeout).until(
        EC.element_to_be_clickable((By.XPATH, XPATH_INNINGS_ITEM(idx)))
    )
    label = (item.text or "").strip()
    driver.execute_script("arguments[0].click();", item)
    time.sleep(settle)
    return label or f"Innings {idx}"

# -------------------------
# One-match scrape (uses resolver + your innings clicks)
# -------------------------
def scrape_match(series_id: str, match_id: str, driver, innings_indices=(1,)) -> list:
    """
    Scrape the specified innings indices (1=first, 2=second) for a match.
    Returns list of dicts:
      series_id, match_id, innings, over, ball_in_over, over_ball, text
    """
    url = resolve_commentary_url_via_redirect(match_id, driver)
    print(f"[i] Resolved commentary URL for {match_id} -> {url}")
    driver.get(url)
    dismiss_consent_if_present(driver)
    driver.maximize_window()
    WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
    time.sleep(1.0)

    rows_all = []
    for idx in innings_indices:
        try:
            innings_label = select_innings_by_index(driver, idx, timeout=10, settle=1.0)
            print(f"[i] Selected innings {idx} ({innings_label}) for {match_id}")
        except Exception as e:
            print(f"[!] Could not select innings {idx} for {match_id}: {e}")
            continue

        load_all_by_scrolling(driver)
        html = driver.page_source
        rows = extract_rows_from_html(html)
        for r in rows:
            r["innings"]   = innings_label or f"Innings {idx}"
            r["match_id"]  = match_id
            r["series_id"] = series_id
        rows_all.extend(rows)

    # De-dupe + sort
    seen = set()
    unique = []
    for r in rows_all:
        key = (r["innings"], r["over_ball"], r["text"])
        if key in seen: continue
        seen.add(key)
        unique.append(r)

    unique.sort(key=lambda r: (r["over"], r["ball_in_over"]))
    print(f"[i] {match_id}: extracted {len(unique)} rows")
    return unique

# -------------------------
# Batch entry point for notebooks
# -------------------------
def run_batch_from_df(df,
                      innings_indices=(1,2),
                      out_dir="commentary_csv",
                      agg_csv="commentary_all.csv",
                      resume=False,
                      headless=False,
                      debug=False):
    """
    df must contain columns: series_id, cricinfo_match_id
    innings_indices: tuple/list of which innings to scrape via your XPath logic (1 and/or 2)
    Writes per-match CSVs into out_dir/ and appends to agg_csv. Returns total rows appended.
    """
    import pandas as pd

    # normalize column names (case-insensitive)
    colmap = {}
    for c in df.columns:
        cl = c.lower()
        if cl in {"series_id", "cricinfo_match_id"}:
            colmap[cl] = c
    if set(colmap) != {"series_id", "cricinfo_match_id"}:
        raise ValueError("DataFrame must have columns: series_id, cricinfo_match_id")

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    agg_path = Path(agg_csv)
    agg_exists = agg_path.exists()

    agg_f = open(agg_path, "a" if agg_exists else "w", newline="", encoding="utf-8")
    agg_writer = csv.DictWriter(agg_f,
        fieldnames=["series_id","match_id","innings","over","ball_in_over","over_ball","text"])
    if not agg_exists:
        agg_writer.writeheader()

    # one Firefox for the whole batch
    opts = FxOptions()
    if headless: opts.add_argument("-headless")
    service = FxService(GeckoDriverManager().install())
    driver = webdriver.Firefox(service=service, options=opts)

    global DEBUG
    old_debug = DEBUG
    DEBUG = bool(debug)

    total_rows = 0
    try:
        for _, row in df.iterrows():
            series_id = str(row[colmap["series_id"]]).strip()
            match_id  = str(row[colmap["cricinfo_match_id"]]).strip()
            if not series_id or not match_id:
                continue

            out_path = out_dir / f"{match_id}_commentary.csv"
            if resume and out_path.exists():
                print(f"[i] Skipping {match_id} (exists, resume=True)")
                # keep agg in sync
                with open(out_path, "r", encoding="utf-8") as rf:
                    for r in csv.DictReader(rf):
                        agg_writer.writerow(r)
                agg_f.flush()
                continue

            print(f"[i] Scraping {match_id} …")
            try:
                rows_match = scrape_match(series_id, match_id, driver, innings_indices=innings_indices)
            except Exception as e:
                print(f"[!] Failed {match_id}: {e}")
                continue

            with open(out_path, "w", newline="", encoding="utf-8") as mf:
                mw = csv.DictWriter(mf,
                    fieldnames=["series_id","match_id","innings","over","ball_in_over","over_ball","text"])
                mw.writeheader()
                for r in rows_match:
                    mw.writerow(r)
            print(f"[i] Wrote {out_path} ({len(rows_match)} rows)")

            for r in rows_match:
                agg_writer.writerow(r)
            agg_f.flush()

            total_rows += len(rows_match)
    finally:
        driver.quit()
        agg_f.close()
        DEBUG = old_debug

    print(f"[i] Done. Aggregate at: {agg_path} (+{total_rows} rows this run)")
    return total_rows

# # -------------------------
# # CLI
# # -------------------------
# if __name__ == "__main__":
#     import argparse, pandas as pd
#     ap = argparse.ArgumentParser(description="Batch scrape ESPNcricinfo ball-by-ball commentary (Firefox)")
#     ap.add_argument("--index", default="match_index_ipl.csv", help="CSV with columns: series_id, cricinfo_match_id")
#     ap.add_argument("--outdir", default="commentary_csv")
#     ap.add_argument("--agg", default="commentary_all.csv")
#     ap.add_argument("--innings", default="1,2", help="Comma-separated innings indices to scrape, e.g. '1' or '1,2'")
#     ap.add_argument("--resume", action="store_true")
#     ap.add_argument("--headless", action="store_true")
#     ap.add_argument("--debug", action="store_true")
#     args = ap.parse_args()

#     innings_indices = tuple(int(x.strip()) for x in args.innings.split(",") if x.strip())

#     df_idx = pd.read_csv(args.index)
#     run_batch_from_df(
#         df_idx,
#         innings_indices=innings_indices,
#         out_dir=args.outdir,
#         agg_csv=args.agg,
#         resume=args.resume,
#         headless=args.headless,
#         debug=args.debug,
#     )


In [15]:
data = {'cricinfo_match_id': [1082591],
        'comp': ['IPL'],
        'season': [2017],
        'series_id': [1078425.0]}

match_index = pd.DataFrame(data)
display(match_index.head())

Unnamed: 0,cricinfo_match_id,comp,season,series_id
0,1082591,IPL,2017,1078425.0


In [None]:
#df_idx = pd.read_csv("match_index_ipl.csv")  # columns: series_id, cricinfo_match_id
run_batch_from_df(
    match_index,
    innings_indices=(1,2),   # or (1,) to scrape only the first innings with your exact clicks
    out_dir="commentary_csv",
    agg_csv="commentary_all.csv",
    resume=True,
    headless=False,
    debug=False
)

In [None]:
def dismiss_consent_if_present(driver):
    """Best-effort click-through for consent banners; safe to no-op."""
    try:
        buttons = driver.find_elements(
            By.XPATH,
            "//button[contains(., 'Accept') or contains(., 'I Agree') or contains(., 'Continue') or contains(., 'Got it')]"
        )
        for b in buttons:
            if b.is_displayed() and b.is_enabled():
                driver.execute_script("arguments[0].click();", b)
                time.sleep(0.5)
                break
    except Exception:
        pass

In [None]:
opts = FxOptions()
#if headless: opts.add_argument("-headless")
service = FxService(GeckoDriverManager().install())
driver = webdriver.Firefox(service=service, options=opts)
url = "https://www.espncricinfo.com/series/ipl-2017-1078425/sunrisers-hyderabad-vs-royal-challengers-bangalore-1st-match-1082591/ball-by-ball-commentary"
driver.get(url)
driver.maximize_window()
b="/html/body/div[1]/section/section/div[5]/div/div/div[3]/div[1]/div[2]/div[1]/div[1]/div/div[2]/div/div"
#dismiss_consent_if_present(driver)
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
time.sleep(1.0)
driver.find_element(By.XPATH, value=b).click()
bb="/html/body/div[3]/div/div/div/div/div/ul/li[1]/div/span" 
driver.find_element(By.XPATH, value=bb).click()
time.sleep(5)
driver.quit()

NoSuchElementException: Message: Unable to locate element: /html/body/div[3]/div/div/div/div/div/ul/li[1]/div/span; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors#no-such-element-exception
Stacktrace:
RemoteError@chrome://remote/content/shared/RemoteError.sys.mjs:8:8
WebDriverError@chrome://remote/content/shared/webdriver/Errors.sys.mjs:202:5
NoSuchElementError@chrome://remote/content/shared/webdriver/Errors.sys.mjs:555:5
dom.find/</<@chrome://remote/content/shared/DOM.sys.mjs:136:16


In [2]:
pull_games_and_dependents(max_games=500, include_second_order=True)

Created df_games                            from 'games'  shape=(500, 56)
Created df_game_time_to_beats               from 'game_time_to_beats'  shape=(219, 9)
Created df_popularity_primitives            from 'popularity_primitives'  shape=(2439, 9)
Created df_artworks                         from 'artworks'  shape=(858, 10)
Created df_covers                           from 'covers'  shape=(500, 9)
Created df_external_games                   from 'external_games'  shape=(1904, 15)
Created df_game_localizations               from 'game_localizations'  shape=(282, 8)
Created df_involved_companies               from 'involved_companies'  shape=(1566, 10)
Created df_language_supports                from 'language_supports'  shape=(2541, 7)
Created df_multiplayer_modes                from 'multiplayer_modes'  shape=(139, 14)
Created df_release_dates                    from 'release_dates'  shape=(2679, 16)
Created df_screenshots                      from 'screenshots'  shape=(2910, 9)
Create

In [3]:
for k, v in sorted((k, v) for k, v in globals().items() if k.startswith('df_') and isinstance(v, pd.DataFrame)):
    print(k, v.shape)

df_age_ratings (1438, 9)
df_artworks (858, 10)
df_covers (500, 9)
df_external_games (1904, 15)
df_franchises (81, 8)
df_game_localizations (282, 8)
df_game_modes (5, 7)
df_game_time_to_beats (219, 9)
df_game_videos (907, 5)
df_games (500, 56)
df_genres (21, 7)
df_involved_companies (1566, 10)
df_keywords (1862, 7)
df_language_supports (2541, 7)
df_multiplayer_modes (139, 14)
df_platforms (80, 17)
df_player_perspectives (6, 7)
df_popularity_primitives (2439, 9)
df_release_dates (2679, 16)
df_screenshots (2910, 9)
df_websites (2483, 6)
