diff --git a/src/tribler/core/start_core.py b/src/tribler/core/start_core.py index 5ad24591f23..b7c6d37aced 100644 --- a/src/tribler/core/start_core.py +++ b/src/tribler/core/start_core.py @@ -16,6 +16,7 @@ from tribler.core.components.component import Component from tribler.core.components.gigachannel.gigachannel_component import GigaChannelComponent from tribler.core.components.gigachannel_manager.gigachannel_manager_component import GigachannelManagerComponent +from tribler.core.components.gui_process_watcher.gui_process_watcher import GuiProcessWatcher from tribler.core.components.gui_process_watcher.gui_process_watcher_component import GuiProcessWatcherComponent from tribler.core.components.ipv8.ipv8_component import Ipv8Component from tribler.core.components.key.key_component import KeyComponent @@ -39,6 +40,7 @@ from tribler.core.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy from tribler.core.upgrade.version_manager import VersionHistory from tribler.core.utilities.process_checker import single_tribler_instance +from tribler.core.utilities.process_locker import ProcessKind, ProcessLocker logger = logging.getLogger(__name__) CONFIG_FILE_NAME = 'triblerd.conf' @@ -177,10 +179,14 @@ def run_core(api_port, api_key, root_state_dir, parsed_args): logger.info('Running Core' + ' in gui_test_mode' if parsed_args.gui_test_mode else '') load_logger_config('tribler-core', root_state_dir) + gui_pid = GuiProcessWatcher.get_gui_pid() + process_locker = ProcessLocker(root_state_dir, ProcessKind.Core, gui_pid) + if not process_locker.current_process.active: + logger.warning('The current') + with single_tribler_instance(root_state_dir): version_history = VersionHistory(root_state_dir) state_dir = version_history.code_version.directory exit_code = run_tribler_core_session(api_port, api_key, state_dir, gui_test_mode=parsed_args.gui_test_mode) - if exit_code: - sys.exit(exit_code) + process_locker.sys_exit(exit_code) diff --git a/src/tribler/core/utilities/process_locker.py b/src/tribler/core/utilities/process_locker.py new file mode 100644 index 00000000000..290c5518e6e --- /dev/null +++ b/src/tribler/core/utilities/process_locker.py @@ -0,0 +1,295 @@ +from __future__ import annotations + +import json +import logging +import os +import sqlite3 +import sys +import time +from enum import Enum +from pathlib import Path +from threading import Lock +from typing import ContextManager, Optional + +import psutil +from decorator import contextmanager + +from tribler.core.version import version_id + +DB_FILENAME = 'processes.sqlite' + + +class DbException(Exception): + pass + + +class TransactionException(DbException): + pass + + +class UpdateException(DbException): + pass + + +class ProcessKind(Enum): + GUI = 'gui' + Core = 'core' + + +CREATE_SQL = """ + CREATE TABLE IF NOT EXISTS processes ( + rowid INTEGER PRIMARY KEY AUTOINCREMENT, + row_version INTEGER NOT NULL DEFAULT 0, + pid INTEGER NOT NULL, + kind TEXT NOT NULL, + active INT NOT NULL, + canceled INT NOT NULL, + app_version TEXT NOT NULL, + started_at INT NOT NULL, + creator_pid INT, + api_port INT, + shutdown_request_pid INT, + shutdown_requested_at INT, + finished_at INT, + exit_code INT, + error_msg TEXT, + error_info JSON, + other_params JSON + ) +""" + + +logger = logging.getLogger(__name__) + + +def to_json(value) -> Optional[str]: + if value is None: + return None + return json.dumps(value) + + +def from_json(value) -> Optional[dict]: + if value is None: + return None + return json.loads(value) + + +class ProcessInfo: + def __init__(self, pid: int, kind: ProcessKind, app_version: str, started_at: int, + rowid: Optional[int] = None, creator_pid: Optional[int] = None, active: int = 0, canceled: int = 0, + row_version: int = 0, api_port: Optional[int] = None, finished_at: Optional[int] = None, + exit_code: Optional[int] = None, error_msg: Optional[str] = None, error_info: Optional[dict] = None, + shutdown_request_pid: Optional[int] = None, shutdown_requested_at: Optional[int] = None, + other_params: Optional[dict] = None): + self.rowid = rowid + self.row_version = row_version + self.pid = pid + self.kind = kind + self.active = active + self.canceled = canceled + self.app_version = app_version + self.started_at = started_at + self.creator_pid = creator_pid + self.api_port = api_port + self.finished_at = finished_at + self.exit_code = exit_code + self.error_msg = error_msg + self.error_info = error_info + self.shutdown_request_pid = shutdown_request_pid + self.shutdown_requested_at = shutdown_requested_at + self.other_params = other_params + + @classmethod + def from_row(cls, row: tuple) -> ProcessInfo: + rowid, row_version, pid, kind, active, canceled, app_version, started_at, creator_pid, api_port, \ + shutdown_request_pid, shutdown_requested_at, finished_at, exit_code, error_msg, error_info, \ + other_params = row + + kind = ProcessKind(kind) + + return ProcessInfo(rowid=rowid, row_version=row_version, pid=pid, kind=kind, active=active, canceled=canceled, + app_version=app_version, started_at=started_at, creator_pid=creator_pid, + api_port=api_port, shutdown_request_pid=shutdown_request_pid, + shutdown_requested_at=shutdown_requested_at, finished_at=finished_at, + exit_code=exit_code, error_msg=error_msg, error_info=from_json(error_info), + other_params=from_json(other_params)) + + @classmethod + def current_process(cls, kind: ProcessKind, creator_pid: Optional[int] = None, **other_params) -> ProcessInfo: + return cls(pid=os.getpid(), kind=kind, app_version=version_id, started_at=int(time.time()), + creator_pid=creator_pid, row_version=0, other_params=other_params or None) + + def is_current_process(self): + return self.pid == os.getpid() + + def still_active(self): + if not psutil.pid_exists(self.pid): + return False + + try: + process = psutil.Process(self.pid) + status = process.status() + except psutil.Error as e: + logger.warning(e) + return False + + if status == psutil.STATUS_ZOMBIE: + return False + + if process.create_time() > self.started_at: + return False + + return True + + def set_error(self, error_msg: Optional[str] = None, error_info: Optional[dict] = None, + exc: Optional[Exception] = None, replace: bool = False): + if exc and not error_msg: + error_msg = f"{exc.__class__.__name__}: {exc}" + + if replace: + self.error_msg = error_msg + self.error_info = error_info + else: + self.error_msg = self.error_msg or error_msg + self.error_info = self.error_info or error_info + + def mark_finished(self, exit_code: Optional[int] = None): + self.active = 0 + self.finished_at = time.time() + self.exit_code = exit_code + + def save(self, con: sqlite3.Connection): + cursor = con.cursor() + if self.rowid is None: + self._before_insert_check() + self.row_version = 0 + cursor.execute(""" + INSERT INTO processes ( + pid, kind, active, canceled, app_version, started_at, + creator_pid, api_port, shutdown_request_pid, shutdown_requested_at, + finished_at, exit_code, error_msg, error_info, other_params + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, [self.pid, self.kind.value, self.active, self.canceled, self.app_version, self.started_at, + self.creator_pid, self.api_port, self.shutdown_request_pid, self.shutdown_requested_at, + self.finished_at, self.exit_code, self.error_msg, to_json(self.error_info), + to_json(self.other_params)]) + self.rowid = cursor.lastrowid + else: + self._before_update_check() + prev_version = self.row_version + self.row_version += 1 + cursor.execute(""" + UPDATE processes + SET row_version = ?, active = ?, canceled = ?, creator_pid = ?, api_port = ?, + shutdown_request_pid = ?, shutdown_requested_at = ?, finished_at = ?, + exit_code = ?, error_msg = ?, error_info = ?, other_params = ? + WHERE rowid = ? and row_version = ? and pid = ? and kind = ? and app_version = ? and started_at = ? + """, [self.row_version, self.active, self.canceled, self.creator_pid, self.api_port, + self.shutdown_request_pid, self.shutdown_requested_at, self.finished_at, + self.exit_code, self.error_msg, to_json(self.error_info), to_json(self.other_params), + self.rowid, prev_version, self.pid, self.kind.value, self.app_version, self.started_at]) + if cursor.rowcount == 0: + raise UpdateException(f'Row {self.rowid} with row version {prev_version} was not found') + + def _before_insert_check(self): + if self.row_version: + logging.error(f"row_version column for a new row should not be set. Got: {self.row_version}") + + def _before_update_check(self): + if self.rowid is None: + logging.error("rowid for an existing row should not be None") + if self.row_version is None: + logging.error(f"row_version column for an existing row {self.rowid} should not be None") + + +global_process_locker: Optional[{ProcessLocker}] = None + +_lock = Lock() + + +def set_global_process_locker(process_locker: ProcessLocker): + global global_process_locker + with _lock: + global_process_locker = process_locker + + +def get_global_process_locker() -> Optional[ProcessLocker]: + with _lock: + return global_process_locker + + +def set_error(error_msg: Optional[str] = None, error_info: Optional[dict] = None, + exc: Optional[Exception] = None, replace: bool = False): + process_locker = get_global_process_locker() + if process_locker is None: + logger.warning('Cannot set error for process locker: process locker is not set') + process_locker.current_process.set_error(error_msg, error_info, exc, replace) + + +class ProcessLocker: + filename: Path + current_process: ProcessInfo + active_process: ProcessInfo + + def __init__(self, root_dir: Path, process_kind: ProcessKind, creator_pid: Optional[int] = None, **other_params): + filename = root_dir / DB_FILENAME + self.filename = filename + self.current_process = ProcessInfo.current_process(process_kind, creator_pid, **other_params) + self.active_process = self.atomic_get_active_process(process_kind, self.current_process) + + def connect(self) -> sqlite3.Connection: + connection = sqlite3.connect(self.filename) # TODO: check that the file is not corrupted + connection.execute('BEGIN EXCLUSIVE TRANSACTION') + connection.execute(CREATE_SQL) + return connection + + def transaction(self) -> ContextManager[sqlite3.Connection]: + @contextmanager + def transaction_context_manager(): # this additional level of wrapping is a workaround for PyCharm bug + connection = self.connect() + try: + yield connection + connection.execute('COMMIT') + connection.close() + + except Exception as e: + connection.execute('ROLLBACK') + connection.close() + raise e + + return transaction_context_manager() + + def atomic_get_active_process(self, kind: ProcessKind, + current_process: Optional[ProcessInfo] = None) -> Optional[ProcessInfo]: + active_process = None + with self.transaction() as connection: + cursor = connection.execute(""" + SELECT * FROM processes WHERE kind = ? and active = 1 ORDER BY rowid DESC LIMIT 1 + """, [kind.value]) + row = cursor.fetchone() + if row is not None: + previous_active_process = ProcessInfo.from_row(row) + if previous_active_process.still_active(): + active_process = previous_active_process + else: + previous_active_process.active = 0 + previous_active_process.save(connection) + + if active_process is None: + current_process.active = 1 + active_process = current_process + else: + current_process.active = 0 + current_process.canceled = 1 + + current_process.save(connection) + return active_process + + def save(self, process): + with self.transaction() as connection: + process.save(connection) + + def sys_exit(self, exit_code: int = 0): + self.current_process.mark_finished(exit_code) + self.save(self.current_process) + sys.exit(exit_code) diff --git a/src/tribler/gui/single_application.py b/src/tribler/gui/single_application.py index 23624acfc9a..7c2e8e1a183 100644 --- a/src/tribler/gui/single_application.py +++ b/src/tribler/gui/single_application.py @@ -8,6 +8,7 @@ from PyQt5.QtNetwork import QLocalServer, QLocalSocket from PyQt5.QtWidgets import QApplication +from tribler.core.utilities.process_locker import ProcessLocker from tribler.gui.tribler_window import TriblerWindow from tribler.gui.utilities import connect, disconnect @@ -20,7 +21,7 @@ class QtSingleApplication(QApplication): message_received = pyqtSignal(str) - def __init__(self, win_id, *argv): + def __init__(self, win_id, process_locker: ProcessLocker, *argv): self.logger = logging.getLogger(self.__class__.__name__) self.logger.info(f'Start Tribler application. Win id: "{win_id}". ' f'Sys argv: "{sys.argv}"') @@ -30,12 +31,14 @@ def __init__(self, win_id, *argv): self._id = win_id + self.process_locker = process_locker + # Is there another instance running? self._outgoing_connection = QLocalSocket() self._outgoing_connection.connectToServer(self._id) connected_to_previous_instance = self._outgoing_connection.waitForConnected() - self._is_app_already_running = connected_to_previous_instance + self._is_app_already_running = connected_to_previous_instance or not process_locker.current_process.active self._stream_to_running_app = None self._incoming_connection = None diff --git a/src/tribler/gui/start_gui.py b/src/tribler/gui/start_gui.py index d58c862e055..b2c7c490fbb 100644 --- a/src/tribler/gui/start_gui.py +++ b/src/tribler/gui/start_gui.py @@ -8,13 +8,14 @@ check_and_enable_code_tracing, check_environment, check_free_space, - enable_fault_handler, - error_and_exit, + enable_fault_handler ) from tribler.core.exceptions import TriblerException from tribler.core.logger.logger import load_logger_config from tribler.core.sentry_reporter.sentry_reporter import SentryStrategy +from tribler.core.utilities.process_locker import ProcessKind, ProcessLocker from tribler.core.utilities.rest_utils import path_to_url +from tribler.core.utilities.utilities import show_system_popup from tribler.gui import gui_sentry_reporter from tribler.gui.app_manager import AppManager from tribler.gui.tribler_app import TriblerApplication @@ -41,14 +42,16 @@ def run_gui(api_port, api_key, root_state_dir, parsed_args): # Enable tracer using commandline args: --trace-debug or --trace-exceptions trace_logger = check_and_enable_code_tracing('gui', root_state_dir) - try: - enable_fault_handler(root_state_dir) - # Exit if we cant read/write files, etc. - check_environment() - check_free_space() + enable_fault_handler(root_state_dir) + # Exit if we cant read/write files, etc. + check_environment() + check_free_space() + + process_locker = ProcessLocker(root_state_dir, ProcessKind.GUI) + try: app_name = os.environ.get('TRIBLER_APP_NAME', 'triblerapp') - app = TriblerApplication(app_name, sys.argv) + app = TriblerApplication(app_name, sys.argv, process_locker) app_manager = AppManager(app) # Note (@ichorid): translator MUST BE created and assigned to a separate variable @@ -67,22 +70,27 @@ def run_gui(api_port, api_key, root_state_dir, parsed_args): elif arg.startswith('magnet'): app.send_message(arg) logger.info('Close the current application.') - sys.exit(1) + process_locker.current_process.set_error(error_msg='Application is already running') + process_locker.sys_exit(1) logger.info('Start Tribler Window') window = TriblerWindow(app_manager, settings, root_state_dir, api_port=api_port, api_key=api_key) window.setWindowTitle("Tribler") app.tribler_window = window app.parse_sys_args(sys.argv) - sys.exit(app.exec_()) + process_locker.sys_exit(app.exec_()) except ImportError as ie: logger.exception(ie) - error_and_exit("Import Error", f"Import error: {ie}") + show_system_popup("Import Error", f"Import error: {ie}") + process_locker.current_process.set_error(exc=ie) + process_locker.sys_exit(1) except TriblerException as te: logger.exception(te) - error_and_exit("Tribler Exception", f"{te}") + show_system_popup("Tribler Exception", f"{te}") + process_locker.current_process.set_error(exc=te) + process_locker.sys_exit(1) except SystemExit: logger.info("Shutting down Tribler") diff --git a/src/tribler/gui/tribler_app.py b/src/tribler/gui/tribler_app.py index 74ab74ac0d2..0740e326f17 100644 --- a/src/tribler/gui/tribler_app.py +++ b/src/tribler/gui/tribler_app.py @@ -4,6 +4,7 @@ from PyQt5.QtCore import QCoreApplication, QEvent, Qt +from tribler.core.utilities.process_locker import ProcessLocker from tribler.core.utilities.rest_utils import path_to_url from tribler.core.utilities.unicode import ensure_unicode from tribler.gui.code_executor import CodeExecutor @@ -23,8 +24,8 @@ class TriblerApplication(QtSingleApplication): This class represents the main Tribler application. """ - def __init__(self, app_name, args): - QtSingleApplication.__init__(self, app_name, args) + def __init__(self, app_name, args, process_locker: ProcessLocker): + QtSingleApplication.__init__(self, app_name, process_locker, args) self.code_executor = None connect(self.message_received, self.on_app_message)