-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathActor.h
154 lines (128 loc) · 3.95 KB
/
Actor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#pragma once
#include "CommonDefines.h"
#include "Supervisor.h"
#include "Processor.h"
#include "ActorData.h"
#include "ActorPolicy.h"
namespace RxActor { namespace details {
/**
* All actors: send and receive capabilities
*
* Many to one: Register this actor in RxDiscovery using ActorPath as locator (or similar). Other Actors can lookup this actor via ActorPath.
* One to Many: Publisher to Subscribers
* Many to Many: PubSub to PubSubs
*
* Is the communication technique an actor trait or a channel trait?
*
* Does an Actor need an ActorId?
* If yes, ActorId == UUID, map to, ActorPath and vice-versa
*
* All actors: Create other actors - Use ActorSystem?
*
* TODO API:
* - Become(Processor<T> p) -> Means that actor is mutable. Avoid
* - Execute(ChannelHandle h)
* - OnUnhandledDo(std::function<void (T)> f)
*/
template <typename T>
class Actor
: public RxActor::Actor<T>
, protected Templates::ContextObjectShared<ActorPolicy, ActorData<T>>
{
public:
Actor(ChannelHandle handle, ActorId actorId, std::shared_ptr<Processor<T>> processor, Supervisor supervisor, ActorPolicy policy)
: Templates::ContextObjectShared<ActorPolicy, ActorData<T>>
(
new ActorPolicy(policy), new ActorData<T>(processor, handle, actorId, supervisor)
)
{ }
virtual ~Actor()
{ }
// ---------------------------------------------------------
// Initialize methods
// ---------------------------------------------------------
/**
* Create on Actor to be extended, and one Actor to be used with lambdas?
*/
virtual bool Initialize()
{
this->data()->channel_->subscriber()->OnDataDo(
[=](T value)
{
// TODO: Get sender from channel?
//this->data()->channel_->subscriber()->Sender();
if(this->data()->processor_)
{
try
{
this->data()->processor_->Process(this, value);
}
catch(Exception e)
{
// TODO: Feedback to supervisor ActorController
}
}
else
{
Unhandled(value);
}
}
);
return true;
}
virtual bool IsInitialized() const
{
return true;
}
// ---------------------------------------------------------
// Send functions
// ---------------------------------------------------------
virtual MessageFuture<T> Send(T value)
{
return this->data()->channel_->publisher()->Next(value);
}
// ---------------------------------------------------------
// Various functions
// ---------------------------------------------------------
const Supervisor& SupervisorStrategy()
{
return this->data()->supervisor_;
}
virtual ActorProxy<T> Sender() const
{
return ActorProxy<T>::NoSender();
}
virtual ActorProxy<T> Self() const
{
return ActorProxy<T>(this->data()->handle_, this->data()->actorId_);
}
virtual void Unhandled(T)
{
IWARNING() << "Unhandled value ";
}
virtual Policy::Deadline ReceiveDeadline() const
{
return Policy::Deadline::FromSeconds(5);
}
// ---------------------------------------------------------
// Lifecycle methods that is called when an actor is started,
// stopped or restarted.
// ---------------------------------------------------------
virtual void OnPreStart()
{
this->data()->processor_->OnPreStart();
}
virtual void OnPostStop()
{
this->data()->processor_->OnPostStop();
}
virtual void OnPreRestart(BaseLib::Exception e)
{
this->data()->processor_->OnPreRestart(e);
}
virtual void OnPostRestart(BaseLib::Exception e)
{
this->data()->processor_->OnPostRestart(e);
}
};
}}