In [3]:
import asyncio
import re
import pandas as pd
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright
import os
import time

In [5]:
categories = [
    {"name": "처세술_삶의 자세", "url": "https://www.yes24.com/product/category/display/001001026008"},
    {"name": "인간관계", "url": "https://www.yes24.com/product/category/display/001001026009"},
    {"name": "성공학_경력관리", "url": "https://www.yes24.com/product/category/display/001001026002"},
    {"name": "화술_협상_회의진행", "url": "https://www.yes24.com/product/category/display/001001026004"},
    {"name": "기획_정보_시간관리", "url": "https://www.yes24.com/product/category/display/001001026003"},
    {"name": "창조적사고_두뇌계발", "url": "https://www.yes24.com/product/category/display/001001026010"},
    {"name": "여성을위한자기계발", "url": "https://www.yes24.com/product/category/display/001001026001"},
    {"name": "취업_유망직업", "url": "https://www.yes24.com/product/category/display/001001026005"},
    {"name": "성공스토리", "url": "https://www.yes24.com/product/category/display/001001026012"},
    {"name": "유학_이민", "url": "https://www.yes24.com/product/category/display/001001026011"},
]

## 1. 도서 링크 수집하기

In [36]:
import asyncio
import csv
import os
from playwright.async_api import async_playwright

output_dir = "yes24_links"
os.makedirs(output_dir, exist_ok=True)

