Conecta-se à API do YouTube e retorna os comentários desejados.


In [None]:
import os
import re
import datetime
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from unidecode import unidecode

# Directory, relative to the working directory, under which the comments
# file is written and later read back.
DATA_DIR = "data"

def clean_text(text):
    """Return a normalized, lower-case version of *text*.

    The text is first passed through ``unidecode``; then every character
    that is neither a word character nor whitespace is removed, and the
    result is lower-cased.
    """
    transliterated = unidecode(text)
    without_punctuation = re.sub(r'[^\w\s]', '', transliterated)
    # Drop UTF-16 surrogate pairs, should any still be present at this point.
    without_surrogates = re.sub(
        r'[\uD800-\uDBFF][\uDC00-\uDFFF]', '', without_punctuation
    )
    return without_surrogates.lower()

def get_comment_replies(youtube, parent_id):
    """Return the display text of every reply to the comment *parent_id*.

    All result pages are followed through ``nextPageToken``; a single
    request returns at most ``maxResults`` (100) replies, so stopping after
    the first page would silently truncate long threads.

    Args:
        youtube: API client exposing ``comments().list(...).execute()``.
        parent_id: ID of the comment whose replies are requested.

    Returns:
        A list with the ``textDisplay`` of each reply. If an ``HttpError``
        is raised, it is printed and the replies gathered so far are
        returned (an empty list when the first request fails).
    """
    replies = []
    params = {
        "part": "snippet",
        "parentId": parent_id,
        "maxResults": 100,
        "textFormat": "plainText",
    }
    try:
        while True:
            response = youtube.comments().list(**params).execute()
            replies.extend(
                element["snippet"]["textDisplay"]
                for element in response.get("items", [])
            )
            next_token = response.get("nextPageToken")
            if not next_token:
                break
            # Ask for the following page on the next iteration.
            params["pageToken"] = next_token
    except HttpError as e:
        print(e)
    return replies

def save_video_comments(youtube, video_id, filename):
    """Append the cleaned comments of one video to ``DATA_DIR/filename``.

    The video is processed only when its ``publishedAt`` timestamp lies
    between 1 January 2022 and the end of 16 March 2022 (inclusive).
    Top-level comments are written as ``"<video title>: <text>"``; replies
    are written as their text only. Every entry is passed through
    ``clean_text`` and written on a line of its own.

    Args:
        youtube: API client exposing ``videos()``, ``commentThreads()`` and
            ``comments()`` resources.
        video_id: ID of the video whose comments are fetched.
        filename: Name of the output file inside ``DATA_DIR``; the file is
            opened in append mode.

    Returns:
        None. An ``HttpError`` raised while listing comment threads is
        printed; pages written before the error stay in the file.
    """
    video_info = youtube.videos().list(
        part="snippet",
        id=video_id
    ).execute()

    # An empty or missing "items" list would make the lookups below fail,
    # so such videos are skipped.
    items = video_info.get("items")
    if not items:
        print(f"Video ID {video_id} não possui informações disponíveis.")
        return

    video_snippet = items[0]["snippet"]
    video_title = video_snippet["title"]
    video_published_date = datetime.datetime.strptime(
        video_snippet["publishedAt"], '%Y-%m-%dT%H:%M:%SZ'
    )
    start_date = datetime.datetime(2022, 1, 1)
    end_date = datetime.datetime(2022, 3, 16, 23, 59, 59)  # last second of 16 March 2022

    if not start_date <= video_published_date <= end_date:
        return

    # Make sure the output directory exists before the first write.
    os.makedirs(DATA_DIR, exist_ok=True)
    output_path = os.path.join(DATA_DIR, filename)

    request_params = {
        "part": "snippet",
        "videoId": video_id,
        "maxResults": 100,
        "textFormat": "plainText",
    }

    try:
        page_idx = 0
        while True:
            response = youtube.commentThreads().list(**request_params).execute()
            page_idx += 1
            print(f"Page: {page_idx}")

            comments = []
            for item in response.get("items", []):
                snippet = item["snippet"]
                comment = snippet["topLevelComment"]
                text = comment["snippet"]["textDisplay"]
                # Prefix the top-level comment with the video title.
                comments.append(f"{video_title}: {text}")
                if snippet["totalReplyCount"] > 0:
                    comments.extend(get_comment_replies(youtube, comment["id"]))

            with open(output_path, "a", encoding="utf-8") as savefile:
                # One comment per line: internal whitespace (including line
                # breaks) is collapsed and every entry ends with a newline,
                # so entries from consecutive pages or videos never fuse and
                # a line count equals the comment count.
                savefile.writelines(
                    " ".join(clean_text(entry).split()) + "\n"
                    for entry in comments
                )

            next_token = response.get("nextPageToken")
            if not next_token:
                break
            request_params["pageToken"] = next_token

    except HttpError as e:
        print(e)

