/
threading.py
63 lines (46 loc) · 1.61 KB
/
threading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
""" This file is licensed under GPLv3, see https://www.gnu.org/licenses/ """
import traceback
import uuid
from threading import Lock
from typing import List, Callable, Any, Dict
from .pprint import print_stderr
class ThreadSafeSequentialStorage():
    """Registry of append-only lists keyed by UUID, with one lock per key.

    All state lives in class attributes, so it is shared by every caller
    (and by subclasses that do not rebind these attributes). The list and
    the lock for a key are created lazily on first access to that key.
    """

    # Per-key item lists, created on first access to a key.
    _storage: Dict[uuid.UUID, List[Any]] = {}
    # Per-key locks that serialise appends to the matching list.
    _locks: Dict[uuid.UUID, Lock] = {}
    # Guards the lazy creation of entries in the two dicts above, so two
    # threads touching a new key at once cannot each install their own
    # list/lock and silently discard the other's.
    _registry_lock: Lock = Lock()

    @classmethod
    def _check_lock_and_storage(cls, _id: uuid.UUID) -> None:
        """Create the list and the lock for ``_id`` if they do not exist yet."""
        with cls._registry_lock:
            if _id not in cls._storage:
                cls._locks[_id] = Lock()
                cls._storage[_id] = []

    @classmethod
    def _get_storage(cls, _id: uuid.UUID) -> List:
        """Return the list stored under ``_id``, creating it when missing."""
        cls._check_lock_and_storage(_id)
        return cls._storage[_id]

    @classmethod
    def _get_lock(cls, _id: uuid.UUID) -> Lock:
        """Return the lock associated with ``_id``, creating it when missing."""
        cls._check_lock_and_storage(_id)
        return cls._locks[_id]

    @classmethod
    def _add_item(cls, _id: uuid.UUID, item: Any) -> None:
        """Append ``item`` to the list for ``_id`` while holding its lock."""
        # ``with`` guarantees the lock is released even if the append raises.
        with cls._get_lock(_id):
            cls._get_storage(_id).append(item)
class ThreadSafeBytesStorage(ThreadSafeSequentialStorage):
    """Collects ``bytes`` chunks per UUID and joins them on request."""

    @classmethod
    def add_bytes(cls, _id: uuid.UUID, chars: bytes) -> None:
        """Append the chunk ``chars`` to the sequence stored under ``_id``."""
        super()._add_item(_id, chars)

    @classmethod
    def get_bytes_output(cls, _id: uuid.UUID) -> bytes:
        """Return all chunks stored under ``_id`` joined in insertion order.

        Yields ``b''`` when nothing has been stored under ``_id`` yet.
        """
        # Snapshot the list under the same per-key lock that writers hold,
        # then join outside the lock so writers are blocked only briefly.
        with cls._get_lock(_id):
            chunks = list(cls._get_storage(_id))
        return b''.join(chunks)
def handle_exception_in_thread(fun: Callable) -> Callable:
    """Decorate ``fun`` so any ``Exception`` it raises is reported, then re-raised.

    The returned wrapper forwards all positional and keyword arguments to
    ``fun`` and returns its result. If ``fun`` raises an ``Exception``, the
    wrapper writes ``'Error in the thread:'`` through ``print_stderr``,
    prints the traceback with ``traceback.print_exc()`` and re-raises the
    same exception.
    """
    # Local import keeps this block self-contained.
    import functools

    # ``wraps`` preserves the wrapped function's name, docstring and
    # ``__wrapped__`` link on the returned wrapper.
    @functools.wraps(fun)
    def decorated(*args: Any, **kwargs: Any) -> Any:
        try:
            return fun(*args, **kwargs)
        except Exception:
            print_stderr('Error in the thread:')
            traceback.print_exc()
            # Bare ``raise`` re-raises the active exception unchanged.
            raise

    return decorated