/
em-synchrony.rb
124 lines (106 loc) · 3.37 KB
/
em-synchrony.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# Put the sibling ../lib directory (relative to this file) at the front of the
# load path so the em-synchrony/* requires below resolve.
$:.unshift(File.dirname(__FILE__) + '/../lib')

require "rubygems" # Gem::Version is used for the version check at the bottom
require "eventmachine"

# A failed load of "fiber" is tolerated as long as the Fiber constant is
# already defined; otherwise the LoadError is re-raised.
begin
  require "fiber"
rescue LoadError => error
  raise error unless defined? Fiber
end

require "em-synchrony/core_ext"
require "em-synchrony/thread"
require "em-synchrony/em-multi"
require "em-synchrony/tcpsocket"
require "em-synchrony/connection_pool"
require "em-synchrony/keyboard"

# The iterator extension is only loaded for EventMachine releases newer than
# 0.12.10. Versions are compared as parsed versions rather than as strings:
# lexicographically "0.12.8" > "0.12.10" is true, which would load the
# extension on a release that is actually older.
if Gem::Version.new(EventMachine::VERSION) > Gem::Version.new('0.12.10')
  require "em-synchrony/iterator"
end
module EventMachine
# A convenience method for wrapping EM.run body within
# a Ruby Fiber such that async operations can be transparently
# paused and resumed based on IO scheduling.
#
# @param blk [Proc, nil] body to execute; when nil the attached block is used
# @param tail [Object, nil] forwarded unchanged as the second argument to run
# @return whatever run returns
def self.synchrony(blk=nil, tail=nil, &block)
  body = blk || block
  # Handed to run as its first argument; every invocation executes the body
  # inside a freshly created fiber.
  fiber_entry = proc { Fiber.new { body.call }.resume }
  run(fiber_entry, tail)
end
module Synchrony
# sync is a close relative to inlineCallbacks from Twisted (Python)
#
# Synchrony.sync allows you to write sequential code while using asynchronous
# or callback-based methods under the hood. Example:
#
# result = EM::Synchrony.sync EventMachine::HttpRequest.new(URL).get
# p result.response
#
# As long as the asynchronous function returns a Deferrable object, which
# has a "callback" and an "errback", the sync method will automatically
# yield and automatically resume your code (via Fibers) when the call
# either succeeds or fails. You do not need to patch or modify the
# Deferrable object, simply pass it to EM::Synchrony.sync
#
# @param df [#callback, #errback] deferrable to wait on; the same handler is
#   registered for both outcomes
# @return [Object, Array, nil] nil when the deferrable reports no values, the
#   value itself when it reports exactly one, an Array when it reports
#   several. The shape is the same whether the deferrable had already
#   completed or completes later.
def self.sync(df)
  waiting = Fiber.current

  # Must stay a plain proc (not a lambda): its `return` leaves `sync` itself.
  xback = proc do |*args|
    if waiting == Fiber.current
      # The deferrable fired while being registered below, so this fiber never
      # yielded. Return straight out of `sync`, shaping the values exactly as
      # Fiber.yield does for the resumed case (0 -> nil, 1 -> value, n -> Array).
      return args.size > 1 ? args : args.first
    else
      waiting.resume(*args)
    end
  end

  df.callback(&xback)
  df.errback(&xback)

  # Suspend until the handler above resumes this fiber with the result.
  Fiber.yield
end
# Fiber-aware sleep function using an EM timer
#
# Execution is stopped for specified amount of seconds
# and then automatically resumed (just like regular sleep)
# except without locking the reactor thread
#
# @param secs delay handed to EM::Timer.new (presumably seconds as a
#   Numeric - confirm against EventMachine)
# @return [nil] when woken by the timer, which resumes the fiber without a value
def self.sleep(secs)
  sleeper = Fiber.current
  # The timer callback wakes the fiber that is suspended below.
  EM::Timer.new(secs) do
    sleeper.resume
  end
  Fiber.yield
end
# Fiber-aware EventMachine timer: wraps the passed in
# block within a new fiber context such that you can
# continue using synchrony methods
#
# @param interval delay handed to EM::Timer.new
# @param blk block to execute, inside a new fiber, when the timer fires
# @return [EM::Timer] the timer object that was created
def self.add_timer(interval, &blk)
  in_fiber = proc { Fiber.new { blk.call }.resume }
  EM::Timer.new(interval, &in_fiber)
end
# Fiber-aware EventMachine timer: wraps the passed in
# block within a new fiber (new fiber on every invocation)
# to allow you to continue using synchrony methods
#
# @param interval period handed to EM.add_periodic_timer
# @param blk block to execute on every tick, each time inside a new fiber
# @return whatever EM.add_periodic_timer returns
def self.add_periodic_timer(interval, &blk)
  EM.add_periodic_timer(interval) do
    tick = Fiber.new { blk.call }
    tick.resume
  end
end
# Fiber-aware EM.next_tick convenience function
#
# @param blk block to execute inside a new fiber once EM.next_tick invokes it
# @return whatever EM.next_tick returns
def self.next_tick(&blk)
  EM.next_tick do
    Fiber.new { blk.call }.resume
  end
end
# Fiber-aware EM.defer
#
# @param op [#call, nil] operation handed to EM.defer; when nil the attached
#   block is used instead
# @return the value EM.defer passes to the completion callback
def self.defer(op = nil, &blk)
  caller_fiber = Fiber.current
  work = op || blk
  # Completion callback: hands the result back to the suspended fiber.
  deliver = lambda { |result| caller_fiber.resume(result) }
  EM.defer(work, deliver)
  Fiber.yield
end
# Fiber-aware EM.system
#
# @param cmd command handed to EM.system
# @param args additional arguments forwarded to EM.system
# @return [Array] two elements: the output and the status that EM.system
#   yields to its block
def self.system(cmd, *args)
  caller_fiber = Fiber.current
  EM.system(cmd, *args) do |out, status|
    # Both values travel back to the suspended fiber as one pair.
    caller_fiber.resume([out, status])
  end
  Fiber.yield
end
# Routes to EM::Synchrony::Keyboard
#
# @return whatever Keyboard#gets returns; a new Keyboard is built on each call
def self.gets
  keyboard = EventMachine::Synchrony::Keyboard.new
  keyboard.gets
end
end
end