In [1]:
import aiohttp
import asyncio
import joblib
import random
import requests
import pandas as pd
import sqlite3 as sq
import time
from datetime import date

from itertools import chain
from toolz import get_in


In [2]:
from src.db import (
    execute_fetchall,
    get_schema,
    get_schemas,
    drop_table,
    get_tables,
    count_tables,
    get_table,
    get_table_filter_ids,
    update_db,
)

from src.api import (
    request_api_animes,
    request_api_characters,
    request_api_staff,
    request_api_themes,
    request_api_people,
    request_animes,
    request_characters,
    request_staff,
    request_themes,
    request_people
)

In [8]:
df = get_table('anime')  # stored 'anime' table, read from get_table's default database (no db passed)

In [9]:
# Preview only: rendering the whole stored table bloats the notebook output.
df.head()

In [7]:
await request_api_animes(1)

In [3]:
from toolz import get_in
import pandas as pd
from toolz import merge_with

# (output column, key path passed to get_in) for one anime record.
_ANIME_FIELDS = (
    ('anime_id',         ['mal_id']),
    ('url',              ['url']),
    ('image_jpg',        ['images', 'jpg', 'image_url']),
    ('image_jpg_small',  ['images', 'jpg', 'small_image_url']),
    ('image_jpg_large',  ['images', 'jpg', 'large_image_url']),
    ('image_webp',       ['images', 'webp', 'image_url']),
    ('image_webp_small', ['images', 'webp', 'small_image_url']),
    ('image_webp_large', ['images', 'webp', 'large_image_url']),
    ('approved',         ['approved']),
    ('type',             ['type']),
    ('source',           ['source']),
    ('episodes',         ['episodes']),
    ('status',           ['status']),
    ('airing',           ['airing']),
    ('aired_from',       ['aired', 'from']),
    ('aired_to',         ['aired', 'to']),
    ('duration',         ['duration']),
    ('rating',           ['rating']),
    ('score',            ['score']),
    ('scored_by',        ['scored_by']),
    ('rank',             ['rank']),
    ('popularity',       ['popularity']),
    ('season',           ['season']),
    ('year',             ['year']),
)


def format_anime_r(response):
    """Flatten the anime records of one API page into one row per anime.

    Parameters
    ----------
    response : dict
        Decoded API page; records are read from its ``'data'`` list.
        A missing or null ``'data'`` is treated as an empty page.

    Returns
    -------
    pd.DataFrame
        One row per record with the columns of ``_ANIME_FIELDS`` (a missing
        nested key becomes None via ``get_in``). The columns exist even for an
        empty page, so callers can always index e.g. ``'anime_id'``.
    """
    # `response.get('data') or []` also covers an explicit null, which the
    # previous `[] or response.get('data', [])` did not (`[] or x` is just x).
    records = response.get('data') or []
    return pd.DataFrame(
        [
            {column: get_in(path, anime) for column, path in _ANIME_FIELDS}
            for anime in records
        ],
        columns=[column for column, _ in _ANIME_FIELDS],
    )


def format_anime_r_X(response, key, with_rel=True, extra_cols=None):
    """Explode the ``'<key>s'`` list of every anime record into rows.

    Each row is one element of ``anime['<key>s']`` tagged with its parent's
    ``mal_id`` as ``anime_id``; the element's own ``mal_id`` column (if any)
    is renamed to ``<key>_id``.

    Parameters
    ----------
    response : dict
        Decoded API page; anime records are read from ``'data'``. A missing
        or null ``'data'`` or ``'<key>s'`` entry is treated as empty.
    key : str
        Singular entity name, e.g. ``'genre'`` reads ``anime['genres']``.
    with_rel : bool, default True
        Whether to split the result into link and entity tables.
    extra_cols : list of str, optional
        Extra columns to keep in the link table next to ``anime_id`` and
        ``<key>_id``. Only used when ``with_rel`` is True.

    Returns
    -------
    list[pd.DataFrame] or pd.DataFrame
        If ``with_rel``: ``[link_df, entity_df]``. ``link_df`` has
        ``anime_id``, ``<key>_id`` and ``extra_cols``. ``entity_df`` has every
        column except ``anime_id``, de-duplicated. Both are the same empty
        frame when there are no elements. Otherwise: the full exploded frame.
    """
    # None default avoids a shared mutable default list.
    extra_cols = list(extra_cols or [])

    df = pd.DataFrame([
        dict(anime_id=anime.get('mal_id'), **x)
        for anime in (response.get('data') or [])
        for x in (anime.get(f'{key}s') or [])
    ]).rename(dict(mal_id=f'{key}_id'), axis=1)

    if not with_rel:
        return df
    if df.empty:
        return [df, df]
    return [
        df[['anime_id', f'{key}_id'] + extra_cols],
        df.drop('anime_id', axis=1).drop_duplicates(),
    ]


