# YT Comments analysis

In [None]:
from googleapiclient.discovery import build
import pandas as pd
from api import API_KEY

channel_id = "UCWeg2Pkate69NFdBeuRFTAw" #Squeezie channel
etoiles = 'UCABf02qOye7XYapcK1M45LQ'
youtube = build('youtube', 'v3', developerKey=API_KEY)
exemple_video = "qCKyRhkhqoQ"
otp_recap = 'F7A8OCdmZ90'

In [None]:
class Request:
    """ Class Request handling youtube request better """
    def __init__(self, requestType,part=None, id=None, chart=None, regionCode=None, maxResults=None, pageToken=None, videoId=None):
        self.requestType = requestType
        self.part = part
        self.id = id
        self.chart = chart
        self.regionCode = regionCode
        self.maxResults = maxResults
        self.pageToken = pageToken
        self.videoId = videoId
        
    def execute(self):
        param = vars(self) # Fetch class attributes
        param = {x:y for x,y in list(param.items())[1:] if y} # Delete requestType ([1:]) and None attributes
        
        request = self.requestType.list(**param)
        return request.execute()

In [None]:
from datetime import datetime, timedelta
import re      
        
def iso_toDatetime(iso_date:str):
    """Converts an ISO 8601 formatted date to a datetime object."""
    return datetime.strptime(iso_date[:-1], '%Y-%m-%dT%H:%M:%S')

def datetime_toISO(dt_obj:datetime):
    """Converts a datetime object to an ISO 8601 formatted date."""
    return dt_obj.isoformat()[:-7]  # remove microseconds

def iso_toDelta(iso_duration:str):
    """Converts an ISO 8601 formatted duration to a timedelta object."""
    match = re.match(r'PT(\d+D)*(\d+H)*(\d+M)*(\d+S)', iso_duration)
    days, hours, minutes, seconds = [int(x[:-1]) if x else 0 for x in match.groups()]
    return timedelta(days=days,hours=hours, minutes=minutes, seconds=seconds)

def delta_toISO(delta_obj:timedelta):
    """Converts a timedelta object to an ISO 8601 formatted duration."""
    hours = delta_obj.seconds // 3600
    minutes = (delta_obj.seconds % 3600) // 60
    seconds = delta_obj.seconds % 60
    
    daysStr = f"{delta_obj.days}D" if delta_obj.days != 0 else ""
    hoursStr = f"{hours}H" if hours != 0 else ""
    minutesStr = f"{minutes}M" if minutes != 0 else ""
    secondsStr = f"{seconds}S" if seconds != 0 else ""
    return f"PT{daysStr}{hoursStr}{minutesStr}{secondsStr}"

# print(iso_toDelta('PT4D3H20M9S'))
# print(delta_toISO(iso_toDelta('PT20M9S')))

In [None]:
def format_channel_data(channel_data):
    """ Structure raw channel data """
    data = {
        "channel_name": channel_data.get('snippet', {}).get('title'),
        "channel_id": channel_data.get('id'),
        "country": channel_data.get('snippet', {}).get('country',""),
        "stats": channel_data.get('statistics'),
        "topics": [wikilink.split('/')[-1] for wikilink in channel_data.get('topicDetails', {}).get('topicCategories', [])],
    }
    del data['stats']['hiddenSubscriberCount']
    return data

In [None]:
def get_channel_data(youtube, channel_id):
    """ Request (by id) for most important channel stats """
    request = Request(
        requestType=youtube.channels(),
        part="snippet,contentDetails,statistics,topicDetails",
        id=channel_id
    )
    response = request.execute()
    rawData = response.get('items', [])[0]
    return format_channel_data(rawData)
    

get_channel_data(youtube, channel_id)

In [None]:
from datetime import datetime
def format_video_data(video_data):
    """ Structure raw video data """
    data = {
            "title": video_data.get('snippet', {}).get('title'),
            "id": video_data.get('id'),
            "publishedAt": video_data.get('snippet', {}).get('publishedAt'),
            "duration" : video_data.get('contentDetails').get('duration'),
            "ViewCount" : video_data.get('statistics', {}).get('viewCount'),
            "likeCount" : video_data.get('statistics', {}).get('likeCount'),
            "commentCount" : video_data.get('statistics', {}).get('commentCount'),  
            "tags" : video_data.get('snippet', {}).get('tags')
    }
    
    return data

# time = get_video_info(youtube, 'JRBGBjaR9Wg').get("publishedAt")
# print(datetime.strptime(time, '%Y-%m-%dT%H:%M:%SZ'))

In [None]:
def get_video_data(youtube, video_Id):
    """ Request (by id) for most important video stats """
    request = Request(
        requestType=youtube.videos(),
        part="snippet,contentDetails,statistics,topicDetails",
        id=video_Id,
    )
    response = request.execute()
    
    rawData = response.get('items', [])[0]
    return format_video_data(rawData)

