# Import packages

In [None]:
from googleapiclient.discovery import build
import pandas as pd
import re
from datetime import datetime, timedelta
import gspread
from google.oauth2.service_account import Credentials
import dotenv
import os

load_dotenv(".env")
api_key = os.getenv("API_KEY")

# Option et essentiel pour faire marcher le code

In [16]:
scopes = [
    "https://www.googleapis.com/auth/spreadsheets"
]
cred = Credentials.from_service_account_file("credential.json",scopes=scopes)
client = gspread.authorize(cred)

sheets_id = "1-eNcd2PWEHks-SaEB92pTFmTpk0FiG2ifZ7AdLavlPg"
workbook = client.open_by_key(sheets_id)

In [17]:
mail_sheet = workbook.worksheet("Mail adresses")

In [None]:
api_service_name = "youtube"
api_version = "v3"

youtube = build(
    api_service_name, api_version, developerKey=api_key)

In [19]:
def search_youtube(query:str, max_results:int):
    """ Fait une recherche sur Youtube à partir de ce que tu veux.
    
    Coûte 100 unités par recherche.

    Args : 
        query:str -> Ta recherche.
        max_results:int -> Le nombre de vidéo que tu veux."""
    results = []
    next_page_token = None

    while len(results) < max_results:
        # Limiter les résultats par requête pour rester en dessous de la limite max de 50
        limit = min(50, max_results - len(results))

        # Effectuer la requête
        request = youtube.search().list(
            q=query,
            part='snippet',
            maxResults=limit,
            type='video',
            pageToken=next_page_token,
            order = "viewCount",
            publishedAfter = "2024-01-01T00:00:00Z"
        )
        response = request.execute()

        # Ajouter les résultats
        results.extend(response.get('items', []))
        
        # Récupérer le token pour la page suivante
        next_page_token = response.get('nextPageToken')
        
        # Sortir de la boucle si aucune page suivante n'existe
        if not next_page_token:
            break

    return results
def fetch_videos(id:str):
    """ Cherche parmis tous les identifiants donnés en paramètres les informations des vidéos
    
    Coûte 1 unité par recherche
    
    Args :
        id:str -> identifiant de vidéo

    return string id video    
    """
    request = youtube.videos().list(
        part="snippet,contentDetails,statistics",
        id=id
        )
    # ytb_search(query = query)
    return request.execute()

def channel_search(id_channel:str):
    """Voir le nombre d'abonné
    
    100 de quota.
    """
    request = youtube.channels().list(
        part="snippet,contentDetails,statistics",
        id=id_channel
    )
    return request.execute()

def channel_videos(id_channel:str):
    """
    1 de quota
    """
    request = youtube.search().list(
        part="snippet",
        maxResults=50,
        order="date",
        type="video",
        channelId=id_channel
    )
    return request.execute()

def fetch_videos_id(query:str,max_results:int = 100):
    """Search video ids from the def ytb_search
    Args : 
        raw_json_search:dict -> Must be the default json of ytb_search
    """
    youtube_search_json = search_youtube(query=query,max_results=max_results)
    mail = mail_sheet.col_values(2)

    
    channel_Ids = []
    unique_channel_ids = []
    video_ids = ""
    
    for x in range(0,max_results):
        channel_Ids.append(youtube_search_json[x]["snippet"]["channelId"])
    
    [unique_channel_ids.append(channel_Id) for channel_Id in channel_Ids if channel_Id not in unique_channel_ids]

    channel_id_search = []
    channel_videos_list = []
    for unique_channel_id in unique_channel_ids:
        if unique_channel_id not in mail:
            channel_id_search.append(channel_search(unique_channel_id))
            channel_videos_list.append(channel_videos(unique_channel_id))
        else:
            pass
    for r in range(0,len(channel_id_search)):
        try:    
            channel_id_v = channel_videos_list[r]["items"][0]["snippet"]["channelId"]
            channel_id_s = channel_id_search[r]["items"][0]["id"]
            subscriberCount = int(channel_id_search[r]["items"][0]["statistics"]["subscriberCount"])
        except TypeError:
            pass
        
        publishedAt = channel_videos_list[r]["items"][0]["snippet"]["publishedAt"]
        
        date_obj = datetime.strptime(publishedAt, "%Y-%m-%dT%H:%M:%SZ")

        # Calcul de la différence de temps
        diff = datetime.now() - date_obj
        
        # Vérification si la différence est inférieure à 7 jours
        if channel_id_s == channel_id_v:
            if subscriberCount > 1000:
                if diff < timedelta(weeks=2.5):
                    video_ids += channel_videos_list[r]["items"][0]["id"]["videoId"] + ","
                else:
                    pass
            else:
                pass
        else:
            pass
    return video_ids