# ✅ HTML 구조 기반 페이지 이동 함수
async def go_to_page(page, target_page):
    try:
        print(f"페이지 {target_page} 이동 시도...")
        
        # 현재 페이지 확인
        current_page_elem = await page.query_selector("div.yesUI_pagen strong.num")
        current_page = 1
        if current_page_elem:
            current_page = int(await current_page_elem.text_content())
        
        print(f"현재 페이지: {current_page}")
        
        # 이미 목표 페이지에 있으면 스킵
        if current_page == target_page:
            print(f"✅ 이미 페이지 {target_page}에 있음")
            return True
        
        # 현재 페이지네이션 블록 범위 확인 (1-10, 11-20, 21-30, ...)
        current_block_start = ((current_page - 1) // 10) * 10 + 1
        current_block_end = current_block_start + 9
        
        target_block_start = ((target_page - 1) // 10) * 10 + 1
        target_block_end = target_block_start + 9
        
        print(f"현재 블록: {current_block_start}-{current_block_end}, 목표 블록: {target_block_start}-{target_block_end}")
        
        # 다른 블록으로 이동해야 하는 경우
        if current_block_start != target_block_start:
            if target_page > current_page:
                # 앞으로 이동: '다음' 버튼 클릭
                clicks_needed = (target_block_start - current_block_start) // 10
                for i in range(clicks_needed):
                    next_button = await page.query_selector("a.bgYUI.next")
                    if next_button:
                        print(f"🔄 '다음' 버튼 클릭 ({i+1}/{clicks_needed})")
                        await next_button.click()
                        await asyncio.sleep(2)
                    else:
                        print(f"❌ '다음' 버튼 없음")
                        break
            else:
                # 뒤로 이동: '이전' 버튼 클릭 또는 JavaScript 직접 호출
                print(f"🔧 JavaScript로 직접 페이지 {target_page} 이동")
                await page.evaluate(f'changeCategoryProductParam(null, {target_page});')
                await page.wait_for_selector('div.itemUnit', state='visible', timeout=60000)
                await asyncio.sleep(3)
                return True
        
        # 같은 블록 내에서 페이지 이동
        page_anchor = await page.query_selector(f'div.yesUI_pagen a.num[title="{target_page}"]')
        if page_anchor:
            print(f"✅ 블록 내 링크 클릭: 페이지 {target_page}")
            await page_anchor.click()
        else:
            # JavaScript 직접 호출 fallback
            print(f"🔧 JavaScript로 페이지 {target_page} 이동")
            await page.evaluate(f'changeCategoryProductParam(null, {target_page});')
        
        # 페이지 로딩 대기
        await page.wait_for_selector('div.itemUnit', state='visible', timeout=60000)
        await asyncio.sleep(3)
        
        # 이동 결과 확인
        current_page_elem = await page.query_selector("div.yesUI_pagen strong.num")
        if current_page_elem:
            final_page = int(await current_page_elem.text_content())
            if final_page == target_page:
                print(f"✅ 페이지 {target_page} 이동 성공")
                return True
            else:
                print(f"⚠️ 페이지 이동 불일치: 목표 {target_page}, 실제 {final_page}")
                return False
        
        return True
        
    except Exception as e:
        print(f"⚠️ 페이지 {target_page} 이동 실패: {e}")
        return False

# ✅ 전체 링크 수집 함수 (순서 유지)
async def collect_books_for_category(category):
    collected_books = []
    seen_books = set()
    
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)
        page = await browser.new_page()
        await page.goto(category["url"])
        await page.wait_for_timeout(3000)
        
        # 판매량순 클릭
        try:
            await page.click("a[data-search-value='SALE_SCO']")
            await page.wait_for_timeout(2000)
        except:
            print("⚠️ 판매량순 클릭 실패")
        
        # 120개 보기 설정
        try:
            await page.select_option("#pg_size", value="120")
            await page.wait_for_timeout(3000)
        except:
            print("⚠️ 120개 보기 설정 실패")
        
        # 전체 페이지 수 파악을 위한 스크롤
        print("🔍 전체 페이지 수 파악 중...")
        for _ in range(15):  # 더 많이 스크롤
            await page.mouse.wheel(0, 3000)
            await page.wait_for_timeout(1000)
        
        # 마지막 페이지 추출 (HTML 구조 기반)
        last_page = 1
        try:
            # '맨끝' 버튼에서 정확한 마지막 페이지 추출
            end_button = await page.query_selector("a.bgYUI.end")
            if end_button:
                last_page = int(await end_button.get_attribute("title"))
                print(f"📄 마지막 페이지: {last_page}")
            else:
                print("⚠️ '맨끝' 버튼을 찾을 수 없음")
                last_page = 10  # 기본값
                        
        except Exception as e:
            print(f"⚠️ 마지막 페이지 파악 실패: {e}")
            last_page = 10  # 기본값
        
        print(f"🔎 {category['name']} 총 {last_page}페이지 수집 시작")
        
        # 각 페이지 순회
        for page_num in range(1, last_page + 1):
            print(f"\n--- 페이지 {page_num}/{last_page} ---")
            
            success = await go_to_page(page, page_num)
            if not success:
                print(f"❌ 페이지 {page_num} 건너뛰기")
                continue
            
            try:
                book_nums = await page.eval_on_selector_all(
                    "#yesNewList > li[data-goods-no]",
                    "els => els.map(el => el.getAttribute('data-goods-no'))"
                )
                print(f"🔗 {len(book_nums)}개 번호 수집")
                
                # 순서를 유지하면서 중복 제거
                for book_num in book_nums:
                    if book_num and book_num not in seen_books:
                        collected_books.append(book_num)
                        seen_books.add(book_num)
                        
            except Exception as e:
                print(f"⚠️ 페이지 {page_num} 번호 수집 실패: {e}")
            
            # 진행률 표시
            if page_num % 10 == 0:
                print(f"📊 진행률: {page_num}/{last_page} ({len(collected_books)}개 수집)")
        
        await browser.close()
    
    # CSV 저장
    filename = os.path.join(output_dir, f"{category['name']}_books.csv")
    with open(filename, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["book"])
        for book in collected_books:
            writer.writerow([book])
    
    print(f"✅ 저장 완료: {filename} (총 {len(collected_books)}개)")
    return filename

In [None]:
await collect_books_for_category(categories[8])

## 2. HTML 수집하기

In [2]:
import os
import requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time

# 개별 HTML 저장 함수
def fetch_and_save_html(book_id, category_dir, retries=3, delay=1):
    url = f"https://www.yes24.com/Product/Goods/{book_id}"
    save_path = os.path.join(category_dir, f"{book_id}.html")
    
    # 이미 수집된 경우 건너뜀
    if os.path.exists(save_path):
        return True

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    }

    for attempt in range(retries):
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            with open(save_path, "w", encoding="utf-8") as f:
                f.write(response.text)
            return True
        except Exception as e:
            if attempt < retries - 1:
                time.sleep(delay)
            else:
                return False

