Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: File watch events refactor #4247

Merged
merged 8 commits into from
Sep 18, 2013
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1075,8 +1075,7 @@ export
takebuf_array,
takebuf_string,
truncate,
UV_READABLE,
UV_WRITABLE,
watch_file,
WindowsRawSocket,
write,
writecsv,
Expand Down
95 changes: 69 additions & 26 deletions base/poll.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,51 @@ function close(t::FileMonitor)
end
end

immutable FileEvent
renamed::Bool
changed::Bool
timeout::Bool
end

immutable FDEvent
readable::Bool
writable::Bool
timeout::Bool
end
FDEvent() = FDEvent(false, false, false)
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer if these used a single integer flags field to store the flag state and provide accessor functions. This is a place where having bit fields like C would be really nice since we could get both at the same time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, define functions like readable(f::FDEvent) = f.flags[1], etc.?

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's kind of what I was thinking, but input from @JeffBezanson, @loladiro, @vtjnash & co. would be helpful. This is kind of a taste thing – for APIs like this I tend to favor a more C-like style than is traditional in other high-level languages.

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like we need @enum_mask #2988 (with a better names) :)


function fileevent(events)
# libuv file watching event flags
const UV_RENAME = 1
const UV_CHANGE = 2
timeout = (events & (UV_RENAME | UV_CHANGE) == 0)
FileEvent((events & UV_RENAME) != 0, (events & UV_CHANGE) != 0, timeout)
end

function (|)(e1::FileEvent, e2::FileEvent)
FileEvent(
e1.renamed || e2.renamed,
e1.changed || e2.changed,
e1.timeout || e2.timeout)
end

# libuv file descriptor event flags
const UV_READABLE = 1
const UV_WRITABLE = 2
function fdevent(events)
timeout = (events & (UV_READABLE | UV_WRITABLE) == 0)
FDEvent((events & UV_READABLE) != 0, (events & UV_WRITABLE) != 0, timeout)
end

fdtimeout() = FDEvent(false, false, true)

function (|)(e1::FDEvent, e2::FDEvent)
FDEvent(
e1.readable || e2.readable,
e1.writable || e2.writable,
e1.timeout || e2.timeout)
end
convert(::Type{Int32}, e::FDEvent) = e.readable*UV_READABLE + e.writable*UV_WRITABLE
convert(::Type{Int32},fd::RawFD) = fd.fd

#Wrapper for an OS file descriptor (for Windows)
Expand Down Expand Up @@ -69,9 +111,9 @@ type FDWatcher <: UVPollingWatcher
open::Bool
notify::Condition
cb::Callback
events::Int32
FDWatcher(handle::Ptr,fd,open::Bool,notify::Condition,cb::Callback,events::Integer) =
new(handle,_get_osfhandle(fd),open,notify,cb,int32(events))
events::FDEvent
FDWatcher(handle::Ptr,fd,open::Bool,notify::Condition,cb::Callback,events::FDEvent) =
new(handle,_get_osfhandle(fd),open,notify,cb,events)
end
function FDWatcher(fd::RawFD)
handle = c_malloc(_sizeof_uv_poll)
Expand All @@ -84,7 +126,7 @@ function FDWatcher(fd::RawFD)
c_free(handle)
throw(UVError("FDWatcher",err))
end
this = FDWatcher(handle,fd,false,Condition(),false,0)
this = FDWatcher(handle,fd,false,Condition(),false,FDEvent())
associate_julia_struct(handle,this)
finalizer(this,close)
this
Expand All @@ -97,13 +139,13 @@ end
c_free(handle)
throw(UVError("FDWatcher",err))
end
this = FDWatcher(handle,fd,false,Condition(),false,0)
this = FDWatcher(handle,fd,false,Condition(),false,FDEvent())
associate_julia_struct(handle,this)
finalizer(this,close)
this
end

