From 872741006c7d37a1c7dcfc3ffc14a846b55c840e Mon Sep 17 00:00:00 2001 From: Alexander Kozlovsky Date: Tue, 20 Dec 2022 22:23:32 +0100 Subject: [PATCH] Add ProcessLocker --- src/run_tribler.py | 2 + .../components/reporter/exception_handler.py | 4 + src/tribler/core/components/session.py | 2 +- src/tribler/core/exceptions.py | 3 - .../core/sentry_reporter/sentry_reporter.py | 5 + src/tribler/core/start_core.py | 25 +- src/tribler/core/utilities/process_locker.py | 362 ++++++++++++++++++ .../utilities/tests/test_process_locker.py | 298 ++++++++++++++ src/tribler/gui/core_manager.py | 1 - src/tribler/gui/error_handler.py | 5 +- src/tribler/gui/single_application.py | 4 +- src/tribler/gui/start_gui.py | 32 +- src/tribler/gui/tests/test_gui.py | 2 +- src/tribler/gui/tribler_app.py | 4 +- src/tribler/gui/tribler_window.py | 2 + 15 files changed, 724 insertions(+), 27 deletions(-) create mode 100644 src/tribler/core/utilities/process_locker.py create mode 100644 src/tribler/core/utilities/tests/test_process_locker.py diff --git a/src/run_tribler.py b/src/run_tribler.py index e72a31f64b2..bfad138a5d9 100644 --- a/src/run_tribler.py +++ b/src/run_tribler.py @@ -83,6 +83,8 @@ def init_boot_logger(): logger.info(f'Root state dir: {root_state_dir}') api_port = os.environ.get('CORE_API_PORT') + api_port = int(api_port) if api_port else None + api_key = os.environ.get('CORE_API_KEY') # Check whether we need to start the core or the user interface diff --git a/src/tribler/core/components/reporter/exception_handler.py b/src/tribler/core/components/reporter/exception_handler.py index d5c45db38c4..690e61e7e5c 100644 --- a/src/tribler/core/components/reporter/exception_handler.py +++ b/src/tribler/core/components/reporter/exception_handler.py @@ -10,6 +10,7 @@ from tribler.core.components.component import ComponentStartupException from tribler.core.components.reporter.reported_error import ReportedError from tribler.core.sentry_reporter.sentry_reporter import SentryReporter +from 
tribler.core.utilities import process_locker # There are some errors that we are ignoring. IGNORED_ERRORS_BY_CODE = { @@ -111,6 +112,7 @@ def unhandled_error_observer(self, _, context): should_stop=should_stop ) self.logger.error(f"Unhandled exception occurred! {reported_error}\n{reported_error.long_text}") + process_locker.set_error(exc=exception) if self.report_callback: self.logger.error('Call report callback') @@ -123,6 +125,8 @@ def unhandled_error_observer(self, _, context): self.unreported_error = reported_error except Exception as ex: + process_locker.set_error(exc=ex) + self.sentry_reporter.capture_exception(ex) self.logger.exception(f'Error occurred during the error handling: {ex}') raise ex diff --git a/src/tribler/core/components/session.py b/src/tribler/core/components/session.py index 1b98c7e5d37..9e8621676b5 100644 --- a/src/tribler/core/components/session.py +++ b/src/tribler/core/components/session.py @@ -27,7 +27,7 @@ class Session: def __init__(self, config: TriblerConfig = None, components: List[Component] = (), shutdown_event: Event = None, notifier: Notifier = None, failfast: bool = True): # deepcode ignore unguarded~next~call: not necessary to catch StopIteration on infinite iterator - self.exit_code = 0 + self.exit_code = None self.failfast = failfast self.logger = logging.getLogger(self.__class__.__name__) self.config: TriblerConfig = config or TriblerConfig() diff --git a/src/tribler/core/exceptions.py b/src/tribler/core/exceptions.py index f6ae113d158..d4dd470df79 100644 --- a/src/tribler/core/exceptions.py +++ b/src/tribler/core/exceptions.py @@ -8,9 +8,6 @@ class TriblerException(Exception): """Super class for all Tribler-specific Exceptions the Tribler Core throws.""" - def __str__(self): - return str(self.__class__) + ': ' + Exception.__str__(self) - class OperationNotPossibleAtRuntimeException(TriblerException): """The requested operation is not possible after the Session or Download has been started.""" diff --git 
a/src/tribler/core/sentry_reporter/sentry_reporter.py b/src/tribler/core/sentry_reporter/sentry_reporter.py index e94724b8de9..afcb3f0d38b 100644 --- a/src/tribler/core/sentry_reporter/sentry_reporter.py +++ b/src/tribler/core/sentry_reporter/sentry_reporter.py @@ -21,6 +21,7 @@ parse_last_core_output, parse_os_environ, parse_stacktrace, ) +from tribler.core.utilities.process_locker import get_global_process_locker # fmt: off @@ -219,6 +220,10 @@ def send_event(self, event: Dict = None, post_data: Dict = None, sys_info: Dict reporter['events'] = extract_dict(sys_info, r'^(event|request)') reporter[SYSINFO] = {key: sys_info[key] for key in sys_info if key not in reporter['events']} + process_locker = get_global_process_locker() + if process_locker: + reporter['last_processes'] = [p.describe() for p in process_locker.get_last_processes()] + # try to retrieve an error from the last_core_output if last_core_output: # split for better representation in the web view diff --git a/src/tribler/core/start_core.py b/src/tribler/core/start_core.py index 5ad24591f23..63f85e228c9 100644 --- a/src/tribler/core/start_core.py +++ b/src/tribler/core/start_core.py @@ -16,6 +16,7 @@ from tribler.core.components.component import Component from tribler.core.components.gigachannel.gigachannel_component import GigaChannelComponent from tribler.core.components.gigachannel_manager.gigachannel_manager_component import GigachannelManagerComponent +from tribler.core.components.gui_process_watcher.gui_process_watcher import GuiProcessWatcher from tribler.core.components.gui_process_watcher.gui_process_watcher_component import GuiProcessWatcherComponent from tribler.core.components.ipv8.ipv8_component import Ipv8Component from tribler.core.components.key.key_component import KeyComponent @@ -39,6 +40,7 @@ from tribler.core.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy from tribler.core.upgrade.version_manager import VersionHistory from 
tribler.core.utilities.process_checker import single_tribler_instance +from tribler.core.utilities.process_locker import ProcessKind, ProcessLocker, set_global_process_locker logger = logging.getLogger(__name__) CONFIG_FILE_NAME = 'triblerd.conf' @@ -119,7 +121,7 @@ async def core_session(config: TriblerConfig, components: List[Component]) -> in return session.exit_code -def run_tribler_core_session(api_port: str, api_key: str, state_dir: Path, gui_test_mode: bool = False) -> int: +def run_tribler_core_session(api_port: int, api_key: str, state_dir: Path, gui_test_mode: bool = False) -> int: """ This method will start a new Tribler session. Note that there is no direct communication between the GUI process and the core: all communication is performed @@ -137,7 +139,7 @@ def run_tribler_core_session(api_port: str, api_key: str, state_dir: Path, gui_t if SentryReporter.is_in_test_mode(): default_core_exception_handler.sentry_reporter.global_strategy = SentryStrategy.SEND_ALLOWED - config.api.http_port = int(api_port) + config.api.http_port = api_port # If the API key is set to an empty string, it will remain disabled if config.api.key not in ('', api_key): config.api.key = api_key @@ -177,10 +179,25 @@ def run_core(api_port, api_key, root_state_dir, parsed_args): logger.info('Running Core' + ' in gui_test_mode' if parsed_args.gui_test_mode else '') load_logger_config('tribler-core', root_state_dir) + gui_pid = GuiProcessWatcher.get_gui_pid() + process_locker = ProcessLocker(root_state_dir, ProcessKind.Core, gui_pid) + set_global_process_locker(process_locker) + + if not process_locker.current_process.active: + msg = f'Another Core process with PID {process_locker.active_process.pid} is already running' + logger.warning(msg) + process_locker.sys_exit(1, msg) + + if api_port is None: + msg = 'api_port is not specified for a core process' + logger.error(msg) + process_locker.sys_exit(1, msg) + + process_locker.set_api_port(api_port) + with 
single_tribler_instance(root_state_dir): version_history = VersionHistory(root_state_dir) state_dir = version_history.code_version.directory exit_code = run_tribler_core_session(api_port, api_key, state_dir, gui_test_mode=parsed_args.gui_test_mode) - if exit_code: - sys.exit(exit_code) + process_locker.sys_exit(exit_code) diff --git a/src/tribler/core/utilities/process_locker.py b/src/tribler/core/utilities/process_locker.py new file mode 100644 index 00000000000..1ff60826588 --- /dev/null +++ b/src/tribler/core/utilities/process_locker.py @@ -0,0 +1,362 @@ +from __future__ import annotations + +import json +import logging +import os +import sqlite3 +import sys +import time +from datetime import datetime +from enum import Enum +from functools import wraps +from pathlib import Path +from threading import Lock +from typing import ContextManager, List, Optional + +import psutil +from decorator import contextmanager + +from tribler.core.version import version_id + +logger = logging.getLogger(__name__) + +DB_FILENAME = 'processes.sqlite' + +CREATE_SQL = """ + CREATE TABLE IF NOT EXISTS processes ( + rowid INTEGER PRIMARY KEY AUTOINCREMENT, + row_version INTEGER NOT NULL DEFAULT 0, + pid INTEGER NOT NULL, + kind TEXT NOT NULL, + active INT NOT NULL, + canceled INT NOT NULL, + app_version TEXT NOT NULL, + started_at INT NOT NULL, + creator_pid INT, + api_port INT, + shutdown_request_pid INT, + shutdown_requested_at INT, + finished_at INT, + exit_code INT, + error_msg TEXT, + error_info JSON, + other_params JSON + ) +""" + + +class ProcessKind(Enum): + GUI = 'gui' + Core = 'core' + + +def to_json(value) -> Optional[str]: + if value is None: + return None + return json.dumps(value) + + +def from_json(value) -> Optional[dict]: + if value is None: + return None + return json.loads(value) + + +class ProcessInfo: + def __init__(self, pid: int, kind: ProcessKind, app_version: str, started_at: int, + rowid: Optional[int] = None, creator_pid: Optional[int] = None, active: int = 0, 
canceled: int = 0, + row_version: int = 0, api_port: Optional[int] = None, finished_at: Optional[int] = None, + exit_code: Optional[int] = None, error_msg: Optional[str] = None, error_info: Optional[dict] = None, + shutdown_request_pid: Optional[int] = None, shutdown_requested_at: Optional[int] = None, + other_params: Optional[dict] = None): + self.rowid = rowid + self.row_version = row_version + self.pid = pid + self.kind = kind + self.active = active + self.canceled = canceled + self.app_version = app_version + self.started_at = started_at + self.creator_pid = creator_pid + self.api_port = api_port + self.finished_at = finished_at + self.exit_code = exit_code + self.error_msg = error_msg + self.error_info = error_info + self.shutdown_request_pid = shutdown_request_pid + self.shutdown_requested_at = shutdown_requested_at + self.other_params = other_params + + @classmethod + def from_row(cls, row: tuple) -> ProcessInfo: + rowid, row_version, pid, kind, active, canceled, app_version, started_at, creator_pid, api_port, \ + shutdown_request_pid, shutdown_requested_at, finished_at, exit_code, error_msg, error_info, \ + other_params = row + + kind = ProcessKind(kind) + + return ProcessInfo(rowid=rowid, row_version=row_version, pid=pid, kind=kind, active=active, canceled=canceled, + app_version=app_version, started_at=started_at, creator_pid=creator_pid, + api_port=api_port, shutdown_request_pid=shutdown_request_pid, + shutdown_requested_at=shutdown_requested_at, finished_at=finished_at, + exit_code=exit_code, error_msg=error_msg, error_info=from_json(error_info), + other_params=from_json(other_params)) + + def to_dict(self) -> dict: + d = dict(rowid=self.rowid, pid=self.pid, kind=self.kind.value, active=self.active, canceled=self.canceled, + app_version=self.app_version, started_at=self.started_at, creator_pid=self.creator_pid, + api_port=self.api_port, finished_at=self.finished_at, exit_code=self.exit_code, + error_msg=self.error_msg, error_info=self.error_info, 
shutdown_request_pid=self.shutdown_request_pid, + shutdown_request_at=self.shutdown_requested_at, other_params=self.other_params) + return {key: value for key, value in d.items() if value is not None} + + def describe(self): + kind = self.kind.value.capitalize() + flags = f"{'active, ' if self.active else ''}{'canceled, ' if self.canceled else ''}" + result = [f'{kind}Process({flags}pid={self.pid}'] + if self.creator_pid is not None: + result.append(f', gui_pid={self.creator_pid}') + started = datetime.utcfromtimestamp(self.started_at) + result.append(f", version='{self.app_version}', started='{started.strftime('%Y-%m-%d %H:%M:%S')}'") + if self.api_port is not None: + result.append(f', api_port={self.api_port}') + if self.finished_at: + finished = datetime.utcfromtimestamp(self.finished_at) + duration = finished - started + result.append(f", duration='{duration}'") + if self.exit_code is not None: + result.append(f', exit_code={self.exit_code}') + if self.error_msg: + result.append(f', error={repr(self.error_msg)}') + result.append(')') + return ''.join(result) + + @classmethod + def current_process(cls, kind: ProcessKind, creator_pid: Optional[int] = None, **other_params) -> ProcessInfo: + return cls(pid=os.getpid(), kind=kind, app_version=version_id, started_at=int(time.time()), + creator_pid=creator_pid, row_version=0, other_params=other_params or None) + + def is_current_process(self): + return self.pid == os.getpid() + + def is_running(self): + if not psutil.pid_exists(self.pid): + return False + + try: + process = psutil.Process(self.pid) + status = process.status() + except psutil.Error as e: + logger.warning(e) + return False + + if status == psutil.STATUS_ZOMBIE: + return False + + if process.create_time() > self.started_at: + return False + + return True + + def set_error(self, error_msg: Optional[str] = None, error_info: Optional[dict] = None, + exc: Optional[Exception] = None, replace: bool = False): + if exc and not error_msg: + error_msg = 
f"{exc.__class__.__name__}: {exc}" + + if replace: + self.error_msg = error_msg + self.error_info = error_info + else: + self.error_msg = self.error_msg or error_msg + self.error_info = self.error_info or error_info + + def mark_finished(self, exit_code: Optional[int] = None): + self.active = 0 + self.finished_at = int(time.time()) + + # if exit_code is specified, it overrides the previously set exit code + if exit_code is not None: + self.exit_code = exit_code + + # if no exit code is specified, default to 0 (success), or to 1 if an error message was recorded + if self.exit_code is None: + self.exit_code = 0 if not self.error_msg else 1 + + def save(self, con: sqlite3.Connection): + cursor = con.cursor() + if self.rowid is None: + self._before_insert_check() + self.row_version = 0 + cursor.execute(""" + INSERT INTO processes ( + pid, kind, active, canceled, app_version, started_at, + creator_pid, api_port, shutdown_request_pid, shutdown_requested_at, + finished_at, exit_code, error_msg, error_info, other_params + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, [self.pid, self.kind.value, self.active, self.canceled, self.app_version, self.started_at, + self.creator_pid, self.api_port, self.shutdown_request_pid, self.shutdown_requested_at, + self.finished_at, self.exit_code, self.error_msg, to_json(self.error_info), + to_json(self.other_params)]) + self.rowid = cursor.lastrowid + else: + prev_version = self.row_version + self.row_version += 1 + cursor.execute(""" + UPDATE processes + SET row_version = ?, active = ?, canceled = ?, creator_pid = ?, api_port = ?, + shutdown_request_pid = ?, shutdown_requested_at = ?, finished_at = ?, + exit_code = ?, error_msg = ?, error_info = ?, other_params = ? + WHERE rowid = ? and row_version = ? and pid = ? and kind = ? and app_version = ? and started_at = ?
+ """, [self.row_version, self.active, self.canceled, self.creator_pid, self.api_port, + self.shutdown_request_pid, self.shutdown_requested_at, self.finished_at, + self.exit_code, self.error_msg, to_json(self.error_info), to_json(self.other_params), + self.rowid, prev_version, self.pid, self.kind.value, self.app_version, self.started_at]) + if cursor.rowcount == 0: + logger.error(f'Row {self.rowid} with row version {prev_version} was not found') + + def _before_insert_check(self): + if self.row_version: + logger.error(f"The `row_version` value for a new process row should not be set. Got: {self.row_version}") + + +global_process_locker: Optional[ProcessLocker] = None + +_lock = Lock() + + +def set_global_process_locker(process_locker: Optional[ProcessLocker]): + global global_process_locker # pylint: disable=global-statement + with _lock: + global_process_locker = process_locker + + +def get_global_process_locker() -> Optional[ProcessLocker]: + with _lock: + return global_process_locker + + +def set_api_port(api_port: int): + process_locker = get_global_process_locker() + if process_locker is None: + logger.warning('Cannot set api_port for process locker: no process locker global instance is set') + else: + process_locker.set_api_port(api_port) + + +def set_error(error_msg: Optional[str] = None, error_info: Optional[dict] = None, + exc: Optional[Exception] = None, replace: bool = False): + process_locker = get_global_process_locker() + if process_locker is None: + logger.warning('Cannot set error for process locker: no process locker global instance is set') + else: + process_locker.current_process.set_error(error_msg, error_info, exc, replace) + process_locker.save(process_locker.current_process) + + +def with_retry(func): + @wraps(func) + def new_func(*args, **kwargs): + try: + return func(*args, **kwargs) + except sqlite3.Error as e: + logger.warning(f'Retrying after the error: {e.__class__.__name__}: {e}') + return func(*args, **kwargs) + new_func: func + 
return new_func + + +class ProcessLocker: + filename: Path + current_process: ProcessInfo + active_process: ProcessInfo + + def __init__(self, root_dir: Path, process_kind: ProcessKind, creator_pid: Optional[int] = None, **other_params): + self.root_dir = root_dir + self.filename = self._get_file_name(root_dir) + self.current_process = ProcessInfo.current_process(process_kind, creator_pid, **other_params) + self.active_process = self.atomic_get_active_process(process_kind, self.current_process) + + @classmethod + def _get_file_name(cls, root_dir: Path) -> Path: # The method is added for easier testing + return root_dir / DB_FILENAME + + def connect(self) -> sqlite3.Connection: + connection = sqlite3.connect(str(self.filename)) + try: + connection.execute('BEGIN EXCLUSIVE TRANSACTION') + connection.execute(CREATE_SQL) + return connection + except: # noqa: E722 + connection.close() + raise + + @contextmanager + def transaction(self) -> ContextManager[sqlite3.Connection]: + connection = None + try: + connection = self.connect() + yield connection + connection.execute('COMMIT') + connection.close() + + except Exception as e: + logger.exception(f'{e.__class__.__name__}: {e}') + if connection: + connection.close() # pragma: no cover + if isinstance(e, sqlite3.DatabaseError): + self.filename.unlink(missing_ok=True) + raise + + @with_retry + def atomic_get_active_process(self, kind: ProcessKind, + current_process: Optional[ProcessInfo] = None) -> Optional[ProcessInfo]: + active_process = None + with self.transaction() as connection: # pylint: disable=not-context-manager # false Pylint alarm + cursor = connection.execute(""" + SELECT * FROM processes WHERE kind = ? 
and active = 1 ORDER BY rowid DESC LIMIT 1 + """, [kind.value]) + row = cursor.fetchone() + if row is not None: + previous_active_process = ProcessInfo.from_row(row) + if previous_active_process.is_running(): + active_process = previous_active_process + else: + previous_active_process.active = 0 + previous_active_process.save(connection) + + if current_process is not None: + if active_process is None: + current_process.active = 1 + active_process = current_process + else: + current_process.active = 0 + current_process.canceled = 1 + current_process.save(connection) + + return active_process + + @with_retry + def save(self, process: ProcessInfo): + with self.transaction() as connection: # pylint: disable=not-context-manager # false Pylint alarm + process.save(connection) + + def set_api_port(self, api_port: int): + self.current_process.api_port = api_port + self.save(self.current_process) + + def sys_exit(self, exit_code: Optional[int] = None, error_msg: Optional[str] = None, + error_info: Optional[dict] = None, exc: Optional[Exception] = None, replace: bool = False): + if error_msg is not None: + self.current_process.set_error(error_msg, error_info, exc, replace) + self.current_process.mark_finished(exit_code) + self.save(self.current_process) + exit_code = self.current_process.exit_code + sys.exit(exit_code) + + @with_retry + def get_last_processes(self, limit=6) -> List[ProcessInfo]: + with self.transaction() as connection: # pylint: disable=not-context-manager # false Pylint alarm + cursor = connection.execute("""SELECT * FROM processes ORDER BY rowid DESC LIMIT ?""", [limit]) + result = [ProcessInfo.from_row(row) for row in cursor] + result.reverse() + return result diff --git a/src/tribler/core/utilities/tests/test_process_locker.py b/src/tribler/core/utilities/tests/test_process_locker.py new file mode 100644 index 00000000000..9cb2eceff90 --- /dev/null +++ b/src/tribler/core/utilities/tests/test_process_locker.py @@ -0,0 +1,298 @@ +import os +import re +from 
pathlib import Path +from unittest.mock import Mock, patch + +import psutil +import pytest + +from tribler.core.utilities.process_locker import logger, ProcessInfo, ProcessKind, ProcessLocker, \ + get_global_process_locker, set_api_port, set_error, set_global_process_locker + + +@pytest.fixture(name='process_locker') +def process_locker_fixture(tmp_path: Path): + return ProcessLocker(tmp_path, ProcessKind.Core) + + +def test_process_info(): + p = ProcessInfo.current_process(ProcessKind.Core, 123, arbitrary_param=456) + assert p.is_current_process() + assert p.is_running() + + d = p.to_dict() + d2 = {'active': 0, 'canceled': 0, 'kind': 'core', 'pid': os.getpid(), 'creator_pid': 123, + 'other_params': {'arbitrary_param': 456}} + assert d2.items() <= d.items() + assert 'app_version' in d and 'started_at' in d + + s = p.describe() + pattern = r"^CoreProcess\(pid=\d+, gui_pid=123, version='[^']+', started='\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'\)$" + assert re.match(pattern, s) + + +@patch('psutil.Process') +@patch('psutil.pid_exists') +def test_process_info_is_running(pid_exists: Mock, process_class): + p = ProcessInfo.current_process(ProcessKind.GUI) + assert not pid_exists.called + + # if the pid does not exist, the process is not running + pid_exists.return_value = False + assert p.is_running() is False + assert pid_exists.called + + # if the instantiation of the Process instance leads to psutil.Error, the process is not running + pid_exists.return_value = True + process_class.side_effect = psutil.Error + assert p.is_running() is False + assert process_class.called + + # if the process is a zombie, it is not considered to be running + process = Mock() + process.status.return_value = psutil.STATUS_ZOMBIE + process_class.side_effect = None + process_class.return_value = process + assert p.is_running() is False + + # if the process with the specified pid was created after the specified time, it is a different process + process.status.return_value = psutil.STATUS_RUNNING +
process.create_time.return_value = p.started_at + 1 + assert p.is_running() is False + + # if the process exists, it is not a zombie, and its creation time matches the recorded value, it is running + process.create_time.return_value = p.started_at + assert p.is_running() is True + + +def test_process_info_set_error(): + p = ProcessInfo.current_process(ProcessKind.GUI) + + # Initially there is no exception + assert p.error_msg is None and p.error_info is None + + # In the simplest case, just specify an error message + p.set_error('Error text 1') + + assert p.error_msg == 'Error text 1' and p.error_info is None + # By default, the second exception does not override the first one (as the first exception may be the root cause) + + p.set_error('Error text 2') + assert p.error_msg == 'Error text 1' and p.error_info is None + + # But it is possible to override exception explicitly + p.set_error('Error text 2', replace=True) + assert p.error_msg == 'Error text 2' and p.error_info is None + + # It is possible to specify an additional dict with arbitrary JSON-serializable information about the error + p.set_error('Error text 3', error_info={'error3_param': 'error3_value'}, replace=True) + assert p.error_msg == 'Error text 3' and p.error_info == {'error3_param': 'error3_value'} + + # If the error is replaced, then the entire error_info dict is replaced as well, the dicts are not mixed together + p.set_error('Error text 4', error_info={'error4_param': 'error4_value'}, replace=True) + assert p.error_msg == 'Error text 4' and p.error_info == {'error4_param': 'error4_value'} + + # If error_info is not specified, the previous error_info is still replaced + p.set_error('Error text 5', replace=True) + assert p.error_msg == 'Error text 5' and p.error_info is None + + # It is possible to specify an exception + p.set_error(exc=ValueError('exception text'), error_info={'some_param': 'some_value'}, replace=True) + assert p.error_msg == 'ValueError: exception text' and p.error_info == {'some_param':
'some_value'} + + # The error text is included in ProcessInfo.describe() output + s = p.describe() + pattern = r"^GuiProcess\(pid=\d+, version='[^']+', started='[^']+', error='ValueError: exception text'\)$" + assert re.match(pattern, s) + + +def test_process_info_mark_finished(): + def make_process_info(): + p = ProcessInfo.current_process(ProcessKind.Core) + p.active = 1 + p.api_port = 10000 + return p + + p = make_process_info() + assert p.exit_code is None + assert p.finished_at is None + + p.mark_finished(123) + assert p.active == 0 + assert p.exit_code == 123 + assert p.finished_at is not None + + s = p.describe() + assert s.endswith(", api_port=10000, duration='0:00:00', exit_code=123)") + + p = make_process_info() + p.mark_finished() # the error is not set and the exit code is not specified, and by default should be 0 + assert p.exit_code == 0 + + p = make_process_info() + p.error_msg = 'Error text' + p.mark_finished() # the error is set and the exit code is not specified, and by default should be 1 + assert p.exit_code == 1 + + +@patch.object(logger, 'error') +def test_process_info_save(logger_error: Mock): + p = ProcessInfo.current_process(ProcessKind.Core) + assert p.rowid is None and p.row_version == 0 + + cursor = Mock(lastrowid=123) + connection = Mock() + connection.cursor.return_value = cursor + + p.save(connection) + assert "INSERT INTO" in cursor.execute.call_args[0][0] + assert p.rowid == 123 and p.row_version == 0 + + cursor.rowcount = 1 + p.save(connection) + assert "UPDATE" in cursor.execute.call_args[0][0] + assert p.rowid == 123 and p.row_version == 1 + + assert not logger_error.called + cursor.rowcount = 0 + p.save(connection) + assert logger_error.called + assert logger_error.call_args[0][0] == 'Row 123 with row version 1 was not found' + + p = ProcessInfo.current_process(ProcessKind.Core) + p.row_version = 1 + p.save(connection) + assert logger_error.call_args[0][0] == 'The `row_version` value for a new process row should not be set. 
Got: 1' + + +def test_connect(process_locker): + process_locker.filename = ':memory:' + process_locker.connect() + connection = process_locker.connect() + cursor = connection.execute('select * from processes') + column_names = [column[0] for column in cursor.description] + assert column_names == ['rowid', 'row_version', 'pid', 'kind', 'active', 'canceled', 'app_version', + 'started_at', 'creator_pid', 'api_port', 'shutdown_request_pid', 'shutdown_requested_at', + 'finished_at', 'exit_code', 'error_msg', 'error_info', 'other_params'] + + with patch('sqlite3.connect') as connect: + connection = Mock() + connect.return_value = connection + connection.execute.side_effect = ValueError + with pytest.raises(ValueError): + process_locker.connect() + assert connection.close.called + + +def test_atomic_get_active_process(process_locker: ProcessLocker): + assert process_locker.current_process.active == 1 + assert process_locker.active_process is process_locker.current_process + + fake_process = ProcessInfo.current_process(ProcessKind.Core) + fake_process.pid = fake_process.pid + 1 + active_process = process_locker.atomic_get_active_process(ProcessKind.Core, fake_process) + assert active_process.active == 1 + assert fake_process.active == 0 + + with process_locker.transaction() as connection: + connection.execute('update processes set pid = pid + 100') + + current_process = ProcessInfo.current_process(ProcessKind.Core) + active_process = process_locker.atomic_get_active_process(ProcessKind.Core, current_process) + assert current_process.active + assert active_process is current_process + + with process_locker.transaction() as connection: + rows = connection.execute('select rowid from processes where active = 1').fetchall() + assert len(rows) == 1 and rows[0][0] == current_process.rowid + + +def test_save(process_locker: ProcessLocker): + p = ProcessInfo.current_process(ProcessKind.Core) + p.pid = p.pid + 100 + process_locker.save(p) + assert p.rowid is not None + + +def 
test_set_api_port(process_locker: ProcessLocker): + process_locker.set_api_port(12345) + with process_locker.transaction() as connection: + rows = connection.execute('select rowid from processes where api_port = 12345').fetchall() + assert len(rows) == 1 and rows[0][0] == process_locker.current_process.rowid + + +@patch('sys.exit') +def test_sys_exit(sys_exit: Mock, process_locker: ProcessLocker): + process_locker.sys_exit(123, 'Error text') + + with process_locker.transaction() as connection: + rows = connection.execute('select active, error_msg from processes where rowid = ?', + [process_locker.current_process.rowid]).fetchall() + assert len(rows) == 1 and rows[0] == (0, 'Error text') + assert sys_exit.called and sys_exit.call_args[0][0] == 123 + + +def test_get_last_processes(process_locker: ProcessLocker): + last_processes = process_locker.get_last_processes() + assert len(last_processes) == 1 and last_processes[0].rowid == process_locker.current_process.rowid + + fake_process = ProcessInfo.current_process(ProcessKind.Core) + fake_process.pid = fake_process.pid + 1 + process_locker.atomic_get_active_process(ProcessKind.Core, fake_process) + + last_processes = process_locker.get_last_processes() + assert len(last_processes) == 2 + assert last_processes[0].rowid == process_locker.current_process.rowid + assert last_processes[1].rowid == fake_process.rowid + + +@patch.object(logger, 'warning') +def test_global_process_locker(warning: Mock, process_locker: ProcessLocker): + assert get_global_process_locker() is None + + set_api_port(12345) + assert warning.call_args[0][0] == 'Cannot set api_port for process locker: no process locker global instance is set' + + set_error('Error text') + assert warning.call_args[0][0] == 'Cannot set error for process locker: no process locker global instance is set' + + set_global_process_locker(process_locker) + assert get_global_process_locker() is process_locker + + set_api_port(12345) + set_error('Error text') + + assert 
process_locker.current_process.api_port == 12345 + assert process_locker.current_process.error_msg == 'Error text' + + set_global_process_locker(None) + assert get_global_process_locker() is None + + +def test_json_fields(process_locker: ProcessLocker): + p = process_locker.current_process + p.set_error('Error text', {'arbitrary_key': 'arbitrary_value'}) + p.other_params = {'some_key': 'some_value'} + process_locker.save(p) # should serialize `error_info` and `other_params` to JSON + processes = process_locker.get_last_processes() + assert len(processes) == 1 + p2 = processes[0] + assert p is not p2 # p2 is a new instance constructed from the database row + assert processes[0].error_info == {'arbitrary_key': 'arbitrary_value'} # parsed from the database + assert processes[0].other_params == {'some_key': 'some_value'} + + +@patch.object(logger, 'warning') +@patch.object(logger, 'exception') +def test_corrupted_database(logger_exception: Mock, logger_warning: Mock, process_locker: ProcessLocker): + db_content = process_locker.filename.read_bytes() + assert len(db_content) > 2000 + process_locker.filename.write_bytes(db_content[:1500]) # corrupt the database file + + # no exception, the database is silently re-created: + process_locker2 = ProcessLocker(process_locker.root_dir, ProcessKind.Core) + assert logger_exception.call_args[0][0] == 'DatabaseError: database disk image is malformed' + assert logger_warning.call_args[0][0] == 'Retrying after the error: DatabaseError: database disk image is malformed' + + processes = process_locker2.get_last_processes() + assert len(processes) == 1 diff --git a/src/tribler/gui/core_manager.py b/src/tribler/gui/core_manager.py index f1ba28675ba..56e72f8113f 100644 --- a/src/tribler/gui/core_manager.py +++ b/src/tribler/gui/core_manager.py @@ -7,7 +7,6 @@ from typing import Optional from PyQt5.QtCore import QObject, QProcess, QProcessEnvironment -from PyQt5.QtNetwork import QNetworkRequest from tribler.core.utilities.process_checker 
import ProcessChecker from tribler.gui import gui_sentry_reporter diff --git a/src/tribler/gui/error_handler.py b/src/tribler/gui/error_handler.py index 8a4f2889a84..de6d63faa40 100644 --- a/src/tribler/gui/error_handler.py +++ b/src/tribler/gui/error_handler.py @@ -6,6 +6,7 @@ from tribler.core.components.reporter.reported_error import ReportedError from tribler.core.sentry_reporter.sentry_reporter import SentryStrategy from tribler.core.sentry_reporter.sentry_scrubber import SentryScrubber +from tribler.core.utilities import process_locker from tribler.gui import gui_sentry_reporter from tribler.gui.app_manager import AppManager from tribler.gui.dialogs.feedbackdialog import FeedbackDialog @@ -29,6 +30,7 @@ def __init__(self, tribler_window): def gui_error(self, exc_type, exc, tb): self._logger.info(f'Processing GUI error: {exc_type}') + process_locker.set_error(f"GUI {exc.__class__.__name__}: {exc}", exc=exc) text = "".join(traceback.format_exception(exc_type, exc, tb)) self._logger.error(text) @@ -79,9 +81,10 @@ def gui_error(self, exc_type, exc, tb): def core_error(self, reported_error: ReportedError): if self._tribler_stopped or reported_error.type in self._handled_exceptions: return - self._logger.info(f'Processing Core error: {reported_error}') self._handled_exceptions.add(reported_error.type) + self._logger.info(f'Processing Core error: {reported_error}') + process_locker.set_error(f"Core {reported_error.type}: {reported_error.text}") error_text = f'{reported_error.text}\n{reported_error.long_text}' self._logger.error(error_text) diff --git a/src/tribler/gui/single_application.py b/src/tribler/gui/single_application.py index 0c18bec64fb..b0cd64c49d9 100644 --- a/src/tribler/gui/single_application.py +++ b/src/tribler/gui/single_application.py @@ -20,7 +20,7 @@ class QtSingleApplication(QApplication): message_received = pyqtSignal(str) - def __init__(self, win_id, *argv): + def __init__(self, win_id: str, another_process_is_active: bool, *argv): self.logger 
= logging.getLogger(self.__class__.__name__) self.logger.info(f'Start Tribler application. Win id: "{win_id}". ' f'Sys argv: "{sys.argv}"') @@ -35,7 +35,7 @@ def __init__(self, win_id, *argv): self._outgoing_connection.connectToServer(self._id) connected_to_previous_instance = self._outgoing_connection.waitForConnected() - self._is_app_already_running = connected_to_previous_instance + self._is_app_already_running = connected_to_previous_instance or another_process_is_active self._stream_to_running_app = None self._incoming_connection = None diff --git a/src/tribler/gui/start_gui.py b/src/tribler/gui/start_gui.py index d58c862e055..1d2e884038c 100644 --- a/src/tribler/gui/start_gui.py +++ b/src/tribler/gui/start_gui.py @@ -8,13 +8,14 @@ check_and_enable_code_tracing, check_environment, check_free_space, - enable_fault_handler, - error_and_exit, + enable_fault_handler ) from tribler.core.exceptions import TriblerException from tribler.core.logger.logger import load_logger_config from tribler.core.sentry_reporter.sentry_reporter import SentryStrategy +from tribler.core.utilities.process_locker import ProcessKind, ProcessLocker, set_global_process_locker from tribler.core.utilities.rest_utils import path_to_url +from tribler.core.utilities.utilities import show_system_popup from tribler.gui import gui_sentry_reporter from tribler.gui.app_manager import AppManager from tribler.gui.tribler_app import TriblerApplication @@ -41,14 +42,18 @@ def run_gui(api_port, api_key, root_state_dir, parsed_args): # Enable tracer using commandline args: --trace-debug or --trace-exceptions trace_logger = check_and_enable_code_tracing('gui', root_state_dir) - try: - enable_fault_handler(root_state_dir) - # Exit if we cant read/write files, etc. - check_environment() - check_free_space() + enable_fault_handler(root_state_dir) + # Exit if we cant read/write files, etc. 
+ check_environment() + check_free_space() + + process_locker = ProcessLocker(root_state_dir, ProcessKind.GUI) + another_process_is_active = not process_locker.current_process.active + set_global_process_locker(process_locker) # to be able to add information about exception to the process info + try: app_name = os.environ.get('TRIBLER_APP_NAME', 'triblerapp') - app = TriblerApplication(app_name, sys.argv) + app = TriblerApplication(app_name, sys.argv, another_process_is_active) app_manager = AppManager(app) # Note (@ichorid): translator MUST BE created and assigned to a separate variable @@ -67,22 +72,25 @@ def run_gui(api_port, api_key, root_state_dir, parsed_args): elif arg.startswith('magnet'): app.send_message(arg) logger.info('Close the current application.') - sys.exit(1) + process_locker.sys_exit(1, 'Tribler GUI application is already running') logger.info('Start Tribler Window') window = TriblerWindow(app_manager, settings, root_state_dir, api_port=api_port, api_key=api_key) window.setWindowTitle("Tribler") app.tribler_window = window app.parse_sys_args(sys.argv) - sys.exit(app.exec_()) + exit_code = app.exec_() + process_locker.sys_exit(exit_code or None) except ImportError as ie: logger.exception(ie) - error_and_exit("Import Error", f"Import error: {ie}") + show_system_popup("Import Error", f"Import error: {ie}") + process_locker.sys_exit(1, exc=ie) except TriblerException as te: logger.exception(te) - error_and_exit("Tribler Exception", f"{te}") + show_system_popup("Tribler Exception", f"{te.__class__.__name__}: {te}") + process_locker.sys_exit(1, exc=te) except SystemExit: logger.info("Shutting down Tribler") diff --git a/src/tribler/gui/tests/test_gui.py b/src/tribler/gui/tests/test_gui.py index 622c622a64d..a0b3bb80642 100644 --- a/src/tribler/gui/tests/test_gui.py +++ b/src/tribler/gui/tests/test_gui.py @@ -40,7 +40,7 @@ def fixture_window(tmpdir_factory): api_key = hexlify(os.urandom(16)) root_state_dir = 
str(tmpdir_factory.mktemp('tribler_state_dir')) - app = TriblerApplication("triblerapp-guitest", sys.argv) + app = TriblerApplication("triblerapp-guitest", sys.argv, another_process_is_active=False) app_manager = AppManager(app) # We must create a separate instance of QSettings and clear it. # Otherwise, previous runs of the same app will affect this run. diff --git a/src/tribler/gui/tribler_app.py b/src/tribler/gui/tribler_app.py index 74ab74ac0d2..6469d81ec67 100644 --- a/src/tribler/gui/tribler_app.py +++ b/src/tribler/gui/tribler_app.py @@ -23,8 +23,8 @@ class TriblerApplication(QtSingleApplication): This class represents the main Tribler application. """ - def __init__(self, app_name, args): - QtSingleApplication.__init__(self, app_name, args) + def __init__(self, app_name: str, args, another_process_is_active: bool): + QtSingleApplication.__init__(self, app_name, another_process_is_active, args) self.code_executor = None connect(self.message_received, self.on_app_message) diff --git a/src/tribler/gui/tribler_window.py b/src/tribler/gui/tribler_window.py index 014227b2aee..daf0479f81f 100644 --- a/src/tribler/gui/tribler_window.py +++ b/src/tribler/gui/tribler_window.py @@ -44,6 +44,7 @@ from psutil import LINUX from tribler.core.upgrade.version_manager import VersionHistory +from tribler.core.utilities.process_locker import set_api_port from tribler.core.utilities.network_utils import default_network_utils from tribler.core.utilities.rest_utils import ( FILE_SCHEME, @@ -182,6 +183,7 @@ def __init__( "Tribler configuration conflicts with the current OS state: " "REST API port %i already in use" % api_port ) + set_api_port(api_port) api_key = format_api_key(api_key or get_gui_setting(self.gui_settings, "api_key", None) or create_api_key()) set_api_key(self.gui_settings, api_key)