def format_anime(id, response):
    """Split one anime API page into the per-entity DataFrames.

    ``id`` is not used; it exists to match the ``fn_format(id, response)``
    calling convention of ``worker_process_default``.

    Returns a dict holding, in order:
    - ``df_anime``;
    - for each of producer, licensor, studio, genre, theme and demographic,
      ``df_<key>`` (entity table) then ``df_anime_<key>`` (anime-to-entity
      link table);
    - ``df_title``.
    """
    frames = dict(df_anime=format_anime_r(response))

    for relation in ('producer', 'licensor', 'studio', 'genre', 'theme', 'demographic'):
        link_frame, entity_frame = format_anime_r_X(response, relation)
        frames[f'df_{relation}'] = entity_frame
        frames[f'df_anime_{relation}'] = link_frame

    # Titles are fetched without the link/entity split, so a single frame comes back.
    # NOTE: extra_cols is only consulted by format_anime_r_X when with_rel is True.
    frames['df_title'] = format_anime_r_X(response, 'title', with_rel=False, extra_cols=['type'])

    return frames


def format_staffs(id, response):
    """Flatten one anime's staff response into staff and anime-staff tables.

    Parameters
    ----------
    id :
        Anime id the response belongs to; written to every row as ``anime_id``.
    response : dict
        Decoded API response. Staff entries are read from its ``'data'`` list,
        and each entry has a ``'person'`` dict and a ``'positions'`` list.
        Missing or null ``'data'``, ``'person'``, ``'positions'``, ``'images'``
        and ``'jpg'`` values are treated as empty.

    Returns
    -------
    dict
        ``df_staff``: one row per ``staff_id`` with ``url``, ``image_url`` and
        ``name``, sorted by ``staff_id``.
        ``df_anime_staff``: distinct ``(anime_id, staff_id, position)`` rows.
        Both frames keep their columns even when the response has no staff.
    """
    full_columns = ['anime_id', 'staff_id', 'url', 'image_url', 'name', 'position']

    rows = []
    for staff in response.get('data') or []:
        person = staff.get('person') or dict()
        image_url = ((person.get('images') or dict()).get('jpg') or dict()).get('image_url')
        # One row per position the person held on this anime.
        for position in staff.get('positions') or []:
            rows.append(dict(
                anime_id  = id,
                staff_id  = person.get('mal_id'),
                url       = person.get('url'),
                image_url = image_url,
                name      = person.get('name'),
                position  = position,
            ))

    # Explicit columns keep the chains below working on an empty response:
    # a column-less frame would raise KeyError in sort_values('image_url').
    df_staff_full = pd.DataFrame(rows, columns=full_columns)

    df_staff = df_staff_full\
        .sort_values('image_url')\
        .drop_duplicates(subset='staff_id', keep='last')\
        [['staff_id', 'url', 'image_url', 'name']]\
        .sort_values('staff_id')\
        .reset_index(drop=True)

    df_anime_staff = df_staff_full\
        [[
            'anime_id',
            'staff_id',
            'position'
        ]]\
        .drop_duplicates()\
        .sort_values(['anime_id', 'staff_id'])

    return dict(
        df_staff = df_staff,
        df_anime_staff = df_anime_staff
    )


def _stringify_response(response):
    """Wrap ``str(response)`` as a one-row DataFrame (column ``'str'``) under key ``'df'``."""
    return dict(df=pd.DataFrame([str(response)], columns=['str']))


def format_characters(response, *rest):
    """Placeholder formatter for character responses: stores the raw response as text.

    Accepts either ``(response)`` or the worker convention ``(id, response)``
    used by ``worker_process_default``. In the latter case the last positional
    argument is the response.
    """
    return _stringify_response(rest[-1] if rest else response)


def format_voiceactors(response, *rest):
    """Placeholder formatter for voice-actor responses: stores the raw response as text.

    Accepts either ``(response)`` or the worker convention ``(id, response)``.
    """
    return _stringify_response(rest[-1] if rest else response)