function fdw_wait_cb(fdw::FDWatcher,status,events)
function fdw_wait_cb(fdw::FDWatcher, events::FDEvent, status)
if status == -1
notify_error(fdw.notify,UVError("FDWatcher",status))
else
Expand All @@ -112,9 +154,8 @@ function fdw_wait_cb(fdw::FDWatcher,status,events)
end

function _wait(fdw::FDWatcher,readable,writable)
events = (readable ? UV_READABLE : 0) |
(writable ? UV_WRITABLE : 0)
if events == 0
events = FDEvent(readable, writable, false)
if !readable && ! writable
error("Must be watching for at least one event")
end
events |= fdw.events
Expand All @@ -124,8 +165,7 @@ function _wait(fdw::FDWatcher,readable,writable)
end
while true
events = wait(fdw.notify)
if (readable && (events & UV_READABLE) != 0) ||
(writable && (events & UV_WRITABLE) != 0)
if isa(events, FDEvent) && ((events.readable == readable) || (events.writable == writable))
break
end
end
Expand All @@ -142,7 +182,7 @@ end

let
global fdwatcher_reinit
const empty_watcher = FDWatcher(C_NULL,RawFD(-1),false,Condition(),false,0)
const empty_watcher = FDWatcher(C_NULL,RawFD(-1),false,Condition(),false,FDEvent())
@unix_only begin
local fdwatcher_array = Array(FDWatcher,0)
function fdwatcher_reinit()
Expand Down Expand Up @@ -180,7 +220,7 @@ let
end
end

function pfw_wait_cb(pfw::PollingFileWatcher, status, prev, cur)
function pfw_wait_cb(pfw::PollingFileWatcher, prev, cur, status)
if status < 0
notify_error(pfw.notify,UVError("PollingFileWatcher",status))
else
Expand All @@ -207,15 +247,15 @@ end

close(t::UVPollingWatcher) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle)

function start_watching(t::FDWatcher, events)
function start_watching(t::FDWatcher, events::FDEvent)
associate_julia_struct(t.handle, t)
@unix_only if ccall(:jl_uv_unix_fd_is_watched,Int32,(Int32,Ptr{Void},Ptr{Void}),t.fd,t.handle,eventloop()) == 1
error("Cannot watch an FD more than once on Unix")
end
uv_error("start_watching (FD)",
ccall(:jl_poll_start,Int32,(Ptr{Void},Int32),t.handle,events))
ccall(:jl_poll_start,Int32,(Ptr{Void},Int32),t.handle,int32(events)))
end
start_watching(f::Function, t::FDWatcher, events) = (t.cb = f; start_watching(t,events))
start_watching(f::Function, t::FDWatcher, events::FDEvent) = (t.cb = f; start_watching(t,events))

function start_watching(t::PollingFileWatcher, interval)
associate_julia_struct(t.handle, t)
Expand All @@ -237,25 +277,27 @@ function stop_watching(t::PollingFileWatcher)
end

function _uv_hook_fseventscb(t::FileMonitor,filename::Ptr,events::Int32,status::Int32)
fname = bytestring(convert(Ptr{Uint8},filename))
fileEvent = fileevent(events)
if isa(t.cb,Function)
# bytestring(convert(Ptr{Uint8},filename)) - seems broken at the moment - got NULL
t.cb(status, events, status)
if status == -1
notify_error(t.notify,UVError("FileMonitor",status))
else
notify(t.notify,events)
end
t.cb(fname, fileEvent, status)
end
if status < 0
notify_error(t.notify,(UVError("FileMonitor",status), fname, fileEvent))
else
notify(t.notify,(status, fname, fileEvent))
end
end

function _uv_hook_pollcb(t::FDWatcher,status::Int32,events::Int32)
if isa(t.cb,Function)
t.cb(t,status, events)
t.cb(t, fdevent(events), status)
end
end

