From 0e66417cc95d4a20e692e37039960b139c062c5a Mon Sep 17 00:00:00 2001 From: S0okJu Date: Thu, 8 May 2025 23:39:44 +0900 Subject: [PATCH] =?UTF-8?q?[hotfix]=20kafka=20=EC=98=88=EC=99=B8=EC=B2=98?= =?UTF-8?q?=EB=A6=AC=20=EA=B3=A0=EB=8F=84=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- challenge_api/extensions/kafka/handler.py | 67 +++++++++++++++++------ 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/challenge_api/extensions/kafka/handler.py b/challenge_api/extensions/kafka/handler.py index acf5a4f..f370340 100644 --- a/challenge_api/extensions/kafka/handler.py +++ b/challenge_api/extensions/kafka/handler.py @@ -3,11 +3,18 @@ from challenge_api.exceptions.kafka_exceptions import QueueProcessingError from challenge_api.db.repository import UserChallengesRepository, UserChallengeStatusRepository from challenge_api.objects.challenge_info import ChallengeInfo +from sqlalchemy.exc import SQLAlchemyError logger = logging.getLogger(__name__) class MessageHandler: VALID_STATUSES = {'Pending', 'Running', 'Deleted', 'Error'} + VALID_STATUS_TRANSITIONS = { + 'Pending': {'Running', 'Error', 'Deleted'}, + 'Running': {'Error', 'Deleted'}, + 'Error': {'Running', 'Deleted'}, + 'Deleted': set() # Deleted is a terminal state + } @staticmethod def validate_message(message: Dict[str, Any]) -> tuple[str, str, str, str, str]: @@ -41,6 +48,22 @@ def validate_message(message: Dict[str, Any]) -> tuple[str, str, str, str, str]: return user_id, problem_id, new_status, timestamp, endpoint + @staticmethod + def validate_status_transition(current_status: str, new_status: str) -> bool: + """ + 상태 전이가 유효한지 검증 + + Args: + current_status (str): 현재 상태 + new_status (str): 새로운 상태 + + Returns: + bool: 유효한 상태 전이인지 여부 + """ + if not current_status: + return True # 첫 상태는 항상 유효 + return new_status in MessageHandler.VALID_STATUS_TRANSITIONS.get(current_status, set()) + @staticmethod def handle_message(message: Dict[str, Any]): """ @@ -58,31 +81,41 @@ def handle_message(message: Dict[str, Any]): userchallenge_repo = UserChallengesRepository() status_repo = UserChallengeStatusRepository() - if userchallenge_repo.is_exist(challenge_info): - userchallenge = userchallenge_repo.get_by_user_challenge_name(challenge_name) - if not userchallenge: - logger.warning(f"Challenge {challenge_name} exists but could not be retrieved") - return - - recent_status = status_repo.get_recent_status(userchallenge.idx) - if not recent_status: - logger.warning(f"No status found for challenge {challenge_name}, creating new status") - recent_status = status_repo.create(userchallenge_idx=userchallenge.idx, port=0) + if not userchallenge_repo.is_exist(challenge_info): + logger.warning(f"Challenge {challenge_name} does not exist") + return - try: - port = int(endpoint) if endpoint else 0 - except (ValueError, TypeError): - logger.warning(f"Invalid endpoint value: {endpoint}, using 0 as default") - port = 0 + userchallenge = userchallenge_repo.get_by_user_challenge_name(challenge_name) + if not userchallenge: + logger.warning(f"Challenge {challenge_name} exists but could not be retrieved") + return + recent_status = status_repo.get_recent_status(userchallenge.idx) + if not recent_status: + logger.warning(f"No status found for challenge {challenge_name}, creating new status") + recent_status = status_repo.create(userchallenge_idx=userchallenge.idx, port=0) + + # 상태 전이 검증 + if not MessageHandler.validate_status_transition(recent_status.status, new_status): + logger.warning(f"Invalid status transition from {recent_status.status} to {new_status}") + return + + try: + port = int(endpoint) if endpoint else 0 + except (ValueError, TypeError): + logger.warning(f"Invalid endpoint value: {endpoint}, using 0 as default") + port = 0 + + try: if new_status == 'Running' and endpoint: # Running 상태이고 endpoint가 있으면 포트 업데이트 status_repo.update_port(recent_status.idx, port) status_repo.update_status(recent_status.idx, new_status) logger.info(f"Updated status for challenge {challenge_name} to {new_status} with endpoint {endpoint}") - else: - logger.warning(f"Challenge {challenge_name} does not exist") + except SQLAlchemyError as e: + logger.error(f"Database error while updating status: {str(e)}") + raise QueueProcessingError(error_msg=f"Database error: {str(e)}") except ValueError as e: raise QueueProcessingError(error_msg=f"Invalid message format {str(e)}") from e