# CSV를 기반으로 병렬 HTML 수집
def save_html_from_csv(csv_path, output_dir="htmls", max_workers=20):
    category_name = os.path.basename(csv_path).replace('_books.csv', '')
    category_dir = os.path.join(output_dir, category_name)
    os.makedirs(category_dir, exist_ok=True)

    df = pd.read_csv(csv_path)
    book_ids = df.iloc[:, 0].astype(str).tolist()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_and_save_html, book_id, category_dir): book_id for book_id in book_ids}
        for future in tqdm(as_completed(futures), total=len(futures), desc=f"[{category_name}] HTML 수집 중"):
            book_id = futures[future]
            if not future.result():
                print(f"❌ 실패: {book_id}")


In [5]:
# 사용 예시
save_html_from_csv("yes24_links/인간관계_books.csv", output_dir="htmls", max_workers=10)

[인간관계] HTML 수집 중: 100%|██████████| 4117/4117 [02:54<00:00, 23.62it/s]


In [4]:
save_html_from_csv("yes24_links/처세술_삶의 자세_books.csv", output_dir="htmls", max_workers=10)

[처세술_삶의 자세] HTML 수집 중: 100%|██████████| 14678/14678 [10:16<00:00, 23.81it/s]


In [6]:
save_html_from_csv("yes24_links/화술_협상_회의진행_books.csv", output_dir="htmls", max_workers=10)

[화술_협상_회의진행] HTML 수집 중: 100%|██████████| 3773/3773 [02:34<00:00, 24.41it/s]


## 3. HTML 을 돌며 도서 정보를 수집한다.

In [1]:
import os
import glob
import pandas as pd
from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

In [2]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import glob
from bs4 import BeautifulSoup
import re
from bs4 import NavigableString
import pandas as pd
from tqdm.notebook import tqdm

def extract_book_info_from_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')

    title_elem = soup.select_one('h2.gd_name')
    title = title_elem.text.strip() if title_elem else ''

    pub_elem = soup.select_one('span.gd_pub a')
    publisher = pub_elem.text.strip() if pub_elem else ''

    pubdate_elem = soup.select_one('span.gd_date')
    pub_date = pubdate_elem.text.replace('출간일 :', '').strip() if pubdate_elem else ''

    price_tag = soup.select_one('td span.nor_price em.yes_m')
    price = price_tag.get_text(strip=True) if price_tag else None

    discount = None
    price_span = soup.select_one('span.nor_price')
    if price_span and price_span.next_sibling:
        sibling_text = price_span.next_sibling
        if isinstance(sibling_text, NavigableString):
            discount_match = re.search(r'(\d+%)\s*할인', sibling_text.strip())
            if discount_match:
                discount = discount_match.group(1)

    point = None
    li_tags = soup.select('li')
    for li in li_tags:
        text = li.get_text(strip=True)
        if '적립' in text:
            point = text
            break

    rating_tag = soup.select_one('span#spanGdRating em.yes_b')
    rating = rating_tag.get_text(strip=True) if rating_tag else None

    review_count_tag = soup.select_one('span.gd_reviewCount em.txC_blue')
    review_count = review_count_tag.get_text(strip=True) if review_count_tag else None

    sell_num = None
    sell_num_tag = soup.select_one('span.gd_sellNum')
    if sell_num_tag:
        match = re.search(r'판매지수\s*([\d,]+)', sell_num_tag.get_text())
        if match:
            sell_num = match.group(1).replace(',', '')

    table_of_contents = None
    h4_tags = soup.select('h4.tit_txt')
    for h4 in h4_tags:
        if '목차' in h4.get_text(strip=True):
            next_textarea = h4.find_next('textarea', class_='txtContentText')
            if next_textarea:
                table_of_contents = next_textarea.get_text(strip=True)
            break

    authors = []
    author_groups = soup.select('div.authorInfoGrp')
    if author_groups:
        for group in author_groups:
            author_id = group.get('authno', None)
            role_elem = group.select_one('.author_name')
            role_text = role_elem.text.strip() if role_elem else ''
            role = role_text.split(':')[0].strip() if ':' in role_text else '저자'

            for a in group.select('.author_name a.lnk_author'):
                author_name = a.text.strip()
                authors.append({
                    'name': author_name,
                    'role': role,
                    'author_id': author_id
                })
    else:
        auth_span = soup.select_one('span.gd_auth')
        if auth_span:
            # ① <a> 태그가 있으면 그대로 처리
            a_tags = auth_span.select('a')
            if a_tags:
                for a_tag in a_tags:
                    author_name = a_tag.get_text(strip=True)
                    role = ''
                    next_sib = a_tag.next_sibling
                    if next_sib and isinstance(next_sib, NavigableString):
                        role = next_sib.strip()
                    if not role:
                        role = '저자'
                    authors.append({'name': author_name, 'role': role, 'author_id': None})
            else:
                # ② <a> 태그 없이 텍스트만 있는 경우
                full_text = auth_span.get_text(strip=True)
                # "김찬배 저" 형식이면 역할 분리
                match = re.match(r'(.+?)\s*(저|역|그림)?$', full_text)
                if match:
                    author_name = match.group(1).strip()
                    role = match.group(2) or '저자'
                    authors.append({'name': author_name, 'role': role, 'author_id': None})

    return {
        'title': title,
        'publisher': publisher,
        'pub_date': pub_date,
        'price': price,
        'discount': discount,
        'point': point,
        'rating': rating,
        'review_count': review_count,
        'sell_num': sell_num,
        'table_of_contents': table_of_contents,
        'authors': authors
    }

