-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
publisher.rb
122 lines (102 loc) · 2.75 KB
/
publisher.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
require 'concurrent/map'
require 'dry/core/class_attributes'
require 'dry/events/constants'
require 'dry/events/event'
require 'dry/events/bus'
module Dry
module Events
# Extension used for classes that can trigger events
#
# @api public
class Publisher < Module
  include Dry::Equalizer(:id)

  # Identifier this publisher module was built with.
  attr_reader :id

  # Registry that maps a publisher id to the class which included a
  # publisher built with that id. Including another publisher with the same
  # id overwrites the previous entry.
  #
  # @return [Concurrent::Map]
  #
  # @api private
  def self.registry
    @__registry__ ||= Concurrent::Map.new
  end

  # Build a publisher module for the given id; shorthand for `new(id)`.
  #
  # @param [Object] id The publisher identifier
  #
  # @return [Publisher]
  def self.[](id)
    new(id)
  end

  # @param [Object] id The publisher identifier
  def initialize(id)
    # Run Module#initialize explicitly. The empty parentheses matter: a bare
    # `super` would forward `id`, and Module#initialize takes no positional
    # arguments.
    super()
    @id = id
  end

  # Hook invoked when this publisher module is included into a class.
  # Extends the class with ClassMethods, mixes in InstanceMethods and
  # records the class in the registry under this publisher's id.
  #
  # @param [Class] klass The including class
  #
  # @api private
  def included(klass)
    klass.extend(ClassMethods)
    klass.include(InstanceMethods)

    self.class.registry[id] = klass

    super
  end

  # Class-level API added to every class that includes a publisher.
  module ClassMethods
    # Register an event
    #
    # @param [String] id A unique event key
    # @param [Hash] info Passed to `Event.new` together with the id
    #
    # @return [Object] self
    #
    # @api public
    def register_event(id, info = EMPTY_HASH)
      events[id] = Event.new(id, info)
      self
    end

    # Subscribe a block at the class level. The block and the query are
    # appended as a pair to the listeners stored under the event key.
    #
    # @param [String] event_id The event key
    # @param [Hash] query An optional event filter
    # @yield [block] The callback
    #
    # @return [Object] self
    #
    # @api public
    def subscribe(event_id, query = EMPTY_HASH, &block)
      # NOTE(review): relies on `listeners[event_id]` returning an appendable
      # collection for keys not seen before; LISTENERS_HASH is defined
      # outside this file, so confirm it supplies such a default.
      listeners[event_id] << [block, query]
      self
    end

    # Build a bus from copies of the events and listeners registered on the
    # class so far.
    #
    # @api private
    def new_bus
      # NOTE(review): `dup` copies only the top-level containers. If the
      # listener values are mutable collections they stay shared between the
      # class and every bus built from it; confirm this is intended.
      Bus.new(name, events: events.dup, listeners: listeners.dup)
    end

    # Events registered on this class, keyed by event id (memoized).
    #
    # @return [Concurrent::Map]
    #
    # @api private
    def events
      @__events__ ||= Concurrent::Map.new
    end

    # Listeners registered on this class (memoized copy of LISTENERS_HASH).
    #
    # @api private
    def listeners
      @__listeners__ ||= LISTENERS_HASH.dup
    end
  end

  # Instance-level API added to every class that includes a publisher.
  module InstanceMethods
    # The bus used by this instance; created on first use via the class's
    # `new_bus` and memoized afterwards.
    #
    # @api private
    def __bus__
      @__bus__ ||= self.class.new_bus
    end

    # Publish an event
    #
    # @param [String] event_id The event key
    # @param [Hash] payload An optional payload
    #
    # @return [Object] self
    #
    # @api public
    def publish(event_id, payload = EMPTY_HASH)
      __bus__.publish(event_id, payload)
      self
    end
    alias_method :trigger, :publish

    # Subscribe to events.
    # If the query parameter is provided, filters events by payload.
    #
    # @param [String] event_id The event key
    # @param [Hash] query An optional event filter
    # @yield [block] The callback
    # @return [Object] self
    #
    # @api public
    def subscribe(event_id, query = EMPTY_HASH, &block)
      __bus__.subscribe(event_id, query, &block)
      self
    end

    # Return true if a given listener has been subscribed to any event
    #
    # @param [Object] listener The listener to look up on the bus
    #
    # @api public
    def subscribed?(listener)
      __bus__.subscribed?(listener)
    end

    # Forward the event id, payload and block to the bus's `process` and
    # return whatever the bus returns.
    #
    # @param [String] event_id The event key
    # @param [Hash] payload An optional payload
    #
    # @api public
    def process(event_id, payload = EMPTY_HASH, &block)
      __bus__.process(event_id, payload, &block)
    end
  end
end
end
end