<a href="https://colab.research.google.com/github/admantiumblack/hololive-datawarehouse-project/blob/main/youtube_superchat.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the chat scraper used below. `%pip` (rather than `!pip`) installs
# into the environment of the running kernel.
%pip install chat-downloader

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting argparse
  Using cached argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Installing collected packages: argparse
Successfully installed argparse-1.4.0


In [None]:
import json
import math
from collections.abc import Sequence
from functools import lru_cache
from getpass import getpass
from urllib.parse import quote

import numpy as np
import pandas as pd
import requests
from chat_downloader import ChatDownloader
from requests.exceptions import RequestException
from tqdm.auto import tqdm

# YouTube channel whose uploads playlist is looked up and scraped below.
CHANNEL_ID = 'UCP0BspO_AMEe3aQqqpo89Dg'
# Prefix used for the names of the exported CSV files.
VTUBER = 'moonahoshinova'
# Number of playlist items requested from the channel's uploads playlist.
N_VIDEOS = 120

In [None]:
# Prompt for the API key interactively so it is never stored in the notebook;
# it is sent as the `key` query parameter of every YouTube Data API request.
YOUTUBE_SECRET_KEY = getpass()

··········


In [None]:
# Query parameters merged into every YouTube Data API request made below.
DEFAULT_PARAMS = {
    'key': YOUTUBE_SECRET_KEY
}

In [None]:
def construct_url(base_url, params):
  """Build a request URL by appending ``params`` to ``base_url`` as a query string.

  Args:
    base_url: URL without a query string. It is passed through untouched, so
      a ``{placeholder}`` inside it survives for a later ``str.format`` call.
    params: Dict of query parameter name to value. A ``str``, ``int`` or
      ``float`` value becomes a single value; any other value is treated as
      an iterable whose items are joined with literal commas.

  Returns:
    ``base_url`` followed by ``?`` and the ``&``-joined ``name=value`` pairs.
  """
  param_parts = []
  for name, value in params.items():
    items = [value] if isinstance(value, (str, int, float)) else value
    # Percent-encode every item so reserved characters (&, =, #, spaces, ...)
    # inside a value cannot corrupt the query string. str() also lets
    # iterables of non-string items (e.g. ints) be joined. The separating
    # commas stay literal.
    encoded = ','.join(quote(str(item), safe='') for item in items)
    param_parts.append(f'{name}={encoded}')
  return base_url + '?' + '&'.join(param_parts)

In [None]:
def get_playlist_id(channel_id, playlist_name='uploads'):
  """Look up the ID of one of a channel's related playlists.

  Args:
    channel_id: YouTube channel ID to query.
    playlist_name: Key inside the channel's ``relatedPlaylists`` object.

  Returns:
    The playlist ID stored under ``playlist_name``.

  Raises:
    RequestException: If the response does not contain the requested
      playlist (no items returned, missing key, or an error payload).
  """
  channel_detail_api = "https://youtube.googleapis.com/youtube/v3/channels"
  parameter = {
      'part': 'contentDetails',
      'id': channel_id,
  }
  parameter.update(DEFAULT_PARAMS)
  api = construct_url(channel_detail_api, parameter)
  resp = requests.get(api).json()
  try:
    return resp["items"][0]["contentDetails"]["relatedPlaylists"][playlist_name]
  except (KeyError, IndexError, TypeError) as err:
    # Only lookup failures are translated; anything else (e.g. an interrupt)
    # propagates unchanged. The response body is included so the API's own
    # error message is visible to the caller.
    raise RequestException(
        f'No {playlist_name!r} playlist found for channel {channel_id!r}: {resp}'
    ) from err

def get_playlist_items(playlist_id, video_number=50):
  """Fetch up to ``video_number`` items from a playlist, following pagination.

  Args:
    playlist_id: ID of the playlist to list.
    video_number: Maximum number of items to fetch. Zero or a negative
      number yields an empty list without issuing any request.

  Returns:
    List of the raw ``items`` entries returned by the API, in playlist
    order. It is shorter than ``video_number`` when the playlist runs out.

  Raises:
    RequestException: If a response carries no ``items`` (e.g. an error
      payload).
  """
  playlist_api = "https://www.googleapis.com/youtube/v3/playlistItems"
  page_size_limit = 50  # the request size is capped at 50 items per page
  parameter = {
      'part': 'contentDetails',
      'playlistId': playlist_id
  }
  parameter.update(DEFAULT_PARAMS)
  video_details = []

  remaining = video_number
  while remaining > 0:
    page_size = min(remaining, page_size_limit)
    parameter['maxResults'] = page_size
    api = construct_url(playlist_api, parameter)

    resp = requests.get(api).json()
    if 'items' not in resp:
      raise RequestException(
          f'Playlist {playlist_id!r} request returned no items: {resp}'
      )
    video_details.extend(resp['items'])
    remaining -= page_size

    if 'nextPageToken' not in resp:
      break
    # Without sending the token back, every request would return the first
    # page again and the result would consist of duplicates.
    parameter['pageToken'] = resp['nextPageToken']

  return video_details

