# 데이터 수집

In [2]:
# -*- coding: utf-8 -*-
"""
Books to Scrape 크롤러 (멀티스레딩 / 5페이지 / 진행상황+ETA / 이미지 다운로드)
- 상세 파싱과 동시에 표지 이미지를 data/images/ 에 저장
"""

import os
import re
import json
import time
import threading
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

START_URL = "http://books.toscrape.com/catalogue/page-1.html"
HEADERS = {"User-Agent": "Mozilla/5.0"}
MAX_WORKERS = 16
PER_REQUEST_PAUSE = 0.03

DATA_DIR = Path("data")
IMAGES_DIR = DATA_DIR / "images"

print_lock = threading.Lock()

def make_session():
    s = requests.Session()
    retries = Retry(
        total=3, connect=3, read=3, backoff_factor=0.5,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET"])
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=100, pool_maxsize=100)
    s.mount("http://", adapter)
    s.mount("https://", adapter)
    s.headers.update(HEADERS)
    return s

def clean_money(s):
    return (s or "").replace("Â", "").strip()

def slugify(text, maxlen=80):
    text = re.sub(r"[^\w\s-]", "", text, flags=re.UNICODE)  # 특수문자 제거
    text = re.sub(r"[\s_-]+", "-", text).strip("-")         # 공백/언더스코어 → 하이픈
    return text[:maxlen] if text else "untitled"

def guess_ext_from_url(url, default="jpg"):
    path = urlparse(url).path
    if "." in path:
        ext = path.rsplit(".", 1)[-1].lower()
        # 간단 화이트리스트
        if ext in {"jpg", "jpeg", "png", "webp"}:
            return "jpg" if ext == "jpeg" else ext
    return default

