diff --git a/__pycache__/app.cpython-310.pyc b/__pycache__/app.cpython-310.pyc deleted file mode 100644 index a8240b7..0000000 Binary files a/__pycache__/app.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/api/__pycache__/__init__.cpython-310.pyc b/challenge_api/api/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index ace4f9b..0000000 Binary files a/challenge_api/api/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/api/__pycache__/__init__.cpython-313.pyc b/challenge_api/api/__pycache__/__init__.cpython-313.pyc deleted file mode 100644 index c4ae5ca..0000000 Binary files a/challenge_api/api/__pycache__/__init__.cpython-313.pyc and /dev/null differ diff --git a/challenge_api/api/__pycache__/challenge_api.cpython-310.pyc b/challenge_api/api/__pycache__/challenge_api.cpython-310.pyc deleted file mode 100644 index 2026121..0000000 Binary files a/challenge_api/api/__pycache__/challenge_api.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/api/__pycache__/challenge_api.cpython-313.pyc b/challenge_api/api/__pycache__/challenge_api.cpython-313.pyc deleted file mode 100644 index 8a34331..0000000 Binary files a/challenge_api/api/__pycache__/challenge_api.cpython-313.pyc and /dev/null differ diff --git a/challenge_api/config.py b/challenge_api/config.py index 6c9d020..7e65661 100644 --- a/challenge_api/config.py +++ b/challenge_api/config.py @@ -9,7 +9,6 @@ class Config: - 데이터베이스 연결 설정 - SQLAlchemy ORM 설정 - 보안 관련 설정 - - Kafka 메시징 설정 - Loki 로깅 설정 """ @@ -50,25 +49,6 @@ class Config: SESSION_COOKIE_HTTPONLY = True # JavaScript에서 세션 쿠키 접근 방지 SESSION_COOKIE_SAMESITE = 'Lax' # CSRF 공격 방지를 위한 SameSite 설정 - # ========================================================================= - # Kafka 메시징 설정 - # ========================================================================= - # Kafka 브로커 및 토픽 설정 - KAFKA_BOOTSTRAP_SERVERS = os.getenv( - 'KAFKA_BOOTSTRAP_SERVERS', - 'localhost:9093' - ) # Kafka 브로커 주소 - - KAFKA_TOPIC = os.getenv( - 'KAFKA_TOPIC', - 'challenge-status' - ) # 메시지 토픽명 - - KAFKA_GROUP_ID = os.getenv( - 'KAFKA_GROUP_ID', - 'challenge-consumer-group' - ) # 컨슈머 그룹 ID - # ========================================================================= # Loki 로깅 설정 # ========================================================================= diff --git a/challenge_api/db/__pycache__/__init__.cpython-310.pyc b/challenge_api/db/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index da5b04d..0000000 Binary files a/challenge_api/db/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/__init__.cpython-312.pyc b/challenge_api/db/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 93b2e39..0000000 Binary files a/challenge_api/db/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/__init__.cpython-313.pyc b/challenge_api/db/__pycache__/__init__.cpython-313.pyc deleted file mode 100644 index 9fc4a1f..0000000 Binary files a/challenge_api/db/__pycache__/__init__.cpython-313.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/config.cpython-310.pyc b/challenge_api/db/__pycache__/config.cpython-310.pyc deleted file mode 100644 index 0bbe0ff..0000000 Binary files a/challenge_api/db/__pycache__/config.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/config.cpython-312.pyc b/challenge_api/db/__pycache__/config.cpython-312.pyc deleted file mode 100644 index 045da0e..0000000 Binary files a/challenge_api/db/__pycache__/config.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/config.cpython-313.pyc b/challenge_api/db/__pycache__/config.cpython-313.pyc deleted file mode 100644 index 4bd749f..0000000 Binary files a/challenge_api/db/__pycache__/config.cpython-313.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/db_manager.cpython-312.pyc b/challenge_api/db/__pycache__/db_manager.cpython-312.pyc deleted file mode 100644 index 27118f9..0000000 Binary files a/challenge_api/db/__pycache__/db_manager.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/models.cpython-310.pyc b/challenge_api/db/__pycache__/models.cpython-310.pyc deleted file mode 100644 index 9265f33..0000000 Binary files a/challenge_api/db/__pycache__/models.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/models.cpython-312.pyc b/challenge_api/db/__pycache__/models.cpython-312.pyc deleted file mode 100644 index 167d148..0000000 Binary files a/challenge_api/db/__pycache__/models.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/models.cpython-313.pyc b/challenge_api/db/__pycache__/models.cpython-313.pyc deleted file mode 100644 index 393d9fa..0000000 Binary files a/challenge_api/db/__pycache__/models.cpython-313.pyc and /dev/null differ diff --git a/challenge_api/db/__pycache__/repository.cpython-310.pyc b/challenge_api/db/__pycache__/repository.cpython-310.pyc deleted file mode 100644 index fdc8e49..0000000 Binary files a/challenge_api/db/__pycache__/repository.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/extensions/__init__.py b/challenge_api/extensions/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/challenge_api/extensions/__pycache__/__init__.cpython-310.pyc b/challenge_api/extensions/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index 9a7355f..0000000 Binary files a/challenge_api/extensions/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/extensions/__pycache__/__init__.cpython-312.pyc b/challenge_api/extensions/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index be81a5b..0000000 Binary files a/challenge_api/extensions/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/__init__.py b/challenge_api/extensions/kafka/__init__.py deleted file mode 100644 index 3b5dfc2..0000000 --- a/challenge_api/extensions/kafka/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -__version__ = '1.0.0' - -__all__ = [ - 'KafkaEventConsumer', - 'KafkaConfig', -] - -from challenge_api.extensions.kafka.config import KafkaConfig -from challenge_api.extensions.kafka.consumer import KafkaEventConsumer diff --git a/challenge_api/extensions/kafka/__pycache__/__init__.cpython-310.pyc b/challenge_api/extensions/kafka/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index 86eb9b2..0000000 Binary files a/challenge_api/extensions/kafka/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/__pycache__/__init__.cpython-312.pyc b/challenge_api/extensions/kafka/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index f03836a..0000000 Binary files a/challenge_api/extensions/kafka/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/__pycache__/config.cpython-310.pyc b/challenge_api/extensions/kafka/__pycache__/config.cpython-310.pyc deleted file mode 100644 index 6962c1f..0000000 Binary files a/challenge_api/extensions/kafka/__pycache__/config.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/__pycache__/config.cpython-312.pyc b/challenge_api/extensions/kafka/__pycache__/config.cpython-312.pyc deleted file mode 100644 index 3d148d5..0000000 Binary files a/challenge_api/extensions/kafka/__pycache__/config.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/__pycache__/consumer.cpython-310.pyc b/challenge_api/extensions/kafka/__pycache__/consumer.cpython-310.pyc deleted file mode 100644 index 9e9c201..0000000 Binary files a/challenge_api/extensions/kafka/__pycache__/consumer.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/__pycache__/consumer.cpython-312.pyc b/challenge_api/extensions/kafka/__pycache__/consumer.cpython-312.pyc deleted file mode 100644 index 16deb19..0000000 Binary files a/challenge_api/extensions/kafka/__pycache__/consumer.cpython-312.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/__pycache__/handler.cpython-310.pyc b/challenge_api/extensions/kafka/__pycache__/handler.cpython-310.pyc deleted file mode 100644 index dd47b5f..0000000 Binary files a/challenge_api/extensions/kafka/__pycache__/handler.cpython-310.pyc and /dev/null differ diff --git a/challenge_api/extensions/kafka/config.py b/challenge_api/extensions/kafka/config.py deleted file mode 100644 index afe4dc6..0000000 --- a/challenge_api/extensions/kafka/config.py +++ /dev/null @@ -1,33 +0,0 @@ -# Kaf -class KafkaConfig: - """Kafka 설정 클래스""" - def __init__( - self, - bootstrap_servers=None, # Kafka 서버 주소 리스트 - topic=None, # Kafka Topic 이름 - group_id=None, # Kafka Consumer Group ID - **kwargs - ): - """KafkaConfig 생성자""" - self.bootstrap_servers = bootstrap_servers or ['localhost:9093'] - self.topic = topic or 'challenge-status' - self.group_id = group_id or 'challenge-consumer-group' - self.additional_config = kwargs - - @property - def consumer_config(self): - """Kafka Consumer 설정을 반환 - - Returns: - bootstrap_servers (list): Kafka 서버 주소 리스트 - group_id (str): Kafka Consumer Group ID - auto_offset_reset (str): Consumer가 메세지를 읽을 위치 - enable_auto_commit (bool): 자동 커밋 여부 - """ - return { - 'bootstrap_servers': self.bootstrap_servers, # Kafka 서버 주소 리스트 - 'group_id': self.group_id, # Kafka Consumer Group ID - 'auto_offset_reset': 'earliest', # Consumer가 메세지를 읽을 위치, 데이터 손실 방지를 위해 earliest로 설정 - 'enable_auto_commit': True, # 자동 커밋 여부 - **self.additional_config - } \ No newline at end of file diff --git a/challenge_api/extensions/kafka/consumer.py b/challenge_api/extensions/kafka/consumer.py deleted file mode 100644 index 683d2ae..0000000 --- a/challenge_api/extensions/kafka/consumer.py +++ /dev/null @@ -1,106 +0,0 @@ -import datetime -import logging -from typing import Any, Dict -from kafka import KafkaConsumer -import json -# from challenge_api.exceptions.kafka_exceptions import QueueProcessingError - - -class StatusMessage: - """상태 메시지를 표현하는 클래스""" - def __init__(self, userId: str, problemId: str, newStatus: str, timestamp: str, endpoint: str = None): - # 메시지의 기본 속성들을 초기화 - self.userId = userId # 사용자 ID - self.problemId = problemId # 문제 ID - self.newStatus = newStatus # 새로운 상태 - self.timestamp = timestamp # 타임스탬프 - self.endpoint = endpoint # 엔드포인트 - - @classmethod - def from_json(cls, data: Dict[str, Any]) -> 'StatusMessage': - """ - JSON 데이터로부터 StatusMessage 객체를 생성하는 클래스 메서드 - - Args: - data: JSON 형식의 딕셔너리 데이터 - Returns: - StatusMessage 인스턴스 - """ - return cls( - userId=data['userId'], - problemId=data['problemId'], - newStatus=data['newStatus'], - timestamp=data['timestamp'], - endpoint=data.get('endpoint') # endpoint가 없을 수 있으므로 get 사용 - ) - - def __str__(self) -> str: - """객체를 문자열로 표현""" - return f"StatusMessage(userId={self.userId}, problemId={self.problemId}, newStatus={self.newStatus}, timestamp={self.timestamp}, endpoint={self.endpoint})" - -class KafkaEventConsumer: - """Kafka 이벤트 소비자 클래스""" - def __init__(self, config): - """ - Kafka 소비자 초기화 - - Args: - config (KafkaConfig): 설정 객체 - """ - self.config = config - self._consumer = None # 실제 Kafka 소비자 인스턴스 - - @property - def consumer(self): - """ - Kafka 소비자 인스턴스를 생성하고 반환하는 프로퍼티 - 지연 초기화(lazy initialization) 패턴 사용 - """ - if self._consumer is None: - try: - self._consumer = KafkaConsumer( - self.config.topic, - # 바이트 문자열을 JSON으로 자동 변환하는 deserializer 설정 - value_deserializer=lambda x: json.loads(x.decode('utf-8')), - **self.config.consumer_config - ) - except Exception as e: - # logger.error(f"Failed to create consumer: {e}") - raise QueueProcessingError(error_msg=f"Failed to create consumer: {e}") from e - return self._consumer - - def consume_events(self, callback): - """ - 이벤트를 소비하고 콜백 함수로 처리 - - Args: - callback: 각 메시지를 처리할 콜백 함수 - """ - try: - for message in self.consumer: - try: - # 메시지 파싱 - status_msg = StatusMessage.from_json(message.value) - callback(status_msg) - - except json.JSONDecodeError as e: - # JSON 디코딩 오류 처리 - # logger.error(f"Error decoding message: {e}") - continue - except KeyError as e: - # 필수 필드 누락 오류 처리 - # logger.error(f"Missing required field in message: {e}") - continue - except Exception as e: - # 기타 예외 처리 - # logger.error(f"Error processing message: {e}") - continue - - except Exception as e: - raise QueueProcessingError(error_msg=f"Kafka Error: {str(e)}") from e - - def close(self): - """Kafka 소비자 연결 종료""" - if self._consumer: - self._consumer.close() - self._consumer = None \ No newline at end of file diff --git a/challenge_api/extensions/kafka/handler.py b/challenge_api/extensions/kafka/handler.py deleted file mode 100644 index 170fb82..0000000 --- a/challenge_api/extensions/kafka/handler.py +++ /dev/null @@ -1,123 +0,0 @@ -import logging -from typing import Any, Dict -from challenge_api.exceptions.kafka_exceptions import QueueProcessingError -from challenge_api.db.repository import UserChallengesRepository, UserChallengeStatusRepository -from challenge_api.objects.challenge import ChallengeInfo -from sqlalchemy.exc import SQLAlchemyError - -logger = logging.getLogger(__name__) - -class MessageHandler: - VALID_STATUSES = {'Pending', 'Running', 'Deleted', 'Error'} - VALID_STATUS_TRANSITIONS = { - 'Pending': {'Running', 'Error', 'Deleted'}, - 'Running': {'Error', 'Deleted'}, - 'Error': {'Running', 'Deleted'}, - 'Deleted': set() # Deleted is a terminal state - } - - @staticmethod - def validate_message(message: Dict[str, Any]) -> tuple[str, str, str, str, str]: - """ - Kafka 메세지의 필수 필드를 검증하고 반환 - - Args: - message (Dict[str, Any]): Kafka 메세지 - - Returns: - tuple[str, str, str, str, str]: 사용자 이름, 챌린지 ID, 새로운 상태, 타임스탬프, 엔드포인트 - """ - try: - user_id = message.userId - problem_id = message.problemId - new_status = message.newStatus - endpoint = message.endpoint - timestamp = message.timestamp - except AttributeError: - user_id = message['userId'] - problem_id = message['problemId'] - new_status = message['newStatus'] - timestamp = message['timestamp'] - endpoint = message.get('endpoint') # endpoint가 없을 수 있으므로 get 사용 - - if not all([user_id, problem_id, new_status, timestamp]): - raise QueueProcessingError(error_msg=f"Kafka Error : Missing required fields in message: {message}") - - if new_status not in MessageHandler.VALID_STATUSES: - raise QueueProcessingError(error_msg=f"Kafka Error : Invalid status in message: {new_status}") - - return user_id, problem_id, new_status, timestamp, endpoint - - @staticmethod - def validate_status_transition(current_status: str, new_status: str) -> bool: - """ - 상태 전이가 유효한지 검증 - - Args: - current_status (str): 현재 상태 - new_status (str): 새로운 상태 - - Returns: - bool: 유효한 상태 전이인지 여부 - """ - if not current_status: - return True # 첫 상태는 항상 유효 - return new_status in MessageHandler.VALID_STATUS_TRANSITIONS.get(current_status, set()) - - @staticmethod - def handle_message(message: Dict[str, Any]): - """ - Consume한 Kafka message 내용을 DB에 반영 - - Args: - message: Kafka 메세지 - """ - try: - user_id, challenge_id, new_status, _, endpoint = MessageHandler.validate_message(message) - challenge_info = ChallengeInfo(challenge_id=int(challenge_id), user_id=int(user_id)) - challenge_name = challenge_info.name - - # 상태 정보 업데이트 - userchallenge_repo = UserChallengesRepository() - status_repo = UserChallengeStatusRepository() - - if not userchallenge_repo.is_exist(challenge_info): - logger.warning(f"Challenge {challenge_name} does not exist") - return - - userchallenge = userchallenge_repo.get_by_user_challenge_name(challenge_name) - if not userchallenge: - logger.warning(f"Challenge {challenge_name} exists but could not be retrieved") - return - - recent_status = status_repo.get_recent_status(userchallenge.idx) - if not recent_status: - logger.warning(f"No status found for challenge {challenge_name}, creating new status") - recent_status = status_repo.create(userchallenge_idx=userchallenge.idx, port=0) - - # 상태 전이 검증 - if not MessageHandler.validate_status_transition(recent_status.status, new_status): - logger.warning(f"Invalid status transition from {recent_status.status} to {new_status}") - return - - try: - port = int(endpoint) if endpoint else 0 - except (ValueError, TypeError): - logger.warning(f"Invalid endpoint value: {endpoint}, using 0 as default") - port = 0 - - try: - if new_status == 'Running' and endpoint: - # Running 상태이고 endpoint가 있으면 포트 업데이트 - status_repo.update_port(recent_status.idx, port) - - status_repo.update_status(recent_status.idx, new_status) - logger.info(f"Updated status for challenge {challenge_name} to {new_status} with endpoint {endpoint}") - except SQLAlchemyError as e: - logger.error(f"Database error while updating status: {str(e)}") - raise QueueProcessingError(error_msg=f"Database error: {str(e)}") - - except ValueError as e: - raise QueueProcessingError(error_msg=f"Invalid message format {str(e)}") from e - except Exception as e: - raise QueueProcessingError(error_msg=f"Kafka Error: {str(e)}") from e diff --git a/challenge_api/extensions_manager.py b/challenge_api/extensions_manager.py deleted file mode 100644 index 64babdb..0000000 --- a/challenge_api/extensions_manager.py +++ /dev/null @@ -1,87 +0,0 @@ -import logging -import sys -from threading import Lock, Thread, Event -from typing import Optional, Callable -from flask import Flask -from challenge_api.extensions.kafka import KafkaConfig, KafkaEventConsumer - -class FlaskKafkaConsumer: - """Flask 애플리케이션에서 Kafka 메시지 소비를 관리하는 클래스""" - def __init__(self): - self.consumer: Optional[KafkaEventConsumer] = None - self._consumer_thread: Optional[Thread] = None - self._running = Event() - self._lock = Lock() - self.app: Optional[Flask] = None - - def init_app(self, app: Flask) -> None: - """Flask 애플리케이션 초기화""" - with self._lock: - self.app = app - config = KafkaConfig( - bootstrap_servers=[app.config['KAFKA_BOOTSTRAP_SERVERS']], - topic=app.config['KAFKA_TOPIC'], - group_id=app.config['KAFKA_GROUP_ID'] - ) - self.consumer = KafkaEventConsumer(config) - - # teardown_appcontext 핸들러 등록 - app.teardown_appcontext(self.cleanup) - - def cleanup(self, exception=None): - """애플리케이션 컨텍스트 종료 시 정리""" - self.stop_consuming() - - def start_consuming(self, message_handler: Callable) -> None: - """Thread-safe하게 메시지 소비 시작""" - with self._lock: - if self._consumer_thread is not None: - print("Consumer thread already running", file=sys.stderr) - return - - self._running.set() - self._consumer_thread = Thread( - target=self._consume_messages, - args=(message_handler,), - daemon=True - ) - self._consumer_thread.start() - print("Kafka consumer thread started", file=sys.stderr) - - def stop_consuming(self) -> None: - """Thread-safe하게 메시지 소비 중지""" - with self._lock: - if not self._running.is_set(): - print("Consumer not running", file=sys.stderr) - return - - self._running.clear() - if self.consumer: - self.consumer.close() - - if self._consumer_thread: - self._consumer_thread.join(timeout=5.0) - if self._consumer_thread.is_alive(): - print("Consumer thread did not stop gracefully", file=sys.stderr) - self._consumer_thread = None - print("Kafka consumer stopped", file=sys.stderr) - - def _consume_messages(self, message_handler: Callable) -> None: - """Thread-safe한 메시지 소비 루프""" - with self.app.app_context(): - try: - while self._running.is_set(): - try: - self.consumer.consume_events(message_handler) - except Exception as e: - print(f"Error consuming messages: {e}", file=sys.stderr) - if self._running.is_set(): - print("Attempting to reconnect...", file=sys.stderr) - self._running.wait(timeout=5.0) - except Exception as e: - print(f"[ERROR] Fatal error in consumer thread: {e}", file=sys.stderr) - finally: - print("Consumer thread ending", file=sys.stderr) - -# 전역 인스턴스 생성 -kafka_consumer = FlaskKafkaConsumer() \ No newline at end of file diff --git a/challenge_api/factory.py b/challenge_api/factory.py index ebc3cf4..b4806ae 100644 --- a/challenge_api/factory.py +++ b/challenge_api/factory.py @@ -7,35 +7,21 @@ from challenge_api.exceptions.http import BaseHttpException from challenge_api.api.challenge_api import challenge_bp from challenge_api.config import Config -from challenge_api.extensions.kafka.handler import MessageHandler -from challenge_api.extensions_manager import kafka_consumer from challenge_api.db.db_manager import db from challenge_api.container import Container -def start_kafka_consumer(app): - """Start Kafka consumer in a separate thread""" - with app.app_context(): - kafka_consumer.start_consuming(MessageHandler.handle_message) - class FlaskApp: def __init__(self, config_class: Type[Config] = Config): self.app = Flask(__name__) self.app.config.from_object(config_class) - # 초기 설정 - self._init_kafka() self._init_db() self._register_error_handlers() self._setup_blueprints() self._inject() - def _init_kafka(self): - """Extensions 초기화""" - # Kafka 초기화 - kafka_consumer.init_app(self.app) - kafka_consumer.start_consuming(MessageHandler.handle_message) def _init_db(self): # DB 초기화 diff --git a/challenge_api/requirements.txt b/challenge_api/requirements.txt index 6588925..d75bb7d 100644 --- a/challenge_api/requirements.txt +++ b/challenge_api/requirements.txt @@ -4,7 +4,6 @@ SQLAlchemy==2.0.20 pymysql>=1.1.1 # mariadb>=1.0.11 kubernetes==26.1.0 -kafka-python==2.0.2 prometheus-client==0.19.0 python-logging-loki==0.3.1 flask-prometheus-metrics==1.0.0