In [1]:
import pickle
import numpy as np
import pandas as pd
import json
import sqlalchemy as sql
from sqlalchemy import create_engine
from tqdm import tqdm
import requests
from bs4 import BeautifulSoup
from io import StringIO
from urllib.parse import quote
from concurrent.futures import ThreadPoolExecutor
import threading
from multiprocessing import Pool
import time
import random
import re
import itertools
import zlib
import sys

In [2]:
# The database login lives outside the notebook in a JSON file shaped like
# {"dblogin": {"username": ..., "password": ...}}.
with open('../tools/credentials.json') as cred_file:
    credentials = json.load(cred_file)

db_login = credentials["dblogin"]
username, password = db_login["username"], db_login["password"]

In [3]:
# Percent-encode the credentials before embedding them in the URL: a '@', ':',
# '/', '#' or '%' in the username or password would otherwise be parsed as URL
# syntax (e.g. '@' as the host separator) and corrupt the connection string.
# SQLAlchemy URL-decodes the user and password fields when parsing the URL.
db_string = (
    f"postgresql://{quote(username, safe='')}:{quote(password, safe='')}"
    "@192.168.0.3:5432/animeplanet"
)
db = create_engine(db_string)

In [4]:
def chunker(seq, size):
    """Lazily split a sliceable sequence into consecutive pieces of ``size`` items.

    The last piece may be shorter. The start offsets are computed immediately,
    so an invalid ``size`` (e.g. 0) raises at call time; the slices themselves
    are produced on demand.
    """
    starts = range(0, len(seq), size)
    return (seq[start:start + size] for start in starts)

### Parse User Watch List Tables

In [5]:
def count_rows():
    """Count the stored watch-list pages that have HTML.

    Matches ``web_scrape`` rows with a non-NULL ``html_text`` whose ``url`` fits
    the anime-planet per-user list URL (``sort=title&mylist_view=list``).

    Returns
    -------
    The single ``count`` value of the one-row query result.
    """
    count_sql = sql.text("""
            SELECT COUNT(*)
            FROM web_scrape
            WHERE html_text IS NOT NULL
            AND url LIKE 'https://www.anime-planet.com/users/%/anime?sort=title&mylist_view=list%'
            """)
    counts = pd.read_sql(count_sql, db)
    return counts.loc[0, 'count']

In [6]:
def read_sql_chunks(chunksize):
    """Yield the stored watch-list pages as DataFrames of at most ``chunksize`` rows.

    Each chunk has exactly the columns ``url`` and ``html_text``, in that
    order, which is the ``(url, html_text)`` pair ``parseTable`` unpacks. The
    number of chunks is fixed up front from ``count_rows()``.

    Parameters
    ----------
    chunksize : int
        Maximum number of rows per yielded DataFrame; must be positive.

    Yields
    ------
    pandas.DataFrame
    """
    num_rows = count_rows()

    # ORDER BY makes LIMIT/OFFSET paging deterministic: without it PostgreSQL
    # may return rows in a different order for each query, so consecutive pages
    # can overlap or skip rows.
    # NOTE(review): assumes `url` is unique in web_scrape. If it is not, add a
    # unique tiebreaker column to the ORDER BY.
    # LIMIT/OFFSET are bound parameters instead of f-string interpolation, and
    # the statement is built once rather than on every iteration.
    query = sql.text("""
            SELECT url, html_text
            FROM web_scrape
            WHERE html_text IS NOT NULL
            AND url LIKE 'https://www.anime-planet.com/users/%/anime?sort=title&mylist_view=list%'
            ORDER BY url
            LIMIT :limit OFFSET :offset;
            """)

    for offset in range(0, num_rows, chunksize):
        # int() so the DB driver receives plain Python ints.
        yield pd.read_sql(query, db, params={'limit': int(chunksize), 'offset': int(offset)})

In [7]:
def compressText(text):
    """zlib-compress ``text``, UTF-8-encoding it first if it is a string.

    Parameters
    ----------
    text : str or bytes-like
        Text to compress. Bytes-like input is compressed as-is.

    Returns
    -------
    bytes
        The zlib-compressed payload (reversed by ``decompressText``).
    """
    # isinstance (not `type(text) == str`) so str subclasses are encoded too.
    # The exact type check handed them to zlib unencoded, which raises TypeError.
    payload = text.encode('utf-8') if isinstance(text, str) else text
    return zlib.compress(payload)

In [8]:
def decompressText(text):
    """Inflate zlib-compressed bytes and decode the result as UTF-8 text."""
    raw = zlib.decompress(text)
    return raw.decode('utf-8')

In [9]:
def parseTable(url_html_tup):
    """Parse one scraped anime-planet watch-list page into a DataFrame.

    Parameters
    ----------
    url_html_tup : tuple
        ``(url, html_text)``: the page URL and its HTML.

    Returns
    -------
    pandas.DataFrame
        One row per entry of the page's first ``<table>``, with columns
        ``title, type, year, avg, status, eps, times_watched, rating,
        anime_url, username``. If the page cannot be parsed, an empty frame
        with the same columns is returned, so one bad page does not abort the
        whole ``Pool.map`` batch.
    """
    table_columns = ['title', 'type', 'year', 'avg', 'status', 'eps', 'times_watched', 'rating']
    all_columns = table_columns + ['anime_url', 'username']

    url, html_text = url_html_tup
    try:
        soup = BeautifulSoup(html_text, 'html.parser')
        table = soup.find('table')
        df = pd.read_html(StringIO(str(table)))[0]
        df.columns = table_columns

        # astype(str) keeps `.str` usable when read_html typed the column as
        # numeric (e.g. every cell empty -> float NaN). That previously raised
        # and dropped the page. `[0-9]+` (not `*`) turns cells without digits
        # into NaN; `*` matched '' there and the float cast then failed.
        df['times_watched'] = (
            df['times_watched'].astype(str)
            .str.extract(r'([0-9]+)', expand=False)
            .astype('float')
        )

        # A conditional expression evaluates only the branch it needs. The
        # previous np.where() always built 'https://...' + href, which raised
        # for an <a> without href (None) or a cell without <a> (None.has_attr).
        # The 'no link' fallback was therefore unreachable and the page dropped.
        title_links = [td.find('a') for td in table.find_all('td', attrs={'class': 'tableTitle'})]
        df['anime_url'] = [
            'https://www.anime-planet.com' + link.get('href')
            if link is not None and link.has_attr('href')
            else 'no link'
            for link in title_links
        ]
        df['anime_url'] = df['anime_url'].astype('string')

        # Take the whole /users/<name>/ path segment. [A-Za-z0-9]* failed on
        # names containing any other character (e.g. '_' or '-'), raising
        # IndexError and silently dropping that user's page.
        df['username'] = str(re.findall(r'/users/([^/]+)/', url)[0])

        return df

    except Exception:
        # Best effort: an unparseable page yields an empty frame. `Exception`
        # rather than a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate out of the Pool worker.
        return pd.DataFrame(columns=all_columns)

In [10]:
def prepareNextChunk(sql_chunks):
    """Pull the next DataFrame from ``sql_chunks`` and return its rows as tuples.

    Each tuple holds one row's values in column order. ``next`` raises
    StopIteration once the iterator is exhausted.
    """
    frame = next(sql_chunks)
    return list(map(tuple, frame.to_numpy()))

In [11]:
def saveData(data):
    """Replace the ``watch_list`` rows for the ``(anime_url, username)`` pairs in ``data``.

    Deletes existing rows that match any pair in ``data``, then appends
    ``data``. Both steps run in one transaction, so a failed insert cannot
    leave the pairs deleted. An empty frame is a no-op.

    Parameters
    ----------
    data : pandas.DataFrame
        Parsed watch-list rows. Must include ``anime_url`` and ``username``
        columns and otherwise match the ``watch_list`` table's columns.
    """
    if data.empty:
        # Nothing to replace (the old code produced `IN ()`, a syntax error).
        return

    # The pairs are bound as two parallel text arrays and matched through
    # unnest(), instead of pasting Python tuple reprs into the SQL. The repr
    # approach broke on values containing a quote (repr switches to "...",
    # which PostgreSQL reads as an identifier) and on ':name' sequences
    # (parsed by sql.text as bind parameters). It also allowed SQL injection
    # from scraped values. Multi-argument unnest needs PostgreSQL 9.4+.
    delete_query = sql.text("""
            DELETE FROM watch_list AS w
            USING unnest(CAST(:anime_urls AS text[]), CAST(:usernames AS text[]))
                AS d(anime_url, username)
            WHERE w.anime_url = d.anime_url
            AND w.username = d.username;
            """)
    params = {
        'anime_urls': data['anime_url'].astype(str).tolist(),
        'usernames': data['username'].astype(str).tolist(),
    }

    # engine.begin() commits on success and rolls back on error; a plain
    # db.connect() block does not commit under SQLAlchemy 2.x.
    with db.begin() as con:
        con.execute(delete_query, params)
        data.to_sql('watch_list', con, if_exists='append', index=False, method='multi')

## Read the next chunk asynchronously while processing the current chunk

**TODO:** Keep a list of the completed `(anime_url, username)` tuples.

In [15]:
chunksize = 100
# Benchmark cap: only the first `max_rows` stored pages are processed.
# Set to None to process every stored watch-list page.
max_rows = 1000
num_rows = count_rows()
rows_to_process = num_rows if max_rows is None else min(num_rows, max_rows)
n_chunks = -(-rows_to_process // chunksize)  # ceiling division
sql_chunks = read_sql_chunks(chunksize)

list_of_tups = prepareNextChunk(sql_chunks) if n_chunks else []
save_futures = []

# One process pool for the whole run (previously a new Pool(4) was forked for
# every chunk). The thread pool prefetches the next chunk from the database and
# saves finished chunks while the process pool parses the current one.
with ThreadPoolExecutor() as executor, Pool(4) as pool:
    for i in tqdm(range(n_chunks)):
        # Don't prefetch past the last chunk. Otherwise the extra read is
        # wasted, or next() on the exhausted generator raises StopIteration
        # out of chunk_thread.result().
        is_last_chunk = i == n_chunks - 1
        chunk_thread = None if is_last_chunk else executor.submit(prepareNextChunk, sql_chunks)

        data = pd.concat(pool.map(parseTable, list_of_tups), ignore_index=True)
        save_futures.append(executor.submit(saveData, data))

        if chunk_thread is not None:
            list_of_tups = chunk_thread.result()

# Re-raise any database error from a background save. Futures whose result()
# is never called swallow their exceptions silently.
for save_future in save_futures:
    save_future.result()

100%|██████████| 10/10 [01:05<00:00,  6.54s/it]


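Wall-clock time to process 1,000 rows (10 chunks of 100), depending on which steps run in a background thread:

- All serial: 01:18
- Chunk thread (only the next-chunk read is prefetched): 01:15
- Save thread (only saving runs in the background): 01:06
- All thread (prefetch and save both in the background): 01:05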