In [5]:
import json
import string
import time
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import pandas as pd
import requests
from playwright.sync_api import sync_playwright
from rich.console import Console
from rich.panel import Panel
from rich.table import Table

# Shared rich console used for every status message in this notebook.
console = Console()

# Directory layout: data/{raw,external,processed}
DATA_DIR = Path("data")
RAW_DIR = DATA_DIR.joinpath("raw")
EXTERNAL_DIR = DATA_DIR.joinpath("external")
PROCESSED_DIR = DATA_DIR.joinpath("processed")

# Create the whole tree up front so later cells can write without checking.
for d in (DATA_DIR, RAW_DIR, EXTERNAL_DIR, PROCESSED_DIR):
    d.mkdir(parents=True, exist_ok=True)

# Output files: the slug inventory plus the flattened artist / event panels.
SLUGS_PATH = EXTERNAL_DIR.joinpath("bandsintown_artist_slugs.parquet")
ARTIST_PANEL_PATH = PROCESSED_DIR.joinpath("bandsintown_artist_panel.parquet")
EVENT_PANEL_PATH = PROCESSED_DIR.joinpath("bandsintown_event_panel.parquet")

# Bandsintown endpoints
BANDSINTOWN_BASE = "https://www.bandsintown.com"
GRAPHQL_URL = "https://graphql.bandsintown.com/"

# ⚠️ The search endpoint below is a guess, not a documented API; confirm it in
# the browser DevTools Network tab before relying on its results.
BANDSINTOWN_SEARCH_URL = BANDSINTOWN_BASE + "/api/search"

# Browser-like User-Agent sent with plain `requests` calls.
HEADERS = {
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/120.0 Safari/537.36",
        )
    )
}


In [6]:
import asyncio
from playwright.async_api import async_playwright

async def get_bandsintown_token_async() -> str:
    """
    Capture the bearer token that the Bandsintown site sends with its requests.

    Opens ``BANDSINTOWN_BASE`` in headless Chromium and inspects the
    ``authorization`` header of *every* outgoing request (not only GraphQL
    ones). When several requests carry a bearer credential, the last one seen
    is kept. Written as a coroutine so it can be awaited directly in a
    notebook cell.

    Returns:
        The credential part of the header, without the ``Bearer`` scheme.

    Raises:
        RuntimeError: if no request carried a bearer credential.
    """

    console.print(Panel("[cyan]Capturing Bandsintown token (async)...[/cyan]"))

    # Mutable holder so the request callback can hand the token back out.
    token_box = {"value": None}

    async def handle_request(request):
        auth = request.headers.get("authorization") or ""
        # Split "<scheme> <credential>"; a bare "Bearer" with no credential
        # must not be mistaken for a token.
        scheme, _, credential = auth.strip().partition(" ")
        credential = credential.strip()
        if scheme.lower() == "bearer" and credential:
            token_box["value"] = credential

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(
            headless=True,
            args=[
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu"
            ]
        )
        try:
            page = await browser.new_page()
            page.on("request", handle_request)

            await page.goto(BANDSINTOWN_BASE, wait_until="networkidle")
            # Grace period for requests fired after the page went idle.
            await asyncio.sleep(3)
        finally:
            # Close the browser even when navigation fails or times out.
            await browser.close()

    token = token_box["value"]
    if not token:
        raise RuntimeError("Failed to extract Bandsintown token.")

    console.print("[green]Token acquired via async Playwright.[/green]")
    return token