get_video_data(youtube, 'JRBGBjaR9Wg')

In [None]:
def get_Most_Popular_Video(youtube, region:str):
    """ Request for most populars videos stats """
    request = Request(
        requestType=youtube.videos(),
        part="snippet,contentDetails,statistics,topicDetails",
        chart="mostPopular",
        regionCode=region,
        maxResults=100,
        pageToken=''
    )
    response = request.execute()
    
    pages = [response]
    while response.get('nextPageToken'):
        request.pageToken = response.get('nextPageToken')
        response = request.execute()
        pages.append(response)
    
    top_videos = [format_video_data(videos) for page in pages for videos in page.get('items')]
    return top_videos

get_Most_Popular_Video(youtube, 'FR')

In [None]:
def format_comment_data(comment):
    """ Structure raw comment data """
    data = {
        "id": comment.get('id'),
        "comment": comment.get('snippet', {}).get('textOriginal'),
        # "viewerRating": comment.get('snippet', {}).get('viewerRating'),
        "likeCount": comment.get('snippet', {}).get('likeCount'),
        "publishedAt": comment.get('snippet', {}).get('publishedAt'),
        "updatedAt": comment.get('snippet', {}).get('updatedAt')
        }
    
    return data

def format_threadedComment_data(comment):
    """ Structure raw threaded comment data """
    data = {
        "id": comment.get('id'),
        "topLevelComment": format_comment_data(comment.get('snippet', {}).get('topLevelComment')),
        "totalReplyCount": comment.get('snippet', {}).get('totalReplyCount'),
        "replies": [format_comment_data(com) for com in comment.get('replies', {}).get('comments', [])]
        }
    
    return data

In [None]:
def get_comment(youtube,comment_id):
    """ Request (by id) for most important comment stats """
    request = Request(
        requestType=youtube.comments(),
        part="snippet,id",
        id=comment_id,
    )
    response = request.execute()
    # print(response)
    rawData = response.get('items')[0]
    return format_comment_data(rawData)

get_comment(youtube, 'UgwUQR2JJFJSkihWLhx4AaABAg')

In [None]:
def get_video_commentThreads(youtube,video_Id):
    """ Request (by id) for all comments of a videos """
    request = Request(
        requestType=youtube.commentThreads(),
        part="snippet,id,replies",
        videoId=video_Id,
        maxResults=100,
        pageToken = ''
    )
    response = request.execute()
    
    pages = [response]
    while response.get('nextPageToken'):
        request.pageToken = response.get('nextPageToken')
        response = request.execute()
        pages.append(response)
        
    top_videos = [format_threadedComment_data(comments) for page in pages for comments in page.get('items')]
    return top_videos

get_video_commentThreads(youtube, otp_recap)

# Fetching Top Videos
The goal is to fetch the top 200 videos everyday and to get their comments a week after publishing.

In [None]:
from datetime import datetime
import json
REGION = ['FR', 'US', 'GB']
topvids = 'db/topVideos.json'
minElapsedTime = 24 # Hours

def push_top_vids(filepath):
    today = datetime.today()
    with open(filepath, 'r') as f:
        data = json.load(f)
        
    if lastUpdate:= data.get('lastUpdate'):
        delta = today - iso_toDatetime(lastUpdate)
        if delta.seconds // 3600 <= minElapsedTime:
            raise Exception(f'The fetch request has be done too soon. Next request vailable in {24-(delta.seconds // 3600)}h ')
    else:   
        data['lastUpdate'] = datetime_toISO(today)
        # Fetching
        for reg in REGION:
            if reg not in data.keys():
                data[reg] = {}
            data[reg][datetime_toISO(datetime.today())] = get_Most_Popular_Video(youtube, reg)

        with open(filepath, 'w') as fichier:
            json.dump(data, fichier)

push_top_vids(topvids)
# data

In [None]:
topvids = 'db/topVideos.json'
minElapsedComments = 7 # days

def fetch_topVids_comments(filepath):
    today = datetime.today()
    with open(filepath, 'r') as f:
        data:dict = json.load(f)
        
    del data['lastUpdate']
    # print(data)
    
    for dailyEntries in data.values():
        for vids in dailyEntries.values():
            for video in vids:
                if (today - iso_toDatetime(video.get('publishedAt'))).days >= minElapsedComments:
                    if not video.get("fetchedComment"):
                        video['comments'] = get_video_commentThreads(youtube, video.get('id'))
                        video['fetchedComment'] = True
                        
    with open(filepath, 'w') as fichier:
            json.dump(data, fichier)                

fetch_topVids_comments(topvids)    