def parse_html_file(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        html = f.read()
    return extract_book_info_from_html(html)

def process_files_with_progress(file_list, max_workers=8):
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(parse_html_file, f) for f in file_list]
        for future in tqdm(as_completed(futures), total=len(futures), desc='Parsing HTML files'):
            try:
                res = future.result()
                results.append(res)
            except Exception as e:
                print(f'Error processing file: {e}')
    return results

In [None]:
file_list = glob.glob('htmls//*.html', recursive=True)
results = process_files_with_progress(file_list, max_workers=5)

df = pd.DataFrame(results)
df.to_csv('yes24/처세술_삶의자세.csv', index=False)

print('작업 완료! csv 파일 저장됨.')

Parsing HTML files:   0%|          | 0/14678 [00:00<?, ?it/s]

작업 완료! csv 파일 저장됨.


## 4. 데이터 전처리

In [None]:
import pandas as pd
import os

folder_path = "./yes24"  # yes24 폴더 경로

csv_files = [file for file in os.listdir(folder_path) if file.endswith(".csv")]

# 파일을 하나의 DataFrame으로 이어붙이기
merged_df = pd.concat(
    [pd.read_csv(os.path.join(folder_path, file)) for file in csv_files],
    ignore_index=True
)

print(f"총 행 개수: {len(merged_df)}")

In [37]:
import pandas as pd
import re


# 1. 할인율: NaN → '0%', % 제거 후 int형
merged_df['discount'] = merged_df['discount'].fillna('0%').astype(str).str.replace('%', '').astype(int)

# 2. 적립금: 적립률만 숫자 추출, '5만원이상 구매 시' 등은 0 처리
import re
def extract_point_percent(text):
    if pd.isna(text):
        return 0
    if not isinstance(text, str):
        return 0
    if '5만원이상 구매 시' in text:
        return 0
    # 숫자와 % 그리고 적립 단어를 포함하는 부분 찾기
    match = re.search(r'\((\d+)%.*적립\)', text)
    if match:
        return int(match.group(1))
    return 0

merged_df['point'] = merged_df['point'].apply(extract_point_percent)