In [7]:
async def extract_artist_slugs_from_page_async(page) -> set[str]:
    """
    Collect artist slugs from the artist links on an already-loaded page.

    Looks at anchors whose ``href`` starts with ``/a/`` or ``/artists/``, takes
    the last path segment, and keeps it (lower-cased) when it contains a
    hyphen. Any query string or fragment on the link is discarded first, so
    ``/a/123-name?came_from=1`` yields ``123-name``.

    Args:
        page: object exposing an awaitable ``query_selector_all(selector)``
            whose results expose an awaitable ``get_attribute(name)``
            (a Playwright page in this notebook).

    Returns:
        Set of lower-cased slug strings; empty when nothing matched.
    """
    anchors = await page.query_selector_all("a[href^='/a/'], a[href^='/artists/']")

    slugs = set()
    for anchor in anchors:
        href = await anchor.get_attribute("href") or ""
        if not href.startswith("/"):
            continue

        # Keep only the path: drop "#fragment" first, then "?query".
        path = href.split("#", 1)[0].split("?", 1)[0]
        candidate = path.strip("/").split("/")[-1]

        # Hyphen check filters out bare section names such as "a" or "artists".
        if candidate and "-" in candidate:
            slugs.add(candidate.lower())

    return slugs


async def collect_slugs_from_urls_async(urls: list[str]) -> set[str]:
    """
    Visit each URL in one headless Chromium page and union the artist slugs found.

    Each page is loaded until the network is idle, given two more seconds, and
    then handed to ``extract_artist_slugs_from_page_async``.

    Args:
        urls: pages to visit, in order.

    Returns:
        Union of the slugs found on all pages.

    Raises:
        Whatever Playwright raises for a failed navigation; the browser is
        closed before the error propagates.
    """
    console.print(
        Panel(
            f"[cyan]Collecting artist slugs async from {len(urls)} pages[/cyan]",
            border_style="cyan"
        )
    )

    all_slugs = set()

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        try:
            page = await browser.new_page()

            for url in urls:
                console.print(f"[blue]Visiting {url}[/blue]")
                await page.goto(url, wait_until="networkidle")
                # Extra settle time for content rendered after network idle.
                await asyncio.sleep(2)

                slugs = await extract_artist_slugs_from_page_async(page)
                console.print(f"[green]Found {len(slugs)} slugs[/green]")

                all_slugs.update(slugs)
        finally:
            # Release the browser even if one of the visits raised.
            await browser.close()

    console.print(f"[bold green]Total curated slugs: {len(all_slugs)}[/bold green]")
    return all_slugs


In [9]:
from playwright.async_api import async_playwright
from rich.console import Console
from rich.panel import Panel

# Re-created here so this cell also works when run on its own.
console = Console()

# Bandsintown category pages used as seeds for slug collection.
# NOTE(review): these paths are not checked anywhere in this notebook; confirm they resolve.
CURATED_URLS = [
    f"https://www.bandsintown.com/c/{category}"
    for category in (
        "top-artists",
        "trending",
        "new-music",
        "top-sellers",
        "festivals",
        "hip-hop",
        "pop",
        "country",
        "rock",
        "edm",
    )
]


async def collect_slugs(urls):
    """
    Visit each URL in headless Chromium and collect artist/event link paths.

    For every page, waits up to 8 s for an ``/a/`` link to appear, then reads
    the ``href`` of all anchors starting with ``/a/`` or ``/e/``.

    Args:
        urls: iterable of page URLs to visit, in order.

    Returns:
        Set of href strings with leading/trailing slashes stripped, e.g.
        ``"a/<rest of href>"`` or ``"e/<rest of href>"``. These are link
        paths, not bare slugs, and any query string on the link is kept.

    Raises:
        Whatever Playwright raises for a failed navigation; the browser is
        closed before the error propagates.
    """
    console.print(Panel(f"[cyan]Collecting slugs from {len(urls)} pages[/cyan]"))

    slugs = set()

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(
            headless=True,
            args=[
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
            ]
        )
        try:
            page = await browser.new_page()

            for url in urls:
                console.print(f"[blue]Visiting:[/blue] {url}")
                await page.goto(url, timeout=60000)

                # Wait for client-side rendering to produce artist links.
                try:
                    await page.wait_for_selector('a[href^="/a/"]', timeout=8000)
                except Exception:
                    # Best effort: a missing selector is not fatal. Catching
                    # Exception (not a bare except) lets task cancellation and
                    # KeyboardInterrupt propagate.
                    console.print("[yellow]No direct artist links loaded: trying JS extraction[/yellow]")

                # Read hrefs with in-page JS (original note: meant to sidestep
                # shadow-DOM issues; unverified).
                found = await page.evaluate("""
                () => {
                    const links = Array.from(document.querySelectorAll('a[href^="/a/"], a[href^="/e/"]'));
                    return links.map(a => a.getAttribute("href"));
                }
            """)

                cleaned = {s.strip("/") for s in found if s}
                console.print(f"[green]Found {len(cleaned)} slugs on page[/green]")
                slugs.update(cleaned)
        finally:
            # Release the browser even if one of the visits raised.
            await browser.close()

    return slugs


