In [11]:
!pip install aiohttp tenacity pandas python-dotenv

import os

os.environ["OPENAI_API_KEY"] = "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"
os.environ["TAVILY_API_KEY"] = "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"
os.environ["OPENAI_MODEL"] = "gpt-4.1-mini"



In [6]:
import os
import json
import time
import asyncio
from typing import List, Dict, Any, Optional, Tuple
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import pandas as pd

# ========= CONFIG =========
# Credentials and the model name come from the process environment.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")  # adjust if you prefer

# Pipeline tuning knobs.
CONCURRENCY = 6        # max destinations processed in parallel
SEARCH_RESULTS = 6     # documents to retrieve per destination
REQUEST_TIMEOUT = 30   # handed to every HTTP request as its `timeout`
OUTPUT_CSV = "destination_activities_llm.csv"
# ==========================

# Fail fast when a credential is absent (None) or empty.
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    raise RuntimeError("Missing OPENAI_API_KEY env var.")
if TAVILY_API_KEY is None or TAVILY_API_KEY == "":
    raise RuntimeError("Missing TAVILY_API_KEY env var.")

# ---- Destinations ----
def load_destinations() -> List[Dict[str, str]]:
    """
    Return the destinations to process as ``{"location": ..., "country": ...}`` dicts.

    If ``destinations.csv`` exists in the current working directory it is used
    (required column: ``location``; optional column: ``country``). Otherwise the
    curated fallback list embedded below is returned.

    Returns:
        One dict per destination; both values are stripped strings and a missing
        country is ``""``. CSV rows whose location is blank are skipped.

    Raises:
        ValueError: if ``destinations.csv`` exists but has no ``location`` column.
    """
    if os.path.exists("destinations.csv"):
        # Read everything as text and disable NA sniffing: otherwise blank cells
        # become NaN (and later the literal "nan" inside the search query) and
        # legitimate values such as "NA" are silently turned into missing data.
        df = pd.read_csv("destinations.csv", dtype=str, keep_default_na=False)
        if "location" not in df.columns:
            raise ValueError("destinations.csv must contain a 'location' column.")
        if "country" not in df.columns:
            df["country"] = ""
        # fillna covers short rows, whose trailing fields may still come back missing.
        df["location"] = df["location"].fillna("").str.strip()
        df["country"] = df["country"].fillna("").str.strip()
        # A row without a location cannot be searched for.
        df = df[df["location"] != ""]
        return df[["location", "country"]].to_dict(orient="records")

    # Fallback: curated destinations as (location, country) pairs
    fallback = [
        # Europe (10 city, 12 nature-ish)
        ("London","UK"),("Paris","France"),("Barcelona","Spain"),("Rome","Italy"),("Venice","Italy"),
        ("Athens","Greece"),("Amsterdam","Netherlands"),("Berlin","Germany"),("Prague","Czechia"),("Vienna","Austria"),
        ("Interlaken","Switzerland"),("Dolomites","Italy"),("Lofoten Islands","Norway"),("Lapland","Finland"),
        ("Plitvice Lakes","Croatia"),("Faroe Islands","Denmark"),("Madeira","Portugal"),("Azores","Portugal"),
        ("Santorini","Greece"),("Dubrovnik","Croatia"),("Scottish Highlands","UK"),("Reykjavik","Iceland"),
        # Asia (8 city, 14 nature)
        ("Tokyo","Japan"),("Kyoto","Japan"),("Seoul","South Korea"),("Hong Kong","China"),
        ("Singapore","Singapore"),("Bangkok","Thailand"),("Hanoi","Vietnam"),("Siem Reap","Cambodia"),
        ("Phuket","Thailand"),("Bali","Indonesia"),("Jeju Island","South Korea"),("Hokkaido","Japan"),
        ("Raja Ampat","Indonesia"),("Palawan","Philippines"),("Komodo Island","Indonesia"),("Ladakh","India"),
        ("Langtang","Nepal"),("Ella","Sri Lanka"),("Gobi Desert","Mongolia"),("Bagan","Myanmar"),
        ("Boracay","Philippines"),("Male","Maldives"),
        # North America (6 city, 10 nature)
        ("New York City","USA"),("San Francisco","USA"),("Miami","USA"),
        ("Vancouver","Canada"),("Toronto","Canada"),("Mexico City","Mexico"),
        ("Banff","Canada"),("Jasper","Canada"),("Yellowstone","USA"),("Yosemite","USA"),("Grand Canyon","USA"),
        ("Sedona","USA"),("Alaska/Denali","USA"),("Honolulu/Oahu","USA"),("New Orleans","USA"),("Quebec City","Canada"),
        # Latin America (3 city, 12 nature)
        ("Rio de Janeiro","Brazil"),("Cartagena","Colombia"),("Havana","Cuba"),
        ("Cancún","Mexico"),("Tulum","Mexico"),("Punta Cana","Dominican Republic"),
        ("Galápagos Islands","Ecuador"),("Torres del Paine","Chile"),("Atacama Desert","Chile"),
        ("Salar de Uyuni","Bolivia"),("Lake Titicaca","Peru/Bolivia"),("Bariloche","Argentina"),
        ("Fernando de Noronha","Brazil"),("Arenal","Costa Rica"),("San Juan","Puerto Rico"),
        # Africa + Middle East (5 city, 13 nature)
        ("Marrakech","Morocco"),("Cairo","Egypt"),
        ("Cape Town","South Africa"),("Kruger","South Africa"),("Maasai Mara","Kenya"),
        ("Serengeti","Tanzania"),("Zanzibar","Tanzania"),("Mount Kilimanjaro","Tanzania"),
        ("Okavango Delta","Botswana"),("Namib Desert","Namibia"),("Bazaruto Archipelago","Mozambique"),
        ("Seychelles","Seychelles"),("Mauritius","Mauritius"),("Fes","Morocco"),
        ("Jerusalem","Israel"),("Petra","Jordan"),("Dubai","UAE"),("Wadi Rum","Jordan"),
        ("Musandam Fjords","Oman"),("Muscat","Oman"),("Tel Aviv","Israel"),
        # Oceania (1 city, 5 nature)
        ("Sydney","Australia"),("Queenstown","New Zealand"),("Rotorua","New Zealand"),
        ("Cairns","Australia"),("Tasmania","Australia"),("Bora Bora","French Polynesia"),
    ]
    return [{"location": loc, "country": ctry} for loc, ctry in fallback]

