-
Notifications
You must be signed in to change notification settings - Fork 445
/
process_locker.py
295 lines (236 loc) · 10.8 KB
/
process_locker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
from __future__ import annotations
import json
import logging
import os
import sqlite3
import sys
import time
from enum import Enum
from pathlib import Path
from threading import Lock
from typing import ContextManager, Optional
import psutil
from decorator import contextmanager
from tribler.core.version import version_id
DB_FILENAME = 'processes.sqlite'
class DbException(Exception):
pass
class TransactionException(DbException):
pass
class UpdateException(DbException):
pass
class ProcessKind(Enum):
GUI = 'gui'
Core = 'core'
CREATE_SQL = """
CREATE TABLE IF NOT EXISTS processes (
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
row_version INTEGER NOT NULL DEFAULT 0,
pid INTEGER NOT NULL,
kind TEXT NOT NULL,
active INT NOT NULL,
canceled INT NOT NULL,
app_version TEXT NOT NULL,
started_at INT NOT NULL,
creator_pid INT,
api_port INT,
shutdown_request_pid INT,
shutdown_requested_at INT,
finished_at INT,
exit_code INT,
error_msg TEXT,
error_info JSON,
other_params JSON
)
"""
logger = logging.getLogger(__name__)
def to_json(value) -> Optional[str]:
if value is None:
return None
return json.dumps(value)
def from_json(value) -> Optional[dict]:
if value is None:
return None
return json.loads(value)
class ProcessInfo:
def __init__(self, pid: int, kind: ProcessKind, app_version: str, started_at: int,
rowid: Optional[int] = None, creator_pid: Optional[int] = None, active: int = 0, canceled: int = 0,
row_version: int = 0, api_port: Optional[int] = None, finished_at: Optional[int] = None,
exit_code: Optional[int] = None, error_msg: Optional[str] = None, error_info: Optional[dict] = None,
shutdown_request_pid: Optional[int] = None, shutdown_requested_at: Optional[int] = None,
other_params: Optional[dict] = None):
self.rowid = rowid
self.row_version = row_version
self.pid = pid
self.kind = kind
self.active = active
self.canceled = canceled
self.app_version = app_version
self.started_at = started_at
self.creator_pid = creator_pid
self.api_port = api_port
self.finished_at = finished_at
self.exit_code = exit_code
self.error_msg = error_msg
self.error_info = error_info
self.shutdown_request_pid = shutdown_request_pid
self.shutdown_requested_at = shutdown_requested_at
self.other_params = other_params
@classmethod
def from_row(cls, row: tuple) -> ProcessInfo:
rowid, row_version, pid, kind, active, canceled, app_version, started_at, creator_pid, api_port, \
shutdown_request_pid, shutdown_requested_at, finished_at, exit_code, error_msg, error_info, \
other_params = row
kind = ProcessKind(kind)
return ProcessInfo(rowid=rowid, row_version=row_version, pid=pid, kind=kind, active=active, canceled=canceled,
app_version=app_version, started_at=started_at, creator_pid=creator_pid,
api_port=api_port, shutdown_request_pid=shutdown_request_pid,
shutdown_requested_at=shutdown_requested_at, finished_at=finished_at,
exit_code=exit_code, error_msg=error_msg, error_info=from_json(error_info),
other_params=from_json(other_params))
@classmethod
def current_process(cls, kind: ProcessKind, creator_pid: Optional[int] = None, **other_params) -> ProcessInfo:
return cls(pid=os.getpid(), kind=kind, app_version=version_id, started_at=int(time.time()),
creator_pid=creator_pid, row_version=0, other_params=other_params or None)
def is_current_process(self):
return self.pid == os.getpid()
def still_active(self):
if not psutil.pid_exists(self.pid):
return False
try:
process = psutil.Process(self.pid)
status = process.status()
except psutil.Error as e:
logger.warning(e)
return False
if status == psutil.STATUS_ZOMBIE:
return False
if process.create_time() > self.started_at:
return False
return True
def set_error(self, error_msg: Optional[str] = None, error_info: Optional[dict] = None,
exc: Optional[Exception] = None, replace: bool = False):
if exc and not error_msg:
error_msg = f"{exc.__class__.__name__}: {exc}"
if replace:
self.error_msg = error_msg
self.error_info = error_info
else:
self.error_msg = self.error_msg or error_msg
self.error_info = self.error_info or error_info
def mark_finished(self, exit_code: Optional[int] = None):
self.active = 0
self.finished_at = time.time()
self.exit_code = exit_code
def save(self, con: sqlite3.Connection):
cursor = con.cursor()
if self.rowid is None:
self._before_insert_check()
self.row_version = 0
cursor.execute("""
INSERT INTO processes (
pid, kind, active, canceled, app_version, started_at,
creator_pid, api_port, shutdown_request_pid, shutdown_requested_at,
finished_at, exit_code, error_msg, error_info, other_params
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [self.pid, self.kind.value, self.active, self.canceled, self.app_version, self.started_at,
self.creator_pid, self.api_port, self.shutdown_request_pid, self.shutdown_requested_at,
self.finished_at, self.exit_code, self.error_msg, to_json(self.error_info),
to_json(self.other_params)])
self.rowid = cursor.lastrowid
else:
self._before_update_check()
prev_version = self.row_version
self.row_version += 1
cursor.execute("""
UPDATE processes
SET row_version = ?, active = ?, canceled = ?, creator_pid = ?, api_port = ?,
shutdown_request_pid = ?, shutdown_requested_at = ?, finished_at = ?,
exit_code = ?, error_msg = ?, error_info = ?, other_params = ?
WHERE rowid = ? and row_version = ? and pid = ? and kind = ? and app_version = ? and started_at = ?
""", [self.row_version, self.active, self.canceled, self.creator_pid, self.api_port,
self.shutdown_request_pid, self.shutdown_requested_at, self.finished_at,
self.exit_code, self.error_msg, to_json(self.error_info), to_json(self.other_params),
self.rowid, prev_version, self.pid, self.kind.value, self.app_version, self.started_at])
if cursor.rowcount == 0:
raise UpdateException(f'Row {self.rowid} with row version {prev_version} was not found')
def _before_insert_check(self):
if self.row_version:
logging.error(f"row_version column for a new row should not be set. Got: {self.row_version}")
def _before_update_check(self):
if self.rowid is None:
logging.error("rowid for an existing row should not be None")
if self.row_version is None:
logging.error(f"row_version column for an existing row {self.rowid} should not be None")
global_process_locker: Optional[{ProcessLocker}] = None
_lock = Lock()
def set_global_process_locker(process_locker: ProcessLocker):
global global_process_locker
with _lock:
global_process_locker = process_locker
def get_global_process_locker() -> Optional[ProcessLocker]:
with _lock:
return global_process_locker
def set_error(error_msg: Optional[str] = None, error_info: Optional[dict] = None,
exc: Optional[Exception] = None, replace: bool = False):
process_locker = get_global_process_locker()
if process_locker is None:
logger.warning('Cannot set error for process locker: process locker is not set')
process_locker.current_process.set_error(error_msg, error_info, exc, replace)
class ProcessLocker:
filename: Path
current_process: ProcessInfo
active_process: ProcessInfo
def __init__(self, root_dir: Path, process_kind: ProcessKind, creator_pid: Optional[int] = None, **other_params):
filename = root_dir / DB_FILENAME
self.filename = filename
self.current_process = ProcessInfo.current_process(process_kind, creator_pid, **other_params)
self.active_process = self.atomic_get_active_process(process_kind, self.current_process)
def connect(self) -> sqlite3.Connection:
connection = sqlite3.connect(self.filename) # TODO: check that the file is not corrupted
connection.execute('BEGIN EXCLUSIVE TRANSACTION')
connection.execute(CREATE_SQL)
return connection
def transaction(self) -> ContextManager[sqlite3.Connection]:
@contextmanager
def transaction_context_manager(): # this additional level of wrapping is a workaround for PyCharm bug
connection = self.connect()
try:
yield connection
connection.execute('COMMIT')
connection.close()
except Exception as e:
connection.execute('ROLLBACK')
connection.close()
raise e
return transaction_context_manager()
def atomic_get_active_process(self, kind: ProcessKind,
current_process: Optional[ProcessInfo] = None) -> Optional[ProcessInfo]:
active_process = None
with self.transaction() as connection:
cursor = connection.execute("""
SELECT * FROM processes WHERE kind = ? and active = 1 ORDER BY rowid DESC LIMIT 1
""", [kind.value])
row = cursor.fetchone()
if row is not None:
previous_active_process = ProcessInfo.from_row(row)
if previous_active_process.still_active():
active_process = previous_active_process
else:
previous_active_process.active = 0
previous_active_process.save(connection)
if active_process is None:
current_process.active = 1
active_process = current_process
else:
current_process.active = 0
current_process.canceled = 1
current_process.save(connection)
return active_process
def save(self, process):
with self.transaction() as connection:
process.save(connection)
def sys_exit(self, exit_code: int = 0):
self.current_process.mark_finished(exit_code)
self.save(self.current_process)
sys.exit(exit_code)