In [4]:
from googleapiclient.discovery import build
from datetime import datetime, timedelta
import pandas as pd
import isodate


In [10]:

# Remplacez par votre clé API
API_KEY = 'AIzaSyCMns7RoR6OKWkwTThFh5NxtiA7nd-oKiY'

# Initialiser le service YouTube
youtube = build('youtube', 'v3', developerKey=API_KEY)

# Définir la période des 10 derniers jours
date_10_days_ago = (datetime.utcnow() - timedelta(days=20)).isoformat("T") + "Z"

# Rechercher des vidéos liées à "League of Legends" publiées au cours des 10 derniers jours
search_response = youtube.search().list(
    q='League of Legends',
    part='id,snippet',
    type='video',
    publishedAfter=date_10_days_ago,
    maxResults=50
).execute()

# Extraire les identifiants des vidéos
video_ids = [item['id']['videoId'] for item in search_response['items']]

# Récupérer les statistiques des vidéos
video_response = youtube.videos().list(
    id=','.join(video_ids),
    part='snippet,statistics,contentDetails',
    maxResults=50
).execute()

# Créer un DataFrame pour stocker les données
data = {
    'TitreVidéo': [],
    'Chaîne': [],
    'Vues': [],
    'DatePublication': [],
    'DuréeSecondes': []
}

for video in video_response['items']:
    title = video['snippet'].get('title', 'Titre inconnu')
    channel_title = video['snippet'].get('channelTitle', 'Chaîne inconnue')
    view_count = int(video['statistics'].get('viewCount', 0))
    publish_date = video['snippet'].get('publishedAt', 'Date inconnue')
    duration_iso = video['contentDetails'].get('duration', None)
    duration = isodate.parse_duration(duration_iso).total_seconds() if duration_iso else None

    data['TitreVidéo'].append(title)
    data['Chaîne'].append(channel_title)
    data['Vues'].append(view_count)
    data['DatePublication'].append(publish_date)
    data['DuréeSecondes'].append(duration)

df = pd.DataFrame(data)

# Calculer le total des vues par chaîne
df_grouped = df.groupby('Chaîne', as_index=False).sum()

df_video_count = df.groupby('Chaîne').size().reset_index(name='NombreVidéos')

df_avg_duration = df.groupby('Chaîne')['DuréeSecondes'].mean().reset_index(name='DuréeMoyenneSecondes')

# Calculer le total des vues par chaîne
df_total_views = df.groupby('Chaîne')['Vues'].sum().reset_index(name='VuesTotales')

# Fusionner les DataFrames
df_summary = pd.merge(df_total_views, df_video_count, on='Chaîne')
df_summary = pd.merge(df_summary, df_avg_duration, on='Chaîne')

# Trier les chaînes par nombre de vues décroissant et sélectionner le top 20
df_top20 = df_summary.sort_values(by='VuesTotales', ascending=False).head(20)
df_top20['DuréeMoyenneMinutes'] = df_top20['DuréeMoyenneSecondes'] / 60

print(df_top20)


                                Chaîne  VuesTotales  NombreVidéos  \
9                  League of Hardstuck      2404235             1   
1                              Czokled      2010089             2   
7                           LCK Global      1799602             2   
10                   League of Legends      1041294             1   
29                                Suna       400726             1   
20                              Necrit       299448             1   
2                             Doaenel        198643             2   
36                         Zwag Xerath       196712             1   
30                             TC Zwag       161774             1   
24                               Remus       151119             3   
27  Skill Capped Challenger LoL Guides       139495             2   
34                            Vandiril       116077             1   
37                              Zwagmo       100067             1   
35                              We

In [2]:
import plotly.express as px
# Créer le diagramme en barres horizontales
fig = px.bar(df_top20, x='Vues', y='Chaîne', orientation='h',
             title='Top 20 des chaînes YouTube sur League of Legends (10 derniers jours)',
             labels={'Vues': 'Nombre de vues', 'Chaîne': 'Chaîne YouTube'},
             text='Vues')