# ---- HTTP helpers ----
class Transient(Exception):
    """Signals a temporary failure; the HTTP helpers list it in their retry predicates."""

@retry(wait=wait_exponential(multiplier=1, min=1, max=20),
       stop=stop_after_attempt(4),
       retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError, Transient)))
async def tavily_search(session: aiohttp.ClientSession, query: str, max_results: int = SEARCH_RESULTS) -> List[Dict[str, Any]]:
    """
    Run one Tavily web search and return its hits in a compact form.

    Args:
        session: open aiohttp session used for the POST request.
        query: free-text search query.
        max_results: number of results requested from Tavily.

    Returns:
        A list of ``{"title", "url", "snippet"}`` dicts (``snippet`` is Tavily's
        ``content`` field); missing fields default to ``""``.

    Raises:
        RuntimeError: on a non-200 response that is not considered temporary.
        Network errors, timeouts, HTTP 429 and HTTP 5xx are retried with
        exponential backoff for up to 4 attempts; because ``reraise`` is not
        set, tenacity then raises its own ``RetryError``.
    """
    url = "https://api.tavily.com/search"
    payload = {
        "api_key": TAVILY_API_KEY,
        "query": query,
        "max_results": max_results,
        "search_depth": "advanced",
        "include_answer": False,
        "include_images": False,
        "include_raw_content": False,
    }
    # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    async with session.post(url, json=payload, timeout=timeout) as r:
        # Being rate limited is as temporary as a server error: back off and retry
        # instead of failing the destination outright.
        if r.status == 429:
            raise Transient("Tavily 429: rate limited")
        if r.status >= 500:
            raise Transient(f"Tavily 5xx: {r.status}")
        if r.status != 200:
            txt = await r.text()
            raise RuntimeError(f"Tavily error {r.status}: {txt}")
        data = await r.json()

    # Normalize to a compact list of dicts; `or []` also covers "results": null.
    results = data.get("results") or []
    return [{"title": d.get("title", ""),
             "url": d.get("url", ""),
             "snippet": d.get("content", "")} for d in results]

def _extract_output_text(data: Dict[str, Any]) -> str:
    """Join the text of every content part found in a Responses API payload."""
    chunks: List[str] = []
    output_items = data.get("output") if isinstance(data, dict) else None
    # Scan all items instead of trusting output[0]: the list may hold
    # non-message items (for example reasoning items) ahead of the message.
    for item in output_items or []:
        if not isinstance(item, dict):
            continue
        for part in item.get("content") or []:
            if isinstance(part, dict) and isinstance(part.get("text"), str):
                chunks.append(part["text"])
    return "".join(chunks)


