Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Synchronous updates by default #367

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 7 additions & 76 deletions src/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,10 @@ export AbstractConnection

abstract type AbstractConnection end

"""
The maximum number of messages to allow into the outbox.
"""
const DEFAULT_OUTBOX_LIMIT = 32

"""
ConnectionPool([outbox[, connections]])

Manages the distribution of messages from the `outbox` channel to a set of
connections. The ConnectionPool asynchronously takes messages from its outbox
and sends each message to every connection in the pool. Any closed connections
are automatically removed from the pool.
"""
struct ConnectionPool
outbox::Channel
connections::Set{AbstractConnection}
condition::Condition
end

function ConnectionPool(
outbox::Channel = Channel{Any}(DEFAULT_OUTBOX_LIMIT),
connections=Set{AbstractConnection}(),
)
pool = ConnectionPool(
outbox,
connections,
Condition(),
)

# Catch errors here, otherwise they are lost to the void.
@async try
process_messages(pool)
catch exc
@error(
"An error ocurred in the while processing messages from a "
* "ConnectionPool.",
exception=exc,
)
end

return pool
function sendall(pool::Set, msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of partial to leaving ConnectionPool as a struct, even if we remove the outbox, and possibly just defining Sockets.send(::ConnectionPool, ...).

Minor thing though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naahhh, if it's a set of connections, why not just Set{AbstractConnection}? Least number of names is the best number of names.

process_messages(pool, msg)
end

function addconnection!(pool::ConnectionPool, conn::AbstractConnection)
push!(pool.connections, conn)
notify(pool)
end

function Sockets.send(pool::ConnectionPool, msg)
if length(pool.outbox.data) >= pool.outbox.sz_max
# TODO: https://github.com/JuliaGizmos/WebIO.jl/issues/343
return
end
put!(pool.outbox, msg)
end

"""
ensure_connection(pool)

Ensure that the pool has at least one connection, potentially blocking the
current task until that is the case. Also processes incoming connections.
"""
function ensure_connection(pool::ConnectionPool)
if isempty(pool.connections)
wait(pool.condition)
end
end

Base.wait(pool::ConnectionPool) = ensure_connection(pool)
Base.notify(pool::ConnectionPool) = notify(pool.condition)

"""
process_messages(pool)

Expand All @@ -83,16 +16,14 @@ frontends.

This function should be run as a task (it will block forever otherwise).
"""
function process_messages(pool::ConnectionPool)
ensure_connection(pool)
while true
msg = take!(pool.outbox)
function process_messages(pool::Set, msg)
if !isempty(pool)
@sync begin
# This may result in sending to no connections, but we're okay with
# that because we'll just get the next value of the observable
# (messages are fire and forget - WebIO makes no guarantees that
# messages are ever actually delivered).
for connection in pool.connections
for connection in pool
@async send_message(pool, connection, msg)
end
end
Expand All @@ -106,7 +37,7 @@ Send a message to an individual connection within a pool, handling errors and
deleting the connection from the pool if necessary.
"""
function send_message(
pool::ConnectionPool,
pool::Set,
connection::AbstractConnection,
msg,
)::Nothing
Expand All @@ -115,7 +46,7 @@ function send_message(
send(connection, msg)
else
@info "Connection is not open." connection
delete!(pool.connections, connection)
delete!(pool, connection)
end
catch ex
@error(
Expand Down
2 changes: 1 addition & 1 deletion src/messaging.jl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ function dispatch_command(conn::AbstractConnection, data)
if cmd == "_setup_scope"
@warn("Client used deprecated command: _setup_scope.", maxlog=1)
end
addconnection!(scope.pool, conn)
push!(scope.pool, conn)
elseif cmd == "update_observable"
if !haskey(data, "name")
@error "update_observable message missing \"name\" key."
Expand Down
15 changes: 6 additions & 9 deletions src/scope.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ export Scope,
onimport,
ondependencies,
adddeps!,
import!,
addconnection!
import!

import Compat.Sockets: send
import Observables: Observable, AbstractObservable, listeners
Expand All @@ -34,7 +33,7 @@ mutable struct Scope
# where each JS-string is a function that is invoked when the observable
# changes.
jshandlers::Any
pool::ConnectionPool
pool::Set{AbstractConnection}

mount_callbacks::Vector{JSString}

Expand All @@ -45,7 +44,7 @@ mutable struct Scope
systemjs_options::Any,
imports::Vector{Asset},
jshandlers::Any,
pool::ConnectionPool,
pool,
mount_callbacks::Vector{JSString}
)
scope = new(
Expand Down Expand Up @@ -119,7 +118,6 @@ myscope = Scope(
"""
function Scope(;
dom = dom"span"(),
outbox::Union{Channel, Nothing} = nothing,
observs::Dict = ObsDict(),
private_obs::Set{String} = Set{String}(),
systemjs_options = nothing,
Expand All @@ -137,7 +135,7 @@ function Scope(;
)
end
imports = Asset[Asset(i) for i in imports]
pool = outbox !== nothing ? ConnectionPool(outbox) : ConnectionPool()
pool = Set{AbstractConnection}()
return Scope(
dom, observs, private_obs, systemjs_options,
imports, jshandlers, pool, mount_callbacks
Expand All @@ -153,7 +151,6 @@ function Scope(id; kwargs...)
end

(w::Scope)(arg) = (w.dom = arg; w)
Base.wait(scope::Scope) = ensure_connection(scope.pool)

function Observables.on(f, w::Scope, key)
key = string(key)
Expand Down Expand Up @@ -277,7 +274,7 @@ function send_command(scope::Scope, command, data::Pair...)
"scope" => scopeid(scope),
data...
)
send(scope.pool, message)
sendall(scope.pool, message)
nothing
end

Expand Down Expand Up @@ -369,7 +366,7 @@ function ensure_sync(ctx, key)
ob = ctx.observs[key][1]
# have at most one synchronizing handler per observable
if !any(x->isa(x, SyncCallback) && x.ctx==ctx, listeners(ob))
f = SyncCallback(ctx, (msg) -> send_update_observable(ctx, key, msg))
f = (msg) -> send_update_observable(ctx, key, msg)
on(SyncCallback(ctx, f), ob)
end
end
Expand Down