Skip to content

Commit

Permalink
Add process_locker
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovsky committed Dec 9, 2022
1 parent 9cd05b7 commit 7ae2a89
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 18 deletions.
10 changes: 8 additions & 2 deletions src/tribler/core/start_core.py
Expand Up @@ -16,6 +16,7 @@
from tribler.core.components.component import Component
from tribler.core.components.gigachannel.gigachannel_component import GigaChannelComponent
from tribler.core.components.gigachannel_manager.gigachannel_manager_component import GigachannelManagerComponent
from tribler.core.components.gui_process_watcher.gui_process_watcher import GuiProcessWatcher
from tribler.core.components.gui_process_watcher.gui_process_watcher_component import GuiProcessWatcherComponent
from tribler.core.components.ipv8.ipv8_component import Ipv8Component
from tribler.core.components.key.key_component import KeyComponent
Expand All @@ -39,6 +40,7 @@
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy
from tribler.core.upgrade.version_manager import VersionHistory
from tribler.core.utilities.process_checker import single_tribler_instance
from tribler.core.utilities.process_locker import ProcessKind, ProcessLocker

logger = logging.getLogger(__name__)
CONFIG_FILE_NAME = 'triblerd.conf'
Expand Down Expand Up @@ -177,10 +179,14 @@ def run_core(api_port, api_key, root_state_dir, parsed_args):
logger.info('Running Core' + ' in gui_test_mode' if parsed_args.gui_test_mode else '')
load_logger_config('tribler-core', root_state_dir)

gui_pid = GuiProcessWatcher.get_gui_pid()
process_locker = ProcessLocker(root_state_dir, ProcessKind.Core, gui_pid)
if not process_locker.current_process.active:
logger.warning('The current')

with single_tribler_instance(root_state_dir):
version_history = VersionHistory(root_state_dir)
state_dir = version_history.code_version.directory
exit_code = run_tribler_core_session(api_port, api_key, state_dir, gui_test_mode=parsed_args.gui_test_mode)

if exit_code:
sys.exit(exit_code)
process_locker.sys_exit(exit_code)
295 changes: 295 additions & 0 deletions src/tribler/core/utilities/process_locker.py
@@ -0,0 +1,295 @@
from __future__ import annotations

import json
import logging
import os
import sqlite3
import sys
import time
from enum import Enum
from pathlib import Path
from threading import Lock
from typing import ContextManager, Optional

import psutil
from decorator import contextmanager

from tribler.core.version import version_id

DB_FILENAME = 'processes.sqlite'


class DbException(Exception):
pass


class TransactionException(DbException):
pass


class UpdateException(DbException):
pass


class ProcessKind(Enum):
GUI = 'gui'
Core = 'core'


CREATE_SQL = """
CREATE TABLE IF NOT EXISTS processes (
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
row_version INTEGER NOT NULL DEFAULT 0,
pid INTEGER NOT NULL,
kind TEXT NOT NULL,
active INT NOT NULL,
canceled INT NOT NULL,
app_version TEXT NOT NULL,
started_at INT NOT NULL,
creator_pid INT,
api_port INT,
shutdown_request_pid INT,
shutdown_requested_at INT,
finished_at INT,
exit_code INT,
error_msg TEXT,
error_info JSON,
other_params JSON
)
"""


logger = logging.getLogger(__name__)


def to_json(value) -> Optional[str]:
if value is None:
return None
return json.dumps(value)


def from_json(value) -> Optional[dict]:
if value is None:
return None
return json.loads(value)


class ProcessInfo:
def __init__(self, pid: int, kind: ProcessKind, app_version: str, started_at: int,
rowid: Optional[int] = None, creator_pid: Optional[int] = None, active: int = 0, canceled: int = 0,
row_version: int = 0, api_port: Optional[int] = None, finished_at: Optional[int] = None,
exit_code: Optional[int] = None, error_msg: Optional[str] = None, error_info: Optional[dict] = None,
shutdown_request_pid: Optional[int] = None, shutdown_requested_at: Optional[int] = None,
other_params: Optional[dict] = None):
self.rowid = rowid
self.row_version = row_version
self.pid = pid
self.kind = kind
self.active = active
self.canceled = canceled
self.app_version = app_version
self.started_at = started_at
self.creator_pid = creator_pid
self.api_port = api_port
self.finished_at = finished_at
self.exit_code = exit_code
self.error_msg = error_msg
self.error_info = error_info
self.shutdown_request_pid = shutdown_request_pid
self.shutdown_requested_at = shutdown_requested_at
self.other_params = other_params