# 3. 판매가: NaN -> 0, 문자열로 변환 후 숫자 외 문자 제거, 빈 문자열 0 처리 후 int 변환
merged_df['price'] = merged_df['price'].fillna(0).astype(str)
merged_df['price'] = merged_df['price'].str.replace('[^0-9]', '', regex=True)
merged_df.loc[merged_df['price'] == '', 'price'] = '0'
merged_df['price'] = merged_df['price'].astype(int)

# 4. 평점: NaN → 0, float형 변환
merged_df['rating'] = merged_df['rating'].fillna(0).astype(float)

# 5. 리뷰 수: NaN → 0, int형 변환
merged_df['review_count'] = (
    merged_df['review_count']
    .fillna('0')
    .astype(str)
    .str.replace(',', '', regex=True)
    .astype(float)    # float 변환
    .astype(int)      # int 변환
)
# 6. 판매지수: NaN → 0, int형 변환
merged_df['sell_num'] = merged_df['sell_num'].fillna(0).astype(int)

# 7. 발행일: datetime 변환, 변환 실패 시 NaT (결측 유지)
merged_df['pub_date'] = pd.to_datetime(merged_df['pub_date'], format="%Y년 %m월 %d일", errors='coerce')



In [None]:
# 작가 정보 전처리
# 1. "[{'name': '리에즈 카뎀 등저 / 박영종', 'role': '역', 'author_id': None}] 처럼 들어간 데이터가 존재
import re
import ast

def parse_author_name_role(name_str, default_role):
    # '/' 로 분리
    parts = [p.strip() for p in name_str.split('/')]

    authors_list = []
    role_keywords = ['저', '등저', '편', '역', '그림']  # 필요한 역할 키워드 추가 가능

    for part in parts:
        # 역할 키워드 찾기
        found_role = None
        for kw in role_keywords:
            if part.endswith(kw):
                found_role = kw
                # 이름은 역할 키워드 뺀 부분
                author_name = part[:-len(kw)].strip()
                break
        else:
            # 역할 키워드 없으면 기본 역할 할당
            found_role = default_role
            author_name = part
        
        authors_list.append({
            'name': author_name,
            'role': found_role,
            'author_id': None
        })

    return authors_list


def split_authors(authors_str):
    try:
        authors = ast.literal_eval(authors_str)
    except Exception:
        return authors_str

    new_authors = []
    for author in authors:
        name = author.get('name', '')
        role = author.get('role', '')

        if '/' in name:
            new_authors.extend(parse_author_name_role(name, role))
        else:
            new_authors.append(author)
    return new_authors

# 적용
merged_df['authors'] = merged_df['authors'].apply(split_authors)


In [None]:
# authors 에서 name 이 null 값인 경우
def has_null_name(authors):
    if not isinstance(authors, list):
        return False
    for author in authors:
        if author.get('name') in [None, '', ' ']:
            return True
    return False

null_name_rows = merged_df[merged_df['authors'].apply(has_null_name)]

In [60]:
null_name_rows