if __name__ == '__main__':
    # Script entry point: collect the comments of every video in the
    # playlist into DATA_DIR/filename, then report how many stored lines
    # contain the chosen hashtag.

    # Imported here so this script body is self-contained.
    from urllib.parse import parse_qs, urlparse

    with open("apikey.txt") as apifile:
        api_key = apifile.read().strip()
    api_name = "youtube"
    api_version = "v3"
    playlist_url = "https://www.youtube.com/playlist?list=PLaE_mZALZ0V2E0lVJowee_oerd3OMvyJu"
    filename = "podpah2022.txt"

    youtube = build(api_name, api_version, developerKey=api_key)

    # Read the "list" query parameter so extra parameters such as "&index=2"
    # do not end up in the playlist id; fall back to the plain split when
    # the value has no parsable "list" parameter.
    list_values = parse_qs(urlparse(playlist_url).query).get("list")
    playlist_id = list_values[0] if list_values else playlist_url.split("list=")[-1]

    # Walk every page of the playlist and collect the video ids.
    video_ids = []
    playlist_params = {
        "part": "snippet",
        "playlistId": playlist_id,
        "maxResults": 50,
    }
    while True:
        response = youtube.playlistItems().list(**playlist_params).execute()
        video_ids.extend(
            item["snippet"]["resourceId"]["videoId"]
            for item in response.get("items", [])
        )
        token = response.get("nextPageToken")
        if not token:
            break
        playlist_params["pageToken"] = token

    for video_id in video_ids:
        save_video_comments(youtube, video_id, filename)

    # The file only exists if at least one video produced output.
    try:
        with open(os.path.join(DATA_DIR, filename), encoding="utf-8") as commentsfile:
            comments = commentsfile.readlines()
    except FileNotFoundError:
        comments = []

    hashtag = ""  # Put the desired hashtag here
    # Stored comments went through clean_text (punctuation such as "#" is
    # removed and the text is lower-cased), so the search term must be
    # normalized the same way or it can never match.
    needle = clean_text(hashtag)
    found = sum(1 for comment in comments if needle in comment.lower())
    print(f"Total de comentários: {len(comments)}\nCom {hashtag}: {found}")


Trazendo os dados e salvando-os em um arquivo JSON

In [9]:
import os
import re
import datetime
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from unidecode import unidecode
import json

# Directory, relative to the working directory, under which the JSON output
# file is written.
DATA_DIR = "data"

def clean_text(text):
    """Normalize *text*: transliterate, strip symbols, lower-case.

    ``unidecode`` is applied first. Afterwards two regex substitutions
    remove, in order, every character that is neither a word character nor
    whitespace, and any UTF-16 surrogate pair. The result is lower-cased.
    """
    result = unidecode(text)
    removal_patterns = (
        r'[^\w\s]',
        r'[\uD800-\uDBFF][\uDC00-\uDFFF]',
    )
    for pattern in removal_patterns:
        result = re.sub(pattern, '', result)
    return result.lower()

