-
Notifications
You must be signed in to change notification settings - Fork 356
/
timeflies_tkinter.py
50 lines (36 loc) · 1.31 KB
/
timeflies_tkinter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import tkinter
from tkinter import Event, Frame, Label, Tk
from typing import Any, Tuple
import reactivex
from reactivex import Observable
from reactivex import operators as ops
from reactivex.scheduler.mainloop import TkinterScheduler
from reactivex.subject import Subject
def main() -> None:
    """Show a window in which the letters of a sentence trail the mouse.

    Every ``<Motion>`` event on a 600x600 frame is pushed into a subject.
    Each character of the sentence gets its own ``Label`` and its own view
    of that event stream, delayed by ``index * 0.1`` (a float due time,
    which reactivex's ``delay`` interprets as seconds), so later letters
    lag further behind the pointer. The call blocks in the Tk main loop
    until the window is closed.
    """
    window = Tk()
    window.title("Rx for Python rocks")
    # Passed to subscribe() below so the pipeline runs on the Tk event loop.
    tk_scheduler = TkinterScheduler(window)

    motion_events: Subject[Event[Any]] = Subject()

    surface = Frame(window, width=600, height=600)
    surface.bind("<Motion>", motion_events.on_next)

    sentence = "TIME FLIES LIKE AN ARROW"

    def make_label(char: str) -> Label:
        """Create a borderless, unpadded label showing one character."""
        return Label(surface, text=char, borderwidth=0, padx=0, pady=0)

    def follow_mouse(
        letter: Label, position: int
    ) -> Observable[Tuple[Label, "Event[Frame]", int]]:
        """Return the motion stream for one letter, delayed by its index."""

        def tag(ev: "Event[Any]") -> Tuple[Label, "Event[Any]", int]:
            # Bundle the event with the letter it should move and its index.
            return (letter, ev, position)

        lag = position * 0.1
        return motion_events.pipe(ops.map(tag), ops.delay(lag))

    def move_letter(item: Tuple[Label, "Event[Frame]", int]) -> None:
        """Place a letter beside the pointer position carried by the event."""
        letter, ev, position = item
        # Horizontal offset grows with the index so the letters spell out
        # the sentence left to right instead of stacking on one spot.
        letter.place(x=ev.x + position * 12 + 15, y=ev.y)

    letter_moves = reactivex.from_(sentence).pipe(
        ops.map(make_label),
        ops.flat_map_indexed(follow_mouse),
    )
    letter_moves.subscribe(move_letter, on_error=print, scheduler=tk_scheduler)

    surface.pack()
    window.mainloop()
# Start the demo only when this file is executed as a script, so importing
# the module does not open a window.
if __name__ == "__main__":
    main()