Skip to content

Commit

Permalink
adding store for temp change on aggregate that reflects emitted event…
Browse files Browse the repository at this point in the history
…s during command excution
  • Loading branch information
mjaric committed Sep 25, 2019
1 parent 0332728 commit 3a7b66b
Showing 1 changed file with 47 additions and 3 deletions.
50 changes: 47 additions & 3 deletions lib/helios/aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Helios.Aggregate do

@type init_args :: [otp_app: atom, id: aggregate_id]

@type t :: struct

@doc """
Returns unique identifier for stream to which events will be persisted.
Expand All @@ -37,9 +39,9 @@ defmodule Helios.Aggregate do
struct is defined in different module.
"""
@callback new(args :: init_args) ::
{:ok, aggregate}
| {:stop, term}
| :ignore
{:ok, aggregate}
| {:stop, term}
| :ignore

@doc """
Applies single event to aggregate when replied or after `handle_exec/3` is executed.
Expand Down Expand Up @@ -92,7 +94,49 @@ defmodule Helios.Aggregate do
end
end

@doc """
Returns aggregate state from context.
State can be only accessed once pipeline enters in aggregate process.
"""
@spec state(Helios.Context.t()) :: t | nil
def state(%Context{assigns: %{aggregate: aggregate}}), do: aggregate

def state(_ctx), do: nil

@change_key :helios_aggregate_change

@spec get_change(Helios.Context.t()) :: struct
def get_change(ctx) do
Map.get(ctx.private, @change_key, state(ctx))
end

@spec put_change(Context.t(), struct) :: Context.t()
def put_change(ctx, change) do
private = Map.put(ctx.private, @change_key, change)
%{ctx | private: private}
end



#
# Validators
#

@doc """
Validates required parameters
"""
def validate_required(ctx, fields, opts \\ []) do
fields =
fields
|> List.wrap()
|> Enum.map(fn key ->
case key do
k when is_atom(k) -> Atom.to_string(k)
k -> k
end
end)
end
end

0 comments on commit 3a7b66b

Please sign in to comment.