From 92a30a532b4a757a58b46cdad77584bf25924ae3 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Wed, 27 Apr 2022 17:29:50 +0800 Subject: [PATCH 1/6] method pool task retryable --- .../handler/saas_method_pool_handler.py | 8 +++- conf/config.ini.example | 4 ++ conf/config.ini.test | 4 ++ core/tasks.py | 46 ++++++++++++++++--- 4 files changed, 54 insertions(+), 8 deletions(-) diff --git a/apiserver/report/handler/saas_method_pool_handler.py b/apiserver/report/handler/saas_method_pool_handler.py index 727ad8c5b..ce1c5e6b1 100644 --- a/apiserver/report/handler/saas_method_pool_handler.py +++ b/apiserver/report/handler/saas_method_pool_handler.py @@ -234,12 +234,16 @@ def save_method_call(self, pool_sign: str, @staticmethod def send_to_engine(method_pool_id, update_record=False, model=None): + """ + @TODO: use method pool sign for task + """ try: if model is None: logger.info( f'[+] send method_pool [{method_pool_id}] to engine for {"update" if update_record else "new record"}') - search_vul_from_method_pool.delay(method_pool_id) - search_sink_from_method_pool.delay(method_pool_id) + retryable = settings.config.getboolean('task', 'retryable', fallback=False) + search_vul_from_method_pool.delay(method_pool_id, retryable=retryable) + search_sink_from_method_pool.delay(method_pool_id, retryable=retryable) else: logger.info( f'[+] send method_pool [{method_pool_id}] to engine for {model if model else ""}' diff --git a/conf/config.ini.example b/conf/config.ini.example index 94d5cab58..1cd2054f7 100644 --- a/conf/config.ini.example +++ b/conf/config.ini.example @@ -33,6 +33,10 @@ access_key_secret = ZoEOSi7KfayQ7JalvJVHa37fdZ4XFY [sca] base_url = http://52.80.75.225:8000 +[task] +retryable = False +max_retries = 3 + [security] csrf_trust_origins = localhost,.huoxian.cn,.secnium.xyz secret_key = vbjlvbxfvazjfprywuxgyclmvhtmselddsefxxlcixovmqfpgy diff --git a/conf/config.ini.test b/conf/config.ini.test index a0e52084e..c044835fc 100644 --- a/conf/config.ini.test +++ b/conf/config.ini.test @@ -38,6 +38,10 @@ secret_key = vbjlvbxfvazjfprywuxgyclmvhtmselddsefxxlcixovmqfpgy [sca] base_url = http://52.80.75.225:8000 +[task] +retryable = False +max_retries = 3 + [other] domain = http://localhost.domain/ demo_session_cookie_domain = .huoxian.cn diff --git a/core/tasks.py b/core/tasks.py index 062973485..01fbd177b 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -51,6 +51,13 @@ } +RETRY_INTERVALS = [10, 30, 90] + + +class RetryableException(Exception): + pass + + def queryset_to_iterator(queryset): """ 将queryset转换为迭代器,解决使用queryset遍历数据导致的一次性加载至内存带来的内存激增问题 @@ -185,14 +192,22 @@ def search_and_save_sink(engine, method_pool_model, strategy): method_pool_model.sinks.add(strategy.get('strategy')) -@shared_task(queue='dongtai-method-pool-scan') -def search_vul_from_method_pool(method_pool_id): +@shared_task(bind=True, queue='dongtai-method-pool-scan', + max_retries=settings.config.getint('task', 'max_retries', fallback=3)) +def search_vul_from_method_pool(self, method_pool_id, retryable=False): logger.info(f'漏洞检测开始,方法池 {method_pool_id}') try: method_pool_model = MethodPool.objects.filter(id=method_pool_id).first() if method_pool_model is None: - logger.warn(f'漏洞检测终止,方法池 {method_pool_id} 不存在') + if retryable: + if self.request.retries < self.max_retries: + tries = self.request.retries + 1 + raise RetryableException(f'漏洞检测方法池 {method_pool_id} 不存在,重试第 {tries} 次') + else: + logger.error(f'漏洞检测超过最大重试次数 {self.max_retries},方法池 {method_pool_id} 不存在') + else: + logger.warning(f'漏洞检测终止,方法池 {method_pool_id} 不存在') return check_response_header(method_pool_model) check_response_content(method_pool_model) @@ -206,6 +221,12 @@ def search_vul_from_method_pool(method_pool_id): if strategy.get('value') in engine.method_pool_signatures: search_and_save_vul(engine, method_pool_model, method_pool, strategy) logger.info(f'漏洞检测成功') + except RetryableException as e: + if self.request.retries < self.max_retries: + delay = 5 + pow(3, self.request.retries) * 10 + self.retry(exc=e, countdown=delay) + else: + logger.error(f'漏洞检测超过最大重试次数,错误原因:{e}') except Exception as e: logger.error(f'漏洞检测出错,方法池 {method_pool_id}. 错误原因:{e}') @@ -258,18 +279,28 @@ def search_vul_from_strategy(strategy_id): logger.error(f'漏洞检测出错,错误原因:{e}') -@shared_task(queue='dongtai-search-scan') -def search_sink_from_method_pool(method_pool_id): +@shared_task(bind=True, queue='dongtai-search-scan', + max_retries=settings.config.getint('task', 'max_retries', fallback=3)) +def search_sink_from_method_pool(self, method_pool_id, retryable=False): """ 根据方法池ID搜索方法池中是否匹配到策略库中的sink方法 + :param self: celery task :param method_pool_id: 方法池ID + :param retryable: 可重试 :return: None """ logger.info(f'sink规则扫描开始,方法池ID[{method_pool_id}]') try: method_pool_model = MethodPool.objects.filter(id=method_pool_id).first() if method_pool_model is None: - logger.warn(f'sink规则扫描终止,方法池 [{method_pool_id}] 不存在') + if retryable: + if self.request.retries < self.max_retries: + tries = self.request.retries + 1 + raise RetryableException(f'sink规则扫描方法池 {method_pool_id} 不存在,重试第 {tries} 次') + else: + logger.error(f'sink规则扫描超过最大重试次数 {self.max_retries},方法池 {method_pool_id} 不存在') + else: + logger.warn(f'sink规则扫描终止,方法池 [{method_pool_id}] 不存在') return strategies = load_sink_strategy(method_pool_model.agent.user, method_pool_model.agent.language) engine = VulEngine() @@ -277,6 +308,9 @@ def search_sink_from_method_pool(method_pool_id): for strategy in strategies: search_and_save_sink(engine, method_pool_model, strategy) logger.info(f'sink规则扫描完成') + except RetryableException as e: + delay = 5 + pow(3, self.request.retries) * 10 + self.retry(exc=e, countdown=delay) except Exception as e: logger.error(f'sink规则扫描出错,错误原因:{e}') From 1aaef12c875db0baf81f60fe62b7198988c85410 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Wed, 27 Apr 2022 19:02:22 +0800 Subject: [PATCH 2/6] use pool_sign and agent_id for vul and sink task --- .../handler/saas_method_pool_handler.py | 21 ++++++-------- core/tasks.py | 29 ++++++++++--------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/apiserver/report/handler/saas_method_pool_handler.py b/apiserver/report/handler/saas_method_pool_handler.py index ce1c5e6b1..fc3036999 100644 --- a/apiserver/report/handler/saas_method_pool_handler.py +++ b/apiserver/report/handler/saas_method_pool_handler.py @@ -142,15 +142,15 @@ def save(self): current_version_agents = self.get_project_agents(self.agent) with transaction.atomic(): try: + # @TODO: send to log service update_record, method_pool = self.save_method_call( pool_sign, current_version_agents) - self.send_to_engine(method_pool_id=method_pool.id, + self.send_to_engine(self.agent_id, method_pool_sign=pool_sign, update_record=update_record) except Exception as e: logger.info(e, exc_info=True) - def save_method_call(self, pool_sign: str, current_version_agents) -> Tuple[bool, MethodPool]: """ @@ -233,25 +233,22 @@ def save_method_call(self, pool_sign: str, return update_record, method_pool @staticmethod - def send_to_engine(method_pool_id, update_record=False, model=None): - """ - @TODO: use method pool sign for task - """ + def send_to_engine(agent_id, method_pool_id="", method_pool_sign="", update_record=False, model=None): + retryable = settings.config.getboolean('task', 'retryable', fallback=False) try: if model is None: logger.info( - f'[+] send method_pool [{method_pool_id}] to engine for {"update" if update_record else "new record"}') - retryable = settings.config.getboolean('task', 'retryable', fallback=False) - search_vul_from_method_pool.delay(method_pool_id, retryable=retryable) - search_sink_from_method_pool.delay(method_pool_id, retryable=retryable) + f'[+] send method_pool [{method_pool_sign}] to engine for {"update" if update_record else "new record"}') + search_vul_from_method_pool.delay(method_pool_sign, agent_id, retryable=retryable) + search_sink_from_method_pool.delay(method_pool_sign, agent_id, retryable=retryable) else: logger.info( - f'[+] send method_pool [{method_pool_id}] to engine for {model if model else ""}' + f'[+] send method_pool [{method_pool_sign}] to engine for {model if model else ""}' ) search_vul_from_replay_method_pool.delay(method_pool_id) #requests.get(url=settings.REPLAY_ENGINE_URL.format(id=method_pool_id)) except Exception as e: - logger.info(f'[-] Failure: send method_pool [{method_pool_id}], Error: {e}') + logger.info(f'[-] Failure: send method_pool [{method_pool_id}{method_pool_sign}], Error: {e}') def calc_hash(self): sign_raw = '-'.join( diff --git a/core/tasks.py b/core/tasks.py index 01fbd177b..947259dc3 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -194,20 +194,20 @@ def search_and_save_sink(engine, method_pool_model, strategy): @shared_task(bind=True, queue='dongtai-method-pool-scan', max_retries=settings.config.getint('task', 'max_retries', fallback=3)) -def search_vul_from_method_pool(self, method_pool_id, retryable=False): +def search_vul_from_method_pool(self, method_pool_sign, agent_id, retryable=False): - logger.info(f'漏洞检测开始,方法池 {method_pool_id}') + logger.info(f'漏洞检测开始,方法池 {method_pool_sign}') try: - method_pool_model = MethodPool.objects.filter(id=method_pool_id).first() + method_pool_model = MethodPool.objects.filter(pool_sign=method_pool_sign, agent_id=agent_id).first() if method_pool_model is None: if retryable: if self.request.retries < self.max_retries: tries = self.request.retries + 1 - raise RetryableException(f'漏洞检测方法池 {method_pool_id} 不存在,重试第 {tries} 次') + raise RetryableException(f'漏洞检测方法池 {method_pool_sign} 不存在,重试第 {tries} 次') else: - logger.error(f'漏洞检测超过最大重试次数 {self.max_retries},方法池 {method_pool_id} 不存在') + logger.error(f'漏洞检测超过最大重试次数 {self.max_retries},方法池 {method_pool_sign} 不存在') else: - logger.warning(f'漏洞检测终止,方法池 {method_pool_id} 不存在') + logger.warning(f'漏洞检测终止,方法池 {method_pool_sign} 不存在') return check_response_header(method_pool_model) check_response_content(method_pool_model) @@ -228,7 +228,7 @@ def search_vul_from_method_pool(self, method_pool_id, retryable=False): else: logger.error(f'漏洞检测超过最大重试次数,错误原因:{e}') except Exception as e: - logger.error(f'漏洞检测出错,方法池 {method_pool_id}. 错误原因:{e}') + logger.error(f'漏洞检测出错,方法池 {method_pool_sign}. 错误原因:{e}') @shared_task(queue='dongtai-replay-vul-scan') @@ -281,26 +281,27 @@ def search_vul_from_strategy(strategy_id): @shared_task(bind=True, queue='dongtai-search-scan', max_retries=settings.config.getint('task', 'max_retries', fallback=3)) -def search_sink_from_method_pool(self, method_pool_id, retryable=False): +def search_sink_from_method_pool(self, method_pool_sign, agent_id, retryable=False): """ 根据方法池ID搜索方法池中是否匹配到策略库中的sink方法 :param self: celery task - :param method_pool_id: 方法池ID + :param method_pool_sign: 方法池 sign + :param agent_id: Agent ID :param retryable: 可重试 :return: None """ - logger.info(f'sink规则扫描开始,方法池ID[{method_pool_id}]') + logger.info(f'sink规则扫描开始,方法池[{method_pool_sign}]') try: - method_pool_model = MethodPool.objects.filter(id=method_pool_id).first() + method_pool_model = MethodPool.objects.filter(pool_sign=method_pool_sign, agent_id=agent_id).first() if method_pool_model is None: if retryable: if self.request.retries < self.max_retries: tries = self.request.retries + 1 - raise RetryableException(f'sink规则扫描方法池 {method_pool_id} 不存在,重试第 {tries} 次') + raise RetryableException(f'sink规则扫描方法池 {method_pool_sign} 不存在,重试第 {tries} 次') else: - logger.error(f'sink规则扫描超过最大重试次数 {self.max_retries},方法池 {method_pool_id} 不存在') + logger.error(f'sink规则扫描超过最大重试次数 {self.max_retries},方法池 {method_pool_sign} 不存在') else: - logger.warn(f'sink规则扫描终止,方法池 [{method_pool_id}] 不存在') + logger.warn(f'sink规则扫描终止,方法池 [{method_pool_sign}] 不存在') return strategies = load_sink_strategy(method_pool_model.agent.user, method_pool_model.agent.language) engine = VulEngine() From d46ec9441f054dbf5546a733f09fae224dd5c2ce Mon Sep 17 00:00:00 2001 From: lostsnow Date: Fri, 29 Apr 2022 16:37:21 +0800 Subject: [PATCH 3/6] send method pool to log service --- .../handler/saas_method_pool_handler.py | 87 +++++++++++++++---- apiserver/report/log_service.py | 47 ++++++++++ apiserver/report/report_handler_factory.py | 16 ++++ conf/config.ini.example | 8 +- conf/config.ini.test | 8 +- 5 files changed, 149 insertions(+), 17 deletions(-) create mode 100644 apiserver/report/log_service.py diff --git a/apiserver/report/handler/saas_method_pool_handler.py b/apiserver/report/handler/saas_method_pool_handler.py index fc3036999..f4c551433 100644 --- a/apiserver/report/handler/saas_method_pool_handler.py +++ b/apiserver/report/handler/saas_method_pool_handler.py @@ -7,7 +7,9 @@ import json import logging import random +import socket import time +import uuid from hashlib import sha256,sha1 import requests @@ -35,6 +37,18 @@ @ReportHandler.register(const.REPORT_VULN_SAAS_POOL) class SaasMethodPoolHandler(IReportHandler): + def __init__(self): + super(SaasMethodPoolHandler, self).__init__() + self.async_send = settings.config.getboolean('task', 'async_send', fallback=False) + self.async_send_delay = settings.config.getint('task', 'async_send_delay', fallback=2) + self.retryable = settings.config.getboolean('task', 'retryable', fallback=False) + + if ReportHandler.log_service_disabled or ReportHandler.log_service is None: + logger.error('log service disabled or failed to connect, disable async send method pool') + self.async_send = False + else: + self.log_service = ReportHandler.log_service + @staticmethod def parse_headers(headers_raw): headers = dict() @@ -138,18 +152,55 @@ def save(self): self.send_to_engine(method_pool_id=method_pool_id, model='replay') else: - pool_sign = self.calc_hash() - current_version_agents = self.get_project_agents(self.agent) - with transaction.atomic(): + pool_sign = uuid.uuid4().hex + if self.async_send: try: - # @TODO: send to log service - update_record, method_pool = self.save_method_call( - pool_sign, current_version_agents) - self.send_to_engine(self.agent_id, method_pool_sign=pool_sign, - update_record=update_record) + method_pool = self.to_json(pool_sign) + ok = self.log_service.send(method_pool) + if ok: + self.send_to_engine(self.agent_id, method_pool_sign=pool_sign) except Exception as e: - logger.info(e, exc_info=True) + logger.error(e, exc_info=True) + else: + current_version_agents = self.get_project_agents(self.agent) + with transaction.atomic(): + try: + update_record, method_pool = self.save_method_call( + pool_sign, current_version_agents) + self.send_to_engine(self.agent_id, method_pool_sign=pool_sign, + update_record=update_record) + except Exception as e: + logger.error(e, exc_info=True) + def to_json(self, pool_sign: str): + timestamp = int(time.time()) + pool = { + 'agent_id': self.agent_id, + 'url': self.http_url, + 'uri': self.http_uri, + 'http_method': self.http_method, + 'http_scheme': self.http_scheme, + 'http_protocol': self.http_protocol, + 'req_header': self.http_req_header, + 'req_params': self.http_query_string, + 'req_data': self.http_req_data, + 'req_header_for_search': utils.build_request_header(req_method=self.http_method, + raw_req_header=self.http_req_header, + uri=self.http_uri, + query_params=self.http_query_string, + http_protocol=self.http_protocol), + 'res_header': utils.base64_decode(self.http_res_header), + 'res_body': decode_content(get_res_body(self.http_res_body, self.version), + get_content_encoding(self.http_res_header), self.version), + 'context_path': self.context_path, + 'method_pool': json.dumps(self.method_pool), + 'pool_sign': pool_sign, + 'clent_ip': self.client_ip, + 'create_time': timestamp, + 'update_time': timestamp, + 'uri_sha1': self.sha1(self.http_uri), + } + return json.dumps(pool) def save_method_call(self, pool_sign: str, current_version_agents) -> Tuple[bool, MethodPool]: @@ -232,15 +283,21 @@ def save_method_call(self, pool_sign: str, ) return update_record, method_pool - @staticmethod - def send_to_engine(agent_id, method_pool_id="", method_pool_sign="", update_record=False, model=None): - retryable = settings.config.getboolean('task', 'retryable', fallback=False) + def send_to_engine(self, agent_id, method_pool_id="", method_pool_sign="", update_record=False, model=None): try: if model is None: logger.info( f'[+] send method_pool [{method_pool_sign}] to engine for {"update" if update_record else "new record"}') - search_vul_from_method_pool.delay(method_pool_sign, agent_id, retryable=retryable) - search_sink_from_method_pool.delay(method_pool_sign, agent_id, retryable=retryable) + delay = 0 + if self.async_send: + delay = self.async_send_delay + kwargs = { + 'method_pool_sign': method_pool_sign, + 'agent_id': agent_id, + 'retryable': self.retryable, + } + search_vul_from_method_pool.apply_async(kwargs=kwargs, countdown=delay) + search_sink_from_method_pool.apply_async(kwargs=kwargs, countdown=delay) else: logger.info( f'[+] send method_pool [{method_pool_sign}] to engine for {model if model else ""}' @@ -248,7 +305,7 @@ def send_to_engine(agent_id, method_pool_id="", method_pool_sign="", update_reco search_vul_from_replay_method_pool.delay(method_pool_id) #requests.get(url=settings.REPLAY_ENGINE_URL.format(id=method_pool_id)) except Exception as e: - logger.info(f'[-] Failure: send method_pool [{method_pool_id}{method_pool_sign}], Error: {e}') + logger.error(f'[-] Failure: send method_pool [{method_pool_id}{method_pool_sign}], Error: {e}') def calc_hash(self): sign_raw = '-'.join( diff --git a/apiserver/report/log_service.py b/apiserver/report/log_service.py new file mode 100644 index 000000000..771d65978 --- /dev/null +++ b/apiserver/report/log_service.py @@ -0,0 +1,47 @@ +import logging +import socket + +logger = logging.getLogger('dongtai.openapi') + + +class LogService: + def __init__(self, host, port): + super(LogService, self).__init__() + self.host = host + self.port = port + self.socket = None + + def create_socket(self): + if self.socket: + return + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + try: + sock.connect((self.host, self.port)) + sock.setblocking(False) + self.socket = sock + return True + except OSError: + logger.error(f'failed to connect log service {self.host}:{self.port}') + self.socket = None + sock.close() + return False + + def __del__(self): + if self.socket: + self.socket.close() + self.socket = None + + def send(self, message): + try: + if not self.socket: + self.create_socket() + if self.socket: + self.socket.sendall(bytes(message + "\n", encoding='utf-8'), socket.MSG_DONTWAIT) + return True + except Exception as e: + logger.error('failed to send message to log service', exc_info=e) + self.socket.close() + self.socket = None + return False diff --git a/apiserver/report/report_handler_factory.py b/apiserver/report/report_handler_factory.py index fb65c4db5..d6ea61d08 100644 --- a/apiserver/report/report_handler_factory.py +++ b/apiserver/report/report_handler_factory.py @@ -7,6 +7,7 @@ import logging, requests, json, time from django.utils.translation import gettext_lazy as _ from AgentServer import settings +from apiserver.report.log_service import LogService from dongtai.models.agent import IastAgent logger = logging.getLogger('dongtai.openapi') @@ -14,6 +15,8 @@ class ReportHandler: HANDLERS = {} + log_service = None + log_service_disabled = False # 注册handler到当前命名空间,后续进行异步处理数据 @staticmethod @@ -60,6 +63,19 @@ def handler(reports, user): @classmethod def register(cls, handler_name): def wrapper(handler): + async_send = settings.config.getboolean('task', 'async_send', fallback=False) + if not async_send: + cls.log_service_disabled = True + if cls.log_service is None and not cls.log_service_disabled: + host = settings.config.get('log_service', 'host') + port = settings.config.getint('log_service', 'port') + if not host or not port: + logger.error('log service must config host and post') + cls.log_service_disabled = True + srv = LogService(host, port) + if srv.create_socket(): + cls.log_service = srv + logger.info( _('Registration report type {} handler {}').format(handler_name, handler.__name__)) if handler_name not in cls.HANDLERS: diff --git a/conf/config.ini.example b/conf/config.ini.example index 1cd2054f7..860fa425c 100644 --- a/conf/config.ini.example +++ b/conf/config.ini.example @@ -34,8 +34,14 @@ access_key_secret = ZoEOSi7KfayQ7JalvJVHa37fdZ4XFY base_url = http://52.80.75.225:8000 [task] -retryable = False +retryable = false max_retries = 3 +async_send = false +async_send_delay = 2 + +[log_service] +host = localhost +port = 8082 [security] csrf_trust_origins = localhost,.huoxian.cn,.secnium.xyz diff --git a/conf/config.ini.test b/conf/config.ini.test index c044835fc..5da6da0ab 100644 --- a/conf/config.ini.test +++ b/conf/config.ini.test @@ -39,8 +39,14 @@ secret_key = vbjlvbxfvazjfprywuxgyclmvhtmselddsefxxlcixovmqfpgy base_url = http://52.80.75.225:8000 [task] -retryable = False +retryable = false max_retries = 3 +async_send = false +async_send_delay = 2 + +[log_service] +host = localhost +port = 8082 [other] domain = http://localhost.domain/ From 42afce9d4df953ce6f498c5c8d21b2e04314eb6b Mon Sep 17 00:00:00 2001 From: lostsnow Date: Sat, 30 Apr 2022 17:15:35 +0800 Subject: [PATCH 4/6] fixes method pool replay log --- apiserver/report/handler/saas_method_pool_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apiserver/report/handler/saas_method_pool_handler.py b/apiserver/report/handler/saas_method_pool_handler.py index f4c551433..d6a2f9f51 100644 --- a/apiserver/report/handler/saas_method_pool_handler.py +++ b/apiserver/report/handler/saas_method_pool_handler.py @@ -300,7 +300,7 @@ def send_to_engine(self, agent_id, method_pool_id="", method_pool_sign="", updat search_sink_from_method_pool.apply_async(kwargs=kwargs, countdown=delay) else: logger.info( - f'[+] send method_pool [{method_pool_sign}] to engine for {model if model else ""}' + f'[+] send method_pool [{method_pool_id}] to engine for {model if model else ""}' ) search_vul_from_replay_method_pool.delay(method_pool_id) #requests.get(url=settings.REPLAY_ENGINE_URL.format(id=method_pool_id)) From 95e6651ea99be3ec259f47a6b3c101c210c92ce6 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Sun, 8 May 2022 15:01:46 +0800 Subject: [PATCH 5/6] minor setting changes --- .../handler/saas_method_pool_handler.py | 2 +- webapi/settings.py | 21 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/apiserver/report/handler/saas_method_pool_handler.py b/apiserver/report/handler/saas_method_pool_handler.py index d6a2f9f51..8678e9afb 100644 --- a/apiserver/report/handler/saas_method_pool_handler.py +++ b/apiserver/report/handler/saas_method_pool_handler.py @@ -43,7 +43,7 @@ def __init__(self): self.async_send_delay = settings.config.getint('task', 'async_send_delay', fallback=2) self.retryable = settings.config.getboolean('task', 'retryable', fallback=False) - if ReportHandler.log_service_disabled or ReportHandler.log_service is None: + if self.async_send and (ReportHandler.log_service_disabled or ReportHandler.log_service is None): logger.error('log service disabled or failed to connect, disable async send method pool') self.async_send = False else: diff --git a/webapi/settings.py b/webapi/settings.py index 2e47b345e..939f754d1 100644 --- a/webapi/settings.py +++ b/webapi/settings.py @@ -210,6 +210,7 @@ def safe_execute(default, exception, function, *args): DATABASES = { 'default': { + 'CONN_MAX_AGE': 900, 'ENGINE': 'django.db.backends.mysql', 'USER': config.get("mysql", 'user'), 'NAME': config.get("mysql", 'name'), @@ -309,37 +310,37 @@ def safe_execute(default, exception, function, *args): 'loggers': { 'django.db.backends': { 'handlers': ['console'], - 'level': 'DEBUG', + 'level': 'ERROR', }, 'dongtai-webapi': { 'handlers': ['console', 'dongtai-webapi'], 'propagate': True, - 'level': 'INFO', + 'level': 'ERROR', }, 'dongtai.openapi': { 'handlers': ['console', 'dongtai.openapi'], 'propagate': True, - 'level': 'INFO', + 'level': 'ERROR', }, 'dongtai-core': { 'handlers': ['console', 'dongtai-webapi'], 'propagate': True, - 'level': 'INFO', + 'level': 'ERROR', }, 'django': { 'handlers': ['console', 'dongtai-webapi'], 'propagate': True, - 'level': 'INFO', + 'level': 'ERROR', }, 'dongtai-engine': { 'handlers': ['console', 'dongtai-webapi'], 'propagate': True, - 'level': 'INFO', + 'level': 'ERROR', }, 'celery.apps.worker': { 'handlers': ['console', 'celery.apps.worker'], 'propagate': True, - 'level': 'INFO', + 'level': 'ERROR', }, } } @@ -453,9 +454,11 @@ def safe_execute(default, exception, function, *args): CELERY_WORKER_LOG_FORMAT = '%(message)s' CELERY_TASK_EAGER_PROPAGATES = True CELERY_WORKER_REDIRECT_STDOUTS = True -CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO" +CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "ERROR" # CELERY_WORKER_HIJACK_ROOT_LOGGER = True -CELERY_WORKER_MAX_TASKS_PER_CHILD = 40 +CELERY_WORKER_MAX_TASKS_PER_CHILD = 5000 +# CELERYD_CONCURRENCY = 8 +CELERY_IGNORE_RESULT = True CELERY_TASK_SOFT_TIME_LIMIT = 3600 DJANGO_CELERY_BEAT_TZ_AWARE = False From 2a360f64e14e2104946b8794606a178d783906d3 Mon Sep 17 00:00:00 2001 From: lostsnow Date: Sun, 8 May 2022 16:35:58 +0800 Subject: [PATCH 6/6] fixes log service close --- apiserver/report/log_service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apiserver/report/log_service.py b/apiserver/report/log_service.py index 771d65978..e0d52709f 100644 --- a/apiserver/report/log_service.py +++ b/apiserver/report/log_service.py @@ -42,6 +42,7 @@ def send(self, message): return True except Exception as e: logger.error('failed to send message to log service', exc_info=e) - self.socket.close() + if self.socket: + self.socket.close() self.socket = None return False