# Afficher le graphique
fig.show()

In [21]:
from googleapiclient.discovery import build
from datetime import datetime, timedelta
import pandas as pd
import isodate
import time

# Paramètres configurables
TOP_N = 25       # Nombre de chaînes à afficher dans le top
JOURS_ANALYSE = 10  # Période d'analyse en jours
MAX_RESULTS_PER_PAGE = 50  # Nombre maximum de résultats par page
MAX_PAGES = 4  # Nombre maximum de pages à récupérer

# Configuration de l'API
youtube = build('youtube', 'v3', developerKey=API_KEY)

# Définir la période d'analyse
date_analyse = (datetime.utcnow() - timedelta(days=JOURS_ANALYSE)).isoformat("T") + "Z"

# Fonction pour récupérer plus de résultats avec pagination
def get_videos_with_pagination(query, published_after, max_pages=MAX_PAGES):
    video_data = []
    next_page_token = None
    pages_retrieved = 0
    
    while pages_retrieved < max_pages:
        search_params = {
            'q': query,
            'part': 'id,snippet',
            'type': 'video',
            'publishedAfter': published_after,
            'maxResults': MAX_RESULTS_PER_PAGE
        }
        
        if next_page_token:
            search_params['pageToken'] = next_page_token
        
        search_response = youtube.search().list(**search_params).execute()
        
        for item in search_response['items']:
            if 'videoId' in item['id']:
                video_data.append(item)
        
        next_page_token = search_response.get('nextPageToken')
        pages_retrieved += 1
        
        if not next_page_token:
            break
        
        # Respecter les limites de l'API
        time.sleep(0.2)
    
    return video_data

# Fonction pour récupérer les détails des vidéos par lots
def get_video_details(video_ids):
    video_details = []
    
    # Traiter les vidéos par lots pour respecter les limites de l'API
    for i in range(0, len(video_ids), 50):
        batch = video_ids[i:i+50]
        
        # Récupérer les détails des vidéos
        video_response = youtube.videos().list(
            id=','.join(batch),
            part='snippet,statistics,contentDetails',
            maxResults=50
        ).execute()
        
        video_details.extend(video_response.get('items', []))
        
        # Respecter les limites de l'API
        if i + 50 < len(video_ids):
            time.sleep(0.2)
    
    return video_details

# Fonction pour récupérer les vidéos d'une chaîne spécifique
def get_channel_videos(channel_id, published_after, max_pages=3):  # Augmenté à 3 pages pour être plus exhaustif
    channel_videos = []
    next_page_token = None
    pages_retrieved = 0
    
    while pages_retrieved < max_pages:
        search_params = {
            'channelId': channel_id,
            'part': 'id,snippet',
            'type': 'video',
            'publishedAfter': published_after,
            'maxResults': MAX_RESULTS_PER_PAGE,
            'order': 'date'  # récupérer les vidéos les plus récentes d'abord
        }
        
        if next_page_token:
            search_params['pageToken'] = next_page_token
        
        search_response = youtube.search().list(**search_params).execute()
        
        for item in search_response['items']:
            if 'videoId' in item['id']:
                channel_videos.append(item)
        
        next_page_token = search_response.get('nextPageToken')
        pages_retrieved += 1
        
        if not next_page_token:
            break
        
        # Respecter les limites de l'API
        time.sleep(0.2)
    
    return channel_videos

# Fonction pour déterminer si une vidéo est un short
def is_short_video(video):
    title = video['snippet'].get('title', '')
    description = video['snippet'].get('description', '')
    duration_iso = video['contentDetails'].get('duration', 'PT0S')
    duration_seconds = isodate.parse_duration(duration_iso).total_seconds()
    
    # Critères pour identifier un Short
    is_short_by_duration = duration_seconds <= 60
    has_shorts_tag = (
        '#shorts' in title.lower() or 
        '#short' in title.lower() or
        'shorts' in description.lower()
    )
    
    return is_short_by_duration or has_shorts_tag

