In [1]:
# 항상 첫 번째 셀
import sys
import os
sys.path.append(os.path.abspath("../src"))

In [2]:
import os
import csv
import json
import time
import re
from dotenv import load_dotenv

# Selenium & BS4
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

# SQLAlchemy
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

# ==========================================
# 환경 변수 및 RDSClient 설정
# ==========================================
load_dotenv()  # .env 파일 로드

class RDSClient:
    def __init__(self):
        db_user = os.getenv('DB_USER')
        db_password = os.getenv('DB_PASSWORD')
        db_host = os.getenv('DB_HOST')
        db_port = os.getenv('DB_PORT', 3306)
        db_name = os.getenv('DB_NAME')

        # SQLAlchemy URL 형식: mysql+pymysql://user:password@host:port/dbname
        self.db_url = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}?charset=utf8mb4"

        try:
            self.engine = create_engine(
                self.db_url,
                pool_size=5,
                max_overflow=10,
                pool_recycle=3600,
                echo=False
            )
            print("✅ DB Engine (Pool) 생성 완료")
        except Exception as e:
            print(f"❌ Engine 생성 중 오류 발생: {e}")
            self.engine = None

    def execute(self, query, params=None):
        if not self.engine:
            print("DB 엔진이 초기화되지 않았습니다.")
            return None

        result_data = None
        try:
            with self.engine.connect() as connection:
                stmt = text(query)
                # params가 튜플/리스트 등일 경우를 대비해 딕셔너리로 확실하게 처리하는 것이 좋습니다.
                # 여기서는 호출부에서 딕셔너리로 넘기도록 설계합니다.
                result = connection.execute(stmt, params or {})

                if result.returns_rows:
                    result_data = [dict(row) for row in result.mappings()]
                else:
                    connection.commit()
                    result_data = result.rowcount
        except SQLAlchemyError as e:
            print(f"⚠️ 쿼리 실행 에러: {e}")
            # 에러 발생 시 None 반환
            return None 
        
        return result_data

# DB 클라이언트 초기화
rds = RDSClient()

# ==========================================
# product_bottom 테이블 컬럼 자동 읽기
# ==========================================
# SQLAlchemy 결과는 딕셔너리 리스트로 반환됩니다.
cols_result = rds.execute("SHOW COLUMNS FROM product_bottom")
if cols_result:
    # MySQL의 SHOW COLUMNS 결과에서 컬럼명은 'Field' 키에 담깁니다.
    db_cols = [row['Field'] for row in cols_result]
else:
    print("❌ 테이블 컬럼 정보를 가져오지 못했습니다. 종료합니다.")
    exit()

print("DB 테이블 컬럼:", db_cols)
print("총 컬럼 수:", len(db_cols))

# SQLAlchemy text() 방식에 맞춰 Named Parameter(:col_name) 방식으로 쿼리 생성
# 예: INSERT INTO table (col1, col2) VALUES (:col1, :col2)
placeholders = ", ".join([f":{col}" for col in db_cols])
insert_sql = f"""
INSERT INTO product_bottom ({", ".join(db_cols)})
VALUES ({placeholders})
"""

# ==========================================
# CSV에서 Product ID 읽기
# ==========================================
goods_ids = []
try:
    with open("musinsa_bottom_ids.csv", newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader) # 헤더 스킵
        for row in reader:
            goods_ids.append(row[0])
except FileNotFoundError:
    print("❌ CSV 파일을 찾을 수 없습니다.")
    exit()

total_count = len(goods_ids)

# ==========================================
# 이미 저장된 상품 수 확인 → 재시작 기능
# ==========================================
# COUNT(*) 쿼리 결과 처리
cnt_result = rds.execute("SELECT COUNT(*) as cnt FROM product_bottom")
saved_count = cnt_result[0]['cnt'] if cnt_result else 0

print(f"이미 저장된 상품 수: {saved_count}개")
print(f"전체 상품 수: {total_count}")

start_index = saved_count
print(f"이번에 시작할 인덱스: {start_index}")

# ==========================================
# Selenium 설정
# ==========================================
options = Options()
options.add_argument("--headless")
# 사용자 에이전트 추가 (차단 방지용 권장)
options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

# ==========================================
# 공통 파싱 함수
# ==========================================
def extract_int(text):
    return int(re.sub(r"[^0-9]", "", text)) if text else None

def extract_number(text):
    if text is None:
        return None
    if "만" in text:
        try:
            return str(int(float(text.replace("만", "")) * 10000))
        except:
            return text
    cleaned = re.sub(r"[^0-9]", "", text)
    return cleaned if cleaned != "" else ""

def clean_product_name(name):
    return re.sub(r"-.*", "", name).strip()

# ==========================================
# 크롤링 시작
# ==========================================
print("크롤링 시작!")

