Skip to content

Commit

Permalink
Start implementation of aggregatebuilder/finderproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
jesjos committed Mar 18, 2015
1 parent 2d5af8a commit 9a30e54
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 1 deletion.
116 changes: 116 additions & 0 deletions lib/sandthorn/aggregate_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
module Sandthorn
class AggregateBuilder
def initialize(aggregate_klass)
@aggregate_klass = aggregate_klass
end

def build(aggregate_id, sequence_number: nil)
aggregate_id = aggregate_id.to_s
if sequence_number
build_from_sequence_number(aggregate_id, sequence_number)
else
build_from_events(all_events(aggregate_id))
end
end

def build_from_time(aggregate_id, time)
build_from_events(events_up_to_time(aggregate_id, time))
end

def build_from_version(aggregate_id, version)
build_from_events(events_up_to_version(aggregate_id, version))
end

def build_from_sequence_number(aggregate_id, sequence_number)
build_from_events(events_up_to_sequence_number(aggregate_id, sequence_number))
end

def build_from_events(events)
raise Errors::AggregateNotFound if events.nil? || events.empty?
events, aggregate = init_aggregate(events)
events.each do |event|
if snapshot?(event)
set_from_snapshot(aggregate, event)
else
handle_event(aggregate, event)
end
end
aggregate.send :clear_aggregate_events
aggregate.send :load_aggregate_stored_instance_variables
return aggregate
end

private

def init_aggregate(events)
if snapshot?(events.first)
init_from_snapshot(events)
else
return events, @aggregate_klass.allocate
end
end

def snapshot?(event)
event[:event_name] == "aggregate_set_from_snapshot"
end

def set_from_snapshot(aggregate, event)
event_args = event[:event_args]
aggregate.send(:aggregate_set_from_snapshot, *event_args)
set_aggregate_versions(aggregate, aggregate.aggregate_originating_version)
end

def handle_event(aggregate, event)
event_args = event[:event_args]
attribute_deltas = event_args[:attribute_deltas]
aggregate_version = event[:aggregate_version]
set_aggregate_versions(aggregate, aggregate_version) unless aggregate_version.nil?
set_aggregate_deltas(aggregate, attribute_deltas) unless attribute_deltas.nil?
end

def set_aggregate_versions(aggregate, aggregate_version)
aggregate.send :set_orginating_aggregate_version!, aggregate_version
aggregate.send :set_current_aggregate_version!, aggregate_version
end

def set_aggregate_deltas(aggregate, attribute_deltas)
attribute_deltas.each do |delta|
aggregate.instance_variable_set delta[:attribute_name], delta[:new_value]
end
end

def init_from_snapshot(events)
aggregate = events.first[:event_args].first
events.shift
return events, aggregate
end

def events_up_to_time(aggregate_id, time)
all_events(aggregate_id).select do |event|
event[:timestamp] <= time
end
end

def events_up_to_version(aggregate_id, version)
all_events(aggregate_id).select do |event|
event[:aggregate_version] <= version
end
end

def events_up_to_sequence_number(aggregate_id, sequence_number)
all_events(aggregate_id).select do |event|
event[:sequence_number] <= sequence_number
end
end

def all_events(aggregate_id)
unpack(Sandthorn.get_aggregate_events(aggregate_id, @aggregate_klass.to_s))
end

def unpack(events)
events.map do |e|
e.merge(event_args: UpptecEventFramework.deserialize(e[:event_data]))
end
end
end
end
12 changes: 11 additions & 1 deletion lib/sandthorn/aggregate_root_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def save
end

def ==(other)
other.respond_to?(:aggregate_id) && aggregate_id == other.aggregate_id
other.respond_to?(:aggregate_id)\
&& aggregate_id == other.aggregate_id\
&& other.aggregate_current_event_version == aggregate_current_event_version
end

def aggregate_trace args
Expand Down Expand Up @@ -103,6 +105,14 @@ def find id
return id.map { |e| aggregate_find e }
end

