From 98edd7a0e526a7d333c2deeef5442878f74ed729 Mon Sep 17 00:00:00 2001 From: S0okJu Date: Fri, 17 Jan 2025 20:45:48 +0900 Subject: [PATCH] =?UTF-8?q?[Fix]=20Kafka=20init=20logic=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/extensions_manager.py | 39 +++++++++++++++++++++------------------ app/factory.py | 25 ++----------------------- 2 files changed, 23 insertions(+), 41 deletions(-) diff --git a/app/extensions_manager.py b/app/extensions_manager.py index 594d1a5..b37099d 100644 --- a/app/extensions_manager.py +++ b/app/extensions_manager.py @@ -2,9 +2,8 @@ import sys from threading import Lock, Thread, Event from typing import Optional, Callable -from flask import Flask, current_app +from flask import Flask, from flask_sqlalchemy import SQLAlchemy -from app.extensions.db.config import MariaDBConfig from app.extensions.kafka import KafkaConfig, KafkaEventConsumer class FlaskKafkaConsumer: @@ -15,7 +14,7 @@ def __init__(self): self._running = Event() self._lock = Lock() self.app: Optional[Flask] = None - + def init_app(self, app: Flask) -> None: """Flask 애플리케이션 초기화""" with self._lock: @@ -27,13 +26,20 @@ def init_app(self, app: Flask) -> None: ) self.consumer = KafkaEventConsumer(config) + # teardown_appcontext 핸들러 등록 + app.teardown_appcontext(self.cleanup) + + def cleanup(self, exception=None): + """애플리케이션 컨텍스트 종료 시 정리""" + self.stop_consuming() + def start_consuming(self, message_handler: Callable) -> None: """Thread-safe하게 메시지 소비 시작""" with self._lock: if self._consumer_thread is not None: - # logger.warning("Consumer thread already running") + print("Consumer thread already running", file=sys.stderr) return - + self._running.set() self._consumer_thread = Thread( target=self._consume_messages, @@ -41,27 +47,26 @@ def start_consuming(self, message_handler: Callable) -> None: daemon=True ) self._consumer_thread.start() - # logger.info("Kafka consumer thread started") - + print("Kafka consumer thread started", file=sys.stderr) + def stop_consuming(self) -> None: """Thread-safe하게 메시지 소비 중지""" with self._lock: if not self._running.is_set(): - # logger.warning("Consumer not running") + print("Consumer not running", file=sys.stderr) return - + self._running.clear() if self.consumer: self.consumer.close() - + if self._consumer_thread: self._consumer_thread.join(timeout=5.0) if self._consumer_thread.is_alive(): print("Consumer thread did not stop gracefully", file=sys.stderr) - # logger.warning("Consumer thread did not stop gracefully") self._consumer_thread = None - # logger.info("Kafka consumer stopped") - + print("Kafka consumer stopped", file=sys.stderr) + def _consume_messages(self, message_handler: Callable) -> None: """Thread-safe한 메시지 소비 루프""" with self.app.app_context(): @@ -70,17 +75,15 @@ def _consume_messages(self, message_handler: Callable) -> None: try: self.consumer.consume_events(message_handler) except Exception as e: - # logger.error(f"Error consuming messages: {e}") + print(f"Error consuming messages: {e}", file=sys.stderr) if self._running.is_set(): - # logger.info("Attempting to reconnect...") + print("Attempting to reconnect...", file=sys.stderr) self._running.wait(timeout=5.0) except Exception as e: print(f"[ERROR] Fatal error in consumer thread: {e}", file=sys.stderr) - # logger.error(f"Fatal error in consumer thread: {e}") finally: print("Consumer thread ending", file=sys.stderr) - # logger.info("Consumer thread ending") - + # 전역 인스턴스 생성 db = SQLAlchemy() kafka_consumer = FlaskKafkaConsumer() \ No newline at end of file diff --git a/app/factory.py b/app/factory.py index 85ce24d..fa76e6e 100644 --- a/app/factory.py +++ b/app/factory.py @@ -28,19 +28,12 @@ def __init__(self, config_class: Type[Config] = Config): self._setup_middleware() self._register_error_handlers() self._setup_blueprints() - self._setup_kafka() - - # def _setup_logger(self) -> logging.Logger: - # """Loki 로거 설정""" - # # 기본 로거 생성 - # loki_logger = FlaskLokiLogger(app_name=self.app.name, loki_url=self.app.config['LOKI_URL']) - - # return loki_logger.logger - + def _init_extensions(self): """Extensions 초기화""" # Kafka 초기화 kafka_consumer.init_app(self.app) + kafka_consumer.start_consuming(MessageHandler.handle_message) # DB 초기화 db.init_app(self.app) @@ -77,20 +70,6 @@ def handle_challenge_error(error): def _setup_blueprints(self): """Blueprint 등록""" self.app.register_blueprint(challenge_bp, url_prefix='/v1/user-challenges') - - def _setup_kafka(self): - """Kafka 컨슈머 설정""" - consumer_thread = threading.Thread( - target=start_kafka_consumer, - args=(self.app,), - daemon=True - ) - consumer_thread.start() - self.app.consumer_thread = consumer_thread - - @self.app.teardown_appcontext - def cleanup(exception=None): - kafka_consumer.stop_consuming() def _get_request_context(self) -> Dict[str, Any]: """현재 요청의 컨텍스트 정보 수집"""