# General

In [12]:
import aiohttp
import dotenv
import os
import asyncio
from collections import defaultdict

from typing import List, Tuple

from db.db import SongRepository, Song

from constants.states import ResponseState
from constants.services import Service, service_names_map

In [13]:
# Load variables from a dotenv file into the process environment; the
# configuration below is then read through os.environ.
dotenv.load_dotenv()

True

In [14]:
# Database URL handed to SongRepository; required (KeyError if unset).
DB_URL = os.environ["DB_URL"]

# Comma-separated list of YouTube API keys. Surrounding whitespace and empty
# entries (e.g. "key1, key2," in the environment) are dropped so that no blank
# or space-padded key is ever handed to the scraper.
YOUTUBE_API_KEYS = [
    key.strip()
    for key in os.environ["YOUTUBE_API_KEYS"].split(',')
    if key.strip()
]

# Maps a site name to the Service member used for it in this project.
# NOTE(review): the keys look like site names used by the upstream data
# source - confirm against the code that consumes this map.
acceptable_sites_map = {
    "Youtube" : Service.YOUTUBE,
    "NicoNicoDouga" : Service.NICONICO,
    # "Bilibili" : Service.BILIBILI
}

In [15]:
# Repository object used for every DB call in this notebook.
# NOTE(review): the meaning of the second positional argument (False) is
# defined by SongRepository - presumably an echo/debug flag; confirm.
db = SongRepository(DB_URL, False)
await db.init_models()

In [16]:
def parse_url_id(url: str) -> str:
    """
    Return the video ID, i.e. the last path segment of a video URL.

    The query string, the fragment and any trailing slash are ignored, so
    ``https://youtu.be/P9l6Eg_Kk0g?t=10`` and ``https://youtu.be/P9l6Eg_Kk0g/``
    both yield ``P9l6Eg_Kk0g``. URLs that carry the ID in the query string
    (e.g. ``.../watch?v=<id>``) are not supported.

    Examples:
    YouTube: https://youtu.be/P9l6Eg_Kk0g
    NicoNico: http://www.nicovideo.jp/watch/sm45326679
    BiliBili: https://www.bilibili.com/video/av114962936629968

    Returns an empty string when the URL has no path segment.
    """
    # Local import so the cell stays self-contained.
    from urllib.parse import urlsplit

    # Only the path component is relevant; this drops "?query" and "#fragment".
    path = urlsplit(url).path
    # Strip trailing slashes first, otherwise the last segment would be empty.
    return path.rstrip('/').rsplit('/', 1)[-1]

# Websites data fetchers

## YouTube URL data fetcher

Really fast with an API key, and 1 key is enough for 500,000 videos.

How to get an API key: https://developers.google.com/youtube/v3/getting-started

Without an API key, you will be shadow blocked fairly quickly.

Scraper without an API key can be found at ./scrapers/noAPIUnrecommended.py

In [17]:
# Number of unprocessed YouTube URLs requested from the DB per loop iteration.
YT_BATCH_SIZE = 500

In [18]:
from scrapers.youtubeVideoStatistics import YouTubeScraper

# Scraper instance used by the YouTube loop below; it receives every configured API key.
yt = YouTubeScraper(api_keys=YOUTUBE_API_KEYS)

In [19]:
def yt_results_to_updates(url_id_map: dict, res: List[dict]):
    """
    Turn YouTube video items into row updates for the stored song URLs.

    Args:
        url_id_map: maps a stored URL (``https://youtu.be/<video id>``) to the
            list of DB row ids that share this URL. It is never modified.
        res: iterable of video items; each item has an ``id`` key and,
            optionally, a ``statistics`` dict.

    Returns:
        A list of dicts with the keys ``id``, ``url``, ``views``, ``likes``,
        ``dislikes`` and ``favorites``, one per row id in ``url_id_map``:
        rows whose video is present in ``res`` get the reported statistics
        (missing counters become ``None``, missing views ``0``); rows whose
        video is absent from ``res`` get all counters set to ``0``.
    """
    updates = []
    present_urls = set()
    for item in res:
        url = f'https://youtu.be/{item["id"]}'
        if url in present_urls:
            # The same video was reported twice; one update per row is enough.
            continue
        present_urls.add(url)
        stats = item.get('statistics', {})
        # .get() instead of indexing: an unexpected video must neither raise
        # KeyError (plain dict) nor insert an empty entry into the caller's
        # defaultdict.
        for id_ in url_id_map.get(url, ()):
            updates.append({
                'id' : id_,
                'url' : url,
                'views' : stats.get('viewCount', 0),
                'likes' : stats.get('likeCount', None),
                'dislikes' : stats.get('dislikeCount', None),
                'favorites' : stats.get('favoriteCount', None)
            })

    # Every URL that was asked for but not returned is written with zeroed counters.
    for url, ids in url_id_map.items():
        if url in present_urls:
            continue
        for id_ in ids:
            updates.append({
                'id' : id_,
                'url' : url,
                'views' : 0,
                'likes' : 0,
                'dislikes' : 0,
                'favorites' : 0
            })
    return updates