In [10]:
async def scrape_artist_async(token: str, slug: str):
    """Placeholder for an async per-artist scraper; does nothing and returns None."""
    # Intended to mirror the earlier scraper, optionally using aiohttp for async HTTP.
    return None

In [11]:
def search_artists_by_prefix(prefix: str) -> Set[str]:
    """
    Query the (guessed) Bandsintown search endpoint for artists matching a prefix.

    Sends ``GET BANDSINTOWN_SEARCH_URL?query=<prefix>`` and reads the ``slug``
    of each entry under the top-level ``"artists"`` key.
    NOTE(review): the endpoint and the ``{"artists": [{"slug": ...}]}`` shape
    are assumptions; verify against real traffic.

    Every failure mode is best-effort: request errors, non-200 responses,
    unparseable JSON and unexpected payload shapes are logged and produce an
    empty set rather than an exception.

    Args:
        prefix: search text to send as the ``query`` parameter.

    Returns:
        Set of lower-cased slugs; empty on any failure or when nothing matched.
    """
    params = {"query": prefix}
    try:
        resp = requests.get(BANDSINTOWN_SEARCH_URL, params=params, headers=HEADERS, timeout=10)
    except Exception as e:
        console.print(f"[red]Request error for prefix '{prefix}': {e}[/red]")
        return set()

    if resp.status_code != 200:
        console.print(f"[yellow]Non-200 ({resp.status_code}) for prefix '{prefix}'[/yellow]")
        return set()

    try:
        data = resp.json()
    except Exception:
        console.print(f"[yellow]Failed to parse JSON for prefix '{prefix}'[/yellow]")
        return set()

    # The payload shape is unverified, so tolerate a JSON array/scalar at the
    # top level and a null or non-list "artists" value.
    artists = data.get("artists") if isinstance(data, dict) else None
    if not isinstance(artists, list):
        return set()

    slugs = set()
    for entry in artists:
        if not isinstance(entry, dict):
            continue
        slug = entry.get("slug")
        if isinstance(slug, str) and slug:
            slugs.add(slug.lower())

    return slugs


def expand_slugs_via_search(prefixes: List[str], pause_seconds: float = 0.2) -> Set[str]:
    """
    Run ``search_artists_by_prefix`` over many prefixes and union the results.

    Args:
        prefixes: search prefixes to try, in order (e.g. "aa", "ab", ..., "zz").
        pause_seconds: delay after each request, to throttle the crawl.
            Defaults to 0.2 s; values <= 0 disable the pause.

    Returns:
        Union of the slugs returned for every prefix.
    """
    console.print(
        Panel(
            f"[cyan]Expanding artist slugs via search prefixes ({len(prefixes)} prefixes)[/cyan]",
            border_style="cyan",
        )
    )
    all_slugs = set()

    for p in prefixes:
        slugs = search_artists_by_prefix(p)
        console.print(f"[blue]Prefix '{p}' → {len(slugs)} slugs[/blue]")
        all_slugs.update(slugs)
        if pause_seconds > 0:
            time.sleep(pause_seconds)  # be nice-ish

    console.print(f"[bold green]Total slugs from search expansion: {len(all_slugs)}[/bold green]")
    return all_slugs


# Candidate search prefixes: every two-letter combination "aa" .. "zz".
two_letter_prefixes = [
    first + second
    for first in string.ascii_lowercase
    for second in string.ascii_lowercase
]

