Merged
1 change: 1 addition & 0 deletions apps/pre-processing-service/app/model/schemas.py
@@ -165,6 +165,7 @@ class ResponseSadaguCrawl(ResponseBase[SadaguCrawlData]):


class RequestS3Upload(RequestBase):
task_run_id: int = Field(..., title="Task Run ID", description="Workflow run ID")
keyword: str = Field(
..., title="Search keyword", description="Keyword used to generate the folder name"
) # added
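Note: with this change both fields of RequestS3Upload are required. A minimal sketch of how the model behaves, assuming Pydantic v2 and a trivial stand-in for RequestBase (the real base class and the example values below are not part of this diff):

```python
from pydantic import BaseModel, Field

class RequestBase(BaseModel):  # stand-in; the service's real RequestBase may carry extra fields
    pass

class RequestS3Upload(RequestBase):
    task_run_id: int = Field(..., title="Task Run ID", description="Workflow run ID")
    keyword: str = Field(..., title="Search keyword", description="Keyword used to generate the folder name")

# Both fields are required, so a payload without task_run_id now fails validation.
req = RequestS3Upload(task_run_id=42, keyword="camping chair")  # hypothetical values
print(req.model_dump())  # {'task_run_id': 42, 'keyword': 'camping chair'}
```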
154 changes: 96 additions & 58 deletions apps/pre-processing-service/app/service/crawl_service.py
@@ -5,16 +5,21 @@
from app.model.schemas import RequestSadaguCrawl
from loguru import logger
from app.utils.response import Response
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"


class CrawlService:
def __init__(self):
pass

async def crawl_product_detail(self, request: RequestSadaguCrawl) -> dict:
async def crawl_product_detail(
self, request: RequestSadaguCrawl, max_concurrent: int = 5
) -> dict:
"""
Business logic that crawls the detailed information of the selected products. (Step 5)
Takes multiple product URLs, crawls their details sequentially, and returns them as a dictionary.
Takes multiple product URLs, crawls their details asynchronously, and returns them as a dictionary.
"""
product_urls = [str(url) for url in request.product_urls]

@@ -25,70 +30,44 @@ async def crawl_product_detail(self, request: RequestSadaguCrawl) -> dict:
fail_count = 0

try:
# Crawl each product sequentially (for stability)
for i, product_url in enumerate(product_urls, 1):
logger.info(f"Starting crawl for product {i}/{len(product_urls)}: {product_url}")

crawler = DetailCrawler(use_selenium=True)

try:
# Run the detail crawl
product_detail = await crawler.crawl_detail(product_url)

if product_detail:
product_title = product_detail.get("title", "Unknown")[:50]
logger.success(
f"Product {i} crawled successfully: title='{product_title}', price={product_detail.get('price', 0)}"
)

# Add the successfully crawled product
crawled_products.append(
{
"index": i,
"url": product_url,
"product_detail": product_detail,
"status": "success",
"crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
)
success_count += 1
else:
logger.error(f"상품 {i} 크롤링 실패: 상세 정보 없음")
crawled_products.append(
{
"index": i,
"url": product_url,
"product_detail": None,
"status": "failed",
"error": "상세 정보 없음",
"crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
)
fail_count += 1
# Limit the number of concurrent crawls with a semaphore
semaphore = asyncio.Semaphore(max_concurrent)

except Exception as e:
logger.error(
f"Product {i} crawl error: url={product_url}, error='{e}'"
)
# Launch all crawl tasks concurrently
tasks = []
for i, product_url in enumerate(product_urls, 1):
task = self._crawl_single_with_semaphore(
semaphore, i, product_url, len(product_urls)
)
tasks.append(task)

# Run every task concurrently and collect the results
results = await asyncio.gather(*tasks, return_exceptions=True)

# Consolidate the results
for result in results:
if isinstance(result, Exception):
logger.error(f"크롤링 태스크 오류: {result}")
crawled_products.append(
{
"index": i,
"url": product_url,
"index": len(crawled_products) + 1,
"url": "unknown",
"product_detail": None,
"status": "failed",
"error": str(e),
"error": str(result),
"crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
)
fail_count += 1
else:
crawled_products.append(result)
if result["status"] == "success":
success_count += 1
else:
fail_count += 1

finally:
# Clean up each crawler individually
await crawler.close()

# Pause between products (to avoid overloading the server)
if i < len(product_urls):
await asyncio.sleep(1)
# Sort by index
crawled_products.sort(key=lambda x: x["index"])

logger.success(
f"전체 크롤링 완료: 총 {len(product_urls)}개, 성공 {success_count}개, 실패 {fail_count}개"
@@ -111,10 +90,69 @@ async def crawl_product_detail(self, request: RequestSadaguCrawl) -> dict:
logger.error(f"배치 크롤링 서비스 오류: error='{e}'")
raise InvalidItemDataException()

# Keep the existing single-product crawl method as well (backward compatibility)
async def _crawl_single_with_semaphore(
self,
semaphore: asyncio.Semaphore,
index: int,
product_url: str,
total_count: int,
) -> dict:
"""
Crawl a single product while holding the semaphore
"""
async with semaphore:
logger.info(f"상품 {index}/{total_count} 크롤링 시작: {product_url}")

crawler = DetailCrawler(use_selenium=True)

try:
# Run the detail crawl
product_detail = await crawler.crawl_detail(product_url)

if product_detail:
product_title = product_detail.get("title", "Unknown")[:50]
logger.success(
f"상품 {index} 크롤링 성공: title='{product_title}', price={product_detail.get('price', 0)}"
)

return {
"index": index,
"url": product_url,
"product_detail": product_detail,
"status": "success",
"crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
else:
logger.error(f"상품 {index} 크롤링 실패: 상세 정보 없음")
return {
"index": index,
"url": product_url,
"product_detail": None,
"status": "failed",
"error": "상세 정보 없음",
"crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}

except Exception as e:
logger.error(
f"상품 {index} 크롤링 오류: url={product_url}, error='{e}'"
)
return {
"index": index,
"url": product_url,
"product_detail": None,
"status": "failed",
"error": str(e),
"crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}

finally:
# Clean up each crawler individually
await crawler.close()

async def crawl_single_product_detail(self, product_url: str) -> dict:
"""
Crawl a single product (for backward compatibility)
Crawl a single product
"""
crawler = DetailCrawler(use_selenium=True)

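The refactor above swaps the sequential per-URL loop for bounded concurrency: an asyncio.Semaphore caps how many DetailCrawler instances run at once, and asyncio.gather(..., return_exceptions=True) collects every result without letting a single failure cancel the rest. A self-contained sketch of the same pattern, with a placeholder coroutine instead of the real crawler:

```python
import asyncio

async def crawl_one(sem: asyncio.Semaphore, index: int, url: str) -> dict:
    """Placeholder for DetailCrawler.crawl_detail: sleeps instead of crawling."""
    async with sem:  # at most max_concurrent bodies run at the same time
        await asyncio.sleep(0.1)
        if "bad" in url:
            raise RuntimeError(f"failed to crawl {url}")
        return {"index": index, "url": url, "status": "success"}

async def crawl_all(urls: list[str], max_concurrent: int = 5) -> list[dict]:
    sem = asyncio.Semaphore(max_concurrent)
    tasks = [crawl_one(sem, i, url) for i, url in enumerate(urls, 1)]
    # return_exceptions=True turns failures into result objects instead of cancelling the batch
    results = await asyncio.gather(*tasks, return_exceptions=True)
    out = []
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            # gather preserves input order, so the failed URL is still known here
            out.append({"index": i, "url": urls[i - 1], "status": "failed", "error": str(result)})
        else:
            out.append(result)
    return sorted(out, key=lambda item: item["index"])

if __name__ == "__main__":
    print(asyncio.run(crawl_all(["https://a", "https://bad", "https://c"])))
```

One possible follow-up: because gather returns results in input order, the index and URL of a failed task can be recovered from its position, rather than being recorded as "unknown" as the service currently does.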
75 changes: 45 additions & 30 deletions apps/pre-processing-service/app/service/crawlers/detail_crawler.py
@@ -1,5 +1,6 @@
import time
import re
import asyncio
from bs4 import BeautifulSoup
from .search_crawler import SearchCrawler
from loguru import logger
@@ -13,28 +14,19 @@ async def crawl_detail(self, product_url: str) -> dict:
try:
logger.info(f"상품 상세 크롤링 시작: url='{product_url}'")

# Fetch the HTML
# Fetch the HTML (run the Selenium part in a separate thread)
soup = (
await self._get_soup_selenium(product_url)
if self.use_selenium
else await self._get_soup_httpx(product_url)
)

# Extract basic information
title = self._extract_title(soup)
price = self._extract_price(soup)
rating = self._extract_rating(soup)
options = self._extract_options(soup)
material_info = self._extract_material_info(soup)

# Extract image information (always runs)
logger.info("Extracting image information...")
page_images = self._extract_images(soup)
option_images = [
opt["image_url"] for opt in options if opt.get("image_url")
]
# Merge after removing duplicates
all_images = list(set(page_images + option_images))
# Extract basic information (run the CPU-intensive work in a separate thread)
extraction_tasks = await asyncio.to_thread(
self._extract_all_data, soup, product_url
)

title, price, rating, options, material_info, all_images = extraction_tasks

product_data = {
"url": product_url,
@@ -58,20 +50,25 @@
raise Exception(f"크롤링 실패: {str(e)}")

async def _get_soup_selenium(self, product_url: str) -> BeautifulSoup:
"""Selenium으로 HTML 가져오기"""
try:
logger.debug(f"Selenium HTML 로딩 시작: url='{product_url}'")
self.driver.get(product_url)
self.wait.until(
lambda driver: driver.execute_script("return document.readyState")
== "complete"
)
time.sleep(2)
logger.debug("Selenium HTML 로딩 완료")
return BeautifulSoup(self.driver.page_source, "html.parser")
except Exception as e:
logger.error(f"Selenium HTML 로딩 실패: url='{product_url}', error='{e}'")
raise Exception(f"Selenium HTML 로딩 실패: {e}")
"""Selenium으로 HTML 가져오기 (별도 스레드에서 실행)"""

def _selenium_sync(url):
try:
logger.debug(f"Selenium HTML 로딩 시작: url='{url}'")
self.driver.get(url)
self.wait.until(
lambda driver: driver.execute_script("return document.readyState")
== "complete"
)
time.sleep(2)
logger.debug("Selenium HTML 로딩 완료")
return BeautifulSoup(self.driver.page_source, "html.parser")
except Exception as e:
logger.error(f"Selenium HTML 로딩 실패: url='{url}', error='{e}'")
raise Exception(f"Selenium HTML 로딩 실패: {e}")

# Run the synchronous Selenium code in a separate thread
return await asyncio.to_thread(_selenium_sync, product_url)

async def _get_soup_httpx(self, product_url: str) -> BeautifulSoup:
"""httpx로 HTML 가져오기"""
@@ -85,6 +82,24 @@ async def _get_soup_httpx(self, product_url: str) -> BeautifulSoup:
logger.error(f"httpx HTML 요청 실패: url='{product_url}', error='{e}'")
raise Exception(f"HTTP 요청 실패: {e}")

def _extract_all_data(self, soup: BeautifulSoup, product_url: str) -> tuple:
"""모든 데이터 추출을 한 번에 처리 (동기 함수)"""
# 기본 정보 추출
title = self._extract_title(soup)
price = self._extract_price(soup)
rating = self._extract_rating(soup)
options = self._extract_options(soup)
material_info = self._extract_material_info(soup)

# Extract image information
logger.info("Extracting image information...")
page_images = self._extract_images(soup)
option_images = [opt["image_url"] for opt in options if opt.get("image_url")]
# Merge after removing duplicates
all_images = list(set(page_images + option_images))

return title, price, rating, options, material_info, all_images

def _extract_title(self, soup: BeautifulSoup) -> str:
title_element = soup.find("h1", {"id": "kakaotitle"})
title = title_element.get_text(strip=True) if title_element else "No title"
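The crawler change above moves the blocking work (the Selenium page load and the BeautifulSoup extraction) onto worker threads with asyncio.to_thread, so the event loop can keep driving other crawl tasks while one of them waits on the browser. A minimal sketch of that offloading pattern, assuming Python 3.9+ and using a sleep as a stand-in for the WebDriver call:

```python
import asyncio
import time

def blocking_fetch(url: str) -> str:
    """Stand-in for driver.get(url) + driver.page_source: blocks its thread for ~1 s."""
    time.sleep(1)
    return f"<html><h1 id='kakaotitle'>{url}</h1></html>"

async def fetch(url: str) -> str:
    # to_thread runs the blocking call on the default thread pool and awaits the result,
    # so other coroutines keep running while this one waits.
    return await asyncio.to_thread(blocking_fetch, url)

async def main() -> None:
    start = time.perf_counter()
    pages = await asyncio.gather(*(fetch(f"https://example.com/{i}") for i in range(5)))
    # With to_thread the five ~1 s loads overlap, so this prints roughly 1 s rather than 5 s.
    print(len(pages), f"{time.perf_counter() - start:.1f}s")

asyncio.run(main())
```

Since a WebDriver instance is not thread-safe, this pattern relies on each DetailCrawler owning its own driver, which the service already does by constructing one crawler per URL.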