@classmethod
def from_row(cls, row: tuple) -> ProcessInfo:
rowid, row_version, pid, kind, active, canceled, app_version, started_at, creator_pid, api_port, \
shutdown_request_pid, shutdown_requested_at, finished_at, exit_code, error_msg, error_info, \
other_params = row

kind = ProcessKind(kind)

return ProcessInfo(rowid=rowid, row_version=row_version, pid=pid, kind=kind, active=active, canceled=canceled,
app_version=app_version, started_at=started_at, creator_pid=creator_pid,
api_port=api_port, shutdown_request_pid=shutdown_request_pid,
shutdown_requested_at=shutdown_requested_at, finished_at=finished_at,
exit_code=exit_code, error_msg=error_msg, error_info=from_json(error_info),
other_params=from_json(other_params))

@classmethod
def current_process(cls, kind: ProcessKind, creator_pid: Optional[int] = None, **other_params) -> ProcessInfo:
return cls(pid=os.getpid(), kind=kind, app_version=version_id, started_at=int(time.time()),
creator_pid=creator_pid, row_version=0, other_params=other_params or None)

def is_current_process(self):
return self.pid == os.getpid()

def still_active(self):
if not psutil.pid_exists(self.pid):
return False

try:
process = psutil.Process(self.pid)
status = process.status()
except psutil.Error as e:
logger.warning(e)
return False

if status == psutil.STATUS_ZOMBIE:
return False

if process.create_time() > self.started_at:
return False

return True

def set_error(self, error_msg: Optional[str] = None, error_info: Optional[dict] = None,
exc: Optional[Exception] = None, replace: bool = False):
if exc and not error_msg:
error_msg = f"{exc.__class__.__name__}: {exc}"

if replace:
self.error_msg = error_msg
self.error_info = error_info
else:
self.error_msg = self.error_msg or error_msg
self.error_info = self.error_info or error_info

def mark_finished(self, exit_code: Optional[int] = None):
self.active = 0
self.finished_at = time.time()
self.exit_code = exit_code

