In [4]:

import pandas as pd
import json
import numpy as np
import random
import enum
import re

from urllib.request import urlopen
from urllib.parse import urlencode

In [15]:

# SETUP
# Read the local settings from ./env.json; the file must provide a
# "YOUTUBE_API_KEY" entry, so the key itself is never written in the notebook.
with open("./env.json","r") as f:
    env = json.load(f)
YOUTUBE_API_KEY = env["YOUTUBE_API_KEY"]
# Prefix of the CSV files written by the cells further down.
PROJECT_NAME = "CodeBullet"


In [7]:
import pandas as pd
class ByTypes(enum.Enum):
    """Kinds of identifier a comment-thread request can be filtered by.

    The value of each member is the query-parameter name that
    ``APIYoutube.getCommentThreads`` adds to the request
    (``api_params[by.value] = id``).
    """

    # Filter by video: sent as the "videoId" query parameter.
    video_id = "videoId"

    # Filter by channel: sent as the "channelId" query parameter.
    channel_id = "channelId"

class APIYoutubeCommentThread:
    """Flat view of one ``youtube#commentThread`` resource.

    Only the thread's top-level comment is read; the thread-level snippet
    contributes the reply count. Line breaks in the comment text and in the
    author name are replaced by the marker ``" |n| "`` so each value stays on
    one line.
    """

    def __init__(self, api_json_resource:dict = None):
        """Extract the fields of interest from the decoded JSON resource.

        Args:
            api_json_resource: Decoded resource whose ``"kind"`` must be
                ``"youtube#commentThread"``.

        Raises:
            AssertionError: If the resource is ``None`` or of another kind.
            KeyError: If a required field is absent from the resource.
        """
        assert api_json_resource is not None
        assert api_json_resource["kind"] == "youtube#commentThread"

        thread_info = api_json_resource["snippet"]
        top_comment = thread_info["topLevelComment"]["snippet"]

        line_break, marker = "\n", " |n| "
        self.content = top_comment["textDisplay"].replace(line_break, marker)
        self.author_name = top_comment["authorDisplayName"].replace(line_break, marker)

        # The channel url is the only optional field; "none" marks its absence.
        self.author_channel_url = top_comment.get("authorChannelUrl", "none")

        self.video_id = top_comment["videoId"]
        self.viewer_rating = top_comment["viewerRating"]
        self.like_count = int(top_comment["likeCount"])
        self.reply_count = int(thread_info["totalReplyCount"])
        self.published_at = top_comment["publishedAt"]

    def __str__(self):
        """Return a one-line summary: author, content, likes and replies."""
        return f"Author:{self.author_name},Content:{self.content},Likes:{self.like_count}.Replies:{self.reply_count}"

    def transformToDict(self):
        """Return the extracted fields as a dict, one key per CSV column."""
        # Order matters: it becomes the column order of the exported table.
        exported_fields = (
            "author_name",
            "content",
            "author_channel_url",
            "like_count",
            "reply_count",
            "published_at",
            "viewer_rating",
            "video_id",
        )
        return {field: getattr(self, field) for field in exported_fields}
    
        
        

