Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

[feature] stdlib/network: Network.cloud now distributed (as opposed t…

…o shared)
  • Loading branch information...
commit 06f07538a76604411e8addb0557be7af99bbffa1 1 parent ea7bbab
François-Régis Sinot authored
View
65 stdlib/core/rpc/core/network.opa
@@ -52,6 +52,20 @@ type Network.instruction('a) = {add: channel('a)} / {remove: channel('a)} / {bro
*/
@abstract type Network.observer = -> void
+
+/**
+ * Layered Networks
+ *
+ * Like networks, but networks can be observer and act as proxys
+ *
+ * Kept private because very experimental (used for distribution)
+ */
+@private type Network.Layered.instruction('a) =
+ Network.instruction('a)
+ / {add_networks: list(Network.Layered.network('a))}
+ / {shallow_broadcast: 'a}
+@private type Network.Layered.network('a) = channel(Network.Layered.instruction('a))
+
/**
* {1 Interface}
*/
@@ -91,8 +105,9 @@ Network = {{
* been created with [key], subsequent calls to [Network.cloud(key)], on any server, will return the same [network].
**/
cloud(key : string) : Network.network('a) =
- make_generic((init, handler -> Session.cloud(key, init, handler)))
-
+ if Session.cloud_mode
+ then make_distributed(key)
+ else make_generic(Session.cloud(key, _, _))
/**
@@ -137,6 +152,11 @@ Network = {{
me
)
+ make_distributed(key : string) : Network.network('a) =
+ hn = Layered.make_distributed(key)
+ Session.NonBlocking.map((msg : Network.instruction('a)) -> (msg <: Network.Layered.instruction('a)), hn)
+
+
/**
* Add a channel to a network.
*
@@ -229,5 +249,46 @@ Network = {{
_ = observe((a -> broadcast(f(a), nw)), net)
net
+ @private Layered = {{
+
+ empty() : Network.Layered.network('a) =
+ Session.NonBlocking.make(
+ { channels = ChannelSet.empty : channelset('a);
+ networks = ChannelSet.empty : channelset(Network.Layered.instruction('a)) },
+ (msg, o ->
+ update_channels(f) = o.update(s -> {s with channels = f(s.channels)})
+ update_networks(f) = o.update(s -> {s with networks = f(s.networks)})
+ match msg with
+ | {add=c} ->
+ do Session.on_remove(c, (-> update_channels(ChannelSet.remove(c, _))))
+ update_channels(ChannelSet.add(c, _))
+ | {add_networks=nws} ->
+ do List.iter(nw -> Session.on_remove(nw, (-> update_networks(ChannelSet.remove(nw, _)))), nws)
+ update_networks(List.fold(ChannelSet.add, nws, _))
+ | {remove=c} ->
+ update_channels(ChannelSet.remove(c, _))
+ | {broadcast=msg} ->
+ do ChannelSet.iter(Session.send(_, msg), o.get().channels)
+ do ChannelSet.iter(Session.send(_, {shallow_broadcast=msg}), o.get().networks)
+ void
+ | {shallow_broadcast=msg} ->
+ do ChannelSet.iter(Session.send(_, msg), o.get().channels)
+ void
+ )
+ )
+
+ make_distributed(key : string) : Network.Layered.network('a) =
+ shared = Session.cloud(key, [] : list(Network.Layered.network('a)),
+ (set, (c : Network.Layered.network('a)) ->
+ do Scheduler.push(-> List.iter(Session.send(_, {add_networks=[c]}), set))
+ do Scheduler.push(-> Session.send(c, {add_networks=set}))
+ {set = [c | set]}
+ )
+ ) : channel(Network.Layered.network('a))
+ local = empty() : Network.Layered.network('a)
+ do Session.send(shared, local)
+ local
+
+ }}
}}
View
4 stdlib/core/rpc/core/session.opa
@@ -349,7 +349,7 @@ Session = {{
) : channel('message)
- @private @publish @server shared_mode =
+ @package @publish @server cloud_mode =
commandline : CommandLine.family(bool) = {
title = "Distribution of sessions"
init = false
@@ -380,7 +380,7 @@ Session = {{
* @param on_message As in [Session.make]
**/
@server cloud(key, state, handler) =
- if shared_mode
+ if cloud_mode
then make_shared(key, state, handler)
else get_local_cloud(key, state, handler)
Please sign in to comment.
Something went wrong with that request. Please try again.