# Fonction pour traiter les vidéos et les ajouter à un DataFrame
def process_videos(video_details):
    data = {
        'TitreVidéo': [],
        'Chaîne': [],
        'ChaîneID': [],
        'Vues': [],
        'DatePublication': [],
        'DuréeSecondes': [],
        'EstShort': [],
        'ID': []
    }
    
    for video in video_details:
        if not video:  # Ignorer les entrées vides
            continue
            
        title = video['snippet'].get('title', 'Titre inconnu')
        channel_title = video['snippet'].get('channelTitle', 'Chaîne inconnue')
        channel_id = video['snippet'].get('channelId', '')
        view_count = int(video['statistics'].get('viewCount', 0))
        publish_date = video['snippet'].get('publishedAt', 'Date inconnue')
        video_id = video['id']
        
        # Extraction de la durée
        duration_iso = video['contentDetails'].get('duration', 'PT0S')
        duration_seconds = isodate.parse_duration(duration_iso).total_seconds()
        
        # Déterminer si c'est un Short
        is_short = is_short_video(video)
        
        data['TitreVidéo'].append(title)
        data['Chaîne'].append(channel_title)
        data['ChaîneID'].append(channel_id)
        data['Vues'].append(view_count)
        data['DatePublication'].append(publish_date)
        data['DuréeSecondes'].append(duration_seconds)
        data['EstShort'].append(is_short)
        data['ID'].append(video_id)
    
    return pd.DataFrame(data)

print("\n=== PHASE 1: IDENTIFICATION DES TOP CHAÎNES LEAGUE OF LEGENDS ===")
print(f"Recherche des vidéos League of Legends des {JOURS_ANALYSE} derniers jours...")

# PHASE 1: Récupérer un grand échantillon de vidéos League of Legends
video_items = get_videos_with_pagination('League of Legends', date_analyse)
print(f"Nombre total de vidéos trouvées : {len(video_items)}")

# Extraire les identifiants des vidéos
video_ids = [item['id']['videoId'] for item in video_items]

# Récupérer les détails des vidéos
video_details = get_video_details(video_ids)
print(f"Détails récupérés pour {len(video_details)} vidéos")

# Traiter les vidéos pour créer un DataFrame initial
df_initial = process_videos(video_details)

# Calculer les statistiques par chaîne pour identifier le top N
channel_stats = df_initial.groupby('Chaîne').agg(
    VuesTotales=('Vues', 'sum'),
    ChaîneID=('ChaîneID', 'first')  # Récupérer l'ID de la chaîne pour la phase 2
).reset_index().sort_values(by='VuesTotales', ascending=False)

# Sélectionner le top N des chaînes
top_channels = channel_stats.head(TOP_N)
print(f"\nTop {TOP_N} chaînes identifiées :")
for i, (_, row) in enumerate(top_channels.iterrows(), 1):
    print(f"{i}. {row['Chaîne']} ({int(row['VuesTotales']):,} vues)")

print("\n=== PHASE 2: ANALYSE DÉTAILLÉE DES TOP CHAÎNES ===")
# PHASE 2: Analyser en profondeur chaque chaîne du top N

# DataFrame pour stocker les résultats finaux
all_videos_data = []

# Pour chaque chaîne du top N, récupérer toutes ses vidéos récentes
for _, channel in top_channels.iterrows():
    channel_name = channel['Chaîne']
    channel_id = channel['ChaîneID']
    
    print(f"\nAnalyse de la chaîne: {channel_name}")
    
    # Récupérer les vidéos de cette chaîne
    channel_video_items = get_channel_videos(channel_id, date_analyse)
    
    if not channel_video_items:
        print(f"  Aucune vidéo trouvée pour cette chaîne dans la période")
        continue
    
    print(f"  {len(channel_video_items)} vidéos trouvées")
    
    # Récupérer les détails de ces vidéos
    channel_video_ids = [item['id']['videoId'] for item in channel_video_items]
    channel_video_details = get_video_details(channel_video_ids)
    
    # Ajouter ces vidéos au DataFrame final
    all_videos_data.extend(channel_video_details)
    
    # Patienter un peu pour respecter les limites de l'API
    time.sleep(0.5)