# Smoke test on the first 50 prefixes only.
# NOTE(review): the recorded run of this cell returned 0 slugs, so the guessed
# search endpoint is probably not returning the assumed payload.
slugs_search = expand_slugs_via_search(two_letter_prefixes[0:50])
len(slugs_search)


0

In [None]:
# Merge the curated and search-derived slugs into one inventory.
# More sources (e.g. TM artist names via a name→slug resolver) can be unioned in here.
# NOTE(review): `slugs_curated` is not assigned in any visible cell; it presumably
# comes from awaiting one of the slug collectors on CURATED_URLS. Confirm before Run All.
slugs_combined = set().union(slugs_curated, slugs_search)

console.print(
    Panel(
        f"[bold cyan]Combined slug universe[/bold cyan]\n"
        f"Curated: {len(slugs_curated):,}\n"
        f"Search: {len(slugs_search):,}\n"
        f"Total unique: {len(slugs_combined):,}",
        border_style="cyan",
    )
)

# Persist the inventory as a one-column parquet file, sorted for stable output.
slug_df = pd.DataFrame({"artist_slug": sorted(slugs_combined)})
slug_df.to_parquet(SLUGS_PATH, index=False)

SLUGS_PATH, len(slug_df), slug_df.head()


In [None]:
def graphql_query(token: str, query: str, variables: Dict) -> Dict:
    """
    POST a GraphQL query to ``GRAPHQL_URL`` and return the decoded JSON body.

    Args:
        token: bearer credential sent in the ``Authorization`` header.
        query: GraphQL document text.
        variables: values for the query's variables.

    Returns:
        The decoded response body (normally a dict with a ``"data"`` key).
        A response that carries both ``data`` and ``errors`` is returned
        as-is so callers can use the partial data.

    Raises:
        RuntimeError: on a non-200 HTTP status, or when the body reports
            ``errors`` and carries no ``data`` at all.
        Exception: whatever ``requests`` raises for transport failures or an
            undecodable body.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    payload = {"query": query, "variables": variables}

    r = requests.post(GRAPHQL_URL, headers=headers, json=payload, timeout=15)
    if r.status_code != 200:
        raise RuntimeError(f"GraphQL {r.status_code}: {r.text}")

    body = r.json()

    # GraphQL servers commonly report failed requests inside an HTTP 200
    # response; without this check a failed query looks like a success.
    if isinstance(body, dict) and body.get("errors") and body.get("data") is None:
        raise RuntimeError(f"GraphQL errors: {body['errors']}")

    return body


# GraphQL document for one artist's metadata, looked up by slug.
# Requests identifiers, image/social/website URLs and tracker/event counts.
# NOTE(review): this is a template; the field names are not checked against
# the live schema anywhere in this notebook.
ARTIST_QUERY = """
query GetArtist($slug: String!) {
  artist(slug: $slug) {
    id
    name
    slug
    imageUrl
    facebookUrl
    instagramUrl
    twitterUrl
    websiteUrl
    trackerCount
    upcomingEventCount
    pastEventCount
  }
}
"""

# GraphQL document for one artist's events, requested in ascending datetime order.
# Each event carries its lineup names and a venue with location fields.
# NOTE(review): template as well; the field names and the `sort` argument are unverified.
EVENTS_QUERY = """
query GetArtistEvents($slug: String!) {
  artist(slug: $slug) {
    id
    name
    slug
    events(sort: {field: datetime, order: ASC}) {
      id
      datetime
      title
      url
      festival
      lineup {
        name
      }
      venue {
        id
        name
        city
        region
        country
        latitude
        longitude
      }
    }
  }
}
"""


In [None]:
def scrape_artist_with_token(
    token: str,
    slug: str,
    out_dir: Path = RAW_DIR / "bandsintown"
) -> Tuple[Dict, Dict]:
    """
    Fetch one artist's metadata and events and save both raw responses to disk.

    Runs ``ARTIST_QUERY`` and ``EVENTS_QUERY`` for ``slug`` and writes the two
    responses to ``<out_dir>/<slug>_artist_<ts>.json`` and
    ``<out_dir>/<slug>_events_<ts>.json``, where ``ts`` is the Unix time of
    the write. Characters in the slug other than letters, digits, ``-``,
    ``_`` and ``.`` are replaced by ``_`` in the file names only.

    Args:
        token: bearer credential for the GraphQL endpoint.
        slug: artist slug passed to both queries unchanged.
        out_dir: directory for the raw JSON files; created if missing.

    Returns:
        ``(artist_response, events_response)``, or ``({}, {})`` when either
        query raised (the error is logged, not propagated).

    Raises:
        OSError: if the output files cannot be written.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    console.print(f"[blue]Scraping artist '{slug}'[/blue]")

    try:
        artist_data = graphql_query(token, ARTIST_QUERY, {"slug": slug})
        events_data = graphql_query(token, EVENTS_QUERY, {"slug": slug})
    except Exception as e:
        console.print(f"[red]Error scraping '{slug}': {e}[/red]")
        return {}, {}

    # Slugs taken from hrefs can contain "/" or "?" (e.g. "a/123-name?x=1").
    # Used verbatim they would point into a missing subdirectory or outside
    # out_dir, so keep only filename-safe characters.
    file_stem = "".join(ch if (ch.isalnum() or ch in "-_.") else "_" for ch in slug)

    ts = int(time.time())
    with open(out_dir / f"{file_stem}_artist_{ts}.json", "w", encoding="utf-8") as f:
        json.dump(artist_data, f, indent=2)
    with open(out_dir / f"{file_stem}_events_{ts}.json", "w", encoding="utf-8") as f:
        json.dump(events_data, f, indent=2)

    return artist_data, events_data


