Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 50 additions & 17 deletions challenge_api/extensions/kafka/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
from challenge_api.exceptions.kafka_exceptions import QueueProcessingError
from challenge_api.db.repository import UserChallengesRepository, UserChallengeStatusRepository
from challenge_api.objects.challenge_info import ChallengeInfo
from sqlalchemy.exc import SQLAlchemyError

logger = logging.getLogger(__name__)

class MessageHandler:
VALID_STATUSES = {'Pending', 'Running', 'Deleted', 'Error'}
VALID_STATUS_TRANSITIONS = {
'Pending': {'Running', 'Error', 'Deleted'},
'Running': {'Error', 'Deleted'},
'Error': {'Running', 'Deleted'},
'Deleted': set() # Deleted is a terminal state
}

@staticmethod
def validate_message(message: Dict[str, Any]) -> tuple[str, str, str, str, str]:
Expand Down Expand Up @@ -41,6 +48,22 @@ def validate_message(message: Dict[str, Any]) -> tuple[str, str, str, str, str]:

return user_id, problem_id, new_status, timestamp, endpoint

@staticmethod
def validate_status_transition(current_status: str, new_status: str) -> bool:
"""
상태 전이가 유효한지 검증

Args:
current_status (str): 현재 상태
new_status (str): 새로운 상태

Returns:
bool: 유효한 상태 전이인지 여부
"""
if not current_status:
return True # 첫 상태는 항상 유효
return new_status in MessageHandler.VALID_STATUS_TRANSITIONS.get(current_status, set())

@staticmethod
def handle_message(message: Dict[str, Any]):
"""
Expand All @@ -58,31 +81,41 @@ def handle_message(message: Dict[str, Any]):
userchallenge_repo = UserChallengesRepository()
status_repo = UserChallengeStatusRepository()

if userchallenge_repo.is_exist(challenge_info):
userchallenge = userchallenge_repo.get_by_user_challenge_name(challenge_name)
if not userchallenge:
logger.warning(f"Challenge {challenge_name} exists but could not be retrieved")
return

recent_status = status_repo.get_recent_status(userchallenge.idx)
if not recent_status:
logger.warning(f"No status found for challenge {challenge_name}, creating new status")
recent_status = status_repo.create(userchallenge_idx=userchallenge.idx, port=0)
if not userchallenge_repo.is_exist(challenge_info):
logger.warning(f"Challenge {challenge_name} does not exist")
return

try:
port = int(endpoint) if endpoint else 0
except (ValueError, TypeError):
logger.warning(f"Invalid endpoint value: {endpoint}, using 0 as default")
port = 0
userchallenge = userchallenge_repo.get_by_user_challenge_name(challenge_name)
if not userchallenge:
logger.warning(f"Challenge {challenge_name} exists but could not be retrieved")
return

recent_status = status_repo.get_recent_status(userchallenge.idx)
if not recent_status:
logger.warning(f"No status found for challenge {challenge_name}, creating new status")
recent_status = status_repo.create(userchallenge_idx=userchallenge.idx, port=0)

# 상태 전이 검증
if not MessageHandler.validate_status_transition(recent_status.status, new_status):
logger.warning(f"Invalid status transition from {recent_status.status} to {new_status}")
return

try:
port = int(endpoint) if endpoint else 0
except (ValueError, TypeError):
logger.warning(f"Invalid endpoint value: {endpoint}, using 0 as default")
port = 0

try:
if new_status == 'Running' and endpoint:
# Running 상태이고 endpoint가 있으면 포트 업데이트
status_repo.update_port(recent_status.idx, port)

status_repo.update_status(recent_status.idx, new_status)
logger.info(f"Updated status for challenge {challenge_name} to {new_status} with endpoint {endpoint}")
else:
logger.warning(f"Challenge {challenge_name} does not exist")
except SQLAlchemyError as e:
logger.error(f"Database error while updating status: {str(e)}")
raise QueueProcessingError(error_msg=f"Database error: {str(e)}")

except ValueError as e:
raise QueueProcessingError(error_msg=f"Invalid message format {str(e)}") from e
Expand Down