# Traiter toutes les vidéos collectées
print("\nTraitement de toutes les vidéos collectées...")
df_final = process_videos(all_videos_data)

# Convertir les dates de publication en format datetime
df_final['DatePublicationDT'] = pd.to_datetime(df_final['DatePublication'])

# Calculer la date limite en fonction de JOURS_ANALYSE
from datetime import timezone  # Ajouter cette import
date_limite = (datetime.utcnow() - timedelta(days=JOURS_ANALYSE)).replace(tzinfo=timezone.utc)

# Filtrer les vidéos pour ne garder que celles publiées dans la période d'analyse
df_filtered = df_final[df_final['DatePublicationDT'] >= date_limite]

# Afficher les stats de filtrage
videos_exclues = len(df_final) - len(df_filtered)
if videos_exclues > 0:
    print(f"\nFiltrage par date: {videos_exclues} vidéos exclues car hors de la période des {JOURS_ANALYSE} derniers jours")
    print(f"Nombre final de vidéos analysées: {len(df_filtered)}")

# Remplacer df_final par df_filtered pour la suite de l'analyse
df_final = df_filtered

# Ajouter une fonction pour afficher les vidéos d'une chaîne
def afficher_details_chaine(df, nom_chaine):
    """Affiche tous les contenus d'une chaîne spécifique pour vérification"""
    chaine_df = df[df['Chaîne'] == nom_chaine]
    
    if chaine_df.empty:
        print(f"Aucun contenu trouvé pour la chaîne: {nom_chaine}")
        # Vérifier si le nom est peut-être contenu dans une chaîne
        similar = df[df['Chaîne'].str.contains(nom_chaine, case=False)]
        if not similar.empty:
            print(f"Chaînes similaires trouvées: {similar['Chaîne'].unique()}")
        return
    
    videos = chaine_df[~chaine_df['EstShort']]
    shorts = chaine_df[chaine_df['EstShort']]
    
    print(f"\n=== Détail des contenus pour '{nom_chaine}' ===")
    print(f"Nombre total de contenus: {len(chaine_df)}")
    print(f"Vidéos: {len(videos)}, Shorts: {len(shorts)}")
    
    if not videos.empty:
        print("\n--- VIDÉOS ---")
        for _, row in videos.iterrows():
            print(f"- {row['TitreVidéo']} | {row['DuréeSecondes']:.0f}s | {row['Vues']:,} vues")
    
    if not shorts.empty:
        print("\n--- SHORTS ---")
        for _, row in shorts.iterrows():
            print(f"- {row['TitreVidéo']} | {row['DuréeSecondes']:.0f}s | {row['Vues']:,} vues")

# Vérifier quelques chaînes pour diagnostic
print("\n=== VÉRIFICATION DES DONNÉES POUR QUELQUES CHAÎNES ===")
# Vérifier les 2 premières chaînes du top et la chaîne League of Legends si présente
chaines_a_verifier = list(top_channels['Chaîne'].head(2))
if 'League of Legends' in df_final['Chaîne'].values and 'League of Legends' not in chaines_a_verifier:
    chaines_a_verifier.append('League of Legends')

for chaine in chaines_a_verifier:
    afficher_details_chaine(df_final, chaine)

# Créer un DataFrame des statistiques par chaîne
df_stats = df_final.groupby('Chaîne').apply(
    lambda x: pd.Series({
        'NombreVideos': sum(~x['EstShort']),
        'VuesTotalesVideos': x[~x['EstShort']]['Vues'].sum(),
        'DuréeMoyenneVideos': x[~x['EstShort']]['DuréeSecondes'].mean() if sum(~x['EstShort']) > 0 else 0,
        'NombreShorts': sum(x['EstShort']),
        'VuesTotalesShorts': x[x['EstShort']]['Vues'].sum(),
        'DuréeMoyenneShorts': x[x['EstShort']]['DuréeSecondes'].mean() if sum(x['EstShort']) > 0 else 0
    })
).reset_index()