def _parse_activities(text: str) -> List[str]:
    """Turn the model's reply into at most 5 non-empty activity strings."""
    cleaned = text.strip()
    # Models often wrap the array in a Markdown fence or a sentence; isolate the
    # outermost [...] span before attempting a strict JSON parse.
    start, end = cleaned.find("["), cleaned.rfind("]")
    candidate = cleaned[start:end + 1] if start != -1 and end > start else cleaned
    try:
        parsed = json.loads(candidate)
        if not isinstance(parsed, list):
            raise ValueError("Not a list")
        items = [str(x).strip() for x in parsed if str(x).strip()]
    except (ValueError, TypeError):
        # Crude salvage: split on commas/newlines and peel bullets, quotes,
        # brackets and fence characters off each piece.
        pieces = candidate.replace("\n", ",").split(",")
        stripped = [p.strip(" -–•\n\r\t\"'[]`") for p in pieces]
        items = [p for p in stripped if p and p.lower() != "json"]
    return items[:5]


@retry(wait=wait_exponential(multiplier=1, min=1, max=20),
       stop=stop_after_attempt(4),
       retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError, Transient)))
async def openai_extract_activities(session: aiohttp.ClientSession, location: str, country: str, search_docs: List[Dict[str, Any]]) -> List[str]:
    """
    Ask the LLM to pick the top 5 concise activities/features based on search snippets.

    Args:
        session: open aiohttp session used for the POST request.
        location: destination name.
        country: country of the destination (may be empty).
        search_docs: search hits (title/url/snippet dicts) given to the model.

    Returns:
        Exactly 5 strings. When the model's reply is missing, unparsable or has
        fewer than 5 entries, the list is padded with generic defaults.

    Raises:
        RuntimeError: on a non-200 response that is not considered temporary.
        Network errors, timeouts, HTTP 429 and HTTP 5xx are retried with
        exponential backoff for up to 4 attempts; because ``reraise`` is not
        set, tenacity then raises its own ``RetryError``.
    """
    fallback = ["sightseeing", "museums", "historic landmarks", "shopping", "local cuisine"]

    system = (
        "You are a travel researcher. Read the provided web search snippets and return the 5 most relevant, "
        "non-overlapping activities or destination features for the location. "
        "Prefer concise noun phrases (2-4 words). Do not include the location name. "
        "Be specific but generalizable (e.g., 'snorkeling', 'temple visits', 'mountain hiking', 'street food'). "
        "Return ONLY a JSON array of 5 strings, no commentary."
    )

    user = {
        "location": location,
        "country": country,
        "instructions": "Select the 5 most relevant activities/features for this location.",
        "web_snippets": search_docs
    }

    url = "https://api.openai.com/v1/responses"
    headers = {"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}
    payload = {
        "model": OPENAI_MODEL,
        "input": [
            {"role": "system", "content": system},
            {"role": "user", "content": json.dumps(user)}
        ],
        "temperature": 0.2,
        "max_output_tokens": 200
    }

    # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    async with session.post(url, headers=headers, json=payload, timeout=timeout) as r:
        # Rate limiting is temporary: let the retry policy back off and try again.
        if r.status == 429:
            raise Transient("OpenAI 429: rate limited")
        if r.status >= 500:
            raise Transient(f"OpenAI 5xx: {r.status}")
        if r.status != 200:
            txt = await r.text()
            raise RuntimeError(f"OpenAI error {r.status}: {txt}")
        data = await r.json()

    text = _extract_output_text(data)
    items = _parse_activities(text) if text else []

    # pad/trim to exactly 5
    return (items + fallback)[:5]

async def process_one(sema: asyncio.Semaphore, session: aiohttp.ClientSession, dest: Dict[str,str]) -> Tuple[str, List[str]]:
    """
    Search for one destination and extract its activities.

    The semaphore is held for the whole search + extraction, so it bounds how
    many destinations are in flight at once. Returns ``(location, activities)``.
    """
    async with sema:
        place, nation = dest["location"], dest.get("country","")
        query = f"Top activities and things to do in {place} {nation}".strip()
        hits = await tavily_search(session, query, max_results=SEARCH_RESULTS)
        activities = await openai_extract_activities(session, place, nation, hits)
    return place, activities

async def main() -> pd.DataFrame:
    """
    Process every destination concurrently, write the results to ``OUTPUT_CSV``
    and return them.

    A destination whose processing fails is reported by name and gets the
    generic default activities, so the output always has one row per input
    destination, in input order.

    Returns:
        DataFrame with columns ``location`` and ``activity_1`` .. ``activity_5``.
    """
    default_acts = ["sightseeing", "museums", "historic landmarks", "shopping", "local cuisine"]
    destinations = load_destinations()
    sema = asyncio.Semaphore(CONCURRENCY)

    async def _run(session: aiohttp.ClientSession, dest: Dict[str, str]) -> List[str]:
        # Handle the failure here, where the destination is still known, rather
        # than on an anonymous future.
        try:
            loc, acts = await process_one(sema, session, dest)
        except Exception as e:
            label = dest.get("location", "UNKNOWN")
            print(f"✖ Error for {label}: {e}")
            return list(default_acts)
        print(f"✔ {loc}")
        # Guarantee exactly 5 entries whatever the extractor returned.
        return (list(acts) + default_acts)[:5]

    async with aiohttp.ClientSession() as session:
        # gather() keeps input order, so results are paired with destinations by
        # position; two destinations sharing a name no longer overwrite each other.
        per_destination = await asyncio.gather(*(_run(session, d) for d in destinations))

    # Build DataFrame
    columns = ["location"] + [f"activity_{i}" for i in range(1, 6)]
    rows = [[d["location"], *acts] for d, acts in zip(destinations, per_destination)]
    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(OUTPUT_CSV, index=False)
    print(f"\nSaved: {OUTPUT_CSV}")
    # Returned (instead of printing a preview) so that `df = await main()` works.
    return df

# Top-level `await` works here because IPython/Jupyter runs cells inside an event loop.
await main()

✔ Rome
✔ London
✔ Venice
✔ Paris
✔ Athens
✔ Barcelona
✔ Prague
✔ Amsterdam
✔ Berlin
✔ Vienna
✔ Interlaken
✔ Dolomites
✔ Madeira
✔ Lofoten Islands
✔ Plitvice Lakes
✔ Lapland
✔ Faroe Islands
✔ Azores
✔ Dubrovnik
✔ Santorini
✔ Reykjavik
✔ Tokyo
✔ Kyoto
✔ Scottish Highlands
✔ Seoul
✔ Singapore
✔ Bangkok
✔ Siem Reap
✔ Hong Kong
✔ Hanoi
✔ Phuket
✔ Bali
✔ Jeju Island
✔ Hokkaido
✔ Palawan
✔ Raja Ampat
✔ Komodo Island
✔ Ladakh
✔ Male
✔ Langtang
✔ Ella
✔ Gobi Desert
✔ Bagan
✔ Boracay
✔ New York City
✔ San Francisco
✔ Toronto
✔ Miami
✔ Mexico City
✔ Vancouver
✔ Banff
✔ Yosemite
✔ Yellowstone
✔ Jasper
✔ Sedona
✔ Grand Canyon
✔ Alaska/Denali
✔ Quebec City
✔ New Orleans
✔ Honolulu/Oahu
✔ Rio de Janeiro
✔ Havana
✔ Tulum
✔ Cartagena
✔ Galápagos Islands
✔ Cancún
✔ Torres del Paine
✔ Punta Cana
✔ Atacama Desert
✔ Salar de Uyuni
✔ Lake Titicaca
✔ Bariloche
✔ Fernando de Noronha
✔ Arenal
✔ Cairo
✔ Cape Town
✔ San Juan
✔ Marrakech
✔ Maasai Mara
✔ Kruger
✔ Zanzibar
✔ Serengeti
✔ Mount Kilimanjaro
✔ Okavango

In [None]:
# Run the pipeline and keep its return value; main() has to return a DataFrame
# for the preview on the next line to work.
df = await main()
df.head()

✔ Paris
✔ Rome
✔ London
✔ Barcelona
✔ Athens
✔ Venice
✔ Amsterdam
✔ Berlin
✔ Prague
✔ Vienna
✔ Interlaken
✔ Plitvice Lakes
✔ Dolomites
✔ Lofoten Islands
✔ Lapland
✔ Madeira
✔ Azores
✔ Faroe Islands
✔ Santorini
✔ Dubrovnik
✔ Scottish Highlands
✔ Kyoto
✔ Reykjavik
✔ Tokyo
✔ Seoul
✔ Hong Kong
✔ Singapore
✔ Bangkok
✔ Hanoi
✔ Siem Reap
✔ Bali
✔ Jeju Island
✔ Raja Ampat
✔ Palawan
✔ Phuket
✔ Hokkaido
✔ Komodo Island
✔ Ladakh
✔ Langtang
✔ Ella
✔ Gobi Desert
✔ Bagan
✔ Male
✔ New York City
✔ Boracay
✔ Miami
✔ San Francisco
✔ Toronto
✔ Vancouver
✔ Mexico City
✔ Banff
✔ Yellowstone
✔ Jasper
✔ Yosemite
✔ Sedona
✔ Grand Canyon
✔ Alaska/Denali
✔ Quebec City
✔ New Orleans
✔ Honolulu/Oahu
✔ Havana
✔ Punta Cana
✔ Rio de Janeiro
✔ Cancún
✔ Cartagena
✔ Tulum
✔ Galápagos Islands
✔ Salar de Uyuni
✔ Torres del Paine
✔ Atacama Desert
✔ Lake Titicaca
✔ Bariloche
✔ Fernando de Noronha
✔ Arenal
✔ San Juan
✔ Marrakech
✔ Cairo
✔ Kruger
✔ Cape Town
✔ Maasai Mara
✔ Serengeti
✔ Okavango Delta
✔ Zanzibar
✔ Mount Kilim

Unnamed: 0,location,activity_1,activity_2,activity_3,activity_4,activity_5
0,London,Harry Potter Studios,West End Musicals,Thames River Cruise,Natural History Museum,Borough Market
1,Paris,Seine River Cruise,Eiffel Tower,Louvre Museum,Montmartre,Luxembourg Gardens
2,Barcelona,Antoni Gaudí landmarks,Sea kayaking on Costa Brava,Bungee jumping near Lloret de Mar,Camp Nou stadium tour,Montjuïc hill and museums
3,Rome,Sistine Chapel and Vatican Museums,Colosseum and Roman Forum,Piazza Navona and Four River Fountain,Pantheon and Piazza della Rotonda,Villa Borghese Gardens
4,Venice,Piazza San Marco,Gondola rides,Murano and Burano islands,Grand Canal vaporetto ride,Acqua Alta Bookstore


In [15]:
# Preview up to the first 20 rows of the results table.
df.head(20)

Unnamed: 0,location,activity_1,activity_2,activity_3,activity_4,activity_5
0,London,Harry Potter Studios,West End Musicals,Thames River Cruise,Natural History Museum,Borough Market
1,Paris,Seine River Cruise,Eiffel Tower,Louvre Museum,Montmartre,Luxembourg Gardens
2,Barcelona,Antoni Gaudí landmarks,Sea kayaking on Costa Brava,Bungee jumping near Lloret de Mar,Camp Nou stadium tour,Montjuïc hill and museums
3,Rome,Sistine Chapel and Vatican Museums,Colosseum and Roman Forum,Piazza Navona and Four River Fountain,Pantheon and Piazza della Rotonda,Villa Borghese Gardens
4,Venice,Piazza San Marco,Gondola rides,Murano and Burano islands,Grand Canal vaporetto ride,Acqua Alta Bookstore
5,Athens,Acropolis and Parthenon,National Archaeological Museum,Monastiraki Flea Market,Cycladic Art Museum,Lycabettus Hill viewpoint
6,Amsterdam,Canal cruises,Van Gogh Museum,Anne Frank House,Vondelpark,Jordaan neighborhood
7,Berlin,Berlin Zoo,TV Tower at Alexanderplatz,Brandenburg Gate,Gendarmenmarkt Square,Mauerpark Flea Market
8,Prague,Petrín Hill and Rose Garden,Prague Astronomical Clock,Old Town exploration,Vltava River walk,Strahov Library
9,Vienna,Classical music concerts,Schönbrunn Park and Gardens,Prater amusement park,Horse-drawn carriage rides,Vienna café culture


In [10]:
!pip install aiohttp tenacity pandas python-dotenv



In [13]:
import os, sys, json, time, asyncio
from typing import List, Dict, Any, Tuple
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import pandas as pd

# ---------------- CONFIG (you can tweak these) ----------------
# Credentials and the model name come from the process environment.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")

CONCURRENCY = 6                 # parallel destinations
RATE_LIMIT_PER_MIN = 30         # max Tavily calls per minute
SEARCH_RESULTS = 6              # search docs per destination
REQUEST_TIMEOUT = 30            # seconds
OUTPUT_CSV = "destination_activities_llm.csv"

# Fail fast when a credential is absent (None) or empty.
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    raise RuntimeError("Missing OPENAI_API_KEY. Set it via os.environ before running.")
if TAVILY_API_KEY is None or TAVILY_API_KEY == "":
    raise RuntimeError("Missing TAVILY_API_KEY. Set it via os.environ before running.")
# ---------------------------------------------------------------

def load_destinations() -> List[Dict[str, str]]:
    """
    Return the destinations to process as ``{"location": ..., "country": ...}`` dicts.

    Uses ``destinations.csv`` from the current working directory if present
    (required column: ``location``; optional column: ``country``); otherwise
    falls back to the curated list embedded below.

    Returns:
        One dict per destination; both values are stripped strings and a missing
        country is ``""``. CSV rows whose location is blank are skipped.

    Raises:
        ValueError: if ``destinations.csv`` exists but has no ``location`` column.
    """
    if os.path.exists("destinations.csv"):
        # Read everything as text and disable NA sniffing: with the defaults a
        # blank cell becomes NaN, which .astype(str) then turns into the literal
        # "nan" that ends up in the search query, and values such as "NA" are
        # silently treated as missing.
        df = pd.read_csv("destinations.csv", dtype=str, keep_default_na=False)
        if "location" not in df.columns:
            raise ValueError("destinations.csv must include a 'location' column.")
        if "country" not in df.columns:
            df["country"] = ""
        # fillna covers short rows, whose trailing fields may still come back missing.
        df["location"] = df["location"].fillna("").str.strip()
        df["country"]  = df["country"].fillna("").str.strip()
        # A row without a location cannot be searched for.
        df = df[df["location"] != ""]
        return df[["location","country"]].to_dict(orient="records")

    fallback = [
        # Europe (cities, then nature-ish)
        ("London","UK"),("Paris","France"),("Barcelona","Spain"),("Rome","Italy"),("Venice","Italy"),
        ("Athens","Greece"),("Amsterdam","Netherlands"),("Berlin","Germany"),("Prague","Czechia"),("Vienna","Austria"),
        ("Interlaken","Switzerland"),("Dolomites","Italy"),("Lofoten Islands","Norway"),("Lapland","Finland"),
        ("Plitvice Lakes","Croatia"),("Faroe Islands","Denmark"),("Madeira","Portugal"),("Azores","Portugal"),
        ("Santorini","Greece"),("Dubrovnik","Croatia"),("Scottish Highlands","UK"),("Reykjavik","Iceland"),
        # Asia
        ("Tokyo","Japan"),("Kyoto","Japan"),("Seoul","South Korea"),("Hong Kong","China"),
        ("Singapore","Singapore"),("Bangkok","Thailand"),("Hanoi","Vietnam"),("Siem Reap","Cambodia"),
        ("Phuket","Thailand"),("Bali","Indonesia"),("Jeju Island","South Korea"),("Hokkaido","Japan"),
        ("Raja Ampat","Indonesia"),("Palawan","Philippines"),("Komodo Island","Indonesia"),("Ladakh","India"),
        ("Langtang","Nepal"),("Ella","Sri Lanka"),("Gobi Desert","Mongolia"),("Bagan","Myanmar"),
        ("Boracay","Philippines"),("Male","Maldives"),
        # North America
        ("New York City","USA"),("San Francisco","USA"),("Miami","USA"),
        ("Vancouver","Canada"),("Toronto","Canada"),("Mexico City","Mexico"),
        ("Banff","Canada"),("Jasper","Canada"),("Yellowstone","USA"),("Yosemite","USA"),("Grand Canyon","USA"),
        ("Sedona","USA"),("Alaska/Denali","USA"),("Honolulu/Oahu","USA"),("New Orleans","USA"),("Quebec City","Canada"),
        # Latin America
        ("Rio de Janeiro","Brazil"),("Cartagena","Colombia"),("Havana","Cuba"),
        ("Cancún","Mexico"),("Tulum","Mexico"),("Punta Cana","Dominican Republic"),
        ("Galápagos Islands","Ecuador"),("Torres del Paine","Chile"),("Atacama Desert","Chile"),
        ("Salar de Uyuni","Bolivia"),("Lake Titicaca","Peru/Bolivia"),("Bariloche","Argentina"),
        ("Fernando de Noronha","Brazil"),("Arenal","Costa Rica"),("San Juan","Puerto Rico"),
        # Africa & Middle East
        ("Marrakech","Morocco"),("Cairo","Egypt"),
        ("Cape Town","South Africa"),("Kruger","South Africa"),("Maasai Mara","Kenya"),
        ("Serengeti","Tanzania"),("Zanzibar","Tanzania"),("Mount Kilimanjaro","Tanzania"),
        ("Okavango Delta","Botswana"),("Namib Desert","Namibia"),("Bazaruto Archipelago","Mozambique"),
        ("Seychelles","Seychelles"),("Mauritius","Mauritius"),("Fes","Morocco"),
        ("Jerusalem","Israel"),("Petra","Jordan"),("Dubai","UAE"),("Wadi Rum","Jordan"),
        ("Musandam Fjords","Oman"),("Muscat","Oman"),("Tel Aviv","Israel"),
        # Oceania
        ("Sydney","Australia"),("Queenstown","New Zealand"),("Rotorua","New Zealand"),
        ("Cairns","Australia"),("Tasmania","Australia"),("Bora Bora","French Polynesia"),
    ]
    return [{"location": loc, "country": ctry} for loc, ctry in fallback]

# -------- Rate limiter (sliding-window log of call timestamps) --------
class RateLimiter:
    """
    Async sliding-window rate limiter.

    Allows at most ``per_min`` acquisitions within any ``period``-second window
    (60 seconds by default, hence the name). ``acquire()`` sleeps until a slot
    frees up; the internal lock stays held while sleeping, so other callers
    wait behind the sleeping one.

    Args:
        per_min: maximum acquisitions per window; values below 1 are raised to 1.
        period: window length in seconds.
    """

    def __init__(self, per_min: int, period: float = 60.0):
        self.per_min = max(1, per_min)
        self.period = period
        # Monotonic-clock readings of the acquisitions still inside the window.
        self.timestamps: List[float] = []
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until one more call fits in the current window, then record it."""
        async with self.lock:
            while True:
                # A monotonic clock is immune to wall-clock adjustments (NTP, DST).
                now = time.monotonic()
                self.timestamps = [t for t in self.timestamps if now - t < self.period]
                if len(self.timestamps) < self.per_min:
                    break
                # Sleep until the oldest entry leaves the window, then re-check:
                # a sleep that wakes marginally early must not let a call through.
                sleep_for = self.period - (now - self.timestamps[0])
                await asyncio.sleep(max(0.0, sleep_for))
            self.timestamps.append(time.monotonic())

# -------- HTTP helpers --------
class Transient(Exception):
    """Signals a temporary failure; the HTTP helpers list it in their retry predicates."""

@retry(wait=wait_exponential(multiplier=1, min=1, max=20),
       stop=stop_after_attempt(4),
       retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError, Transient)))
async def tavily_search(session: aiohttp.ClientSession, limiter: RateLimiter, query: str, max_results: int) -> List[Dict[str, Any]]:
    """
    Run one rate-limited Tavily web search and return its hits in a compact form.

    Args:
        session: open aiohttp session used for the POST request.
        limiter: rate limiter awaited before every attempt (retries included,
            since each retry is another API call).
        query: free-text search query.
        max_results: number of results requested from Tavily.

    Returns:
        A list of ``{"title", "url", "snippet"}`` dicts (``snippet`` is Tavily's
        ``content`` field); missing fields default to ``""``.

    Raises:
        RuntimeError: on a non-200 response that is not considered temporary.
        Network errors, timeouts, HTTP 429 and HTTP 5xx are retried with
        exponential backoff for up to 4 attempts; because ``reraise`` is not
        set, tenacity then raises its own ``RetryError``.
    """
    await limiter.acquire()
    url = "https://api.tavily.com/search"
    payload = {
        "api_key": TAVILY_API_KEY,
        "query": query,
        "max_results": max_results,
        "search_depth": "advanced",
        "include_answer": False,
        "include_images": False,
        "include_raw_content": False,
    }
    # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    async with session.post(url, json=payload, timeout=timeout) as r:
        # Being rate limited is as temporary as a server error: back off and retry
        # instead of failing the destination outright.
        if r.status == 429:
            raise Transient("Tavily 429: rate limited")
        if r.status >= 500:
            raise Transient(f"Tavily 5xx: {r.status}")
        if r.status != 200:
            txt = await r.text()
            raise RuntimeError(f"Tavily error {r.status}: {txt}")
        data = await r.json()

    # `or []` also covers a payload carrying "results": null.
    results = data.get("results") or []
    return [{"title": d.get("title", ""),
             "url": d.get("url", ""),
             "snippet": d.get("content", "")} for d in results]

def _extract_output_text(data: Dict[str, Any]) -> str:
    """Join the text of every content part found in a Responses API payload."""
    chunks: List[str] = []
    output_items = data.get("output") if isinstance(data, dict) else None
    # Scan all items instead of trusting output[0]: the list may hold
    # non-message items (for example reasoning items) ahead of the message.
    for item in output_items or []:
        if not isinstance(item, dict):
            continue
        for part in item.get("content") or []:
            if isinstance(part, dict) and isinstance(part.get("text"), str):
                chunks.append(part["text"])
    return "".join(chunks)


def _parse_activities(text: str) -> List[str]:
    """Turn the model's reply into at most 5 non-empty activity strings."""
    cleaned = text.strip()
    # Models often wrap the array in a Markdown fence or a sentence; isolate the
    # outermost [...] span before attempting a strict JSON parse.
    start, end = cleaned.find("["), cleaned.rfind("]")
    candidate = cleaned[start:end + 1] if start != -1 and end > start else cleaned
    try:
        parsed = json.loads(candidate)
        if not isinstance(parsed, list):
            raise ValueError("Not a list")
        items = [str(x).strip() for x in parsed if str(x).strip()]
    except (ValueError, TypeError):
        # Crude salvage: split on commas/newlines and peel bullets, quotes,
        # brackets and fence characters off each piece.
        pieces = candidate.replace("\n", ",").split(",")
        stripped = [p.strip(" -–•\n\r\t\"'[]`") for p in pieces]
        items = [p for p in stripped if p and p.lower() != "json"]
    return items[:5]


@retry(wait=wait_exponential(multiplier=1, min=1, max=20),
       stop=stop_after_attempt(4),
       retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError, Transient)))