In [20]:
cnt = 0
while True:
    batch = await db.fetch_unprocessed_yt_batch(YT_BATCH_SIZE)
    if (len(batch) == 0):
        break
    cnt += 1
    print(batch[0].id)

    url_id_map = defaultdict(list)
    for row in batch:
        url_id_map[row.url].append(row.id)
    ids = [parse_url_id(row.url) for row in batch]
    res = await yt.fetch_videos_stats(ids)
    
    updates = yt_results_to_updates(url_id_map, res)

    await db.update_song_urls_batch(updates)

    print(f"Finished batch {cnt}")

print("Finished!")

900770
Finished batch 1
Finished!


## NicoNico URL data fetcher

I couldn't find an official API, so I resorted to using an external one.

I couldn't find any documentation for the external API, so I don't know its rate limits, but the threshold seems to be pretty high, and if you exceed it you get unblocked fairly quickly (within 1-2 minutes).

In [22]:
# Number of unprocessed NicoNico URLs requested from the DB per loop iteration.
NN_BATCH_SIZE = 500

In [23]:
from scrapers.niconicoVideoStatistics import NicoNicoScraper

# Scraper instance used by the NicoNico loop below.
nn = NicoNicoScraper()

In [24]:
def nn_results_to_updates(url_id_map: dict, res: List[Tuple[str, ResponseState, dict]]):
    """
    Turn NicoNico scraper results into row updates for the stored song URLs.

    Args:
        url_id_map: maps a stored URL (``http://www.nicovideo.jp/watch/<video id>``)
            to the list of DB row ids that share this URL. It is never modified.
        res: ``(video id, response state, data)`` tuples; ``data`` is read
            for its ``views`` entry.

    Returns:
        ``(everything_ok, updates)``. ``everything_ok`` is ``False`` when at
        least one result has the state ``ResponseState.UNKNOWN``; such results
        produce no update. ``updates`` holds one dict with the keys ``id``,
        ``url`` and ``views`` per row id of every other result.
    """
    everything_ok = True
    updates = []
    for item in res:
        video_id, state, data = item[0], item[1], item[2]
        if state == ResponseState.UNKNOWN:
            everything_ok = False
            continue

        url = f'http://www.nicovideo.jp/watch/{video_id}'
        views = data.get('views', 0)
        # .get() instead of indexing: an unexpected video must neither raise
        # KeyError (plain dict) nor insert an empty entry into the caller's
        # defaultdict.
        for id_ in url_id_map.get(url, ()):
            updates.append({
                'id' : id_,
                'url' : url,
                'views' : views
            })

    return (everything_ok, updates)

In [25]:
import logging

# Print the records of the "NicoNicoScraper" logger in the cell output.
logger = logging.getLogger("NicoNicoScraper")

# Re-running this cell must not stack another handler on the logger,
# otherwise every record is printed once per run. Reuse a plain StreamHandler
# that is already attached and only create one when none exists.
ch = next((h for h in logger.handlers if type(h) is logging.StreamHandler), None)
if ch is None:
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    logger.addHandler(ch)

cnt = 0
while True:
    batch = await db.fetch_unprocessed_nn_batch(NN_BATCH_SIZE)
    if (len(batch) == 0):
        break
    cnt += 1
    print(batch[0].id)

    url_id_map = defaultdict(list)
    for row in batch:
        url_id_map[row.url].append(row.id)
    ids = [parse_url_id(row.url) for row in batch]
    res = await nn.get_videos_data(ids)
    
    state, updates = nn_results_to_updates(url_id_map, res)

    await db.update_song_urls_batch(updates)

    print(f"Finished batch {cnt}")

    if (not state):
        print("Rate limit reached... Sleeping for 120 seconds...")
        await asyncio.sleep(120)

