## Les importations


In [None]:
import pandas as pd
import requests
import base64
import time
import math
import os
import kagglehub
import requests
from bs4 import BeautifulSoup
import pandas as pd
import google.generativeai as genai
import time
import json
import os
import numpy as np
from tqdm import tqdm

## 1. Extraction du dataset Kaggle


In [None]:
def extract_dataset():
    """Download the Kaggle Spotify tracks dataset and load its CSV file.

    Returns:
        pandas.DataFrame: contents of the first ``.csv`` file (in sorted
        file-name order) found in the downloaded dataset directory.

    Raises:
        FileNotFoundError: if the downloaded directory contains no CSV file.
    """
    path = kagglehub.dataset_download("maharshipandya/-spotify-tracks-dataset")
    # os.listdir() order is arbitrary; sort so the chosen file is deterministic.
    csv_files = sorted(f for f in os.listdir(path) if f.endswith(".csv"))
    if not csv_files:
        # Falling through would return None and make the caller fail later
        # with an unrelated TypeError; fail here with a clear message instead.
        raise FileNotFoundError(f"No CSV file found in Kaggle dataset directory: {path}")
    return pd.read_csv(os.path.join(path, csv_files[0]))


## 2. Configuration 


In [None]:
# Spotify API credentials, read from the environment (None when the variable is unset).
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
# Path of the JSON file written at the end of main().
OUTPUT_FILE = 'dataset_final.json'


## 3. Cartographie des régions


In [None]:
# Lookup table: upper-case two-letter country code -> global region label.
_REGION_BY_COUNTRY = {
    country: label
    for label, countries in (
        ("Am√©rique du Nord", ('US', 'CA', 'MX')),
        ("Europe", ('GB', 'FR', 'DE', 'SE', 'IT', 'ES', 'NL', 'NO', 'DK', 'IE', 'BE', 'CH')),
        ("Asie", ('KR', 'JP', 'CN', 'IN', 'TW')),
        ("Am√©rique Latine/ Am√©rique du Sud", ('BR', 'AR', 'CO', 'PR', 'CL')),
        ("Oc√©anie", ('AU', 'NZ')),
    )
    for country in countries
}


def get_region_from_code(code):
    """Map a two-letter country code to a global region label.

    The lookup is case-insensitive. An empty value, or one whose length is
    not exactly 2, yields "Inconnu"; a two-letter code that is not in the
    table yields "Reste du Monde".
    """
    if not code or len(code) != 2:
        return "Inconnu"
    return _REGION_BY_COUNTRY.get(code.upper(), "Reste du Monde")


## 4. Connexion à Spotify for Developers


