In [7]:
# Crée un docker Mongodb sur le terminal (a faire qu'une fois):
# docker run -d --name mongodb -p 27017:27017 mongo:latest
 

In [8]:
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import csv
from pymongo import MongoClient

API_KEY = 'XXXXXXXXXXXXXX'

In [9]:
def get_technology_video_ids(tags, max_results=50):
    """
    Récupère les identifiants de vidéos YouTube correspondant à une liste de tags et une catégorie.

    Args:
        tags: Une liste de chaînes de caractères représentant les tags à rechercher.
        max_results: Le nombre maximum de résultats à récupérer.

    Returns:
        Une liste d'identifiants de vidéos.
    """
    youtube = build("youtube", "v3", developerKey=API_KEY, cache_discovery=False)

    all_video_ids = []
    next_page_token = None

    # Diviser les tags en sous-listes de 10 tags maximum
    tag_groups = [tags[i:i + 10] for i in range(0, len(tags), 10)]

    for tag_group in tag_groups:
        while True:
            try:
                search_response = youtube.search().list(
                    part="snippet",
                    maxResults=min(max_results, 50),  # Limite à 50 résultats par requête
                    type="video",
                    videoCategoryId="28",  # Catégorie Technologie
                    q=" ".join(tag_group),
                    pageToken=next_page_token
                ).execute()

                video_ids = [item['id']['videoId'] for item in search_response['items']]
                all_video_ids.extend(video_ids)

                next_page_token = search_response.get('nextPageToken')
                if not next_page_token or len(all_video_ids) >= max_results:
                    break
            except HttpError as err:
                print(f"Erreur lors de la récupération des vidéos : {err}")
                break

    return all_video_ids[:max_results]


def get_video_info(video_id):
    """
    Récupère les informations détaillées d'une vidéo YouTube à partir de son identifiant.

    Args:
        video_id: L'identifiant de la vidéo.

    Returns:
        Un dictionnaire contenant les informations de la vidéo, ou None en cas d'erreur.
    """
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "id": video_id,
        "part": "snippet,statistics",
        "key": API_KEY
    }
    response = requests.get(url, params=params)

    if response.status_code == 200:
        return response.json()['items'][0]
    else:
        print(f"Erreur pour l'ID vidéo {video_id}: {response.status_code}")
        return None

def tags_to_csv():
    """
    Récupère les vidéos correspondant aux tags définis, puis enregistre les métadonnées dans un fichier CSV.
    """
    tags = [
        "Cloud Computing", "Kubernetes", "Docker", "DevOps", "Machine Learning", "AI", "Data Science", "Big Data"
    ]

    # Récupérer les identifiants des vidéos
    video_ids = get_technology_video_ids(tags, max_results=50)

    # Écrire les informations des vidéos dans un fichier CSV
    with open('Data_video.csv', mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(['video_id', 'title', 'publishedAt', 'channelId', 'description', 'view_count', 
                         'like_count', 'favorite_count', 'comment_count', 'tags', 'defaultLanguage'])

        for video_id in video_ids:
            video_info = get_video_info(video_id)
            if video_info:
                writer.writerow([
                    video_id,
                    video_info['snippet']['title'],
                    video_info['snippet']['publishedAt'],
                    video_info['snippet']['channelId'],
                    video_info['snippet']['description'],
                    video_info['statistics'].get('viewCount', 'N/A'),
                    video_info['statistics'].get('likeCount', 'N/A'),
                    video_info['statistics'].get('favoriteCount', 'N/A'),
                    video_info['statistics'].get('commentCount', 'N/A'),
                    ", ".join(video_info['snippet'].get('tags', [])),
                    video_info['snippet'].get('defaultLanguage', 'N/A')
                ])

In [10]:
def csv_to_mongo_compass(path, mongo_url):
    """
    Lit les données depuis le fichier CSV généré et les insère dans une base de données MongoDB locale.
    """
    client = MongoClient(mongo_url)
    db = client['youtube_data']
    collection = db['videos']

    # Charger les données depuis le fichier CSV
    with open(path, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        video_data = [row for row in reader]

    # Insérer les données dans MongoDB
    if video_data:
        collection.insert_many(video_data)
        print(f"{len(video_data)} vidéos ont été insérées dans la base MongoDB.")
    else:
        print("Aucune donnée trouvée dans le fichier CSV.")

In [11]:
# # Étape 1 : Récupération des données et création du fichier CSV
print("Récupération des données et écriture dans le fichier CSV...")
tags_to_csv()

Récupération des données et écriture dans le fichier CSV...


In [6]:
# Étape 2 : Importation des données dans MongoDB
print("Importation des données dans MongoDB...")
csv_to_mongo_compass(path = 'Data_video.csv', mongo_url = "mongodb+srv://gaetan:05021995fR@cluster0.wxcxg.mongodb.net/")
# "mongodb://localhost:27017/"

Importation des données dans MongoDB...


FileNotFoundError: [Errno 2] No such file or directory: 'Data_video.csv'