-
Notifications
You must be signed in to change notification settings - Fork 44
/
ActorRefs.fs
272 lines (225 loc) · 10.9 KB
/
ActorRefs.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
//-----------------------------------------------------------------------
// <copyright file="ActorRefs.fs" company="Akka.NET Project">
// Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2015 Akka.NET project <https://github.com/akkadotnet/akka.net>
// Copyright (C) 2015-2020 Bartosz Sypytkowski <https://github.com/Horusiath>
// </copyright>
//-----------------------------------------------------------------------
[<AutoOpen>]
module Akkling.ActorRefs
open Akka.Actor
open Akka.Util
open System
open System.Threading.Tasks
/// <summary>
/// Strongly typed counterpart of the untyped <see cref="ICanTell"/> interface.
/// Tell/ask operations are restricted to messages of type 'Message.
/// </summary>
[<Interface>]
type ICanTell<'Message> =
    /// Sends a message of the restricted type together with an explicit sender reference.
    abstract member Tell : 'Message * IActorRef -> unit
    /// Sends a message and asynchronously produces a reply of type 'Response.
    /// The second element of the tuple is an optional timeout.
    abstract member Ask : 'Message * TimeSpan option -> Async<'Response>
    /// Like Ask, but the message is built by a factory function that receives a typed
    /// recipient for 'Response values. The second element of the tuple is an optional timeout.
    abstract member AskWith : (ICanTell<'Response> -> 'Message) * TimeSpan option -> Async<'Response>
    /// Untyped <see cref="ICanTell"/> wrapped by this typed recipient.
    abstract Underlying : ICanTell
/// INTERNAL API.
/// Non-generic view of a typed actor reference: exposes the wrapped untyped
/// reference and the message type the typed reference was created for.
[<Interface>]
type IInternalTypedActorRef =
    /// Untyped actor reference wrapped by the typed one.
    abstract Underlying : IActorRef
    /// Runtime type of the messages handled by the typed reference.
    abstract MessageType : Type
/// <summary>
/// Strongly typed counterpart of the untyped <see cref="IActorRef"/> interface.
/// Tell/ask operations are restricted to messages of type 'Message.
/// </summary>
[<Interface>]
type IActorRef<'Message> =
    inherit IInternalTypedActorRef
    inherit ICanTell<'Message>
    inherit IEquatable<IActorRef<'Message>>
    inherit IComparable<IActorRef<'Message>>
    inherit ISurrogated
    inherit IComparable

    /// <summary>
    /// Produces a new typed actor reference that handles messages of type 'T instead of 'Message.
    /// </summary>
    abstract member Retype<'T> : unit -> IActorRef<'T>

    /// Forwards a message of the handled type through this reference.
    abstract member Forward : 'Message -> unit

    /// Path of the actor reference.
    abstract Path : ActorPath