In [None]:
# Resolve the channel's uploads playlist, then list its first N_VIDEOS entries.
playlist_key = get_playlist_id(CHANNEL_ID)
videos = get_playlist_items(playlist_key, N_VIDEOS)

In [None]:
# Pull just the video ID out of every playlist entry.
video_ids = [entry['contentDetails']['videoId'] for entry in videos]

In [None]:
def get_video_details(vid_ids):
  """Fetch live-streaming, statistics and topic details for the given videos.

  Args:
    vid_ids: Iterable of YouTube video IDs. May be empty.

  Returns:
    List of the raw ``items`` entries returned by the API, concatenated
    across all batches in request order.

  Raises:
    RequestException: If a response carries no ``items`` (e.g. an error
      payload).
  """
  video_detail_api = "https://youtube.googleapis.com/youtube/v3/videos"
  batch_size = 50  # IDs are sent in comma-separated batches of at most 50
  parameters = {
      'part': ['liveStreamingDetails', 'statistics', 'topicDetails']
  }
  parameters.update(DEFAULT_PARAMS)
  result = []
  # Use the argument, not the notebook-level `video_ids` global, and slice
  # it so that an empty input simply produces no requests.
  vid_ids = list(vid_ids)
  for start in range(0, len(vid_ids), batch_size):
    parameters['id'] = vid_ids[start:start + batch_size]
    api = construct_url(video_detail_api, parameters)
    video_details = requests.get(api).json()
    if 'items' not in video_details:
      raise RequestException(
          f'Video details request returned no items: {video_details}'
      )
    result.extend(video_details['items'])
  return result


In [None]:
# Fetch the per-video metadata (live-streaming details, statistics, topics).
details = get_video_details(video_ids)

In [None]:
def get_youtube_chat(video_id, message_groups=['superchat']):
  """Open the chat of a YouTube video, restricted to the given message groups.

  Args:
    video_id: ID of the video whose chat should be fetched.
    message_groups: Names of the chat message groups to request.

  Returns:
    Whatever ``ChatDownloader().get_chat`` returns for the video's URL.
  """
  url = f'https://www.youtube.com/watch?v={video_id}'
  # Pass the URL built from `video_id` (not a fixed video), and hand over a
  # copy of the groups so the shared default list can never be mutated.
  chats = ChatDownloader().get_chat(url, message_groups=list(message_groups))
  return chats

@lru_cache(maxsize=50)
def get_conversion_rates(target_currency, date:str=None):
  """Fetch exchange rates that use ``target_currency`` as the base.

  Results are memoised per ``(target_currency, date)`` pair, keeping up to
  50 entries.

  Args:
    target_currency: Currency code sent as the ``base`` query parameter.
    date: Endpoint to query instead of ``latest``; ``None`` selects
      ``latest``.

  Returns:
    The ``rates`` entry of the JSON response.
  """
  endpoint = 'latest' if date is None else date
  # The placeholder is filled in only after the query string is attached.
  url_template = construct_url(
      'https://api.exchangerate.host/{endpoint}',
      {'base': target_currency},
  )
  response = requests.get(url_template.format(endpoint=endpoint))
  return response.json()['rates']

def get_revenue(message, target_currency, membership_cost, date=None):
  """Return the revenue attributed to one chat message, in ``target_currency``.

  Args:
    message: Chat message dict with a ``message_type`` key; paid messages
      and stickers also carry ``money`` with ``amount`` and ``currency``.
    target_currency: Currency the amount is converted into.
    membership_cost: Revenue credited for a ``membership_item`` message.
    date: Passed through to ``get_conversion_rates``.

  Returns:
    The converted amount for paid messages/stickers, ``membership_cost``
    for membership items, and ``0`` for every other message type.
  """
  kind = message['message_type']

  if kind in ('paid_message', 'paid_sticker'):
    money = message['money']
    amount, currency = money['amount'], money['currency']
    rates = get_conversion_rates(target_currency, date)
    # Rates are quoted per unit of the target currency, hence the division.
    return amount / rates[currency]

  if kind == 'membership_item':
    return membership_cost

  return 0