def get_comment_replies(youtube, parent_id):
    """Return the ``snippet`` of every reply to the comment *parent_id*.

    All result pages are followed through ``nextPageToken``; a single
    request returns at most ``maxResults`` (100) replies, so stopping after
    the first page would silently truncate long threads.

    Args:
        youtube: API client exposing ``comments().list(...).execute()``.
        parent_id: ID of the comment whose replies are requested.

    Returns:
        A list with the ``snippet`` dict of each reply. If an ``HttpError``
        is raised, it is printed and the snippets gathered so far are
        returned (an empty list when the first request fails).
    """
    replies = []
    params = {
        "part": "snippet",
        "parentId": parent_id,
        "maxResults": 100,
        "textFormat": "plainText",
    }
    try:
        while True:
            response = youtube.comments().list(**params).execute()
            replies.extend(
                element["snippet"] for element in response.get("items", [])
            )
            next_token = response.get("nextPageToken")
            if not next_token:
                break
            # Ask for the following page on the next iteration.
            params["pageToken"] = next_token
    except HttpError as e:
        print(e)
    return replies

def save_video_comments(youtube, video_id):
    """Collect the comments and replies of one video published in 2022.

    Despite its name, this function writes nothing to disk; it returns the
    collected records to the caller.

    Args:
        youtube: API client exposing ``videos()``, ``commentThreads()`` and
            ``comments()`` resources.
        video_id: ID of the video whose comments are fetched.

    Returns:
        A list of dicts with the keys ``video_title``, ``comment_text``,
        ``comment_author``, ``comment_date`` and ``comment_likes``;
        top-level comments also carry ``comment_reply_count``. The list is
        empty when the video has no information available or was not
        published in 2022. If an ``HttpError`` is raised while listing
        comment threads, it is printed and the records of the pages
        fetched so far are returned.
    """
    video_info = youtube.videos().list(
        part="snippet,statistics",
        id=video_id
    ).execute()

    items = video_info.get("items")
    if not items:
        print(f"Video ID {video_id} não possui informações disponíveis.")
        return []

    video_snippet = items[0]["snippet"]
    video_title = video_snippet["title"]
    video_published_date = datetime.datetime.strptime(
        video_snippet["publishedAt"], '%Y-%m-%dT%H:%M:%SZ'
    )
    start_date = datetime.datetime(2022, 1, 1)
    # Last second of 2022. A bare ``datetime(2022, 12, 31)`` is midnight at
    # the start of the day and would exclude videos published on 31 Dec.
    end_date = datetime.datetime(2022, 12, 31, 23, 59, 59)

    if not start_date <= video_published_date <= end_date:
        return []  # video falls outside the date window

    comments_data = []
    request_params = {
        "part": "snippet",
        "videoId": video_id,
        "maxResults": 100,
        "textFormat": "plainText",
    }

    try:
        page_idx = 0
        while True:
            response = youtube.commentThreads().list(**request_params).execute()
            page_idx += 1
            print(f"Page: {page_idx}")

            for item in response.get("items", []):
                snippet = item["snippet"]
                comment = snippet["topLevelComment"]["snippet"]
                comments_data.append({
                    "video_title": video_title,
                    "comment_text": comment["textDisplay"],
                    "comment_author": comment["authorDisplayName"],
                    "comment_date": comment["publishedAt"],
                    "comment_likes": comment["likeCount"],
                    "comment_reply_count": snippet["totalReplyCount"],
                })

                if snippet["totalReplyCount"] > 0:
                    # Reply records have no "comment_reply_count" key.
                    for reply_snippet in get_comment_replies(youtube, item["id"]):
                        comments_data.append({
                            "video_title": video_title,
                            "comment_text": reply_snippet["textDisplay"],
                            "comment_author": reply_snippet["authorDisplayName"],
                            "comment_date": reply_snippet["publishedAt"],
                            "comment_likes": reply_snippet["likeCount"],
                        })

            next_token = response.get("nextPageToken")
            if not next_token:
                break
            request_params["pageToken"] = next_token

    except HttpError as e:
        # Keep the pages already collected instead of discarding them.
        print(e)

    return comments_data