function _uv_hook_fspollcb(t::PollingFileWatcher,status::Int32,prev::Ptr,cur::Ptr)
if isa(t.cb,Function)
t.cb(t, status, Stat(convert(Ptr{Uint8},prev)), Stat(convert(Ptr{Uint8},cur)))
t.cb(t, Stat(convert(Ptr{Uint8},prev)), Stat(convert(Ptr{Uint8},cur)), status)
end
end

Expand All @@ -266,7 +308,7 @@ function poll_fd(s, seconds::Real; readable=false, writable=false)
wt = Condition()

@schedule (args = wait(s; readable=readable, writable=writable); notify(wt,(:poll,args)))
@schedule (sleep(seconds); notify(wt,(:timeout,0)))
@schedule (sleep(seconds); notify(wt,(:timeout,fdtimeout())))

_, ret = wait(wt)

Expand All @@ -282,6 +324,7 @@ function poll_file(s, interval_seconds::Real, seconds::Real)
wait(wt) == :poll
end

watch_file(s; poll=false) = watch_file(false, s, poll=poll)
function watch_file(cb, s; poll=false)
if poll
pfw = PollingFileWatcher(cb,s)
Expand Down
4 changes: 4 additions & 0 deletions doc/stdlib/base.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,10 @@ Network I/O
Create a TcpServer on any port, using hint as a starting point. Returns a tuple of the actual port that the server
was created on and the server itself.

.. function:: watch_file(cb=false, s; poll=false)

Watch file or directory ``s`` and run callback ``cb`` when ``s`` is modified. The ``poll`` parameter specifies whether to use file system event monitoring or polling. The callback function ``cb`` should accept 3 arguments: ``(filename, events, status)`` where ``filename`` is the name of file that was modified, ``events`` can be ``UV_CHANGE`` or ``UV_RENAME`` when using file system event monitoring, or ``UV_READABLE`` or ``UV_WRITABLE`` when using polling, and ``status`` is always 0. Pass ``false`` for ``cb`` to not use a callback function.

.. function:: poll_fd(fd, seconds::Real; readable=false, writable=false)

Poll a file descriptor fd for changes in the read or write availability and with a timeout given by the second argument.
Expand Down
20 changes: 17 additions & 3 deletions test/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function test_timeout(tval)
tr = take(channel)
t_elapsed = toq()

@test tr == 0
@test tr == false

tdiff = t_elapsed * 1000
@test tval <= tdiff
Expand All @@ -72,7 +72,7 @@ function test_touch(slval)

tr = take(channel)

# @test tr == 1
@test tr == true
end


Expand All @@ -85,18 +85,32 @@ function test_monitor(slval)
f = open(file,"a")
write(f,"Hello World\n")
close(f)
sleep(9slval/10_000)
sleep(slval/10_000)
@test FsMonitorPassed
close(fm)
end

function test_monitor_wait(tval)
fm = watch_file(file)
@async begin
sleep(tval/10_000)
f = open(file,"a")
write(f,"Hello World\n")
close(f)
end
fname, events = wait(fm)
@test fname == basename(file)
@test events.changed == true
end

# Commented out the tests below due to issues 3015, 3016 and 3020
test_timeout(0.1)
test_timeout(1)
test_touch(0.1)
test_touch(1)
test_monitor(1)
test_monitor(0.1)
test_monitor_wait(0.1)

##########
# mmap #
Expand Down
4 changes: 2 additions & 2 deletions test/pollfd.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ function test_timeout(tval)
tr = consume(t)
t_elapsed = toq()

@test tr == 0
@test tr.timeout == true

tdiff = t_elapsed * 1000
@test tval <= tdiff
Expand All @@ -32,7 +32,7 @@ function test_read(slval)
tr = consume(t)
t_elapsed = toq()

@test tr == UV_READABLE || (UV_READABLE | UV_WRITABLE)
@test tr.readable || tr.writable

dout = Array(Uint8, 1)
@test 1 == ccall(:read, Csize_t, (Cint, Ptr{Uint8},Csize_t), pipe_fds[1], dout, 1)
Expand Down