def save(self, con: sqlite3.Connection):
cursor = con.cursor()
if self.rowid is None:
self._before_insert_check()
self.row_version = 0
cursor.execute("""
INSERT INTO processes (
pid, kind, active, canceled, app_version, started_at,
creator_pid, api_port, shutdown_request_pid, shutdown_requested_at,
finished_at, exit_code, error_msg, error_info, other_params
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [self.pid, self.kind.value, self.active, self.canceled, self.app_version, self.started_at,
self.creator_pid, self.api_port, self.shutdown_request_pid, self.shutdown_requested_at,
self.finished_at, self.exit_code, self.error_msg, to_json(self.error_info),
to_json(self.other_params)])
self.rowid = cursor.lastrowid
else:
self._before_update_check()
prev_version = self.row_version
self.row_version += 1
cursor.execute("""
UPDATE processes
SET row_version = ?, active = ?, canceled = ?, creator_pid = ?, api_port = ?,
shutdown_request_pid = ?, shutdown_requested_at = ?, finished_at = ?,
exit_code = ?, error_msg = ?, error_info = ?, other_params = ?
WHERE rowid = ? and row_version = ? and pid = ? and kind = ? and app_version = ? and started_at = ?
""", [self.row_version, self.active, self.canceled, self.creator_pid, self.api_port,
self.shutdown_request_pid, self.shutdown_requested_at, self.finished_at,
self.exit_code, self.error_msg, to_json(self.error_info), to_json(self.other_params),
self.rowid, prev_version, self.pid, self.kind.value, self.app_version, self.started_at])
if cursor.rowcount == 0:
raise UpdateException(f'Row {self.rowid} with row version {prev_version} was not found')

def _before_insert_check(self):
if self.row_version:
logging.error(f"row_version column for a new row should not be set. Got: {self.row_version}")

def _before_update_check(self):
if self.rowid is None:
logging.error("rowid for an existing row should not be None")
if self.row_version is None:
logging.error(f"row_version column for an existing row {self.rowid} should not be None")


global_process_locker: Optional[{ProcessLocker}] = None

_lock = Lock()


def set_global_process_locker(process_locker: ProcessLocker):
global global_process_locker
with _lock:
global_process_locker = process_locker


def get_global_process_locker() -> Optional[ProcessLocker]:
with _lock:
return global_process_locker


def set_error(error_msg: Optional[str] = None, error_info: Optional[dict] = None,
exc: Optional[Exception] = None, replace: bool = False):
process_locker = get_global_process_locker()
if process_locker is None:
logger.warning('Cannot set error for process locker: process locker is not set')
process_locker.current_process.set_error(error_msg, error_info, exc, replace)


class ProcessLocker:
filename: Path
current_process: ProcessInfo
active_process: ProcessInfo

def __init__(self, root_dir: Path, process_kind: ProcessKind, creator_pid: Optional[int] = None, **other_params):
filename = root_dir / DB_FILENAME
self.filename = filename
self.current_process = ProcessInfo.current_process(process_kind, creator_pid, **other_params)
self.active_process = self.atomic_get_active_process(process_kind, self.current_process)

def connect(self) -> sqlite3.Connection:
connection = sqlite3.connect(self.filename) # TODO: check that the file is not corrupted
connection.execute('BEGIN EXCLUSIVE TRANSACTION')
connection.execute(CREATE_SQL)
return connection

def transaction(self) -> ContextManager[sqlite3.Connection]:
@contextmanager
def transaction_context_manager(): # this additional level of wrapping is a workaround for PyCharm bug
connection = self.connect()
try:
yield connection
connection.execute('COMMIT')
connection.close()

except Exception as e:
connection.execute('ROLLBACK')
connection.close()
raise e

return transaction_context_manager()

def atomic_get_active_process(self, kind: ProcessKind,
current_process: Optional[ProcessInfo] = None) -> Optional[ProcessInfo]:
active_process = None
with self.transaction() as connection:
cursor = connection.execute("""
SELECT * FROM processes WHERE kind = ? and active = 1 ORDER BY rowid DESC LIMIT 1
""", [kind.value])
row = cursor.fetchone()
if row is not None:
previous_active_process = ProcessInfo.from_row(row)
if previous_active_process.still_active():
active_process = previous_active_process
else:
previous_active_process.active = 0
previous_active_process.save(connection)

if active_process is None:
current_process.active = 1
active_process = current_process
else:
current_process.active = 0
current_process.canceled = 1

current_process.save(connection)
return active_process

def save(self, process):
with self.transaction() as connection:
process.save(connection)

def sys_exit(self, exit_code: int = 0):
self.current_process.mark_finished(exit_code)
self.save(self.current_process)
sys.exit(exit_code)
7 changes: 5 additions & 2 deletions src/tribler/gui/single_application.py
Expand Up @@ -8,6 +8,7 @@
from PyQt5.QtNetwork import QLocalServer, QLocalSocket
from PyQt5.QtWidgets import QApplication

from tribler.core.utilities.process_locker import ProcessLocker
from tribler.gui.tribler_window import TriblerWindow
from tribler.gui.utilities import connect, disconnect

Expand All @@ -20,7 +21,7 @@ class QtSingleApplication(QApplication):

message_received = pyqtSignal(str)

def __init__(self, win_id, *argv):
def __init__(self, win_id, process_locker: ProcessLocker, *argv):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.info(f'Start Tribler application. Win id: "{win_id}". '
f'Sys argv: "{sys.argv}"')
Expand All @@ -30,12 +31,14 @@ def __init__(self, win_id, *argv):

self._id = win_id

self.process_locker = process_locker

# Is there another instance running?
self._outgoing_connection = QLocalSocket()
self._outgoing_connection.connectToServer(self._id)

connected_to_previous_instance = self._outgoing_connection.waitForConnected()
self._is_app_already_running = connected_to_previous_instance
self._is_app_already_running = connected_to_previous_instance or not process_locker.current_process.active

self._stream_to_running_app = None
self._incoming_connection = None
Expand Down

0 comments on commit 7ae2a89

Please sign in to comment.