Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added Rx and non-Rx versions of event storage

  • Loading branch information...
commit ee77ffae749e9d4e2fe073d0d1db7eb273817ce0 1 parent 5953f5e
Tuomas Hietanen authored
6 CQRS.sln
View
@@ -3,12 +3,18 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 11
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "CommandSide", "CommandSide\CommandSide.fsproj", "{33E62F1D-EB1E-409E-9F2D-159646C3D5C1}"
EndProject
+Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "QuerySide", "QuerySide\QuerySide.fsproj", "{0A31C965-B5E5-46D4-873B-F3C9BD7DB995}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {0A31C965-B5E5-46D4-873B-F3C9BD7DB995}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {0A31C965-B5E5-46D4-873B-F3C9BD7DB995}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {0A31C965-B5E5-46D4-873B-F3C9BD7DB995}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {0A31C965-B5E5-46D4-873B-F3C9BD7DB995}.Release|Any CPU.Build.0 = Release|Any CPU
{33E62F1D-EB1E-409E-9F2D-159646C3D5C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33E62F1D-EB1E-409E-9F2D-159646C3D5C1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33E62F1D-EB1E-409E-9F2D-159646C3D5C1}.Release|Any CPU.ActiveCfg = Release|Any CPU
BIN  CQRS.sln.docstates.suo
View
Binary file not shown
BIN  CQRS.suo
View
Binary file not shown
BIN  CQRS.v11.suo
View
Binary file not shown
9 CommandSide/CommandSide.fsproj
View
@@ -37,16 +37,7 @@
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
- <Reference Include="System.CoreEx">
- <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2838.0\Net4\System.CoreEx.dll</HintPath>
- </Reference>
- <Reference Include="System.Interactive">
- <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2838.0\Net4\System.Interactive.dll</HintPath>
- </Reference>
<Reference Include="System.Numerics" />
- <Reference Include="System.Reactive">
- <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2838.0\Net4\System.Reactive.dll</HintPath>
- </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Events.fs" />
1  CommandSide/Domain.fs
View
@@ -17,7 +17,6 @@
member x.ApplyChange (evt:Event) =
x.Apply evt
changes <- evt :: changes
- do eventBusSubject.OnNext(evt)
/// Load from event history
let LoadsFromHistory<'t when 't :> AggregateRoot> (history:Event list) (item:'t) =
12 CommandSide/EventStorage.fs
View
@@ -20,6 +20,10 @@ type EventDescriptor(id:Guid, eventData:Event) =
member x.Id = id
member x.EventData = eventData
+///Used just to notify others if anyone would be interested
+let public EventBus = new Microsoft.FSharp.Control.Event<Event>()
+let public MonitorEvents (eventHandle: Events.Event -> unit) = EventBus.Publish |> Observable.add(eventHandle)
+
/// Custom implementation of in-memory time async event storage. Using message passing.
type EventStorage() =
let eventstorage = MailboxProcessor.Start(fun ev ->
@@ -29,7 +33,11 @@ type EventStorage() =
| Quit -> return ()
| SaveEvents(id, events) ->
- let descriptors = events |> List.map (fun e -> EventDescriptor(id, e))
+ let storeAndPublish evt =
+ EventBus.Trigger evt
+ EventDescriptor(id, evt)
+
+ let descriptors = events |> List.map (storeAndPublish)
return! msgPassing(descriptors @ history)
| GetEventsForAggregate(id, reply) ->
@@ -65,7 +73,7 @@ type EventStorage() =
// Tests for Interactive:
let storage = new EventStorage()
let id = System.Guid.NewGuid()
-(storage :> IRepository).Save (new Item(id, CreateType.New, "testi"))
+(storage :> IRepository).Save (new InventoryItem(id, CreateType.New, "testi"))
storage.ShowItemHistory id
storage.Quit
*)
66 CommandSide/EventStorageRx.fs
View
@@ -0,0 +1,66 @@
+/// Event storage using reactive extensions (Rx)
+module EventStorage
+
+open Domain
+open Events
+open System
+
+/// Infrastructure to save and restore data from some storage
+type IRepository =
+ abstract Save : AggregateRoot -> string
+ abstract GetHistoryById<'t when 't :> AggregateRoot> : Guid -> ('t -> 't)
+
+/// Container to capsulate events
+type EventDescriptor(id:Guid, eventData:Event) =
+ member x.Id = id
+ member x.EventData = eventData
+
+///Used just to notify others if anyone would be interested
+let private eventBusSubject = new System.Collections.Generic.Subject<Event>()
+let public EventBus = eventBusSubject :> IObservable<Event>
+let public MonitorEvents (eventHandle: Events.Event -> unit) = EventBus.Subscribe(eventHandle)
+
+/// Custom implementation of in-memory time async event storage. Using message passing.
+type EventStorage() =
+
+ let eventstorage = new System.Collections.Generic.ReplaySubject<EventDescriptor>()
+
+ let SaveEvents id events =
+ let storeAndPublish evt =
+ eventBusSubject.OnNext evt
+ EventDescriptor(id, evt)
+ |> eventstorage.OnNext
+
+ events |> List.iter storeAndPublish
+
+ let GetEventsForAggregate id =
+ eventstorage
+ |> Observable.filter(fun i -> i.Id=id)
+ |> Observable.map (fun i -> i.EventData)
+
+ interface IRepository with
+ member x.Save (item:AggregateRoot) =
+ SaveEvents item.Id item.GetUncommittedChanges
+ "saved"
+
+ member x.GetHistoryById id =
+ let currentEvents =
+ let currentEventList = new System.Collections.Generic.List<Event>()
+ GetEventsForAggregate id
+ |> Observable.subscribe(fun e -> currentEventList.Add(e))
+ |> ignore
+ currentEventList.ToArray() |> Array.toList
+
+ LoadsFromHistory currentEvents
+
+ member x.ShowItemHistory id =
+ GetEventsForAggregate id
+ |> Observable.subscribe(Console.WriteLine)
+ |> ignore
+(*
+// Tests for Interactive:
+let storage = new EventStorage()
+let id = System.Guid.NewGuid()
+(storage :> IRepository).Save (new InventoryItem(id, CreateType.New, "testi"))
+storage.ShowItemHistory id
+*)
9 CommandSide/Events.fs
View
@@ -6,11 +6,10 @@
#r "System.CoreEx.dll"
#r "System.Reactive.dll"
*)
- open System.Collections.Generic
// Events implemented as discriminated union.
// If you use a big solution, change to a base type
- // or just use many Subjects and concatenate / merge with LINQ
+ // or just use many event storages and concatenate / merge them with LINQ
type Event =
| InventoryItemCreated of Guid * string
| InventoryItemDeactivated of Guid
@@ -24,8 +23,4 @@
| InventoryItemDeactivated(i) -> "Item deactivated (id:" + i.ToString() + ")"
| InventoryItemRenamed(i,n) -> "Item renamed to " + n + " created (id:" + i.ToString() + ")"
| ItemsCheckedInToInventory(i,c) -> "Check-in " + c.ToString() + " of item (id:" + i.ToString() + ")"
- | ItemsRemovedFromInventory(i,c) -> "Removed " + c.ToString() + " of item (id:" + i.ToString() + ")"
-
- ///Used just to notify others if anyone would be interested
- let eventBusSubject = new Subject<Event>()
- let public EventBus = eventBusSubject :> IObservable<Event>
+ | ItemsRemovedFromInventory(i,c) -> "Removed " + c.ToString() + " of item (id:" + i.ToString() + ")"
1  CommandSide/Script.fsx
View
@@ -11,6 +11,7 @@
#load "Events.fs"
#load "Domain.fs"
#load "EventStorage.fs"
+//#load "EventStorageRx.fs"
#load "Commands.fs"
#load "CommandHandler.fs"
2  QuerySide/QuerySide.fsproj.DotSettings.user
View
@@ -1,2 +0,0 @@
-<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
- <s:Boolean x:Key="/Default/Housekeeping/ProjectSettingsUpgraded/IsUpgraded/@EntryValue">True</s:Boolean></wpf:ResourceDictionary>
23 README
View
@@ -2,21 +2,32 @@ Simple CQRS on F# (F-Sharp) 3.0
Based on Greg Young's CQRS: https://github.com/gregoryyoung/m-r/tree/master/SimpleCQRS
+CQRS is Command and Query Responsibility Segregation -pattern.
+If you don't know that, use Google or look http://martinfowler.com/bliki/CQRS.html
+I have separated command side and query side as different projects.
+
This is F# solution. Compared to C#:
+ Simpler source code
+ No need for any "InfrastructureCrap"
+ Domain objects doesn't need to have parameter less constructors
+ Interactive-driven development
- (+ No locks, uses Message Passing)
What you will need to run this:
+ Visual Studio 11 (Beta): http://www.microsoft.com/visualstudio/11/en-us
- + Reactive extensions: http://msdn.microsoft.com/en-us/data/gg577609
+Optional:
+ + Reactive extensions: http://msdn.microsoft.com/en-us/data/gg577609
-It uses Reactive Extensions (Rx) Subject-class for message bus and a MailboxProcessor for event storage.
-(Either one of them could have been enough by itself.)
+There are two versions of event storage:
+1) Pure F# as EventStorage.fs
+ - Uses pure .NET Events with F# Observable as event bus
+ - Uses MailboxProcessor ("agents"/message passing) as event storage
+2) F# with Reactive Extensions as EventStorageRx.fs
+ - Uses reactive framework Subject<T> as event bus
+ - Uses reactive framework ReplaySubject<T> as event storage
+Although technical concept is different, the functionality is identical.
+You can run this with F# interactive, directly from files and/or using Script.fsx.
-First version has just command side and event sourcing.
-(Query side is more trivial, I suggest using Type Providers...)
+This QuerySide is just in-memory one as in Greg's example.
+I suggest using F# Type Providers in QuerySide with real database...
Please sign in to comment.
Something went wrong with that request. Please try again.