In [None]:
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException


import sqlite3
import json
from tqdm import tqdm

import sys
from pathlib import Path

sys.path.append(str(Path.cwd().parent))

from pokeca_rec.src.ptcg_product_crawler import PTCGProductCrawler
from pokeca_rec.src.ptcg_product_crawler import NOT_UNIQUE_COLS
from pokeca_rec.utils.chrome_option import chrome_opt

In [None]:
# When True, the table is dropped and recreated before crawling.
RESTART = False
# Page number substituted into the `page` query parameter of the search URL.
START_PAGE = 1
# Base name of the SQLite database file (../db/<DB_NAME>.db).
DB_NAME = "ptcg_card"
# The table shares its name with the database.
TABLE_NAME = DB_NAME

In [None]:
"""Official Site URL"""

# To crawl every regulation instead of the standard one, use
# regulation_sidebar_form=all in the query string below.
#
# The URL is assembled from adjacent string literals. Continuing a single
# literal with a trailing backslash would embed the next line's leading
# indentation in the query string (".../index.php?    keyword=...").
url = (
    "https://www.pokemon-card.com/card-search/index.php?"
    "keyword=&se_ta=&regulation_sidebar_form=XY&pg=&illust=&sm_and_keyword=true"
    f"&page={START_PAGE}"
)  # standard

In [None]:
"""Init database and create table"""
db_file = f"../db/{DB_NAME}.db"
# sqlite3.connect() creates a missing database file but not a missing
# directory, so make sure the target folder exists (e.g. on a fresh checkout).
Path(db_file).parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
if RESTART:
    # Start from scratch: discard everything crawled so far.
    cursor.execute(f"""DROP TABLE IF EXISTS {TABLE_NAME};""")
# hash_unique_info is UNIQUE: it is the key used to look up an existing row
# for a card before deciding between an insert and an update.
cursor.execute(
    f"""CREATE TABLE IF NOT EXISTS {TABLE_NAME}
               (id INTEGER PRIMARY KEY,
               card_type TEXT,
               card_name_jp TEXT,
               evo_type TEXT,
               hp INT,
               hp_type TEXT,
               ability TEXT,
               attacks TEXT,
               special_rule TEXT,
               weakness TEXT,
               resistance TEXT,
               retreat TEXT,
               description_jp TEXT,
               hash_unique_info TEXT UNIQUE,
               card_code_jp TEXT,
               img_url_jp TEXT,
               rarity_code_jp TEXT
               );"""
)  # above hash_unique_info is unique

In [None]:
def is_data_duplicated(row, cur_card_code):
    """Tell whether a card code is already recorded in a fetched table row.

    Args:
        row: Column values ordered like ``NOT_UNIQUE_COLS``. The
            ``card_code_jp`` entry is expected to be the JSON-serialized list
            of card codes stored for the card.
        cur_card_code: Card code of the card currently being processed.

    Returns:
        True when ``cur_card_code`` is one of the stored card codes.
    """
    card_code_index = NOT_UNIQUE_COLS.index("card_code_jp")
    saved_card_code = row[card_code_index]

    try:
        saved_codes = json.loads(saved_card_code)
    except ValueError:
        # Value is not JSON: keep the previous substring behaviour.
        return cur_card_code in saved_card_code

    if not isinstance(saved_codes, list):
        # Valid JSON but not a list: keep the previous substring behaviour.
        return cur_card_code in saved_card_code

    # Exact membership. A substring test on the serialized text would report
    # "SV2a 01" as already stored when only "SV2a 011/165" is.
    return cur_card_code in saved_codes


def insert_or_update_data(cursor, detail_info):
    """Store one crawled card, merging it into an existing row when possible.

    If a row with the same ``hash_unique_info`` exists and does not yet hold
    this card code, the card's ``NOT_UNIQUE_COLS`` values are appended to the
    JSON lists stored in that row. If no such row exists, a new row is
    inserted. The change is committed on the cursor's own connection.

    Args:
        cursor: sqlite3 cursor on the database that holds ``TABLE_NAME``.
        detail_info: Mapping of column name to value for a single card. It
            must contain ``hash_unique_info`` and every ``NOT_UNIQUE_COLS``
            key. The mapping is not modified.
    """
    hash_unique_info = detail_info["hash_unique_info"]
    not_unique_cols_str = ", ".join(NOT_UNIQUE_COLS)

    # Look for a row that already describes this card.
    cursor.execute(
        f"SELECT {not_unique_cols_str} FROM {TABLE_NAME} WHERE hash_unique_info = ?",
        (hash_unique_info,),
    )
    row = cursor.fetchone()

    if row:
        if not is_data_duplicated(row, detail_info["card_code_jp"]):
            # Decode every stored list before writing anything, so a corrupt
            # value cannot leave the row half-updated.
            merged = [
                json.dumps(json.loads(saved) + [detail_info[col]], ensure_ascii=False)
                for col, saved in zip(NOT_UNIQUE_COLS, row)
            ]
            assignments = ", ".join(f"{col} = ?" for col in NOT_UNIQUE_COLS)
            cursor.execute(
                f"UPDATE {TABLE_NAME} SET {assignments} WHERE hash_unique_info = ?",
                (*merged, hash_unique_info),
            )
    else:
        # Build the row in a separate dict so the caller's mapping keeps its
        # original (unserialized) values.
        record = {}
        for key, value in detail_info.items():
            if key in NOT_UNIQUE_COLS:
                # These columns always hold a list, one entry per printing.
                value = [value]
            if isinstance(value, list):
                value = json.dumps(value, ensure_ascii=False)
            record[key] = value

        columns = ", ".join(record.keys())
        placeholders = ", ".join(["?"] * len(record))
        sql = f"INSERT INTO {TABLE_NAME} ({columns}) VALUES ({placeholders});"
        cursor.execute(sql, tuple(record.values()))

    # Commit through the cursor's connection instead of a notebook global, so
    # the function works with whatever cursor it is handed.
    cursor.connection.commit()