def download_image(sess, img_url, title, upc):
    IMAGES_DIR.mkdir(parents=True, exist_ok=True)
    ext = guess_ext_from_url(img_url, default="jpg")
    name_part = (upc or slugify(title))
    filename = f"{name_part}.{ext}"
    dest = IMAGES_DIR / filename

    # 동일 파일 존재 시 중복 방지(숫자 suffix)
    if dest.exists():
        i = 2
        while True:
            cand = IMAGES_DIR / f"{name_part}-{i}.{ext}"
            if not cand.exists():
                dest = cand
                break
            i += 1

    try:
        with sess.get(img_url, timeout=20, stream=True) as r:
            r.raise_for_status()
            with open(dest, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        return str(dest)
    except Exception as e:
        with print_lock:
            print(f"(이미지 실패) {img_url} -> {e}")
        return None

def parse_book_detail(html, base_url):
    soup = BeautifulSoup(html, "html.parser")

    details = {}
    table = soup.find("table", class_="table table-striped")
    if table:
        for row in table.find_all("tr"):
            th = row.find("th").get_text(strip=True)
            td = row.find("td").get_text(strip=True)
            details[th] = td

    title = soup.find("div", class_="product_main").find("h1").get_text(strip=True)
    desc_anchor = soup.find("div", id="product_description")
    description = desc_anchor.find_next_sibling("p").get_text(strip=True) if desc_anchor else ""

    img_tag = soup.select_one(".item.active img") or soup.find("img")
    img_rel = img_tag["src"] if img_tag else ""
    image_url = urljoin(base_url, img_rel)

    return {
        "title": title,
        "upc": details.get("UPC"),
        "product_type": details.get("Product Type", ""),
        "price_excl_tax": clean_money(details.get("Price (excl. tax)")),
        "price_incl_tax": clean_money(details.get("Price (incl. tax)")),
        "tax": clean_money(details.get("Tax")),
        "availability": details.get("Availability", ""),
        "num_reviews": details.get("Number of reviews"),
        "description": description,
        "image_url": image_url,
        "url": base_url,
    }

def get_book_details(url):
    sess = make_session()
    try:
        resp = sess.get(url, timeout=15)
        resp.raise_for_status()
        data = parse_book_detail(resp.text, url)

        # 이미지 다운로드
        img_saved_path = None
        if data.get("image_url"):
            img_saved_path = download_image(
                sess, data["image_url"], data.get("title", ""), data.get("upc")
            )
        data["image_path"] = img_saved_path

        time.sleep(PER_REQUEST_PAUSE)
        return data
    except Exception as e:
        with print_lock:
            print(f"(건너뜀) {url} -> {e}")
        return None
    finally:
        sess.close()

def collect_book_urls(start_url, max_pages=5):
    urls = []
    page_url = start_url
    page_count = 0
    sess = make_session()

    while page_url and page_count < max_pages:
        with print_lock:
            print(f"[PAGE] {page_url}")
        resp = sess.get(page_url, timeout=15)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        for art in soup.find_all("article", class_="product_pod"):
            a = art.find("h3").find("a")
            book_rel = a["href"]
            book_url = urljoin(page_url, book_rel)
            urls.append(book_url)

        page_count += 1
        next_li = soup.find("li", class_="next")
        page_url = urljoin(page_url, next_li.find("a")["href"]) if (next_li and page_count < max_pages) else None
        time.sleep(PER_REQUEST_PAUSE)

    sess.close()
    return urls

def parse_details_multithread(book_urls, max_workers=MAX_WORKERS):
    results = []
    total = len(book_urls)
    done_cnt = 0
    start_time = time.perf_counter()

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        fut_map = {ex.submit(get_book_details, u): u for u in book_urls}
        for fut in as_completed(fut_map):
            data = fut.result()
            if data:
                results.append(data)

            done_cnt += 1
            elapsed = time.perf_counter() - start_time
            avg_time = elapsed / max(1, done_cnt)
            eta = avg_time * (total - done_cnt)

            with print_lock:
                print(f"[PROGRESS] {done_cnt}/{total} 완료 "
                      f"({elapsed:.1f}s 경과, ETA {eta:.1f}s)")

    return results

if __name__ == "__main__":
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    IMAGES_DIR.mkdir(parents=True, exist_ok=True)

    # 1) URL 수집 (5페이지 한정)
    book_urls = collect_book_urls(START_URL, max_pages=5)
    print(f"[INFO] 수집된 책 URL 수: {len(book_urls)}")

    # 2) 멀티스레드 상세 파싱(+이미지 저장)
    books = parse_details_multithread(book_urls)

    # 3) 저장 (image_path 포함)
    out_path = DATA_DIR / "books_5pages.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(books, f, ensure_ascii=False, indent=2)
    print(f"Data saved to {out_path}")
    print(f"Images saved to {IMAGES_DIR}/")


[PAGE] http://books.toscrape.com/catalogue/page-1.html
[PAGE] http://books.toscrape.com/catalogue/page-2.html
[PAGE] http://books.toscrape.com/catalogue/page-3.html
[PAGE] http://books.toscrape.com/catalogue/page-4.html
[PAGE] http://books.toscrape.com/catalogue/page-5.html
[INFO] 수집된 책 URL 수: 100
[PROGRESS] 1/100 완료 (1.0s 경과, ETA 96.7s)
[PROGRESS] 2/100 완료 (1.1s 경과, ETA 51.6s)
[PROGRESS] 3/100 완료 (1.3s 경과, ETA 41.2s)
[PROGRESS] 4/100 완료 (1.7s 경과, ETA 40.0s)
[PROGRESS] 5/100 완료 (1.9s 경과, ETA 35.4s)
[PROGRESS] 6/100 완료 (2.0s 경과, ETA 31.8s)
[PROGRESS] 7/100 완료 (2.0s 경과, ETA 26.9s)
[PROGRESS] 8/100 완료 (2.2s 경과, ETA 25.2s)
[PROGRESS] 9/100 완료 (2.2s 경과, ETA 22.2s)
[PROGRESS] 10/100 완료 (2.3s 경과, ETA 20.8s)
[PROGRESS] 11/100 완료 (2.4s 경과, ETA 19.0s)
[PROGRESS] 12/100 완료 (2.4s 경과, ETA 17.3s)
[PROGRESS] 13/100 완료 (2.4s 경과, ETA 16.0s)
[PROGRESS] 14/100 완료 (2.5s 경과, ETA 15.1s)
[PROGRESS] 15/100 완료 (2.5s 경과, ETA 14.1s)
[PROGRESS] 16/100 완료 (2.6s 경과, ETA 13.5s)
[PROGRESS] 17/100 완료 (2.8s 경과, ETA 13.

# duckdb에 데이터 추가하기

## Pandas DataFrame을 DuckDB 테이블 사용하기

In [4]:
import pandas as pd
import duckdb

# 예시 Pandas DataFrame 생성
df = pd.DataFrame({
    'product': ['A', 'B', 'A', 'C', 'B'],
    'price': [1000, 2000, 1500, 3000, 2500],
    'quantity': [2, 1, 3, 1, 2]
})

# DuckDB에 연결
con = duckdb.connect(database=':memory:', read_only=False)

# DataFrame을 SQL 쿼리로 바로 사용
# SQL 내에서 DataFrame 변수명(df)을 직접 사용
result = con.execute("SELECT product, SUM(price * quantity) as total_sales FROM df GROUP BY product").fetchdf()

print(result)

# 연결 닫기
con.close()

  product  total_sales
0       A       6500.0
1       B       7000.0
2       C       3000.0


## DuckDB를 사용하여 파일 입출력 처리
- DuckDB는 CSV, Parquet, JSON 등 다양한 파일 포맷을 직접 쿼리하고 저장

In [5]:
import seaborn as sns
import pandas as pd

# 1. Seaborn에서 'iris' 데이터셋 가져오기
iris_df = sns.load_dataset('iris')

# 2. DataFrame을 'iris_data.csv' 파일로 내보내기
# index=False 옵션은 Pandas DataFrame의 인덱스를 파일에 포함시키지 않도록 합니다.
iris_df.to_csv('data/iris_data.csv', index=False)
print("DataFrame이 'iris_data.csv' 파일로 성공적으로 저장되었습니다.")

DataFrame이 'iris_data.csv' 파일로 성공적으로 저장되었습니다.


In [7]:
import duckdb

# 1. DuckDB로 파일 가져와 쿼리하기
# DuckDB에 인메모리(in-memory) 연결
con = duckdb.connect(database=':memory:', read_only=False)

# SQL 쿼리를 사용하여 CSV 파일에서 데이터 읽기
# 'iris_data.csv' 파일을 마치 테이블처럼 직접 쿼리합니다.
# species 별 sepal_length의 평균을 계산
query = """
SELECT
    species,
    AVG(sepal_length) AS avg_sepal_length
FROM 'data/iris_data.csv'
GROUP BY species
ORDER BY species;
"""

# 쿼리 실행 및 결과를 Pandas DataFrame으로 가져오기
result_df = con.execute(query).fetchdf()

# 결과 출력
print("\nDuckDB로 'iris_data.csv' 파일을 읽어온 결과:")
print(result_df)

# DuckDB 연결 종료
con.close()


DuckDB로 'iris_data.csv' 파일을 읽어온 결과:
      species  avg_sepal_length
0      setosa             5.006
1  versicolor             5.936
2   virginica             6.588


## 파일 쓰기

In [9]:
import pandas as pd
import seaborn as sns

# Seaborn에서 'iris' 데이터셋 불러오기
iris_df = sns.load_dataset('iris')

# CSV 파일로 저장
iris_df.to_csv('data/iris_to_csv_pandas.csv', index=False)
print("Pandas to_csv()로 'iris_to_csv_pandas.csv' 파일이 저장되었습니다.")

# Parquet 파일로 저장 (pyarrow 또는 fastparquet 엔진 필요)
# pip install pyarrow
iris_df.to_parquet('data/iris_to_parquet_pandas.parquet', index=False)
print("Pandas to_parquet()로 'iris_to_parquet_pandas.parquet' 파일이 저장되었습니다.")

Pandas to_csv()로 'iris_to_csv_pandas.csv' 파일이 저장되었습니다.
Pandas to_parquet()로 'iris_to_parquet_pandas.parquet' 파일이 저장되었습니다.


- CSV 파일 기반으로 COPY 명령어 사용하기
- 기존에 Pandas로 저장했던 'iris_to_csv_pandas.csv' 파일을 읽어와서 특정 조건의 데이터를 필터링한 후, 새로운 CSV 파일로 다시 저장한다. 

In [10]:
import duckdb

# DuckDB 연결
con = duckdb.connect(database=':memory:', read_only=False)

# 기존 CSV 파일 ('iris_to_csv_pandas.csv')을 읽어서 sepal_length가 5.5 이상인 데이터만 필터링
# 결과를 'filtered_iris.csv' 파일로 저장
query_from_csv = """
COPY (
    SELECT *
    FROM 'data/iris_to_csv_pandas.csv'
    WHERE "sepal_length" >= 5.5
) TO 'data/filtered_iris.csv' (HEADER, DELIMITER ',');
"""

# 쿼리 실행
con.execute(query_from_csv)
print("CSV 파일을 기반으로 DuckDB COPY 명령어가 실행되었습니다. 'filtered_iris.csv' 파일이 저장되었습니다.")

# 연결 종료
con.close()

CSV 파일을 기반으로 DuckDB COPY 명령어가 실행되었습니다. 'filtered_iris.csv' 파일이 저장되었습니다.


- Parquet 파일 기반으로 COPY 명령어 사용하기
- 기존에 Pandas로 저장했던 'data/iris_to_parquet_pandas.parquet' 파일을 읽어와서 species별로 sepal_width의 평균을 계산한 뒤, 그 결과를 Parquet 파일로 저장합니다.

In [11]:
import duckdb

# DuckDB 연결
con = duckdb.connect(database=':memory:', read_only=False)

# 기존 Parquet 파일 ('iris_to_parquet_pandas.parquet')을 읽어서 species 별 평균 sepal_width 계산
# 결과를 'avg_sepal_width.parquet' 파일로 저장
query_from_parquet = """
COPY (
    SELECT
        species,
        AVG("sepal_width") AS avg_sepal_width
    FROM 'data/iris_to_parquet_pandas.parquet'
    GROUP BY species
) TO 'data/avg_sepal_width.parquet' (FORMAT PARQUET);
"""

# 쿼리 실행
con.execute(query_from_parquet)
print("Parquet 파일을 기반으로 DuckDB COPY 명령어가 실행되었습니다. 'avg_sepal_width.parquet' 파일이 저장되었습니다.")

# 연결 종료
con.close()

Parquet 파일을 기반으로 DuckDB COPY 명령어가 실행되었습니다. 'avg_sepal_width.parquet' 파일이 저장되었습니다.