class APIYoutubeVideo:
    """Flat view of one ``youtube#video`` resource.

    Reads the ``snippet``, ``statistics`` and ``contentDetails`` parts of the
    resource. Line breaks in the title and description are replaced by the
    marker ``" |n| "`` so each value stays on one line.
    """

    # Thumbnail renditions to try, in order of preference.
    _THUMBNAIL_PREFERENCE = ("maxres", "standard", "high", "medium", "default")

    # Duration such as "PT15M33S", "P1DT2H" or "P0D": the day designator and
    # the whole time part are optional.
    _DURATION_PATTERN = re.compile(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?"
    )

    def __init__(self, api_json_resource:dict = None):
        """Extract the fields of interest from the decoded JSON resource.

        Args:
            api_json_resource: Decoded resource whose ``"kind"`` must be
                ``"youtube#video"``.

        Raises:
            AssertionError: If the resource is ``None`` or of another kind.
            KeyError: If a required part or field is absent.
        """
        assert api_json_resource is not None
        assert api_json_resource["kind"] == "youtube#video"

        snippet = api_json_resource["snippet"]

        self.video_id = api_json_resource["id"]
        self.channel_id = snippet["channelId"]
        self.published_at = snippet["publishedAt"]
        self.title = snippet["title"].replace("\n", " |n| ")
        self.description = snippet["description"].replace("\n", " |n| ")

        # Keep the url of the first rendition that exists, or None.
        thumbnails = snippet["thumbnails"]
        self.url_thumbnail = None
        for size in self._THUMBNAIL_PREFERENCE:
            thumbnail = thumbnails.get(size)
            if thumbnail is not None:
                self.url_thumbnail = thumbnail["url"]
                break

        # A video without tags has no "tags" key at all; treat it as empty.
        # np.unique also sorts and de-duplicates the tags.
        self.tags = np.unique(np.asarray(snippet.get("tags", []), dtype=str))
        self.category_id = int(snippet["categoryId"])

        # Counters may be absent (e.g. hidden likes, disabled comments);
        # a missing counter is stored as 0 instead of raising KeyError.
        statistics = api_json_resource["statistics"]
        self.stats_view_count = int(statistics.get("viewCount", 0))
        self.stats_like_count = int(statistics.get("likeCount", 0))
        self.stats_favorite_count = int(statistics.get("favoriteCount", 0))
        self.stats_comment_count = int(statistics.get("commentCount", 0))

        content_details = api_json_resource["contentDetails"]
        self.content_details_duration = content_details["duration"]
        self.content_details_caption = content_details["caption"]
        self.content_details_licensedContent = content_details["licensedContent"]

    def __str__(self):
        """Return a one-line summary: title, views, likes and comments."""
        return f"Title:{self.title},Views:{self.stats_view_count},Likes:{self.stats_like_count},Comments:{self.stats_comment_count}"

    @staticmethod
    def _flagToInt(value):
        """Map a flag given as a bool or as a "true"/"false" string to 1 or 0."""
        if isinstance(value, str):
            return 0 if value.strip().lower() == "false" else 1
        return 1 if value else 0

    @staticmethod
    def parsePTDurationToSeconds(raw_pt:str):
        """Convert a duration such as ``"PT1H2M3S"`` to a number of seconds.

        Args:
            raw_pt: Duration made of optional day, hour, minute and second
                components, e.g. ``"PT15M33S"``, ``"P1DT2H"`` or ``"P0D"``.

        Returns:
            The total duration in whole seconds.

        Raises:
            AssertionError: If the whole string is not a valid duration.
        """
        # fullmatch: trailing garbage or unsupported components must fail
        # loudly instead of being silently counted as zero seconds.
        m_date = APIYoutubeVideo._DURATION_PATTERN.fullmatch(raw_pt)
        assert m_date is not None, f"Unsupported duration: {raw_pt!r}"

        days, hours, minutes, seconds = (
            int(group) if group is not None else 0 for group in m_date.groups()
        )
        return ((days * 24 + hours) * 60 + minutes) * 60 + seconds

    def transformToDict(self):
        """Return the extracted fields as a dict, one key per CSV column.

        Tags are joined with commas, the duration is converted to seconds and
        the two content flags are exported as 0/1.
        """
        return {
                "channel_id": self.channel_id,
                "video_id": self.video_id,
                "published_at": self.published_at,
                "title": self.title,
                "description": self.description,
                "url_thumbnail": self.url_thumbnail,
                "tags": ",".join(self.tags),
                "category_id": self.category_id,
                "stats_view_count": self.stats_view_count,
                "stats_like_count": self.stats_like_count,
                "stats_favorite_count": self.stats_favorite_count,
                "stats_comment_count": self.stats_comment_count,
                "content_details_duration_seconds": APIYoutubeVideo.parsePTDurationToSeconds(self.content_details_duration),
                "content_details_caption": APIYoutubeVideo._flagToInt(self.content_details_caption),
                "content_details_licensedContent": APIYoutubeVideo._flagToInt(self.content_details_licensedContent),
        }
        