In [None]:
def wait_for_non_zero_element(driver):
    """Wait condition: the ``AllCountNum`` element once it shows a count.

    Intended to be passed to ``WebDriverWait(...).until``.

    Args:
        driver: Object exposing ``find_element`` (a Selenium driver).

    Returns:
        The element when its text is non-empty and not ``"0"``; otherwise
        ``False`` so the wait keeps polling.
    """
    element = driver.find_element(By.ID, "AllCountNum")
    text = (element.text or "").strip()
    # An empty counter is "not ready yet", just like "0": returning it would
    # make the caller fail later when converting the text to an int.
    return element if text and text != "0" else False


def get_total_card_num(driver):
    """Read the total card count once the on-page counter has settled.

    The counter text is polled repeatedly. The value is accepted after it has
    been read unchanged 11 times in a row; any change restarts the streak.

    Args:
        driver: Selenium driver with the search page loaded.

    Returns:
        The settled counter value as an int.
    """

    def read_counter_text():
        # Each read waits up to 10 seconds for a non-zero counter.
        return WebDriverWait(driver, 10).until(wait_for_non_zero_element).text

    settled_text = read_counter_text()
    streak = 0
    while streak <= 10:
        latest_text = read_counter_text()
        if latest_text != settled_text:
            # Counter is still moving: adopt the new value and start over.
            settled_text = latest_text
            streak = 0
        else:
            streak += 1

    return int(settled_text)


def get_total_page_num(driver):
    """Return the number of result pages to crawl.

    Placeholder implementation: ``driver`` is not inspected and a fixed value
    of 3 is returned.
    """
    # TODO: derive the page count from the loaded page instead of hard-coding it.
    return 3

In [None]:
# Crawl cards from official website and inject to database
CARD_IMG_SELECTOR = "li.List_item img[data-src]"
NEXT_PAGE_XPATH = '//li[contains(.,"次のページ")]'


def _crawl_list_item(list_item, extractor):
    """Crawl the card shown in one result tile and store it in the database."""
    card_element = WebDriverWait(list_item, 10).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, CARD_IMG_SELECTOR))
    )

    # The card id is the leading number of the image file name,
    # e.g. /assets/images/card_images/large/SV2a/043491_P_ZENIGAME.jpg
    data_src = card_element.get_attribute("data-src")
    card_id = int(data_src.split("/")[-1].split("_")[0])
    detail_info_url = f"https://www.pokemon-card.com/card-search/details.php/card/{card_id}/regu/XY"
    detail_info = extractor(detail_info_url)

    insert_or_update_data(cursor, detail_info)


def _go_to_next_page(driver, current_items):
    """Click the next-page button; return False when no button is found."""
    try:
        next_page_button = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, NEXT_PAGE_XPATH))
        )
    except TimeoutException:
        print("No next page button found within the wait time")
        return False

    next_page_button.click()
    print("Next page button found")

    try:
        # Wait until the old tiles are detached so the next lookup does not
        # pick up the page that was just crawled.
        WebDriverWait(driver, 10).until(EC.staleness_of(current_items[0]))
    except TimeoutException:
        # Best effort: the list may have been refreshed in place.
        pass
    return True


with webdriver.Chrome(options=chrome_opt) as driver:
    # Init driver
    driver.implicitly_wait(2)
    driver.get(url)

    total_card_num = get_total_card_num(driver)
    total_page_num = get_total_page_num(driver)
    extractor = PTCGProductCrawler()

    is_next = True
    page_count = 0
    # Used as a context manager so the bar is closed even if the crawl aborts.
    with tqdm(total=total_card_num) as pbar:
        while is_next:
            print(f"Page: {page_count}")
            list_items = WebDriverWait(driver, 10).until(
                EC.visibility_of_all_elements_located((By.CLASS_NAME, "List_item"))
            )

            for list_item in list_items:
                try:
                    try:
                        _crawl_list_item(list_item, extractor)
                    except StaleElementReferenceException:
                        # The tile was re-rendered while being read: look the
                        # image up again and process the card once more
                        # instead of silently dropping it.
                        print("try again")
                        _crawl_list_item(list_item, extractor)
                except StaleElementReferenceException:
                    print("Card tile is no longer attached to the page; skipping it")
                    continue
                except TimeoutException:
                    print("No image item found within the wait time")
                    continue

                # Only successfully stored cards advance the progress bar.
                pbar.update(1)

            page_count += 1
            # Advance only after the whole page has been processed. Clicking
            # from inside the item loop would invalidate the remaining tiles,
            # and never clicking would crawl the same page again.
            is_next = page_count < total_page_num and _go_to_next_page(
                driver, list_items
            )


In [None]:
# Release the SQLite connection now that crawling is finished.
conn.close()

In [None]:
# Run the project's read_db.py script against the crawled table (limit: 10).
!python ../scripts/read_db.py -db ../db/"$DB_NAME".db -t "$TABLE_NAME" --limit 10