-
Notifications
You must be signed in to change notification settings - Fork 259
/
tmp.py
152 lines (109 loc) · 3.33 KB
/
tmp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from flexx import event
# from flexx import react
# from flexx import watch
# from flexx import behold
# from flexx import observe
class MyObject(event.HasEvents):
    """ Example object combining connected handlers, plain ``on_*``
    methods, properties, and a handler connected to a nested path.
    """

    @event.connect('foo')
    def _handle_foo(self, *events):
        """ Handler connected to 'foo'; prints the events it is given. """
        print('_handle_foo', events)

    def on_foo(self, *events):
        # Undecorated on purpose: this should just work, as in vispy,
        # and as in overloadable.
        # NOTE(review): presumably hooked up to 'foo' by naming
        # convention -- confirm against the flexx.event version in use.
        print('on_foo', events)

    @event.prop
    def bar(self, v=3):
        """ Property whose value is passed through float(). """
        as_float = float(v)
        return as_float

    @event.prop
    def prop_without_connections(self, v=3):
        """ Same conversion as bar; nothing in this class connects to it. """
        as_float = float(v)
        return as_float

    def on_bar(self, *events):
        # Undecorated counterpart of on_foo, for 'bar'.
        print('on_bar', events)

    @event.prop
    def sub(self, ob=None):
        """ Property that stores the given object unchanged. """
        return ob

    @event.connect('sub.bar')
    def _handle_sub_bar(self, *events):
        """ Handler connected to the 'sub.bar' path; prints the events. """
        print('sub bar', events)
# Three instances, each created with a different initial value for bar.
h = MyObject(bar=5)
h1 = MyObject(bar=11)
h2 = MyObject(bar=12)  # not referenced again in the visible code
# Assign h1 to the sub property of h (MyObject has a handler connected
# to 'sub.bar').
h.sub = h1
@h.connect('foo:crap')
def handle_foo(*events):
    """ Stand-alone handler connected on h; prints the type of each event.
    """
    event_types = [ev.type for ev in events]
    print('single func, handle foo', event_types)
@h.connect('bar')
def handle_bar(*events):
    """ Stand-alone handler connected to 'bar' on h; prints the events. """
    print('keep track of bar', events)
# Emit two 'foo' events on h from inside the event loop context.
with event.loop:
    for text in ('he', 'ho'):
        h.emit('foo', {'msg': text})
## Readonly and emitter
from flexx import event
class Foo(event.HasEvents):
    """ Example with a readonly property (bar) and an emitter (spam).
    """

    @event.readonly
    def bar(self, v=42):
        """ Readonly property whose value is passed through float(). """
        as_float = float(v)
        return as_float

    def on_bar(self, *events):
        # Report how many events arrived in this call.
        count = len(events)
        print('bar changed %i times' % count)

    @event.emitter
    def spam(self, x):
        """ Emitter; returns a dict holding x under the 'value' key. """
        return {'value': x}

    def on_spam(self, *events):
        # Print the received events one by one.
        for ev in events:
            print('spam event:', ev)
# Create an instance without passing any initial values.
foo = Foo()
## Two properties that depend on each other
from flexx import event
class Temperature(event.HasEvents):
    """ A temperature exposed through two linked properties, C and F.
    Setting either one assigns the converted value to the other.
    """

    @event.prop
    def C(self, t=0):
        """ Value in Celsius; also assigns the converted value to F. """
        celsius = float(t)
        # NOTE(review): C and F assign each other; presumably flexx.event
        # keeps this from recursing -- confirm.
        self.F = celsius * 1.8 + 32
        return celsius

    @event.prop
    def F(self, t=0):
        """ Value in Fahrenheit; also assigns the converted value to C. """
        fahrenheit = float(t)
        self.C = (fahrenheit - 32) / 1.8
        return fahrenheit

    @event.connect('C:c', 'F:f')
    def on_temp_change(self, *events):
        # This gets called once with two events when either C or F is changed.
        print('temp changed!', events)
t = Temperature()
# Set C from inside the event loop context.
with event.loop:
    t.C = 10
## Caching
from flexx import event
class CachingExample(event.HasEvents):
    """ Demonstrate the use of caching, an example use of the analog
    of "streams" in reactive programming. We use a readonly property
    to cache the result.
    """

    def __init__(self, **property_values):
        # Forward keyword arguments to the base class so that initial
        # property values can be given at construction time, e.g.
        # CachingExample(input=3). Calling without arguments still works.
        super().__init__(**property_values)

    @event.prop
    def input(self, v=0):
        """ The input for the calculations. There could be more inputs."""
        return float(v)

    @event.connect('input')
    def calculate_results(self, *events):
        """ A long running calculation on the input.

        Only strictly positive inputs are processed; for those, the
        computed value is stored in the readonly ``result`` property.
        """
        import time

        value = self.input
        if not value > 0:
            # Nothing to compute. Bailing out before the sleep also keeps
            # time.sleep() from raising ValueError on a negative (or NaN)
            # duration.
            return
        # This takes a while
        time.sleep(value)
        self._set_prop('result', value + 0.1)

    @event.readonly
    def result(self, v=None):
        """ readonly prop to cache the result. """
        return v

    @event.connect('result')
    def show_result(self, *events):
        """ handler to show result. Can be called at any time. """
        cached = self.result
        # None is the default of the result property, i.e. nothing has
        # been cached yet; test for it explicitly instead of truthiness.
        if cached is not None:
            print('The result is', cached)
# Create the example object, then call event.loop.iter().
# NOTE(review): presumably iter() processes the events pending at this
# point -- confirm against the flexx.event documentation.
c = CachingExample()
event.loop.iter()
# with event.loop: c.input = 3