-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstock_monitor_simple.py
158 lines (117 loc) · 3.92 KB
/
stock_monitor_simple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# examples/stock_monitor_simple.py
# pylint: disable=no-member
# pylint: disable=unused-argument
"""
Stock monitor simple example.
This module shows a straightforward example of using a worker (`DataWorker`)
to generate data continuously and a display (`DataDisplay`) to process and
log that data on the main thread. It's a minimal demonstration of TSignal's
thread-safe signal/slot invocation.
"""
import asyncio
import logging
import threading
import time
from utils import logger_setup
from tsignal import t_with_signals, t_signal, t_slot, t_with_worker
logger_setup("tsignal", level=logging.DEBUG)
logger = logger_setup(__name__, level=logging.DEBUG)
@t_with_worker
class DataWorker:
"""
A simple data worker that emits incrementing integers every second.
Attributes
----------
_running : bool
Indicates whether the update loop is active.
_update_task : asyncio.Task, optional
The asynchronous task that updates and emits data.
Signals
-------
data_processed
Emitted with the incremented integer each time data is processed.
Lifecycle
---------
- `run(...)` is called automatically in the worker thread.
- `stop()` stops the worker, cancelling the update loop.
"""
def __init__(self):
self._running = False
self._update_task = None
@t_signal
def data_processed(self):
"""
Signal emitted when data is processed.
Receives an integer count.
"""
async def run(self, *args, **kwargs):
"""
Worker initialization and main event loop.
Creates the update loop task and waits until the worker is stopped.
"""
logger.info("[DataWorker][run] Starting")
self._running = True
self._update_task = asyncio.create_task(self.update_loop())
# Wait until run() is finished
await self.wait_for_stop()
# Clean up
self._running = False
if self._update_task:
self._update_task.cancel()
try:
await self._update_task
except asyncio.CancelledError:
pass
async def update_loop(self):
"""
Periodically emits a counter value.
Every second, the counter increments and `data_processed` is emitted.
"""
count = 0
while self._running:
logger.debug("[Worker] Processing data %d", count)
self.data_processed.emit(count)
count += 1
await asyncio.sleep(1)
@t_with_signals
class DataDisplay:
"""
A display class that receives the processed data from the worker.
Attributes
----------
last_value : int or None
Stores the last received value from the worker.
"""
def __init__(self):
self.last_value = None
logger.debug("[Display] Created in thread: %s", threading.current_thread().name)
@t_slot
def on_data_processed(self, value):
"""
Slot called when data is processed.
Logs the received value and simulates a brief processing delay.
"""
current_thread = threading.current_thread()
logger.debug(
"[Display] Received value %d in thread: %s", value, current_thread.name
)
self.last_value = value
# Add a small delay to check the result
time.sleep(0.1)
logger.debug("[Display] Processed value %d", value)
async def main():
"""
Main function demonstrating how to set up and run the worker and display.
"""
logger.debug("[Main] Starting in thread: %s", threading.current_thread().name)
worker = DataWorker()
display = DataDisplay()
# Both are in the main thread at the connection point
worker.data_processed.connect(display, display.on_data_processed)
worker.start()
try:
await asyncio.sleep(3) # Run for 3 seconds
finally:
worker.stop()
if __name__ == "__main__":
asyncio.run(main())