def search_multiple_video_id(video_ids:str):
    """ Split les id de video en liste pour qu'elle y ait 50 videoId par liste et ainsi pouvoir faire marcher fetch_video_id
        Si je ne fais pas ça, il y a une limitation de 50 vidéos, au delà, cela ne marchera pas
    
    Besoin fetch_videos_id """
    ids_split_list = video_ids.split(",")
    nb = 0
    next_nb = 50
    json_raw_video_id = []
    while nb != len(ids_split_list):
        if len(ids_split_list) > 50:
            id_split = ids_split_list[nb:next_nb]
            json_raw_video_id.append(fetch_videos(id=id_split))
        
        nb += len(id_split)
        next_nb += len(id_split)
        
    return json_raw_video_id

def getmail_json(query:str,max_results:int = 100):
    query_list = query.split(",") # Transforme la chaine de caractère en liste.

    if len(query_list) > 50:
        json_video = search_multiple_video_id(fetch_videos(id=query_list))
    else:
        json_video = fetch_videos(id=query_list)

    mail_list = []
    items = json_video["items"]

    for video_count in range(0,len(items)):
        channelTitle = items[video_count]["snippet"]["channelTitle"]
        if "VEVO" in channelTitle or "Vevo" in channelTitle:
            pass
        else:
            text = items[video_count]["snippet"]["description"]
            channelId = items[video_count]["snippet"]["channelId"]

            # Motif regex pour capturer une adresse email
            email_pattern = r'\b[\w.-]+@[a-zA-Z-]+\.[a-zA-Z.]{2,}\b'

            # Recherche de l'adresse email
            try:
                match = re.search(email_pattern, text).group()
                mail_list.append({'channelTitle' : channelTitle,
                'channelId' : channelId,
                'channel_mail' : match,
                'research' : query,
            })
                
            except:
                pass

    return mail_list

In [20]:
# TODO : Changer la manière de trier les vidéos -> Faire par rapport au viewcount
# TODO : Si possible regarder en fonction du titre de la chaine
# TODO : Filter par date
# TODO : Rajoutez des stats sur les chaînes avec les ID -> utilisé youtube.channels

In [None]:
videos_id = fetch_videos_id("Luther type beat", max_results=100)

In [31]:
mail_dict = getmail_json(videos_id)

In [32]:
len(mail_dict)

4

# Google Sheets

In [33]:
mail_sheet = workbook.worksheet("Mail adresses")

In [34]:
def export_to_excel(mail_dict:list):
    mail = mail_sheet.col_values(2)
    nb = 0
    for row in mail_dict:
        channel_id = row["channelId"]
        if channel_id not in mail:
            nb += 1
            mail_sheet.insert_row([row["channelTitle"], row["channelId"], row["channel_mail"]], 2)
        else:
            pass
    return f"Il y a {nb} d'artiste qui ont été ajouté sur {len(mail_dict)}"

In [None]:
export_to_excel(mail_dict)

"Il y a 4 d'artiste qui ont été ajouté sur 4"

# Essaie


In [63]:
request = youtube.search().list(
    q="Travis Scott Type Beat",
    part='snippet',
    maxResults=200,
    type='video',
    order = "viewCount",
    publishedAfter = "2024-04-01T00:00:00Z"
)

In [103]:
central_split = central.split(",")
len(central_split)

37

In [104]:
mail_dict = getmail_json(central)

In [106]:
export_to_excel(mail_dict)

"Il y a 28 d'artiste qui ont été ajouté sur 28"