diff --git a/apps/pre-processing-service/app/model/schemas.py b/apps/pre-processing-service/app/model/schemas.py index 7487927b..4001b705 100644 --- a/apps/pre-processing-service/app/model/schemas.py +++ b/apps/pre-processing-service/app/model/schemas.py @@ -165,6 +165,7 @@ class ResponseSadaguCrawl(ResponseBase[SadaguCrawlData]): class RequestS3Upload(RequestBase): + task_run_id: int = Field(..., title="Task Run ID", description="워크플로우 실행 ID") keyword: str = Field( ..., title="검색 키워드", description="폴더명 생성용 키워드" ) # 추가 diff --git a/apps/pre-processing-service/app/service/crawl_service.py b/apps/pre-processing-service/app/service/crawl_service.py index e8785f64..3d1183eb 100644 --- a/apps/pre-processing-service/app/service/crawl_service.py +++ b/apps/pre-processing-service/app/service/crawl_service.py @@ -5,16 +5,21 @@ from app.model.schemas import RequestSadaguCrawl from loguru import logger from app.utils.response import Response +import os + +os.environ["TOKENIZERS_PARALLELISM"] = "false" class CrawlService: def __init__(self): pass - async def crawl_product_detail(self, request: RequestSadaguCrawl) -> dict: + async def crawl_product_detail( + self, request: RequestSadaguCrawl, max_concurrent: int = 5 + ) -> dict: """ 선택된 상품들의 상세 정보를 크롤링하는 비즈니스 로직입니다. (5단계) - 여러 상품 URL을 입력받아 순차적으로 상세 정보를 크롤링하여 딕셔너리로 반환합니다. + 여러 상품 URL을 입력받아 비동기로 상세 정보를 크롤링하여 딕셔너리로 반환합니다. """ product_urls = [str(url) for url in request.product_urls] @@ -25,70 +30,44 @@ async def crawl_product_detail(self, request: RequestSadaguCrawl) -> dict: fail_count = 0 try: - # 각 상품을 순차적으로 크롤링 (안정성 확보) - for i, product_url in enumerate(product_urls, 1): - logger.info(f"상품 {i}/{len(product_urls)} 크롤링 시작: {product_url}") - - crawler = DetailCrawler(use_selenium=True) - - try: - # 상세 정보 크롤링 실행 - product_detail = await crawler.crawl_detail(product_url) - - if product_detail: - product_title = product_detail.get("title", "Unknown")[:50] - logger.success( - f"상품 {i} 크롤링 성공: title='{product_title}', price={product_detail.get('price', 0)}" - ) - - # 성공한 상품 추가 - crawled_products.append( - { - "index": i, - "url": product_url, - "product_detail": product_detail, - "status": "success", - "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"), - } - ) - success_count += 1 - else: - logger.error(f"상품 {i} 크롤링 실패: 상세 정보 없음") - crawled_products.append( - { - "index": i, - "url": product_url, - "product_detail": None, - "status": "failed", - "error": "상세 정보 없음", - "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"), - } - ) - fail_count += 1 + # 세마포어로 동시 실행 수 제한 + semaphore = asyncio.Semaphore(max_concurrent) - except Exception as e: - logger.error( - f"상품 {i} 크롤링 오류: url={product_url}, error='{e}'" - ) + # 모든 크롤링 태스크를 동시에 실행 + tasks = [] + for i, product_url in enumerate(product_urls, 1): + task = self._crawl_single_with_semaphore( + semaphore, i, product_url, len(product_urls) + ) + tasks.append(task) + + # 모든 태스크 동시 실행 및 결과 수집 + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 결과 정리 + for result in results: + if isinstance(result, Exception): + logger.error(f"크롤링 태스크 오류: {result}") crawled_products.append( { - "index": i, - "url": product_url, + "index": len(crawled_products) + 1, + "url": "unknown", "product_detail": None, "status": "failed", - "error": str(e), + "error": str(result), "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"), } ) fail_count += 1 + else: + crawled_products.append(result) + if result["status"] == "success": + success_count += 1 + else: + fail_count += 1 - finally: - # 각 크롤러 개별 정리 - await crawler.close() - - # 상품간 
간격 (서버 부하 방지) - if i < len(product_urls): - await asyncio.sleep(1) + # 인덱스 순으로 정렬 + crawled_products.sort(key=lambda x: x["index"]) logger.success( f"전체 크롤링 완료: 총 {len(product_urls)}개, 성공 {success_count}개, 실패 {fail_count}개" @@ -111,10 +90,69 @@ async def crawl_product_detail(self, request: RequestSadaguCrawl) -> dict: logger.error(f"배치 크롤링 서비스 오류: error='{e}'") raise InvalidItemDataException() - # 기존 단일 크롤링 메서드도 유지 (하위 호환성) + async def _crawl_single_with_semaphore( + self, + semaphore: asyncio.Semaphore, + index: int, + product_url: str, + total_count: int, + ) -> dict: + """ + 세마포어를 사용한 단일 상품 크롤링 + """ + async with semaphore: + logger.info(f"상품 {index}/{total_count} 크롤링 시작: {product_url}") + + crawler = DetailCrawler(use_selenium=True) + + try: + # 상세 정보 크롤링 실행 + product_detail = await crawler.crawl_detail(product_url) + + if product_detail: + product_title = product_detail.get("title", "Unknown")[:50] + logger.success( + f"상품 {index} 크롤링 성공: title='{product_title}', price={product_detail.get('price', 0)}" + ) + + return { + "index": index, + "url": product_url, + "product_detail": product_detail, + "status": "success", + "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"), + } + else: + logger.error(f"상품 {index} 크롤링 실패: 상세 정보 없음") + return { + "index": index, + "url": product_url, + "product_detail": None, + "status": "failed", + "error": "상세 정보 없음", + "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"), + } + + except Exception as e: + logger.error( + f"상품 {index} 크롤링 오류: url={product_url}, error='{e}'" + ) + return { + "index": index, + "url": product_url, + "product_detail": None, + "status": "failed", + "error": str(e), + "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"), + } + + finally: + # 각 크롤러 개별 정리 + await crawler.close() + async def crawl_single_product_detail(self, product_url: str) -> dict: """ - 단일 상품 크롤링 (하위 호환성용) + 단일 상품 크롤링 """ crawler = DetailCrawler(use_selenium=True) diff --git a/apps/pre-processing-service/app/service/crawlers/detail_crawler.py b/apps/pre-processing-service/app/service/crawlers/detail_crawler.py index f01ed53a..38c6d56c 100644 --- a/apps/pre-processing-service/app/service/crawlers/detail_crawler.py +++ b/apps/pre-processing-service/app/service/crawlers/detail_crawler.py @@ -1,5 +1,6 @@ import time import re +import asyncio from bs4 import BeautifulSoup from .search_crawler import SearchCrawler from loguru import logger @@ -13,28 +14,19 @@ async def crawl_detail(self, product_url: str) -> dict: try: logger.info(f"상품 상세 크롤링 시작: url='{product_url}'") - # HTML 가져오기 + # HTML 가져오기 (Selenium 부분을 별도 스레드에서 실행) soup = ( await self._get_soup_selenium(product_url) if self.use_selenium else await self._get_soup_httpx(product_url) ) - # 기본 정보 추출 - title = self._extract_title(soup) - price = self._extract_price(soup) - rating = self._extract_rating(soup) - options = self._extract_options(soup) - material_info = self._extract_material_info(soup) - - # 이미지 정보 추출 (항상 실행) - logger.info("이미지 정보 추출 중...") - page_images = self._extract_images(soup) - option_images = [ - opt["image_url"] for opt in options if opt.get("image_url") - ] - # 중복 제거 후 합치기 - all_images = list(set(page_images + option_images)) + # 기본 정보 추출 (CPU 집약적 작업을 별도 스레드에서 실행) + extraction_tasks = await asyncio.to_thread( + self._extract_all_data, soup, product_url + ) + + title, price, rating, options, material_info, all_images = extraction_tasks product_data = { "url": product_url, @@ -58,20 +50,25 @@ async def crawl_detail(self, product_url: str) -> dict: raise Exception(f"크롤링 실패: {str(e)}") async def 
_get_soup_selenium(self, product_url: str) -> BeautifulSoup: - """Selenium으로 HTML 가져오기""" - try: - logger.debug(f"Selenium HTML 로딩 시작: url='{product_url}'") - self.driver.get(product_url) - self.wait.until( - lambda driver: driver.execute_script("return document.readyState") - == "complete" - ) - time.sleep(2) - logger.debug("Selenium HTML 로딩 완료") - return BeautifulSoup(self.driver.page_source, "html.parser") - except Exception as e: - logger.error(f"Selenium HTML 로딩 실패: url='{product_url}', error='{e}'") - raise Exception(f"Selenium HTML 로딩 실패: {e}") + """Selenium으로 HTML 가져오기 (별도 스레드에서 실행)""" + + def _selenium_sync(url): + try: + logger.debug(f"Selenium HTML 로딩 시작: url='{url}'") + self.driver.get(url) + self.wait.until( + lambda driver: driver.execute_script("return document.readyState") + == "complete" + ) + time.sleep(2) + logger.debug("Selenium HTML 로딩 완료") + return BeautifulSoup(self.driver.page_source, "html.parser") + except Exception as e: + logger.error(f"Selenium HTML 로딩 실패: url='{url}', error='{e}'") + raise Exception(f"Selenium HTML 로딩 실패: {e}") + + # Selenium 동기 코드를 별도 스레드에서 실행 + return await asyncio.to_thread(_selenium_sync, product_url) async def _get_soup_httpx(self, product_url: str) -> BeautifulSoup: """httpx로 HTML 가져오기""" @@ -85,6 +82,24 @@ async def _get_soup_httpx(self, product_url: str) -> BeautifulSoup: logger.error(f"httpx HTML 요청 실패: url='{product_url}', error='{e}'") raise Exception(f"HTTP 요청 실패: {e}") + def _extract_all_data(self, soup: BeautifulSoup, product_url: str) -> tuple: + """모든 데이터 추출을 한 번에 처리 (동기 함수)""" + # 기본 정보 추출 + title = self._extract_title(soup) + price = self._extract_price(soup) + rating = self._extract_rating(soup) + options = self._extract_options(soup) + material_info = self._extract_material_info(soup) + + # 이미지 정보 추출 + logger.info("이미지 정보 추출 중...") + page_images = self._extract_images(soup) + option_images = [opt["image_url"] for opt in options if opt.get("image_url")] + # 중복 제거 후 합치기 + all_images = list(set(page_images + option_images)) + + return title, price, rating, options, material_info, all_images + def _extract_title(self, soup: BeautifulSoup) -> str: title_element = soup.find("h1", {"id": "kakaotitle"}) title = title_element.get_text(strip=True) if title_element else "제목 없음" diff --git a/apps/pre-processing-service/app/service/product_selection_service.py b/apps/pre-processing-service/app/service/product_selection_service.py index 723bd940..590bf15e 100644 --- a/apps/pre-processing-service/app/service/product_selection_service.py +++ b/apps/pre-processing-service/app/service/product_selection_service.py @@ -1,7 +1,7 @@ import json from typing import List, Dict from loguru import logger -from app.model.schemas import RequestProductSelect +from app.model.schemas import RequestProductSelect, ProductSelectData from app.utils.response import Response from app.db.mariadb_manager import MariadbManager @@ -25,8 +25,15 @@ def select_product_for_content(self, request: RequestProductSelect) -> dict: if not db_products: logger.warning(f"DB에서 상품을 찾을 수 없음: task_run_id={task_run_id}") + # Pydantic Generic Response 구조에 맞춰 data에 항상 객체를 넣음 + data = ProductSelectData( + task_run_id=task_run_id, + selected_product={}, # 상품 없음 + total_available_products=0, + ) return Response.error( - "상품 데이터를 찾을 수 없습니다.", "PRODUCTS_NOT_FOUND" + message="상품 데이터를 찾을 수 없습니다.", + data=data.dict(), ) # 2. 
최적 상품 선택 @@ -37,14 +44,16 @@ def select_product_for_content(self, request: RequestProductSelect) -> dict: f"selection_reason={selected_product['selection_reason']}" ) - data = { - "task_run_id": task_run_id, - "selected_product": selected_product, - "total_available_products": len(db_products), - } + # 응답용 데이터 구성 + data = ProductSelectData( + task_run_id=task_run_id, + selected_product=selected_product, + total_available_products=len(db_products), + ) return Response.ok( - data, f"콘텐츠용 상품 선택 완료: {selected_product['name']}" + data=data.dict(), + message=f"콘텐츠용 상품 선택 완료: {selected_product['name']}", ) except Exception as e: @@ -63,7 +72,7 @@ def _fetch_products_from_db(self, task_run_id: int) -> List[Dict]: WHERE task_run_id = %s AND io_type = 'OUTPUT' AND data_type = 'JSON' - ORDER BY name \ + ORDER BY name """ with self.db_manager.get_cursor() as cursor: @@ -73,12 +82,8 @@ def _fetch_products_from_db(self, task_run_id: int) -> List[Dict]: products = [] for row in rows: try: - # MariaDB에서 반환되는 row는 튜플 형태 id, name, data_value_str, created_at = row - - # JSON 데이터 파싱 data_value = json.loads(data_value_str) - products.append( { "id": id, @@ -111,13 +116,12 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: try: successful_products = [] - # 1순위: S3 업로드 성공하고 이미지가 있는 상품들 + # 1순위: S3 업로드 성공하고 이미지가 있는 상품 for product in db_products: data_value = product.get("data_value", {}) product_detail = data_value.get("product_detail", {}) product_images = product_detail.get("product_images", []) - # 크롤링 성공하고 이미지가 있는 상품 if ( data_value.get("status") == "success" and product_detail @@ -132,14 +136,11 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: ) if successful_products: - # 이미지 개수가 가장 많은 상품 선택 best_product = max(successful_products, key=lambda x: x["image_count"]) - logger.info( f"1순위 선택: name={best_product['product']['name']}, " f"images={best_product['image_count']}개" ) - return { "selection_reason": "s3_upload_success_with_most_images", "name": best_product["product"]["name"], @@ -148,7 +149,7 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: "title": best_product["title"], } - # 2순위: 크롤링 성공한 첫 번째 상품 (이미지 없어도) + # 2순위: 크롤링 성공한 첫 번째 상품 for product in db_products: data_value = product.get("data_value", {}) if data_value.get("status") == "success" and data_value.get( @@ -156,7 +157,6 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: ): product_detail = data_value.get("product_detail", {}) logger.info(f"2순위 선택: name={product['name']}") - return { "selection_reason": "first_crawl_success", "name": product["name"], @@ -170,9 +170,7 @@ def _select_best_product(self, db_products: List[Dict]) -> Dict: first_product = db_products[0] data_value = first_product.get("data_value", {}) product_detail = data_value.get("product_detail", {}) - logger.warning(f"3순위 fallback 선택: name={first_product['name']}") - return { "selection_reason": "fallback_first_product", "name": first_product["name"], diff --git a/apps/pre-processing-service/app/service/s3_upload_service.py b/apps/pre-processing-service/app/service/s3_upload_service.py index c804a201..725db0ec 100644 --- a/apps/pre-processing-service/app/service/s3_upload_service.py +++ b/apps/pre-processing-service/app/service/s3_upload_service.py @@ -20,7 +20,9 @@ def __init__(self): self.s3_util = S3UploadUtil() self.db_manager = MariadbManager() - async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: + async def upload_crawled_products_to_s3( + self, request: RequestS3Upload, 
max_concurrent: int = 5 + ) -> dict: """ 크롤링된 상품들의 이미지와 데이터를 S3에 업로드하고 DB에 저장하는 비즈니스 로직 (6단계) """ @@ -31,11 +33,24 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: # task_run_id는 자바 워크플로우에서 전달받음 task_run_id = getattr(request, "task_run_id", None) if not task_run_id: - # 임시: task_run_id가 없으면 생성 - task_run_id = int(time.time() * 1000) - logger.warning(f"task_run_id가 없어서 임시로 생성: {task_run_id}") - else: - logger.info(f"자바 워크플로우에서 전달받은 task_run_id: {task_run_id}") + # 자바에서 TaskRun을 만들었으므로 없으면 에러 + logger.error("task_run_id가 없어서 파이썬에서 실행 불가") + return Response.error( + data={ + "upload_results": [], + "db_save_results": [], + "task_run_id": None, + "summary": { + "total_products": 0, + "total_success_images": 0, + "total_fail_images": 0, + "db_success_count": 0, + "db_fail_count": 0, + }, + "uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"), + }, + message="task_run_id is required from Java workflow", + ) logger.info( f"S3 업로드 + DB 저장 서비스 시작: keyword='{keyword}', " @@ -49,31 +64,38 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: try: # HTTP 세션을 사용한 이미지 다운로드 - ssl_context = ssl.create_default_context(cafile=certifi.where()) connector = aiohttp.TCPConnector(ssl=ssl_context) async with aiohttp.ClientSession(connector=connector) as session: + # 세마포어로 동시 실행 수 제한 + semaphore = asyncio.Semaphore(max_concurrent) - # 각 상품별로 순차 업로드 + # 모든 업로드 태스크를 동시에 실행 + tasks = [] for product_info in crawled_products: - product_index = product_info.get("index", 0) - product_detail = product_info.get("product_detail") - - logger.info( - f"상품 {product_index}/{len(crawled_products)} S3 업로드 + DB 저장 시작" + task = self._upload_single_product_with_semaphore( + semaphore, + session, + product_info, + keyword, + base_folder, + task_run_id, ) + tasks.append(task) - # 크롤링 실패한 상품은 스킵 - if not product_detail or product_info.get("status") != "success": - logger.warning( - f"상품 {product_index}: 크롤링 실패로 인한 업로드 스킵" - ) + # 모든 태스크 동시 실행 및 결과 수집 + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 결과 정리 + for result in results: + if isinstance(result, Exception): + logger.error(f"업로드 태스크 오류: {result}") upload_results.append( { - "product_index": product_index, + "product_index": len(upload_results) + 1, "product_title": "Unknown", - "status": "skipped", + "status": "error", "folder_s3_url": None, "uploaded_images": [], "success_count": 0, @@ -82,95 +104,152 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict: ) db_save_results.append( { - "product_index": product_index, - "db_status": "skipped", - "error": "크롤링 실패", + "product_index": len(db_save_results) + 1, + "db_status": "error", + "error": str(result), } ) - continue - - try: - # 1. 상품 이미지 + 데이터 S3 업로드 - upload_result = await self.s3_util.upload_single_product_images( - session, - product_info, - product_index, - keyword, - base_folder, - ) - + else: + upload_result, db_result = result upload_results.append(upload_result) - total_success_images += upload_result["success_count"] - total_fail_images += upload_result["fail_count"] - - # 2. 
DB에 상품 데이터 저장 - db_result = self._save_product_to_db( - task_run_id, keyword, product_index, product_info - ) db_save_results.append(db_result) - logger.success( - f"상품 {product_index} S3 업로드 + DB 저장 완료: " - f"이미지 성공 {upload_result['success_count']}개, DB {db_result['db_status']}" - ) - - except Exception as e: - logger.error( - f"상품 {product_index} S3 업로드/DB 저장 오류: {e}" - ) - upload_results.append( - { - "product_index": product_index, - "product_title": product_detail.get("title", "Unknown"), - "status": "error", - "folder_s3_url": None, - "uploaded_images": [], - "success_count": 0, - "fail_count": 0, - } - ) - db_save_results.append( - { - "product_index": product_index, - "db_status": "error", - "error": str(e), - } - ) + total_success_images += upload_result["success_count"] + total_fail_images += upload_result["fail_count"] - # 상품간 간격 (서버 부하 방지) - if product_index < len(crawled_products): - await asyncio.sleep(1) + # 인덱스 순으로 정렬 + upload_results.sort(key=lambda x: x["product_index"]) + db_save_results.sort(key=lambda x: x["product_index"]) logger.success( f"S3 업로드 + DB 저장 서비스 완료: 총 성공 이미지 {total_success_images}개, " f"총 실패 이미지 {total_fail_images}개" ) - # 응답 데이터 구성 - data = { - "upload_results": upload_results, - "db_save_results": db_save_results, - "task_run_id": task_run_id, - "summary": { - "total_products": len(crawled_products), - "total_success_images": total_success_images, - "total_fail_images": total_fail_images, - "db_success_count": len( - [r for r in db_save_results if r.get("db_status") == "success"] - ), - "db_fail_count": len( - [r for r in db_save_results if r.get("db_status") == "error"] - ), + # Response.ok 사용하여 올바른 스키마로 응답 + return Response.ok( + data={ + "upload_results": upload_results, + "db_save_results": db_save_results, + "task_run_id": task_run_id, + "summary": { + "total_products": len(crawled_products), + "total_success_images": total_success_images, + "total_fail_images": total_fail_images, + "db_success_count": len( + [ + r + for r in db_save_results + if r.get("db_status") == "success" + ] + ), + "db_fail_count": len( + [ + r + for r in db_save_results + if r.get("db_status") == "error" + ] + ), + }, + "uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"), }, - "uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"), - } - - message = f"S3 업로드 + DB 저장 완료: {total_success_images}개 이미지 성공, {len([r for r in db_save_results if r.get('db_status') == 'success'])}개 상품 DB 저장 성공" - return Response.ok(data, message) + message=f"S3 업로드 + DB 저장 완료: 총 성공 이미지 {total_success_images}개, 총 실패 이미지 {total_fail_images}개", + ) except Exception as e: logger.error(f"S3 업로드 + DB 저장 서비스 전체 오류: {e}") - raise InvalidItemDataException() + # Response.error 사용하여 에러도 올바른 스키마로 응답 + return Response.error( + data={ + "upload_results": [], + "db_save_results": [], + "task_run_id": task_run_id, + "summary": { + "total_products": 0, + "total_success_images": 0, + "total_fail_images": 0, + "db_success_count": 0, + "db_fail_count": 0, + }, + "uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"), + }, + message=f"S3 업로드 서비스 오류: {str(e)}", + ) + + async def _upload_single_product_with_semaphore( + self, + semaphore: asyncio.Semaphore, + session: aiohttp.ClientSession, + product_info: Dict, + keyword: str, + base_folder: str, + task_run_id: int, + ) -> tuple: + """세마포어를 사용한 단일 상품 업로드 + DB 저장""" + async with semaphore: + product_index = product_info.get("index", 0) + product_detail = product_info.get("product_detail") + + logger.info(f"상품 {product_index} S3 업로드 + DB 저장 시작") + + # 크롤링 실패한 상품은 스킵 + if not 
product_detail or product_info.get("status") != "success": + logger.warning(f"상품 {product_index}: 크롤링 실패로 인한 업로드 스킵") + upload_result = { + "product_index": product_index, + "product_title": "Unknown", + "status": "skipped", + "folder_s3_url": None, + "uploaded_images": [], + "success_count": 0, + "fail_count": 0, + } + db_result = { + "product_index": product_index, + "db_status": "skipped", + "error": "크롤링 실패", + } + return upload_result, db_result + + try: + # S3 업로드와 DB 저장을 동시에 실행 + upload_task = self.s3_util.upload_single_product_images( + session, product_info, product_index, keyword, base_folder + ) + db_task = asyncio.to_thread( + self._save_product_to_db, + task_run_id, + keyword, + product_index, + product_info, + ) + + upload_result, db_result = await asyncio.gather(upload_task, db_task) + + logger.success( + f"상품 {product_index} S3 업로드 + DB 저장 완료: " + f"이미지 성공 {upload_result['success_count']}개, DB {db_result['db_status']}" + ) + + return upload_result, db_result + + except Exception as e: + logger.error(f"상품 {product_index} S3 업로드/DB 저장 오류: {e}") + upload_result = { + "product_index": product_index, + "product_title": product_detail.get("title", "Unknown"), + "status": "error", + "folder_s3_url": None, + "uploaded_images": [], + "success_count": 0, + "fail_count": 0, + } + db_result = { + "product_index": product_index, + "db_status": "error", + "error": str(e), + } + return upload_result, db_result def _save_product_to_db( self, task_run_id: int, keyword: str, product_index: int, product_info: Dict @@ -192,8 +271,8 @@ def _save_product_to_db( with self.db_manager.get_cursor() as cursor: sql = """ INSERT INTO task_io_data - (task_run_id, io_type, name, data_type, data_value, created_at) - VALUES (%s, %s, %s, %s, %s, %s) \ + (task_run_id, io_type, name, data_type, data_value, created_at) + VALUES (%s, %s, %s, %s, %s, %s) """ cursor.execute( diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java index 17934012..a8a885ed 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/fastapi/body/ProductSelectBodyBuilder.java @@ -28,11 +28,6 @@ public boolean supports(String taskName) { public ObjectNode build(Task task, Map workflowContext) { ObjectNode body = objectMapper.createObjectNode(); - // task_run_id는 현재 실행 중인 task의 run_id를 사용 - // 실제 구현에서는 Task 객체나 워크플로우 컨텍스트에서 가져와야 할 수 있습니다. 
- body.put("task_run_id", task.getId()); // Task 객체에서 ID를 가져오는 방식으로 가정 - - // 기본 선택 기준 설정 (이미지 개수 우선) body.put("selection_criteria", "image_count_priority"); return body; diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 3fa71524..afc4f555 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -128,6 +128,7 @@ private boolean executeTasksForJob( workflowLogger.info( "Job (JobRunId={}) 내 총 {}개의 Task를 순차 실행합니다.", jobRun.getId(), taskDtos.size()); boolean hasAnyTaskFailed = false; + Long s3UploadTaskRunId = null; // S3 업로드 태스크의 task_run_id 저장용 for (TaskDto taskDto : taskDtos) { try { @@ -146,6 +147,19 @@ private boolean executeTasksForJob( .map(builder -> builder.build(task, workflowContext)) .orElse(objectMapper.createObjectNode()); + if ("S3 업로드 태스크".equals(task.getName())) { + requestBody.put("task_run_id", taskRun.getId()); + s3UploadTaskRunId = taskRun.getId(); // S3 업로드의 task_run_id 저장 + } else if ("상품 선택 태스크".equals(task.getName())) { + // S3 업로드에서 사용한 task_run_id를 사용 + if (s3UploadTaskRunId != null) { + requestBody.put("task_run_id", s3UploadTaskRunId); + } else { + workflowLogger.error("S3 업로드 태스크가 먼저 실행되지 않아 task_run_id를 찾을 수 없습니다."); + // 또는 이전 Job에서 S3 업로드를 찾는 로직 추가 가능 + } + } + TaskRunner.TaskExecutionResult result = taskExecutionService.executeWithRetry(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index 9238b8a2..379140b5 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -16,7 +16,7 @@ DELETE FROM `workflow`; -- 워크플로우 생성 (ID: 1) INSERT INTO `workflow` (`id`, `name`, `description`, `created_by`, `default_config`) VALUES (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스', 1, - JSON_OBJECT('1',json_object('tag','naver'),'8',json_object('tag','naver_blog','blog_id', 'wtecho331', 'blog_pw', 'testpass'))) + JSON_OBJECT('1',json_object('tag','naver'),'9',json_object('tag','blogger','blog_id', '', 'blog_pw', ''))) ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), @@ -27,7 +27,7 @@ INSERT INTO `job` (`id`, `name`, `description`, `created_by`) VALUES (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업', 1) ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), updated_at = NOW(); --- Task 생성 (ID: 1 ~ 7) - FastAPI Request Body 스키마 반영 +-- Task 생성 (ID: 1 ~ 9) INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES (1, '키워드 검색 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/keywords/search', 'method', 'POST', @@ -56,7 +56,6 @@ INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES 'endpoint', '/products/crawl', 'method', 'POST', 'body', JSON_OBJECT('product_urls', 'List') -- { "product_urls": List[str] } 수정됨 )), - -- 🆕 S3 업로드 태스크 추가 (6, 'S3 업로드 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/products/s3-upload', 'method', 'POST', 'body', JSON_OBJECT( -- { keyword: str, crawled_products: List, base_folder: str } @@ -65,9 +64,16 @@ INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES 'base_folder', 'String' ) )), 
+ (7, '상품 선택 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/products/select', 'method', 'POST', + 'body', JSON_OBJECT( -- { task_run_id: int, selection_criteria: str } + 'task_run_id', 'Integer', + 'selection_criteria', 'String' + ) + )), -- RAG관련 request body는 추후에 결정될 예정 - (7, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')), - (8, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT( + (8, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')), + (9, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/blogs/publish', 'method', 'POST', 'body', JSON_OBJECT( -- { tag: str, blog_id: str, ... } 'tag', 'String', @@ -92,9 +98,10 @@ INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES -- Job-Task 연결 INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES - -- Job 1: 상품 분석 (키워드검색 → 상품검색 → 매칭 → 유사도 → 크롤링 → S3업로드) - (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), (1, 6, 6), - (2, 7, 1), (2, 8, 2) + -- Job 1: 상품 분석 (키워드검색 → 상품검색 → 매칭 → 유사도 → 크롤링 → S3업로드 → 상품선택) + (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), (1, 6, 6), (1, 7, 7), + -- Job 2: 블로그 콘텐츠 생성 (RAG생성 → 발행) + (2, 8, 1), (2, 9, 2) ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order); -- 스케줄 설정 (매일 오전 8시)
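
For reference, a minimal, self-contained sketch of the bounded-concurrency pattern that `crawl_product_detail` (and the S3 upload path) now relies on: an `asyncio.Semaphore` caps how many tasks run at once, and `asyncio.gather(..., return_exceptions=True)` collects per-task failures without cancelling the rest. `crawl_one` is a hypothetical stand-in for `DetailCrawler.crawl_detail` and only simulates I/O with `asyncio.sleep`; everything else is plain stdlib.

```python
import asyncio
import time


async def crawl_one(index: int, url: str) -> dict:
    """Hypothetical placeholder for DetailCrawler.crawl_detail(); simulates network I/O."""
    await asyncio.sleep(0.1)
    return {
        "index": index,
        "url": url,
        "status": "success",
        "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S"),
    }


async def crawl_all(urls: list[str], max_concurrent: int = 5) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(index: int, url: str) -> dict:
        # The semaphore caps how many crawls run concurrently; the rest wait here.
        async with semaphore:
            return await crawl_one(index, url)

    tasks = [bounded(i, u) for i, u in enumerate(urls, 1)]
    # return_exceptions=True keeps one failed task from cancelling the others.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    collected: list[dict] = []
    for result in results:
        if isinstance(result, Exception):
            collected.append({
                "index": len(collected) + 1,
                "url": "unknown",
                "status": "failed",
                "error": str(result),
            })
        else:
            collected.append(result)
    # gather() preserves submission order, but sorting makes the contract explicit.
    collected.sort(key=lambda item: item["index"])
    return collected


if __name__ == "__main__":
    urls = [f"https://example.com/item/{n}" for n in range(1, 11)]
    for row in asyncio.run(crawl_all(urls)):
        print(row)
```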
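
The Selenium page load and the BeautifulSoup extraction are synchronous, so the patched `crawl_detail` wraps them in `asyncio.to_thread` to keep the event loop free for the other crawl coroutines. Below is a minimal sketch of that offloading, assuming `beautifulsoup4` is installed (the service already imports it); `parse_product_page` is a hypothetical stand-in for `_extract_all_data` / the inner `_selenium_sync` helper, not the service's actual implementation.

```python
import asyncio

from bs4 import BeautifulSoup  # assumed available, as in detail_crawler.py


def parse_product_page(html: str) -> dict:
    """Synchronous, CPU-bound parsing step, comparable to _extract_all_data()."""
    soup = BeautifulSoup(html, "html.parser")
    title = soup.find("h1")
    return {"title": title.get_text(strip=True) if title else "제목 없음"}


async def crawl_detail(html: str) -> dict:
    # Run the blocking parse in the default thread pool so the event loop
    # stays responsive while other crawl tasks make progress.
    return await asyncio.to_thread(parse_product_page, html)


if __name__ == "__main__":
    sample = "<html><body><h1>샘플 상품</h1></body></html>"
    print(asyncio.run(crawl_detail(sample)))
```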
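
`_upload_single_product_with_semaphore` combines both ideas: the aiohttp-based S3 upload is awaited directly while the blocking DB insert runs in a worker thread, and `asyncio.gather` waits for both. The sketch below illustrates only that composition under stated assumptions — `upload_images` and `save_product_to_db` are hypothetical placeholders, and `sqlite3` stands in for the service's `MariadbManager`.

```python
import asyncio
import sqlite3  # in-memory stand-in for MariadbManager


async def upload_images(product: dict) -> dict:
    """Hypothetical placeholder for S3UploadUtil.upload_single_product_images(); simulates async I/O."""
    await asyncio.sleep(0.1)
    return {"product_index": product["index"], "success_count": 3, "fail_count": 0}


def save_product_to_db(conn: sqlite3.Connection, task_run_id: int, product: dict) -> dict:
    """Blocking DB write, comparable in shape to _save_product_to_db(); deliberately synchronous."""
    conn.execute(
        "INSERT INTO task_io_data (task_run_id, name) VALUES (?, ?)",
        (task_run_id, f"product_{product['index']}"),
    )
    conn.commit()
    return {"product_index": product["index"], "db_status": "success"}


async def upload_and_save(conn: sqlite3.Connection, task_run_id: int, product: dict):
    # Await the upload coroutine directly and push the blocking DB insert to a
    # worker thread; gather() lets the two finish in parallel.
    upload_result, db_result = await asyncio.gather(
        upload_images(product),
        asyncio.to_thread(save_product_to_db, conn, task_run_id, product),
    )
    return upload_result, db_result


if __name__ == "__main__":
    # check_same_thread=False is required because the insert runs in a worker thread.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("CREATE TABLE task_io_data (task_run_id INTEGER, name TEXT)")
    print(asyncio.run(upload_and_save(conn, 42, {"index": 1})))
```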