def aggregate_build_to(aggregate_id, sequence_number: nil)
AggregateBuilder.new(self).build(aggregate_id, sequence_number: sequence_number)
end

def at(*args, &block)
FinderProxy.new(self, *args, &block)
end

def aggregate_find aggregate_id
events = Sandthorn.get_aggregate(aggregate_id, self)
unless events && !events.empty?
Expand Down
41 changes: 41 additions & 0 deletions lib/sandthorn/finder_proxy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Sandthorn::Refinements

module Sandthorn
class FinderProxy
def initialize(aggregate_type, *args)
@aggregate_type = aggregate_type
@args = args
end

def find(id)
if id.respond_to?(:each)
id.map { |id| aggregate_find(id) }.compact
else
aggregate_find(id)
end
end

def find!(id)
if id.respond_to?(:each)
id.map { |id| aggregate_find!(id) }.compact
else
aggregate_find!(id)
end
end

def aggregate_find(id)
aggregate_find!(id)
rescue Errors::AggregateNotFound
nil
end

def aggregate_find!(id)
@aggregate_type.aggregate_build_to(id.to_id, *@args)
end

def all
ids = Sandthorn.get_aggregate_list_by_type(@aggregate_type)
find(ids)
end
end
end
1 change: 1 addition & 0 deletions lib/sandthorn/refinements.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
require "sandthorn/refinements/string"
9 changes: 9 additions & 0 deletions lib/sandthorn/refinements/string.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Sandthorn
module Refinements
refine String do
def to_id
self
end
end
end
end
Empty file added spec/aggregate_builder_spec.rb
Empty file.
77 changes: 77 additions & 0 deletions spec/finder_proxy_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
require 'spec_helper'

module Sandthorn
describe FinderProxy do
class FooAggregate
include AggregateRoot
def inc
record_event
end
end

let(:sequence_number) { 10 }

let(:at_sequence_number) do
FinderProxy.new(FooAggregate, sequence_number: sequence_number)
end

describe "#find" do
context "when given a single id" do
context "when given a sequence number" do
it "calls aggregate_build_from on the aggregate type with the correct args" do
expect(FooAggregate).to receive(:aggregate_build_to).with("foo", sequence_number: sequence_number)
at_sequence_number.find("foo")
end
end
end
context "when given an aggregate that doesn't exist" do
it "returns nil" do
expect(at_sequence_number.find("foo")).to be_nil
end
end
end

describe "#find!" do
context "when given an aggregate that doesn't exist" do
it "raises an error" do
expect { at_sequence_number.find!("foo") }.to raise_error(Errors::AggregateNotFound)
end
end
context "when given multiple aggregates and at least one doesn't exist" do
let!(:aggregate) do
FooAggregate.new.save
end
it "raises an error" do
expect { at_sequence_number.find!([aggregate.id, "foo"])}.to raise_error(Errors::AggregateNotFound)
end
end
end

describe "#all" do
before do
Timecop.freeze(1999, 1, 1)
3.times do
aggregate = FooAggregate.new
3.times { aggregate.inc }
aggregate.save
end
end
it "returns all available aggregates at the given point" do
aggregates = FinderProxy.new(FooAggregate, sequence_number: 1).all
expect(aggregates.length).to eq(3)
end

context "when an aggregate has no events at that time" do
let!(:newer_aggregate) do
Timecop.freeze(Time.new(2015,1,1))
FooAggregate.new.save
end
it "doesn't return that aggregate" do
aggregates = FinderProxy.new(FooAggregate, sequence_number: 1).all
aggregate_ids = aggregates.map(&:id)
expect(aggregate_ids).to_not include newer_aggregate.id
end
end
end
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'coveralls'
Coveralls.wear!
require "ap"
require "timecop"
require "bundler"
require "sandthorn_driver_sequel"
require "support/custom_matchers"
Expand Down

0 comments on commit 9a30e54

Please sign in to comment.