def format_themes(response, *rest):
    """Placeholder formatter for theme responses: stores the raw response as text.

    Accepts either ``(response)`` or the worker convention ``(id, response)``.
    """
    return _stringify_response(rest[-1] if rest else response)


def join_dict_dfs(dict_dfs_old, dict_dfs_new):
    """Concatenate two dicts of DataFrames key by key.

    - Keys present in both dicts map to ``pd.concat([old, new])``. Index labels
      are kept as-is, not reset.
    - A key present in only one dict maps to ``pd.concat([frame])``.
    - Key order follows first appearance, scanning the old dict first.
    """
    grouped = {}
    for source in (dict_dfs_old, dict_dfs_new):
        for key, frame in source.items():
            grouped.setdefault(key, []).append(frame)
    return {key: pd.concat(frames) for key, frames in grouped.items()}


async def worker_process_default(queue_request, queue_process, fn_format):
    """Consume request results forever, folding each formatted result into one accumulator.

    Each item from ``queue_request`` is a dict with an ``'id'`` and an
    awaitable ``'response'``. The awaited response goes through
    ``fn_format(id, response)``, which returns a dict of DataFrames. That dict
    is merged with ``join_dict_dfs`` into the running accumulator, kept as
    the single item of ``queue_process``.

    Failures are isolated per item:
    - A failure while awaiting, formatting or merging one item is reported
      and that item is skipped.
    - ``task_done()`` is always called, so the caller's
      ``queue_request.join()`` cannot hang on a bad response.
    - The accumulator is never dropped.

    The loop never returns; the caller cancels this task.
    """
    while True:
        item = await queue_request.get()
        item_id = None
        try:
            item_id = item['id']
            response = await item['response']
            dict_dfs_new = fn_format(item_id, response)

            # get_nowait() and put_nowait() do not yield to the event loop, so
            # the accumulator is never out of the queue across an await.
            had_old = not queue_process.empty()
            dict_dfs_old = queue_process.get_nowait() if had_old else dict()
            try:
                dict_dfs = join_dict_dfs(dict_dfs_old, dict_dfs_new)
            except Exception:
                if had_old:
                    queue_process.put_nowait(dict_dfs_old)  # keep what was accumulated so far
                raise
            queue_process.put_nowait(dict_dfs)
        except Exception as exc:
            # Exception (not BaseException) so task cancellation still propagates.
            print(f'worker: skipped id={item_id!r}: {exc!r}')
        finally:
            queue_request.task_done()

async def worker_process_animes(queue_request, queue_process):
    """Queue consumer for the anime stage (formats with ``format_anime``)."""
    await worker_process_default(queue_request, queue_process, fn_format=format_anime)

async def worker_process_staffs(queue_request, queue_process):
    """Queue consumer for the staff stage (formats with ``format_staffs``)."""
    await worker_process_default(queue_request, queue_process, fn_format=format_staffs)

async def worker_process_characters(queue_request, queue_process):
    """Queue consumer for the characters stage (formats with ``format_characters``)."""
    await worker_process_default(queue_request, queue_process, fn_format=format_characters)

async def worker_process_voiceactors(queue_request, queue_process):
    """Queue consumer for the voice-actors stage (formats with ``format_voiceactors``)."""
    await worker_process_default(queue_request, queue_process, fn_format=format_voiceactors)

async def worker_process_themes(queue_request, queue_process):
    """Queue consumer for the themes stage (formats with ``format_themes``)."""
    await worker_process_default(queue_request, queue_process, fn_format=format_themes)

def get_anime_ids_full(dict_dfs_anime, db='animedb.sqlite'):
    """Return ids of freshly fetched anime that still need their detail data requested.

    An id from the new fetch is kept unless the stored ``anime`` table in
    ``db`` already has it with status ``'Finished Airing'``.

    Parameters
    ----------
    dict_dfs_anime : dict
        Output of the anime stage. Its ``'df_anime'`` frame must have an
        ``'anime_id'`` column.
    db : str, default 'animedb.sqlite'
        Database passed to ``get_table``. This value is now honoured; the old
        body overwrote it with the hard-coded default.

    Returns
    -------
    set
        Anime ids to request details for.
    """
    df_anime_old = get_table('anime', db=db)
    df_anime_new = dict_dfs_anime['df_anime']

    finished_ids = df_anime_old.loc[df_anime_old['status'] == 'Finished Airing', 'anime_id']
    return set(df_anime_new['anime_id']) - set(finished_ids)