def batch_scrape_artists(
    slugs: List[str],
    max_artists: int = 2000,
    token: Optional[str] = None,
    delay_seconds: float = 0.1,
) -> Dict[str, Dict]:
    """
    Scrape up to ``max_artists`` artists using a single token.

    Args:
        slugs: artist slugs to scrape; only the first ``max_artists`` are used.
        max_artists: upper bound on the number of slugs processed.
        token: bearer credential to use. In a notebook, obtain it with
            ``await get_bandsintown_token_async()`` and pass it in. When
            omitted, falls back to ``get_bandsintown_token()``.
            NOTE(review): that synchronous helper is not defined in the
            visible cells; confirm it exists before relying on the fallback.
        delay_seconds: pause after each artist, to throttle the crawl.
            Defaults to 0.1 s; values <= 0 disable the pause.

    Returns:
        ``dict[slug] = {"artist": ..., "events": ...}`` containing only the
        slugs for which both responses were non-empty.
    """

    console.print(
        Panel(
            f"[cyan]Batch scraping up to {max_artists} Bandsintown artists[/cyan]",
            border_style="cyan",
        )
    )

    if token is None:
        token = get_bandsintown_token()

    results = {}
    for i, slug in enumerate(slugs[:max_artists], start=1):
        artist_data, events_data = scrape_artist_with_token(token, slug)
        # The scraper returns ({}, {}) on failure; skip those slugs.
        if artist_data and events_data:
            results[slug] = {"artist": artist_data, "events": events_data}

        if i % 50 == 0:
            console.print(f"[green]Scraped {i} artists so far[/green]")
        if delay_seconds > 0:
            time.sleep(delay_seconds)  # mild politeness

    console.print(f"[bold green]Done. Scraped {len(results)} artists.[/bold green]")
    return results


# Re-read the saved slug inventory rather than relying on in-memory state.
slug_df = pd.read_parquet(SLUGS_PATH)
slug_list = slug_df.loc[:, "artist_slug"].tolist()

# Trial run capped at 500 artists to see how the endpoint behaves.
bandsintown_raw = batch_scrape_artists(slugs=slug_list, max_artists=500)
len(bandsintown_raw)


