-
-
Notifications
You must be signed in to change notification settings - Fork 407
/
stdio.jl
195 lines (177 loc) · 6.19 KB
/
stdio.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# During handling of an execute_request (when execute_msg is !nothing),
# we redirect STDOUT and STDERR into "stream" messages sent to the IPython
# front-end.
# logging in verbose mode goes to original stdio streams. Use macros
# so that we do not even evaluate the arguments in no-verbose modes
function get_log_preface()
t = now()
taskname = get(task_local_storage(), :IJulia_task, "")
@sprintf("%02d:%02d:%02d(%s): ", Dates.hour(t),Dates.minute(t),Dates.second(t),taskname)
end
macro vprintln(x...)
quote
if verbose::Bool
println(orig_STDOUT, get_log_preface(), $(x...))
end
end
end
macro verror_show(e, bt)
quote
if verbose::Bool
showerror(orig_STDERR, $e, $bt)
end
end
end
#name=>iobuffer for each stream ("stdout","stderr") so they can be sent in flush
const bufs = Dict{ASCIIString,IOBuffer}()
const stream_interval = 0.1
const max_bytes = 10*1024
"""Continually read from (size limited) Libuv/OS buffer into an (effectively unlimited) `IObuffer`
to avoid problems when the Libuv/OS buffer gets full (https://github.com/JuliaLang/julia/issues/8789).
Send data immediately when buffer contains more than `max_bytes` bytes. Otherwise, if data is available
it will be sent every `stream_interval` seconds (see the Timers set up in watch_stdio)"""
function watch_stream(rd::IO, name::AbstractString)
task_local_storage(:IJulia_task, "read $name task")
try
buf = IOBuffer()
bufs[name] = buf
while !eof(rd) # blocks until something is available
nb = nb_available(rd)
if nb > 0
write(buf, read(rd, nb))
end
if buf.size > 0
if buf.size >= max_bytes
#send immediately
send_stream(name)
end
end
end
catch e
# the IPython manager may send us a SIGINT if the user
# chooses to interrupt the kernel; don't crash on this
if isa(e, InterruptException)
watch_stream(rd, name)
else
rethrow()
end
end
end
function send_stdio(name)
if verbose::Bool && !haskey(task_local_storage(), :IJulia_task)
task_local_storage(:IJulia_task, "send $name task")
end
send_stream(name)
end
send_stdout(t::Timer) = send_stdio("stdout")
send_stderr(t::Timer) = send_stdio("stderr")
function send_stream(name::AbstractString)
buf = bufs[name]
if buf.size > 0
d = takebuf_array(buf)
n = num_utf8_trailing(d)
dextra = d[end-(n-1):end]
resize!(d, length(d) - n)
s = UTF8String(d)
if isvalid(s)
write(buf, dextra) # assume that the rest of the string will be written later
length(d) == 0 && return
else
# fallback: base64-encode non-UTF8 binary data
sbuf = IOBuffer()
print(sbuf, "base64 binary data: ")
b64 = Base64EncodePipe(sbuf)
write(b64, d)
write(b64, dextra)
close(b64)
print(sbuf, '\n')
s = takebuf_string(sbuf)
end
send_ipython(publish,
msg_pub(execute_msg, "stream",
@compat Dict("name" => name, "text" => s)))
end
end
"""
If `d` ends with an incomplete UTF8-encoded character, return the number of trailing incomplete bytes.
Otherwise, return `0`.
"""
function num_utf8_trailing(d::Vector{UInt8})
i = length(d)
# find last non-continuation byte in d:
while i >= 1 && ((d[i] & 0xc0) == 0x80)
i -= 1
end
i < 1 && return 0
c = d[i]
# compute number of expected UTF-8 bytes starting at i:
n = c <= 0x7f ? 1 : c < 0xe0 ? 2 : c < 0xf0 ? 3 : 4
nend = length(d) + 1 - i # num bytes from i to end
return nend == n ? 0 : nend
end
# this is hacky: we overload some of the I/O functions on pipe endpoints
# in order to fix some interactions with stdio.
if VERSION < v"0.4.0-dev+6987" # JuliaLang/julia#12739
const StdioPipe = Base.Pipe
else
const StdioPipe = Base.PipeEndpoint
end
# IJulia issue #42: there doesn't seem to be a good way to make a task
# that blocks until there is a read request from STDIN ... this makes
# it very hard to properly redirect all reads from STDIN to pyin messages.
# In the meantime, however, we can just hack it so that readline works:
import Base.readline
function readline(io::StdioPipe)
if io == STDIN
if !execute_msg.content["allow_stdin"]
error("IJulia: this front-end does not implement stdin")
end
send_ipython(raw_input,
msg_reply(execute_msg, "input_request",
@compat Dict("prompt"=>"STDIN> ", "password"=>false)))
while true
msg = recv_ipython(raw_input)
if msg.header["msg_type"] == "input_reply"
return msg.content["value"]
else
error("IJulia error: unknown stdin reply")
end
end
else
invoke(readline, (supertype(StdioPipe),), io)
end
end
function watch_stdio()
task_local_storage(:IJulia_task, "init task")
read_task = @async watch_stream(read_stdout, "stdout")
#send STDOUT stream msgs every stream_interval secs (if there is output to send)
Timer(send_stdout, stream_interval, stream_interval)
if capture_stderr
readerr_task = @async watch_stream(read_stderr, "stderr")
#send STDERR stream msgs every stream_interval secs (if there is output to send)
Timer(send_stderr, stream_interval, stream_interval)
end
end
function flush_all()
flush_cstdio() # flush writes to stdout/stderr by external C code
flush(STDOUT)
flush(STDERR)
end
function oslibuv_flush()
#refs: https://github.com/JuliaLang/IJulia.jl/issues/347#issuecomment-144505862
# https://github.com/JuliaLang/IJulia.jl/issues/347#issuecomment-144605024
@windows_only ccall(:SwitchToThread, stdcall, Void, ())
yield()
yield()
end
import Base.flush
function flush(io::StdioPipe)
invoke(flush, (supertype(StdioPipe),), io)
if io == STDOUT
oslibuv_flush()
send_stream("stdout")
elseif io == STDERR
oslibuv_flush()
send_stream("stderr")
end
end