async def request_and_process(name, fn_request, fn_process, kwargs_request=None, kwargs_process=None):
    """Run one producer/consumer pipeline stage and return its accumulated DataFrames.

    Two tasks are run:
    - producer: ``fn_request(queue_request, **kwargs_request)``;
    - consumer: ``fn_process(queue_request, queue_process, **kwargs_process)``.

    Once the producer finishes (at most 86400 s, i.e. 24 h) and every queued
    item is marked done:
    - the consumer is cancelled;
    - the accumulated result (the single item the consumer leaves in
      ``queue_process``) is dumped with joblib to
      ``f'{name}-bakup-{today}.pkl'`` and returned.

    Returns an empty dict when nothing was accumulated.

    Raises
    ------
    asyncio.TimeoutError
        If the producer runs longer than 24 h.
    Exception
        Whatever the producer raises, or whatever the consumer raised if it
        exited before the queue was drained.
    RuntimeError
        If the consumer returned normally before the queue was drained.
    """
    kwargs_request = kwargs_request or dict()
    kwargs_process = kwargs_process or dict()
    today = str(date.today())
    queue_request = asyncio.Queue()
    queue_process = asyncio.Queue()

    task_request = asyncio.create_task(fn_request(queue_request, **kwargs_request))
    task_process = asyncio.create_task(fn_process(queue_request, queue_process, **kwargs_process))
    task_join = None

    try:
        await asyncio.wait_for(task_request, 86400)
        print(f'{name}: task_request finished')

        # Wait for the queue to drain, but stop if the consumer dies first:
        # join() alone would wait forever for task_done() calls that never come.
        task_join = asyncio.create_task(queue_request.join())
        await asyncio.wait({task_join, task_process}, return_when=asyncio.FIRST_COMPLETED)
        if not task_join.done():
            task_process.result()  # re-raises the consumer's exception, if any
            raise RuntimeError(f'{name}: processing task exited before the queue was drained')
        print(f'{name}: queue completed')
    finally:
        # Never leave the consumer (or the join waiter) running, also on timeout/error.
        task_process.cancel()
        if task_join is not None:
            task_join.cancel()

    # queue_process is empty when no response was accumulated; a blocking
    # get() would then wait forever.
    result = dict() if queue_process.empty() else queue_process.get_nowait()

    # Filename spelling kept as-is so names match any existing backups.
    joblib.dump(result, f'{name}-bakup-{today}.pkl')
    return result


async def main():
    """Run the pipeline stages that are currently enabled.

    1. anime: ``request_animes`` (called with ``iters=2``), formatted by
       ``worker_process_animes``.
    2. staff: ``request_staff`` for the ids from ``get_anime_ids_full``
       (fetched anime not already stored as 'Finished Airing'), formatted by
       ``worker_process_staffs``.

    Returns
    -------
    tuple
        ``(dict_dfs_anime, dict_dfs_staff)``.
    """
    db = 'animedb.sqlite'

    anime_stage = dict(
        name = 'anime',
        fn_request = request_animes,
        fn_process = worker_process_animes,
        kwargs_request = dict(iters=2),
    )
    dict_dfs_anime = await request_and_process(**anime_stage)

    pending_ids = get_anime_ids_full(dict_dfs_anime, db)

    staff_stage = dict(
        name = 'staff',
        fn_request = request_staff,
        fn_process = worker_process_staffs,
        kwargs_request = dict(ids = pending_ids),
    )
    dict_dfs_staff = await request_and_process(**staff_stage)

    # Stages not enabled yet: characters, themes, voiceactors.
    # dict_dfs_characters = await request_and_process(
    #     name = 'character',
    #     fn_request = request_characters,
    #     fn_process = worker_process_characters,
    #     kwargs_request = dict(ids = pending_ids),
    # )

    # dict_dfs_themes = await request_and_process(
    #     name = 'theme',
    #     fn_request = request_themes,
    #     fn_process = worker_process_themes,
    #     kwargs_request = dict(ids = pending_ids),
    # )

    # dict_dfs_voiceactors = await request_and_process(
    #     name = 'voiceactor',
    #     fn_request = request_people,
    #     fn_process = worker_process_voiceactors,
    #     kwargs_request = dict(ids = pending_ids),
    # )

    return dict_dfs_anime, dict_dfs_staff

In [4]:
dict_dfs_anime = await main()

In [6]:
## TODO

- [x] anime
- [x] staff
- [ ] characters
- [ ] themes
- [ ] voiceactors
- [ ] add flags to force re-creation (themes)
- [ ] update the database (`update_db`)
