-
Notifications
You must be signed in to change notification settings - Fork 122
/
client.rb
390 lines (349 loc) · 13.9 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# frozen_string_literal: true
require "concurrent"
module RubyEventStore
class Client
def initialize(
repository: InMemoryRepository.new,
mapper: Mappers::Default.new,
subscriptions: Subscriptions.new,
dispatcher: Dispatcher.new,
clock: default_clock,
correlation_id_generator: default_correlation_id_generator,
event_type_resolver: EventTypeResolver.new
)
@repository = repository
@mapper = mapper
@subscriptions = subscriptions
@broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
@clock = clock
@metadata = Concurrent::ThreadLocalVar.new
@correlation_id_generator = correlation_id_generator
@event_type_resolver = event_type_resolver
end
# Persists events and notifies subscribed handlers about them
#
# @param events [Array<Event>, Event] event(s)
# @param stream_name [String] name of the stream for persisting events.
# @param expected_version [:any, :auto, :none, Integer] controls optimistic locking strategy. {http://railseventstore.org/docs/expected_version/ Read more}
# @return [self]
def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any)
enriched_events = enrich_events_metadata(events)
records = transform(enriched_events)
append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version)
enriched_events.zip(records) do |event, record|
with_metadata(correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id) do
broker.(event, record)
end
end
self
end
# Persists new event(s) without notifying any subscribed handlers
#
# @param (see #publish)
# @return [self]
def append(events, stream_name: GLOBAL_STREAM, expected_version: :any)
append_records_to_stream(
transform(enrich_events_metadata(events)),
stream_name: stream_name,
expected_version: expected_version
)
self
end
# Links already persisted event(s) to a different stream.
# Does not notify any subscribed handlers.
#
# @param event_ids [String, Array<String>] ids of events
# @param stream_name (see #publish)
# @param expected_version (see #publish)
# @return [self]
def link(event_ids, stream_name:, expected_version: :any)
repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version))
self
end
# Deletes a stream.
# All events from the stream remain intact but they are no
# longer linked to the stream.
#
# @param stream_name [String] name of the stream to be cleared.
# @return [self]
def delete_stream(stream_name)
repository.delete_stream(Stream.new(stream_name))
self
end
# Starts building a query specification for reading events.
# {http://railseventstore.org/docs/read/ More info.}
#
# @return [Specification]
def read
Specification.new(SpecificationReader.new(repository, mapper))
end
# Gets list of streams where event is stored or linked
#
# @return [Array<Stream>] where event is stored or linked
def streams_of(event_id)
repository.streams_of(event_id)
end
def search_streams(stream)
repository.search_streams(stream)
end
# Gets position of the event in given stream
#
# The position is always nonnegative.
# Returns nil if the event has no specific position in stream.
# Raise error if event is not present in stream.
#
# @param event_id [String]
# @param stream_name [String]
# @return [Integer] nonnegative integer position of event in stream
# @raise [EventNotInStream]
def position_in_stream(event_id, stream_name)
repository.position_in_stream(event_id, Stream.new(stream_name))
end
# Gets position of the event in global stream
#
# The position is always nonnegative.
# Global position may have gaps, meaning, there may be event at
# position 40, but no event at position 39.
#
# @param event_id [String]
# @raise [EventNotFound]
# @return [Integer] nonnegno ative integer position of event in global stream
def global_position(event_id)
repository.global_position(event_id)
end
# Checks whether event is linked in given stream
#
# @param event_id [String]
# @param stream_name [String]
# @return [Boolean] true if event is linked to given stream, false otherwise
def event_in_stream?(event_id, stream_name)
stream = Stream.new(stream_name)
stream.global? ? repository.has_event?(event_id) : repository.event_in_stream?(event_id, stream)
end
# Subscribes a handler (subscriber) that will be invoked for published events of provided type.
#
# @overload subscribe(subscriber, to:)
# @param to [Array<Class>] types of events to subscribe
# @param subscriber [Object, Class] handler
# @return [Proc] - unsubscribe proc. Call to unsubscribe.
# @raise [ArgumentError, SubscriberNotExist]
# @overload subscribe(to:, &subscriber)
# @param to [Array<Class>] types of events to subscribe
# @param subscriber [Proc] handler
# @return [Proc] - unsubscribe proc. Call to unsubscribe.
# @raise [ArgumentError, SubscriberNotExist]
def subscribe(subscriber = nil, to:, &proc)
raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
subscriber ||= proc
broker.add_subscription(subscriber, to.map { |event_klass| event_type_resolver.call(event_klass) })
end
# Subscribes a handler (subscriber) that will be invoked for all published events
#
# @overload subscribe_to_all_events(subscriber)
# @param subscriber [Object, Class] handler
# @return [Proc] - unsubscribe proc. Call to unsubscribe.
# @raise [ArgumentError, SubscriberNotExist]
# @overload subscribe_to_all_events(&subscriber)
# @param subscriber [Proc] handler
# @return [Proc] - unsubscribe proc. Call to unsubscribe.
# @raise [ArgumentError, SubscriberNotExist]
def subscribe_to_all_events(subscriber = nil, &proc)
raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
broker.add_global_subscription(subscriber || proc)
end
# Get list of handlers subscribed to an event
#
# @param to [Class, String] type of events to get list of sybscribed handlers
# @return [Array<Object, Class>]
def subscribers_for(event_class)
subscriptions.all_for(event_type_resolver.call(event_class))
end
# Builder object for collecting temporary handlers (subscribers)
# which are active only during the invocation of the provided
# block of code.
class Within
def initialize(block, broker, resolver)
@block = block
@broker = broker
@global_subscribers = []
@subscribers = Hash.new { [] }
@resolver = resolver
end
# Subscribes temporary handlers that
# will be called for all published events.
# The subscription is active only during the invocation
# of the block of code provided to {Client#within}.
# {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more.}
#
# @param handlers [Object, Class] handlers passed as objects or classes
# @param handler2 [Proc] handler passed as proc
# @return [self]
def subscribe_to_all_events(*handlers, &handler2)
handlers << handler2 if handler2
@global_subscribers += handlers
self
end
# Subscribes temporary handlers that
# will be called for published events of provided type.
# The subscription is active only during the invocation
# of the block of code provided to {Client#within}.
# {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more.}
#
# @overload subscribe(handler, to:)
# @param handler [Object, Class] handler passed as objects or classes
# @param to [Array<Class>] types of events to subscribe
# @return [self]
# @overload subscribe(to:, &handler)
# @param to [Array<Class>] types of events to subscribe
# @param handler [Proc] handler passed as proc
# @return [self]
def subscribe(handler = nil, to:, &handler2)
raise ArgumentError if handler && handler2
@subscribers[handler || handler2] += Array(to).map { |event_klass| resolver.call(event_klass) }
self
end
# Invokes the block of code provided to {Client#within}
# and then unsubscribes temporary handlers.
# {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more.}
#
# @return [Object] value returned by the invoked block of code
def call
unsubs = add_thread_global_subscribers
unsubs += add_thread_subscribers
@block.call
ensure
unsubs.each(&:call) if unsubs
end
private
attr_reader :resolver
def add_thread_subscribers
@subscribers.map { |subscriber, types| @broker.add_thread_subscription(subscriber, types) }
end
def add_thread_global_subscribers
@global_subscribers.map { |subscriber| @broker.add_thread_global_subscription(subscriber) }
end
end
# Use for starting temporary subscriptions.
# {http://railseventstore.org/docs/subscribe/#temporary-subscriptions Read more}
#
# @param block [Proc] block of code during which the temporary subscriptions will be active
# @return [Within] builder object which collects temporary subscriptions
def within(&block)
raise ArgumentError if block.nil?
Within.new(block, broker, event_type_resolver)
end
# Set additional metadata for all events published within the provided block
# {http://railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
#
# @param metadata [Hash] metadata to set for events
# @param block [Proc] block of code during which the metadata will be added
# @return [Object] last value returned by the provided block
def with_metadata(metadata_for_block, &block)
previous_metadata = metadata
self.metadata = previous_metadata.merge(metadata_for_block)
block.call if block_given?
ensure
self.metadata = previous_metadata
end
# Deserialize event which was serialized for async event handlers
# {http://railseventstore.org/docs/subscribe/#async-handlers Read more}
#
# @return [Event] deserialized event
def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil)
extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 }
mapper.record_to_event(
SerializedRecord
.new(
event_type: event_type,
event_id: event_id,
data: data,
metadata: metadata,
timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)],
valid_at: valid_at || timestamp_
)
.deserialize(serializer)
)
end
# Read additional metadata which will be added for published events
# {http://railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
#
# @return [Hash]
def metadata
@metadata.value || EMPTY_HASH
end
# Overwrite existing event(s) with the same ID.
#
# Does not notify any subscribed handlers.
# Does not enrich with additional current metadata.
# Does not allow changing which streams these events are in.
# {http://railseventstore.org/docs/migrating_messages Read more}
#
# @example Add data and metadata to existing events
#
# events = event_store.read.limit(10).to_a
# events.each do |ev|
# ev.data[:tenant_id] = 1
# ev.metadata[:server_id] = "eu-west-2"
# end
# event_store.overwrite(events)
#
# @example Change event type
#
# events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev|
# NewType.new(
# event_id: ev.event_id,
# data: ev.data,
# metadata: ev.metadata,
# )
# end
# event_store.overwrite(events)
#
# @param events [Array<Event>, Event] event(s) to serialize and overwrite again
# @return [self]
def overwrite(events_or_event)
repository.update_messages(transform(Array(events_or_event)))
self
end
def inspect
"#<#{self.class}:0x#{__id__.to_s(16)}>"
end
def inspect_repository
if repository.public_method(:inspect).owner.equal?(Kernel)
repository.class.inspect
else
repository.inspect
end
end
EMPTY_HASH = {}.freeze
private_constant :EMPTY_HASH
private
def transform(events)
events.map { |ev| mapper.event_to_record(ev) }
end
def enrich_events_metadata(events)
events = Array(events)
events.each { |event| enrich_event_metadata(event) }
events
end
def enrich_event_metadata(event)
metadata.each { |key, value| event.metadata[key] ||= value }
event.metadata[:timestamp] ||= clock.call
event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp)
event.metadata[:correlation_id] ||= correlation_id_generator.call
end
def append_records_to_stream(records, stream_name:, expected_version:)
repository.append_to_stream(records, Stream.new(stream_name), ExpectedVersion.new(expected_version))
end
protected
def metadata=(value)
@metadata.value = value
end
def default_clock
-> { Time.now.utc.round(TIMESTAMP_PRECISION) }
end
def default_correlation_id_generator
-> { SecureRandom.uuid }
end
attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator, :event_type_resolver
end
end