-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsignal_basic.py
88 lines (65 loc) · 2.17 KB
/
signal_basic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# examples/signal_basic.py
"""
Basic Signal-Slot Example
This example demonstrates the fundamental usage of TSignal with a synchronous slot:
1. Creating a signal
2. Connecting a regular method as a slot (without @t_slot)
3. Emitting a signal to trigger slot execution
Key Points:
- Showcases the most basic form of signal-slot connection.
- The slot is a normal instance method of a class, not decorated with @t_slot.
- Emphasizes that even without @t_slot, a callable method can act as a slot.
- Introduces the concept of signal emission and immediate slot execution.
"""
import asyncio
import time
from tsignal.core import t_with_signals, t_signal, t_slot
@t_with_signals
class Counter:
    """
    Emitter side of the example: holds an integer count and announces
    every change through the ``count_changed`` signal.
    """

    def __init__(self):
        # Running total; starts at zero and only grows via increment().
        self.count = 0

    @t_signal
    def count_changed(self):
        """Signal emitted after the count has been updated."""

    def increment(self):
        """Add one to the count, print it, then emit ``count_changed``."""
        self.count = self.count + 1
        print(f"Counter incremented to: {self.count}")
        # Listeners receive the updated count as the signal's argument.
        self.count_changed.emit(self.count)
@t_with_signals
class Display:
    """
    Receiver side of the example: takes count updates and remembers the
    most recent one after a simulated processing delay.
    """

    def __init__(self):
        # Most recently processed value; None until the first update.
        self.last_value = None

    def on_count_changed(self, value):
        """
        Handle a count update (used as the slot in this example).

        Prints a start message, blocks for one second, stores ``value``
        in ``last_value``, then prints a completion message.
        """
        print(f"Display processing count: {value}")

        # Stand-in for heavy work; this is a blocking sleep.
        time.sleep(1)

        self.last_value = value
        print(f"Display finished processing: {value}")
async def main():
    """
    Run the interactive counter example.

    Creates a ``Counter`` and a ``Display``, connects the counter's
    ``count_changed`` signal to ``Display.on_count_changed``, and then
    reads lines from standard input in a loop. Any line other than
    "q"/"Q" increments the counter; "q" or end-of-input ends the loop.
    """
    # Create instances
    counter = Counter()
    display = Display()

    # Connect signal to slot
    counter.count_changed.connect(display.on_count_changed)

    print("Starting counter example...")
    print("Press Enter to increment counter, or 'q' to quit")
    print("(Notice the 1 second delay in processing)")

    while True:
        try:
            line = input("> ")
        except EOFError:
            # stdin was closed or exhausted (Ctrl-D, piped input):
            # leave the loop cleanly instead of ending with a traceback.
            break

        if line.lower() == "q":
            break

        # Increment counter which will emit signal
        counter.increment()
if __name__ == "__main__":
    # Script entry point: run the main() coroutine to completion.
    asyncio.run(main())