In [None]:
def get_spotify_token():
    """Request a Spotify access token using the client-credentials flow.

    Reads the module-level ``CLIENT_ID`` and ``CLIENT_SECRET``.

    Returns:
        The access token string, or ``None`` when the credentials are
        missing, the request fails, the server answers with an HTTP error
        status, or the response carries no ``access_token`` field. A message
        is printed in every failure case.
    """
    if not CLIENT_ID or not CLIENT_SECRET:
        # Without this check the literal text "None:None" would be sent.
        print("Erreur Token : SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET manquant")
        return None

    auth_url = "https://accounts.spotify.com/api/token"
    auth_string = f"{CLIENT_ID}:{CLIENT_SECRET}"
    auth_base64 = base64.b64encode(auth_string.encode("utf-8")).decode("utf-8")

    headers = {
        "Authorization": "Basic " + auth_base64,
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {"grant_type": "client_credentials"}

    try:
        response = requests.post(auth_url, headers=headers, data=data, timeout=10)
        # Surface 4xx/5xx answers (e.g. bad credentials) instead of silently
        # returning None because the error body has no "access_token".
        response.raise_for_status()
        token = response.json().get("access_token")
    except (requests.RequestException, ValueError, AttributeError) as e:
        # ValueError: body is not JSON; AttributeError: JSON is not an object.
        print(f"Erreur Token : {e}")
        return None

    if not token:
        print("Erreur Token : access_token absent de la reponse")
    return token


## 5. Extraction depuis Spotify


In [None]:
def _extract_track_metadata(track):
    """Build the metadata record for one track object of the API response."""
    album = track.get('album') or {}
    release_date = album.get('release_date')
    images = album.get('images')

    # The first two characters of the ISRC (found in external_ids,
    # e.g. "USUM71204425") are used as the country code.
    isrc = (track.get('external_ids') or {}).get('isrc') or ''
    country_code = "XX"
    region = "Inconnu"
    if len(isrc) >= 2:
        country_code = isrc[:2]
        region = get_region_from_code(country_code)

    return {
        'year': release_date[:4] if release_date else None,
        'image': images[0].get('url') if images else None,
        'preview': track.get('preview_url'),
        'country': country_code,
        'region': region
    }


def fetch_tracks_metadata(track_ids, token, max_retries=3):
    """Fetch year, cover image, preview URL, country and region for a batch of tracks.

    Args:
        track_ids: Spotify track IDs, joined into a single ``ids`` query.
        token: bearer access token.
        max_retries: how many times the request is repeated after an
            HTTP 429 (rate limit) answer before giving up on the batch.

    Returns:
        dict mapping track ID to a dict with the keys ``year``, ``image``,
        ``preview``, ``country`` and ``region``. Tracks that are missing or
        malformed in the response are left out; on a failed request the
        dict is empty. Failures are printed, never raised.
    """
    metadata_map = {}
    if not track_ids:
        # Nothing to ask for; avoids a pointless (and rejected) API call.
        return metadata_map

    ids_string = ",".join(track_ids)
    api_url = f"https://api.spotify.com/v1/tracks?ids={ids_string}"
    headers = {"Authorization": "Bearer " + token}

    for attempt in range(max_retries + 1):
        try:
            response = requests.get(api_url, headers=headers, timeout=10)
        except requests.RequestException as e:
            print(f"Erreur Batch : {e}")
            return metadata_map

        if response.status_code == 429:
            if attempt == max_retries:
                break
            # Honour the server's Retry-After hint when it is usable,
            # capped so one batch cannot stall the run for very long.
            try:
                wait_seconds = int(response.headers.get("Retry-After", 5))
            except (TypeError, ValueError):
                wait_seconds = 5
            time.sleep(max(1, min(wait_seconds, 60)))
            continue

        if response.status_code != 200:
            print(f"Erreur Batch : HTTP {response.status_code}")
            return metadata_map

        try:
            tracks_data = response.json().get('tracks') or []
        except (ValueError, AttributeError) as e:
            print(f"Erreur Batch : {e}")
            return metadata_map

        for track in tracks_data:
            if track and 'album' in track:
                try:
                    metadata_map[track['id']] = _extract_track_metadata(track)
                except (KeyError, TypeError, IndexError, AttributeError) as e:
                    # One malformed entry must not discard the rest of the batch.
                    print(f"Erreur Batch : {e}")
        return metadata_map

    print(f"Erreur Batch : HTTP 429 apres {max_retries} tentatives, lot ignore")
    return metadata_map


def format_duration(ms):
    """Format a duration given in milliseconds as ``M:SS``.

    Minutes are not wrapped at 60, so a track of one hour or more keeps its
    full length (3,700,000 ms -> "61:40"). A missing value (None / NaN)
    yields "0:00".
    """
    if pd.isna(ms):
        return "0:00"
    # Truncate to whole seconds, then split into total minutes + seconds.
    minutes, seconds = divmod(int(ms // 1000), 60)
    return f"{minutes}:{seconds:02d}"


## 6. Transformation


In [None]:
def main():
    """Run the Spotify enrichment pipeline (first part of the function).

    Loads the Kaggle CSV, keeps the tracks with popularity >= 30, removes
    duplicate ``track_id`` rows, obtains a Spotify token and fetches the
    metadata in batches of 50 IDs into ``full_metadata``. Returns early
    when no token could be obtained.

    NOTE(review): the remainder of this function's body (row enrichment and
    JSON export) follows after the next notebook cell marker in this file.
    """
    print("Lecture du fichier CSV...")
    df = extract_dataset()
    
    # Keep only the hits (popularity >= 30) so the statistics stay meaningful
    df_hits = df[df['popularity'] >= 30].copy()
    df_hits = df_hits.drop_duplicates(subset=['track_id'])
    
    print(f" Traitement de {len(df_hits)} chansons...")

    token = get_spotify_token()
    if not token: return

    track_ids_list = df_hits['track_id'].tolist()
    batch_size = 50
    total_batches = math.ceil(len(track_ids_list) / batch_size)
    # track_id -> metadata dict, accumulated over all batches
    full_metadata = {}

    print(" R√©cup√©ration des donn√©es *...")

    for i in range(total_batches):
        start = i * batch_size
        end = (i + 1) * batch_size
        batch = track_ids_list[start:end]
        
        results = fetch_tracks_metadata(batch, token)
        full_metadata.update(results)
        
        # Progress message every 10 batches
        if i % 10 == 0: print(f" Lot {i + 1}/{total_batches}...")
        time.sleep(0.5) # pause between API calls

    print(" Transformation termin√©. Fusion...")


## 7. Fusion et dataset final


In [None]:
    def apply_enrichment(row):
        """Copy the fetched Spotify metadata onto one DataFrame row.

        When the row's ``track_id`` has no entry in ``full_metadata`` only
        ``year`` is set (to None). ``duration_fmt`` is always computed from
        ``duration_ms``.
        """
        track_id = row['track_id']
        if track_id not in full_metadata:
            row['year'] = None
        else:
            meta = full_metadata[track_id]
            # (row column, metadata key) pairs, assigned in this order.
            for column, key in (
                ('year', 'year'),
                ('image', 'image'),
                ('preview', 'preview'),
                ('country_code', 'country'),  # e.g. US, FR, GB
                ('region', 'region'),         # region label from the mapping
            ):
                row[column] = meta[key]

        row['duration_fmt'] = format_duration(row['duration_ms'])
        return row

    # Enrich every row, then drop the rows that ended up without a year
    df_final = df_hits.apply(apply_enrichment, axis=1)
    df_final = df_final.dropna(subset=['year'])

    # Final columns
    cols = [
        'track_name', 'artists', 'year', 'region', 'country_code', # <-- new columns
        'image', 'preview', 'duration_fmt', 'popularity', 
        'danceability', 'energy', 'tempo', 'track_genre'
    ]
    
    # Keep only the requested columns that actually exist in the DataFrame
    existing_cols = [c for c in cols if c in df_final.columns]
    df_final = df_final[existing_cols]

    # Export as a JSON array of records
    df_final.to_json(OUTPUT_FILE, orient='records', indent=4)
    print(f" Termin√© ! Fichier '{OUTPUT_FILE}' pr√™t avec les R√©gions.")

# Run the Spotify pipeline only when this file is executed as the main module.
if __name__ == "__main__":
    main()


In [None]:
# Gemini API key, read from the environment; the script cannot continue without it.
API_KEY = os.environ.get("GEMINI_API_KEY")
if API_KEY is None or API_KEY == "":
    raise ValueError("La cl√© API GEMINI_API_KEY n'est pas d√©finie.")

genai.configure(api_key=API_KEY)

In [None]:
# Gemini models in the order they are tried; the enrichment step moves to the
# next entry when the current one reports an exhausted quota.
MODEL_ROSTER = [
    'models/gemini-1.5-flash', # fast, inexpensive model used first
    'models/gemini-1.5-pro',
    'models/gemini-1.0-pro'
]

# File names
RAW_CSV = "billboard_raw.csv"                  # scraped Billboard rows
ENRICHED_CSV = "billboard_enriched_live.csv"   # Billboard rows enriched by Gemini
SPOTIFY_JSON = "dataset_final.json"            # same path as OUTPUT_FILE written by main()
FINAL_CSV = "GLOBAL_HIT_1980_2023_MMSS.csv"    # merged output

In [None]:
def _extract_title_and_artist(cols):
    """Return ``(track_name, artist_name)`` parsed from one table row's cells."""
    # The first cell containing a double quote is taken as the title and the
    # cell right after it as the artist.
    for i, col in enumerate(cols):
        text = col.get_text(strip=True)
        if '"' in text:
            artist_name = "Unknown"
            if i + 1 < len(cols):
                artist_name = cols[i + 1].get_text(strip=True)
            return text.replace('"', ''), artist_name

    # Fallback for rows without a quoted cell: second cell = title, third = artist.
    if len(cols) >= 3:
        return cols[1].get_text(strip=True).replace('"', ''), cols[2].get_text(strip=True)

    return "Unknown", "Unknown"


def scrape_billboard_wiki(start_year, end_year, timeout=10):
    """Scrape the Wikipedia "Billboard Year-End Hot 100 singles" page of each year.

    Args:
        start_year: first year to scrape (inclusive).
        end_year: last year to scrape (inclusive).
        timeout: seconds to wait for each HTTP response, so a stalled
            connection cannot block the whole run.

    Returns:
        pandas.DataFrame with one row per song and the columns
        ``track_name``, ``artists``, ``year``, ``region`` and
        ``country_code`` (the last two are always 'US'). Years that fail to
        download or parse are reported on stdout and skipped.
    """
    base_url = "https://en.wikipedia.org/wiki/Billboard_Year-End_Hot_100_singles_of_{}"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    all_songs = []
    print(f"--- üåç D√©marrage du scraping de {start_year} √† {end_year} ---")

    for year in range(start_year, end_year + 1):
        url = base_url.format(year)
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            if response.status_code != 200:
                print(f"‚ùå Erreur HTTP {response.status_code} pour {year}")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')
            tables = soup.select('table.wikitable')

            # Prefer the first table mentioning "Title" or "Song";
            # otherwise fall back to the first wikitable of the page.
            target_table = next(
                (t for t in tables if "Title" in t.text or "Song" in t.text),
                None,
            )
            if target_table is None and tables:
                target_table = tables[0]

            if target_table is None:
                print(f"‚ö†Ô∏è Pas de table trouv√©e pour {year}")
                continue

            for row in target_table.find_all('tr'):
                cols = row.find_all(['th', 'td'])
                if len(cols) < 2:
                    continue

                track_name, artist_name = _extract_title_and_artist(cols)

                # Skip the header row.
                if track_name.lower() in ['title', 'song'] or artist_name.lower() in ['artist', 'performer']:
                    continue

                if track_name != "Unknown":
                    all_songs.append({
                        'track_name': track_name,
                        'artists': artist_name,
                        'year': year,
                        'region': 'US',
                        'country_code': 'US'
                    })

        except Exception as e:
            # Best effort: a failure on one year must not abort the other years.
            print(f"‚ùå Erreur sur l'ann√©e {year}: {e}")

    print(f"‚úÖ Scraping termin√©. {len(all_songs)} chansons r√©cup√©r√©es.")
    return pd.DataFrame(all_songs)

# Run the scraping step, or reuse the raw CSV left by a previous run.
if os.path.exists(RAW_CSV):
    print(f"üìÇ Le fichier {RAW_CSV} existe d√©j√†. Chargement...")
    df_raw = pd.read_csv(RAW_CSV)
else:
    df_raw = scrape_billboard_wiki(1980, 2010)  # years 1980 through 2010
    df_raw.to_csv(RAW_CSV, index=False)
    print(f"üìÅ Donn√©es brutes sauvegard√©es dans {RAW_CSV}")

In [None]:
# Column layout of the enriched CSV (also its header row).
_ENRICHED_COLUMNS = [
    'track_name', 'artists', 'year', 'region', 'country_code',
    'duration', 'tempo', 'track_genre', 'popularity'
]


def _write_enriched_header(output_file):
    """Create (or reset) the output CSV so that it contains only the header row."""
    pd.DataFrame(columns=_ENRICHED_COLUMNS).to_csv(output_file, index=False)


def _strip_code_fence(text):
    """Remove a Markdown code fence wrapped around a model reply, if present."""
    text = text.strip()
    if text.startswith("```"):
        lines = text.splitlines()[1:]
        # Drop the last line only when it really is the closing fence, so a
        # reply cut short before the fence does not lose a line of content.
        if lines and lines[-1].strip().startswith("```"):
            lines = lines[:-1]
        text = "\n".join(lines)
    return text


def enrich_data_with_gemini(input_file, output_file, batch_size=500):
    """Enrich the scraped songs with Gemini estimates and append them to a CSV.

    The input rows are sent to the model in batches of ``batch_size``. Each
    successful batch is appended to ``output_file`` straight away, so an
    interrupted run resumes from the number of rows already written. A batch
    that keeps failing is written with empty enrichment fields, which keeps
    the output row count aligned with the input position used for resuming.

    Args:
        input_file: CSV produced by the scraping step.
        output_file: CSV receiving the enriched rows (created if absent).
        batch_size: number of songs sent to the model per request.

    Returns:
        None. Returns early when ``input_file`` does not exist or when every
        model of ``MODEL_ROSTER`` has reported an exhausted quota.
    """
    if not os.path.exists(input_file):
        print(f"‚ùå Fichier d'entr√©e {input_file} introuvable.")
        return

    df_raw = pd.read_csv(input_file)
    records = df_raw.to_dict('records')
    total_records = len(records)

    # Resume support: continue after the rows already present in the output.
    start_index = 0
    if os.path.exists(output_file):
        try:
            df_existing = pd.read_csv(output_file)
            start_index = len(df_existing)
            print(f"üîÑ Reprise de l'enrichissement √† la ligne {start_index}...")
        except pd.errors.EmptyDataError:
            # Empty file: write the header row.
            _write_enriched_header(output_file)
    else:
        _write_enriched_header(output_file)

    current_model_idx = 0
    i = start_index

    while i < total_records:
        end_idx = min(i + batch_size, total_records)
        batch = records[i : end_idx]

        if not batch: break

        batch_success = False
        retry_count = 0

        while not batch_success:
            if current_model_idx >= len(MODEL_ROSTER):
                print("üíÄ Tous les mod√®les Gemini sont √©puis√©s. Arr√™t du script.")
                return

            model_name = MODEL_ROSTER[current_model_idx]

            try:
                print(f"‚ö° Enrichissement lignes {i} √† {end_idx} ({model_name})...")
                model = genai.GenerativeModel(model_name)
                
                prompt = f"""
                DATA: {json.dumps(batch)}
                
                INSTRUCTIONS:
                Tu es un expert musical. Pour chaque chanson de la liste DATA, retourne un objet JSON avec ces cl√©s :
                - track_name (garder l'original)
                - artists (garder l'original)
                - year (garder l'original)
                - region (garder 'US')
                - country_code (garder 'US')
                - duration (estimation pr√©cise en SECONDES, entier)
                - tempo (estimation BPM, entier)
                - track_genre (genre principal unique, string court ex: 'Pop', 'Rock')
                - popularity (score historique 0-100, entier)

                Retourne UNIQUEMENT une liste JSON valide contenant exactement {len(batch)} objets. Pas de markdown.
                """
                
                response = model.generate_content(prompt)
                text = _strip_code_fence(response.text)

                try:
                    data = json.loads(text)
                except json.JSONDecodeError:
                    # Malformed JSON (e.g. truncated reply): retry.
                    print("‚ö†Ô∏è Erreur JSON, nouvelle tentative...")
                    raise ValueError("JSON invalide")

                # A reply that is not a JSON list counts as a size mismatch.
                received = len(data) if isinstance(data, list) else 0
                if received != len(batch):
                    print(f"‚ö†Ô∏è Mismatch: {received} items re√ßus vs {len(batch)} attendus.")
                    raise ValueError("Nombre d'items incorrect")

                # Incremental save; columns the model omitted are filled with None.
                df_batch = pd.DataFrame(data)
                for c in _ENRICHED_COLUMNS:
                    if c not in df_batch.columns: df_batch[c] = None

                df_batch[_ENRICHED_COLUMNS].to_csv(output_file, mode='a', header=False, index=False)

                batch_success = True
                i += batch_size
                time.sleep(1)  # short courtesy pause between requests

            except Exception as e:
                err_str = str(e)
                if "429" in err_str or "ResourceExhausted" in err_str:
                    print(f"‚õî Quota d√©pass√© pour {model_name}. Changement de mod√®le.")
                    current_model_idx += 1
                else:
                    print(f"‚ö†Ô∏è Erreur: {e}")
                    retry_count += 1
                    if retry_count > 3:
                        print("‚ùå Trop d'erreurs, on saute ce batch (donn√©es perdues pour ces lignes).")
                        # Write the original rows with empty enrichment fields:
                        # resuming relies on the output row count matching the
                        # input position, which a silently dropped batch breaks.
                        df_skipped = pd.DataFrame(batch).reindex(columns=_ENRICHED_COLUMNS)
                        df_skipped.to_csv(output_file, mode='a', header=False, index=False)
                        i += batch_size  # move on so the run is not blocked
                        batch_success = True
                    else:
                        time.sleep(2)

# Run the enrichment step.
# NOTE(review): the message below is printed even when enrich_data_with_gemini
# returned early (missing input file, or every model's quota exhausted).
enrich_data_with_gemini(RAW_CSV, ENRICHED_CSV)
print(f"‚úÖ Enrichissement termin√©. Donn√©es dans {ENRICHED_CSV}")

In [None]:
def seconds_to_mm_ss(val):
    """Convert a duration in seconds (e.g. 214) to ``M:SS`` text (e.g. "3:34").

    Accepts numbers or numeric strings; fractions of a second are truncated.
    Returns None for a missing, empty, non-numeric or negative value.
    """
    try:
        if pd.isna(val) or val == '':
            return None
        seconds = int(float(val))
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are treated as "no duration"; a bare
        # except would also swallow KeyboardInterrupt / SystemExit.
        return None
    if seconds < 0:
        return None
    minutes, secs = divmod(seconds, 60)
    return f"{minutes}:{secs:02d}"

def merge_datasets():
    """Merge the Gemini-enriched Billboard rows with the Spotify rows into FINAL_CSV.

    Billboard rows are kept when their year is below 2000 and Spotify rows
    when their year is 2000 or later. On both sides the year is coerced to a
    number first, so rows with a missing or non-numeric year are dropped
    instead of breaking the comparison. The Billboard duration (seconds) is
    converted to ``M:SS``; the Spotify ``duration_fmt`` column is renamed to
    ``duration``. The result is sorted by year and written to ``FINAL_CSV``.

    Returns:
        None. Returns early, after printing an error, when an input file is
        missing or the Spotify JSON cannot be parsed.
    """
    print("--- üîÑ Fusion des datasets (Billboard Gemini + Spotify) ---")

    # Target columns
    cols_target = ['track_name', 'artists', 'year', 'duration', 'tempo', 'track_genre', 'popularity', 'region', 'country_code']

    # 1. Load and prepare the Gemini-enriched Billboard rows
    if not os.path.exists(ENRICHED_CSV):
        print(f"‚ùå Erreur: Fichier {ENRICHED_CSV} manquant.")
        return

    df_old = pd.read_csv(ENRICHED_CSV)

    # Add any missing target column before it is used below.
    for c in cols_target:
        if c not in df_old.columns: df_old[c] = None

    # Strict time filter (the model may have returned wrong or non-numeric
    # years); NaN never satisfies the comparison, so bad years are dropped.
    df_old['year'] = pd.to_numeric(df_old['year'], errors='coerce')
    df_old = df_old[df_old['year'] < 2000].copy()
    df_old['year'] = df_old['year'].astype(int)

    # Duration conversion (seconds -> M:SS)
    df_old['duration'] = df_old['duration'].apply(seconds_to_mm_ss)

    print(f"üìä Gemini (1980-1999): {len(df_old)} titres pr√™ts.")

    # 2. Load and prepare the Spotify rows
    if not os.path.exists(SPOTIFY_JSON):
        print(f"‚ùå Erreur: Fichier {SPOTIFY_JSON} manquant.")
        return

    try:
        df_new = pd.read_json(SPOTIFY_JSON)
    except ValueError:
        print("‚ùå Erreur: JSON Spotify mal form√©.")
        return

    # Map the Spotify duration column (duration_fmt -> duration)
    if 'duration_fmt' in df_new.columns:
        df_new = df_new.rename(columns={'duration_fmt': 'duration'})

    # Make sure both sides expose the same columns
    for c in cols_target:
        if c not in df_new.columns: df_new[c] = None

    # Year conversion and filter
    df_new['year'] = pd.to_numeric(df_new['year'], errors='coerce')
    df_new = df_new[df_new['year'] >= 2000].copy()
    df_new['year'] = df_new['year'].astype(int)

    df_new = df_new[cols_target]
    print(f"üìä Spotify (2000-2023): {len(df_new)} titres pr√™ts.")

    # 3. Concatenation
    df_final = pd.concat([df_old, df_new], ignore_index=True)

    # Sort by year; a stable sort keeps the input order within each year.
    df_final = df_final.sort_values(by='year', kind='stable')

    # Final save
    df_final.to_csv(FINAL_CSV, index=False)

    print(f"\n‚úÖ FUSION R√âUSSIE : {FINAL_CSV}")
    print(f"   Total lignes : {len(df_final)}")
    print(f"   Aper√ßu des colonnes : {list(df_final.columns)}")

# Run the merge.
merge_datasets()

In [None]:
def seconds_to_mm_ss(val):
    """Convert a duration in seconds (e.g. 214) to ``M:SS`` text (e.g. "3:34").

    Accepts numbers or numeric strings; fractions of a second are truncated.
    Returns None for a missing, empty, non-numeric or negative value.
    """
    try:
        if pd.isna(val) or val == '':
            return None
        seconds = int(float(val))
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are treated as "no duration"; a bare
        # except would also swallow KeyboardInterrupt / SystemExit.
        return None
    if seconds < 0:
        return None
    minutes, secs = divmod(seconds, 60)
    return f"{minutes}:{secs:02d}"

def merge_datasets():
    """Merge the Gemini-enriched Billboard rows with the Spotify rows into FINAL_CSV.

    Billboard rows are kept when their year is below 2000 and Spotify rows
    when their year is 2000 or later. On both sides the year is coerced to a
    number first, so rows with a missing or non-numeric year are dropped
    instead of breaking the comparison. The Billboard duration (seconds) is
    converted to ``M:SS``; the Spotify ``duration_fmt`` column is renamed to
    ``duration``. The result is sorted by year and written to ``FINAL_CSV``.

    Returns:
        None. Returns early, after printing an error, when an input file is
        missing or the Spotify JSON cannot be parsed.
    """
    print("--- üîÑ Fusion des datasets (Billboard Gemini + Spotify) ---")

    # Target columns
    cols_target = ['track_name', 'artists', 'year', 'duration', 'tempo', 'track_genre', 'popularity', 'region', 'country_code']

    # 1. Load and prepare the Gemini-enriched Billboard rows
    if not os.path.exists(ENRICHED_CSV):
        print(f"‚ùå Erreur: Fichier {ENRICHED_CSV} manquant.")
        return

    df_old = pd.read_csv(ENRICHED_CSV)

    # Add any missing target column before it is used below.
    for c in cols_target:
        if c not in df_old.columns: df_old[c] = None

    # Strict time filter (the model may have returned wrong or non-numeric
    # years); NaN never satisfies the comparison, so bad years are dropped.
    df_old['year'] = pd.to_numeric(df_old['year'], errors='coerce')
    df_old = df_old[df_old['year'] < 2000].copy()
    df_old['year'] = df_old['year'].astype(int)

    # Duration conversion (seconds -> M:SS)
    df_old['duration'] = df_old['duration'].apply(seconds_to_mm_ss)

    print(f"üìä Gemini (1980-1999): {len(df_old)} titres pr√™ts.")

    # 2. Load and prepare the Spotify rows
    if not os.path.exists(SPOTIFY_JSON):
        print(f"‚ùå Erreur: Fichier {SPOTIFY_JSON} manquant.")
        return

    try:
        df_new = pd.read_json(SPOTIFY_JSON)
    except ValueError:
        print("‚ùå Erreur: JSON Spotify mal form√©.")
        return

    # Map the Spotify duration column (duration_fmt -> duration)
    if 'duration_fmt' in df_new.columns:
        df_new = df_new.rename(columns={'duration_fmt': 'duration'})

    # Make sure both sides expose the same columns
    for c in cols_target:
        if c not in df_new.columns: df_new[c] = None

    # Year conversion and filter
    df_new['year'] = pd.to_numeric(df_new['year'], errors='coerce')
    df_new = df_new[df_new['year'] >= 2000].copy()
    df_new['year'] = df_new['year'].astype(int)

    df_new = df_new[cols_target]
    print(f"üìä Spotify (2000-2023): {len(df_new)} titres pr√™ts.")

    # 3. Concatenation
    df_final = pd.concat([df_old, df_new], ignore_index=True)

    # Sort by year; a stable sort keeps the input order within each year.
    df_final = df_final.sort_values(by='year', kind='stable')

    # Final save
    df_final.to_csv(FINAL_CSV, index=False)

    print(f"\n‚úÖ FUSION R√âUSSIE : {FINAL_CSV}")
    print(f"   Total lignes : {len(df_final)}")
    print(f"   Aper√ßu des colonnes : {list(df_final.columns)}")

# Run the merge.
# NOTE(review): this cell repeats an earlier, identical merge cell in this file,
# so the merge runs a second time and FINAL_CSV is written again.
merge_datasets()