In [None]:
def flatten_artist_record(slug: str, artist_json: Dict) -> Dict:
    """
    Flatten one artist GraphQL response into a single flat row.

    Expects ``{"data": {"artist": {...}}}``. A missing or null response,
    ``data`` or ``artist`` (for example ``{"data": null, "errors": [...]}``)
    yields a row whose artist fields are all ``None`` instead of raising.

    Args:
        slug: slug the response was requested for; stored as ``artist_slug``.
        artist_json: decoded response body, or ``None``.

    Returns:
        Dict with snake_case keys for the artist fields.
    """
    # `or {}` at each level: a key that is present but null would otherwise
    # return None and break the next `.get`.
    data = (artist_json or {}).get("data") or {}
    artist = data.get("artist") or {}

    return {
        "artist_slug": slug,
        "artist_id": artist.get("id"),
        "artist_name": artist.get("name"),
        "image_url": artist.get("imageUrl"),
        "facebook_url": artist.get("facebookUrl"),
        "instagram_url": artist.get("instagramUrl"),
        "twitter_url": artist.get("twitterUrl"),
        "website_url": artist.get("websiteUrl"),
        "tracker_count": artist.get("trackerCount"),
        "upcoming_event_count": artist.get("upcomingEventCount"),
        "past_event_count": artist.get("pastEventCount"),
    }


# One flat row per scraped artist.
artist_rows = [
    flatten_artist_record(slug, payload.get("artist"))
    for slug, payload in bandsintown_raw.items()
]

# Persist the artist panel, then show where it went and a preview.
artist_panel = pd.DataFrame(artist_rows)
artist_panel.to_parquet(ARTIST_PANEL_PATH, index=False)

ARTIST_PANEL_PATH, artist_panel.shape, artist_panel.head()


In [None]:
from datetime import datetime as dt

def flatten_event_records(slug: str, events_json: Dict) -> List[Dict]:
    """
    Flatten one artist-events GraphQL response into one row per event.

    Expects ``{"data": {"artist": {"name": ..., "events": [...]}}}``. A
    missing or null response, ``data``, ``artist`` or ``events`` yields an
    empty list; null entries inside ``events`` or ``lineup`` are skipped.

    Args:
        slug: slug the response was requested for; stored as ``artist_slug``.
        events_json: decoded response body, or ``None``.

    Returns:
        List of flat dicts, one per event, with venue fields prefixed
        ``venue_`` and the lineup names joined by ``", "``.
    """
    # `or {}` / `or []` at each level: a key that is present but null would
    # otherwise return None and break the next lookup.
    data = (events_json or {}).get("data") or {}
    artist = data.get("artist") or {}
    events = artist.get("events") or []

    rows = []
    for ev in events:
        if not ev:
            continue

        venue = ev.get("venue") or {}
        lineup = ev.get("lineup") or []

        rows.append({
            "artist_slug": slug,
            "artist_name": artist.get("name"),
            "event_id": ev.get("id"),
            "event_title": ev.get("title"),
            "event_url": ev.get("url"),
            "event_datetime": ev.get("datetime"),
            "is_festival": bool(ev.get("festival")),
            "venue_id": venue.get("id"),
            "venue_name": venue.get("name"),
            "venue_city": venue.get("city"),
            "venue_region": venue.get("region"),
            "venue_country": venue.get("country"),
            "venue_latitude": venue.get("latitude"),
            "venue_longitude": venue.get("longitude"),
            "lineup_names": ", ".join(m["name"] for m in lineup if m and m.get("name")),
        })

    return rows


# One flat row per event, across all scraped artists.
event_rows = [
    row
    for slug, payload in bandsintown_raw.items()
    for row in flatten_event_records(slug, payload.get("events"))
]

event_panel = pd.DataFrame(event_rows)

# Parse timestamps when the column exists (an empty scrape yields a frame with
# no columns); unparseable values become NaT instead of raising.
if "event_datetime" in event_panel.columns:
    event_panel["event_datetime"] = pd.to_datetime(event_panel["event_datetime"], errors="coerce")

# Persist the event panel, then show where it went and a preview.
event_panel.to_parquet(EVENT_PANEL_PATH, index=False)

EVENT_PANEL_PATH, event_panel.shape, event_panel.head()