async def openai_extract(session: aiohttp.ClientSession, location: str, country: str, search_docs: List[Dict[str, Any]]) -> List[str]:
    """
    Ask the LLM for the 5 most relevant activities/features of a destination.

    Args:
        session: open aiohttp session used for the POST request.
        location: destination name.
        country: country of the destination (may be empty).
        search_docs: search hits (title/url/snippet dicts) given to the model.

    Returns:
        Exactly 5 strings. When the model's reply is missing, unparsable or has
        fewer than 5 entries, the list is padded with generic defaults.

    Raises:
        RuntimeError: on a non-200 response that is not considered temporary.
        Network errors, timeouts, HTTP 429 and HTTP 5xx are retried with
        exponential backoff for up to 4 attempts; because ``reraise`` is not
        set, tenacity then raises its own ``RetryError``.
    """
    fallback = ["sightseeing", "museums", "historic landmarks", "shopping", "local cuisine"]

    system = (
        "You are a travel researcher. Read the provided web search snippets and return the 5 most relevant, "
        "non-overlapping activities or destination features for the location. "
        "Prefer concise noun phrases (2–4 words). Do not include the location name. "
        "Return ONLY a JSON array of 5 strings, with no extra text."
    )
    user = {"location": location, "country": country, "web_snippets": search_docs}

    url = "https://api.openai.com/v1/responses"
    headers = {"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}
    payload = {
        "model": OPENAI_MODEL,
        "input": [
            {"role": "system", "content": system},
            {"role": "user",   "content": json.dumps(user)}
        ],
        "temperature": 0.2,
        "max_output_tokens": 200
    }

    # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    async with session.post(url, headers=headers, json=payload, timeout=timeout) as r:
        # Rate limiting is temporary: let the retry policy back off and try again.
        if r.status == 429:
            raise Transient("OpenAI 429: rate limited")
        if r.status >= 500:
            raise Transient(f"OpenAI 5xx: {r.status}")
        if r.status != 200:
            txt = await r.text()
            raise RuntimeError(f"OpenAI error {r.status}: {txt}")
        data = await r.json()

    text = _extract_output_text(data)
    items = _parse_activities(text) if text else []

    # pad/trim to exactly 5
    return (items + fallback)[:5]

# -------- Pipeline --------
async def process_one(sema: asyncio.Semaphore, session: aiohttp.ClientSession, limiter: RateLimiter, dest: Dict[str,str]) -> Tuple[str, List[str]]:
    """
    Search for one destination, extract its activities and print a progress tick.

    The semaphore is held for the whole search + extraction, so it bounds how
    many destinations are in flight at once. Returns ``(location, activities)``.
    """
    async with sema:
        place, nation = dest["location"], dest.get("country", "")
        query = f"Top activities and things to do in {place} {nation}".strip()
        hits = await tavily_search(session, limiter, query, max_results=SEARCH_RESULTS)
        activities = await openai_extract(session, place, nation, hits)
        print(f"✔ {place}")
    return place, activities

async def main() -> pd.DataFrame:
    """
    Process every destination concurrently, write the results to ``OUTPUT_CSV``
    and return them.

    A destination whose processing fails is reported by name and gets the
    generic default activities, so the output always has one row per input
    destination, in input order.

    Returns:
        DataFrame with columns ``location`` and ``activity_1`` .. ``activity_5``.
    """
    default_acts = ["sightseeing", "museums", "historic landmarks", "shopping", "local cuisine"]
    destinations = load_destinations()
    sema = asyncio.Semaphore(CONCURRENCY)
    limiter = RateLimiter(RATE_LIMIT_PER_MIN)

    async def _run(session: aiohttp.ClientSession, dest: Dict[str, str]) -> List[str]:
        # Handle the failure here, where the destination is still known, rather
        # than on an anonymous future.
        try:
            _, acts = await process_one(sema, session, limiter, dest)
        except Exception as e:
            label = dest.get("location", "UNKNOWN")
            print(f"✖ Error for {label}: {e}")
            return list(default_acts)
        # Guarantee exactly 5 entries whatever the extractor returned.
        return (list(acts) + default_acts)[:5]

    async with aiohttp.ClientSession() as session:
        # gather() keeps input order, so results are paired with destinations by
        # position; two destinations sharing a name no longer overwrite each other.
        per_destination = await asyncio.gather(*(_run(session, d) for d in destinations))

    columns = ["location"] + [f"activity_{i}" for i in range(1, 6)]
    rows = [[d["location"], *acts] for d, acts in zip(destinations, per_destination)]
    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(OUTPUT_CSV, index=False)
    print(f"\nSaved: {OUTPUT_CSV}")
    return df