print("Finished!")

1101168
Finished batch 1
Finished!


## Bilibili URL data fetcher

1. Bilibili doesn't like scraping, so you have to use a user agent.
2. It has a really low, unspecified rate limit (20 req/sec gets rate limited). Therefore, it's slow af.

In [26]:
# Number of unprocessed Bilibili URLs requested from the DB per loop iteration.
# NOTE(review): together with the 1 s pause after each batch this presumably
# keeps the request rate around 10 req/s (assuming one request per video) - confirm.
BB_BATCH_SIZE = 10

In [27]:
from scrapers.bilibiliVideoStatistics import BilibiliScraper

# Scraper instance used by the Bilibili loop below.
bb = BilibiliScraper()

In [28]:
def bb_results_to_updates(url_id_map: dict, res: List[Tuple[str, ResponseState, dict]]):
    """
    Turn Bilibili scraper results into row updates for the stored song URLs.

    Args:
        url_id_map: maps a stored URL (``https://www.bilibili.com/video/<video id>``)
            to the list of DB row ids that share this URL. It is never modified.
        res: ``(video id, response state, data)`` tuples; ``data`` is read
            for its ``views``, ``likes``, ``dislikes`` and ``favorites`` entries.

    Returns:
        ``(everything_ok, updates)``. ``everything_ok`` is ``False`` when at
        least one result has the state ``ResponseState.UNKNOWN``; such results
        produce no update. ``updates`` holds one dict with the keys ``id``,
        ``url``, ``views``, ``likes``, ``dislikes`` and ``favorites`` per row
        id of every other result; counters missing from ``data`` become ``0``.
    """
    everything_ok = True
    updates = []
    for item in res:
        video_id, state, data = item[0], item[1], item[2]
        if state == ResponseState.UNKNOWN:
            everything_ok = False
            continue

        url = f'https://www.bilibili.com/video/{video_id}'
        # .get() instead of indexing: an unexpected video must neither raise
        # KeyError (plain dict) nor insert an empty entry into the caller's
        # defaultdict.
        for id_ in url_id_map.get(url, ()):
            updates.append({
                'id' : id_,
                'url' : url,
                'views' : data.get('views', 0),
                'likes' : data.get('likes', 0),
                'dislikes' : data.get('dislikes', 0),
                'favorites' : data.get('favorites', 0)
            })

    return (everything_ok, updates)

In [29]:
import logging

# Print the records of the "BilibiliScraper" logger in the cell output.
logger = logging.getLogger("BilibiliScraper")

# Re-running this cell must not stack another handler on the logger,
# otherwise every record is printed once per run. Reuse a plain StreamHandler
# that is already attached and only create one when none exists.
ch = next((h for h in logger.handlers if type(h) is logging.StreamHandler), None)
if ch is None:
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    logger.addHandler(ch)

cnt = 0
while True:
    batch = await db.fetch_unprocessed_bb_batch(BB_BATCH_SIZE)
    if (len(batch) == 0):
        break
    cnt += 1
    print(batch[0].id)

    url_id_map = defaultdict(list)
    for row in batch:
        url_id_map[row.url].append(row.id)
    ids = [parse_url_id(row.url) for row in batch]
    res = await bb.get_videos_data(ids)
    
    state, updates = bb_results_to_updates(url_id_map, res)

    await db.update_song_urls_batch(updates)

    print(f"Finished batch {cnt}")

    if (not state):
        print("Rate limit reached... Sleeping for 120 seconds...")
        await asyncio.sleep(120)

    await asyncio.sleep(1) # Some delay to not exceed 10 requests per second (20 gets rate limited)

print("Finished!")

1101215
Finished batch 1
1101512
Finished batch 2
1102050
Finished batch 3
1102477
Finished batch 4
1102726
Finished batch 5
1103058
Finished batch 6
Finished!


## TODO

TODO:
1. SoundCloud
2. Piapro
3. Bandcamp

# Merge Song and SongURL tables

