-
Notifications
You must be signed in to change notification settings - Fork 0
/
reactor.rb
157 lines (120 loc) · 3.22 KB
/
reactor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
require 'singleton'
require 'fiber'
require "logger"
module Cucub
class Reactor
include Singleton
attr_accessor :logger
def initialize
if not Cucub::VM.instance.threaded
self.logger = Logger.new($stderr)
self.logger.level = Logger::DEBUG
self.class.send(:include, ::Servolux::Threaded)
end
@actors = []
@reactor_thread = nil
@state = :idle
trap("INT") { self.stop }
if not Cucub::VM.instance.threaded
start
join
end
self
end
def run
if @state == :idle
$stdout.puts "idle"
plug_actors
init_channels
set_inbound_listen
$stdout.puts "receiving"
# This should be on inbound creation method
@inbound.on_receive { |msg|
$stdout.puts "received: #{msg.inspect}"
#msg[msg.size - 1] = unwrap_message(msg) #.last)
msg = unwrap_message(msg)
#@actors.first.wire(msg)
@actors.first.process(msg)
$stdout.puts "\n"
}
@state = :preparing_container
end
if @state == :preparing_container
$stdout.puts "preparing container"
container_prepare
end
# container.resume
begin
$stdout.puts "running container"
container_run
# @reactor_thread.join
rescue Exception => e
puts "handled exception"
end
end
def set_inbound_listen
Cucub::VM.instance.configuration.classes.each do |class_name|
@inbound.listen("#{class_name}##{Cucub::VM.instance.uid}")
end
end
def plug_actors
Cucub::ObjectsHub.instance.objects.each do |object|
actor = Cucub::Actor.new(object)
plug_actor(actor)
end
end
def container_prepare
#@reactor_fiber = Fiber.new {
@reactor_thread ||= Thread.new {
begin
# worker is going to be a Class, fibered-aware, which can receive messages
# relay(@worker)
$stdout.puts "prepared to receive."
while @state != :stopped
case @state
when :preparing_container
Thread.stop
when :running
PanZMQ::Poller.instance.poll
end
end
rescue Exception => e
puts "exception: #{e.exception}"
end
}
end
def container_run
@state = :running
@reactor_thread.run
# @reactor_thread.join
end
def container_stop
@state = :stopped
# @reactor_thread.raise "terminated!"
end
def unwrap_message(message)
message = message.split("##{Cucub::VM.instance.uid} ")[1]
unserialized = Cucub::Message.parse(message)
unserialized
end
def plug_actor(actor)
@actors << actor
# maybe send plugged event ?
end
def kill_actors
$stdout.puts "4. Kill Actors"
@actors.each { |actor| actor.kill }
$stdout.puts "5. Killed"
end
def init_channels
@inbound = Cucub::Channel.vm_inner_inbound
@outbound = Cucub::Channel.vm_inner_outbound
end
def stop
@state = :stopped
Cucub::Channel.shutdown!
kill_actors
exit
end
end
end