class APIYoutube:
    """Small client for the two YouTube Data API v3 endpoints used here.

    Requests are plain HTTPS GETs authenticated with an API key passed as
    the ``key`` query parameter.
    """

    def __init__(self, API_KEY):
        """Store the API key sent with every request."""
        self.API_KEY = API_KEY

    @staticmethod
    def isValidVideoId(value:str):
        """Return True when ``value`` has the 11-character length of a video id.

        Declared static so it works both as ``APIYoutube.isValidVideoId(x)``
        and on an instance.
        """
        return len(value) == 11

    def getCommentThreads(self, by:ByTypes, id:str, part:str="snippet", max_results:int=100, max_pages:int=1):
        """Download comment threads, following pagination.

        Args:
            by: Which identifier ``id`` is (video or channel).
            id: The identifier to filter by.
            part: Resource parts to request.
            max_results: Threads per page; must not exceed 100.
            max_pages: Maximum number of pages (requests) to fetch.

        Returns:
            A list of ``APIYoutubeCommentThread``. Items that cannot be parsed
            are reported on stdout and skipped.

        Raises:
            AssertionError: If ``by`` is not a ``ByTypes`` member or
                ``max_results`` is above 100.
        """
        assert isinstance(by, ByTypes)
        assert max_results <= 100

        api_endpoint = 'https://www.googleapis.com/youtube/v3/commentThreads'

        data = []
        page_count = 0

        api_params = {
            'key': self.API_KEY,
            'part': part,
            'maxResults': max_results,
        }

        api_params[by.value] = id

        # Strict "<": exactly max_pages requests at most (the previous "<="
        # issued one extra request).
        while page_count < max_pages:

            page_count += 1

            encoded_params = urlencode(api_params)

            with urlopen(f'{api_endpoint}?{encoded_params}') as response:
                results = json.load(response)

            for api_json_resource in results.get("items", []):
                try:
                    data.append(APIYoutubeCommentThread(api_json_resource))
                except Exception as error:
                    # Best effort: skip the malformed item but say why.
                    # Unlike a bare except, Ctrl-C still interrupts the download.
                    print("Error with: ", api_json_resource, repr(error))

            next_page_token = results.get('nextPageToken', None)
            if not next_page_token:
                # No token means this was the last page.
                break
            api_params['pageToken'] = next_page_token

        return data


    def getVideos(self, id:str, part:str="snippet,statistics,contentDetails"):
        """Download the video resources for one or more video ids.

        Args:
            id: A video id, or up to 50 ids separated by commas.
            part: Resource parts to request. The ``suggestions`` part can
                only be retrieved by the video owner.

        Returns:
            A list of ``APIYoutubeVideo``, one per item in the response.

        Raises:
            AssertionError: If more than 50 ids are given.
        """
        api_endpoint = 'https://www.googleapis.com/youtube/v3/videos'

        # A single id splits into one element, so no special case is needed.
        assert len(id.split(",")) <= 50

        api_params = {
            'key': self.API_KEY,
            'part': part,
            "id": id
        }

        encoded_params = urlencode(api_params)

        with urlopen(f'{api_endpoint}?{encoded_params}') as response:
            results = json.load(response)

        return [APIYoutubeVideo(api_json_resource) for api_json_resource in results.get("items", [])]
        


# Shared API client used by the download cells below.
apiYoutube = APIYoutube(YOUTUBE_API_KEY)


In [17]:

# Read the video ids, one per line. Surrounding whitespace (including "\r")
# is stripped and blank lines are skipped so they never reach the API as ids.
with open("./videos_ids.txt", "r") as f:
    videos_ids = [line.strip() for line in f if line.strip()]

# APIYoutube.getVideos accepts at most 50 comma-separated ids per request.
VIDEOS_BATCH_SIZE = 50

# The file is already closed here; download the videos batch by batch.
videos = []
for batch_start in range(0, len(videos_ids), VIDEOS_BATCH_SIZE):
    videos_ids_str = ",".join(videos_ids[batch_start:batch_start + VIDEOS_BATCH_SIZE])
    videos.extend(apiYoutube.getVideos(videos_ids_str))

print(f"Retrieved {len(videos)} videos")

# One row per video; build the table in a single step and export it.
df_videos = pd.DataFrame([video.transformToDict() for video in videos])
df_videos.to_csv(f"{PROJECT_NAME}_videos.csv", index=False)

Retrieved 47 videos


In [18]:
# Number of calls to API
# Estimate from the per-video comment counters, at 100 comments per call.
comment_counts = [int(video.stats_comment_count) for video in videos]
total_comments = sum(comment_counts)
# -1 stays the result when there are no videos.
max_comments = max([-1, *comment_counts])
print("Total comments: " + str(total_comments), "Calls to API: " + str(total_comments/100))
print("Max number of comments in a video: " + str(max_comments), "Max page calls to api by video: " + str(max_comments/100))


Total comments: 339290 Calls to API: 3392.9
Max number of comments in a video: 23606 Max page calls to api by video: 236.06


In [19]:
 
# Request budget for this run; each page of comment threads counts as one unit.
quota_max = 4000
quota_left = quota_max
max_pages = 1000
comments = []


assert total_comments/100 < 10000 or quota_left > 10000  # daily quota over reach
assert max_pages > max_comments/100