if __name__ == '__main__':
    # Script entry point: collect the comments of every video in the
    # playlist, normalize the text fields and dump everything to
    # DATA_DIR/comments_cleaned.json.

    # Imported here so this script body is self-contained.
    from urllib.parse import parse_qs, urlparse

    with open("apikey.txt") as apifile:
        api_key = apifile.read().strip()
    api_name = "youtube"
    api_version = "v3"
    playlist_url = "https://www.youtube.com/playlist?list=PLaE_mZALZ0V2E0lVJowee_oerd3OMvyJu"

    # Create the output directory up front so a missing or unwritable
    # directory fails before the API calls rather than after them.
    os.makedirs(DATA_DIR, exist_ok=True)

    youtube = build(api_name, api_version, developerKey=api_key)

    # Read the "list" query parameter so extra parameters such as "&index=2"
    # do not end up in the playlist id; fall back to the plain split when
    # the value has no parsable "list" parameter.
    list_values = parse_qs(urlparse(playlist_url).query).get("list")
    playlist_id = list_values[0] if list_values else playlist_url.split("list=")[-1]

    # Walk every page of the playlist and collect the video ids.
    video_ids = []
    playlist_params = {
        "part": "snippet",
        "playlistId": playlist_id,
        "maxResults": 50,
    }
    while True:
        response = youtube.playlistItems().list(**playlist_params).execute()
        video_ids.extend(
            item["snippet"]["resourceId"]["videoId"]
            for item in response.get("items", [])
        )
        token = response.get("nextPageToken")
        if not token:
            break
        playlist_params["pageToken"] = token

    all_comments_data = []
    for video_id in video_ids:
        all_comments_data.extend(save_video_comments(youtube, video_id))

    # Clean and normalize the text fields before saving. Dates and counters
    # are copied unchanged.
    cleaned_comments_data = []
    for comment_data in all_comments_data:
        cleaned_comment_data = {
            "video_title": clean_text(comment_data["video_title"]),
            "comment_text": clean_text(comment_data["comment_text"]),
            "comment_author": clean_text(comment_data["comment_author"]),
            "comment_date": comment_data["comment_date"],
            "comment_likes": comment_data["comment_likes"],
        }
        # Only top-level comments carry a reply count.
        if "comment_reply_count" in comment_data:
            cleaned_comment_data["comment_reply_count"] = comment_data["comment_reply_count"]
        cleaned_comments_data.append(cleaned_comment_data)

    # Save the cleaned records to a JSON file.
    with open(os.path.join(DATA_DIR, "comments_cleaned.json"), "w", encoding="utf-8") as json_file:
        json.dump(cleaned_comments_data, json_file, ensure_ascii=False, indent=4)

    print(f"Total de comentários coletados: {len(cleaned_comments_data)}")


Page: 1
Page: 1
Page: 2
Page: 3
Page: 1
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: 1
Page: 2
Page: 1
Page: 2
Page: 3
Page: 4
Page: 1
Page: 2
Page: 3
Page: 1
Page: 1
Page: 1
Page: 1
Page: 2
Page: 1
Page: 1
Page: 1
Page: 1
Page: 2
Page: 1
Page: 2
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: 6
Page: 7
Page: 8
Page: 9
Page: 10
Page: 11
Page: 12
Page: 13
Page: 14
Page: 15
Page: 16
Page: 17
Page: 18
Page: 19
Page: 20
Page: 21
Page: 22
Page: 23
Page: 24
Page: 25
Page: 26
Page: 27
Page: 28
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: 6
Page: 7
Page: 8
Page: 9
Page: 1
Page: 2
Page: 3
Page: 1
Page: 1
Page: 1
Page: 2
Page: 3
Page: 1
Page: 1
Page: 2
Page: 3
Page: 1
Page: 2
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: 6
Page: 1
Page: 1
Page: 2
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: 1
Page: 2
Page: 3
Page: 1
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: 6
Page: 7
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: 1
Page: 1
Page: 1
Page: 1
Page: 1
Page: 1
Page: 2
Page: 3
Page: 4
Page: 5
Page: