In [3]:
import os
import time
import json
import requests
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional
from google.colab import userdata


# ============================================================
# CONFIGURATION & AUTHENTICATION
# ============================================================

def _read_secret(name: str) -> Optional[str]:
    """Return the Colab secret ``name``, or None when it cannot be read.

    ``userdata.get`` raises rather than returning None when the secret does
    not exist (``SecretNotFoundError``) or has not been granted to this
    notebook (``NotebookAccessError``). Mapping both cases to None lets the
    check below report every missing secret, with instructions, instead of
    stopping on the first raw exception.
    """
    try:
        return userdata.get(name)
    except (userdata.SecretNotFoundError, userdata.NotebookAccessError):
        return None


CLIENT_ID = _read_secret("GBP_CLIENT_ID")
CLIENT_SECRET = _read_secret("GBP_CLIENT_SECRET")
REFRESH_TOKEN = _read_secret("GBP_REFRESH_TOKEN")

# Fail fast, before any network call, if a credential is absent or empty.
missing_secrets = [
    secret_name
    for secret_name, secret_value in (
        ("GBP_CLIENT_ID", CLIENT_ID),
        ("GBP_CLIENT_SECRET", CLIENT_SECRET),
        ("GBP_REFRESH_TOKEN", REFRESH_TOKEN),
    )
    if not secret_value
]

if missing_secrets:
    print("‚ùå ERREUR CRITIQUE : Identifiants manquants dans les Secrets Colab.")
    print(f"Secrets introuvables : {', '.join(missing_secrets)}")
    print("Action requise : Ajoutez-les via le menu 'Cl√©' (üîë) √† gauche et cochez 'Acc√®s au notebook'.")
    raise ValueError("Arr√™t du script : Secrets API Google non configur√©s.")
else:
    print("‚úÖ Identifiants charg√©s. Pr√™t pour l'authentification OAuth.")

✅ Identifiants chargés. Prêt pour l'authentification OAuth.


In [7]:
TOKEN_URL = "https://oauth2.googleapis.com/token"

# --- Google Business Profile endpoints: the API is split across services ---
# Accounts are listed through the Account Management API v1.
API_ACCOUNTS_BASE = "https://mybusinessaccountmanagement.googleapis.com/v1"
# Locations are listed through the Business Information API v1.
API_LOCATIONS_BASE = "https://mybusinessbusinessinformation.googleapis.com/v1"
# Reviews still go through the legacy My Business API v4.
API_REVIEWS_BASE = "https://mybusiness.googleapis.com/v4"

# Local export destination. The directory is created up front so the final
# Parquet write never fails on a missing folder.
OUTPUT_DIR = "data/gbp_reviews"
os.makedirs(OUTPUT_DIR, exist_ok=True)
PARQUET_FILE = "/".join((OUTPUT_DIR, "reviews_all_locations.parquet"))

def get_access_token(timeout: float = 30.0) -> str:
    """Exchange the stored refresh token for a short-lived OAuth access token.

    Args:
        timeout: seconds to wait for the token endpoint. Without it,
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        The ``access_token`` field of the token endpoint's JSON response.

    Raises:
        requests.exceptions.HTTPError: the endpoint rejected the request. The
            response body is printed first to help diagnose bad credentials.
        requests.exceptions.RequestException: network failure or timeout.
    """
    payload = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "refresh_token": REFRESH_TOKEN,
        "grant_type": "refresh_token",
    }

    try:
        response = requests.post(TOKEN_URL, data=payload, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"Erreur d'authentification : {e.response.text}")
        raise
    return response.json()["access_token"]

# ============================================================
# LOGIQUE DE COLLECTE (MULTI-API)
# ============================================================