# Calculer les vues totales et les durées moyennes en minutes
df_stats['VuesTotales'] = df_stats['VuesTotalesVideos'] + df_stats['VuesTotalesShorts']
df_stats['DuréeMoyenneVideosMinutes'] = df_stats['DuréeMoyenneVideos'] / 60
df_stats['DuréeMoyenneShortsMinutes'] = df_stats['DuréeMoyenneShorts'] / 60

# Convertir les colonnes à afficher en entiers
cols_to_int = ['NombreVideos', 'VuesTotalesVideos', 'NombreShorts', 'VuesTotalesShorts', 'VuesTotales']
df_stats[cols_to_int] = df_stats[cols_to_int].astype(int)

# Trier par vues totales décroissantes 
df_stats_sorted = df_stats.sort_values(by='VuesTotales', ascending=False)

# Afficher le top N avec un format plus lisible
# Déterminer les dates exactes de la période d'analyse
date_debut = date_limite.strftime("%d/%m/%Y")
date_fin = datetime.utcnow().strftime("%d/%m/%Y")

print(f"\nTOP {TOP_N} DES CHAÎNES LEAGUE OF LEGENDS - PÉRIODE: {date_debut} AU {date_fin} ({JOURS_ANALYSE} JOURS)")
print("=" * 100)

# Créer un DataFrame pour l'affichage avec des colonnes formatées
df_display = df_stats_sorted.copy()
df_display.insert(0, 'Rang', range(1, len(df_display) + 1))

# Définir le format d'affichage
pd.options.display.float_format = '{:,.1f}'.format  # Une décimale pour les durées

# Afficher les résultats
print(df_display[[
    'Rang', 'Chaîne', 'VuesTotales', 
    'NombreVideos', 'VuesTotalesVideos', 
    'NombreShorts', 'VuesTotalesShorts', 
    'DuréeMoyenneVideosMinutes', 'DuréeMoyenneShortsMinutes'
]].to_string(index=False))

# Résumé global
total_videos = df_stats['NombreVideos'].sum()
total_shorts = df_stats['NombreShorts'].sum()
total_views_videos = df_stats['VuesTotalesVideos'].sum()
total_views_shorts = df_stats['VuesTotalesShorts'].sum()
total_views = total_views_videos + total_views_shorts

print("\nRÉSUMÉ GLOBAL")
print("=" * 40)
print(f"Nombre total de vidéos : {int(total_videos):,}")
print(f"Nombre total de shorts : {int(total_shorts):,}")
print(f"Total des vues (vidéos) : {int(total_views_videos):,}")
print(f"Total des vues (shorts) : {int(total_views_shorts):,}")
print(f"Total des vues (tous contenus) : {int(total_views):,}")
print(f"Pourcentage de vues sur les shorts : {(total_views_shorts/total_views*100 if total_views > 0 else 0):.2f}%")


=== PHASE 1: IDENTIFICATION DES TOP CHAÎNES LEAGUE OF LEGENDS ===
Recherche des vidéos League of Legends des 10 derniers jours...


HttpError: <HttpError 403 when requesting https://youtube.googleapis.com/youtube/v3/search?q=League+of+Legends&part=id%2Csnippet&type=video&publishedAfter=2025-02-19T23%3A25%3A30.820034Z&maxResults=50&key=AIzaSyCMns7RoR6OKWkwTThFh5NxtiA7nd-oKiY&alt=json returned "The request cannot be completed because you have exceeded your <a href="/youtube/v3/getting-started#quota">quota</a>.". Details: "[{'message': 'The request cannot be completed because you have exceeded your <a href="/youtube/v3/getting-started#quota">quota</a>.', 'domain': 'youtube.quota', 'reason': 'quotaExceeded'}]">