def calculate_livestream_stats(video_id, target_currency='USD', membership_cost=4.99, date=None):
  """Total the revenue and message count of one video's chat.

  Args:
    video_id: ID of the video whose chat is processed.
    target_currency: Currency the revenue is expressed in.
    membership_cost: Revenue credited per membership message.
    date: Passed through to ``get_revenue`` for the rate lookup.

  Returns:
    Dict with ``total_revenue`` (sum of ``get_revenue`` over all messages)
    and ``total_superchat`` (number of messages iterated).
  """
  revenue_total = 0
  message_count = 0
  for message in get_youtube_chat(video_id).chat:
    revenue_total += get_revenue(message, target_currency, membership_cost, date)
    message_count += 1
  return {
      'total_revenue': revenue_total,
      'total_superchat': message_count,
  }

In [None]:
def get_topics(topic_details):
  """Reduce a video's topic category URLs to bare topic names.

  Args:
    topic_details: Dict with a ``topicCategories`` list of URL strings.

  Returns:
    List of the URLs with every occurrence of the English Wikipedia
    article prefix removed.
  """
  wiki_prefix = 'https://en.wikipedia.org/wiki/'
  names = []
  for category_url in topic_details['topicCategories']:
    names.append(category_url.replace(wiki_prefix, ''))
  return names

def get_all_stream_stats(details, target_currency='USD', membership_cost=4.99):
  """Collect per-stream revenue statistics and topics as column lists.

  Entries without ``liveStreamingDetails.actualStartTime`` are skipped.

  Args:
    details: Iterable of video detail dicts.
    target_currency: Currency passed to ``calculate_livestream_stats``.
    membership_cost: Membership value passed to ``calculate_livestream_stats``.

  Returns:
    Dict of equally long lists keyed ``video_id``, ``date``,
    ``total_revenue``, ``total_superchat`` and ``topics``.
  """
  ids, dates, revenues, counts, topic_lists = [], [], [], [], []

  for video in tqdm(details):
    try:
      start_time = video['liveStreamingDetails']['actualStartTime']
    except KeyError:
      continue
    # Keep only the date portion in front of the 'T' separator.
    stream_date = start_time.split('T')[0]

    stats = calculate_livestream_stats(
        video['id'], target_currency, membership_cost, stream_date
    )
    revenue, count = stats['total_revenue'], stats['total_superchat']

    try:
      video_topics = get_topics(video['topicDetails'])
    except KeyError:
      # No topic information available for this video.
      video_topics = []

    ids.append(video['id'])
    dates.append(stream_date)
    revenues.append(revenue)
    counts.append(count)
    topic_lists.append(video_topics)

  return {
      'video_id': ids,
      'date': dates,
      'total_revenue': revenues,
      'total_superchat': counts,
      'topics': topic_lists,
  }

In [None]:
# Walk every fetched video and accumulate its chat revenue statistics.
res = get_all_stream_stats(details)

  0%|          | 0/120 [00:00<?, ?it/s]

In [None]:
# One row per stream; the list-valued topics column is exported separately.
superchat_df = pd.DataFrame(res).drop(columns=['topics'])
superchat_df.to_csv(f'{VTUBER}_superchat.csv', index=False)

In [None]:
# Flatten the per-video topic lists into one (video_id, topic) row per topic.
# This is built in a comprehension with its own names so the notebook-level
# `video_ids` list created from the playlist is not overwritten, which keeps
# earlier cells safe to re-run after this one.
topic_rows = [
    (video_id, topic)
    for video_id, video_topics in zip(res['video_id'], res['topics'])
    for topic in video_topics
]
topics_df = pd.DataFrame(topic_rows, columns=['video_id', 'topic'])
topics_df.to_csv(VTUBER + '_topics.csv', index=False)

In [None]:
# `google.colab` is imported here, at the point of use, rather than in the
# notebook's main import cell.
from google.colab import files
# Download both CSV files written by the cells above.
files.download(f'{VTUBER}_topics.csv')
files.download(f'{VTUBER}_superchat.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>