/// <summary>
/// Wrapper around untyped instance of IActorRef interface.
/// String conversion, hashing, equality and comparison all delegate to the wrapped
/// reference, so the 'Message type parameter takes no part in them.
/// </summary>
[<Struct>]
[<CustomEquality>]
[<CustomComparison>]
type TypedActorRef<'Message>(underlyingRef : IActorRef) =
/// <summary>
/// Gets an underlying actor reference wrapped by current object.
/// </summary>
member __.Underlying = underlyingRef
override __.ToString () = underlyingRef.ToString ()
override __.GetHashCode () = underlyingRef.GetHashCode ()
// Equal to any typed actor reference (regardless of its message type) that wraps
// an equal underlying reference; everything else is unequal.
override this.Equals o =
match o with
| :? IInternalTypedActorRef as ref -> underlyingRef.Equals(ref.Underlying)
| _ -> false
interface IInternalTypedActorRef with
member __.Underlying = underlyingRef
member __.MessageType = typeof<'Message>
interface IActorRef<'Message> with
/// <summary>
/// Changes the type of handled messages, returning new typed ActorRef
/// that wraps the same underlying reference.
/// </summary>
member __.Retype<'T>() = TypedActorRef<'T>(underlyingRef) :> IActorRef<'T>
/// Sends the message (upcast to obj) to the underlying reference with the given sender.
member __.Tell(message : 'Message, sender : IActorRef) = underlyingRef.Tell(message :> obj, sender)
/// Forwards the message through the underlying reference.
member __.Forward(message : 'Message) = underlyingRef.Forward(message)
/// Asks the underlying reference. A Status.Failure reply is rethrown as its Cause;
/// any other reply is downcast to 'Response.
member __.Ask(message : 'Message, timeout : TimeSpan option) : Async<'Response> =
// Copy the wrapped reference to a local before entering the async block.
let ref = underlyingRef
async {
let! reply = ref.Ask(message, Option.toNullable timeout) |> Async.AwaitTask
match reply with
| :? Status.Failure as f ->
raise f.Cause
// Never reached (raise throws); present only to give the branch a result value.
return Unchecked.defaultof<'Response>
| other -> return other :?> 'Response }
/// Same as Ask, except that the message is produced by messageFactory from the
/// actor reference passed to the factory delegate by the underlying Ask call
/// (wrapped in a typed reference).
member __.AskWith(messageFactory: ICanTell<'Response> -> 'Message, timeout: TimeSpan option): Async<'Response> =
// Copy the wrapped reference to a local before entering the async block.
let ref = underlyingRef
async {
let! reply = ref.Ask(Func<IActorRef,obj>(fun ref -> upcast messageFactory(TypedActorRef<'T>(ref) :> IActorRef<'T>)), Option.toNullable timeout) |> Async.AwaitTask
match reply with
| :? Status.Failure as f ->
raise f.Cause
// Never reached (raise throws); present only to give the branch a result value.
return Unchecked.defaultof<'Response>
| other -> return other :?> 'Response }
member __.Underlying = underlyingRef :> ICanTell
member __.Path = underlyingRef.Path
member __.Equals other =
underlyingRef.Equals(other.Underlying)
// Typed references are compared by their underlying references; any other
// object is handed to the underlying reference's CompareTo as-is.
member __.CompareTo (other: obj) =
match other with
| :? IInternalTypedActorRef as typed -> underlyingRef.CompareTo(typed.Underlying)
| _ -> underlyingRef.CompareTo(other)
member __.CompareTo (other: IActorRef<'Message>) =
underlyingRef.CompareTo(other.Underlying)
interface ISurrogated with
// The surrogate is a record holding only the wrapped untyped reference.
member this.ToSurrogate _system =
let surrogate : TypedActorRefSurrogate<'Message> = { Wrapped = underlyingRef }
surrogate :> ISurrogate
/// Surrogate form of TypedActorRef: stores the wrapped untyped reference and
/// rebuilds a TypedActorRef of the same message type in FromSurrogate.
and TypedActorRefSurrogate<'Message> =
{ Wrapped : IActorRef }
interface ISurrogate with
member this.FromSurrogate _system =
let tref = new TypedActorRef<'Message>(this.Wrapped)
tref :> ISurrogated
/// <summary>
/// Wraps an untyped actor reference in a <see cref="TypedActorRef{T}"/> and
/// returns it as a typed <see cref="IActorRef{T}"/>.
/// </summary>
let inline typed (actorRef : IActorRef) : IActorRef<'Message> =
    let wrapper = TypedActorRef<'Message>(actorRef)
    wrapper :> IActorRef<'Message>
/// <summary>
/// Returns untyped <see cref="IActorRef" /> form of current typed actor.
/// Works for every implementation of <see cref="IActorRef{T}"/>, not only TypedActorRef.
/// </summary>
let inline untyped (typedRef: IActorRef<'Message>) : IActorRef =
    // Every IActorRef<'Message> inherits IInternalTypedActorRef, which exposes the wrapped
    // untyped reference. The explicit upcast picks that Underlying over the one inherited
    // from ICanTell<'Message> and avoids the previous downcast to the concrete TypedActorRef
    // struct, which threw InvalidCastException for any other implementation.
    (typedRef :> IInternalTypedActorRef).Underlying
/// <summary>
/// Returns a new typed actor reference obtained from typedRef via Retype,
/// handling messages of type 'U instead of 'T.
/// </summary>
let inline retype (typedRef: IActorRef<'T>) : IActorRef<'U> =
    let retyped : IActorRef<'U> = typedRef.Retype<'U>()
    retyped
/// <summary>
/// Typed wrapper for <see cref="ActorSelection"/> objects.
/// Tell/ask operations are restricted to messages of type 'Message; equality and
/// comparison are based on the wrapped selection.
/// </summary>
[<Struct>]
[<CustomEquality>]
[<CustomComparison>]
type TypedActorSelection<'Message>(selection : ActorSelection) =

    /// <summary>
    /// Returns an underlying untyped <see cref="ActorSelection"/> instance.
    /// </summary>
    member __.Underlying = selection

    /// <summary>
    /// Gets an actor ref anchor for current selection, wrapped as a typed actor reference.
    /// </summary>
    member __.Anchor with get (): IActorRef<'Message> = typed selection.Anchor

    /// <summary>
    /// Gets string representation for all elements in actor selection path.
    /// </summary>
    member __.PathString with get () = selection.PathString

    /// <summary>
    /// Gets collection of elements, actor selection path is built from.
    /// </summary>
    member __.Path with get () = selection.Path

    override __.ToString () = selection.ToString ()

    /// <summary>
    /// Tries to resolve an actor reference from current actor selection
    /// and wraps the result as a typed actor reference.
    /// </summary>
    member __.ResolveOne (timeout: TimeSpan): Async<IActorRef<'Message>> =
        let convertToTyped (t: System.Threading.Tasks.Task<IActorRef>) = typed t.Result
        selection.ResolveOne(timeout).ContinueWith(convertToTyped)
        |> Async.AwaitTask

    // Another typed selection of the same message type is compared by its wrapped
    // selection; any other object is handed to the wrapped selection's Equals.
    // (A reference-equality shortcut is pointless here: this type is a struct, so
    // passing it as obj boxes a fresh copy that can never be the same reference.)
    override x.Equals (o : obj) =
        match o with
        | :? TypedActorSelection<'Message> as t -> x.Underlying.Equals t.Underlying
        | _ -> x.Underlying.Equals o

    override __.GetHashCode () = selection.GetHashCode() ^^^ typeof<'Message>.GetHashCode()

    interface ICanTell with
        member __.Tell(message : obj, sender : IActorRef) = selection.Tell(message, sender)

    interface ICanTell<'Message> with
        member __.Tell(message : 'Message, sender : IActorRef) : unit = selection.Tell(message, sender)

        /// Asks the wrapped selection. A Status.Failure reply is rethrown as its Cause;
        /// any other reply is downcast to 'Response.
        member x.Ask(message : 'Message, timeout : TimeSpan option) : Async<'Response> =
            // Copy the wrapped selection to a local before entering the async block.
            let ref = selection
            async {
                let! reply = ref.Ask(message, Option.toNullable timeout) |> Async.AwaitTask
                match reply with
                | :? Status.Failure as f -> return raise f.Cause
                | other -> return other :?> 'Response }

        /// Same as Ask, except that the message is produced by messageFactory from the
        /// actor reference passed to the factory delegate by the underlying Ask call
        /// (wrapped in a typed reference).
        member __.AskWith(messageFactory: ICanTell<'Response> -> 'Message, timeout: TimeSpan option): Async<'Response> =
            // Copy the wrapped selection to a local before entering the async block.
            let ref = selection
            async {
                let! reply = ref.Ask(Func<IActorRef,obj>(fun ref -> upcast messageFactory(TypedActorRef<'T>(ref) :> IActorRef<'T>)), Option.toNullable timeout) |> Async.AwaitTask
                match reply with
                | :? Status.Failure as f -> return raise f.Cause
                | other -> return other :?> 'Response }

        member __.Underlying = selection :> ICanTell

    interface IComparable with
        /// Compares selections by their PathString. Typed selections of the same message
        /// type, typed selections of obj and untyped ActorSelection values are recognised;
        /// anything else yields -1.
        // NOTE(review): the operands are "other, then this", so the resulting order is the
        // reverse of the PathString order. Kept as-is for backward compatibility - confirm
        // whether that is intended.
        member this.CompareTo other =
            match other with
            // Previously only TypedActorSelection<obj> was matched, so two selections of the
            // same non-obj message type each reported -1 against the other.
            | :? TypedActorSelection<'Message> as sameTyped -> sameTyped.Underlying.PathString.CompareTo (this.Underlying.PathString)
            | :? TypedActorSelection<obj> as objTyped -> objTyped.Underlying.PathString.CompareTo (this.Underlying.PathString)
            | :? ActorSelection as raw -> raw.PathString.CompareTo (this.Underlying.PathString)
            | _ -> -1
/// <summary>
/// Unidirectional send operator.
/// Tells msg to actorRef, using the result of ActorCell.GetCurrentSelfOrNoSender() as the sender.
/// </summary>
let inline (<!) (actorRef : #ICanTell<'Message>) (msg : 'Message) : unit =
    let currentSender = ActorCell.GetCurrentSelfOrNoSender()
    actorRef.Tell(msg, currentSender)
/// <summary>
/// Bidirectional send operator. Asks the recipient with msg, passing no explicit
/// timeout, and returns an asynchronous computation yielding the 'Response reply.
/// </summary>
let inline (<?) (tell : #ICanTell<'Message>) (msg : 'Message) : Async<'Response> =
    let noTimeout : TimeSpan option = None
    tell.Ask<'Response>(msg, noTimeout)
/// <summary>
/// Unidirectional forward operator.
/// Forwards a message object to the actor tracked by actorRef without overriding its sender.
/// </summary>
let inline (<<!) (actorRef : #IActorRef<'Message>) (msg : 'Message) : unit =
    msg |> actorRef.Forward
/// Starts the asynchronous computation and delivers its outcome to the recipient:
/// a successful result is told to the recipient as-is, while an exception or a
/// cancellation is wrapped in Status.Failure and told to the recipient's untyped
/// Underlying. In every case the given sender is passed along.
let pipeTo (sender : IActorRef) (recipient : ICanTell<'Message>) (computation : Async<'Message>): unit =
    let deliver (value : 'Message) : unit =
        recipient.Tell(value, sender)
    // Shared by the exception and the cancellation continuations.
    let reportFailure (cause : exn) : unit =
        recipient.Underlying.Tell(Status.Failure(cause), sender)
    Async.StartWithContinuations(computation, deliver, reportFailure, reportFailure)
/// Pipe operator: runs the asynchronous computation on the left and sends its
/// output to the recipient on the right via pipeTo, with ActorRefs.NoSender as the sender.
let inline (|!>) (computation : Async<'Message>) (recipient : ICanTell<'Message>) =
    computation |> pipeTo ActorRefs.NoSender recipient
/// Reversed pipe operator: runs the asynchronous computation on the right and sends its
/// output to the recipient on the left via pipeTo, with ActorRefs.NoSender as the sender.
let inline (<!|) (recipient : ICanTell<'Message>) (computation : Async<'Message>) =
    computation |> pipeTo ActorRefs.NoSender recipient
/// <summary>
/// Builds an <see cref="ActorSelection" /> for the specified path using the given
/// actor ref factory and returns it wrapped in a typed selection.
/// </summary>
let inline select (selector : IActorRefFactory) (path : string) : TypedActorSelection<'Message> =
    let untypedSelection = selector.ActorSelection path
    TypedActorSelection<'Message>(untypedSelection)