Skip to content

Commit

Permalink
Add ProcessManager to ensure that no more than one Tribler GUI/Core p…
Browse files Browse the repository at this point in the history
…rocess runs locally at any moment (#7212)

* Rename QtSingleApplication attributes from camelCase to snake_case

* Rename QtSingleApplication method: close() -> cleanup_crashed_server()

* Add ProcessLocker

* Fix add ProcessLocker

* Renaming: ProcessLocker -> ProcessManager

* Renaming: ProcessInfo -> TriblerProcess

* Make to_json & from_json static methods; remove to_dict

* Rename: active process -> primary process

* Split process_manager into tribler_process & tribler_process_manager

* Reduce the using of global variables

* Move tribler_process and tribler_process_manager into a process_manager subpackage

* Remove unnecessary line of code

* Formatting

* Refactoring: integrate ProcessManager.connect() into ProcessManager.transaction()

* Renaming: ProcessManager.transaction() -> ProcessManager._connect()

* Renaming: ProcessManager.filename -> ProcessManager.db_filepath

* Remove TriblerProcess.other_params

* Simplify set_error() signature; remove TriblerProcess.error_info & json-related methods

* Remove shutdown_request_pid & shutdown_requested_at

* Remove _before_insert_check()

* Make the code a bit easier to read

* Add a docstring to a `with_retry` decorator

* Remove ProcessManager._get_file_name()

* Extract ProcessManager._load_primary_process() from ProcessManager.atomic_get_primary_process()

* Refactor the process - manager interaction, API and tests

* Clean up old exception logic

* Auto-delete old records from the database

* Add comments to SQL scripts

* Add docstrings to ProcessManager methods

* Extract `with_retry` decorator to a separate utils module and make it applicable both for ProcessManager and TriblerProcess methods

* Specify exact columns in SELECT commands

* Rename: TriblerProcess.descrive() -> TriblerProcess.__str__()

* Fix TriblerProcess.is_current_process()

* TriblerProcess.mark_finished() -> TriblerProcess.finish()

* Some docstrings added

* Remove ProcessChecker

* Extract columns of SELECT queries to a separate constant

* Set Core API port in RESTManager

* Create current_process outside of ProcessManager

* Refactoring: ProcessManager.atomic_get_primary_process() -> TriblerProcess.become_primary()

* Use assignment expression

* Make ProcessManager.logger an attribute instead of a property

* Refactor QtSingleApplication; pass start_local_server argument instead of another_process_is_primary

* Satisfy linter

* Make primary and canceled true boolean attributes

* Fix test_become_primary and add comments to it

* Split test_tribler_process_is_running to five tests

* Split test_tribler_process_set_error to three tests

* Simplify test_tribler_process_save

* Simplify TriblerProcess tests

* Add an additional safety belt for test_global_process_manager

* Rename: set_error -> set_error_for_current_process; add docstring

* Remove set_error_for_current_process function

* Fix empty info log messages

* Fix: test_load_logger should not change log configuration as it can affect the subsequent tests

* Non-primary Tribler processes should not write logs into files

* Add try/finally block to make test_global_process_manager extra safe

* Remove test_global_process_manager according to @drew2a request
  • Loading branch information
kozlovsky committed Jan 9, 2023
1 parent d76a1f8 commit 8033362
Show file tree
Hide file tree
Showing 29 changed files with 913 additions and 520 deletions.
2 changes: 2 additions & 0 deletions src/run_tribler.py
Expand Up @@ -83,6 +83,8 @@ def init_boot_logger():
logger.info(f'Root state dir: {root_state_dir}')

api_port = os.environ.get('CORE_API_PORT')
api_port = int(api_port) if api_port else None

api_key = os.environ.get('CORE_API_KEY')

# Check whether we need to start the core or the user interface
Expand Down
7 changes: 7 additions & 0 deletions src/tribler/core/components/reporter/exception_handler.py
Expand Up @@ -10,6 +10,7 @@
from tribler.core.components.component import ComponentStartupException
from tribler.core.components.reporter.reported_error import ReportedError
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter
from tribler.core.utilities.process_manager import get_global_process_manager

# There are some errors that we are ignoring.
IGNORED_ERRORS_BY_CODE = {
Expand Down Expand Up @@ -77,6 +78,7 @@ def unhandled_error_observer(self, _, context):
It broadcasts the tribler_exception event.
"""
self.logger.info('Processing unhandled error...')
process_manager = get_global_process_manager()
try:
self.sentry_reporter.ignore_logger(self.logger.name)

Expand Down Expand Up @@ -111,6 +113,8 @@ def unhandled_error_observer(self, _, context):
should_stop=should_stop
)
self.logger.error(f"Unhandled exception occurred! {reported_error}\n{reported_error.long_text}")
if process_manager:
process_manager.current_process.set_error(exception)

if self.report_callback:
self.logger.error('Call report callback')
Expand All @@ -123,6 +127,9 @@ def unhandled_error_observer(self, _, context):
self.unreported_error = reported_error

except Exception as ex:
if process_manager:
process_manager.current_process.set_error(ex)

self.sentry_reporter.capture_exception(ex)
self.logger.exception(f'Error occurred during the error handling: {ex}')
raise ex
Expand Down
11 changes: 10 additions & 1 deletion src/tribler/core/components/restapi/rest/rest_manager.py
Expand Up @@ -17,6 +17,7 @@
)
from tribler.core.components.restapi.rest.root_endpoint import RootEndpoint
from tribler.core.components.restapi.rest.settings import APISettings
from tribler.core.utilities.process_manager import get_global_process_manager
from tribler.core.version import version_id

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,6 +87,13 @@ def __init__(self, config: APISettings, root_endpoint: RootEndpoint, state_dir=N
def get_endpoint(self, name):
return self.root_endpoint.endpoints.get('/' + name)

def set_api_port(self, api_port: int):
if self.config.http_port != api_port:
self.config.http_port = api_port
process_manager = get_global_process_manager()
if process_manager:
process_manager.current_process.set_api_port(api_port)

async def start(self):
"""
Starts the HTTP API with the listen port as specified in the session configuration.
Expand Down Expand Up @@ -124,14 +132,15 @@ async def start(self):
api_port = self.config.http_port
if not self.config.retry_port:
self.site = web.TCPSite(self.runner, self.http_host, api_port)
self.set_api_port(api_port)
await self.site.start()
else:
bind_attempts = 0
while bind_attempts < 10:
try:
self.site = web.TCPSite(self.runner, self.http_host, api_port + bind_attempts)
await self.site.start()
self.config.http_port = api_port + bind_attempts
self.set_api_port(api_port + bind_attempts)
break
except OSError:
bind_attempts += 1
Expand Down
2 changes: 1 addition & 1 deletion src/tribler/core/components/session.py
Expand Up @@ -27,7 +27,7 @@ class Session:
def __init__(self, config: TriblerConfig = None, components: List[Component] = (),
shutdown_event: Event = None, notifier: Notifier = None, failfast: bool = True):
# deepcode ignore unguarded~next~call: not necessary to catch StopIteration on infinite iterator
self.exit_code = 0
self.exit_code = None
self.failfast = failfast
self.logger = logging.getLogger(self.__class__.__name__)
self.config: TriblerConfig = config or TriblerConfig()
Expand Down
3 changes: 0 additions & 3 deletions src/tribler/core/exceptions.py
Expand Up @@ -8,9 +8,6 @@
class TriblerException(Exception):
"""Super class for all Tribler-specific Exceptions the Tribler Core throws."""

def __str__(self):
return str(self.__class__) + ': ' + Exception.__str__(self)


class OperationNotPossibleAtRuntimeException(TriblerException):
"""The requested operation is not possible after the Session or Download has been started."""
Expand Down
14 changes: 11 additions & 3 deletions src/tribler/core/logger/logger.py
Expand Up @@ -16,13 +16,21 @@ def filter(self, record):
return record.levelno < logging.ERROR


def load_logger_config(app_mode, log_dir):
def load_logger_config(app_mode, log_dir, current_process_is_primary=True):
"""
Loads tribler-gui module logger configuration. Note that this function should be called explicitly to
enable GUI logs dump to a file in the log directory (default: inside state directory).
"""
logger_config_path = get_logger_config_path()
setup_logging(app_mode, Path(log_dir), logger_config_path)
if current_process_is_primary:
# Set up logging to files for primary process only, as logging module does not support
# writing to the same log file from multiple Python processes
logger_config_path = get_logger_config_path()
setup_logging(app_mode, Path(log_dir), logger_config_path)
else:
logger.info('Skip the initialization of a normal file-based logging as the current process is non-primary.\n'
'Continue using the basic logging config from the boot logger initialization.\n'
'Only primary Tribler process can write to Tribler log files, as logging module does not support\n'
'writing to files from multiple Python processes.')


def get_logger_config_path():
Expand Down
7 changes: 6 additions & 1 deletion src/tribler/core/sentry_reporter/sentry_reporter.py
Expand Up @@ -147,7 +147,8 @@ def add_breadcrumb(self, message='', category='', level='info', **kwargs):
return sentry_sdk.add_breadcrumb(crumb, **kwargs)

def send_event(self, event: Dict = None, post_data: Dict = None, sys_info: Dict = None,
additional_tags: List[str] = None, last_core_output: Optional[str] = None):
additional_tags: List[str] = None, last_core_output: Optional[str] = None,
last_processes: List[str] = None):
"""Send the event to the Sentry server
This method
Expand All @@ -168,6 +169,7 @@ def send_event(self, event: Dict = None, post_data: Dict = None, sys_info: Dict
sys_info: dictionary made by the feedbackdialog.py
additional_tags: tags that will be added to the event
last_core_output: string that represents last core output
last_processes: list of strings describing last Tribler GUI/Core processes
Returns:
Event that was sent to Sentry server
Expand Down Expand Up @@ -219,6 +221,9 @@ def send_event(self, event: Dict = None, post_data: Dict = None, sys_info: Dict
reporter['events'] = extract_dict(sys_info, r'^(event|request)')
reporter[SYSINFO] = {key: sys_info[key] for key in sys_info if key not in reporter['events']}

if last_processes:
reporter['last_processes'] = last_processes

# try to retrieve an error from the last_core_output
if last_core_output:
# split for better representation in the web view
Expand Down
42 changes: 29 additions & 13 deletions src/tribler/core/start_core.py
Expand Up @@ -16,6 +16,7 @@
from tribler.core.components.component import Component
from tribler.core.components.gigachannel.gigachannel_component import GigaChannelComponent
from tribler.core.components.gigachannel_manager.gigachannel_manager_component import GigachannelManagerComponent
from tribler.core.components.gui_process_watcher.gui_process_watcher import GuiProcessWatcher
from tribler.core.components.gui_process_watcher.gui_process_watcher_component import GuiProcessWatcherComponent
from tribler.core.components.ipv8.ipv8_component import Ipv8Component
from tribler.core.components.key.key_component import KeyComponent
Expand All @@ -38,7 +39,8 @@
from tribler.core.logger.logger import load_logger_config
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy
from tribler.core.upgrade.version_manager import VersionHistory
from tribler.core.utilities.process_checker import single_tribler_instance
from tribler.core.utilities.process_manager import ProcessKind, ProcessManager, TriblerProcess, \
set_global_process_manager

logger = logging.getLogger(__name__)
CONFIG_FILE_NAME = 'triblerd.conf'
Expand Down Expand Up @@ -119,7 +121,7 @@ async def core_session(config: TriblerConfig, components: List[Component]) -> in
return session.exit_code


def run_tribler_core_session(api_port: str, api_key: str, state_dir: Path, gui_test_mode: bool = False) -> int:
def run_tribler_core_session(api_port: int, api_key: str, state_dir: Path, gui_test_mode: bool = False) -> int:
"""
This method will start a new Tribler session.
Note that there is no direct communication between the GUI process and the core: all communication is performed
Expand All @@ -137,7 +139,7 @@ def run_tribler_core_session(api_port: str, api_key: str, state_dir: Path, gui_t
if SentryReporter.is_in_test_mode():
default_core_exception_handler.sentry_reporter.global_strategy = SentryStrategy.SEND_ALLOWED

config.api.http_port = int(api_port)
config.api.http_port = api_port
# If the API key is set to an empty string, it will remain disabled
if config.api.key not in ('', api_key):
config.api.key = api_key
Expand Down Expand Up @@ -174,13 +176,27 @@ def run_tribler_core_session(api_port: str, api_key: str, state_dir: Path, gui_t


def run_core(api_port, api_key, root_state_dir, parsed_args):
logger.info('Running Core' + ' in gui_test_mode' if parsed_args.gui_test_mode else '')
load_logger_config('tribler-core', root_state_dir)

with single_tribler_instance(root_state_dir):
version_history = VersionHistory(root_state_dir)
state_dir = version_history.code_version.directory
exit_code = run_tribler_core_session(api_port, api_key, state_dir, gui_test_mode=parsed_args.gui_test_mode)

if exit_code:
sys.exit(exit_code)
logger.info(f"Running Core in {'gui_test_mode' if parsed_args.gui_test_mode else 'normal mode'}")

gui_pid = GuiProcessWatcher.get_gui_pid()
current_process = TriblerProcess.current_process(ProcessKind.Core, creator_pid=gui_pid)
process_manager = ProcessManager(root_state_dir, current_process)
set_global_process_manager(process_manager)
current_process_is_primary = process_manager.current_process.become_primary()

load_logger_config('tribler-core', root_state_dir, current_process_is_primary)

if not current_process_is_primary:
msg = 'Another Core process is already running'
logger.warning(msg)
process_manager.sys_exit(1, msg)

if api_port is None:
msg = 'api_port is not specified for a core process'
logger.error(msg)
process_manager.sys_exit(1, msg)

version_history = VersionHistory(root_state_dir)
state_dir = version_history.code_version.directory
exit_code = run_tribler_core_session(api_port, api_key, state_dir, gui_test_mode=parsed_args.gui_test_mode)
process_manager.sys_exit(exit_code)
169 changes: 0 additions & 169 deletions src/tribler/core/utilities/process_checker.py

This file was deleted.

0 comments on commit 8033362

Please sign in to comment.