# Count videos from 1 so the progress line reads "Video 1 of N".
for i, video in enumerate(videos, start=1):

    # Conservative stop: a single video may use up to max_pages requests.
    if quota_left - max_pages < 0:
        print("Max quota reach")
        break

    assert int(video.stats_comment_count) < 100000
    video_id = video.video_id

    comments_video = apiYoutube.getCommentThreads(ByTypes.video_id, video_id, max_results=100, max_pages=max_pages)
    comments.extend(comments_video)
    # Approximate number of requests spent: one per started page of 100 threads.
    quota_left -= (len(comments_video)//100 + 1)

    print(f"Requested comments: {len(comments)}", f"Video {i} of {len(videos)}", f"Quota left {quota_left}, used {quota_max - quota_left}")


print(f"Retrieved {len(comments)} comments", f"Quota left {quota_left}, used {quota_max - quota_left}")


comment_rows = []

# Report progress about every 10%; at least 1 so that fewer than 10 comments
# do not cause a modulo by zero.
debug_step = max(1, len(comments)//10)
for i, comment in enumerate(comments):
    if i > 0 and (i % debug_step == 0 or i == len(comments)-1):
        print("Processed in CSV ", i)
    comment_rows.append(comment.transformToDict())

df_comments = pd.DataFrame(comment_rows)
df_comments.to_csv(f"{PROJECT_NAME}_comments.csv", index=False)
del comment_rows


Processed 1600
Processed 2573
Processed 3739
Processed 4343
Processed 7827
Processed 11216
Processed 16337
Processed 21227
Processed 22766
Processed 24013
Processed 27818
Processed 28389
Processed 45134
Processed 57167
Processed 59474
Processed 59685
Processed 60932
Processed 61219
Processed 70903
Processed 80823
Processed 80969
Processed 88795
Processed 89790
Processed 105448
Processed 118151
Processed 136889
Processed 137217
Processed 148900
Processed 150421
Processed 160120
Processed 168358
Processed 175220
Processed 184460
Processed 186573
Processed 200468
Processed 202759
Processed 210831
Processed 211227
Processed 217503
Processed 227004
Processed 227947
Processed 237146
Processed 240383
Processed 240637
Processed 249806
Processed 250903
Processed 255786
Retrieved 255786 videos Quota left 1420
Processed in CSV  25578
Processed in CSV  51156
Processed in CSV  76734
Processed in CSV  102312
Processed in CSV  127890
Processed in CSV  153468
Processed in CSV  179046
Processed in CSV 

In [20]:
# Preview the first rows of the comments table.
df_comments.head()

Unnamed: 0,author_name,content,author_channel_url,like_count,published_at,viewer_rating,video_id
0,Code Bullet,Hey guys<br> I&#39;ve uploaded the source code...,http://www.youtube.com/channel/UC0e3QhIYukixgh...,1374,2018-02-08T04:24:26Z,none,3bhP7zulFfY
1,Ariztax,This is so incredibly satisfying to watch.,http://www.youtube.com/channel/UC-mM7cY_WeRFyR...,1,2022-03-13T09:05:19Z,none,3bhP7zulFfY
2,bommer yeet,Nice,http://www.youtube.com/channel/UC6nHg0Nu-6Wm4G...,0,2022-01-21T02:39:58Z,none,3bhP7zulFfY
3,PoofyHairGuy,I remember watching these videos. I never real...,http://www.youtube.com/channel/UCRHqQfvTgwsS0l...,3,2022-01-18T08:25:15Z,none,3bhP7zulFfY
4,Random_Gachatuber,"You could make a cycle, just have the snake go...",http://www.youtube.com/channel/UCg-3hhvf2EoVq0...,0,2022-01-17T20:01:47Z,none,3bhP7zulFfY


In [24]:
# TODO: repair comments in multiple lines

# Export the comments with more than 10 likes, most liked first. The file
# name uses PROJECT_NAME like the other exports of this notebook.
df_comments[df_comments["like_count"] > 10].sort_values(["like_count"], ascending=False).to_csv(f"{PROJECT_NAME}Gags.csv",index=False)

# Preview of the most liked comments. It must be the last expression of the
# cell, otherwise the notebook computes it and shows nothing.
df_comments.sort_values(["like_count"], ascending=False).head()

In [22]:
# Ten author display names with the most comments.
# NOTE(review): grouping is by display name, so different users sharing a
# name are presumably counted together - confirm whether that is intended.
(
    df_comments
    .groupby("author_name")[["video_id"]]
    .count()
    .sort_values("video_id", ascending=False)
    .head(10)
)

Unnamed: 0_level_0,video_id
author_name,Unnamed: 1_level_1
Alex,81
David,56
James,52
John Smith,46
Ben,45
Michael,45
Ryan,44
Andrew,43
Daniel,42
Chris,41


In [23]:
# Ten author display names with the most likes summed over their comments.
(
    df_comments
    .groupby("author_name")[["like_count"]]
    .sum()
    .sort_values("like_count", ascending=False)
    .head(10)
)

Unnamed: 0_level_0,like_count
author_name,Unnamed: 1_level_1
CryspyPasta,62288
Justin Y.,46055
Taikamuna,44823
Nicholas Nguyen,42867
shinnyii,42583
Dani,40500
Code Bullet,32977
Taylor Youngreen,29154
atenahena,28321
KRLA,25888