This section can potentially be really CPU and RAM heavy.

Make sure to change these constants according to your system's specifications!!!

In [30]:
# Batch size passed to db.fetch_joined_views_in_batches.
BATCH_SIZE = 1000
# Size of the semaphore guarding the DB updates; run_all_batches keeps up to
# twice this many tasks alive before it waits for one to finish.
MAX_CONCURRENT_BATCHES = 50

In [31]:
# Order in which the services are written into a song's views dict.
service_order = [
    Service.YOUTUBE, Service.NICONICO, Service.BILIBILI
]

def process_views_data(views: dict) -> dict:
    """
    Return the entries of ``views`` that belong to a known service.

    The result is keyed by the names from ``service_names_map`` and follows
    the order of ``service_order``; services absent from ``views`` are skipped.
    """
    ordered_names = (service_names_map[member] for member in service_order)
    return {name: views[name] for name in ordered_names if name in views}

In [32]:
import asyncio

# Shared semaphore: at most MAX_CONCURRENT_BATCHES process_batch bodies run at once.
sem = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)

async def process_batch(batch, cnt):
    """
    Write the per-service views of one batch of songs to the DB.

    Each song in ``batch`` is read through its ``song_id`` and ``services``
    entries; the services are emitted in ``service_order`` order and services
    a song does not have are left out. ``cnt`` is only used for the progress
    message. The whole body runs under the shared semaphore ``sem``.
    """
    async with sem:
        updates = []
        for song in batch:
            song_id = song["song_id"]
            per_service = song["services"]
            ordered_views = {}
            for member in service_order:
                name = service_names_map[member]
                if name in per_service:
                    ordered_views[name] = per_service[name]
            updates.append({"id": song_id, "views": ordered_views})
        await db.update_songs_batch(updates)
        print(f"Updated batch {cnt}")

async def run_all_batches():
    """
    Stream all joined-views batches from the DB and update them concurrently.

    One task is created per batch. Once ``MAX_CONCURRENT_BATCHES * 2`` tasks
    are alive, the producer waits for at least one of them to finish before
    it fetches the next batch, which bounds the number of batches held in
    memory.

    Raises:
        Whatever exception a batch task raised. The first failure is
        propagated, the tasks that are still pending are cancelled, and the
        final success message is not printed.
    """
    pending = set()
    cnt = 0

    try:
        async for batch in db.fetch_joined_views_in_batches(BATCH_SIZE):
            cnt += 1
            pending.add(asyncio.create_task(process_batch(batch, cnt)))

            if len(pending) >= MAX_CONCURRENT_BATCHES * 2:
                done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                # Finished tasks leave the pending set here, so this is the
                # only place where their exceptions can still be noticed.
                for task in done:
                    task.result()

        await asyncio.gather(*pending)
    finally:
        # No-op for tasks that already finished; after a failure it stops
        # the remaining batches instead of leaving them running unobserved.
        for task in pending:
            task.cancel()

    print("Finished updating DB!")

In [33]:
# Run the merge; each processed batch prints "Updated batch <n>".
await run_all_batches()

Updated batch 1
Updated batch 2
Updated batch 3
Updated batch 4
Updated batch 5
Updated batch 6
Updated batch 7
Updated batch 8
Updated batch 9
Updated batch 10
Updated batch 11
Updated batch 12
Updated batch 13
Updated batch 14
Updated batch 15
Updated batch 16
Updated batch 17
Updated batch 18
Updated batch 19
Updated batch 20
Updated batch 21
Updated batch 22
Updated batch 23
Updated batch 24
Updated batch 25
Updated batch 26
Updated batch 27
Updated batch 28
Updated batch 29
Updated batch 30
Updated batch 31
Updated batch 32
Updated batch 33
Updated batch 34
Updated batch 35
Updated batch 36
Updated batch 37
Updated batch 38
Updated batch 39
Updated batch 40
Updated batch 41
Updated batch 42
Updated batch 43
Updated batch 44
Updated batch 45
Updated batch 46
Updated batch 47
Updated batch 48
Updated batch 49
Updated batch 50
Updated batch 51
Updated batch 52
Updated batch 53
Updated batch 54
Updated batch 55
Updated batch 56
Updated batch 57
Updated batch 58
Updated batch 59
Update