/
pty.py
176 lines (136 loc) · 5.03 KB
/
pty.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import array
import errno
import fcntl
import io
import os
import pty
import select
import shlex
import signal
import struct
import sys
import termios
import time
from asciinema.term import raw
def record(command, writer, env=os.environ, rec_stdin=False, time_offset=0, notifier=None, key_bindings={}):
master_fd = None
start_time = None
pause_time = None
prefix_mode = False
prefix_key = key_bindings.get('prefix')
pause_key = key_bindings.get('pause')
def _notify(text):
if notifier:
notifier.notify(text)
def _set_pty_size():
'''
Sets the window size of the child pty based on the window size
of our own controlling terminal.
'''
# Get the terminal size of the real terminal, set it on the pseudoterminal.
if os.isatty(pty.STDOUT_FILENO):
buf = array.array('h', [0, 0, 0, 0])
fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
else:
buf = array.array('h', [24, 80, 0, 0])
fcntl.ioctl(master_fd, termios.TIOCSWINSZ, buf)
def _write_stdout(data):
'''Writes to stdout as if the child process had written the data.'''
os.write(pty.STDOUT_FILENO, data)
def _handle_master_read(data):
'''Handles new data on child process stdout.'''
if not pause_time:
writer.write_stdout(time.time() - start_time, data)
_write_stdout(data)
def _write_master(data):
'''Writes to the child process from its controlling terminal.'''
while data:
n = os.write(master_fd, data)
data = data[n:]
def _handle_stdin_read(data):
'''Handles new data on child process stdin.'''
nonlocal pause_time
nonlocal start_time
nonlocal prefix_mode
if not prefix_mode and prefix_key and data == prefix_key:
prefix_mode = True
return
if prefix_mode or (not prefix_key and data in [pause_key]):
prefix_mode = False
if data == pause_key:
if pause_time:
start_time = start_time + (time.time() - pause_time)
pause_time = None
_notify('Resumed recording')
else:
pause_time = time.time()
_notify('Paused recording')
return
_write_master(data)
if rec_stdin and not pause_time:
writer.write_stdin(time.time() - start_time, data)
def _signals(signal_list):
old_handlers = []
for sig, handler in signal_list:
old_handlers.append((sig, signal.signal(sig, handler)))
return old_handlers
def _copy(signal_fd):
'''Main select loop.
Passes control to _master_read() or _stdin_read()
when new data arrives.
'''
fds = [master_fd, pty.STDIN_FILENO, signal_fd]
while True:
try:
rfds, wfds, xfds = select.select(fds, [], [])
except OSError as e: # Python >= 3.3
if e.errno == errno.EINTR:
continue
except select.error as e: # Python < 3.3
if e.args[0] == 4:
continue
if master_fd in rfds:
data = os.read(master_fd, 1024)
if not data: # Reached EOF.
fds.remove(master_fd)
else:
_handle_master_read(data)
if pty.STDIN_FILENO in rfds:
data = os.read(pty.STDIN_FILENO, 1024)
if not data:
fds.remove(pty.STDIN_FILENO)
else:
_handle_stdin_read(data)
if signal_fd in rfds:
data = os.read(signal_fd, 1024)
if data:
signals = struct.unpack('%uB' % len(data), data)
for sig in signals:
if sig in [signal.SIGCHLD, signal.SIGHUP, signal.SIGTERM, signal.SIGQUIT]:
os.close(master_fd)
return
elif sig == signal.SIGWINCH:
_set_pty_size()
pid, master_fd = pty.fork()
if pid == pty.CHILD:
os.execvpe(command[0], command, env)
pipe_r, pipe_w = os.pipe()
flags = fcntl.fcntl(pipe_w, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
flags = fcntl.fcntl(pipe_w, fcntl.F_SETFL, flags)
signal.set_wakeup_fd(pipe_w)
old_handlers = _signals(map(lambda s: (s, lambda signal, frame: None),
[signal.SIGWINCH,
signal.SIGCHLD,
signal.SIGHUP,
signal.SIGTERM,
signal.SIGQUIT]))
_set_pty_size()
start_time = time.time() - time_offset
with raw(pty.STDIN_FILENO):
try:
_copy(pipe_r)
except (IOError, OSError):
pass
_signals(old_handlers)
os.waitpid(pid, 0)