def api_request(
    url: str,
    headers: Dict,
    method: str = "GET",
    params: Optional[Dict] = None,
    max_retries: int = 10,
    timeout: float = 30.0,
) -> Dict:
    """Send one Google API request with retries and return its decoded JSON body.

    Retry policy:
      * 429 (quota exceeded): exponential backoff starting at 10 s
        (10, 20, 40, 80, 160 s, ...).
      * 5xx (transient server error): fixed 5 s pause.
      * Network errors (DNS, dropped connection, timeout): fixed 5 s pause.
    Any other error status (400, 401, 403, 404, ...) will not go away by
    retrying, so it is raised immediately. No pause follows the last attempt.

    Args:
        url: absolute endpoint URL.
        headers: request headers, e.g. the OAuth ``Authorization`` header.
        method: HTTP verb, forwarded to ``requests.request``.
        params: optional query-string parameters.
        max_retries: total number of attempts.
        timeout: per-attempt timeout in seconds, so a stalled connection
            cannot hang the run.

    Returns:
        The parsed JSON body of the first successful response, or ``{}`` when
        ``max_retries`` <= 0 (no attempt is made).

    Raises:
        requests.exceptions.HTTPError: a non-retryable status, or a 429/5xx
            that persisted through the last attempt.
        requests.exceptions.RequestException: a network error on the last attempt.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1

        # Only the transport call is guarded here. HTTP error statuses are
        # handled below, so an HTTPError from raise_for_status() is not
        # mistaken for a network error and retried.
        try:
            response = requests.request(method, url, headers=headers, params=params, timeout=timeout)
        except requests.exceptions.RequestException as e:
            print(f"  ‚ö†Ô∏è Erreur r√©seau : {e}. Pause de 5s...")
            if is_last_attempt:
                print(f"‚ùå √âchec d√©finitif sur : {url}")
                raise
            time.sleep(5)
            continue

        status = response.status_code
        is_transient = status == 429 or 500 <= status < 600
        if is_transient:
            if is_last_attempt:
                # Fall through to raise_for_status() so the failure is visible
                # to callers instead of an empty {}.
                print(f"‚ùå √âchec d√©finitif sur : {url}")
            else:
                if status == 429:
                    wait_time = 10 * (2 ** attempt)
                    print(f"  ‚ö†Ô∏è Quota atteint (429). Tentative {attempt+1}/{max_retries}. Pause de {wait_time}s...")
                else:
                    wait_time = 5
                    print(f"  ‚ö†Ô∏è Erreur serveur Google ({response.status_code}). Pause de 5s...")
                time.sleep(wait_time)
                continue

        # Raises HTTPError for any remaining error status; does nothing on 2xx.
        response.raise_for_status()
        return response.json()

    return {}

def test_api_connection(headers: Dict) -> bool:
    """
    Effectue un test simple de connectivit√© sur l'endpoint Accounts.
    Permet de valider le Token et les quotas avant de lancer le gros traitement.
    """
    print("\n--- üß™ TEST DE CONNEXION INITIAL (Sanity Check) ---")
    url = f"{API_ACCOUNTS_BASE}/accounts"
    try:
        print(f"Tentative de contact : {url}")
        response = requests.get(url, headers=headers)

        print(f"Code retour HTTP : {response.status_code}")

        if response.status_code == 200:
            data = response.json()
            acc_count = len(data.get("accounts", []))
            print(f"‚úÖ SUCC√àS : Connexion √©tablie. {acc_count} compte(s) trouv√©(s).")
            return True
        elif response.status_code == 429:
            print("‚ùå √âCHEC : Quota d√©pass√© d√®s le test (429). Attendez quelques minutes.")
            return False
        else:
            print(f"‚ùå √âCHEC : Erreur API ({response.text})")
            return False

    except Exception as e:
        print(f"‚ùå √âCHEC TECHNIQUE : {str(e)}")
        return False

def get_accounts(headers: Dict) -> List[Dict]:
    """List every account visible to the token (Account Management API v1).

    Follows ``nextPageToken`` so that accounts beyond the first page are not
    silently dropped. Pauses 1 s between page fetches, like ``get_locations``.

    Args:
        headers: request headers carrying the OAuth bearer token.

    Returns:
        The concatenated ``accounts`` entries of every page (possibly empty).
    """
    url = f"{API_ACCOUNTS_BASE}/accounts"
    params: Dict = {}
    accounts: List[Dict] = []

    while True:
        data = api_request(url, headers, params=params)
        accounts.extend(data.get("accounts", []))

        next_token = data.get("nextPageToken")
        if not next_token:
            break
        params["pageToken"] = next_token

        # Short pause between pages to limit request rate.
        time.sleep(1.0)

    return accounts

def get_locations(account_name: str, headers: Dict) -> List[Dict]:
    """List every location of one account via the Business Information API v1.

    This API requires a ``readMask``; only the fields used downstream are
    requested. Pages are followed through ``nextPageToken``, with a 1 s pause
    before each follow-up page.

    Args:
        account_name: account resource name, e.g. ``accounts/12345``.
        headers: request headers carrying the OAuth bearer token.

    Returns:
        The concatenated ``locations`` entries of every page (possibly empty).
    """
    endpoint = f"{API_LOCATIONS_BASE}/{account_name}/locations"
    query = {"readMask": "name,title,storeCode", "pageSize": 100}
    collected: List[Dict] = []

    page = api_request(endpoint, headers, params=query)
    collected.extend(page.get("locations", []))

    while page.get("nextPageToken"):
        query["pageToken"] = page.get("nextPageToken")
        # Throttle pagination so it does not hammer the API.
        time.sleep(1.0)
        page = api_request(endpoint, headers, params=query)
        collected.extend(page.get("locations", []))

    return collected

def get_reviews(formatted_location_name: str, headers: Dict) -> List[Dict]:
    """Fetch every review of one location through the legacy My Business API v4.

    Best-effort: if a page request fails, a warning and the error are printed
    and the reviews gathered so far are returned. One failing location
    therefore does not abort the whole pipeline.

    Args:
        formatted_location_name: ``accounts/{accId}/locations/{locId}``. The v4
            endpoint needs the account prefix.
        headers: request headers carrying the OAuth bearer token.

    Returns:
        The concatenated ``reviews`` entries of every page fetched.
    """
    all_reviews: List[Dict] = []
    url = f"{API_REVIEWS_BASE}/{formatted_location_name}/reviews"
    params = {"pageSize": 50}
    is_first_page = True

    while True:
        try:
            data = api_request(url, headers, params=params)
        except Exception as e:
            # Keep going with other locations; report why this one stopped.
            print(f"    ‚ö†Ô∏è Impossible de r√©cup√©rer les avis pour {formatted_location_name} (ou aucun avis).")
            print(f"      Detail : {e}")
            break

        # Log the location summary once, from the first page. This is checked
        # before the batch is appended; checking "no reviews collected yet"
        # after appending only fired when the first page was empty.
        if is_first_page and "totalReviewCount" in data:
            print(f"    ‚ÑπÔ∏è {data.get('totalReviewCount')} avis d√©tect√©s (Moyenne: {data.get('averageRating')})")
        is_first_page = False

        all_reviews.extend(data.get("reviews", []))

        next_token = data.get("nextPageToken")
        if not next_token:
            break
        params["pageToken"] = next_token

        # Polite pause between review pages.
        time.sleep(1.0)

    return all_reviews

# ============================================================
# NORMALISATION
# ============================================================

# v4 `starRating` enum label -> numeric star value. The "unspecified" label
# maps to 0.
RATING_MAP = dict(
    zip(("ONE", "TWO", "THREE", "FOUR", "FIVE"), range(1, 6)),
    STAR_RATING_UNSPECIFIED=0,
)

def normalize_review(review: Dict, location: Dict, account_id: str) -> Dict:
    """Flatten one v4 review and its location into a single flat record.

    Args:
        review: one entry of the v4 ``reviews`` list.
        location: the Business Information v1 location the review belongs to.
        account_id: the account resource name (e.g. ``accounts/12345``).

    Returns:
        A dict with one key per output column. ``rating_value`` is looked up
        in ``RATING_MAP`` (0 for unknown labels). ``ingestion_utc`` is a naive
        ISO-8601 string of the current UTC time.
    """
    from datetime import timezone

    # `or {}` also covers keys that are present but null.
    reviewer = review.get("reviewer") or {}
    reply = review.get("reviewReply") or {}
    raw_rating = review.get("starRating", "STAR_RATING_UNSPECIFIED")

    # In the v1 API the location's display name is 'title'; 'locationName'
    # is kept as a fallback.
    loc_name = location.get("title") or location.get("locationName")

    return {
        "account_id": account_id,
        "internal_location_name": loc_name,
        "store_code": location.get("storeCode"),
        "review_resource_name": review.get("name"),
        "review_id": review.get("reviewId"),
        "star_rating_label": raw_rating,
        "rating_value": RATING_MAP.get(raw_rating, 0),
        "comment": review.get("comment", ""),
        "create_time": review.get("createTime"),
        "update_time": review.get("updateTime"),
        "reviewer_name": reviewer.get("displayName"),
        "reviewer_photo": reviewer.get("profilePhotoUrl"),
        "is_anonymous": reviewer.get("isAnonymous", False),
        "reply_text": reply.get("comment"),
        "reply_date": reply.get("updateTime"),
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the previous naive ISO format of this column.
        "ingestion_utc": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }

# ============================================================
# EXÉCUTION
# ============================================================

def _auth_headers() -> Dict:
    """Request a fresh OAuth access token and wrap it in an Authorization header."""
    return {"Authorization": f"Bearer {get_access_token()}"}


def _refresh_headers_if_stale(headers: Dict, issued_at: float, max_age_s: float):
    """Return ``(headers, issued_at)``, re-authenticating first if the token is older than ``max_age_s``."""
    if time.monotonic() - issued_at < max_age_s:
        return headers, issued_at
    print("  > Renouvellement du token OAuth...")
    return _auth_headers(), time.monotonic()


def _export_reviews(dataset: List[Dict]) -> None:
    """Write the normalized review rows to PARQUET_FILE and print a short summary."""
    df = pd.DataFrame(dataset)
    for col in ["create_time", "update_time", "reply_date"]:
        df[col] = pd.to_datetime(df[col], errors="coerce")

    df.to_parquet(PARQUET_FILE, engine="pyarrow", index=False)
    print(f"\n‚úÖ SUCC√àS : {len(df)} avis export√©s dans {PARQUET_FILE}")

    # RATING_MAP assigns 0 to STAR_RATING_UNSPECIFIED. That is "no rating",
    # not a zero-star review, so it is excluded from the average.
    rated = df.loc[df["rating_value"] > 0, "rating_value"]
    if not rated.empty:
        print(f"Note moyenne : {rated.mean():.2f}")


def run_pipeline(token_max_age_s: float = 45 * 60):
    """Collect the reviews of every visible account/location and export them to Parquet.

    Steps: authenticate, run a single connectivity check, walk
    accounts -> locations -> reviews, normalize each review, then write
    PARQUET_FILE. Any exception is reported (message and traceback) instead
    of propagating, so the notebook cell always finishes.

    Args:
        token_max_age_s: re-authenticate once the current access token is
            older than this many seconds. Google access tokens are short-lived
            (typically about an hour; see ``expires_in`` in the token
            response), and a run over many locations with long 429 backoffs
            can outlast one.
    """
    import traceback

    print(f"--- Pipeline d√©marr√© √† {datetime.now()} (Nouvelle Architecture v1/v4) ---")

    try:
        headers = _auth_headers()
        token_issued_at = time.monotonic()

        # 1. Pre-flight connectivity/quota check.
        if not test_api_connection(headers):
            print("‚ùå Arr√™t du pipeline suite √† l'√©chec du test de connexion.")
            return

        dataset: List[Dict] = []
        accounts = get_accounts(headers)
        print(f"\nComptes trouv√©s : {len(accounts)}")

        for acc in accounts:
            # acc['name'] has the form 'accounts/12345'.
            acc_name = acc.get("name")
            print(f"Traitement du compte : {acc.get('accountName')} ({acc_name})")

            headers, token_issued_at = _refresh_headers_if_stale(headers, token_issued_at, token_max_age_s)
            locations = get_locations(acc_name, headers)
            print(f"  > {len(locations)} √©tablissements trouv√©s.")

            for i, loc in enumerate(locations):
                loc_resource = loc.get("name")
                if not loc_resource:
                    # Without a resource name the reviews URL cannot be built;
                    # skip this location instead of crashing the whole run.
                    print(f"  > [{i+1}/{len(locations)}] Etablissement sans identifiant, ignore.")
                    continue

                # v1 location names are usually 'locations/98765' (no account
                # prefix), but the v4 reviews endpoint needs
                # 'accounts/12345/locations/98765'.
                loc_id_part = loc_resource.split("/")[-1]
                full_resource_name = f"{acc_name}/locations/{loc_id_part}"

                print(f"  > [{i+1}/{len(locations)}] R√©cup√©ration avis : {loc.get('title')}...")

                headers, token_issued_at = _refresh_headers_if_stale(headers, token_issued_at, token_max_age_s)
                reviews = get_reviews(full_resource_name, headers)
                dataset.extend(normalize_review(r, loc, acc_name) for r in reviews)

                # Pause between two locations to avoid 429 errors.
                time.sleep(2.0)

        if dataset:
            _export_reviews(dataset)
        else:
            print("\n‚ö†Ô∏è Aucun avis trouv√© ou erreur de collecte.")

    except Exception as e:
        print(f"\n‚ùå ERREUR CRITIQUE PIPELINE : {str(e)}")
        # Keep the cell alive but make the failure location diagnosable.
        traceback.print_exc()

# Entry point: runs the full collection when executed as a script, or directly
# in a notebook cell (the kernel also sets __name__ to "__main__"). Importing
# this code as a module does not trigger it.
if __name__ == "__main__":
    run_pipeline()

--- Pipeline démarré à 2026-01-30 03:11:47.365416 (Nouvelle Architecture v1/v4) ---

--- 🧪 TEST DE CONNEXION INITIAL (Sanity Check) ---
Tentative de contact : https://mybusinessaccountmanagement.googleapis.com/v1/accounts
Code retour HTTP : 429
❌ ÉCHEC : Quota dépassé dès le test (429). Attendez quelques minutes.
❌ Arrêt du pipeline suite à l'échec du test de connexion.