Unnamed: 0,title,publisher,pub_date,price,discount,point,rating,review_count,sell_num,table_of_contents,authors
23501,생생 공부비법,가림출판사,2003-08-25,8550,10,5,0.0,0,0,"Part 1. 나의 학창 시절초등학교 | 중학교 | 고등학교, 재수 그리고 삼수 | 미국 유학 생활Part 2. 공부 시작하기공부란 게임과도 같다 | 공부를 하는 이유 | 공부를 하기 위한 마음가짐Part 3. 우등생이 되는 십계명시간을 지배하라 | 계획을 세워라 | 자신만의 목표를 세워라 |자신을 컨트롤하라 | 한 과목을 최소한 1시간 이상 공부하라 |스터디 그룹을 만들어라 | 잠을 충분히 자라 | 놀 때 놀고 공부할 때 공부하자 |출제자 입장에서 공부하라 | 할 수 있다는 자신감을 갖자Part 4. 공부 고수 되기공부 고수가 되기 위한 순서 | 공부 고수가 되는 지름길 | 시험을 잘 보는 비법Part 5. 공부… 보이지 않는 1%를 위해집중력을 갖자 | 독서를 많이 하자 | 공부에 시너지 효과를 발휘하자 |공부에 대해 프로 의식을 갖자Part 6. 승리 수학 따라잡기왜 수학을 공부해야 하는가? | 산수와 수학의 개념 차이 |수학을 잘하려면 이렇게 하자 | 수학, 보이지 않는 1%를 위해 |수학 선생님께 드리는 말씀Part 7. Jackie의 영어 따라잡기왜 영어를 배워야 되지? | 영어… 어떻게 공부하라고!? | 수능 영어 정복하기 |미국 유학을 생각하고 있는 학생들에게Part 8. 드리고 싶은 말씀학부모님께 | 선생님께 | 학생들에게 |미국에서 바라보는 한국 교육 - 미국 vs. 한국BEST Q&A - 학생들에게 가장 많이 받았던 질문들잠은 몇 시간씩 자야 되나요? | 자투리 시간은 어떻게 사용해야 되나요? |슬럼프는 어떻게 극복해야 하나요? | 암기 과목을 어떻게 공부해야 하나요? |시험을 보고 나면 어떻게 해야 하나요? |학교에서 어떻게 생활하고 공부해야 하나요? |계획대로 안 될 때는 어떻게 해야 하나요?","[{'name': '이은승', 'role': '저', 'author_id': None}, {'name': '', 'role': '저자', 'author_id': None}]"


In [61]:
# 직접 수정
# '생생 공부비법' 도서의 authors 컬럼 직접 수정하기
idx = merged_df[merged_df['title'] == '생생 공부비법'].index

# 예시: authors를 올바르게 리스트 형태로 직접 입력 (author_id는 None 처리)
merged_df.loc[idx, 'authors'] = [[{'name': '이은승', 'role': '저', 'author_id': None}]]

In [None]:
# role unique값확인하기
# authors 리스트에서 모든 role 값 모으기
all_roles = merged_df['authors'].explode().dropna().apply(lambda x: x.get('role') if isinstance(x, dict) else None)

# unique role 값 출력
unique_roles = all_roles.dropna().unique()
print(unique_roles)

['저' '역' ',' '공' '편' '편역' '저자' '공저' '저 /' '감수' '그림' '사진' '기획' '강의' '정리'
 '글' '글그림' '공저 /' '등저' '편저 /' '등역' '공편' '외 그림' '편저' '해제' '원저' '공역' '등저 /'
 '그림 /' '해설' ', 편집부 공저' '외 강의' '엮음' '원전 /' '북텔러' '구연' '역 /' '등 강의' '감독'
 '글 /' '외 공저' '저 / 편집부 역' '역,' '편 /' '출연' '글, 그림' '일러스트' '엮음 /' '원작 /'
 '등편' '평역' '저,' '평설' '감수 /' '기획, 편집' '구성 /']


In [None]:
# 저자, 역자만 가져오기
def filter_and_clean_authors(authors_list):
    if not isinstance(authors_list, list):
        return []
    filtered = []
    for author in authors_list:
        role = author.get('role', '')
        if not isinstance(role, str):
            continue
        if '저' in role:
            clean_role = '저'
        elif '역' in role:
            clean_role = '역'
        else:
            continue
        # 복사해서 role만 바꿔서 추가
        new_author = author.copy()
        new_author['role'] = clean_role
        filtered.append(new_author)
    return filtered

merged_df['authors_filtered'] = merged_df['authors'].apply(filter_and_clean_authors)


In [71]:
merged_df = merged_df.drop(columns=['clean_roles'])

In [73]:
merged_df = merged_df.rename(columns={
    'title': '제목',
    'publisher': '출판사',
    'pub_date': '발행일',
    'price': '판매가',
    'discount': '할인율',
    'point': '적립률',
    'rating': '평점',
    'review_count': '리뷰수',
    'sell_num': '판매지수',
    'table_of_contents': '목차',
    'authors': '저자정보',
    'authors_filtered': '저자_역자_정리'
})

In [74]:
merged_df.to_csv('yes24통합본.csv', index=False, encoding='utf-8-sig')