for idx in range(start_index, total_count):

    gid = goods_ids[idx]
    print(f"[{idx+1}/{total_count}] 상품 {gid} 처리 중...")

    try:
        url = f"https://www.musinsa.com/products/{gid}"
        driver.get(url)
        time.sleep(0.7)
        soup = BeautifulSoup(driver.page_source, "html.parser")

        # -----------------------------
        # 1. 상품명
        # -----------------------------
        name_tag = soup.select_one("div[class*='GoodsName__Wrap'] span")
        product_name = clean_product_name(name_tag.get_text(strip=True)) if name_tag else None

        # 2. 브랜드
        brand_tag = soup.select_one("div[class*='Brand__Wrap'] span[class*='BrandName']")
        brand = re.sub(r"[0-9\.]+만", "", brand_tag.get_text(strip=True)).strip() if brand_tag else None

        # 3~5 정가/판매가/할인율
        normal_price = extract_int(soup.select_one("span.line-through").get_text()) if soup.select_one("span.line-through") else None
        sale_price = extract_int(soup.select_one("span[class*='CalculatedPrice']").get_text()) if soup.select_one("span[class*='CalculatedPrice']") else None
        discount = extract_int(soup.select_one("span[class*='DiscountRate']").get_text()) if soup.select_one("span[class*='DiscountRate']") else None

        # 6 카테고리
        cats = soup.select("div[class*='Category__Wrap'] a")
        upper_category = cats[0].get_text(strip=True) if len(cats) > 0 else None
        lower_category = cats[1].get_text(strip=True) if len(cats) > 1 else None

        # 7 성별
        gender = 0
        layout_boxes = soup.select("dl[class*='Layout__Wrap'] div[class*='Layout__Box']")
        if len(layout_boxes) > 1:
            dd = layout_boxes[1].find("dd")
            if dd:
                gtext = dd.get_text(strip=True)
                gender_map = {"남": 1, "여": 2, "공용": 0, "남녀공용": 0}
                gender = gender_map.get(gtext, 0)

        # 누적판매
        cumulative = ""
        for box in layout_boxes:
            dt = box.find("dt")
            dd = box.find("dd")
            if dt and dd and dt.get_text(strip=True) == "누적판매":
                cumulative = dd.get_text(strip=True)

        # 8 별점
        rating_tag = soup.select_one("div[class*='Review__Wrap'] span")
        rating = float(rating_tag.get_text(strip=True)) if rating_tag else None

        # 9 후기수
        review_cnt_tag = soup.select_one("div[class*='Review__Wrap'] span:nth-of-type(2)")
        review_cnt = int(extract_number(review_cnt_tag.get_text())) if review_cnt_tag else 0

        # 10 관심수
        like_tag = soup.select_one("div[class*='Like__Container'] span")
        like_cnt = extract_number(like_tag.get_text()) if like_tag else None

        # 11 스타일 태그
        tag_items = soup.select("ul[class*='ProductTags__List'] span")
        styles = ",".join([t.get_text(strip=True) for t in tag_items])

        # 12 실측사이즈 JSON
        size_json = "[]"

        # 13 핏/계절감 JSON
        fit_json = json.dumps({"핏": [], "계절감": []}, ensure_ascii=False)

        # -----------------------------
        # INSERT 값 준비 (SQLAlchemy 대응)
        # -----------------------------
        # DB 컬럼 순서에 맞춘 값 튜플
        values = (
            gid, product_name, brand, normal_price, sale_price,
            upper_category, lower_category, gender, rating,
            like_cnt, review_cnt, size_json, discount,
            fit_json, cumulative, styles
        )

        # SQLAlchemy text() 실행 시 Named Parameter를 사용하기 위해
        # 컬럼명과 값을 매핑한 딕셔너리로 변환합니다.
        # 예: {'col1': val1, 'col2': val2, ...}
        param_dict = dict(zip(db_cols, values))

        # DB 저장 실행
        rds.execute(insert_sql, param_dict)

    except Exception as e:
        print(f"오류: {gid}, 이유: {e}")
        with open("error_log.txt", "a", encoding="utf-8") as log:
            log.write(f"{gid}, {e}\n")
        
        # SQLAlchemy는 풀(Pool)에서 연결을 관리하므로 수동 reconnect 코드는 
        # 대부분 필요 없으나, 심각한 네트워크 오류 시에는 루프 다음 턴에서 
        # engine.connect()가 호출될 때 자동으로 새 연결을 시도합니다.

print("모든 작업 완료!")

driver.quit()
# rds.engine.dispose() # 프로그램 종료 시 엔진 리소스 해제 (필수 아님, 자동 처리됨)

DB HOST: musinsa-data.c07kuo6ug98z.us-east-1.rds.amazonaws.com
DB USER: admin
DB PASSWORD: qkqajrwkrnrnrn9_
DB NAME: musinsa
DB PORT: 3306
✅ DB Engine (Pool) 생성 완료
DB 테이블 컬럼: ['product_id', 'product_name', 'brand', 'original_price', 'sale_price', 'upper_category', 'lower_category', 'gender', 'rating', 'wish_count', 'review_count', 'size_info', 'discount_rate', 'fit_season', 'cumulative_sales', 'style']
총 컬럼 수: 16
❌ CSV 파일을 찾을 수 없습니다.
이미 저장된 상품 수: 3576개
전체 상품 수: 0
이번에 시작할 인덱스: 3576
크롤링 시작!
모든 작업 완료!


: 